# -*- coding: utf-8 -*- # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 2 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Library General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, see . __all__ = ("Compose",) import contextlib import errno import logging import os import time import tempfile import shutil import json import socket import kobo.log import kobo.tback import requests from requests.exceptions import RequestException from productmd.composeinfo import ComposeInfo from productmd.images import Images from dogpile.cache import make_region from pungi.graph import SimpleAcyclicOrientedGraph from pungi.wrappers.variants import VariantsXmlParser from pungi.paths import Paths from pungi.wrappers.kojiwrapper import KojiDownloadProxy from pungi.wrappers.scm import get_file_from_scm from pungi.util import ( makedirs, get_arch_variant_data, get_format_substs, get_variant_data, retry, translate_path_raw, ) from pungi.metadata import compose_to_composeinfo try: # This is available since productmd >= 1.18 # TODO: remove this once the version is distributed widely enough from productmd.composeinfo import SUPPORTED_MILESTONES except ImportError: SUPPORTED_MILESTONES = ["RC", "Update", "SecurityFix"] def is_status_fatal(status_code): """Check if status code returned from CTS reports an error that is unlikely to be fixed by retrying. Generally client errors (4XX) are fatal, with the exception of 401 Unauthorized which could be caused by transient network issue between compose host and KDC. """ if status_code == 401: return False return status_code >= 400 and status_code < 500 @retry(wait_on=RequestException) def retry_request(method, url, data=None, json_data=None, auth=None): """ :param str method: Reqest method. :param str url: Target URL. :param dict data: form-urlencoded data to send in the body of the request. :param dict json_data: json data to send in the body of the request. """ request_method = getattr(requests, method) rv = request_method(url, data=data, json=json_data, auth=auth) if is_status_fatal(rv.status_code): try: error = rv.json() except ValueError: error = rv.text raise RuntimeError("%s responded with %d: %s" % (url, rv.status_code, error)) rv.raise_for_status() return rv class BearerAuth(requests.auth.AuthBase): def __init__(self, token): self.token = token def __call__(self, r): r.headers["authorization"] = "Bearer " + self.token return r @contextlib.contextmanager def cts_auth(pungi_conf): """ :param dict pungi_conf: dict obj of pungi.json config. """ auth = None token = None cts_keytab = pungi_conf.get("cts_keytab") cts_oidc_token_url = os.environ.get("CTS_OIDC_TOKEN_URL", "") or pungi_conf.get( "cts_oidc_token_url" ) try: if cts_keytab: # requests-kerberos cannot accept custom keytab, we need to use # environment variable for this. But we need to change environment # only temporarily just for this single requests.post. # So at first backup the current environment and revert to it # after the requests call. from requests_kerberos import HTTPKerberosAuth auth = HTTPKerberosAuth() environ_copy = dict(os.environ) if "$HOSTNAME" in cts_keytab: cts_keytab = cts_keytab.replace("$HOSTNAME", socket.gethostname()) os.environ["KRB5_CLIENT_KTNAME"] = cts_keytab os.environ["KRB5CCNAME"] = "DIR:%s" % tempfile.mkdtemp() elif cts_oidc_token_url: cts_oidc_client_id = os.environ.get( "CTS_OIDC_CLIENT_ID", "" ) or pungi_conf.get("cts_oidc_client_id", "") token = retry_request( "post", cts_oidc_token_url, data={ "grant_type": "client_credentials", "client_id": cts_oidc_client_id, "client_secret": os.environ.get("CTS_OIDC_CLIENT_SECRET", ""), }, ).json()["access_token"] auth = BearerAuth(token) del token yield auth except Exception as e: # Avoid leaking client secret in trackback e.show_locals = False raise e finally: if cts_keytab: shutil.rmtree(os.environ["KRB5CCNAME"].split(":", 1)[1]) os.environ.clear() os.environ.update(environ_copy) def get_compose_info( conf, compose_type="production", compose_date=None, compose_respin=None, compose_label=None, parent_compose_ids=None, respin_of=None, ): """ Creates inncomplete ComposeInfo to generate Compose ID """ ci = ComposeInfo() ci.release.name = conf["release_name"] ci.release.short = conf["release_short"] ci.release.version = conf["release_version"] ci.release.is_layered = True if conf.get("base_product_name", "") else False ci.release.type = conf.get("release_type", "ga").lower() ci.release.internal = bool(conf.get("release_internal", False)) if ci.release.is_layered: ci.base_product.name = conf["base_product_name"] ci.base_product.short = conf["base_product_short"] ci.base_product.version = conf["base_product_version"] ci.base_product.type = conf.get("base_product_type", "ga").lower() ci.compose.label = compose_label ci.compose.type = compose_type ci.compose.date = compose_date or time.strftime("%Y%m%d", time.localtime()) ci.compose.respin = compose_respin or 0 ci.compose.id = ci.create_compose_id() cts_url = conf.get("cts_url") if cts_url: # Create compose in CTS and get the reserved compose ID. url = os.path.join(cts_url, "api/1/composes/") data = { "compose_info": json.loads(ci.dumps()), "parent_compose_ids": parent_compose_ids, "respin_of": respin_of, } with cts_auth(conf) as authentication: rv = retry_request("post", url, json_data=data, auth=authentication) # Update local ComposeInfo with received ComposeInfo. cts_ci = ComposeInfo() cts_ci.loads(rv.text) ci.compose.respin = cts_ci.compose.respin ci.compose.id = cts_ci.compose.id return ci def write_compose_info(compose_dir, ci): """ Write ComposeInfo `ci` to `compose_dir` subdirectories. """ makedirs(compose_dir) with open(os.path.join(compose_dir, "COMPOSE_ID"), "w") as f: f.write(ci.compose.id) work_dir = os.path.join(compose_dir, "work", "global") makedirs(work_dir) ci.dump(os.path.join(work_dir, "composeinfo-base.json")) def update_compose_url(compose_id, compose_dir, conf): cts_url = conf.get("cts_url", None) if cts_url: url = os.path.join(cts_url, "api/1/composes", compose_id) tp = conf.get("translate_paths", None) compose_url = translate_path_raw(tp, compose_dir) if compose_url == compose_dir: # We do not have a URL, do not attempt the update. return data = { "action": "set_url", "compose_url": compose_url, } with cts_auth(conf) as authentication: return retry_request("patch", url, json_data=data, auth=authentication) def get_compose_dir( topdir, conf, compose_type="production", compose_date=None, compose_respin=None, compose_label=None, already_exists_callbacks=None, parent_compose_ids=None, respin_of=None, ): already_exists_callbacks = already_exists_callbacks or [] ci = get_compose_info( conf, compose_type, compose_date, compose_respin, compose_label, parent_compose_ids, respin_of, ) cts_url = conf.get("cts_url", None) if cts_url: # Create compose directory. compose_dir = os.path.join(topdir, ci.compose.id) os.makedirs(compose_dir) else: while 1: ci.compose.id = ci.create_compose_id() compose_dir = os.path.join(topdir, ci.compose.id) exists = False # TODO: callbacks to determine if a composeid was already used # for callback in already_exists_callbacks: # if callback(data): # exists = True # break # already_exists_callbacks fallback: does target compose_dir exist? try: os.makedirs(compose_dir) except OSError as ex: if ex.errno == errno.EEXIST: exists = True else: raise if exists: ci = get_compose_info( conf, compose_type, compose_date, ci.compose.respin + 1, compose_label, ) continue break write_compose_info(compose_dir, ci) return compose_dir class Compose(kobo.log.LoggingBase): def __init__( self, conf, topdir, skip_phases=None, just_phases=None, old_composes=None, koji_event=None, supported=False, logger=None, notifier=None, ): kobo.log.LoggingBase.__init__(self, logger) # TODO: check if minimal conf values are set self.conf = conf # This is a dict mapping UID to Variant objects. It only contains top # level variants. self.variants = {} # This is a similar mapping, but contains even nested variants. self.all_variants = {} self.topdir = os.path.abspath(topdir) self.skip_phases = skip_phases or [] self.just_phases = just_phases or [] self.old_composes = old_composes or [] self.koji_event = koji_event or conf.get("koji_event") self.notifier = notifier self._old_config = None # path definitions self.paths = Paths(self) # Set up logging to file if logger: kobo.log.add_file_logger( logger, self.paths.log.log_file("global", "pungi.log") ) kobo.log.add_file_logger( logger, self.paths.log.log_file("global", "excluding-arch.log") ) class PungiLogFilter(logging.Filter): def filter(self, record): return ( False if record.funcName and record.funcName == "is_excluded" else True ) class ExcludingArchLogFilter(logging.Filter): def filter(self, record): message = record.getMessage() if "Populating package set for arch:" in message or ( record.funcName and record.funcName == "is_excluded" ): return True else: return False for handler in logger.handlers: if isinstance(handler, logging.FileHandler): log_file_name = os.path.basename(handler.stream.name) if log_file_name == "pungi.global.log": handler.addFilter(PungiLogFilter()) elif log_file_name == "excluding-arch.global.log": handler.addFilter(ExcludingArchLogFilter()) # to provide compose_id, compose_date and compose_respin self.ci_base = ComposeInfo() self.ci_base.load( os.path.join(self.paths.work.topdir(arch="global"), "composeinfo-base.json") ) self.supported = supported if ( self.compose_label and self.compose_label.split("-")[0] in SUPPORTED_MILESTONES ): self.log_info( "Automatically setting 'supported' flag due to label: %s." % self.compose_label ) self.supported = True self.im = Images() self.im.compose.id = self.compose_id self.im.compose.type = self.compose_type self.im.compose.date = self.compose_date self.im.compose.respin = self.compose_respin self.im.metadata_path = self.paths.compose.metadata() self.containers_metadata = {} # Stores list of deliverables that failed, but did not abort the # compose. # {deliverable: [(Variant.uid, arch, subvariant)]} self.failed_deliverables = {} self.attempted_deliverables = {} self.required_deliverables = {} if self.conf.get("dogpile_cache_backend", None): self.cache_region = make_region().configure( self.conf.get("dogpile_cache_backend"), expiration_time=self.conf.get("dogpile_cache_expiration_time", 3600), arguments=self.conf.get("dogpile_cache_arguments", {}), ) else: self.cache_region = make_region().configure("dogpile.cache.null") self.koji_downloader = KojiDownloadProxy.from_config(self.conf, self._logger) get_compose_info = staticmethod(get_compose_info) write_compose_info = staticmethod(write_compose_info) get_compose_dir = staticmethod(get_compose_dir) update_compose_url = staticmethod(update_compose_url) def __getitem__(self, name): return self.variants[name] @property def compose_id(self): return self.ci_base.compose.id @property def compose_date(self): return self.ci_base.compose.date @property def compose_respin(self): return self.ci_base.compose.respin @property def compose_type(self): return self.ci_base.compose.type @property def compose_type_suffix(self): return self.ci_base.compose.type_suffix @property def compose_label(self): return self.ci_base.compose.label @property def compose_label_major_version(self): return self.ci_base.compose.label_major_version @property def has_comps(self): return bool(self.conf.get("comps_file", False)) @property def has_module_defaults(self): return bool(self.conf.get("module_defaults_dir", False)) @property def has_module_obsoletes(self): return bool(self.conf.get("module_obsoletes_dir", False)) @property def config_dir(self): return os.path.dirname(self.conf._open_file or "") @property def should_create_yum_database(self): """Explicit configuration trumps all. Otherwise check gather backend and only create it for Yum. """ config = self.conf.get("createrepo_database") if config is not None: return config return self.conf["gather_backend"] == "yum" def read_variants(self): # TODO: move to phases/init ? variants_file = self.paths.work.variants_file(arch="global") scm_dict = self.conf["variants_file"] if isinstance(scm_dict, dict): file_name = os.path.basename(scm_dict["file"]) if scm_dict["scm"] == "file": scm_dict["file"] = os.path.join( self.config_dir, os.path.basename(scm_dict["file"]) ) else: file_name = os.path.basename(scm_dict) scm_dict = os.path.join(self.config_dir, scm_dict) self.log_debug("Writing variants file: %s", variants_file) tmp_dir = self.mkdtemp(prefix="variants_file_") get_file_from_scm(scm_dict, tmp_dir, compose=self) shutil.copy2(os.path.join(tmp_dir, file_name), variants_file) shutil.rmtree(tmp_dir) tree_arches = self.conf.get("tree_arches", None) tree_variants = self.conf.get("tree_variants", None) with open(variants_file, "r") as file_obj: parser = VariantsXmlParser( file_obj, tree_arches, tree_variants, logger=self._logger ) self.variants = parser.parse() self.all_variants = {} for variant in self.get_variants(): self.all_variants[variant.uid] = variant # populate ci_base with variants - needed for layered-products (compose_id) # FIXME - compose_to_composeinfo is no longer needed and has been # removed, but I'm not entirely sure what this is needed for # or if it is at all self.ci_base = compose_to_composeinfo(self) def get_variants(self, types=None, arch=None): result = [] for i in self.variants.values(): if (not types or i.type in types) and (not arch or arch in i.arches): result.append(i) result.extend(i.get_variants(types=types, arch=arch)) return sorted(set(result)) def get_arches(self): result = set() for variant in self.get_variants(): for arch in variant.arches: result.add(arch) return sorted(result) @property def status_file(self): """Path to file where the compose status will be stored.""" if not hasattr(self, "_status_file"): self._status_file = os.path.join(self.topdir, "STATUS") return self._status_file def _log_failed_deliverables(self): for kind, data in self.failed_deliverables.items(): for variant, arch, subvariant in data: self.log_info( "Failed %s on variant <%s>, arch <%s>, subvariant <%s>." % (kind, variant, arch, subvariant) ) log = os.path.join(self.paths.log.topdir("global"), "deliverables.json") with open(log, "w") as f: json.dump( { "required": self.required_deliverables, "failed": self.failed_deliverables, "attempted": self.attempted_deliverables, }, f, indent=4, ) def write_status(self, stat_msg): if stat_msg not in ("STARTED", "FINISHED", "DOOMED", "TERMINATED"): self.log_warning("Writing nonstandard compose status: %s" % stat_msg) old_status = self.get_status() if stat_msg == old_status: return if old_status == "FINISHED": msg = "Could not modify a FINISHED compose: %s" % self.topdir self.log_error(msg) raise RuntimeError(msg) if stat_msg == "FINISHED" and self.failed_deliverables: stat_msg = "FINISHED_INCOMPLETE" self._log_failed_deliverables() with open(self.status_file, "w") as f: f.write(stat_msg + "\n") if self.notifier: self.notifier.send("status-change", status=stat_msg) def get_status(self): if not os.path.isfile(self.status_file): return return open(self.status_file, "r").read().strip() def get_image_name( self, arch, variant, disc_type="dvd", disc_num=1, suffix=".iso", format=None ): """Create a filename for image with given parameters. :raises RuntimeError: when unknown ``disc_type`` is given """ default_format = "{compose_id}-{variant}-{arch}-{disc_type}{disc_num}{suffix}" format = format or self.conf.get("image_name_format", default_format) if isinstance(format, dict): conf = get_variant_data(self.conf, "image_name_format", variant) format = conf[0] if conf else default_format if arch == "src": arch = "source" if disc_num: disc_num = int(disc_num) else: disc_num = "" kwargs = { "arch": arch, "disc_type": disc_type, "disc_num": disc_num, "suffix": suffix, } if variant.type == "layered-product": variant_uid = variant.parent.uid kwargs["compose_id"] = self.ci_base[variant.uid].compose_id else: variant_uid = variant.uid args = get_format_substs(self, variant=variant_uid, **kwargs) try: return (format % args).format(**args) except KeyError as err: raise RuntimeError( "Failed to create image name: unknown format element: %s" % err ) def can_fail(self, variant, arch, deliverable): """Figure out if deliverable can fail on variant.arch. Variant can be None. """ failable = get_arch_variant_data( self.conf, "failable_deliverables", arch, variant ) return deliverable in failable def attempt_deliverable(self, variant, arch, kind, subvariant=None): """Log information about attempted deliverable.""" variant_uid = variant.uid if variant else "" self.attempted_deliverables.setdefault(kind, []).append( (variant_uid, arch, subvariant) ) def require_deliverable(self, variant, arch, kind, subvariant=None): """Log information about attempted deliverable.""" variant_uid = variant.uid if variant else "" self.required_deliverables.setdefault(kind, []).append( (variant_uid, arch, subvariant) ) def fail_deliverable(self, variant, arch, kind, subvariant=None): """Log information about failed deliverable.""" variant_uid = variant.uid if variant else "" self.failed_deliverables.setdefault(kind, []).append( (variant_uid, arch, subvariant) ) @property def image_release(self): """Generate a value to pass to Koji as image release. If this compose has a label, the version from it will be used, otherwise we will create a string with date, compose type and respin. """ if self.compose_label: milestone, release = self.compose_label.split("-") return release return "%s%s.%s" % ( self.compose_date, self.ci_base.compose.type_suffix, self.compose_respin, ) @property def image_version(self): """Generate a value to pass to Koji as image version. The value is based on release version. If compose has a label, the milestone from it is appended to the version (unless it is RC). """ version = self.ci_base.release.version if self.compose_label and not self.compose_label.startswith("RC-"): milestone, release = self.compose_label.split("-") return "%s_%s" % (version, milestone) return version def mkdtemp(self, arch=None, variant=None, suffix="", prefix="tmp"): """ Create and return a unique temporary directory under dir of /work/{global,}/tmp[-]/ """ path = os.path.join(self.paths.work.tmp_dir(arch=arch, variant=variant)) tmpdir = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=path) os.chmod(tmpdir, 0o755) return tmpdir def dump_containers_metadata(self): """Create a file with container metadata if there are any containers.""" if not self.containers_metadata: return with open(self.paths.compose.metadata("osbs.json"), "w") as f: json.dump( self.containers_metadata, f, indent=4, sort_keys=True, separators=(",", ": "), ) def traceback(self, detail=None, show_locals=True): """Store an extended traceback. This method should only be called when handling an exception. :param str detail: Extra information appended to the filename """ basename = "traceback" if detail: basename += "-" + detail tb_path = self.paths.log.log_file("global", basename) self.log_error("Extended traceback in: %s", tb_path) with open(tb_path, "wb") as f: f.write(kobo.tback.Traceback(show_locals=show_locals).get_traceback()) def load_old_compose_config(self): """ Helper method to load Pungi config dump from old compose. """ if not self._old_config: config_dump_full = self.paths.log.log_file("global", "config-dump") config_dump_full = self.paths.old_compose_path(config_dump_full) if not config_dump_full: return None self.log_info("Loading old config file: %s", config_dump_full) with open(config_dump_full, "r") as f: self._old_config = json.load(f) return self._old_config def get_ordered_variant_uids(compose): if not hasattr(compose, "_ordered_variant_uids"): ordered_variant_uids = _prepare_variant_as_lookaside(compose) # Some variants were not mentioned in configuration value # 'variant_as_lookaside' and its run order is not crucial (that # means there are no dependencies inside this group). They will be # processed first. A-Z sorting is for reproducibility. unordered_variant_uids = sorted( set(compose.all_variants.keys()) - set(ordered_variant_uids) ) setattr( compose, "_ordered_variant_uids", unordered_variant_uids + ordered_variant_uids, ) return getattr(compose, "_ordered_variant_uids") def _prepare_variant_as_lookaside(compose): """ Configuration value 'variant_as_lookaside' contains variant pairs . In that pair lookaside variant have to be processed first. Structure can be represented as a oriented graph. Its spanning line shows order how to process this set of variants. """ variant_as_lookaside = compose.conf.get("variant_as_lookaside", []) graph = SimpleAcyclicOrientedGraph() for variant, lookaside_variant in variant_as_lookaside: try: graph.add_edge(variant, lookaside_variant) except ValueError as e: raise ValueError( "There is a bad configuration in 'variant_as_lookaside': %s" % e ) variant_processing_order = reversed(graph.prune_graph()) return list(variant_processing_order)