From 90c60f8e646a9b822c95efca56ee83db1cacb71c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lubom=C3=ADr=20Sedl=C3=A1=C5=99?= Date: Mon, 10 Sep 2018 10:16:44 +0200 Subject: [PATCH] Add script to orchestrate multiple composes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It may make sense to break a big compose into smaller chunks that can be done independently. This script allows describing the smaller parts, runs them with correct dependencies and arranges the result to look like a single big compose. All parts use the same koji event, that is either obtained from Koji, or from command line argument. JIRA: COMPOSE-2654 Signed-off-by: Lubomír Sedlář --- Makefile | 3 + bin/pungi-orchestrate | 16 + doc/contributing.rst | 3 +- doc/index.rst | 1 + doc/multi_compose.rst | 60 ++ pungi.spec | 1 + pungi/linker.py | 12 + pungi/phases/pkgset/sources/source_koji.py | 30 +- pungi_utils/orchestrator.py | 546 ++++++++++++ setup.py | 1 + tests/data/client.conf | 14 + tests/data/multi-compose-variants.xml | 47 + tests/data/multi-compose.conf | 19 + tests/data/resilient-storage.conf | 15 + tests/data/server.conf | 16 + .../compose/metadata/composeinfo.json | 25 + .../compose/metadata/images.json | 18 + .../compose/metadata/modules.json | 18 + .../compose/metadata/osbs.json | 4 + .../compose/metadata/rpms.json | 18 + .../compose/metadata/composeinfo.json | 27 + .../compose/metadata/images.json | 20 + .../compose/metadata/modules.json | 20 + .../compose/metadata/osbs.json | 6 + .../compose/metadata/rpms.json | 20 + .../compose/metadata/composeinfo.json | 25 + .../compose/metadata/images.json | 18 + .../compose/metadata/modules.json | 18 + .../basic-metadata/compose/metadata/osbs.json | 4 + .../basic-metadata/compose/metadata/rpms.json | 18 + .../compose/metadata/composeinfo.json | 25 + .../compose/metadata/images.json | 18 + .../compose/metadata/modules.json | 18 + .../compose/metadata/osbs.json | 4 + .../compose/metadata/rpms.json | 18 + .../compose/metadata/composeinfo.json | 23 + tests/test_orchestrator.py | 806 ++++++++++++++++++ 37 files changed, 1943 insertions(+), 12 deletions(-) create mode 100755 bin/pungi-orchestrate create mode 100644 doc/multi_compose.rst create mode 100644 pungi_utils/orchestrator.py create mode 100644 tests/data/client.conf create mode 100644 tests/data/multi-compose-variants.xml create mode 100644 tests/data/multi-compose.conf create mode 100644 tests/data/resilient-storage.conf create mode 100644 tests/data/server.conf create mode 100644 tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/composeinfo.json create mode 100644 tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/images.json create mode 100644 tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/modules.json create mode 100644 tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/osbs.json create mode 100644 tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/rpms.json create mode 100644 tests/fixtures/basic-metadata-merged/compose/metadata/composeinfo.json create mode 100644 tests/fixtures/basic-metadata-merged/compose/metadata/images.json create mode 100644 tests/fixtures/basic-metadata-merged/compose/metadata/modules.json create mode 100644 tests/fixtures/basic-metadata-merged/compose/metadata/osbs.json create mode 100644 tests/fixtures/basic-metadata-merged/compose/metadata/rpms.json create mode 100644 tests/fixtures/basic-metadata/compose/metadata/composeinfo.json create mode 100644 tests/fixtures/basic-metadata/compose/metadata/images.json create mode 100644 tests/fixtures/basic-metadata/compose/metadata/modules.json create mode 100644 tests/fixtures/basic-metadata/compose/metadata/osbs.json create mode 100644 tests/fixtures/basic-metadata/compose/metadata/rpms.json create mode 100644 tests/fixtures/empty-metadata-merged/compose/metadata/composeinfo.json create mode 100644 tests/fixtures/empty-metadata-merged/compose/metadata/images.json create mode 100644 tests/fixtures/empty-metadata-merged/compose/metadata/modules.json create mode 100644 tests/fixtures/empty-metadata-merged/compose/metadata/osbs.json create mode 100644 tests/fixtures/empty-metadata-merged/compose/metadata/rpms.json create mode 100644 tests/fixtures/empty-metadata/compose/metadata/composeinfo.json create mode 100644 tests/test_orchestrator.py diff --git a/Makefile b/Makefile index 55d2aab2..90e8a087 100644 --- a/Makefile +++ b/Makefile @@ -104,5 +104,8 @@ test-data: test-compose: cd tests && ./test_compose.sh +test-multi-compose: + PYTHONPATH=$$(pwd) PATH=$$(pwd)/bin:$$PATH pungi-orchestrate --debug start tests/data/multi-compose.conf + doc: cd doc; make html diff --git a/bin/pungi-orchestrate b/bin/pungi-orchestrate new file mode 100755 index 00000000..0cb4348d --- /dev/null +++ b/bin/pungi-orchestrate @@ -0,0 +1,16 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import sys + +here = sys.path[0] +if here != '/usr/bin': + # Git checkout + sys.path[0] = os.path.dirname(here) + +from pungi_utils import orchestrator + + +if __name__ == '__main__': + orchestrator.main() diff --git a/doc/contributing.rst b/doc/contributing.rst index 3aff0e87..02ae13ae 100644 --- a/doc/contributing.rst +++ b/doc/contributing.rst @@ -46,6 +46,7 @@ For running unit tests, these packages are recommended as well: * python-nose-cov * python-unittest2 * rpmdevtools + * python-parameterized While being difficult, it is possible to work on *Pungi* using *virtualenv*. Install *python-virtualenvwrapper* (after installation you have to add the command @@ -60,7 +61,7 @@ packages above as they are used by calling an executable. :: $ for pkg in _deltarpm krbV _selinux deltarpm sqlitecachec _sqlitecache; do ln -vs "$(deactivate && python -c 'import os, '$pkg'; print('$pkg'.__file__)')" "$(virtualenvwrapper_get_site_packages_dir)"; done $ pip install -U pip $ PYCURL_SSL_LIBRARY=nss pip install pycurl --no-binary :all: - $ pip install beanbag jsonschema 'kobo>=0.6.0' lockfile lxml mock nose nose-cov productmd pyopenssl python-multilib requests requests-kerberos setuptools sphinx ordered_set koji PyYAML dogpile.cache + $ pip install beanbag jsonschema 'kobo>=0.6.0' lockfile lxml mock nose nose-cov productmd pyopenssl python-multilib requests requests-kerberos setuptools sphinx ordered_set koji PyYAML dogpile.cache parameterized Now you should be able to run all existing tests. diff --git a/doc/index.rst b/doc/index.rst index 15d244fc..0081fdea 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -21,3 +21,4 @@ Contents: comps contributing testing + multi_compose diff --git a/doc/multi_compose.rst b/doc/multi_compose.rst new file mode 100644 index 00000000..8848ad6b --- /dev/null +++ b/doc/multi_compose.rst @@ -0,0 +1,60 @@ +.. _multi_compose: + +Managing compose from multiple parts +==================================== + +There may be cases where it makes sense to split a big compose into separate +parts, but create a compose output that links all output into one familiar +structure. + +The `pungi-orchestrate` tools allows that. + +It works with an INI-style configuration file. The ``[general]`` section +contains information about identity of the main compose. Other sections define +individual parts. + +The parts are scheduled to run in parallel, with the minimal amount of +serialization. The final compose directory will contain hard-links to the +files. + + +General settings +---------------- + +**target** + Path to directory where the final compose should be created. +**compose_type** + Type of compose to make. +**release_name** + Name of the product for the final compose. +**release_short** + Short name of the product for the final compose. +**release_version** + Version of the product for the final compose. +**release_type** + Type of the product for the final compose. +**extra_args** + Additional arguments that wil be passed to the child Pungi processes. +**koji_profile** + If specified, a current event will be retrieved from the Koji instance and + used for all parts. + + +Partial compose settings +------------------------ + +Each part should have a separate section in the config file. + +It can specify these options: + +**config** + Path to configuration file that describes this part. If relative, it is + resolved relative to the file with parts configuration. +**just_phase**, **skip_phase** + Customize which phases should run for this part. +**depends_on** + A comma separated list of other parts that must be finished before this part + starts. +**failable** + A boolean toggle to mark a part as failable. A failure in such part will + mark the final compose as incomplete, but still successful. diff --git a/pungi.spec b/pungi.spec index 79010f10..108a0b1f 100644 --- a/pungi.spec +++ b/pungi.spec @@ -21,6 +21,7 @@ BuildRequires: python2-libcomps BuildRequires: python2-six BuildRequires: python2-multilib BuildRequires: python2-dogpile-cache +BuildRequires: python2-parameterized Requires: yum => 3.4.3-28 Requires: lorax >= 22.1 diff --git a/pungi/linker.py b/pungi/linker.py index d90c4fc0..7a2389f5 100644 --- a/pungi/linker.py +++ b/pungi/linker.py @@ -14,6 +14,7 @@ # along with this program; if not, see . +import contextlib import errno import os import shutil @@ -39,6 +40,17 @@ class LinkerPool(ThreadPool): return pool +@contextlib.contextmanager +def linker_pool(link_type="hardlink-or-copy", num_workers=10): + """Create a linker and make sure it is stopped no matter what.""" + linker = LinkerPool.with_workers(num_workers=num_workers, link_type=link_type) + linker.start() + try: + yield linker + finally: + linker.stop() + + class LinkerThread(WorkerThread): def process(self, item, num): src, dst = item diff --git a/pungi/phases/pkgset/sources/source_koji.py b/pungi/phases/pkgset/sources/source_koji.py index bb793c73..dd1b64fb 100644 --- a/pungi/phases/pkgset/sources/source_koji.py +++ b/pungi/phases/pkgset/sources/source_koji.py @@ -694,22 +694,30 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): def get_koji_event_info(compose, koji_wrapper): - koji_proxy = koji_wrapper.koji_proxy event_file = os.path.join(compose.paths.work.topdir(arch="global"), "koji-event") - if compose.koji_event: - koji_event = koji_proxy.getEvent(compose.koji_event) - compose.log_info("Setting koji event to a custom value: %s" % compose.koji_event) - json.dump(koji_event, open(event_file, "w")) - return koji_event - msg = "Getting koji event" if compose.DEBUG and os.path.exists(event_file): compose.log_warning("[SKIP ] %s" % msg) result = json.load(open(event_file, "r")) else: - compose.log_info(msg) - result = koji_proxy.getLastEvent() - json.dump(result, open(event_file, "w")) - compose.log_info("Koji event: %s" % result["id"]) + result = get_koji_event_raw(koji_wrapper, compose.koji_event, event_file) + if compose.koji_event: + compose.log_info("Setting koji event to a custom value: %s" % compose.koji_event) + else: + compose.log_info(msg) + compose.log_info("Koji event: %s" % result["id"]) + return result + + +def get_koji_event_raw(koji_wrapper, event_id, event_file): + if event_id: + koji_event = koji_wrapper.koji_proxy.getEvent(event_id) + else: + koji_event = koji_wrapper.koji_proxy.getLastEvent() + + with open(event_file, "w") as f: + json.dump(koji_event, f) + + return koji_event diff --git a/pungi_utils/orchestrator.py b/pungi_utils/orchestrator.py new file mode 100644 index 00000000..e8534847 --- /dev/null +++ b/pungi_utils/orchestrator.py @@ -0,0 +1,546 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function + +import argparse +import errno +import json +import logging +import os +import re +import shutil +import subprocess +import sys +from collections import namedtuple + +import kobo.conf +import kobo.log +import productmd +from six.moves import configparser, shlex_quote + +from pungi.compose import get_compose_dir +from pungi.linker import linker_pool +from pungi.phases.pkgset.sources.source_koji import get_koji_event_raw +from pungi.util import find_old_compose, parse_koji_event, temp_dir +from pungi.wrappers.kojiwrapper import KojiWrapper + + +Config = namedtuple( + "Config", + [ + # Path to directory with the compose + "target", + "compose_type", + "label", + # Path to the selected old compose that will be reused + "old_compose", + # Path to directory with config file copies + "config_dir", + # Which koji event to use (if any) + "event", + # Additional arguments to pungi-koji executable + "extra_args", + ], +) + +log = logging.getLogger(__name__) + + +class Status(object): + # Ready to start + READY = "READY" + # Waiting for dependencies to finish. + WAITING = "WAITING" + # Part is currently running + STARTED = "STARTED" + # A dependency failed, this one will never start. + BLOCKED = "BLOCKED" + + +class ComposePart(object): + def __init__(self, name, config, just_phase=[], skip_phase=[], dependencies=[]): + self.name = name + self.config = config + self.status = Status.WAITING if dependencies else Status.READY + self.just_phase = just_phase + self.skip_phase = skip_phase + self.blocked_on = set(dependencies) + self.depends_on = set(dependencies) + self.path = None + self.log_file = None + self.failable = False + + def __str__(self): + return self.name + + def __repr__(self): + return ( + "ComposePart({0.name!r}," + " {0.config!r}," + " {0.status!r}," + " just_phase={0.just_phase!r}," + " skip_phase={0.skip_phase!r}," + " dependencies={0.depends_on!r})" + ).format(self) + + def refresh_status(self): + """Refresh status of this part with the result of the compose. This + should only be called once the compose finished. + """ + try: + with open(os.path.join(self.path, "STATUS")) as fh: + self.status = fh.read().strip() + except IOError as exc: + log.error("Failed to update status of %s: %s", self.name, exc) + log.error("Assuming %s is DOOMED", self.name) + self.status = "DOOMED" + + def is_finished(self): + return "FINISHED" in self.status + + def unblock_on(self, finished_part): + """Update set of blockers for this part. If it's empty, mark us as ready.""" + self.blocked_on.discard(finished_part) + if self.status == Status.WAITING and not self.blocked_on: + log.debug("%s is ready to start", self) + self.status = Status.READY + + def setup_start(self, global_config, parts): + substitutions = {name: p.path for name, p in parts.items() if p.is_finished()} + substitutions["configdir"] = global_config.config_dir + + config = kobo.conf.PyConfigParser() + config.load_from_file(self.config) + + for f in config.opened_files: + # apply substitutions + fill_in_config_file(f, substitutions) + + self.status = Status.STARTED + self.path = get_compose_dir( + os.path.join(global_config.target, "parts"), + config, + compose_type=global_config.compose_type, + compose_label=global_config.label, + ) + self.log_file = os.path.join(global_config.target, "logs", "%s.log" % self.name) + log.info("Starting %s in %s", self.name, self.path) + + def get_cmd(self, global_config): + cmd = ["pungi-koji", "--config", self.config, "--compose-dir", self.path] + cmd.append("--%s" % global_config.compose_type) + if global_config.label: + cmd.extend(["--label", global_config.label]) + for phase in self.just_phase: + cmd.extend(["--just-phase", phase]) + for phase in self.skip_phase: + cmd.extend(["--skip-phase", phase]) + if global_config.old_compose: + cmd.extend( + ["--old-compose", os.path.join(global_config.old_compose, "parts")] + ) + if global_config.event: + cmd.extend(["--koji-event", str(global_config.event)]) + if global_config.extra_args: + cmd.extend(global_config.extra_args) + cmd.extend(["--no-latest-link"]) + return cmd + + @classmethod + def from_config(cls, config, section, config_dir): + part = cls( + name=section, + config=os.path.join(config_dir, config.get(section, "config")), + just_phase=_safe_get_list(config, section, "just_phase", []), + skip_phase=_safe_get_list(config, section, "skip_phase", []), + dependencies=_safe_get_list(config, section, "depends_on", []), + ) + if config.has_option(section, "failable"): + part.failable = config.getboolean(section, "failable") + return part + + +def _safe_get_list(config, section, option, default=None): + """Get a value from config parser. The result is split into a list on + commas or spaces, and `default` is returned if the key does not exist. + """ + if config.has_option(section, option): + value = config.get(section, option) + return [x.strip() for x in re.split(r"[, ]+", value) if x] + return default + + +def fill_in_config_file(fp, substs): + """Templating function. It works with Jinja2 style placeholders such as + {{foo}}. Whitespace around the key name is fine. The file is modified in place. + + :param fp string: path to the file to process + :param substs dict: a mapping for values to put into the file + """ + + def repl(match): + try: + return substs[match.group(1)] + except KeyError as exc: + raise RuntimeError( + "Unknown placeholder %s in %s" % (exc, os.path.basename(fp)) + ) + + with open(fp, "r") as f: + contents = re.sub(r"{{ *([a-zA-Z-_]+) *}}", repl, f.read()) + with open(fp, "w") as f: + f.write(contents) + + +def start_part(global_config, parts, part): + part.setup_start(global_config, parts) + fh = open(part.log_file, "w") + cmd = part.get_cmd(global_config) + log.debug("Running command %r", " ".join(shlex_quote(x) for x in cmd)) + return subprocess.Popen(cmd, stdout=fh, stderr=subprocess.STDOUT) + + +def handle_finished(global_config, linker, parts, proc, finished_part): + finished_part.refresh_status() + log.info("%s finished with status %s", finished_part, finished_part.status) + if proc.returncode == 0: + # Success, unblock other parts... + for part in parts.values(): + part.unblock_on(finished_part.name) + # ...and link the results into final destination. + copy_part(global_config, linker, finished_part) + update_metadata(global_config, finished_part) + else: + # Failure, other stuff may be blocked. + log.info("See details in %s", finished_part.log_file) + block_on(parts, finished_part.name) + + +def copy_part(global_config, linker, part): + c = productmd.Compose(part.path) + for variant in c.info.variants: + data_path = os.path.join(part.path, "compose", variant) + link = os.path.join(global_config.target, "compose", variant) + log.info("Hardlinking content %s -> %s", data_path, link) + hardlink_dir(linker, data_path, link) + + +def hardlink_dir(linker, srcdir, dstdir): + for root, dirs, files in os.walk(srcdir): + root = os.path.relpath(root, srcdir) + for f in files: + src = os.path.normpath(os.path.join(srcdir, root, f)) + dst = os.path.normpath(os.path.join(dstdir, root, f)) + linker.queue_put((src, dst)) + + +def update_metadata(global_config, part): + part_metadata_dir = os.path.join(part.path, "compose", "metadata") + final_metadata_dir = os.path.join(global_config.target, "compose", "metadata") + for f in os.listdir(part_metadata_dir): + # Load the metadata + with open(os.path.join(part_metadata_dir, f)) as fh: + part_metadata = json.load(fh) + final_metadata = os.path.join(final_metadata_dir, f) + if os.path.exists(final_metadata): + # We already have this file, will need to merge. + merge_metadata(final_metadata, part_metadata) + else: + # A new file, just copy it. + copy_metadata(global_config, final_metadata, part_metadata) + + +def copy_metadata(global_config, final_metadata, source): + """Copy file to final location, but update compose information.""" + with open( + os.path.join(global_config.target, "compose/metadata/composeinfo.json") + ) as f: + composeinfo = json.load(f) + try: + source["payload"]["compose"].update(composeinfo["payload"]["compose"]) + except KeyError: + # No [payload][compose], probably OSBS metadata + pass + with open(final_metadata, "w") as f: + json.dump(source, f, indent=2, sort_keys=True) + + +def merge_metadata(final_metadata, source): + with open(final_metadata) as f: + metadata = json.load(f) + + try: + key = { + "productmd.composeinfo": "variants", + "productmd.modules": "modules", + "productmd.images": "images", + "productmd.rpms": "rpms", + }[source["header"]["type"]] + # TODO what if multiple parts create images for the same variant + metadata["payload"][key].update(source["payload"][key]) + except KeyError: + # OSBS metadata, merge whole file + metadata.update(source) + with open(final_metadata, "w") as f: + json.dump(metadata, f, indent=2, sort_keys=True) + + +def block_on(parts, name): + """Part ``name`` failed, mark everything depending on it as blocked.""" + for part in parts.values(): + if name in part.blocked_on: + log.warning("%s is blocked now and will not run", part) + part.status = Status.BLOCKED + block_on(parts, part.name) + + +def check_finished_processes(processes): + """Walk through all active processes and check if something finished. + """ + for proc in processes.keys(): + proc.poll() + if proc.returncode is not None: + yield proc, processes[proc] + + +def run_all(global_config, parts): + # Mapping subprocess.Popen -> ComposePart + processes = dict() + remaining = set(p.name for p in parts.values() if not p.is_finished()) + + with linker_pool("hardlink") as linker: + while remaining or processes: + update_status(global_config, parts) + + for proc, part in check_finished_processes(processes): + del processes[proc] + handle_finished(global_config, linker, parts, proc, part) + + # Start new available processes. + for name in list(remaining): + part = parts[name] + # Start all ready parts + if part.status == Status.READY: + remaining.remove(name) + processes[start_part(global_config, parts, part)] = part + # Remove blocked parts from todo list + elif part.status == Status.BLOCKED: + remaining.remove(part.name) + + # Wait for any child process to finish if there is any. + if processes: + pid, reason = os.wait() + for proc in processes.keys(): + # Set the return code for process that we caught by os.wait(). + # Calling poll() on it would not set the return code properly + # since the value was already consumed by os.wait(). + if proc.pid == pid: + proc.returncode = (reason >> 8) & 0xFF + + log.info("Waiting for linking to finish...") + return update_status(global_config, parts) + + +def get_target_dir(config, compose_info, label, reldir=""): + """Find directory where this compose will be. + + @param reldir: if target path in config is relative, it will be resolved + against this directory + """ + dir = os.path.realpath(os.path.join(reldir, config.get("general", "target"))) + target_dir = get_compose_dir( + dir, + compose_info, + compose_type=config.get("general", "compose_type"), + compose_label=label, + ) + return target_dir + + +def setup_logging(debug=False): + FORMAT = "%(asctime)s: %(levelname)s: %(message)s" + level = logging.DEBUG if debug else logging.INFO + kobo.log.add_stderr_logger(log, log_level=level, format=FORMAT) + log.setLevel(level) + + +def compute_status(statuses): + if any(map(lambda x: x[0] in ("STARTED", "WAITING"), statuses)): + # If there is anything still running or waiting to start, the whole is + # still running. + return "STARTED" + elif any(map(lambda x: x[0] in ("DOOMED", "BLOCKED") and not x[1], statuses)): + # If any required part is doomed or blocked, the whole is doomed + return "DOOMED" + elif all(map(lambda x: x[0] == "FINISHED", statuses)): + # If all parts are complete, the whole is complete + return "FINISHED" + else: + return "FINISHED_INCOMPLETE" + + +def update_status(global_config, parts): + log.debug("Updating status metadata") + metadata = {} + statuses = set() + for part in parts.values(): + metadata[part.name] = {"status": part.status, "path": part.path} + statuses.add((part.status, part.failable)) + metadata_path = os.path.join( + global_config.target, "compose", "metadata", "parts.json" + ) + with open(metadata_path, "w") as fh: + json.dump(metadata, fh, indent=2, sort_keys=True, separators=(",", ": ")) + + status = compute_status(statuses) + log.info("Overall status is %s", status) + with open(os.path.join(global_config.target, "STATUS"), "w") as fh: + fh.write(status) + + return status != "DOOMED" + + +def prepare_compose_dir(config, args, main_config_file, compose_info): + if not hasattr(args, "compose_path"): + # Creating a brand new compose + target_dir = get_target_dir( + config, compose_info, args.label, reldir=os.path.dirname(main_config_file) + ) + for dir in ("logs", "parts", "compose/metadata", "work/global"): + try: + os.makedirs(os.path.join(target_dir, dir)) + except OSError as exc: + if exc.errno != errno.EEXIST: + raise + # Copy initial composeinfo for new compose + shutil.copy( + os.path.join(target_dir, "work/global/composeinfo-base.json"), + os.path.join(target_dir, "compose/metadata/composeinfo.json"), + ) + else: + # Restarting a particular compose + target_dir = args.compose_path + + return target_dir + + +def load_parts_metadata(global_config): + parts_metadata = os.path.join(global_config.target, "compose/metadata/parts.json") + with open(parts_metadata) as f: + return json.load(f) + + +def setup_for_restart(global_config, parts, to_restart): + has_stuff_to_do = False + metadata = load_parts_metadata(global_config) + for key in metadata: + # Update state to match what is on disk + log.debug( + "Reusing %s (%s) from %s", + key, + metadata[key]["status"], + metadata[key]["path"], + ) + parts[key].status = metadata[key]["status"] + parts[key].path = metadata[key]["path"] + for key in to_restart: + # Set restarted parts to run again + parts[key].status = Status.WAITING + parts[key].path = None + + for key in to_restart: + # Remove blockers that are already finished + for blocker in list(parts[key].blocked_on): + if parts[blocker].is_finished(): + parts[key].blocked_on.discard(blocker) + if not parts[key].blocked_on: + log.debug("Part %s in not blocked", key) + # Nothing blocks it; let's go + parts[key].status = Status.READY + has_stuff_to_do = True + + if not has_stuff_to_do: + raise RuntimeError("All restarted parts are blocked. Nothing to do.") + + +def run(work_dir, main_config_file, args): + config_dir = os.path.join(work_dir, "config") + shutil.copytree(os.path.dirname(main_config_file), config_dir) + + # Read main config + parser = configparser.RawConfigParser() + parser.read(main_config_file) + + compose_info = dict(parser.items("general")) + compose_type = parser.get("general", "compose_type") + + target_dir = prepare_compose_dir(parser, args, main_config_file, compose_info) + kobo.log.add_file_logger(log, os.path.join(target_dir, "logs", "orchestrator.log")) + log.info("Composing %s", target_dir) + + old_compose = find_old_compose( + os.path.dirname(target_dir), + compose_info["release_short"], + compose_info["release_version"], + "", + ) + if old_compose: + log.info("Reusing old compose %s", old_compose) + + global_config = Config( + target=target_dir, + compose_type=compose_type, + label=args.label, + old_compose=old_compose, + config_dir=os.path.dirname(main_config_file), + event=args.koji_event, + extra_args=_safe_get_list(parser, "general", "extra_args"), + ) + + if not global_config.event and parser.has_option("general", "koji_profile"): + koji_wrapper = KojiWrapper(parser.get("general", "koji_profile")) + event_file = os.path.join(global_config.target, "work/global/koji-event") + result = get_koji_event_raw(koji_wrapper, None, event_file) + global_config = global_config._replace(event=result["id"]) + + parts = {} + for section in parser.sections(): + if section == "general": + continue + parts[section] = ComposePart.from_config(parser, section, config_dir) + + if hasattr(args, "part"): + setup_for_restart() + + return run_all(global_config, parts) + + +def parse_args(argv): + parser = argparse.ArgumentParser() + parser.add_argument("--debug", action="store_true") + parser.add_argument("--koji-event", metavar="ID", type=parse_koji_event) + subparsers = parser.add_subparsers() + start = subparsers.add_parser("start") + start.add_argument("config", metavar="CONFIG") + start.add_argument("--label") + + restart = subparsers.add_parser("restart") + restart.add_argument("config", metavar="CONFIG") + restart.add_argument("compose_path", metavar="COMPOSE_PATH") + restart.add_argument( + "part", metavar="PART", nargs="*", help="which parts to restart" + ) + restart.add_argument("--label") + + return parser.parse_args(argv) + + +def main(argv=None): + args = parse_args(argv) + setup_logging(args.debug) + + main_config_file = os.path.abspath(args.config) + + with temp_dir() as work_dir: + if not run(work_dir, main_config_file, args): + sys.exit(1) diff --git a/setup.py b/setup.py index ac8b10b8..dcb7f0eb 100755 --- a/setup.py +++ b/setup.py @@ -43,6 +43,7 @@ setup( 'bin/pungi-gather', 'bin/pungi-koji', 'bin/pungi-make-ostree', + 'bin/pungi-orchestrate', 'bin/pungi-patch-iso', 'bin/pungi-wait-for-signed-ostree-handler', diff --git a/tests/data/client.conf b/tests/data/client.conf new file mode 100644 index 00000000..1ce23ca7 --- /dev/null +++ b/tests/data/client.conf @@ -0,0 +1,14 @@ +from dummy-pungi import * + +tree_variants = ["Client"] +pkgset_repos = { + "i386": [ + "{{configdir}}/repo", + ], + "x86_64": [ + "{{configdir}}/repo", + ], + "s390x": [ + "{{configdir}}/repo", + ], +} diff --git a/tests/data/multi-compose-variants.xml b/tests/data/multi-compose-variants.xml new file mode 100644 index 00000000..3f836525 --- /dev/null +++ b/tests/data/multi-compose-variants.xml @@ -0,0 +1,47 @@ + + + + + + + x86_64 + + + resilient-storage + + + + + + i386 + x86_64 + + + core + standard + text-internet + firefox + skype + + + minimal + desktop + + + + + + x86_64 + s390x + + + core + standard + text-internet + + + minimal + + + + diff --git a/tests/data/multi-compose.conf b/tests/data/multi-compose.conf new file mode 100644 index 00000000..f1807639 --- /dev/null +++ b/tests/data/multi-compose.conf @@ -0,0 +1,19 @@ +[general] +release_name = Multi Compose +release_short = multi +release_version = 1.0 +release_type = ga +compose_type = nightly +target = ../_composes/ +extra_args = --quiet + +[server] +config = server.conf + +[client] +config = client.conf + +[resilient-storage] +config = resilient-storage.conf +depends_on = server +failable = yes diff --git a/tests/data/resilient-storage.conf b/tests/data/resilient-storage.conf new file mode 100644 index 00000000..b414f875 --- /dev/null +++ b/tests/data/resilient-storage.conf @@ -0,0 +1,15 @@ +from dummy-pungi import * + +tree_variants = ["ResilientStorage"] +variants_file = "multi-compose-variants.xml" +pkgset_repos = { + "i386": [ + "{{configdir}}/repo", + ], + "x86_64": [ + "{{configdir}}/repo", + ], + "s390x": [ + "{{configdir}}/repo", + ], +} diff --git a/tests/data/server.conf b/tests/data/server.conf new file mode 100644 index 00000000..cec3e3fb --- /dev/null +++ b/tests/data/server.conf @@ -0,0 +1,16 @@ +from dummy-pungi import * + +tree_variants = ["Server"] +variants_file = "multi-compose-variants.xml" +pkgset_repos = { + "i386": [ + "{{configdir}}/repo", + ], + "x86_64": [ + "{{configdir}}/repo", + ], + "s390x": [ + "{{configdir}}/repo", + ], +} +extra_isos = {} diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/composeinfo.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/composeinfo.json new file mode 100644 index 00000000..ea13f0a6 --- /dev/null +++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/composeinfo.json @@ -0,0 +1,25 @@ +{ + "header": { + "type": "productmd.composeinfo", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "DP-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "release": { + "internal": false, + "name": "Dummy Product", + "short": "DP", + "type": "ga", + "version": "1.0" + }, + "variants": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/images.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/images.json new file mode 100644 index 00000000..dbc4ed08 --- /dev/null +++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/images.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.images", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "DP-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "images": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/modules.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/modules.json new file mode 100644 index 00000000..d3c8fc2f --- /dev/null +++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/modules.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.modules", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "DP-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "modules": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/osbs.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/osbs.json new file mode 100644 index 00000000..6fe980e6 --- /dev/null +++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/osbs.json @@ -0,0 +1,4 @@ +{ + "Var1": { + } +} diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/rpms.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/rpms.json new file mode 100644 index 00000000..2fe27c84 --- /dev/null +++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/rpms.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.rpms", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "DP-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "rpms": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/composeinfo.json b/tests/fixtures/basic-metadata-merged/compose/metadata/composeinfo.json new file mode 100644 index 00000000..3d4b9b18 --- /dev/null +++ b/tests/fixtures/basic-metadata-merged/compose/metadata/composeinfo.json @@ -0,0 +1,27 @@ +{ + "header": { + "type": "productmd.composeinfo", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "release": { + "internal": false, + "name": "Dummy Product", + "short": "Mixed", + "type": "ga", + "version": "1.0" + }, + "variants": { + "Var0": { + }, + "Var1": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/images.json b/tests/fixtures/basic-metadata-merged/compose/metadata/images.json new file mode 100644 index 00000000..4f114a2f --- /dev/null +++ b/tests/fixtures/basic-metadata-merged/compose/metadata/images.json @@ -0,0 +1,20 @@ +{ + "header": { + "type": "productmd.images", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "images": { + "Var0": { + }, + "Var1": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/modules.json b/tests/fixtures/basic-metadata-merged/compose/metadata/modules.json new file mode 100644 index 00000000..e4767297 --- /dev/null +++ b/tests/fixtures/basic-metadata-merged/compose/metadata/modules.json @@ -0,0 +1,20 @@ +{ + "header": { + "type": "productmd.modules", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "modules": { + "Var0": { + }, + "Var1": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/osbs.json b/tests/fixtures/basic-metadata-merged/compose/metadata/osbs.json new file mode 100644 index 00000000..a4e51795 --- /dev/null +++ b/tests/fixtures/basic-metadata-merged/compose/metadata/osbs.json @@ -0,0 +1,6 @@ +{ + "Var0": { + }, + "Var1": { + } +} diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/rpms.json b/tests/fixtures/basic-metadata-merged/compose/metadata/rpms.json new file mode 100644 index 00000000..199b6422 --- /dev/null +++ b/tests/fixtures/basic-metadata-merged/compose/metadata/rpms.json @@ -0,0 +1,20 @@ +{ + "header": { + "type": "productmd.rpms", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "rpms": { + "Var0": { + }, + "Var1": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata/compose/metadata/composeinfo.json b/tests/fixtures/basic-metadata/compose/metadata/composeinfo.json new file mode 100644 index 00000000..eb64af2d --- /dev/null +++ b/tests/fixtures/basic-metadata/compose/metadata/composeinfo.json @@ -0,0 +1,25 @@ +{ + "header": { + "type": "productmd.composeinfo", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "release": { + "internal": false, + "name": "Dummy Product", + "short": "Mixed", + "type": "ga", + "version": "1.0" + }, + "variants": { + "Var0": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata/compose/metadata/images.json b/tests/fixtures/basic-metadata/compose/metadata/images.json new file mode 100644 index 00000000..1948b04f --- /dev/null +++ b/tests/fixtures/basic-metadata/compose/metadata/images.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.images", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "images": { + "Var0": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata/compose/metadata/modules.json b/tests/fixtures/basic-metadata/compose/metadata/modules.json new file mode 100644 index 00000000..6418d24d --- /dev/null +++ b/tests/fixtures/basic-metadata/compose/metadata/modules.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.modules", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "modules": { + "Var0": { + } + } + } +} diff --git a/tests/fixtures/basic-metadata/compose/metadata/osbs.json b/tests/fixtures/basic-metadata/compose/metadata/osbs.json new file mode 100644 index 00000000..d51fa3c6 --- /dev/null +++ b/tests/fixtures/basic-metadata/compose/metadata/osbs.json @@ -0,0 +1,4 @@ +{ + "Var0": { + } +} diff --git a/tests/fixtures/basic-metadata/compose/metadata/rpms.json b/tests/fixtures/basic-metadata/compose/metadata/rpms.json new file mode 100644 index 00000000..d6b71023 --- /dev/null +++ b/tests/fixtures/basic-metadata/compose/metadata/rpms.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.rpms", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Mixed-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "rpms": { + "Var0": { + } + } + } +} diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/composeinfo.json b/tests/fixtures/empty-metadata-merged/compose/metadata/composeinfo.json new file mode 100644 index 00000000..d446c93b --- /dev/null +++ b/tests/fixtures/empty-metadata-merged/compose/metadata/composeinfo.json @@ -0,0 +1,25 @@ +{ + "header": { + "type": "productmd.composeinfo", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Empty-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "release": { + "internal": false, + "name": "Dummy Product", + "short": "Empty", + "type": "ga", + "version": "1.0" + }, + "variants": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/images.json b/tests/fixtures/empty-metadata-merged/compose/metadata/images.json new file mode 100644 index 00000000..60cc6ba7 --- /dev/null +++ b/tests/fixtures/empty-metadata-merged/compose/metadata/images.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.images", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Empty-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "images": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/modules.json b/tests/fixtures/empty-metadata-merged/compose/metadata/modules.json new file mode 100644 index 00000000..53ed3b6a --- /dev/null +++ b/tests/fixtures/empty-metadata-merged/compose/metadata/modules.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.modules", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Empty-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "modules": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/osbs.json b/tests/fixtures/empty-metadata-merged/compose/metadata/osbs.json new file mode 100644 index 00000000..6fe980e6 --- /dev/null +++ b/tests/fixtures/empty-metadata-merged/compose/metadata/osbs.json @@ -0,0 +1,4 @@ +{ + "Var1": { + } +} diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/rpms.json b/tests/fixtures/empty-metadata-merged/compose/metadata/rpms.json new file mode 100644 index 00000000..8a8e5a93 --- /dev/null +++ b/tests/fixtures/empty-metadata-merged/compose/metadata/rpms.json @@ -0,0 +1,18 @@ +{ + "header": { + "type": "productmd.rpms", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Empty-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "rpms": { + "Var1": { + } + } + } +} diff --git a/tests/fixtures/empty-metadata/compose/metadata/composeinfo.json b/tests/fixtures/empty-metadata/compose/metadata/composeinfo.json new file mode 100644 index 00000000..d7660318 --- /dev/null +++ b/tests/fixtures/empty-metadata/compose/metadata/composeinfo.json @@ -0,0 +1,23 @@ +{ + "header": { + "type": "productmd.composeinfo", + "version": "1.2" + }, + "payload": { + "compose": { + "date": "20181001", + "id": "Empty-1.0-20181001.n.0", + "respin": 0, + "type": "nightly" + }, + "release": { + "internal": false, + "name": "Dummy Product", + "short": "Empty", + "type": "ga", + "version": "1.0" + }, + "variants": { + } + } +} diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py new file mode 100644 index 00000000..35fe1ddc --- /dev/null +++ b/tests/test_orchestrator.py @@ -0,0 +1,806 @@ +# -*- coding: utf-8 -*- + +import itertools +import json +from functools import wraps +import operator +import os +import shutil +import subprocess +import sys +from textwrap import dedent + +import mock +import six +from six.moves import configparser + +from parameterized import parameterized + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from tests.helpers import BaseTestCase, PungiTestCase, touch, FIXTURE_DIR +from pungi_utils import orchestrator as o + + +class TestConfigSubstitute(PungiTestCase): + def setUp(self): + super(TestConfigSubstitute, self).setUp() + self.fp = os.path.join(self.topdir, "config.conf") + + @parameterized.expand( + [ + ("hello = 'world'", "hello = 'world'"), + ("hello = '{{foo}}'", "hello = 'bar'"), + ("hello = '{{ foo}}'", "hello = 'bar'"), + ("hello = '{{foo }}'", "hello = 'bar'"), + ] + ) + def test_substitutions(self, initial, expected): + touch(self.fp, initial) + o.fill_in_config_file(self.fp, {"foo": "bar"}) + with open(self.fp) as f: + self.assertEqual(expected, f.read()) + + def test_missing_key(self): + touch(self.fp, "hello = '{{unknown}}'") + with self.assertRaises(RuntimeError) as ctx: + o.fill_in_config_file(self.fp, {}) + self.assertEqual( + "Unknown placeholder 'unknown' in config.conf", str(ctx.exception) + ) + + +class TestSafeGetList(BaseTestCase): + @parameterized.expand( + [ + ("", []), + ("foo", ["foo"]), + ("foo,bar", ["foo", "bar"]), + ("foo bar", ["foo", "bar"]), + ] + ) + def test_success(self, value, expected): + cf = configparser.RawConfigParser() + cf.add_section("general") + cf.set("general", "key", value) + self.assertEqual(o._safe_get_list(cf, "general", "key"), expected) + + def test_default(self): + cf = configparser.RawConfigParser() + cf.add_section("general") + self.assertEqual(o._safe_get_list(cf, "general", "missing", "hello"), "hello") + + +class TestComposePart(PungiTestCase): + def test_from_minimal_config(self): + cf = configparser.RawConfigParser() + cf.add_section("test") + cf.set("test", "config", "my.conf") + + part = o.ComposePart.from_config(cf, "test", "/tmp/config") + deps = "set()" if six.PY3 else "set([])" + self.assertEqual(str(part), "test") + self.assertEqual( + repr(part), + "ComposePart('test', '/tmp/config/my.conf', 'READY', " + "just_phase=[], skip_phase=[], dependencies=%s)" % deps, + ) + self.assertFalse(part.failable) + + def test_from_full_config(self): + cf = configparser.RawConfigParser() + cf.add_section("test") + cf.set("test", "config", "my.conf") + cf.set("test", "depends_on", "base") + cf.set("test", "skip_phase", "skip") + cf.set("test", "just_phase", "just") + cf.set("test", "failable", "yes") + + part = o.ComposePart.from_config(cf, "test", "/tmp/config") + deps = "{'base'}" if six.PY3 else "set(['base'])" + self.assertEqual( + repr(part), + "ComposePart('test', '/tmp/config/my.conf', 'WAITING', " + "just_phase=['just'], skip_phase=['skip'], dependencies=%s)" % deps, + ) + self.assertTrue(part.failable) + + def test_get_cmd(self): + conf = o.Config( + "/tgt/", "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"] + ) + part = o.ComposePart( + "test", "/tmp/my.conf", just_phase=["just"], skip_phase=["skip"] + ) + part.path = "/compose" + + self.assertEqual( + part.get_cmd(conf), + [ + "pungi-koji", + "--config", + "/tmp/my.conf", + "--compose-dir", + "/compose", + "--production", + "--label", + "RC-1.0", + "--just-phase", + "just", + "--skip-phase", + "skip", + "--old-compose", + "/old/parts", + "--koji-event", + "1234", + "--quiet", + "--no-latest-link", + ], + ) + + def test_refresh_status(self): + part = o.ComposePart("test", "/tmp/my.conf") + part.path = os.path.join(self.topdir) + touch(os.path.join(self.topdir, "STATUS"), "FINISHED") + part.refresh_status() + self.assertEqual(part.status, "FINISHED") + + def test_refresh_status_missing_file(self): + part = o.ComposePart("test", "/tmp/my.conf") + part.path = os.path.join(self.topdir) + part.refresh_status() + self.assertEqual(part.status, "DOOMED") + + @parameterized.expand(["FINISHED", "FINISHED_INCOMPLETE"]) + def test_is_finished(self, status): + part = o.ComposePart("test", "/tmp/my.conf") + part.status = status + self.assertTrue(part.is_finished()) + + @parameterized.expand(["STARTED", "WAITING"]) + def test_is_not_finished(self, status): + part = o.ComposePart("test", "/tmp/my.conf") + part.status = status + self.assertFalse(part.is_finished()) + + @mock.patch("pungi_utils.orchestrator.fill_in_config_file") + @mock.patch("pungi_utils.orchestrator.get_compose_dir") + @mock.patch("kobo.conf.PyConfigParser") + def test_setup_start(self, Conf, gcd, ficf): + def pth(*path): + return os.path.join(self.topdir, *path) + + conf = o.Config( + pth("tgt"), "production", "RC-1.0", "/old", pth("cfg"), None, None + ) + part = o.ComposePart("test", "/tmp/my.conf") + parts = {"base": mock.Mock(path="/base", is_finished=lambda: True)} + Conf.return_value.opened_files = ["foo.conf"] + + part.setup_start(conf, parts) + + self.assertEqual(part.status, "STARTED") + self.assertEqual(part.path, gcd.return_value) + self.assertEqual(part.log_file, pth("tgt", "logs", "test.log")) + self.assertEqual( + ficf.call_args_list, + [mock.call("foo.conf", {"base": "/base", "configdir": pth("cfg")})], + ) + self.assertEqual( + gcd.call_args_list, + [ + mock.call( + pth("tgt/parts"), + Conf.return_value, + compose_type="production", + compose_label="RC-1.0", + ) + ], + ) + + @parameterized.expand( + [ + # Nothing blocking, no change + ([], [], o.Status.READY), + # Remove last blocker and switch to READY + (["finished"], [], o.Status.READY), + # Blocker remaining, stay in WAITING + (["finished", "block"], ["block"], o.Status.WAITING), + ] + ) + def test_unblock_on(self, deps, blockers, status): + part = o.ComposePart("test", "/tmp/my.conf", dependencies=deps) + part.unblock_on("finished") + self.assertItemsEqual(part.blocked_on, blockers) + self.assertEqual(part.status, status) + + +class TestStartPart(PungiTestCase): + @mock.patch("subprocess.Popen") + def test_start(self, Popen): + part = mock.Mock(log_file=os.path.join(self.topdir, "log")) + config = mock.Mock() + parts = mock.Mock() + cmd = ["pungi-koji", "..."] + + part.get_cmd.return_value = cmd + + proc = o.start_part(config, parts, part) + + self.assertEqual( + part.mock_calls, + [mock.call.setup_start(config, parts), mock.call.get_cmd(config)], + ) + self.assertEqual(proc, Popen.return_value) + self.assertEqual( + Popen.call_args_list, + [mock.call(cmd, stdout=mock.ANY, stderr=subprocess.STDOUT)], + ) + + +class TestHandleFinished(BaseTestCase): + def setUp(self): + self.config = mock.Mock() + self.linker = mock.Mock() + self.parts = {"a": mock.Mock(), "b": mock.Mock()} + + @mock.patch("pungi_utils.orchestrator.update_metadata") + @mock.patch("pungi_utils.orchestrator.copy_part") + def test_handle_success(self, cp, um): + proc = mock.Mock(returncode=0) + o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"]) + + self.assertEqual( + self.parts["a"].mock_calls, + [mock.call.refresh_status(), mock.call.unblock_on(self.parts["a"].name)], + ) + self.assertEqual( + self.parts["b"].mock_calls, [mock.call.unblock_on(self.parts["a"].name)] + ) + self.assertEqual( + cp.call_args_list, [mock.call(self.config, self.linker, self.parts["a"])] + ) + self.assertEqual(um.call_args_list, [mock.call(self.config, self.parts["a"])]) + + @mock.patch("pungi_utils.orchestrator.block_on") + def test_handle_failure(self, bo): + proc = mock.Mock(returncode=1) + o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"]) + + self.assertEqual(self.parts["a"].mock_calls, [mock.call.refresh_status()]) + + self.assertEqual( + bo.call_args_list, [mock.call(self.parts, self.parts["a"].name)] + ) + + +class TestBlockOn(BaseTestCase): + def test_single(self): + parts = {"b": o.ComposePart("b", "b.conf", dependencies=["a"])} + + o.block_on(parts, "a") + + self.assertEqual(parts["b"].status, o.Status.BLOCKED) + + def test_chain(self): + parts = { + "b": o.ComposePart("b", "b.conf", dependencies=["a"]), + "c": o.ComposePart("c", "c.conf", dependencies=["b"]), + "d": o.ComposePart("d", "d.conf", dependencies=["c"]), + } + + o.block_on(parts, "a") + + self.assertEqual(parts["b"].status, o.Status.BLOCKED) + self.assertEqual(parts["c"].status, o.Status.BLOCKED) + self.assertEqual(parts["d"].status, o.Status.BLOCKED) + + +class TestUpdateMetadata(PungiTestCase): + def assertEqualJSON(self, f1, f2): + with open(f1) as f: + actual = json.load(f) + with open(f2) as f: + expected = json.load(f) + self.assertEqual(actual, expected) + + def assertEqualMetadata(self, expected): + expected_dir = os.path.join(FIXTURE_DIR, expected, "compose/metadata") + for f in os.listdir(expected_dir): + self.assertEqualJSON( + os.path.join(self.tgt, "compose/metadata", f), + os.path.join(expected_dir, f), + ) + + @parameterized.expand(["empty-metadata", "basic-metadata"]) + def test_merge_into_empty(self, fixture): + self.tgt = os.path.join(self.topdir, "target") + + conf = o.Config(self.tgt, "production", None, None, None, None, []) + part = o.ComposePart("test", "/tmp/my.conf") + part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20181001.n.0") + + shutil.copytree(os.path.join(FIXTURE_DIR, fixture), self.tgt) + + o.update_metadata(conf, part) + + self.assertEqualMetadata(fixture + "-merged") + + +class TestCopyPart(PungiTestCase): + @mock.patch("pungi_utils.orchestrator.hardlink_dir") + def test_copy(self, hd): + self.tgt = os.path.join(self.topdir, "target") + conf = o.Config(self.tgt, "production", None, None, None, None, []) + linker = mock.Mock() + part = o.ComposePart("test", "/tmp/my.conf") + part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4") + + o.copy_part(conf, linker, part) + + self.assertItemsEqual( + hd.call_args_list, + [ + mock.call( + linker, + os.path.join(part.path, "compose", variant), + os.path.join(self.tgt, "compose", variant), + ) + for variant in ["Client", "Server"] + ], + ) + + +class TestHardlinkDir(PungiTestCase): + def test_hardlinking(self): + linker = mock.Mock() + src = os.path.join(self.topdir, "src") + dst = os.path.join(self.topdir, "dst") + files = ["file.txt", "nested/deep/another.txt"] + + for f in files: + touch(os.path.join(src, f)) + + o.hardlink_dir(linker, src, dst) + + self.assertItemsEqual( + linker.queue_put.call_args_list, + [mock.call((os.path.join(src, f), os.path.join(dst, f))) for f in files], + ) + + +class TestCheckFinishedProcesses(BaseTestCase): + def test_nothing_finished(self): + k1 = mock.Mock(returncode=None) + v1 = mock.Mock() + processes = {k1: v1} + + self.assertItemsEqual(o.check_finished_processes(processes), []) + + def test_yields_finished(self): + k1 = mock.Mock(returncode=None) + v1 = mock.Mock() + k2 = mock.Mock(returncode=0) + v2 = mock.Mock() + processes = {k1: v1, k2: v2} + + self.assertItemsEqual(o.check_finished_processes(processes), [(k2, v2)]) + + def test_yields_failed(self): + k1 = mock.Mock(returncode=1) + v1 = mock.Mock() + processes = {k1: v1} + + self.assertItemsEqual(o.check_finished_processes(processes), [(k1, v1)]) + + +class _Part(object): + def __init__(self, name, parent=None, fails=False, status=None): + self.name = name + self.finished = False + self.status = o.Status.WAITING if parent else o.Status.READY + if status: + self.status = status + self.proc = mock.Mock(name="proc_%s" % name, pid=hash(self)) + self.parent = parent + self.fails = fails + self.failable = False + self.path = "/path/to/%s" % name + self.blocked_on = set([parent]) if parent else set() + + def is_finished(self): + return self.finished or self.status == "FINISHED" + + def __repr__(self): + return "<_Part(%r, parent=%r)>" % (self.name, self.parent) + + +def with_mocks(parts, finish_order, wait_results): + """Setup all mocks and create dict with the parts. + :param finish_order: nested list: first element contains parts that finish + in first iteration, etc. + :param wait_results: list of names of processes that are returned by wait in each + iteration + """ + + def decorator(func): + @wraps(func) + def worker(self, lp, update_status, cfp, hf, sp, wait): + self.parts = dict((p.name, p) for p in parts) + self.linker = lp.return_value.__enter__.return_value + + update_status.side_effect = self.mock_update + hf.side_effect = self.mock_finish + sp.side_effect = self.mock_start + + finish = [[]] + for grp in finish_order: + finish.append([(self.parts[p].proc, self.parts[p]) for p in grp]) + + cfp.side_effect = finish + wait.side_effect = [(self.parts[p].proc.pid, 0) for p in wait_results] + + func(self) + + self.assertEqual(lp.call_args_list, [mock.call("hardlink")]) + + return worker + + return decorator + + +@mock.patch("os.wait") +@mock.patch("pungi_utils.orchestrator.start_part") +@mock.patch("pungi_utils.orchestrator.handle_finished") +@mock.patch("pungi_utils.orchestrator.check_finished_processes") +@mock.patch("pungi_utils.orchestrator.update_status") +@mock.patch("pungi_utils.orchestrator.linker_pool") +class TestRunAll(BaseTestCase): + def setUp(self): + self.maxDiff = None + self.conf = mock.Mock(name="global_config") + self.calls = [] + + def mock_update(self, global_config, parts): + self.assertEqual(global_config, self.conf) + self.assertEqual(parts, self.parts) + self.calls.append("update_status") + + def mock_start(self, global_config, parts, part): + self.assertEqual(global_config, self.conf) + self.assertEqual(parts, self.parts) + self.calls.append(("start_part", part.name)) + part.status = o.Status.STARTED + return part.proc + + @property + def sorted_calls(self): + """Sort the consecutive calls of the same function based on the argument.""" + + def key(val): + return val[0] if isinstance(val, tuple) else val + + return list( + itertools.chain.from_iterable( + sorted(grp, key=operator.itemgetter(1)) + for _, grp in itertools.groupby(self.calls, key) + ) + ) + + def mock_finish(self, global_config, linker, parts, proc, part): + self.assertEqual(global_config, self.conf) + self.assertEqual(linker, self.linker) + self.assertEqual(parts, self.parts) + self.calls.append(("handle_finished", part.name)) + for child in parts.values(): + if child.parent == part.name: + child.status = o.Status.BLOCKED if part.fails else o.Status.READY + part.status = "DOOMED" if part.fails else "FINISHED" + + @with_mocks( + [_Part("fst"), _Part("snd", parent="fst")], [["fst"], ["snd"]], ["fst", "snd"] + ) + def test_sequential(self): + o.run_all(self.conf, self.parts) + + self.assertEqual( + self.sorted_calls, + [ + # First iteration starts fst + "update_status", + ("start_part", "fst"), + # Second iteration handles finish of fst and starts snd + "update_status", + ("handle_finished", "fst"), + ("start_part", "snd"), + # Third iteration handles finish of snd + "update_status", + ("handle_finished", "snd"), + # Final update of status + "update_status", + ], + ) + + @with_mocks([_Part("fst"), _Part("snd")], [["fst", "snd"]], ["fst"]) + def test_parallel(self): + o.run_all(self.conf, self.parts) + + self.assertEqual( + self.sorted_calls, + [ + # First iteration starts both fst and snd + "update_status", + ("start_part", "fst"), + ("start_part", "snd"), + # Second iteration handles finish of both of them + "update_status", + ("handle_finished", "fst"), + ("handle_finished", "snd"), + # Final update of status + "update_status", + ], + ) + + @with_mocks( + [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")], + [["1"], ["2", "3"]], + ["1", "2"], + ) + def test_waits_for_dep_then_parallel_with_simultaneous_end(self): + o.run_all(self.conf, self.parts) + + self.assertEqual( + self.sorted_calls, + [ + # First iteration starts first part + "update_status", + ("start_part", "1"), + # Second iteration starts 2 and 3 + "update_status", + ("handle_finished", "1"), + ("start_part", "2"), + ("start_part", "3"), + # Both 2 and 3 end in third iteration + "update_status", + ("handle_finished", "2"), + ("handle_finished", "3"), + # Final update of status + "update_status", + ], + ) + + @with_mocks( + [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")], + [["1"], ["3"], ["2"]], + ["1", "3", "2"], + ) + def test_waits_for_dep_then_parallel_with_different_end_times(self): + o.run_all(self.conf, self.parts) + + self.assertEqual( + self.sorted_calls, + [ + # First iteration starts first part + "update_status", + ("start_part", "1"), + # Second iteration starts 2 and 3 + "update_status", + ("handle_finished", "1"), + ("start_part", "2"), + ("start_part", "3"), + # Third iteration sees 3 finish + "update_status", + ("handle_finished", "3"), + # Fourth iteration, 2 finishes + "update_status", + ("handle_finished", "2"), + # Final update of status + "update_status", + ], + ) + + @with_mocks( + [_Part("fst", fails=True), _Part("snd", parent="fst")], [["fst"]], ["fst"] + ) + def test_blocked(self): + o.run_all(self.conf, self.parts) + + self.assertEqual( + self.sorted_calls, + [ + # First iteration starts first part + "update_status", + ("start_part", "fst"), + # Second iteration handles fail of first part + "update_status", + ("handle_finished", "fst"), + # Final update of status + "update_status", + ], + ) + + +@mock.patch("pungi_utils.orchestrator.get_compose_dir") +class TestGetTargetDir(BaseTestCase): + def test_with_absolute_path(self, gcd): + config = {"target": "/tgt", "compose_type": "nightly"} + cfg = mock.Mock() + cfg.get.side_effect = lambda _, k: config[k] + ci = mock.Mock() + res = o.get_target_dir(cfg, ci, None, reldir="/checkout") + self.assertEqual(res, gcd.return_value) + self.assertEqual( + gcd.call_args_list, + [mock.call("/tgt", ci, compose_type="nightly", compose_label=None)], + ) + + def test_with_relative_path(self, gcd): + config = {"target": "tgt", "compose_type": "nightly"} + cfg = mock.Mock() + cfg.get.side_effect = lambda _, k: config[k] + ci = mock.Mock() + res = o.get_target_dir(cfg, ci, None, reldir="/checkout") + self.assertEqual(res, gcd.return_value) + self.assertEqual( + gcd.call_args_list, + [ + mock.call( + "/checkout/tgt", ci, compose_type="nightly", compose_label=None + ) + ], + ) + + +class TestComputeStatus(BaseTestCase): + @parameterized.expand( + [ + ([("FINISHED", False)], "FINISHED"), + ([("FINISHED", False), ("STARTED", False)], "STARTED"), + ([("FINISHED", False), ("STARTED", False), ("WAITING", False)], "STARTED"), + ([("FINISHED", False), ("DOOMED", False)], "DOOMED"), + ( + [("FINISHED", False), ("BLOCKED", True), ("DOOMED", True)], + "FINISHED_INCOMPLETE", + ), + ([("FINISHED", False), ("BLOCKED", False), ("DOOMED", True)], "DOOMED"), + ([("FINISHED", False), ("DOOMED", True)], "FINISHED_INCOMPLETE"), + ([("FINISHED", False), ("STARTED", False), ("DOOMED", False)], "STARTED"), + ] + ) + def test_cases(self, statuses, expected): + self.assertEqual(o.compute_status(statuses), expected) + + +class TestUpdateStatus(PungiTestCase): + def test_updating(self): + os.makedirs(os.path.join(self.topdir, "compose/metadata")) + conf = o.Config( + self.topdir, "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"] + ) + o.update_status( + conf, + {"1": _Part("1", status="FINISHED"), "2": _Part("2", status="STARTED")}, + ) + self.assertFileContent(os.path.join(self.topdir, "STATUS"), "STARTED") + self.assertFileContent( + os.path.join(self.topdir, "compose/metadata/parts.json"), + dedent( + """\ + { + "1": { + "path": "/path/to/1", + "status": "FINISHED" + }, + "2": { + "path": "/path/to/2", + "status": "STARTED" + } + } + """ + ), + ) + + +@mock.patch("pungi_utils.orchestrator.get_target_dir") +class TestPrepareComposeDir(PungiTestCase): + def setUp(self): + super(TestPrepareComposeDir, self).setUp() + self.conf = mock.Mock(name="config") + self.main_config = "/some/config" + self.compose_info = mock.Mock(name="compose_info") + + def test_new_compose(self, gtd): + def mock_get_target(conf, compose_info, label, reldir): + self.assertEqual(conf, self.conf) + self.assertEqual(compose_info, self.compose_info) + self.assertEqual(label, args.label) + self.assertEqual(reldir, "/some") + touch(os.path.join(self.topdir, "work/global/composeinfo-base.json"), "WOO") + return self.topdir + + gtd.side_effect = mock_get_target + args = mock.Mock(name="args", spec=["label"]) + retval = o.prepare_compose_dir( + self.conf, args, self.main_config, self.compose_info + ) + self.assertEqual(retval, self.topdir) + self.assertFileContent( + os.path.join(self.topdir, "compose/metadata/composeinfo.json"), "WOO" + ) + self.assertTrue(os.path.isdir(os.path.join(self.topdir, "logs"))) + self.assertTrue(os.path.isdir(os.path.join(self.topdir, "parts"))) + self.assertTrue(os.path.isdir(os.path.join(self.topdir, "work/global"))) + + def test_restarting_compose(self, gtd): + args = mock.Mock(name="args", spec=["label", "compose_path"]) + retval = o.prepare_compose_dir( + self.conf, args, self.main_config, self.compose_info + ) + self.assertEqual(gtd.call_args_list, []) + self.assertEqual(retval, args.compose_path) + + +class TestLoadPartsMetadata(PungiTestCase): + def test_loading(self): + touch( + os.path.join(self.topdir, "compose/metadata/parts.json"), '{"foo": "bar"}' + ) + conf = mock.Mock(target=self.topdir) + + self.assertEqual(o.load_parts_metadata(conf), {"foo": "bar"}) + + +@mock.patch("pungi_utils.orchestrator.load_parts_metadata") +class TestSetupForRestart(BaseTestCase): + def setUp(self): + self.conf = mock.Mock(name="global_config") + + def test_restart_ok(self, lpm): + lpm.return_value = { + "p1": {"status": "FINISHED", "path": "/p1"}, + "p2": {"status": "DOOMED", "path": "/p2"}, + } + parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")} + + o.setup_for_restart(self.conf, parts, ["p2"]) + + self.assertEqual(parts["p1"].status, "FINISHED") + self.assertEqual(parts["p1"].path, "/p1") + self.assertEqual(parts["p2"].status, "READY") + self.assertEqual(parts["p2"].path, None) + + def test_restart_one_blocked_one_ok(self, lpm): + lpm.return_value = { + "p1": {"status": "DOOMED", "path": "/p1"}, + "p2": {"status": "DOOMED", "path": "/p2"}, + "p3": {"status": "WAITING", "path": None}, + } + parts = { + "p1": _Part("p1"), + "p2": _Part("p2", parent="p1"), + "p3": _Part("p3", parent="p2"), + } + + o.setup_for_restart(self.conf, parts, ["p1", "p3"]) + + self.assertEqual(parts["p1"].status, "READY") + self.assertEqual(parts["p1"].path, None) + self.assertEqual(parts["p2"].status, "DOOMED") + self.assertEqual(parts["p2"].path, "/p2") + self.assertEqual(parts["p3"].status, "WAITING") + self.assertEqual(parts["p3"].path, None) + + def test_restart_all_blocked(self, lpm): + lpm.return_value = { + "p1": {"status": "DOOMED", "path": "/p1"}, + "p2": {"status": "STARTED", "path": "/p2"}, + } + parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")} + + with self.assertRaises(RuntimeError): + o.setup_for_restart(self.conf, parts, ["p2"]) + + self.assertEqual(parts["p1"].status, "DOOMED") + self.assertEqual(parts["p1"].path, "/p1") + self.assertEqual(parts["p2"].status, "WAITING") + self.assertEqual(parts["p2"].path, None)