diff --git a/Makefile b/Makefile
index 55d2aab2..90e8a087 100644
--- a/Makefile
+++ b/Makefile
@@ -104,5 +104,8 @@ test-data:
test-compose:
cd tests && ./test_compose.sh
+test-multi-compose:
+ PYTHONPATH=$$(pwd) PATH=$$(pwd)/bin:$$PATH pungi-orchestrate --debug start tests/data/multi-compose.conf
+
doc:
cd doc; make html
diff --git a/bin/pungi-orchestrate b/bin/pungi-orchestrate
new file mode 100755
index 00000000..0cb4348d
--- /dev/null
+++ b/bin/pungi-orchestrate
@@ -0,0 +1,16 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import sys
+
+here = sys.path[0]
+if here != '/usr/bin':
+ # Git checkout
+ sys.path[0] = os.path.dirname(here)
+
+from pungi_utils import orchestrator
+
+
+if __name__ == '__main__':
+ orchestrator.main()
diff --git a/doc/contributing.rst b/doc/contributing.rst
index 3aff0e87..02ae13ae 100644
--- a/doc/contributing.rst
+++ b/doc/contributing.rst
@@ -46,6 +46,7 @@ For running unit tests, these packages are recommended as well:
* python-nose-cov
* python-unittest2
* rpmdevtools
+ * python-parameterized
While being difficult, it is possible to work on *Pungi* using *virtualenv*.
Install *python-virtualenvwrapper* (after installation you have to add the command
@@ -60,7 +61,7 @@ packages above as they are used by calling an executable. ::
$ for pkg in _deltarpm krbV _selinux deltarpm sqlitecachec _sqlitecache; do ln -vs "$(deactivate && python -c 'import os, '$pkg'; print('$pkg'.__file__)')" "$(virtualenvwrapper_get_site_packages_dir)"; done
$ pip install -U pip
$ PYCURL_SSL_LIBRARY=nss pip install pycurl --no-binary :all:
- $ pip install beanbag jsonschema 'kobo>=0.6.0' lockfile lxml mock nose nose-cov productmd pyopenssl python-multilib requests requests-kerberos setuptools sphinx ordered_set koji PyYAML dogpile.cache
+ $ pip install beanbag jsonschema 'kobo>=0.6.0' lockfile lxml mock nose nose-cov productmd pyopenssl python-multilib requests requests-kerberos setuptools sphinx ordered_set koji PyYAML dogpile.cache parameterized
Now you should be able to run all existing tests.
diff --git a/doc/index.rst b/doc/index.rst
index 15d244fc..0081fdea 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -21,3 +21,4 @@ Contents:
comps
contributing
testing
+ multi_compose
diff --git a/doc/multi_compose.rst b/doc/multi_compose.rst
new file mode 100644
index 00000000..8848ad6b
--- /dev/null
+++ b/doc/multi_compose.rst
@@ -0,0 +1,60 @@
+.. _multi_compose:
+
+Managing compose from multiple parts
+====================================
+
+There may be cases where it makes sense to split a big compose into separate
+parts, but create a compose output that links all output into one familiar
+structure.
+
+The `pungi-orchestrate` tools allows that.
+
+It works with an INI-style configuration file. The ``[general]`` section
+contains information about identity of the main compose. Other sections define
+individual parts.
+
+The parts are scheduled to run in parallel, with the minimal amount of
+serialization. The final compose directory will contain hard-links to the
+files.
+
+
+General settings
+----------------
+
+**target**
+ Path to directory where the final compose should be created.
+**compose_type**
+ Type of compose to make.
+**release_name**
+ Name of the product for the final compose.
+**release_short**
+ Short name of the product for the final compose.
+**release_version**
+ Version of the product for the final compose.
+**release_type**
+ Type of the product for the final compose.
+**extra_args**
+ Additional arguments that wil be passed to the child Pungi processes.
+**koji_profile**
+ If specified, a current event will be retrieved from the Koji instance and
+ used for all parts.
+
+
+Partial compose settings
+------------------------
+
+Each part should have a separate section in the config file.
+
+It can specify these options:
+
+**config**
+ Path to configuration file that describes this part. If relative, it is
+ resolved relative to the file with parts configuration.
+**just_phase**, **skip_phase**
+ Customize which phases should run for this part.
+**depends_on**
+ A comma separated list of other parts that must be finished before this part
+ starts.
+**failable**
+ A boolean toggle to mark a part as failable. A failure in such part will
+ mark the final compose as incomplete, but still successful.
diff --git a/pungi.spec b/pungi.spec
index 79010f10..108a0b1f 100644
--- a/pungi.spec
+++ b/pungi.spec
@@ -21,6 +21,7 @@ BuildRequires: python2-libcomps
BuildRequires: python2-six
BuildRequires: python2-multilib
BuildRequires: python2-dogpile-cache
+BuildRequires: python2-parameterized
Requires: yum => 3.4.3-28
Requires: lorax >= 22.1
diff --git a/pungi/linker.py b/pungi/linker.py
index d90c4fc0..7a2389f5 100644
--- a/pungi/linker.py
+++ b/pungi/linker.py
@@ -14,6 +14,7 @@
# along with this program; if not, see .
+import contextlib
import errno
import os
import shutil
@@ -39,6 +40,17 @@ class LinkerPool(ThreadPool):
return pool
+@contextlib.contextmanager
+def linker_pool(link_type="hardlink-or-copy", num_workers=10):
+ """Create a linker and make sure it is stopped no matter what."""
+ linker = LinkerPool.with_workers(num_workers=num_workers, link_type=link_type)
+ linker.start()
+ try:
+ yield linker
+ finally:
+ linker.stop()
+
+
class LinkerThread(WorkerThread):
def process(self, item, num):
src, dst = item
diff --git a/pungi/phases/pkgset/sources/source_koji.py b/pungi/phases/pkgset/sources/source_koji.py
index bb793c73..dd1b64fb 100644
--- a/pungi/phases/pkgset/sources/source_koji.py
+++ b/pungi/phases/pkgset/sources/source_koji.py
@@ -694,22 +694,30 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event):
def get_koji_event_info(compose, koji_wrapper):
- koji_proxy = koji_wrapper.koji_proxy
event_file = os.path.join(compose.paths.work.topdir(arch="global"), "koji-event")
- if compose.koji_event:
- koji_event = koji_proxy.getEvent(compose.koji_event)
- compose.log_info("Setting koji event to a custom value: %s" % compose.koji_event)
- json.dump(koji_event, open(event_file, "w"))
- return koji_event
-
msg = "Getting koji event"
if compose.DEBUG and os.path.exists(event_file):
compose.log_warning("[SKIP ] %s" % msg)
result = json.load(open(event_file, "r"))
else:
- compose.log_info(msg)
- result = koji_proxy.getLastEvent()
- json.dump(result, open(event_file, "w"))
- compose.log_info("Koji event: %s" % result["id"])
+ result = get_koji_event_raw(koji_wrapper, compose.koji_event, event_file)
+ if compose.koji_event:
+ compose.log_info("Setting koji event to a custom value: %s" % compose.koji_event)
+ else:
+ compose.log_info(msg)
+ compose.log_info("Koji event: %s" % result["id"])
+
return result
+
+
+def get_koji_event_raw(koji_wrapper, event_id, event_file):
+ if event_id:
+ koji_event = koji_wrapper.koji_proxy.getEvent(event_id)
+ else:
+ koji_event = koji_wrapper.koji_proxy.getLastEvent()
+
+ with open(event_file, "w") as f:
+ json.dump(koji_event, f)
+
+ return koji_event
diff --git a/pungi_utils/orchestrator.py b/pungi_utils/orchestrator.py
new file mode 100644
index 00000000..e8534847
--- /dev/null
+++ b/pungi_utils/orchestrator.py
@@ -0,0 +1,546 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import print_function
+
+import argparse
+import errno
+import json
+import logging
+import os
+import re
+import shutil
+import subprocess
+import sys
+from collections import namedtuple
+
+import kobo.conf
+import kobo.log
+import productmd
+from six.moves import configparser, shlex_quote
+
+from pungi.compose import get_compose_dir
+from pungi.linker import linker_pool
+from pungi.phases.pkgset.sources.source_koji import get_koji_event_raw
+from pungi.util import find_old_compose, parse_koji_event, temp_dir
+from pungi.wrappers.kojiwrapper import KojiWrapper
+
+
+Config = namedtuple(
+ "Config",
+ [
+ # Path to directory with the compose
+ "target",
+ "compose_type",
+ "label",
+ # Path to the selected old compose that will be reused
+ "old_compose",
+ # Path to directory with config file copies
+ "config_dir",
+ # Which koji event to use (if any)
+ "event",
+ # Additional arguments to pungi-koji executable
+ "extra_args",
+ ],
+)
+
+log = logging.getLogger(__name__)
+
+
+class Status(object):
+ # Ready to start
+ READY = "READY"
+ # Waiting for dependencies to finish.
+ WAITING = "WAITING"
+ # Part is currently running
+ STARTED = "STARTED"
+ # A dependency failed, this one will never start.
+ BLOCKED = "BLOCKED"
+
+
+class ComposePart(object):
+ def __init__(self, name, config, just_phase=[], skip_phase=[], dependencies=[]):
+ self.name = name
+ self.config = config
+ self.status = Status.WAITING if dependencies else Status.READY
+ self.just_phase = just_phase
+ self.skip_phase = skip_phase
+ self.blocked_on = set(dependencies)
+ self.depends_on = set(dependencies)
+ self.path = None
+ self.log_file = None
+ self.failable = False
+
+ def __str__(self):
+ return self.name
+
+ def __repr__(self):
+ return (
+ "ComposePart({0.name!r},"
+ " {0.config!r},"
+ " {0.status!r},"
+ " just_phase={0.just_phase!r},"
+ " skip_phase={0.skip_phase!r},"
+ " dependencies={0.depends_on!r})"
+ ).format(self)
+
+ def refresh_status(self):
+ """Refresh status of this part with the result of the compose. This
+ should only be called once the compose finished.
+ """
+ try:
+ with open(os.path.join(self.path, "STATUS")) as fh:
+ self.status = fh.read().strip()
+ except IOError as exc:
+ log.error("Failed to update status of %s: %s", self.name, exc)
+ log.error("Assuming %s is DOOMED", self.name)
+ self.status = "DOOMED"
+
+ def is_finished(self):
+ return "FINISHED" in self.status
+
+ def unblock_on(self, finished_part):
+ """Update set of blockers for this part. If it's empty, mark us as ready."""
+ self.blocked_on.discard(finished_part)
+ if self.status == Status.WAITING and not self.blocked_on:
+ log.debug("%s is ready to start", self)
+ self.status = Status.READY
+
+ def setup_start(self, global_config, parts):
+ substitutions = {name: p.path for name, p in parts.items() if p.is_finished()}
+ substitutions["configdir"] = global_config.config_dir
+
+ config = kobo.conf.PyConfigParser()
+ config.load_from_file(self.config)
+
+ for f in config.opened_files:
+ # apply substitutions
+ fill_in_config_file(f, substitutions)
+
+ self.status = Status.STARTED
+ self.path = get_compose_dir(
+ os.path.join(global_config.target, "parts"),
+ config,
+ compose_type=global_config.compose_type,
+ compose_label=global_config.label,
+ )
+ self.log_file = os.path.join(global_config.target, "logs", "%s.log" % self.name)
+ log.info("Starting %s in %s", self.name, self.path)
+
+ def get_cmd(self, global_config):
+ cmd = ["pungi-koji", "--config", self.config, "--compose-dir", self.path]
+ cmd.append("--%s" % global_config.compose_type)
+ if global_config.label:
+ cmd.extend(["--label", global_config.label])
+ for phase in self.just_phase:
+ cmd.extend(["--just-phase", phase])
+ for phase in self.skip_phase:
+ cmd.extend(["--skip-phase", phase])
+ if global_config.old_compose:
+ cmd.extend(
+ ["--old-compose", os.path.join(global_config.old_compose, "parts")]
+ )
+ if global_config.event:
+ cmd.extend(["--koji-event", str(global_config.event)])
+ if global_config.extra_args:
+ cmd.extend(global_config.extra_args)
+ cmd.extend(["--no-latest-link"])
+ return cmd
+
+ @classmethod
+ def from_config(cls, config, section, config_dir):
+ part = cls(
+ name=section,
+ config=os.path.join(config_dir, config.get(section, "config")),
+ just_phase=_safe_get_list(config, section, "just_phase", []),
+ skip_phase=_safe_get_list(config, section, "skip_phase", []),
+ dependencies=_safe_get_list(config, section, "depends_on", []),
+ )
+ if config.has_option(section, "failable"):
+ part.failable = config.getboolean(section, "failable")
+ return part
+
+
+def _safe_get_list(config, section, option, default=None):
+ """Get a value from config parser. The result is split into a list on
+ commas or spaces, and `default` is returned if the key does not exist.
+ """
+ if config.has_option(section, option):
+ value = config.get(section, option)
+ return [x.strip() for x in re.split(r"[, ]+", value) if x]
+ return default
+
+
+def fill_in_config_file(fp, substs):
+ """Templating function. It works with Jinja2 style placeholders such as
+ {{foo}}. Whitespace around the key name is fine. The file is modified in place.
+
+ :param fp string: path to the file to process
+ :param substs dict: a mapping for values to put into the file
+ """
+
+ def repl(match):
+ try:
+ return substs[match.group(1)]
+ except KeyError as exc:
+ raise RuntimeError(
+ "Unknown placeholder %s in %s" % (exc, os.path.basename(fp))
+ )
+
+ with open(fp, "r") as f:
+ contents = re.sub(r"{{ *([a-zA-Z-_]+) *}}", repl, f.read())
+ with open(fp, "w") as f:
+ f.write(contents)
+
+
+def start_part(global_config, parts, part):
+ part.setup_start(global_config, parts)
+ fh = open(part.log_file, "w")
+ cmd = part.get_cmd(global_config)
+ log.debug("Running command %r", " ".join(shlex_quote(x) for x in cmd))
+ return subprocess.Popen(cmd, stdout=fh, stderr=subprocess.STDOUT)
+
+
+def handle_finished(global_config, linker, parts, proc, finished_part):
+ finished_part.refresh_status()
+ log.info("%s finished with status %s", finished_part, finished_part.status)
+ if proc.returncode == 0:
+ # Success, unblock other parts...
+ for part in parts.values():
+ part.unblock_on(finished_part.name)
+ # ...and link the results into final destination.
+ copy_part(global_config, linker, finished_part)
+ update_metadata(global_config, finished_part)
+ else:
+ # Failure, other stuff may be blocked.
+ log.info("See details in %s", finished_part.log_file)
+ block_on(parts, finished_part.name)
+
+
+def copy_part(global_config, linker, part):
+ c = productmd.Compose(part.path)
+ for variant in c.info.variants:
+ data_path = os.path.join(part.path, "compose", variant)
+ link = os.path.join(global_config.target, "compose", variant)
+ log.info("Hardlinking content %s -> %s", data_path, link)
+ hardlink_dir(linker, data_path, link)
+
+
+def hardlink_dir(linker, srcdir, dstdir):
+ for root, dirs, files in os.walk(srcdir):
+ root = os.path.relpath(root, srcdir)
+ for f in files:
+ src = os.path.normpath(os.path.join(srcdir, root, f))
+ dst = os.path.normpath(os.path.join(dstdir, root, f))
+ linker.queue_put((src, dst))
+
+
+def update_metadata(global_config, part):
+ part_metadata_dir = os.path.join(part.path, "compose", "metadata")
+ final_metadata_dir = os.path.join(global_config.target, "compose", "metadata")
+ for f in os.listdir(part_metadata_dir):
+ # Load the metadata
+ with open(os.path.join(part_metadata_dir, f)) as fh:
+ part_metadata = json.load(fh)
+ final_metadata = os.path.join(final_metadata_dir, f)
+ if os.path.exists(final_metadata):
+ # We already have this file, will need to merge.
+ merge_metadata(final_metadata, part_metadata)
+ else:
+ # A new file, just copy it.
+ copy_metadata(global_config, final_metadata, part_metadata)
+
+
+def copy_metadata(global_config, final_metadata, source):
+ """Copy file to final location, but update compose information."""
+ with open(
+ os.path.join(global_config.target, "compose/metadata/composeinfo.json")
+ ) as f:
+ composeinfo = json.load(f)
+ try:
+ source["payload"]["compose"].update(composeinfo["payload"]["compose"])
+ except KeyError:
+ # No [payload][compose], probably OSBS metadata
+ pass
+ with open(final_metadata, "w") as f:
+ json.dump(source, f, indent=2, sort_keys=True)
+
+
+def merge_metadata(final_metadata, source):
+ with open(final_metadata) as f:
+ metadata = json.load(f)
+
+ try:
+ key = {
+ "productmd.composeinfo": "variants",
+ "productmd.modules": "modules",
+ "productmd.images": "images",
+ "productmd.rpms": "rpms",
+ }[source["header"]["type"]]
+ # TODO what if multiple parts create images for the same variant
+ metadata["payload"][key].update(source["payload"][key])
+ except KeyError:
+ # OSBS metadata, merge whole file
+ metadata.update(source)
+ with open(final_metadata, "w") as f:
+ json.dump(metadata, f, indent=2, sort_keys=True)
+
+
+def block_on(parts, name):
+ """Part ``name`` failed, mark everything depending on it as blocked."""
+ for part in parts.values():
+ if name in part.blocked_on:
+ log.warning("%s is blocked now and will not run", part)
+ part.status = Status.BLOCKED
+ block_on(parts, part.name)
+
+
+def check_finished_processes(processes):
+ """Walk through all active processes and check if something finished.
+ """
+ for proc in processes.keys():
+ proc.poll()
+ if proc.returncode is not None:
+ yield proc, processes[proc]
+
+
+def run_all(global_config, parts):
+ # Mapping subprocess.Popen -> ComposePart
+ processes = dict()
+ remaining = set(p.name for p in parts.values() if not p.is_finished())
+
+ with linker_pool("hardlink") as linker:
+ while remaining or processes:
+ update_status(global_config, parts)
+
+ for proc, part in check_finished_processes(processes):
+ del processes[proc]
+ handle_finished(global_config, linker, parts, proc, part)
+
+ # Start new available processes.
+ for name in list(remaining):
+ part = parts[name]
+ # Start all ready parts
+ if part.status == Status.READY:
+ remaining.remove(name)
+ processes[start_part(global_config, parts, part)] = part
+ # Remove blocked parts from todo list
+ elif part.status == Status.BLOCKED:
+ remaining.remove(part.name)
+
+ # Wait for any child process to finish if there is any.
+ if processes:
+ pid, reason = os.wait()
+ for proc in processes.keys():
+ # Set the return code for process that we caught by os.wait().
+ # Calling poll() on it would not set the return code properly
+ # since the value was already consumed by os.wait().
+ if proc.pid == pid:
+ proc.returncode = (reason >> 8) & 0xFF
+
+ log.info("Waiting for linking to finish...")
+ return update_status(global_config, parts)
+
+
+def get_target_dir(config, compose_info, label, reldir=""):
+ """Find directory where this compose will be.
+
+ @param reldir: if target path in config is relative, it will be resolved
+ against this directory
+ """
+ dir = os.path.realpath(os.path.join(reldir, config.get("general", "target")))
+ target_dir = get_compose_dir(
+ dir,
+ compose_info,
+ compose_type=config.get("general", "compose_type"),
+ compose_label=label,
+ )
+ return target_dir
+
+
+def setup_logging(debug=False):
+ FORMAT = "%(asctime)s: %(levelname)s: %(message)s"
+ level = logging.DEBUG if debug else logging.INFO
+ kobo.log.add_stderr_logger(log, log_level=level, format=FORMAT)
+ log.setLevel(level)
+
+
+def compute_status(statuses):
+ if any(map(lambda x: x[0] in ("STARTED", "WAITING"), statuses)):
+ # If there is anything still running or waiting to start, the whole is
+ # still running.
+ return "STARTED"
+ elif any(map(lambda x: x[0] in ("DOOMED", "BLOCKED") and not x[1], statuses)):
+ # If any required part is doomed or blocked, the whole is doomed
+ return "DOOMED"
+ elif all(map(lambda x: x[0] == "FINISHED", statuses)):
+ # If all parts are complete, the whole is complete
+ return "FINISHED"
+ else:
+ return "FINISHED_INCOMPLETE"
+
+
+def update_status(global_config, parts):
+ log.debug("Updating status metadata")
+ metadata = {}
+ statuses = set()
+ for part in parts.values():
+ metadata[part.name] = {"status": part.status, "path": part.path}
+ statuses.add((part.status, part.failable))
+ metadata_path = os.path.join(
+ global_config.target, "compose", "metadata", "parts.json"
+ )
+ with open(metadata_path, "w") as fh:
+ json.dump(metadata, fh, indent=2, sort_keys=True, separators=(",", ": "))
+
+ status = compute_status(statuses)
+ log.info("Overall status is %s", status)
+ with open(os.path.join(global_config.target, "STATUS"), "w") as fh:
+ fh.write(status)
+
+ return status != "DOOMED"
+
+
+def prepare_compose_dir(config, args, main_config_file, compose_info):
+ if not hasattr(args, "compose_path"):
+ # Creating a brand new compose
+ target_dir = get_target_dir(
+ config, compose_info, args.label, reldir=os.path.dirname(main_config_file)
+ )
+ for dir in ("logs", "parts", "compose/metadata", "work/global"):
+ try:
+ os.makedirs(os.path.join(target_dir, dir))
+ except OSError as exc:
+ if exc.errno != errno.EEXIST:
+ raise
+ # Copy initial composeinfo for new compose
+ shutil.copy(
+ os.path.join(target_dir, "work/global/composeinfo-base.json"),
+ os.path.join(target_dir, "compose/metadata/composeinfo.json"),
+ )
+ else:
+ # Restarting a particular compose
+ target_dir = args.compose_path
+
+ return target_dir
+
+
+def load_parts_metadata(global_config):
+ parts_metadata = os.path.join(global_config.target, "compose/metadata/parts.json")
+ with open(parts_metadata) as f:
+ return json.load(f)
+
+
+def setup_for_restart(global_config, parts, to_restart):
+ has_stuff_to_do = False
+ metadata = load_parts_metadata(global_config)
+ for key in metadata:
+ # Update state to match what is on disk
+ log.debug(
+ "Reusing %s (%s) from %s",
+ key,
+ metadata[key]["status"],
+ metadata[key]["path"],
+ )
+ parts[key].status = metadata[key]["status"]
+ parts[key].path = metadata[key]["path"]
+ for key in to_restart:
+ # Set restarted parts to run again
+ parts[key].status = Status.WAITING
+ parts[key].path = None
+
+ for key in to_restart:
+ # Remove blockers that are already finished
+ for blocker in list(parts[key].blocked_on):
+ if parts[blocker].is_finished():
+ parts[key].blocked_on.discard(blocker)
+ if not parts[key].blocked_on:
+ log.debug("Part %s in not blocked", key)
+ # Nothing blocks it; let's go
+ parts[key].status = Status.READY
+ has_stuff_to_do = True
+
+ if not has_stuff_to_do:
+ raise RuntimeError("All restarted parts are blocked. Nothing to do.")
+
+
+def run(work_dir, main_config_file, args):
+ config_dir = os.path.join(work_dir, "config")
+ shutil.copytree(os.path.dirname(main_config_file), config_dir)
+
+ # Read main config
+ parser = configparser.RawConfigParser()
+ parser.read(main_config_file)
+
+ compose_info = dict(parser.items("general"))
+ compose_type = parser.get("general", "compose_type")
+
+ target_dir = prepare_compose_dir(parser, args, main_config_file, compose_info)
+ kobo.log.add_file_logger(log, os.path.join(target_dir, "logs", "orchestrator.log"))
+ log.info("Composing %s", target_dir)
+
+ old_compose = find_old_compose(
+ os.path.dirname(target_dir),
+ compose_info["release_short"],
+ compose_info["release_version"],
+ "",
+ )
+ if old_compose:
+ log.info("Reusing old compose %s", old_compose)
+
+ global_config = Config(
+ target=target_dir,
+ compose_type=compose_type,
+ label=args.label,
+ old_compose=old_compose,
+ config_dir=os.path.dirname(main_config_file),
+ event=args.koji_event,
+ extra_args=_safe_get_list(parser, "general", "extra_args"),
+ )
+
+ if not global_config.event and parser.has_option("general", "koji_profile"):
+ koji_wrapper = KojiWrapper(parser.get("general", "koji_profile"))
+ event_file = os.path.join(global_config.target, "work/global/koji-event")
+ result = get_koji_event_raw(koji_wrapper, None, event_file)
+ global_config = global_config._replace(event=result["id"])
+
+ parts = {}
+ for section in parser.sections():
+ if section == "general":
+ continue
+ parts[section] = ComposePart.from_config(parser, section, config_dir)
+
+ if hasattr(args, "part"):
+ setup_for_restart()
+
+ return run_all(global_config, parts)
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--debug", action="store_true")
+ parser.add_argument("--koji-event", metavar="ID", type=parse_koji_event)
+ subparsers = parser.add_subparsers()
+ start = subparsers.add_parser("start")
+ start.add_argument("config", metavar="CONFIG")
+ start.add_argument("--label")
+
+ restart = subparsers.add_parser("restart")
+ restart.add_argument("config", metavar="CONFIG")
+ restart.add_argument("compose_path", metavar="COMPOSE_PATH")
+ restart.add_argument(
+ "part", metavar="PART", nargs="*", help="which parts to restart"
+ )
+ restart.add_argument("--label")
+
+ return parser.parse_args(argv)
+
+
+def main(argv=None):
+ args = parse_args(argv)
+ setup_logging(args.debug)
+
+ main_config_file = os.path.abspath(args.config)
+
+ with temp_dir() as work_dir:
+ if not run(work_dir, main_config_file, args):
+ sys.exit(1)
diff --git a/setup.py b/setup.py
index ac8b10b8..dcb7f0eb 100755
--- a/setup.py
+++ b/setup.py
@@ -43,6 +43,7 @@ setup(
'bin/pungi-gather',
'bin/pungi-koji',
'bin/pungi-make-ostree',
+ 'bin/pungi-orchestrate',
'bin/pungi-patch-iso',
'bin/pungi-wait-for-signed-ostree-handler',
diff --git a/tests/data/client.conf b/tests/data/client.conf
new file mode 100644
index 00000000..1ce23ca7
--- /dev/null
+++ b/tests/data/client.conf
@@ -0,0 +1,14 @@
+from dummy-pungi import *
+
+tree_variants = ["Client"]
+pkgset_repos = {
+ "i386": [
+ "{{configdir}}/repo",
+ ],
+ "x86_64": [
+ "{{configdir}}/repo",
+ ],
+ "s390x": [
+ "{{configdir}}/repo",
+ ],
+}
diff --git a/tests/data/multi-compose-variants.xml b/tests/data/multi-compose-variants.xml
new file mode 100644
index 00000000..3f836525
--- /dev/null
+++ b/tests/data/multi-compose-variants.xml
@@ -0,0 +1,47 @@
+
+
+
+
+
+
+ x86_64
+
+
+ resilient-storage
+
+
+
+
+
+ i386
+ x86_64
+
+
+ core
+ standard
+ text-internet
+ firefox
+ skype
+
+
+ minimal
+ desktop
+
+
+
+
+
+ x86_64
+ s390x
+
+
+ core
+ standard
+ text-internet
+
+
+ minimal
+
+
+
+
diff --git a/tests/data/multi-compose.conf b/tests/data/multi-compose.conf
new file mode 100644
index 00000000..f1807639
--- /dev/null
+++ b/tests/data/multi-compose.conf
@@ -0,0 +1,19 @@
+[general]
+release_name = Multi Compose
+release_short = multi
+release_version = 1.0
+release_type = ga
+compose_type = nightly
+target = ../_composes/
+extra_args = --quiet
+
+[server]
+config = server.conf
+
+[client]
+config = client.conf
+
+[resilient-storage]
+config = resilient-storage.conf
+depends_on = server
+failable = yes
diff --git a/tests/data/resilient-storage.conf b/tests/data/resilient-storage.conf
new file mode 100644
index 00000000..b414f875
--- /dev/null
+++ b/tests/data/resilient-storage.conf
@@ -0,0 +1,15 @@
+from dummy-pungi import *
+
+tree_variants = ["ResilientStorage"]
+variants_file = "multi-compose-variants.xml"
+pkgset_repos = {
+ "i386": [
+ "{{configdir}}/repo",
+ ],
+ "x86_64": [
+ "{{configdir}}/repo",
+ ],
+ "s390x": [
+ "{{configdir}}/repo",
+ ],
+}
diff --git a/tests/data/server.conf b/tests/data/server.conf
new file mode 100644
index 00000000..cec3e3fb
--- /dev/null
+++ b/tests/data/server.conf
@@ -0,0 +1,16 @@
+from dummy-pungi import *
+
+tree_variants = ["Server"]
+variants_file = "multi-compose-variants.xml"
+pkgset_repos = {
+ "i386": [
+ "{{configdir}}/repo",
+ ],
+ "x86_64": [
+ "{{configdir}}/repo",
+ ],
+ "s390x": [
+ "{{configdir}}/repo",
+ ],
+}
+extra_isos = {}
diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/composeinfo.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/composeinfo.json
new file mode 100644
index 00000000..ea13f0a6
--- /dev/null
+++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/composeinfo.json
@@ -0,0 +1,25 @@
+{
+ "header": {
+ "type": "productmd.composeinfo",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "DP-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "release": {
+ "internal": false,
+ "name": "Dummy Product",
+ "short": "DP",
+ "type": "ga",
+ "version": "1.0"
+ },
+ "variants": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/images.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/images.json
new file mode 100644
index 00000000..dbc4ed08
--- /dev/null
+++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/images.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.images",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "DP-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "images": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/modules.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/modules.json
new file mode 100644
index 00000000..d3c8fc2f
--- /dev/null
+++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/modules.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.modules",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "DP-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "modules": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/osbs.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/osbs.json
new file mode 100644
index 00000000..6fe980e6
--- /dev/null
+++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/osbs.json
@@ -0,0 +1,4 @@
+{
+ "Var1": {
+ }
+}
diff --git a/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/rpms.json b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/rpms.json
new file mode 100644
index 00000000..2fe27c84
--- /dev/null
+++ b/tests/fixtures/DP-1.0-20181001.n.0/compose/metadata/rpms.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.rpms",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "DP-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "rpms": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/composeinfo.json b/tests/fixtures/basic-metadata-merged/compose/metadata/composeinfo.json
new file mode 100644
index 00000000..3d4b9b18
--- /dev/null
+++ b/tests/fixtures/basic-metadata-merged/compose/metadata/composeinfo.json
@@ -0,0 +1,27 @@
+{
+ "header": {
+ "type": "productmd.composeinfo",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "release": {
+ "internal": false,
+ "name": "Dummy Product",
+ "short": "Mixed",
+ "type": "ga",
+ "version": "1.0"
+ },
+ "variants": {
+ "Var0": {
+ },
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/images.json b/tests/fixtures/basic-metadata-merged/compose/metadata/images.json
new file mode 100644
index 00000000..4f114a2f
--- /dev/null
+++ b/tests/fixtures/basic-metadata-merged/compose/metadata/images.json
@@ -0,0 +1,20 @@
+{
+ "header": {
+ "type": "productmd.images",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "images": {
+ "Var0": {
+ },
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/modules.json b/tests/fixtures/basic-metadata-merged/compose/metadata/modules.json
new file mode 100644
index 00000000..e4767297
--- /dev/null
+++ b/tests/fixtures/basic-metadata-merged/compose/metadata/modules.json
@@ -0,0 +1,20 @@
+{
+ "header": {
+ "type": "productmd.modules",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "modules": {
+ "Var0": {
+ },
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/osbs.json b/tests/fixtures/basic-metadata-merged/compose/metadata/osbs.json
new file mode 100644
index 00000000..a4e51795
--- /dev/null
+++ b/tests/fixtures/basic-metadata-merged/compose/metadata/osbs.json
@@ -0,0 +1,6 @@
+{
+ "Var0": {
+ },
+ "Var1": {
+ }
+}
diff --git a/tests/fixtures/basic-metadata-merged/compose/metadata/rpms.json b/tests/fixtures/basic-metadata-merged/compose/metadata/rpms.json
new file mode 100644
index 00000000..199b6422
--- /dev/null
+++ b/tests/fixtures/basic-metadata-merged/compose/metadata/rpms.json
@@ -0,0 +1,20 @@
+{
+ "header": {
+ "type": "productmd.rpms",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "rpms": {
+ "Var0": {
+ },
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata/compose/metadata/composeinfo.json b/tests/fixtures/basic-metadata/compose/metadata/composeinfo.json
new file mode 100644
index 00000000..eb64af2d
--- /dev/null
+++ b/tests/fixtures/basic-metadata/compose/metadata/composeinfo.json
@@ -0,0 +1,25 @@
+{
+ "header": {
+ "type": "productmd.composeinfo",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "release": {
+ "internal": false,
+ "name": "Dummy Product",
+ "short": "Mixed",
+ "type": "ga",
+ "version": "1.0"
+ },
+ "variants": {
+ "Var0": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata/compose/metadata/images.json b/tests/fixtures/basic-metadata/compose/metadata/images.json
new file mode 100644
index 00000000..1948b04f
--- /dev/null
+++ b/tests/fixtures/basic-metadata/compose/metadata/images.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.images",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "images": {
+ "Var0": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata/compose/metadata/modules.json b/tests/fixtures/basic-metadata/compose/metadata/modules.json
new file mode 100644
index 00000000..6418d24d
--- /dev/null
+++ b/tests/fixtures/basic-metadata/compose/metadata/modules.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.modules",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "modules": {
+ "Var0": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/basic-metadata/compose/metadata/osbs.json b/tests/fixtures/basic-metadata/compose/metadata/osbs.json
new file mode 100644
index 00000000..d51fa3c6
--- /dev/null
+++ b/tests/fixtures/basic-metadata/compose/metadata/osbs.json
@@ -0,0 +1,4 @@
+{
+ "Var0": {
+ }
+}
diff --git a/tests/fixtures/basic-metadata/compose/metadata/rpms.json b/tests/fixtures/basic-metadata/compose/metadata/rpms.json
new file mode 100644
index 00000000..d6b71023
--- /dev/null
+++ b/tests/fixtures/basic-metadata/compose/metadata/rpms.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.rpms",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Mixed-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "rpms": {
+ "Var0": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/composeinfo.json b/tests/fixtures/empty-metadata-merged/compose/metadata/composeinfo.json
new file mode 100644
index 00000000..d446c93b
--- /dev/null
+++ b/tests/fixtures/empty-metadata-merged/compose/metadata/composeinfo.json
@@ -0,0 +1,25 @@
+{
+ "header": {
+ "type": "productmd.composeinfo",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Empty-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "release": {
+ "internal": false,
+ "name": "Dummy Product",
+ "short": "Empty",
+ "type": "ga",
+ "version": "1.0"
+ },
+ "variants": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/images.json b/tests/fixtures/empty-metadata-merged/compose/metadata/images.json
new file mode 100644
index 00000000..60cc6ba7
--- /dev/null
+++ b/tests/fixtures/empty-metadata-merged/compose/metadata/images.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.images",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Empty-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "images": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/modules.json b/tests/fixtures/empty-metadata-merged/compose/metadata/modules.json
new file mode 100644
index 00000000..53ed3b6a
--- /dev/null
+++ b/tests/fixtures/empty-metadata-merged/compose/metadata/modules.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.modules",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Empty-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "modules": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/osbs.json b/tests/fixtures/empty-metadata-merged/compose/metadata/osbs.json
new file mode 100644
index 00000000..6fe980e6
--- /dev/null
+++ b/tests/fixtures/empty-metadata-merged/compose/metadata/osbs.json
@@ -0,0 +1,4 @@
+{
+ "Var1": {
+ }
+}
diff --git a/tests/fixtures/empty-metadata-merged/compose/metadata/rpms.json b/tests/fixtures/empty-metadata-merged/compose/metadata/rpms.json
new file mode 100644
index 00000000..8a8e5a93
--- /dev/null
+++ b/tests/fixtures/empty-metadata-merged/compose/metadata/rpms.json
@@ -0,0 +1,18 @@
+{
+ "header": {
+ "type": "productmd.rpms",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Empty-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "rpms": {
+ "Var1": {
+ }
+ }
+ }
+}
diff --git a/tests/fixtures/empty-metadata/compose/metadata/composeinfo.json b/tests/fixtures/empty-metadata/compose/metadata/composeinfo.json
new file mode 100644
index 00000000..d7660318
--- /dev/null
+++ b/tests/fixtures/empty-metadata/compose/metadata/composeinfo.json
@@ -0,0 +1,23 @@
+{
+ "header": {
+ "type": "productmd.composeinfo",
+ "version": "1.2"
+ },
+ "payload": {
+ "compose": {
+ "date": "20181001",
+ "id": "Empty-1.0-20181001.n.0",
+ "respin": 0,
+ "type": "nightly"
+ },
+ "release": {
+ "internal": false,
+ "name": "Dummy Product",
+ "short": "Empty",
+ "type": "ga",
+ "version": "1.0"
+ },
+ "variants": {
+ }
+ }
+}
diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py
new file mode 100644
index 00000000..35fe1ddc
--- /dev/null
+++ b/tests/test_orchestrator.py
@@ -0,0 +1,806 @@
+# -*- coding: utf-8 -*-
+
+import itertools
+import json
+from functools import wraps
+import operator
+import os
+import shutil
+import subprocess
+import sys
+from textwrap import dedent
+
+import mock
+import six
+from six.moves import configparser
+
+from parameterized import parameterized
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+from tests.helpers import BaseTestCase, PungiTestCase, touch, FIXTURE_DIR
+from pungi_utils import orchestrator as o
+
+
+class TestConfigSubstitute(PungiTestCase):
+ def setUp(self):
+ super(TestConfigSubstitute, self).setUp()
+ self.fp = os.path.join(self.topdir, "config.conf")
+
+ @parameterized.expand(
+ [
+ ("hello = 'world'", "hello = 'world'"),
+ ("hello = '{{foo}}'", "hello = 'bar'"),
+ ("hello = '{{ foo}}'", "hello = 'bar'"),
+ ("hello = '{{foo }}'", "hello = 'bar'"),
+ ]
+ )
+ def test_substitutions(self, initial, expected):
+ touch(self.fp, initial)
+ o.fill_in_config_file(self.fp, {"foo": "bar"})
+ with open(self.fp) as f:
+ self.assertEqual(expected, f.read())
+
+ def test_missing_key(self):
+ touch(self.fp, "hello = '{{unknown}}'")
+ with self.assertRaises(RuntimeError) as ctx:
+ o.fill_in_config_file(self.fp, {})
+ self.assertEqual(
+ "Unknown placeholder 'unknown' in config.conf", str(ctx.exception)
+ )
+
+
+class TestSafeGetList(BaseTestCase):
+ @parameterized.expand(
+ [
+ ("", []),
+ ("foo", ["foo"]),
+ ("foo,bar", ["foo", "bar"]),
+ ("foo bar", ["foo", "bar"]),
+ ]
+ )
+ def test_success(self, value, expected):
+ cf = configparser.RawConfigParser()
+ cf.add_section("general")
+ cf.set("general", "key", value)
+ self.assertEqual(o._safe_get_list(cf, "general", "key"), expected)
+
+ def test_default(self):
+ cf = configparser.RawConfigParser()
+ cf.add_section("general")
+ self.assertEqual(o._safe_get_list(cf, "general", "missing", "hello"), "hello")
+
+
+class TestComposePart(PungiTestCase):
+ def test_from_minimal_config(self):
+ cf = configparser.RawConfigParser()
+ cf.add_section("test")
+ cf.set("test", "config", "my.conf")
+
+ part = o.ComposePart.from_config(cf, "test", "/tmp/config")
+ deps = "set()" if six.PY3 else "set([])"
+ self.assertEqual(str(part), "test")
+ self.assertEqual(
+ repr(part),
+ "ComposePart('test', '/tmp/config/my.conf', 'READY', "
+ "just_phase=[], skip_phase=[], dependencies=%s)" % deps,
+ )
+ self.assertFalse(part.failable)
+
+ def test_from_full_config(self):
+ cf = configparser.RawConfigParser()
+ cf.add_section("test")
+ cf.set("test", "config", "my.conf")
+ cf.set("test", "depends_on", "base")
+ cf.set("test", "skip_phase", "skip")
+ cf.set("test", "just_phase", "just")
+ cf.set("test", "failable", "yes")
+
+ part = o.ComposePart.from_config(cf, "test", "/tmp/config")
+ deps = "{'base'}" if six.PY3 else "set(['base'])"
+ self.assertEqual(
+ repr(part),
+ "ComposePart('test', '/tmp/config/my.conf', 'WAITING', "
+ "just_phase=['just'], skip_phase=['skip'], dependencies=%s)" % deps,
+ )
+ self.assertTrue(part.failable)
+
+ def test_get_cmd(self):
+ conf = o.Config(
+ "/tgt/", "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"]
+ )
+ part = o.ComposePart(
+ "test", "/tmp/my.conf", just_phase=["just"], skip_phase=["skip"]
+ )
+ part.path = "/compose"
+
+ self.assertEqual(
+ part.get_cmd(conf),
+ [
+ "pungi-koji",
+ "--config",
+ "/tmp/my.conf",
+ "--compose-dir",
+ "/compose",
+ "--production",
+ "--label",
+ "RC-1.0",
+ "--just-phase",
+ "just",
+ "--skip-phase",
+ "skip",
+ "--old-compose",
+ "/old/parts",
+ "--koji-event",
+ "1234",
+ "--quiet",
+ "--no-latest-link",
+ ],
+ )
+
+ def test_refresh_status(self):
+ part = o.ComposePart("test", "/tmp/my.conf")
+ part.path = os.path.join(self.topdir)
+ touch(os.path.join(self.topdir, "STATUS"), "FINISHED")
+ part.refresh_status()
+ self.assertEqual(part.status, "FINISHED")
+
+ def test_refresh_status_missing_file(self):
+ part = o.ComposePart("test", "/tmp/my.conf")
+ part.path = os.path.join(self.topdir)
+ part.refresh_status()
+ self.assertEqual(part.status, "DOOMED")
+
+ @parameterized.expand(["FINISHED", "FINISHED_INCOMPLETE"])
+ def test_is_finished(self, status):
+ part = o.ComposePart("test", "/tmp/my.conf")
+ part.status = status
+ self.assertTrue(part.is_finished())
+
+ @parameterized.expand(["STARTED", "WAITING"])
+ def test_is_not_finished(self, status):
+ part = o.ComposePart("test", "/tmp/my.conf")
+ part.status = status
+ self.assertFalse(part.is_finished())
+
+ @mock.patch("pungi_utils.orchestrator.fill_in_config_file")
+ @mock.patch("pungi_utils.orchestrator.get_compose_dir")
+ @mock.patch("kobo.conf.PyConfigParser")
+ def test_setup_start(self, Conf, gcd, ficf):
+ def pth(*path):
+ return os.path.join(self.topdir, *path)
+
+ conf = o.Config(
+ pth("tgt"), "production", "RC-1.0", "/old", pth("cfg"), None, None
+ )
+ part = o.ComposePart("test", "/tmp/my.conf")
+ parts = {"base": mock.Mock(path="/base", is_finished=lambda: True)}
+ Conf.return_value.opened_files = ["foo.conf"]
+
+ part.setup_start(conf, parts)
+
+ self.assertEqual(part.status, "STARTED")
+ self.assertEqual(part.path, gcd.return_value)
+ self.assertEqual(part.log_file, pth("tgt", "logs", "test.log"))
+ self.assertEqual(
+ ficf.call_args_list,
+ [mock.call("foo.conf", {"base": "/base", "configdir": pth("cfg")})],
+ )
+ self.assertEqual(
+ gcd.call_args_list,
+ [
+ mock.call(
+ pth("tgt/parts"),
+ Conf.return_value,
+ compose_type="production",
+ compose_label="RC-1.0",
+ )
+ ],
+ )
+
+ @parameterized.expand(
+ [
+ # Nothing blocking, no change
+ ([], [], o.Status.READY),
+ # Remove last blocker and switch to READY
+ (["finished"], [], o.Status.READY),
+ # Blocker remaining, stay in WAITING
+ (["finished", "block"], ["block"], o.Status.WAITING),
+ ]
+ )
+ def test_unblock_on(self, deps, blockers, status):
+ part = o.ComposePart("test", "/tmp/my.conf", dependencies=deps)
+ part.unblock_on("finished")
+ self.assertItemsEqual(part.blocked_on, blockers)
+ self.assertEqual(part.status, status)
+
+
+class TestStartPart(PungiTestCase):
+ @mock.patch("subprocess.Popen")
+ def test_start(self, Popen):
+ part = mock.Mock(log_file=os.path.join(self.topdir, "log"))
+ config = mock.Mock()
+ parts = mock.Mock()
+ cmd = ["pungi-koji", "..."]
+
+ part.get_cmd.return_value = cmd
+
+ proc = o.start_part(config, parts, part)
+
+ self.assertEqual(
+ part.mock_calls,
+ [mock.call.setup_start(config, parts), mock.call.get_cmd(config)],
+ )
+ self.assertEqual(proc, Popen.return_value)
+ self.assertEqual(
+ Popen.call_args_list,
+ [mock.call(cmd, stdout=mock.ANY, stderr=subprocess.STDOUT)],
+ )
+
+
+class TestHandleFinished(BaseTestCase):
+ def setUp(self):
+ self.config = mock.Mock()
+ self.linker = mock.Mock()
+ self.parts = {"a": mock.Mock(), "b": mock.Mock()}
+
+ @mock.patch("pungi_utils.orchestrator.update_metadata")
+ @mock.patch("pungi_utils.orchestrator.copy_part")
+ def test_handle_success(self, cp, um):
+ proc = mock.Mock(returncode=0)
+ o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"])
+
+ self.assertEqual(
+ self.parts["a"].mock_calls,
+ [mock.call.refresh_status(), mock.call.unblock_on(self.parts["a"].name)],
+ )
+ self.assertEqual(
+ self.parts["b"].mock_calls, [mock.call.unblock_on(self.parts["a"].name)]
+ )
+ self.assertEqual(
+ cp.call_args_list, [mock.call(self.config, self.linker, self.parts["a"])]
+ )
+ self.assertEqual(um.call_args_list, [mock.call(self.config, self.parts["a"])])
+
+ @mock.patch("pungi_utils.orchestrator.block_on")
+ def test_handle_failure(self, bo):
+ proc = mock.Mock(returncode=1)
+ o.handle_finished(self.config, self.linker, self.parts, proc, self.parts["a"])
+
+ self.assertEqual(self.parts["a"].mock_calls, [mock.call.refresh_status()])
+
+ self.assertEqual(
+ bo.call_args_list, [mock.call(self.parts, self.parts["a"].name)]
+ )
+
+
+class TestBlockOn(BaseTestCase):
+ def test_single(self):
+ parts = {"b": o.ComposePart("b", "b.conf", dependencies=["a"])}
+
+ o.block_on(parts, "a")
+
+ self.assertEqual(parts["b"].status, o.Status.BLOCKED)
+
+ def test_chain(self):
+ parts = {
+ "b": o.ComposePart("b", "b.conf", dependencies=["a"]),
+ "c": o.ComposePart("c", "c.conf", dependencies=["b"]),
+ "d": o.ComposePart("d", "d.conf", dependencies=["c"]),
+ }
+
+ o.block_on(parts, "a")
+
+ self.assertEqual(parts["b"].status, o.Status.BLOCKED)
+ self.assertEqual(parts["c"].status, o.Status.BLOCKED)
+ self.assertEqual(parts["d"].status, o.Status.BLOCKED)
+
+
+class TestUpdateMetadata(PungiTestCase):
+ def assertEqualJSON(self, f1, f2):
+ with open(f1) as f:
+ actual = json.load(f)
+ with open(f2) as f:
+ expected = json.load(f)
+ self.assertEqual(actual, expected)
+
+ def assertEqualMetadata(self, expected):
+ expected_dir = os.path.join(FIXTURE_DIR, expected, "compose/metadata")
+ for f in os.listdir(expected_dir):
+ self.assertEqualJSON(
+ os.path.join(self.tgt, "compose/metadata", f),
+ os.path.join(expected_dir, f),
+ )
+
+ @parameterized.expand(["empty-metadata", "basic-metadata"])
+ def test_merge_into_empty(self, fixture):
+ self.tgt = os.path.join(self.topdir, "target")
+
+ conf = o.Config(self.tgt, "production", None, None, None, None, [])
+ part = o.ComposePart("test", "/tmp/my.conf")
+ part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20181001.n.0")
+
+ shutil.copytree(os.path.join(FIXTURE_DIR, fixture), self.tgt)
+
+ o.update_metadata(conf, part)
+
+ self.assertEqualMetadata(fixture + "-merged")
+
+
+class TestCopyPart(PungiTestCase):
+ @mock.patch("pungi_utils.orchestrator.hardlink_dir")
+ def test_copy(self, hd):
+ self.tgt = os.path.join(self.topdir, "target")
+ conf = o.Config(self.tgt, "production", None, None, None, None, [])
+ linker = mock.Mock()
+ part = o.ComposePart("test", "/tmp/my.conf")
+ part.path = os.path.join(FIXTURE_DIR, "DP-1.0-20161013.t.4")
+
+ o.copy_part(conf, linker, part)
+
+ self.assertItemsEqual(
+ hd.call_args_list,
+ [
+ mock.call(
+ linker,
+ os.path.join(part.path, "compose", variant),
+ os.path.join(self.tgt, "compose", variant),
+ )
+ for variant in ["Client", "Server"]
+ ],
+ )
+
+
+class TestHardlinkDir(PungiTestCase):
+ def test_hardlinking(self):
+ linker = mock.Mock()
+ src = os.path.join(self.topdir, "src")
+ dst = os.path.join(self.topdir, "dst")
+ files = ["file.txt", "nested/deep/another.txt"]
+
+ for f in files:
+ touch(os.path.join(src, f))
+
+ o.hardlink_dir(linker, src, dst)
+
+ self.assertItemsEqual(
+ linker.queue_put.call_args_list,
+ [mock.call((os.path.join(src, f), os.path.join(dst, f))) for f in files],
+ )
+
+
+class TestCheckFinishedProcesses(BaseTestCase):
+ def test_nothing_finished(self):
+ k1 = mock.Mock(returncode=None)
+ v1 = mock.Mock()
+ processes = {k1: v1}
+
+ self.assertItemsEqual(o.check_finished_processes(processes), [])
+
+ def test_yields_finished(self):
+ k1 = mock.Mock(returncode=None)
+ v1 = mock.Mock()
+ k2 = mock.Mock(returncode=0)
+ v2 = mock.Mock()
+ processes = {k1: v1, k2: v2}
+
+ self.assertItemsEqual(o.check_finished_processes(processes), [(k2, v2)])
+
+ def test_yields_failed(self):
+ k1 = mock.Mock(returncode=1)
+ v1 = mock.Mock()
+ processes = {k1: v1}
+
+ self.assertItemsEqual(o.check_finished_processes(processes), [(k1, v1)])
+
+
+class _Part(object):
+ def __init__(self, name, parent=None, fails=False, status=None):
+ self.name = name
+ self.finished = False
+ self.status = o.Status.WAITING if parent else o.Status.READY
+ if status:
+ self.status = status
+ self.proc = mock.Mock(name="proc_%s" % name, pid=hash(self))
+ self.parent = parent
+ self.fails = fails
+ self.failable = False
+ self.path = "/path/to/%s" % name
+ self.blocked_on = set([parent]) if parent else set()
+
+ def is_finished(self):
+ return self.finished or self.status == "FINISHED"
+
+ def __repr__(self):
+ return "<_Part(%r, parent=%r)>" % (self.name, self.parent)
+
+
+def with_mocks(parts, finish_order, wait_results):
+ """Setup all mocks and create dict with the parts.
+ :param finish_order: nested list: first element contains parts that finish
+ in first iteration, etc.
+ :param wait_results: list of names of processes that are returned by wait in each
+ iteration
+ """
+
+ def decorator(func):
+ @wraps(func)
+ def worker(self, lp, update_status, cfp, hf, sp, wait):
+ self.parts = dict((p.name, p) for p in parts)
+ self.linker = lp.return_value.__enter__.return_value
+
+ update_status.side_effect = self.mock_update
+ hf.side_effect = self.mock_finish
+ sp.side_effect = self.mock_start
+
+ finish = [[]]
+ for grp in finish_order:
+ finish.append([(self.parts[p].proc, self.parts[p]) for p in grp])
+
+ cfp.side_effect = finish
+ wait.side_effect = [(self.parts[p].proc.pid, 0) for p in wait_results]
+
+ func(self)
+
+ self.assertEqual(lp.call_args_list, [mock.call("hardlink")])
+
+ return worker
+
+ return decorator
+
+
+@mock.patch("os.wait")
+@mock.patch("pungi_utils.orchestrator.start_part")
+@mock.patch("pungi_utils.orchestrator.handle_finished")
+@mock.patch("pungi_utils.orchestrator.check_finished_processes")
+@mock.patch("pungi_utils.orchestrator.update_status")
+@mock.patch("pungi_utils.orchestrator.linker_pool")
+class TestRunAll(BaseTestCase):
+ def setUp(self):
+ self.maxDiff = None
+ self.conf = mock.Mock(name="global_config")
+ self.calls = []
+
+ def mock_update(self, global_config, parts):
+ self.assertEqual(global_config, self.conf)
+ self.assertEqual(parts, self.parts)
+ self.calls.append("update_status")
+
+ def mock_start(self, global_config, parts, part):
+ self.assertEqual(global_config, self.conf)
+ self.assertEqual(parts, self.parts)
+ self.calls.append(("start_part", part.name))
+ part.status = o.Status.STARTED
+ return part.proc
+
+ @property
+ def sorted_calls(self):
+ """Sort the consecutive calls of the same function based on the argument."""
+
+ def key(val):
+ return val[0] if isinstance(val, tuple) else val
+
+ return list(
+ itertools.chain.from_iterable(
+ sorted(grp, key=operator.itemgetter(1))
+ for _, grp in itertools.groupby(self.calls, key)
+ )
+ )
+
+ def mock_finish(self, global_config, linker, parts, proc, part):
+ self.assertEqual(global_config, self.conf)
+ self.assertEqual(linker, self.linker)
+ self.assertEqual(parts, self.parts)
+ self.calls.append(("handle_finished", part.name))
+ for child in parts.values():
+ if child.parent == part.name:
+ child.status = o.Status.BLOCKED if part.fails else o.Status.READY
+ part.status = "DOOMED" if part.fails else "FINISHED"
+
+ @with_mocks(
+ [_Part("fst"), _Part("snd", parent="fst")], [["fst"], ["snd"]], ["fst", "snd"]
+ )
+ def test_sequential(self):
+ o.run_all(self.conf, self.parts)
+
+ self.assertEqual(
+ self.sorted_calls,
+ [
+ # First iteration starts fst
+ "update_status",
+ ("start_part", "fst"),
+ # Second iteration handles finish of fst and starts snd
+ "update_status",
+ ("handle_finished", "fst"),
+ ("start_part", "snd"),
+ # Third iteration handles finish of snd
+ "update_status",
+ ("handle_finished", "snd"),
+ # Final update of status
+ "update_status",
+ ],
+ )
+
+ @with_mocks([_Part("fst"), _Part("snd")], [["fst", "snd"]], ["fst"])
+ def test_parallel(self):
+ o.run_all(self.conf, self.parts)
+
+ self.assertEqual(
+ self.sorted_calls,
+ [
+ # First iteration starts both fst and snd
+ "update_status",
+ ("start_part", "fst"),
+ ("start_part", "snd"),
+ # Second iteration handles finish of both of them
+ "update_status",
+ ("handle_finished", "fst"),
+ ("handle_finished", "snd"),
+ # Final update of status
+ "update_status",
+ ],
+ )
+
+ @with_mocks(
+ [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")],
+ [["1"], ["2", "3"]],
+ ["1", "2"],
+ )
+ def test_waits_for_dep_then_parallel_with_simultaneous_end(self):
+ o.run_all(self.conf, self.parts)
+
+ self.assertEqual(
+ self.sorted_calls,
+ [
+ # First iteration starts first part
+ "update_status",
+ ("start_part", "1"),
+ # Second iteration starts 2 and 3
+ "update_status",
+ ("handle_finished", "1"),
+ ("start_part", "2"),
+ ("start_part", "3"),
+ # Both 2 and 3 end in third iteration
+ "update_status",
+ ("handle_finished", "2"),
+ ("handle_finished", "3"),
+ # Final update of status
+ "update_status",
+ ],
+ )
+
+ @with_mocks(
+ [_Part("1"), _Part("2", parent="1"), _Part("3", parent="1")],
+ [["1"], ["3"], ["2"]],
+ ["1", "3", "2"],
+ )
+ def test_waits_for_dep_then_parallel_with_different_end_times(self):
+ o.run_all(self.conf, self.parts)
+
+ self.assertEqual(
+ self.sorted_calls,
+ [
+ # First iteration starts first part
+ "update_status",
+ ("start_part", "1"),
+ # Second iteration starts 2 and 3
+ "update_status",
+ ("handle_finished", "1"),
+ ("start_part", "2"),
+ ("start_part", "3"),
+ # Third iteration sees 3 finish
+ "update_status",
+ ("handle_finished", "3"),
+ # Fourth iteration, 2 finishes
+ "update_status",
+ ("handle_finished", "2"),
+ # Final update of status
+ "update_status",
+ ],
+ )
+
+ @with_mocks(
+ [_Part("fst", fails=True), _Part("snd", parent="fst")], [["fst"]], ["fst"]
+ )
+ def test_blocked(self):
+ o.run_all(self.conf, self.parts)
+
+ self.assertEqual(
+ self.sorted_calls,
+ [
+ # First iteration starts first part
+ "update_status",
+ ("start_part", "fst"),
+ # Second iteration handles fail of first part
+ "update_status",
+ ("handle_finished", "fst"),
+ # Final update of status
+ "update_status",
+ ],
+ )
+
+
+@mock.patch("pungi_utils.orchestrator.get_compose_dir")
+class TestGetTargetDir(BaseTestCase):
+ def test_with_absolute_path(self, gcd):
+ config = {"target": "/tgt", "compose_type": "nightly"}
+ cfg = mock.Mock()
+ cfg.get.side_effect = lambda _, k: config[k]
+ ci = mock.Mock()
+ res = o.get_target_dir(cfg, ci, None, reldir="/checkout")
+ self.assertEqual(res, gcd.return_value)
+ self.assertEqual(
+ gcd.call_args_list,
+ [mock.call("/tgt", ci, compose_type="nightly", compose_label=None)],
+ )
+
+ def test_with_relative_path(self, gcd):
+ config = {"target": "tgt", "compose_type": "nightly"}
+ cfg = mock.Mock()
+ cfg.get.side_effect = lambda _, k: config[k]
+ ci = mock.Mock()
+ res = o.get_target_dir(cfg, ci, None, reldir="/checkout")
+ self.assertEqual(res, gcd.return_value)
+ self.assertEqual(
+ gcd.call_args_list,
+ [
+ mock.call(
+ "/checkout/tgt", ci, compose_type="nightly", compose_label=None
+ )
+ ],
+ )
+
+
+class TestComputeStatus(BaseTestCase):
+ @parameterized.expand(
+ [
+ ([("FINISHED", False)], "FINISHED"),
+ ([("FINISHED", False), ("STARTED", False)], "STARTED"),
+ ([("FINISHED", False), ("STARTED", False), ("WAITING", False)], "STARTED"),
+ ([("FINISHED", False), ("DOOMED", False)], "DOOMED"),
+ (
+ [("FINISHED", False), ("BLOCKED", True), ("DOOMED", True)],
+ "FINISHED_INCOMPLETE",
+ ),
+ ([("FINISHED", False), ("BLOCKED", False), ("DOOMED", True)], "DOOMED"),
+ ([("FINISHED", False), ("DOOMED", True)], "FINISHED_INCOMPLETE"),
+ ([("FINISHED", False), ("STARTED", False), ("DOOMED", False)], "STARTED"),
+ ]
+ )
+ def test_cases(self, statuses, expected):
+ self.assertEqual(o.compute_status(statuses), expected)
+
+
+class TestUpdateStatus(PungiTestCase):
+ def test_updating(self):
+ os.makedirs(os.path.join(self.topdir, "compose/metadata"))
+ conf = o.Config(
+ self.topdir, "production", "RC-1.0", "/old", "/cfg", 1234, ["--quiet"]
+ )
+ o.update_status(
+ conf,
+ {"1": _Part("1", status="FINISHED"), "2": _Part("2", status="STARTED")},
+ )
+ self.assertFileContent(os.path.join(self.topdir, "STATUS"), "STARTED")
+ self.assertFileContent(
+ os.path.join(self.topdir, "compose/metadata/parts.json"),
+ dedent(
+ """\
+ {
+ "1": {
+ "path": "/path/to/1",
+ "status": "FINISHED"
+ },
+ "2": {
+ "path": "/path/to/2",
+ "status": "STARTED"
+ }
+ }
+ """
+ ),
+ )
+
+
+@mock.patch("pungi_utils.orchestrator.get_target_dir")
+class TestPrepareComposeDir(PungiTestCase):
+ def setUp(self):
+ super(TestPrepareComposeDir, self).setUp()
+ self.conf = mock.Mock(name="config")
+ self.main_config = "/some/config"
+ self.compose_info = mock.Mock(name="compose_info")
+
+ def test_new_compose(self, gtd):
+ def mock_get_target(conf, compose_info, label, reldir):
+ self.assertEqual(conf, self.conf)
+ self.assertEqual(compose_info, self.compose_info)
+ self.assertEqual(label, args.label)
+ self.assertEqual(reldir, "/some")
+ touch(os.path.join(self.topdir, "work/global/composeinfo-base.json"), "WOO")
+ return self.topdir
+
+ gtd.side_effect = mock_get_target
+ args = mock.Mock(name="args", spec=["label"])
+ retval = o.prepare_compose_dir(
+ self.conf, args, self.main_config, self.compose_info
+ )
+ self.assertEqual(retval, self.topdir)
+ self.assertFileContent(
+ os.path.join(self.topdir, "compose/metadata/composeinfo.json"), "WOO"
+ )
+ self.assertTrue(os.path.isdir(os.path.join(self.topdir, "logs")))
+ self.assertTrue(os.path.isdir(os.path.join(self.topdir, "parts")))
+ self.assertTrue(os.path.isdir(os.path.join(self.topdir, "work/global")))
+
+ def test_restarting_compose(self, gtd):
+ args = mock.Mock(name="args", spec=["label", "compose_path"])
+ retval = o.prepare_compose_dir(
+ self.conf, args, self.main_config, self.compose_info
+ )
+ self.assertEqual(gtd.call_args_list, [])
+ self.assertEqual(retval, args.compose_path)
+
+
+class TestLoadPartsMetadata(PungiTestCase):
+ def test_loading(self):
+ touch(
+ os.path.join(self.topdir, "compose/metadata/parts.json"), '{"foo": "bar"}'
+ )
+ conf = mock.Mock(target=self.topdir)
+
+ self.assertEqual(o.load_parts_metadata(conf), {"foo": "bar"})
+
+
+@mock.patch("pungi_utils.orchestrator.load_parts_metadata")
+class TestSetupForRestart(BaseTestCase):
+ def setUp(self):
+ self.conf = mock.Mock(name="global_config")
+
+ def test_restart_ok(self, lpm):
+ lpm.return_value = {
+ "p1": {"status": "FINISHED", "path": "/p1"},
+ "p2": {"status": "DOOMED", "path": "/p2"},
+ }
+ parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")}
+
+ o.setup_for_restart(self.conf, parts, ["p2"])
+
+ self.assertEqual(parts["p1"].status, "FINISHED")
+ self.assertEqual(parts["p1"].path, "/p1")
+ self.assertEqual(parts["p2"].status, "READY")
+ self.assertEqual(parts["p2"].path, None)
+
+ def test_restart_one_blocked_one_ok(self, lpm):
+ lpm.return_value = {
+ "p1": {"status": "DOOMED", "path": "/p1"},
+ "p2": {"status": "DOOMED", "path": "/p2"},
+ "p3": {"status": "WAITING", "path": None},
+ }
+ parts = {
+ "p1": _Part("p1"),
+ "p2": _Part("p2", parent="p1"),
+ "p3": _Part("p3", parent="p2"),
+ }
+
+ o.setup_for_restart(self.conf, parts, ["p1", "p3"])
+
+ self.assertEqual(parts["p1"].status, "READY")
+ self.assertEqual(parts["p1"].path, None)
+ self.assertEqual(parts["p2"].status, "DOOMED")
+ self.assertEqual(parts["p2"].path, "/p2")
+ self.assertEqual(parts["p3"].status, "WAITING")
+ self.assertEqual(parts["p3"].path, None)
+
+ def test_restart_all_blocked(self, lpm):
+ lpm.return_value = {
+ "p1": {"status": "DOOMED", "path": "/p1"},
+ "p2": {"status": "STARTED", "path": "/p2"},
+ }
+ parts = {"p1": _Part("p1"), "p2": _Part("p2", parent="p1")}
+
+ with self.assertRaises(RuntimeError):
+ o.setup_for_restart(self.conf, parts, ["p2"])
+
+ self.assertEqual(parts["p1"].status, "DOOMED")
+ self.assertEqual(parts["p1"].path, "/p1")
+ self.assertEqual(parts["p2"].status, "WAITING")
+ self.assertEqual(parts["p2"].path, None)