diff --git a/doc/index.rst b/doc/index.rst index 16d9bf58..b71e67d3 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -19,6 +19,7 @@ Contents: scm_support messaging gathering + koji comps contributing testing diff --git a/doc/koji.rst b/doc/koji.rst new file mode 100644 index 00000000..7403e551 --- /dev/null +++ b/doc/koji.rst @@ -0,0 +1,103 @@ +====================== +Getting data from koji +====================== + +When Pungi is configured to get packages from a Koji tag, it somehow needs to +access the actual RPM files. + +Historically, this required the storage used by Koji to be directly available +on the host where Pungi was running. This was usually achieved by using NFS for +the Koji volume, and mounting it on the compose host. + +The compose could be created directly on the same volume. In such case the +packages would be hardlinked, significantly reducing space consumption. + +The compose could also be created on a different storage, in which case the +packages would either need to be copied over or symlinked. Using symlinks +requires that anything that accesses the compose (e.g. a download server) would +also need to mount the Koji volume in the same location. + +There is also a risk with symlinks that the package in Koji can change (due to +being resigned for example), which would invalidate composes linking to it. + + +Using Koji without direct mount +=============================== + +It is possible now to run a compose from a Koji tag without direct access to +Koji storage. + +Pungi can download the packages over HTTP protocol, store them in a local +cache, and consume them from there. + +The local cache has similar structure to what is on the Koji volume. + +When Pungi needs some package, it has a path on Koji volume. It will replace +the ``topdir`` with the cache location. If such file exists, it will be used. +If it doesn't exist, it will be downloaded from Koji (by replacing the +``topdir`` with ``topurl``). 
+ +:: + + Koji path /mnt/koji/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm + Koji URL https://kojipkgs.fedoraproject.org/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm + Local path /mnt/compose/cache/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm + +The packages can be hardlinked from this cache directory. + + +Cleanup +------- + +While the approach above allows each RPM to be downloaded only once, it will +eventually result in the Koji volume being mirrored locally. Most of the +packages will however no longer be needed. + +There is a script ``pungi-cache-cleanup`` that can help with that. It can find +and remove files from the cache that are no longer needed. + +A file is no longer needed if it has a single link (meaning it is only in the +cache, not in any compose), and it has mtime older than a given threshold. + +It doesn't make sense to delete files that are hardlinked in an existing +compose as it would not save any space anyway. + +The mtime check is meant to preserve files that are downloaded but not actually +used in a compose, like a subpackage that is not included in any variant. Every +time its existence in the local cache is checked, the mtime is updated. + + +Race conditions? +---------------- + +It should be safe to have multiple compose hosts share the same storage volume +for generated composes and local cache. + +If a cache file is accessed and it exists, there's no risk of race condition. + +If two composes need the same file at the same time and it is not present yet, +one of them will take a lock on it and start downloading. The other will wait +until the download is finished. + +The lock is only valid for a set amount of time (5 minutes) to avoid issues +where the downloading process is killed in a way that blocks it from releasing +the lock. + +If the file is large and network slow, the limit may not be enough to finish +downloading. 
In that case the second process will steal the lock while the +first process is still downloading. This will result in the same file being +downloaded twice. + +When the first process finishes the download, it will put the file into the +local cache location. When the second process finishes, it will atomically +replace it, but since it's the same file, the content will not change. + +If the first compose already managed to hardlink the file before it gets +replaced, there will be two copies of the file present locally. + + +Caveats +------- + +There is no integrity checking. Ideally Koji should provide checksums for the +RPMs that would be verified after downloading. This is not yet available. diff --git a/pungi/checks.py b/pungi/checks.py index 95085d95..244a2fff 100644 --- a/pungi/checks.py +++ b/pungi/checks.py @@ -849,6 +849,7 @@ def make_schema(): "cts_oidc_client_id": {"type": "string"}, "koji_profile": {"type": "string"}, "koji_event": {"type": "number"}, + "koji_cache": {"type": "string"}, "pkgset_koji_tag": {"$ref": "#/definitions/strings"}, "pkgset_koji_builds": {"$ref": "#/definitions/strings"}, "pkgset_koji_scratch_tasks": {"$ref": "#/definitions/strings"}, diff --git a/pungi/compose.py b/pungi/compose.py index e980a9fd..e29b4019 100644 --- a/pungi/compose.py +++ b/pungi/compose.py @@ -39,6 +39,7 @@ from dogpile.cache import make_region from pungi.graph import SimpleAcyclicOrientedGraph from pungi.wrappers.variants import VariantsXmlParser from pungi.paths import Paths +from pungi.wrappers.kojiwrapper import KojiDownloadProxy from pungi.wrappers.scm import get_file_from_scm from pungi.util import ( makedirs, @@ -409,6 +410,8 @@ class Compose(kobo.log.LoggingBase): else: self.cache_region = make_region().configure("dogpile.cache.null") + self.koji_downloader = KojiDownloadProxy.from_config(self.conf, self._logger) + get_compose_info = staticmethod(get_compose_info) write_compose_info = staticmethod(write_compose_info) get_compose_dir = 
staticmethod(get_compose_dir) diff --git a/pungi/phases/gather/__init__.py b/pungi/phases/gather/__init__.py index 8e173090..933af196 100644 --- a/pungi/phases/gather/__init__.py +++ b/pungi/phases/gather/__init__.py @@ -645,9 +645,10 @@ def _make_lookaside_repo(compose, variant, arch, pkg_map, package_sets=None): compose.paths.work.topdir(arch="global"), "download" ) + "/", - "koji": lambda: pungi.wrappers.kojiwrapper.KojiWrapper( - compose - ).koji_module.config.topdir.rstrip("/") + "koji": lambda: compose.conf.get( + "koji_cache", + pungi.wrappers.kojiwrapper.KojiWrapper(compose).koji_module.config.topdir, + ).rstrip("/") + "/", "kojimock": lambda: pungi.wrappers.kojiwrapper.KojiMockWrapper( compose, diff --git a/pungi/phases/pkgset/pkgsets.py b/pungi/phases/pkgset/pkgsets.py index 14cad381..e8e84bb0 100644 --- a/pungi/phases/pkgset/pkgsets.py +++ b/pungi/phases/pkgset/pkgsets.py @@ -362,6 +362,7 @@ class KojiPackageSet(PackageSetBase): extra_tasks=None, signed_packages_retries=0, signed_packages_wait=30, + downloader=None, ): """ Creates new KojiPackageSet. @@ -416,6 +417,8 @@ class KojiPackageSet(PackageSetBase): self.signed_packages_retries = signed_packages_retries self.signed_packages_wait = signed_packages_wait + self.downloader = downloader + def __getstate__(self): result = self.__dict__.copy() del result["koji_wrapper"] @@ -537,7 +540,7 @@ class KojiPackageSet(PackageSetBase): # Check if this RPM is coming from scratch task. In this case, we already # know the path. if "path_from_task" in rpm_info: - return rpm_info["path_from_task"] + return self.downloader.get_file(rpm_info["path_from_task"]) pathinfo = self.koji_wrapper.koji_module.pathinfo paths = [] @@ -554,8 +557,9 @@ class KojiPackageSet(PackageSetBase): ) if rpm_path not in paths: paths.append(rpm_path) - if os.path.isfile(rpm_path): - return rpm_path + path = self.downloader.get_file(rpm_path) + if path: + return path # No signed copy was found, wait a little and try again. 
attempts_left -= 1 @@ -568,16 +572,18 @@ class KojiPackageSet(PackageSetBase): # use an unsigned copy (if allowed) rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info)) paths.append(rpm_path) - if os.path.isfile(rpm_path): - return rpm_path + path = self.downloader.get_file(rpm_path) + if path: + return path if self._allow_invalid_sigkeys and rpm_info["name"] not in self.packages: # use an unsigned copy (if allowed) rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info)) paths.append(rpm_path) - if os.path.isfile(rpm_path): + path = self.downloader.get_file(rpm_path) + if path: self._invalid_sigkey_rpms.append(rpm_info) - return rpm_path + return path self._invalid_sigkey_rpms.append(rpm_info) self.log_error( diff --git a/pungi/phases/pkgset/sources/source_koji.py b/pungi/phases/pkgset/sources/source_koji.py index 712af11e..6c05e353 100644 --- a/pungi/phases/pkgset/sources/source_koji.py +++ b/pungi/phases/pkgset/sources/source_koji.py @@ -193,17 +193,13 @@ class PkgsetSourceKoji(pungi.phases.pkgset.source.PkgsetSourceBase): def __call__(self): compose = self.compose self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(compose) - # path prefix must contain trailing '/' - path_prefix = self.koji_wrapper.koji_module.config.topdir.rstrip("/") + "/" - package_sets = get_pkgset_from_koji( - self.compose, self.koji_wrapper, path_prefix - ) - return (package_sets, path_prefix) + package_sets = get_pkgset_from_koji(self.compose, self.koji_wrapper) + return (package_sets, self.compose.koji_downloader.cache_dir) -def get_pkgset_from_koji(compose, koji_wrapper, path_prefix): +def get_pkgset_from_koji(compose, koji_wrapper): event_info = get_koji_event_info(compose, koji_wrapper) - return populate_global_pkgset(compose, koji_wrapper, path_prefix, event_info) + return populate_global_pkgset(compose, koji_wrapper, event_info) def _add_module_to_variant( @@ -232,7 +228,7 @@ def _add_module_to_variant( continue typedir = 
koji_wrapper.koji_module.pathinfo.typedir(build, archive["btype"]) filename = archive["filename"] - file_path = os.path.join(typedir, filename) + file_path = compose.koji_downloader.get_file(os.path.join(typedir, filename)) try: # If there are two dots, the arch is in the middle. MBS uploads # files with actual architecture in the filename, but Pungi deals @@ -400,7 +396,13 @@ def _is_filtered_out(compose, variant, arch, module_name, module_stream): def _get_modules_from_koji( - compose, koji_wrapper, event, variant, variant_tags, tag_to_mmd, exclude_module_ns + compose, + koji_wrapper, + event, + variant, + variant_tags, + tag_to_mmd, + exclude_module_ns, ): """ Loads modules for given `variant` from koji `session`, adds them to @@ -675,7 +677,7 @@ def _get_modules_from_koji_tags( ) -def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): +def populate_global_pkgset(compose, koji_wrapper, event): all_arches = get_all_arches(compose) # List of compose tags from which we create this compose @@ -769,7 +771,12 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): if extra_modules: _add_extra_modules_to_variant( - compose, koji_wrapper, variant, extra_modules, variant_tags, tag_to_mmd + compose, + koji_wrapper, + variant, + extra_modules, + variant_tags, + tag_to_mmd, ) variant_scratch_modules = get_variant_data( @@ -826,6 +833,7 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): cache_region=compose.cache_region, signed_packages_retries=compose.conf["signed_packages_retries"], signed_packages_wait=compose.conf["signed_packages_wait"], + downloader=compose.koji_downloader, **kwargs ) @@ -912,7 +920,7 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): MaterializedPackageSet.create, compose, pkgset, - path_prefix, + compose.koji_downloader.cache_dir, mmd=tag_to_mmd.get(pkgset.name), ) ) diff --git a/pungi/util.py b/pungi/util.py index e5bf0743..bf2a3cac 100644 --- a/pungi/util.py +++ 
b/pungi/util.py @@ -461,6 +461,9 @@ def get_volid(compose, arch, variant=None, disc_type=False, formats=None, **kwar if not variant_uid and "%(variant)s" in i: continue try: + # fmt: off + # Black wants to add a comma after kwargs, but that's not valid in + # Python 2.7 args = get_format_substs( compose, variant=variant_uid, @@ -472,6 +475,7 @@ def get_volid(compose, arch, variant=None, disc_type=False, formats=None, **kwar base_product_version=base_product_version, **kwargs ) + # fmt: on volid = (i % args).format(**args) except KeyError as err: raise RuntimeError( @@ -1146,3 +1150,16 @@ def read_json_file(file_path): """A helper function to read a JSON file.""" with open(file_path) as f: return json.load(f) + + +UNITS = ["", "Ki", "Mi", "Gi", "Ti"] + + +def format_size(sz): + sz = float(sz) + unit = 0 + while sz > 1024: + sz /= 1024 + unit += 1 + + return "%.3g %sB" % (sz, UNITS[unit]) diff --git a/pungi/wrappers/kojiwrapper.py b/pungi/wrappers/kojiwrapper.py index f978229e..6358f882 100644 --- a/pungi/wrappers/kojiwrapper.py +++ b/pungi/wrappers/kojiwrapper.py @@ -14,17 +14,23 @@ # along with this program; if not, see . +import contextlib import os import re +import socket +import shutil import time import threading -import contextlib + +import requests import koji from kobo.shortcuts import run, force_list import six from six.moves import configparser, shlex_quote import six.moves.xmlrpc_client as xmlrpclib +from flufl.lock import Lock +from datetime import timedelta from .kojimock import KojiMock from .. 
import util @@ -934,3 +940,144 @@ def get_buildroot_rpms(compose, task_id): continue result.append(i) return sorted(result) + + +class KojiDownloadProxy: + def __init__(self, topdir, topurl, cache_dir, logger): + if not topdir: + self.has_local_access = True + return + + self.cache_dir = cache_dir + self.logger = logger + + self.topdir = topdir + self.topurl = topurl + + self.has_local_access = os.path.isdir(self.topdir) + # This is used for temporary downloaded files. The suffix is unique + # per-process. To prevent threads in the same process from colliding, a + # thread id is added later. + self.unique_suffix = "%s.%s" % (socket.gethostname(), os.getpid()) + self.session = None + if not self.has_local_access: + self.session = requests.Session() + + @classmethod + def from_config(klass, conf, logger): + topdir = None + topurl = None + path_prefix = None + if "koji_profile" in conf: + koji_module = koji.get_profile_module(conf["koji_profile"]) + topdir = koji_module.config.topdir + topurl = koji_module.config.topurl + + path_prefix = topdir.rstrip("/") + "/" + if not os.path.exists(path_prefix): + path_prefix = conf["koji_cache"].rstrip("/") + "/" + return klass(topdir, topurl, path_prefix, logger) + + @util.retry(wait_on=requests.exceptions.RequestException) + def _download(self, url, dest): + """Download file into given location + + :param str url: URL of the file to download + :param str dest: file path to store the result in + :returns: path to the downloaded file (same as dest) or None if the URL + """ + with self.session.get(url, stream=True) as r: + if r.status_code == 404: + self.logger.warning("GET %s NOT FOUND", url) + return None + if r.status_code != 200: + self.logger.error("GET %s %s", url, r.status_code) + r.raise_for_status() + # The exception from here will be retried by the decorator. 
+ + file_size = int(r.headers.get("Content-Length", 0)) + self.logger.info("GET %s OK %s", url, util.format_size(file_size)) + with open(dest, "wb") as f: + shutil.copyfileobj(r.raw, f) + return dest + + def _atomic_download(self, url, dest): + """Atomically download a file + + :param str url: URL of the file to download + :param str dest: file path to store the result in + :returns: path to the downloaded file (same as dest) or None if the URL + return 404. + """ + temp_file = "%s.%s.%s" % (dest, self.unique_suffix, threading.get_ident()) + + # First download to the temporary location. + try: + if self._download(url, temp_file) is None: + # The file was not found. + return None + except Exception: + # Download failed, let's make sure to clean up potentially partial + # temporary file. + try: + os.remove(temp_file) + except Exception: + self.logger.warning("Failed to delete %s", temp_file) + pass + raise + + # Atomically move the temporary file into final location + os.rename(temp_file, dest) + return dest + + def _download_file(self, path): + """Ensure file on Koji volume in ``path`` is present in the local + cache. + + :returns: path to the local file or None if file is not found + """ + url = path.replace(self.topdir, self.topurl) + destination_file = path.replace(self.topdir, self.cache_dir) + util.makedirs(os.path.dirname(destination_file)) + + lock = Lock(destination_file + ".lock") + # Hold the lock for this file for 5 minutes. If another compose needs + # the same file but it's not downloaded yet, the process will wait. + # + # If the download finishes in time, the downloaded file will be used + # here. + # + # If the download takes longer, this process will steal the lock and + # start its own download. + # + # That should not be a problem: the same file will be downloaded and + # then replaced atomically on the filesystem. 
If the original process + # managed to hardlink the first file already, that hardlink will be + # broken, but that will only result in the same file stored twice. + lock.lifetime = timedelta(minutes=5) + + with lock: + # Check if the file already exists. If yes, return the path. + if os.path.exists(destination_file): + # Update mtime of the file. This covers the case of packages in the + # tag that are not included in the compose. Updating mtime will + # exempt them from cleanup for extra time. + os.utime(destination_file) + return destination_file + + return self._atomic_download(url, destination_file) + + def get_file(self, path): + """ + If path refers to an existing file in Koji, return a valid local path + to it. If no such file exists, return None. + """ + if self.has_local_access: + # We have koji volume mounted locally. No transformation needed for + # the path, just check it exists. + if os.path.exists(path): + return path + return None + else: + # We need to download the file. + return self._download_file(path) diff --git a/requirements.txt b/requirements.txt index 9acc2ad6..4f24c110 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ # Some packages must be installed via dnf/yum first, see doc/contributing.rst dict.sorted dogpile.cache +flufl.lock ; python_version >= '3.0' +flufl.lock < 3.0 ; python_version <= '2.7' funcsigs jsonschema kobo