From b7adbf8a91cf167bc7e0eb0d60b78c76bc5be2b5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lubom=C3=ADr=20Sedl=C3=A1=C5=99?=
Date: Tue, 18 Oct 2022 15:42:37 +0200
Subject: [PATCH] Drop pungi-orchestrator code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This was never actually used.

JIRA: RHELCMP-10218
Signed-off-by: Lubomír Sedlář
---
 doc/index.rst                |   1 -
 doc/multi_compose.rst        | 107 ----
 pungi.spec                   |   1 -
 pungi/scripts/config_dump.py |  21 -
 pungi_utils/orchestrator.py  | 705 --------------------------
 setup.py                     |   1 -
 tests/test_orchestrator.py   | 934 -----------------------------------
 7 files changed, 1770 deletions(-)
 delete mode 100644 doc/multi_compose.rst
 delete mode 100644 pungi_utils/orchestrator.py
 delete mode 100644 tests/test_orchestrator.py

diff --git a/doc/index.rst b/doc/index.rst
index 5a92b34a..16d9bf58 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -22,4 +22,3 @@ Contents:
    comps
    contributing
    testing
-   multi_compose
diff --git a/doc/multi_compose.rst b/doc/multi_compose.rst
deleted file mode 100644
index f0c26e28..00000000
--- a/doc/multi_compose.rst
+++ /dev/null
@@ -1,107 +0,0 @@
-.. _multi_compose:
-
-Managing compose from multiple parts
-====================================
-
-There may be cases where it makes sense to split a big compose into separate
-parts, but create a compose output that links all output into one familiar
-structure.
-
-The `pungi-orchestrate` tools allows that.
-
-It works with an INI-style configuration file. The ``[general]`` section
-contains information about identity of the main compose. Other sections define
-individual parts.
-
-The parts are scheduled to run in parallel, with the minimal amount of
-serialization. The final compose directory will contain hard-links to the
-files.
-
-
-General settings
-----------------
-
-**target**
-    Path to directory where the final compose should be created.
-**compose_type**
-    Type of compose to make.
-**release_name**
-    Name of the product for the final compose.
-**release_short**
-    Short name of the product for the final compose.
-**release_version**
-    Version of the product for the final compose.
-**release_type**
-    Type of the product for the final compose.
-**extra_args**
-    Additional arguments that will be passed to the child Pungi processes.
-**koji_profile**
-    If specified, a current event will be retrieved from the Koji instance and
-    used for all parts.
-
-**kerberos**
-    If set to yes, a kerberos ticket will be automatically created at the
-    start. Set keytab and principal as well.
-**kerberos_keytab**
-    Path to keytab file used to create the kerberos ticket.
-**kerberos_principal**
-    Kerberos principal for the ticket
-
-**pre_compose_script**
-    Commands to execute before first part is started. Can contain multiple
-    commands on separate lines.
-**post_compose_script**
-    Commands to execute after the last part finishes and final status is
-    updated. Can contain multiple commands on separate lines. ::
-
-        post_compose_script =
-            compose-latest-symlink $COMPOSE_PATH
-            custom-post-compose-script.sh
-
-    Multiple environment variables are defined for the scripts:
-
-    * ``COMPOSE_PATH``
-    * ``COMPOSE_ID``
-    * ``COMPOSE_DATE``
-    * ``COMPOSE_TYPE``
-    * ``COMPOSE_RESPIN``
-    * ``COMPOSE_LABEL``
-    * ``RELEASE_ID``
-    * ``RELEASE_NAME``
-    * ``RELEASE_SHORT``
-    * ``RELEASE_VERSION``
-    * ``RELEASE_TYPE``
-    * ``RELEASE_IS_LAYERED`` – ``YES`` for layered products, empty otherwise
-    * ``BASE_PRODUCT_NAME`` – only set for layered products
-    * ``BASE_PRODUCT_SHORT`` – only set for layered products
-    * ``BASE_PRODUCT_VERSION`` – only set for layered products
-    * ``BASE_PRODUCT_TYPE`` – only set for layered products
-
-**notification_script**
-    Executable name (or path to a script) that will be used to send a message
-    once the compose is finished. In order for a valid URL to be included in
-    the message, at least one part must configure path translation that would
-    apply to location of main compose.
-
-    Only two messages will be sent, one for start and one for finish (either
-    successful or not).
-
-
-Partial compose settings
-------------------------
-
-Each part should have a separate section in the config file.
-
-It can specify these options:
-
-**config**
-    Path to configuration file that describes this part. If relative, it is
-    resolved relative to the file with parts configuration.
-**just_phase**, **skip_phase**
-    Customize which phases should run for this part.
-**depends_on**
-    A comma separated list of other parts that must be finished before this
-    part starts.
-**failable**
-    A boolean toggle to mark a part as failable. A failure in such part will
-    mark the final compose as incomplete, but still successful.
diff --git a/pungi.spec b/pungi.spec
index c62d9c11..89d92d0b 100644
--- a/pungi.spec
+++ b/pungi.spec
@@ -100,7 +100,6 @@ rm -rf %{buildroot}
 %{_bindir}/%{name}-config-validate
 %{_bindir}/%{name}-fedmsg-notification
 %{_bindir}/%{name}-notification-report-progress
-%{_bindir}/%{name}-orchestrate
 %{_bindir}/%{name}-patch-iso
 %{_bindir}/%{name}-compare-depsolving
 %{_bindir}/%{name}-wait-for-signed-ostree-handler
diff --git a/pungi/scripts/config_dump.py b/pungi/scripts/config_dump.py
index cafd66cf..c53ac5ea 100644
--- a/pungi/scripts/config_dump.py
+++ b/pungi/scripts/config_dump.py
@@ -171,32 +171,11 @@ def main():
     group.add_argument(
         "--offline", action="store_true", help="Do not resolve git references."
     )
-    parser.add_argument(
-        "--multi",
-        metavar="DIR",
-        help=(
-            "Treat source as config for pungi-orchestrate and store dump into "
-            "given directory."
-        ),
-    )
 
     args = parser.parse_args()
 
     defines = config_utils.extract_defines(args.define)
 
-    if args.multi:
-        if len(args.sources) > 1:
-            parser.error("Only one multi config can be specified.")
-
-        return dump_multi_config(
-            args.sources[0],
-            dest=args.multi,
-            defines=defines,
-            just_dump=args.just_dump,
-            event=args.freeze_event,
-            offline=args.offline,
-        )
-
     return process_file(
         args.sources,
         defines=defines,
diff --git a/pungi_utils/orchestrator.py b/pungi_utils/orchestrator.py
deleted file mode 100644
index e63838aa..00000000
--- a/pungi_utils/orchestrator.py
+++ /dev/null
@@ -1,705 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from __future__ import print_function
-
-import argparse
-import atexit
-import errno
-import json
-import logging
-import os
-import re
-import shutil
-import subprocess
-import sys
-import tempfile
-import time
-import threading
-from collections import namedtuple
-
-import kobo.conf
-import kobo.log
-import productmd
-from kobo import shortcuts
-from six.moves import configparser, shlex_quote
-
-import pungi.util
-from pungi.compose import get_compose_dir
-from pungi.linker import linker_pool
-from pungi.phases.pkgset.sources.source_koji import get_koji_event_raw
-from pungi.util import find_old_compose, parse_koji_event, temp_dir
-from pungi.wrappers.kojiwrapper import KojiWrapper
-
-
-Config = namedtuple(
-    "Config",
-    [
-        # Path to directory with the compose
-        "target",
-        "compose_type",
-        "label",
-        # Path to the selected old compose that will be reused
-        "old_compose",
-        # Path to directory with config file copies
-        "config_dir",
-        # Which koji event to use (if any)
-        "event",
-        # Additional arguments to pungi-koji executable
-        "extra_args",
-    ],
-)
-
-log = logging.getLogger(__name__)
-
-
-class Status(object):
-    # Ready to start
-    READY = "READY"
-    # Waiting for dependencies to finish.
-    WAITING = "WAITING"
-    # Part is currently running
-    STARTED = "STARTED"
-    # A dependency failed, this one will never start.
-    BLOCKED = "BLOCKED"
-
-
-class ComposePart(object):
-    def __init__(self, name, config, just_phase=[], skip_phase=[], dependencies=[]):
-        self.name = name
-        self.config = config
-        self.status = Status.WAITING if dependencies else Status.READY
-        self.just_phase = just_phase
-        self.skip_phase = skip_phase
-        self.blocked_on = set(dependencies)
-        self.depends_on = set(dependencies)
-        self.path = None
-        self.log_file = None
-        self.failable = False
-
-    def __str__(self):
-        return self.name
-
-    def __repr__(self):
-        return (
-            "ComposePart({0.name!r},"
-            " {0.config!r},"
-            " {0.status!r},"
-            " just_phase={0.just_phase!r},"
-            " skip_phase={0.skip_phase!r},"
-            " dependencies={0.depends_on!r})"
-        ).format(self)
-
-    def refresh_status(self):
-        """Refresh status of this part with the result of the compose. This
-        should only be called once the compose finished.
-        """
-        try:
-            with open(os.path.join(self.path, "STATUS")) as fh:
-                self.status = fh.read().strip()
-        except IOError as exc:
-            log.error("Failed to update status of %s: %s", self.name, exc)
-            log.error("Assuming %s is DOOMED", self.name)
-            self.status = "DOOMED"
-
-    def is_finished(self):
-        return "FINISHED" in self.status
-
-    def unblock_on(self, finished_part):
-        """Update set of blockers for this part. If it's empty, mark us as ready."""
-        self.blocked_on.discard(finished_part)
-        if self.status == Status.WAITING and not self.blocked_on:
-            log.debug("%s is ready to start", self)
-            self.status = Status.READY
-
-    def setup_start(self, global_config, parts):
-        substitutions = dict(
-            ("part-%s" % name, p.path) for name, p in parts.items() if p.is_finished()
-        )
-        substitutions["configdir"] = global_config.config_dir
-
-        config = pungi.util.load_config(self.config)
-
-        for f in config.opened_files:
-            # apply substitutions
-            fill_in_config_file(f, substitutions)
-
-        self.status = Status.STARTED
-        self.path = get_compose_dir(
-            os.path.join(global_config.target, "parts"),
-            config,
-            compose_type=global_config.compose_type,
-            compose_label=global_config.label,
-        )
-        self.log_file = os.path.join(global_config.target, "logs", "%s.log" % self.name)
-        log.info("Starting %s in %s", self.name, self.path)
-
-    def get_cmd(self, global_config):
-        cmd = ["pungi-koji", "--config", self.config, "--compose-dir", self.path]
-        cmd.append("--%s" % global_config.compose_type)
-        if global_config.label:
-            cmd.extend(["--label", global_config.label])
-        for phase in self.just_phase:
-            cmd.extend(["--just-phase", phase])
-        for phase in self.skip_phase:
-            cmd.extend(["--skip-phase", phase])
-        if global_config.old_compose:
-            cmd.extend(
-                ["--old-compose", os.path.join(global_config.old_compose, "parts")]
-            )
-        if global_config.event:
-            cmd.extend(["--koji-event", str(global_config.event)])
-        if global_config.extra_args:
-            cmd.extend(global_config.extra_args)
-        cmd.extend(["--no-latest-link"])
-        return cmd
-
-    @classmethod
-    def from_config(cls, config, section, config_dir):
-        part = cls(
-            name=section,
-            config=os.path.join(config_dir, config.get(section, "config")),
-            just_phase=_safe_get_list(config, section, "just_phase", []),
-            skip_phase=_safe_get_list(config, section, "skip_phase", []),
-            dependencies=_safe_get_list(config, section, "depends_on", []),
-        )
-        if config.has_option(section, "failable"):
-            part.failable = config.getboolean(section, "failable")
-        return part
-
-
-def _safe_get_list(config, section, option, default=None):
-    """Get a value from config parser. The result is split into a list on
-    commas or spaces, and `default` is returned if the key does not exist.
-    """
-    if config.has_option(section, option):
-        value = config.get(section, option)
-        return [x.strip() for x in re.split(r"[, ]+", value) if x]
-    return default
-
-
-def fill_in_config_file(fp, substs):
-    """Templating function. It works with Jinja2 style placeholders such as
-    {{foo}}. Whitespace around the key name is fine. The file is modified in
-    place.
-
-    :param fp string: path to the file to process
-    :param substs dict: a mapping for values to put into the file
-    """
-
-    def repl(match):
-        try:
-            return substs[match.group(1)]
-        except KeyError as exc:
-            raise RuntimeError(
-                "Unknown placeholder %s in %s" % (exc, os.path.basename(fp))
-            )
-
-    with open(fp, "r") as f:
-        contents = re.sub(r"{{ *([a-zA-Z-_]+) *}}", repl, f.read())
-    with open(fp, "w") as f:
-        f.write(contents)
-
-
-def start_part(global_config, parts, part):
-    part.setup_start(global_config, parts)
-    fh = open(part.log_file, "w")
-    cmd = part.get_cmd(global_config)
-    log.debug("Running command %r", " ".join(shlex_quote(x) for x in cmd))
-    return subprocess.Popen(cmd, stdout=fh, stderr=subprocess.STDOUT)
-
-
-def handle_finished(global_config, linker, parts, proc, finished_part):
-    finished_part.refresh_status()
-    log.info("%s finished with status %s", finished_part, finished_part.status)
-    if proc.returncode == 0:
-        # Success, unblock other parts...
-        for part in parts.values():
-            part.unblock_on(finished_part.name)
-        # ...and link the results into final destination.
-        copy_part(global_config, linker, finished_part)
-        update_metadata(global_config, finished_part)
-    else:
-        # Failure, other stuff may be blocked.
-        log.info("See details in %s", finished_part.log_file)
-        block_on(parts, finished_part.name)
-
-
-def copy_part(global_config, linker, part):
-    c = productmd.Compose(part.path)
-    for variant in c.info.variants:
-        data_path = os.path.join(part.path, "compose", variant)
-        link = os.path.join(global_config.target, "compose", variant)
-        log.info("Hardlinking content %s -> %s", data_path, link)
-        hardlink_dir(linker, data_path, link)
-
-
-def hardlink_dir(linker, srcdir, dstdir):
-    for root, dirs, files in os.walk(srcdir):
-        root = os.path.relpath(root, srcdir)
-        for f in files:
-            src = os.path.normpath(os.path.join(srcdir, root, f))
-            dst = os.path.normpath(os.path.join(dstdir, root, f))
-            linker.queue_put((src, dst))
-
-
-def update_metadata(global_config, part):
-    part_metadata_dir = os.path.join(part.path, "compose", "metadata")
-    final_metadata_dir = os.path.join(global_config.target, "compose", "metadata")
-    for f in os.listdir(part_metadata_dir):
-        # Load the metadata
-        with open(os.path.join(part_metadata_dir, f)) as fh:
-            part_metadata = json.load(fh)
-        final_metadata = os.path.join(final_metadata_dir, f)
-        if os.path.exists(final_metadata):
-            # We already have this file, will need to merge.
-            merge_metadata(final_metadata, part_metadata)
-        else:
-            # A new file, just copy it.
-            copy_metadata(global_config, final_metadata, part_metadata)
-
-
-def copy_metadata(global_config, final_metadata, source):
-    """Copy file to final location, but update compose information."""
-    with open(
-        os.path.join(global_config.target, "compose/metadata/composeinfo.json")
-    ) as f:
-        composeinfo = json.load(f)
-    try:
-        source["payload"]["compose"].update(composeinfo["payload"]["compose"])
-    except KeyError:
-        # No [payload][compose], probably OSBS metadata
-        pass
-    with open(final_metadata, "w") as f:
-        json.dump(source, f, indent=2, sort_keys=True)
-
-
-def merge_metadata(final_metadata, source):
-    with open(final_metadata) as f:
-        metadata = json.load(f)
-
-    try:
-        key = {
-            "productmd.composeinfo": "variants",
-            "productmd.modules": "modules",
-            "productmd.images": "images",
-            "productmd.rpms": "rpms",
-        }[source["header"]["type"]]
-        # TODO what if multiple parts create images for the same variant
-        metadata["payload"][key].update(source["payload"][key])
-    except KeyError:
-        # OSBS metadata, merge whole file
-        metadata.update(source)
-    with open(final_metadata, "w") as f:
-        json.dump(metadata, f, indent=2, sort_keys=True)
-
-
-def block_on(parts, name):
-    """Part ``name`` failed, mark everything depending on it as blocked."""
-    for part in parts.values():
-        if name in part.blocked_on:
-            log.warning("%s is blocked now and will not run", part)
-            part.status = Status.BLOCKED
-            block_on(parts, part.name)
-
-
-def check_finished_processes(processes):
-    """Walk through all active processes and check if something finished."""
-    for proc in processes.keys():
-        proc.poll()
-        if proc.returncode is not None:
-            yield proc, processes[proc]
-
-
-def run_all(global_config, parts):
-    # Mapping subprocess.Popen -> ComposePart
-    processes = dict()
-    remaining = set(p.name for p in parts.values() if not p.is_finished())
-
-    with linker_pool("hardlink") as linker:
-        while remaining or processes:
-            update_status(global_config, parts)
-
-            for proc, part in check_finished_processes(processes):
-                del processes[proc]
-                handle_finished(global_config, linker, parts, proc, part)
-
-            # Start new available processes.
-            for name in list(remaining):
-                part = parts[name]
-                # Start all ready parts
-                if part.status == Status.READY:
-                    remaining.remove(name)
-                    processes[start_part(global_config, parts, part)] = part
-                # Remove blocked parts from todo list
-                elif part.status == Status.BLOCKED:
-                    remaining.remove(part.name)
-
-            # Wait for any child process to finish if there is any.
-            if processes:
-                pid, reason = os.wait()
-                for proc in processes.keys():
-                    # Set the return code for process that we caught by os.wait().
-                    # Calling poll() on it would not set the return code properly
-                    # since the value was already consumed by os.wait().
-                    if proc.pid == pid:
-                        proc.returncode = (reason >> 8) & 0xFF
-
-    log.info("Waiting for linking to finish...")
-    return update_status(global_config, parts)
-
-
-def get_target_dir(config, compose_info, label, reldir=""):
-    """Find directory where this compose will be.
-
-    @param reldir: if target path in config is relative, it will be resolved
-                   against this directory
-    """
-    dir = os.path.realpath(os.path.join(reldir, config.get("general", "target")))
-    target_dir = get_compose_dir(
-        dir,
-        compose_info,
-        compose_type=config.get("general", "compose_type"),
-        compose_label=label,
-    )
-    return target_dir
-
-
-def setup_logging(debug=False):
-    FORMAT = "%(asctime)s: %(levelname)s: %(message)s"
-    level = logging.DEBUG if debug else logging.INFO
-    kobo.log.add_stderr_logger(log, log_level=level, format=FORMAT)
-    log.setLevel(level)
-
-
-def compute_status(statuses):
-    if any(map(lambda x: x[0] in ("STARTED", "WAITING"), statuses)):
-        # If there is anything still running or waiting to start, the whole is
-        # still running.
-        return "STARTED"
-    elif any(map(lambda x: x[0] in ("DOOMED", "BLOCKED") and not x[1], statuses)):
-        # If any required part is doomed or blocked, the whole is doomed
-        return "DOOMED"
-    elif all(map(lambda x: x[0] == "FINISHED", statuses)):
-        # If all parts are complete, the whole is complete
-        return "FINISHED"
-    else:
-        return "FINISHED_INCOMPLETE"
-
-
-def update_status(global_config, parts):
-    log.debug("Updating status metadata")
-    metadata = {}
-    statuses = set()
-    for part in parts.values():
-        metadata[part.name] = {"status": part.status, "path": part.path}
-        statuses.add((part.status, part.failable))
-    metadata_path = os.path.join(
-        global_config.target, "compose", "metadata", "parts.json"
-    )
-    with open(metadata_path, "w") as fh:
-        json.dump(metadata, fh, indent=2, sort_keys=True, separators=(",", ": "))
-
-    status = compute_status(statuses)
-    log.info("Overall status is %s", status)
-    with open(os.path.join(global_config.target, "STATUS"), "w") as fh:
-        fh.write(status)
-
-    return status != "DOOMED"
-
-
-def prepare_compose_dir(config, args, main_config_file, compose_info):
-    if not hasattr(args, "compose_path"):
-        # Creating a brand new compose
-        target_dir = get_target_dir(
-            config, compose_info, args.label, reldir=os.path.dirname(main_config_file)
-        )
-        for dir in ("logs", "parts", "compose/metadata", "work/global"):
-            try:
-                os.makedirs(os.path.join(target_dir, dir))
-            except OSError as exc:
-                if exc.errno != errno.EEXIST:
-                    raise
-        with open(os.path.join(target_dir, "STATUS"), "w") as fh:
-            fh.write("STARTED")
-        # Copy initial composeinfo for new compose
-        shutil.copy(
-            os.path.join(target_dir, "work/global/composeinfo-base.json"),
-            os.path.join(target_dir, "compose/metadata/composeinfo.json"),
-        )
-    else:
-        # Restarting a particular compose
-        target_dir = args.compose_path
-
-    return target_dir
-
-
-def load_parts_metadata(global_config):
-    parts_metadata = os.path.join(global_config.target, "compose/metadata/parts.json")
-    with open(parts_metadata) as f:
-        return json.load(f)
-
-
-def setup_for_restart(global_config, parts, to_restart):
-    has_stuff_to_do = False
-    metadata = load_parts_metadata(global_config)
-    for key in metadata:
-        # Update state to match what is on disk
-        log.debug(
-            "Reusing %s (%s) from %s",
-            key,
-            metadata[key]["status"],
-            metadata[key]["path"],
-        )
-        parts[key].status = metadata[key]["status"]
-        parts[key].path = metadata[key]["path"]
-    for key in to_restart:
-        # Set restarted parts to run again
-        parts[key].status = Status.WAITING
-        parts[key].path = None
-
-    for key in to_restart:
-        # Remove blockers that are already finished
-        for blocker in list(parts[key].blocked_on):
-            if parts[blocker].is_finished():
-                parts[key].blocked_on.discard(blocker)
-        if not parts[key].blocked_on:
-            log.debug("Part %s in not blocked", key)
-            # Nothing blocks it; let's go
-            parts[key].status = Status.READY
-            has_stuff_to_do = True
-
-    if not has_stuff_to_do:
-        raise RuntimeError("All restarted parts are blocked. Nothing to do.")
-
-
-def run_kinit(config):
-    if not config.getboolean("general", "kerberos"):
-        return
-
-    keytab = config.get("general", "kerberos_keytab")
-    principal = config.get("general", "kerberos_principal")
-
-    fd, fname = tempfile.mkstemp(prefix="krb5cc_pungi-orchestrate_")
-    os.close(fd)
-    os.environ["KRB5CCNAME"] = fname
-    shortcuts.run(["kinit", "-k", "-t", keytab, principal])
-    log.debug("Created a kerberos ticket for %s", principal)
-
-    atexit.register(os.remove, fname)
-
-
-def get_compose_data(compose_path):
-    try:
-        compose = productmd.compose.Compose(compose_path)
-        data = {
-            "compose_id": compose.info.compose.id,
-            "compose_date": compose.info.compose.date,
-            "compose_type": compose.info.compose.type,
-            "compose_respin": str(compose.info.compose.respin),
-            "compose_label": compose.info.compose.label,
-            "release_id": compose.info.release_id,
-            "release_name": compose.info.release.name,
-            "release_short": compose.info.release.short,
-            "release_version": compose.info.release.version,
-            "release_type": compose.info.release.type,
-            "release_is_layered": compose.info.release.is_layered,
-        }
-        if compose.info.release.is_layered:
-            data.update(
-                {
-                    "base_product_name": compose.info.base_product.name,
-                    "base_product_short": compose.info.base_product.short,
-                    "base_product_version": compose.info.base_product.version,
-                    "base_product_type": compose.info.base_product.type,
-                }
-            )
-        return data
-    except Exception:
-        return {}
-
-
-def get_script_env(compose_path):
-    env = os.environ.copy()
-    env["COMPOSE_PATH"] = compose_path
-    for key, value in get_compose_data(compose_path).items():
-        if isinstance(value, bool):
-            env[key.upper()] = "YES" if value else ""
-        else:
-            env[key.upper()] = str(value) if value else ""
-    return env
-
-
-def run_scripts(prefix, compose_dir, scripts):
-    env = get_script_env(compose_dir)
-    for idx, script in enumerate(scripts.strip().splitlines()):
-        command = script.strip()
-        logfile = os.path.join(compose_dir, "logs", "%s%s.log" % (prefix, idx))
-        log.debug("Running command: %r", command)
-        log.debug("See output in %s", logfile)
-        shortcuts.run(command, env=env, logfile=logfile)
-
-
-def try_translate_path(parts, path):
-    translation = []
-    for part in parts.values():
-        conf = pungi.util.load_config(part.config)
-        translation.extend(conf.get("translate_paths", []))
-    return pungi.util.translate_path_raw(translation, path)
-
-
-def send_notification(compose_dir, command, parts):
-    if not command:
-        return
-    from pungi.notifier import PungiNotifier
-
-    data = get_compose_data(compose_dir)
-    data["location"] = try_translate_path(parts, compose_dir)
-    notifier = PungiNotifier([command])
-    with open(os.path.join(compose_dir, "STATUS")) as f:
-        status = f.read().strip()
-    notifier.send("status-change", workdir=compose_dir, status=status, **data)
-
-
-def setup_progress_monitor(global_config, parts):
-    """Update configuration so that each part send notifications about its
-    progress to the orchestrator.
-
-    There is a file to which the notification is written. The orchestrator is
-    reading it and mapping the entries to particular parts. The path to this
-    file is stored in an environment variable.
-    """
-    tmp_file = tempfile.NamedTemporaryFile(prefix="pungi-progress-monitor_")
-    os.environ["_PUNGI_ORCHESTRATOR_PROGRESS_MONITOR"] = tmp_file.name
-    atexit.register(os.remove, tmp_file.name)
-
-    global_config.extra_args.append(
-        "--notification-script=pungi-notification-report-progress"
-    )
-
-    def reader():
-        while True:
-            line = tmp_file.readline()
-            if not line:
-                time.sleep(0.1)
-                continue
-            path, msg = line.split(":", 1)
-            for part in parts:
-                if parts[part].path == os.path.dirname(path):
-                    log.debug("%s: %s", part, msg.strip())
-                    break
-
-    monitor = threading.Thread(target=reader)
-    monitor.daemon = True
-    monitor.start()
-
-
-def run(work_dir, main_config_file, args):
-    config_dir = os.path.join(work_dir, "config")
-    shutil.copytree(os.path.dirname(main_config_file), config_dir)
-
-    # Read main config
-    parser = configparser.RawConfigParser(
-        defaults={
-            "kerberos": "false",
-            "pre_compose_script": "",
-            "post_compose_script": "",
-            "notification_script": "",
-        }
-    )
-    parser.read(main_config_file)
-
-    # Create kerberos ticket
-    run_kinit(parser)
-
-    compose_info = dict(parser.items("general"))
-    compose_type = parser.get("general", "compose_type")
-
-    target_dir = prepare_compose_dir(parser, args, main_config_file, compose_info)
-    kobo.log.add_file_logger(log, os.path.join(target_dir, "logs", "orchestrator.log"))
-    log.info("Composing %s", target_dir)
-
-    run_scripts("pre_compose_", target_dir, parser.get("general", "pre_compose_script"))
-
-    old_compose = find_old_compose(
-        os.path.dirname(target_dir),
-        compose_info["release_short"],
-        compose_info["release_version"],
-        "",
-    )
-    if old_compose:
-        log.info("Reusing old compose %s", old_compose)
-
-    global_config = Config(
-        target=target_dir,
-        compose_type=compose_type,
-        label=args.label,
-        old_compose=old_compose,
-        config_dir=os.path.dirname(main_config_file),
-        event=args.koji_event,
-        extra_args=_safe_get_list(parser, "general", "extra_args"),
-    )
-
-    if not global_config.event and parser.has_option("general", "koji_profile"):
-        koji_wrapper = KojiWrapper(parser.get("general", "koji_profile"))
-        event_file = os.path.join(global_config.target, "work/global/koji-event")
-        result = get_koji_event_raw(koji_wrapper, None, event_file)
-        global_config = global_config._replace(event=result["id"])
-
-    parts = {}
-    for section in parser.sections():
-        if section == "general":
-            continue
-        parts[section] = ComposePart.from_config(parser, section, config_dir)
-
-    if hasattr(args, "part"):
-        setup_for_restart(global_config, parts, args.part)
-
-    setup_progress_monitor(global_config, parts)
-
-    send_notification(target_dir, parser.get("general", "notification_script"), parts)
-
-    retcode = run_all(global_config, parts)
-
-    if retcode:
-        # Only run the script if we are not doomed.
-        run_scripts(
-            "post_compose_", target_dir, parser.get("general", "post_compose_script")
-        )
-
-    send_notification(target_dir, parser.get("general", "notification_script"), parts)
-
-    return retcode
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--debug", action="store_true")
-    parser.add_argument("--koji-event", metavar="ID", type=parse_koji_event)
-    subparsers = parser.add_subparsers()
-    start = subparsers.add_parser("start")
-    start.add_argument("config", metavar="CONFIG")
-    start.add_argument("--label")
-
-    restart = subparsers.add_parser("restart")
-    restart.add_argument("config", metavar="CONFIG")
-    restart.add_argument("compose_path", metavar="COMPOSE_PATH")
-    restart.add_argument(
-        "part", metavar="PART", nargs="*", help="which parts to restart"
-    )
-    restart.add_argument("--label")
-
-    return parser.parse_args(argv)
-
-
-def main(argv=None):
-    args = parse_args(argv)
-    setup_logging(args.debug)
-
-    main_config_file = os.path.abspath(args.config)
-
-    with temp_dir() as work_dir:
-        try:
-            if not run(work_dir, main_config_file, args):
-                sys.exit(1)
-        except Exception:
-            log.exception("Unhandled exception!")
-            sys.exit(1)
diff --git a/setup.py b/setup.py
index cbb67d22..6ab2c8bd 100755
--- a/setup.py
+++ b/setup.py
@@ -36,7 +36,6 @@ setup(
             "pungi-patch-iso = pungi.scripts.patch_iso:cli_main",
             "pungi-make-ostree = pungi.ostree:main",
             "pungi-notification-report-progress = pungi.scripts.report_progress:main",
-            "pungi-orchestrate = pungi_utils.orchestrator:main",
             "pungi-wait-for-signed-ostree-handler = pungi.scripts.wait_for_signed_ostree_handler:main",  # noqa: E501
             "pungi-koji = pungi.scripts.pungi_koji:cli_main",
             "pungi-gather = pungi.scripts.pungi_gather:cli_main",
diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py
deleted file mode 100644
index b26fcc67..00000000
--- a/tests/test_orchestrator.py
+++ /dev/null
@@ -1,934 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import itertools
-import json
-from functools import wraps
-import operator
-import os
-import shutil
-import subprocess
-from textwrap import dedent
-
-import mock
-import six
-from six.moves import configparser
-
-from parameterized import parameterized
-
-from tests.helpers import BaseTestCase, PungiTestCase, touch, FIXTURE_DIR
-from pungi_utils import orchestrator as o
-
-
-class TestConfigSubstitute(PungiTestCase):
-    def setUp(self):
-        super(TestConfigSubstitute, self).setUp()
-        self.fp = os.path.join(self.topdir, "config.conf")
-
-    @parameterized.expand(
-        [
-            ("hello = 'world'", "hello = 'world'"),
-            ("hello = '{{foo}}'", "hello = 'bar'"),
-            ("hello = '{{ foo}}'", "hello = 'bar'"),
-            ("hello = '{{foo }}'", "hello = 'bar'"),
-        ]
-    )
-    def test_substitutions(self, initial, expected):
-        touch(self.fp, initial)
-        o.fill_in_config_file(self.fp, {"foo": "bar"})
-        with open(self.fp) as f:
-            self.assertEqual(expected, f.read())
-
-    def test_missing_key(self):
-        touch(self.fp, "hello = '{{unknown}}'")
-        with self.assertRaises(RuntimeError) as ctx:
-            o.fill_in_config_file(self.fp, {})
-        self.assertEqual(
-            "Unknown placeholder 'unknown' in config.conf", str(ctx.exception)
-        )
-
-
-class TestSafeGetList(BaseTestCase):
-    @parameterized.expand(
-        [
-            ("", []),
-            ("foo", ["foo"]),
-            ("foo,bar", ["foo", "bar"]),
-            ("foo bar", ["foo", "bar"]),
-        ]
-    )
-    def test_success(self, value, expected):
-        cf = configparser.RawConfigParser()
-        cf.add_section("general")
-        cf.set("general", "key", value)
-        self.assertEqual(o._safe_get_list(cf, "general", "key"), expected)
-
-    def test_default(self):
-        cf = configparser.RawConfigParser()
-        cf.add_section("general")
-        self.assertEqual(o._safe_get_list(cf, "general", "missing", "hello"), "hello")
-
-
-class TestComposePart(PungiTestCase):
-    def test_from_minimal_config(self):
-        cf = configparser.RawConfigParser()
-        cf.add_section("test")
-        cf.set("test", "config", "my.conf")
-
-        part = o.ComposePart.from_config(cf, "test", "/tmp/config")
-        deps = "set()" if six.PY3 else "set([])"
-        self.assertEqual(str(part), "test")
-        self.assertEqual(
-            repr(part),
-            "ComposePart('test', '/tmp/config/my.conf', 'READY', "
-            "just_phase=[], skip_phase=[], dependencies=%s)" % deps,
-        )
-        self.assertFalse(part.failable)
-
-    def test_from_full_config(self):
-        cf = configparser.RawConfigParser()
-        cf.add_section("test")
-        cf.set("test", "config", "my.conf")
-        cf.set("test", "depends_on", "base")
-        cf.set("test", "skip_phase", "skip")
-        cf.set("test", "just_phase", "just")
-        cf.set("test", "failable", "yes")
-
-        part = o.ComposePart.from_config(cf, "test", "/tmp/config")
-        deps = "{'base'}" if six.PY3 else "set(['base'])"
-        self.assertEqual(
-            repr(part),
-            "ComposePart('test', '/tmp/config/my.conf', 'WAITING', "
-            "just_phase=['just'], skip_phase=['skip'], dependencies=%s)" % deps,
-        )
-        self.assertTrue(part.failable)
-
-    def test_get_cmd(self):
-        conf = o.Config(
-            "/tgt/", "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"]
-        )
-        part = o.ComposePart(
-            "test", "/tmp/my.conf", just_phase=["just"], skip_phase=["skip"]
-        )
-        part.path = "/compose"
-
-        self.assertEqual(
-            part.get_cmd(conf),
-            [
-                "pungi-koji",
-                "--config",
-                "/tmp/my.conf",
-                "--compose-dir",
-                "/compose",
-                "--production",
-                "--label",
-                "RC-1.0",
-                "--just-phase",
-                "just",
-                "--skip-phase",
-                "skip",
-                "--old-compose",
-                "/old/parts",
-                "--koji-event",
-                "1234",
-                "--quiet",
-                "--no-latest-link",
-            ],
-        )
-
-    def test_refresh_status(self):
-        part = o.ComposePart("test", "/tmp/my.conf")
-        part.path = os.path.join(self.topdir)
-        touch(os.path.join(self.topdir, "STATUS"), "FINISHED")
-        part.refresh_status()
-        self.assertEqual(part.status, "FINISHED")
-
-    def test_refresh_status_missing_file(self):
-        part = o.ComposePart("test", "/tmp/my.conf")
-        part.path = os.path.join(self.topdir)
-        part.refresh_status()
-        self.assertEqual(part.status, "DOOMED")
-
-    @parameterized.expand(["FINISHED", "FINISHED_INCOMPLETE"])
-    def test_is_finished(self, status):
-        part = o.ComposePart("test", "/tmp/my.conf")
-        part.status = status
-        self.assertTrue(part.is_finished())
-
-    @parameterized.expand(["STARTED", "WAITING"])
-    def test_is_not_finished(self, status):
-        part = o.ComposePart("test", "/tmp/my.conf")
-        part.status = status
-        self.assertFalse(part.is_finished())
-
-    @mock.patch("pungi_utils.orchestrator.fill_in_config_file")
-    @mock.patch("pungi_utils.orchestrator.get_compose_dir")
-    @mock.patch("kobo.conf.PyConfigParser")
-    def test_setup_start(self, Conf, gcd, ficf):
-        def pth(*path):
-            return os.path.join(self.topdir, *path)
-
-        conf = o.Config(
-            pth("tgt"), "production", "RC-1.0", "/old", pth("cfg"), None, None
-        )
-        part = o.ComposePart("test", "/tmp/my.conf")
-        parts = {"base": mock.Mock(path="/base", is_finished=lambda: True)}
-        Conf.return_value.opened_files = ["foo.conf"]
-
-        part.setup_start(conf, parts)
-
-        self.assertEqual(part.status, "STARTED")
-        self.assertEqual(part.path, gcd.return_value)
-        self.assertEqual(part.log_file, pth("tgt", "logs", "test.log"))
-        self.assertEqual(
-            ficf.call_args_list,
-            [mock.call("foo.conf", {"part-base": "/base", "configdir": pth("cfg")})],
-        )
-        self.assertEqual(
-            gcd.call_args_list,
-            [
-                mock.call(
-                    pth("tgt/parts"),
-                    Conf.return_value,
-                    compose_type="production",
-                    compose_label="RC-1.0",
-                )
-            ],
-        )
-
-    @parameterized.expand(
-        [
-            # Nothing blocking, no change
-            ([], [], o.Status.READY),
-            # Remove last blocker and switch to READY
-            (["finished"], [], o.Status.READY),
-            # Blocker remaining, stay in WAITING
-            (["finished", "block"], ["block"], o.Status.WAITING),
-        ]
-    )
-    def test_unblock_on(self, deps, blockers, status):
-        part = o.ComposePart("test", "/tmp/my.conf", dependencies=deps)
-        part.unblock_on("finished")
-        six.assertCountEqual(self, part.blocked_on, blockers)
-        self.assertEqual(part.status, status)
-
-
-class TestStartPart(PungiTestCase):
-    @mock.patch("subprocess.Popen")
-    def test_start(self, Popen):
-        part = mock.Mock(log_file=os.path.join(self.topdir, "log"))
-        config = mock.Mock()
-        parts = mock.Mock()
-        cmd = ["pungi-koji", "..."]
-
-        part.get_cmd.return_value = cmd
-
-        proc = o.start_part(config, parts, part)
-
-        self.assertEqual(
-            part.mock_calls,
-            [mock.call.setup_start(config, parts), mock.call.get_cmd(config)],
-        )
-        self.assertEqual(proc, Popen.return_value)
-        self.assertEqual(
-            Popen.call_args_list,
-            [mock.call(cmd, stdout=mock.ANY, stderr=subprocess.STDOUT)],
-        )
-
-
-class TestHandleFinished(BaseTestCase):
-    def setUp(self):
-        self.config = mock.Mock()
-        self.linker = mock.Mock()
-        self.parts = {"a": mock.Mock(), "b": mock.Mock()}
-
-    @mock.patch("pungi_utils.orchestrator.update_metadata")
-    @mock.patch("pungi_utils.orchestrator.copy_part")
-    def test_handle_success(self, cp, um):
-        proc = mock.Mock(returncode=0)
-        o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"])
-
-        self.assertEqual(
-            self.parts["a"].mock_calls,
-            [mock.call.refresh_status(), mock.call.unblock_on(self.parts["a"].name)],
-        )
-        self.assertEqual(
-            self.parts["b"].mock_calls, [mock.call.unblock_on(self.parts["a"].name)]
-        )
-        self.assertEqual(
-            cp.call_args_list, [mock.call(self.config, self.linker, self.parts["a"])]
-        )
-        self.assertEqual(um.call_args_list, [mock.call(self.config, self.parts["a"])])
-
-    @mock.patch("pungi_utils.orchestrator.block_on")
-    def test_handle_failure(self, bo):
-        proc = mock.Mock(returncode=1)
-        o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"])
-
-        self.assertEqual(self.parts["a"].mock_calls, [mock.call.refresh_status()])
-
-        self.assertEqual(
-            bo.call_args_list, [mock.call(self.parts, self.parts["a"].name)]
-        )
-
-
-class TestBlockOn(BaseTestCase):
-    def test_single(self):
-        parts = {"b": o.ComposePart("b", "b.conf", dependencies=["a"])}
-
-        o.block_on(parts, "a")
-
-        self.assertEqual(parts["b"].status, o.Status.BLOCKED)
-
-    def test_chain(self):
-        parts = {
-            "b": o.ComposePart("b", "b.conf", dependencies=["a"]),
-            "c": o.ComposePart("c", "c.conf", dependencies=["b"]),
-            "d": o.ComposePart("d", "d.conf", dependencies=["c"]),
-        }
-
-        o.block_on(parts, "a")
-
-        self.assertEqual(parts["b"].status, o.Status.BLOCKED)
-        self.assertEqual(parts["c"].status, o.Status.BLOCKED)
-        self.assertEqual(parts["d"].status, o.Status.BLOCKED)
-
-
-class TestUpdateMetadata(PungiTestCase):
-    def assertEqualJSON(self, f1, f2):
-        with open(f1) as f:
-            actual = json.load(f)
-        with open(f2) as f:
-            expected = json.load(f)
-        self.assertEqual(actual, expected)
-
-    def assertEqualMetadata(self, expected):
-        expected_dir = os.path.join(FIXTURE_DIR, expected, "compose/metadata")
-        for f in os.listdir(expected_dir):
-            self.assertEqualJSON(
-                os.path.join(self.tgt, "compose/metadata", f),
-                os.path.join(expected_dir, f),
-            )
-
-    @parameterized.expand(["empty-metadata", "basic-metadata"])
-    def test_merge_into_empty(self, fixture):
-        self.tgt = os.path.join(self.topdir, "target")
-
-        conf = o.Config(self.tgt, "production", None, None, None, None, [])
-        part = o.ComposePart("test", "/tmp/my.conf")
-        part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20181001.n.0")
-
-        shutil.copytree(os.path.join(FIXTURE_DIR, fixture), self.tgt)
-
-        o.update_metadata(conf, part)
-
-        self.assertEqualMetadata(fixture + "-merged")
-
-
-class TestCopyPart(PungiTestCase):
-    @mock.patch("pungi_utils.orchestrator.hardlink_dir")
-    def test_copy(self, hd):
-        self.tgt = os.path.join(self.topdir, "target")
-        conf = o.Config(self.tgt, "production", None, None, None, None, [])
-        linker = mock.Mock()
-        part = o.ComposePart("test", "/tmp/my.conf")
-        part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4")
-
-        o.copy_part(conf, linker, part)
-
-        six.assertCountEqual(
-            self,
-            hd.call_args_list,
-            [
-                mock.call(
-                    linker,
-                    os.path.join(part.path, "compose", variant),
-                    os.path.join(self.tgt, "compose", variant),
-                )
-                for variant in ["Client", "Server"]
-            ],
-        )
-
-
-class TestHardlinkDir(PungiTestCase):
-    def test_hardlinking(self):
-        linker = mock.Mock()
-        src = os.path.join(self.topdir, "src")
-        dst = os.path.join(self.topdir, "dst")
-        files = ["file.txt", "nested/deep/another.txt"]
-
-        for f in files:
-            touch(os.path.join(src, f))
-
-        o.hardlink_dir(linker, src, dst)
-
-        six.assertCountEqual(
-            self,
-            linker.queue_put.call_args_list,
-            [mock.call((os.path.join(src, f), os.path.join(dst, f))) for f in files],
-        )
-
-
-class TestCheckFinishedProcesses(BaseTestCase):
-    def test_nothing_finished(self):
-        k1 = mock.Mock(returncode=None)
-        v1 = mock.Mock()
-        processes = {k1: v1}
-
-        six.assertCountEqual(self, o.check_finished_processes(processes), [])
-
-    def test_yields_finished(self):
-        k1 = mock.Mock(returncode=None)
-        v1 = mock.Mock()
-        k2 = mock.Mock(returncode=0)
-        v2 = mock.Mock()
-        processes = {k1: v1, k2: v2}
-
-        six.assertCountEqual(self, o.check_finished_processes(processes), [(k2, v2)])
-
-    def test_yields_failed(self):
-        k1 = mock.Mock(returncode=1)
-        v1 = mock.Mock()
-        processes = {k1: v1}
-
-        six.assertCountEqual(self, o.check_finished_processes(processes), [(k1, v1)])
-
-
-class _Part(object):
-    def __init__(self, name, parent=None, fails=False, status=None):
-        self.name = name
-        self.finished = False
-        self.status = o.Status.WAITING if parent else o.Status.READY
-        if status:
-            self.status = status
-        self.proc = mock.Mock(name="proc_%s" % name, pid=hash(self))
-        self.parent = parent
-        self.fails = fails
-        self.failable = False
-        self.path = "/path/to/%s" % name
-        self.blocked_on = set([parent]) if parent else set()
-
-    def is_finished(self):
-        return self.finished or self.status == "FINISHED"
-
-    def __repr__(self):
-        return "<_Part(%r, parent=%r)>" % (self.name, self.parent)
-
-
-def with_mocks(parts, finish_order, wait_results):
-    """Setup all mocks and create dict with the parts.
-
-    :param finish_order: nested list: first element contains parts that finish
-        in first iteration, etc.
-    :param wait_results: list of names of processes that are returned by wait in each
-        iteration
-    """
-
-    def decorator(func):
-        @wraps(func)
-        def worker(self, lp, update_status, cfp, hf, sp, wait):
-            self.parts = dict((p.name, p) for p in parts)
-            self.linker = lp.return_value.__enter__.return_value
-
-            update_status.side_effect = self.mock_update
-            hf.side_effect = self.mock_finish
-            sp.side_effect = self.mock_start
-
-            finish = [[]]
-            for grp in finish_order:
-                finish.append([(self.parts[p].proc, self.parts[p]) for p in grp])
-
-            cfp.side_effect = finish
-            wait.side_effect = [(self.parts[p].proc.pid, 0) for p in wait_results]
-
-            func(self)
-
-            self.assertEqual(lp.call_args_list, [mock.call("hardlink")])
-
-        return worker
-
-    return decorator
-
-
-@mock.patch("os.wait")
-@mock.patch("pungi_utils.orchestrator.start_part")
-@mock.patch("pungi_utils.orchestrator.handle_finished")
-@mock.patch("pungi_utils.orchestrator.check_finished_processes")
-@mock.patch("pungi_utils.orchestrator.update_status")
-@mock.patch("pungi_utils.orchestrator.linker_pool")
-class TestRunAll(BaseTestCase):
-    def setUp(self):
-        self.maxDiff = None
-        self.conf = mock.Mock(name="global_config")
-        self.calls = []
-
-    def mock_update(self, global_config, parts):
-        self.assertEqual(global_config, self.conf)
-        self.assertEqual(parts, self.parts)
-        self.calls.append("update_status")
-
-    def mock_start(self, global_config, parts, part):
-        self.assertEqual(global_config, self.conf)
-        self.assertEqual(parts, self.parts)
-        self.calls.append(("start_part", part.name))
-        part.status = o.Status.STARTED
-        return part.proc
-
-    @property
-    def sorted_calls(self):
-        """Sort the consecutive calls of the same function based on the argument."""
-
-        def key(val):
-            return val[0] if isinstance(val, tuple) else val
-
-        return list(
-            itertools.chain.from_iterable(
-                sorted(grp, key=operator.itemgetter(1))
-                for _, grp in itertools.groupby(self.calls, key)
-            )
-        )
-
-    def mock_finish(self, global_config, linker, parts, proc, part):
-        self.assertEqual(global_config, self.conf)
-        self.assertEqual(linker, self.linker)
-        self.assertEqual(parts, self.parts)
-        self.calls.append(("handle_finished", part.name))
-        for child in parts.values():
-            if child.parent == part.name:
-                child.status = o.Status.BLOCKED if part.fails else o.Status.READY
-        part.status = "DOOMED" if part.fails else "FINISHED"
-
-    @with_mocks(
-        [_Part("fst"), _Part("snd", parent="fst")], [["fst"], ["snd"]], ["fst", "snd"]
-    )
-    def test_sequential(self):
-        o.run_all(self.conf, self.parts)
-
-        self.assertEqual(
-            self.sorted_calls,
-            [
-                # First iteration starts fst
-                "update_status",
-                ("start_part", "fst"),
-                # Second iteration handles finish of fst and starts snd
-                "update_status",
-                ("handle_finished", "fst"),
-                ("start_part", "snd"),
-                # Third iteration handles finish of snd
-                "update_status",
-                ("handle_finished", "snd"),
-                # Final update of status
-                "update_status",
-            ],
-        )
-
-    @with_mocks([_Part("fst"), _Part("snd")], [["fst", "snd"]], ["fst"])
-    def test_parallel(self):
-        o.run_all(self.conf, self.parts)
-
-        self.assertEqual(
-            self.sorted_calls,
-            [
-                # First iteration starts both fst and snd
-                "update_status",
-                ("start_part", "fst"),
-                ("start_part", "snd"),
-                # Second iteration handles finish of both of them
-                "update_status",
-                ("handle_finished", "fst"),
-                ("handle_finished", "snd"),
-                # Final update of status
-                "update_status",
-            ],
-        )
-
-    @with_mocks(
-        [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")],
-        [["1"], ["2", "3"]],
-        ["1", "2"],
-    )
-    def test_waits_for_dep_then_parallel_with_simultaneous_end(self):
-        o.run_all(self.conf, self.parts)
-
-        self.assertEqual(
-            self.sorted_calls,
-            [
-                # First iteration starts first part
-                "update_status",
-                ("start_part", "1"),
-                # Second iteration starts 2 and 3
-                "update_status",
-                ("handle_finished", "1"),
-                ("start_part", "2"),
-                ("start_part", "3"),
-                # Both 2 and 3 end in third iteration
-                "update_status",
-                ("handle_finished", "2"),
-                ("handle_finished", "3"),
-                # Final update of status
-                "update_status",
-            ],
-        )
-
-    @with_mocks(
-        [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")],
-        [["1"], ["3"], ["2"]],
-        ["1", "3", "2"],
-    )
-    def test_waits_for_dep_then_parallel_with_different_end_times(self):
-        o.run_all(self.conf, self.parts)
-
-        self.assertEqual(
-            self.sorted_calls,
-            [
-                # First iteration starts first part
-                "update_status",
-                ("start_part", "1"),
-                # Second iteration starts 2 and 3
-                "update_status",
-                ("handle_finished", "1"),
-                ("start_part", "2"),
-                ("start_part", "3"),
-                # Third iteration sees 3 finish
-                "update_status",
-                ("handle_finished", "3"),
-                # Fourth iteration, 2 finishes
-                "update_status",
-                ("handle_finished", "2"),
-                # Final update of status
-                "update_status",
-            ],
-        )
-
-    @with_mocks(
-        [_Part("fst", fails=True), _Part("snd", parent="fst")], [["fst"]], ["fst"]
-    )
-    def test_blocked(self):
-        o.run_all(self.conf, self.parts)
-
-        self.assertEqual(
-            self.sorted_calls,
-            [
-                # First iteration starts first part
-                "update_status",
-                ("start_part", "fst"),
-                # Second iteration handles fail of first part
-                "update_status",
-                ("handle_finished", "fst"),
-                # Final update of status
-                "update_status",
-            ],
-        )
-
-
-@mock.patch("pungi_utils.orchestrator.get_compose_dir")
-class TestGetTargetDir(BaseTestCase):
-    def test_with_absolute_path(self, gcd):
-        config = {"target": "/tgt", "compose_type": "nightly"}
-        cfg = mock.Mock()
-        cfg.get.side_effect = lambda _, k: config[k]
-        ci = mock.Mock()
-        res = o.get_target_dir(cfg, ci, None, reldir="/checkout")
-        self.assertEqual(res, gcd.return_value)
-        self.assertEqual(
-            gcd.call_args_list,
-            [mock.call("/tgt", ci, compose_type="nightly", compose_label=None)],
-        )
-
-    def test_with_relative_path(self, gcd):
-        config = {"target": "tgt", "compose_type": "nightly"}
-        cfg = mock.Mock()
-        cfg.get.side_effect = lambda _, k: config[k]
-        ci = mock.Mock()
-        res = o.get_target_dir(cfg, ci, None, reldir="/checkout")
-        self.assertEqual(res, gcd.return_value)
-        self.assertEqual(
-            gcd.call_args_list,
-            [
-                mock.call(
-                    "/checkout/tgt", ci, compose_type="nightly", compose_label=None
-                )
-            ],
-        )
-
-
-class TestComputeStatus(BaseTestCase):
-    @parameterized.expand(
-        [
-            ([("FINISHED", False)], "FINISHED"),
-            ([("FINISHED", False), ("STARTED", False)], "STARTED"),
-            ([("FINISHED", False), ("STARTED", False), ("WAITING", False)], "STARTED"),
-            ([("FINISHED", False), ("DOOMED", False)], "DOOMED"),
-            (
-                [("FINISHED", False), ("BLOCKED", True), ("DOOMED", True)],
-                "FINISHED_INCOMPLETE",
-            ),
-            ([("FINISHED", False), ("BLOCKED", False), ("DOOMED", True)], "DOOMED"),
-            ([("FINISHED", False), ("DOOMED", True)], "FINISHED_INCOMPLETE"),
-            ([("FINISHED", False), ("STARTED", False), ("DOOMED", False)], "STARTED"),
-        ]
-    )
-    def test_cases(self, statuses, expected):
-        self.assertEqual(o.compute_status(statuses), expected)
-
-
-class TestUpdateStatus(PungiTestCase):
-    def test_updating(self):
-        os.makedirs(os.path.join(self.topdir, "compose/metadata"))
-        conf = o.Config(
-            self.topdir, "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"]
-        )
-        o.update_status(
-            conf,
-            {"1": _Part("1", status="FINISHED"), "2": _Part("2", status="STARTED")},
-        )
-        self.assertFileContent(os.path.join(self.topdir, "STATUS"), "STARTED")
-        self.assertFileContent(
-            os.path.join(self.topdir, "compose/metadata/parts.json"),
-            dedent(
-                """\
-                {
-                  "1": {
-                    "path": "/path/to/1",
-                    "status": "FINISHED"
-                  },
-                  "2": {
-                    "path": "/path/to/2",
-                    "status": "STARTED"
-                  }
-                }
-                """
-            ),
-        )
-
-
-@mock.patch("pungi_utils.orchestrator.get_target_dir")
-class TestPrepareComposeDir(PungiTestCase):
-    def setUp(self):
-        super(TestPrepareComposeDir, self).setUp()
-        self.conf = mock.Mock(name="config")
-        self.main_config = "/some/config"
-        self.compose_info = mock.Mock(name="compose_info")
-
-    def test_new_compose(self, gtd):
-        def mock_get_target(conf, compose_info, label, reldir):
-            self.assertEqual(conf, self.conf)
-            self.assertEqual(compose_info, self.compose_info)
-            self.assertEqual(label, args.label)
-            self.assertEqual(reldir, "/some")
-            touch(os.path.join(self.topdir, "work/global/composeinfo-base.json"), "WOO")
-            return self.topdir
-
-        gtd.side_effect = mock_get_target
-        args = mock.Mock(name="args", spec=["label"])
-        retval = o.prepare_compose_dir(
-            self.conf, args, self.main_config, self.compose_info
-        )
-        self.assertEqual(retval, self.topdir)
-        self.assertFileContent(
-            os.path.join(self.topdir, "compose/metadata/composeinfo.json"), "WOO"
-        )
-        self.assertTrue(os.path.isdir(os.path.join(self.topdir, "logs")))
-        self.assertTrue(os.path.isdir(os.path.join(self.topdir, "parts")))
-        self.assertTrue(os.path.isdir(os.path.join(self.topdir, "work/global")))
-        self.assertFileContent(os.path.join(self.topdir, "STATUS"), "STARTED")
-
-    def test_restarting_compose(self, gtd):
-        args = mock.Mock(name="args", spec=["label", "compose_path"])
-        retval = o.prepare_compose_dir(
-            self.conf, args, self.main_config, self.compose_info
-        )
-        self.assertEqual(gtd.call_args_list, [])
-        self.assertEqual(retval, args.compose_path)
-
-
-class TestLoadPartsMetadata(PungiTestCase):
-    def test_loading(self):
-        touch(
-            os.path.join(self.topdir, "compose/metadata/parts.json"), '{"foo": "bar"}'
-        )
-        conf = mock.Mock(target=self.topdir)
-
-        self.assertEqual(o.load_parts_metadata(conf), {"foo": "bar"})
-
-
-@mock.patch("pungi_utils.orchestrator.load_parts_metadata")
-class TestSetupForRestart(BaseTestCase):
-    def setUp(self):
-        self.conf = mock.Mock(name="global_config")
-
-    def test_restart_ok(self, lpm):
-        lpm.return_value = {
-            "p1": {"status": "FINISHED", "path": "/p1"},
-            "p2": {"status": "DOOMED", "path": "/p2"},
-        }
-        parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")}
-
-        o.setup_for_restart(self.conf, parts, ["p2"])
-
-        self.assertEqual(parts["p1"].status, "FINISHED")
-        self.assertEqual(parts["p1"].path, "/p1")
-        self.assertEqual(parts["p2"].status, "READY")
-        self.assertEqual(parts["p2"].path, None)
-
-    def test_restart_one_blocked_one_ok(self, lpm):
-        lpm.return_value = {
-            "p1": {"status": "DOOMED", "path": "/p1"},
-            "p2": {"status": "DOOMED", "path": "/p2"},
-            "p3": {"status": "WAITING", "path": None},
-        }
-        parts = {
-            "p1": _Part("p1"),
-            "p2": _Part("p2", parent="p1"),
-            "p3": _Part("p3", parent="p2"),
-        }
-
-        o.setup_for_restart(self.conf, parts, ["p1", "p3"])
-
-        self.assertEqual(parts["p1"].status, "READY")
-        self.assertEqual(parts["p1"].path, None)
-        self.assertEqual(parts["p2"].status, "DOOMED")
-        self.assertEqual(parts["p2"].path, "/p2")
-        self.assertEqual(parts["p3"].status, "WAITING")
-        self.assertEqual(parts["p3"].path, None)
-
-    def test_restart_all_blocked(self, lpm):
-        lpm.return_value = {
-            "p1": {"status": "DOOMED", "path": "/p1"},
-            "p2": {"status": "STARTED", "path": "/p2"},
-        }
-        parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")}
-
-        with self.assertRaises(RuntimeError):
-            o.setup_for_restart(self.conf, parts, ["p2"])
-
-        self.assertEqual(parts["p1"].status, "DOOMED")
-        self.assertEqual(parts["p1"].path, "/p1")
-        self.assertEqual(parts["p2"].status, "WAITING")
-        self.assertEqual(parts["p2"].path, None)
-
-
-@mock.patch("atexit.register")
-@mock.patch("kobo.shortcuts.run")
-class TestRunKinit(BaseTestCase):
-    def test_without_config(self, run, register):
-        conf = mock.Mock()
-        conf.getboolean.return_value = False
-
-        o.run_kinit(conf)
-
-        self.assertEqual(run.call_args_list, [])
-        self.assertEqual(register.call_args_list, [])
-
-    @mock.patch.dict("os.environ")
-    def test_with_config(self, run, register):
-        conf = mock.Mock()
-        conf.getboolean.return_value = True
-        conf.get.side_effect = lambda section, option: option
-
-        o.run_kinit(conf)
-
-        self.assertEqual(
-            run.call_args_list,
-            [mock.call(["kinit", "-k", "-t", "kerberos_keytab", "kerberos_principal"])],
-        )
-        self.assertEqual(
-            register.call_args_list, [mock.call(os.remove, os.environ["KRB5CCNAME"])]
-        )
-
-
-@mock.patch.dict("os.environ", {}, clear=True)
-class TestGetScriptEnv(BaseTestCase):
-    def test_without_metadata(self):
-        env = o.get_script_env("/foobar")
-        self.assertEqual(env, {"COMPOSE_PATH": "/foobar"})
-
-    def test_with_metadata(self):
-        compose_dir = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4")
-        env = o.get_script_env(compose_dir)
-        self.maxDiff = None
-        self.assertEqual(
-            env,
-            {
-                "COMPOSE_PATH": compose_dir,
-                "COMPOSE_ID": "DP-1.0-20161013.t.4",
-                "COMPOSE_DATE": "20161013",
-                "COMPOSE_TYPE": "test",
-                "COMPOSE_RESPIN": "4",
-                "COMPOSE_LABEL": "",
-                "RELEASE_ID": "DP-1.0",
-                "RELEASE_NAME": "Dummy Product",
-                "RELEASE_SHORT": "DP",
-                "RELEASE_VERSION": "1.0",
-                "RELEASE_TYPE": "ga",
-                "RELEASE_IS_LAYERED": "",
-            },
-        )
-
-
-class TestRunScripts(BaseTestCase):
-    @mock.patch("pungi_utils.orchestrator.get_script_env")
-    @mock.patch("kobo.shortcuts.run")
-    def test_run_scripts(self, run, get_env):
-        commands = """
-            date
-            env
-        """
-
-        o.run_scripts("pref_", "/tmp/compose", commands)
-
-        self.assertEqual(
-            run.call_args_list,
-            [
-                mock.call(
-                    "date",
-                    logfile="/tmp/compose/logs/pref_0.log",
-                    env=get_env.return_value,
-                ),
-                mock.call(
-                    "env",
-                    logfile="/tmp/compose/logs/pref_1.log",
-                    env=get_env.return_value,
-                ),
-            ],
-        )
-
-
-@mock.patch("pungi.notifier.PungiNotifier")
-class TestSendNotification(BaseTestCase):
-    def test_no_command(self, notif):
-        o.send_notification("/foobar", None, None)
-        self.assertEqual(notif.mock_calls, [])
-
-    @mock.patch("pungi.util.load_config")
-    def test_with_command_and_translate(self, load_config, notif):
-        compose_dir = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4")
-        load_config.return_value = {
-            "translate_paths": [(os.path.dirname(compose_dir), "http://example.com")],
-        }
-        parts = {"foo": mock.Mock()}
-
-        o.send_notification(compose_dir, "handler", parts)
-
-        self.assertEqual(len(notif.mock_calls), 2)
-        self.assertEqual(notif.mock_calls[0], mock.call(["handler"]))
-        _, args, kwargs = notif.mock_calls[1]
-        self.assertEqual(args, ("status-change",))
-        self.assertEqual(
-            kwargs,
-            {
-                "status": "FINISHED",
-                "workdir": compose_dir,
-                "location": "http://example.com/DP-1.0-20161013.t.4",
-                "compose_id": "DP-1.0-20161013.t.4",
-                "compose_date": "20161013",
-                "compose_type": "test",
-                "compose_respin": "4",
-                "compose_label": None,
-                "release_id": "DP-1.0",
-                "release_name": "Dummy Product",
-                "release_short": "DP",
-                "release_version": "1.0",
-                "release_type": "ga",
-                "release_is_layered": False,
-            },
-        )
-        self.assertEqual(load_config.call_args_list, [mock.call(parts["foo"].config)])