From 631bb01d8f9eac2045a8933a89e0941df7b5f9e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lubom=C3=ADr=20Sedl=C3=A1=C5=99?= Date: Mon, 12 Dec 2022 09:23:31 +0100 Subject: [PATCH] Add support for not having koji volume mounted locally MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With this patch, Pungi can be configured with a local directory to be used as a cache for RPMs, and it will download packages from Koji over HTTP instead of reading them from filesystem directly. The files from the cache can then be hardlink as usual. There is locking in place to avoid different composes running at the same time to step on each other. This is now supported for RPMs only, be it real builds or scratch builds. Signed-off-by: Lubomír Sedlář --- doc/index.rst | 1 + doc/koji.rst | 103 ++++++++++++++ pungi/checks.py | 1 + pungi/compose.py | 3 + pungi/phases/gather/__init__.py | 7 +- pungi/phases/pkgset/pkgsets.py | 20 ++- pungi/phases/pkgset/sources/source_koji.py | 34 +++-- pungi/util.py | 17 +++ pungi/wrappers/kojiwrapper.py | 149 ++++++++++++++++++++- requirements.txt | 2 + 10 files changed, 313 insertions(+), 24 deletions(-) create mode 100644 doc/koji.rst diff --git a/doc/index.rst b/doc/index.rst index 16d9bf58..b71e67d3 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -19,6 +19,7 @@ Contents: scm_support messaging gathering + koji comps contributing testing diff --git a/doc/koji.rst b/doc/koji.rst new file mode 100644 index 00000000..7403e551 --- /dev/null +++ b/doc/koji.rst @@ -0,0 +1,103 @@ +====================== +Getting data from koji +====================== + +When Pungi is configured to get packages from a Koji tag, it somehow needs to +access the actual RPM files. + +Historically, this required the storage used by Koji to be directly available +on the host where Pungi was running. This was usually achieved by using NFS for +the Koji volume, and mounting it on the compose host. + +The compose could be created directly on the same volume. In such case the +packages would be hardlinked, significantly reducing space consumption. + +The compose could also be created on a different storage, in which case the +packages would either need to be copied over or symlinked. Using symlinks +requires that anything that accesses the compose (e.g. a download server) would +also need to mount the Koji volume in the same location. + +There is also a risk with symlinks that the package in Koji can change (due to +being resigned for example), which would invalidate composes linking to it. + + +Using Koji without direct mount +=============================== + +It is possible now to run a compose from a Koji tag without direct access to +Koji storage. + +Pungi can download the packages over HTTP protocol, store them in a local +cache, and consume them from there. + +The local cache has similar structure to what is on the Koji volume. + +When Pungi needs some package, it has a path on Koji volume. It will replace +the ``topdir`` with the cache location. If such file exists, it will be used. +If it doesn't exist, it will be downloaded from Koji (by replacing the +``topdir`` with ``topurl``). + +:: + + Koji path /mnt/koji/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm + Koji URL https://kojipkgs.fedoraproject.org/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm + Local path /mnt/compose/cache/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm + +The packages can be hardlinked from this cache directory. + + +Cleanup +------- + +While the approach above allows each RPM to be downloaded only once, it will +eventually result in the Koji volume being mirrored locally. Most of the +packages will however no longer be needed. + +There is a script ``pungi-cache-cleanup`` that can help with that. It can find +and remove files from the cache that are no longer needed. + +A file is no longer needed if it has a single link (meaning it is only in the +cache, not in any compose), and it has mtime older than a given threshold. + +It doesn't make sense to delete files that are hardlinked in an existing +compose as it would not save any space anyway. + +The mtime check is meant to preserve files that are downloaded but not actually +used in a compose, like a subpackage that is not included in any variant. Every +time its existence in the local cache is checked, the mtime is updated. + + +Race conditions? +---------------- + +It should be safe to have multiple compose hosts share the same storage volume +for generated composes and local cache. + +If a cache file is accessed and it exists, there's no risk of race condition. + +If two composes need the same file at the same time and it is not present yet, +one of them will take a lock on it and start downloading. The other will wait +until the download is finished. + +The lock is only valid for a set amount of time (5 minutes) to avoid issues +where the downloading process is killed in a way that blocks it from releasing +the lock. + +If the file is large and network slow, the limit may not be enough finish +downloading. In that case the second process will steal the lock while the +first process is still downloading. This will result in the same file being +downloaded twice. + +When the first process finishes the download, it will put the file into the +local cache location. When the second process finishes, it will atomically +replace it, but since it's the same file it will be the same file. + +If the first compose already managed to hardlink the file before it gets +replaced, there will be two copies of the file present locally. + + +Caveats +------- + +There is no integrity checking. Ideally Koji should provide checksums for the +RPMs that would be verified after downloading. This is not yet available. diff --git a/pungi/checks.py b/pungi/checks.py index 3bd334ef..d6ffabe1 100644 --- a/pungi/checks.py +++ b/pungi/checks.py @@ -836,6 +836,7 @@ def make_schema(): "cts_oidc_client_id": {"type": "string"}, "koji_profile": {"type": "string"}, "koji_event": {"type": "number"}, + "koji_cache": {"type": "string"}, "pkgset_koji_tag": {"$ref": "#/definitions/strings"}, "pkgset_koji_builds": {"$ref": "#/definitions/strings"}, "pkgset_koji_scratch_tasks": {"$ref": "#/definitions/strings"}, diff --git a/pungi/compose.py b/pungi/compose.py index e980a9fd..e29b4019 100644 --- a/pungi/compose.py +++ b/pungi/compose.py @@ -39,6 +39,7 @@ from dogpile.cache import make_region from pungi.graph import SimpleAcyclicOrientedGraph from pungi.wrappers.variants import VariantsXmlParser from pungi.paths import Paths +from pungi.wrappers.kojiwrapper import KojiDownloadProxy from pungi.wrappers.scm import get_file_from_scm from pungi.util import ( makedirs, @@ -409,6 +410,8 @@ class Compose(kobo.log.LoggingBase): else: self.cache_region = make_region().configure("dogpile.cache.null") + self.koji_downloader = KojiDownloadProxy.from_config(self.conf, self._logger) + get_compose_info = staticmethod(get_compose_info) write_compose_info = staticmethod(write_compose_info) get_compose_dir = staticmethod(get_compose_dir) diff --git a/pungi/phases/gather/__init__.py b/pungi/phases/gather/__init__.py index 6667b4ce..008da660 100644 --- a/pungi/phases/gather/__init__.py +++ b/pungi/phases/gather/__init__.py @@ -644,9 +644,10 @@ def _make_lookaside_repo(compose, variant, arch, pkg_map, package_sets=None): compose.paths.work.topdir(arch="global"), "download" ) + "/", - "koji": lambda: pungi.wrappers.kojiwrapper.KojiWrapper( - compose - ).koji_module.config.topdir.rstrip("/") + "koji": lambda: compose.conf.get( + "koji_cache", + pungi.wrappers.kojiwrapper.KojiWrapper(compose).koji_module.config.topdir, + ).rstrip("/") + "/", } path_prefix = prefixes[compose.conf["pkgset_source"]]() diff --git a/pungi/phases/pkgset/pkgsets.py b/pungi/phases/pkgset/pkgsets.py index dff9ebbf..938929ca 100644 --- a/pungi/phases/pkgset/pkgsets.py +++ b/pungi/phases/pkgset/pkgsets.py @@ -354,6 +354,7 @@ class KojiPackageSet(PackageSetBase): extra_tasks=None, signed_packages_retries=0, signed_packages_wait=30, + downloader=None, ): """ Creates new KojiPackageSet. @@ -408,6 +409,8 @@ class KojiPackageSet(PackageSetBase): self.signed_packages_retries = signed_packages_retries self.signed_packages_wait = signed_packages_wait + self.downloader = downloader + def __getstate__(self): result = self.__dict__.copy() del result["koji_wrapper"] @@ -526,7 +529,7 @@ class KojiPackageSet(PackageSetBase): # Check if this RPM is coming from scratch task. In this case, we already # know the path. if "path_from_task" in rpm_info: - return rpm_info["path_from_task"] + return self.downloader.get_file(rpm_info["path_from_task"]) pathinfo = self.koji_wrapper.koji_module.pathinfo paths = [] @@ -543,8 +546,9 @@ class KojiPackageSet(PackageSetBase): ) if rpm_path not in paths: paths.append(rpm_path) - if os.path.isfile(rpm_path): - return rpm_path + path = self.downloader.get_file(rpm_path) + if path: + return path # No signed copy was found, wait a little and try again. attempts_left -= 1 @@ -557,16 +561,18 @@ class KojiPackageSet(PackageSetBase): # use an unsigned copy (if allowed) rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info)) paths.append(rpm_path) - if os.path.isfile(rpm_path): - return rpm_path + path = self.downloader.get_file(rpm_path) + if path: + return path if self._allow_invalid_sigkeys and rpm_info["name"] not in self.packages: # use an unsigned copy (if allowed) rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info)) paths.append(rpm_path) - if os.path.isfile(rpm_path): + path = self.downloader.get_file(rpm_path) + if path: self._invalid_sigkey_rpms.append(rpm_info) - return rpm_path + return path self._invalid_sigkey_rpms.append(rpm_info) self.log_error( diff --git a/pungi/phases/pkgset/sources/source_koji.py b/pungi/phases/pkgset/sources/source_koji.py index 712af11e..6c05e353 100644 --- a/pungi/phases/pkgset/sources/source_koji.py +++ b/pungi/phases/pkgset/sources/source_koji.py @@ -193,17 +193,13 @@ class PkgsetSourceKoji(pungi.phases.pkgset.source.PkgsetSourceBase): def __call__(self): compose = self.compose self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(compose) - # path prefix must contain trailing '/' - path_prefix = self.koji_wrapper.koji_module.config.topdir.rstrip("/") + "/" - package_sets = get_pkgset_from_koji( - self.compose, self.koji_wrapper, path_prefix - ) - return (package_sets, path_prefix) + package_sets = get_pkgset_from_koji(self.compose, self.koji_wrapper) + return (package_sets, self.compose.koji_downloader.cache_dir) -def get_pkgset_from_koji(compose, koji_wrapper, path_prefix): +def get_pkgset_from_koji(compose, koji_wrapper): event_info = get_koji_event_info(compose, koji_wrapper) - return populate_global_pkgset(compose, koji_wrapper, path_prefix, event_info) + return populate_global_pkgset(compose, koji_wrapper, event_info) def _add_module_to_variant( @@ -232,7 +228,7 @@ def _add_module_to_variant( continue typedir = koji_wrapper.koji_module.pathinfo.typedir(build, archive["btype"]) filename = archive["filename"] - file_path = os.path.join(typedir, filename) + file_path = compose.koji_downloader.get_file(os.path.join(typedir, filename)) try: # If there are two dots, the arch is in the middle. MBS uploads # files with actual architecture in the filename, but Pungi deals @@ -400,7 +396,13 @@ def _is_filtered_out(compose, variant, arch, module_name, module_stream): def _get_modules_from_koji( - compose, koji_wrapper, event, variant, variant_tags, tag_to_mmd, exclude_module_ns + compose, + koji_wrapper, + event, + variant, + variant_tags, + tag_to_mmd, + exclude_module_ns, ): """ Loads modules for given `variant` from koji `session`, adds them to @@ -675,7 +677,7 @@ def _get_modules_from_koji_tags( ) -def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): +def populate_global_pkgset(compose, koji_wrapper, event): all_arches = get_all_arches(compose) # List of compose tags from which we create this compose @@ -769,7 +771,12 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): if extra_modules: _add_extra_modules_to_variant( - compose, koji_wrapper, variant, extra_modules, variant_tags, tag_to_mmd + compose, + koji_wrapper, + variant, + extra_modules, + variant_tags, + tag_to_mmd, ) variant_scratch_modules = get_variant_data( @@ -826,6 +833,7 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): cache_region=compose.cache_region, signed_packages_retries=compose.conf["signed_packages_retries"], signed_packages_wait=compose.conf["signed_packages_wait"], + downloader=compose.koji_downloader, **kwargs ) @@ -912,7 +920,7 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): MaterializedPackageSet.create, compose, pkgset, - path_prefix, + compose.koji_downloader.cache_dir, mmd=tag_to_mmd.get(pkgset.name), ) ) diff --git a/pungi/util.py b/pungi/util.py index e5bf0743..bf2a3cac 100644 --- a/pungi/util.py +++ b/pungi/util.py @@ -461,6 +461,9 @@ def get_volid(compose, arch, variant=None, disc_type=False, formats=None, **kwar if not variant_uid and "%(variant)s" in i: continue try: + # fmt: off + # Black wants to add a comma after kwargs, but that's not valid in + # Python 2.7 args = get_format_substs( compose, variant=variant_uid, @@ -472,6 +475,7 @@ def get_volid(compose, arch, variant=None, disc_type=False, formats=None, **kwar base_product_version=base_product_version, **kwargs ) + # fmt: on volid = (i % args).format(**args) except KeyError as err: raise RuntimeError( @@ -1146,3 +1150,16 @@ def read_json_file(file_path): """A helper function to read a JSON file.""" with open(file_path) as f: return json.load(f) + + +UNITS = ["", "Ki", "Mi", "Gi", "Ti"] + + +def format_size(sz): + sz = float(sz) + unit = 0 + while sz > 1024: + sz /= 1024 + unit += 1 + + return "%.3g %sB" % (sz, UNITS[unit]) diff --git a/pungi/wrappers/kojiwrapper.py b/pungi/wrappers/kojiwrapper.py index 6870beac..9ad7f88b 100644 --- a/pungi/wrappers/kojiwrapper.py +++ b/pungi/wrappers/kojiwrapper.py @@ -14,17 +14,23 @@ # along with this program; if not, see . +import contextlib import os import re +import socket +import shutil import time import threading -import contextlib + +import requests import koji from kobo.shortcuts import run, force_list import six from six.moves import configparser, shlex_quote import six.moves.xmlrpc_client as xmlrpclib +from flufl.lock import Lock +from datetime import timedelta from .. import util from ..arch_utils import getBaseArch @@ -894,3 +900,144 @@ def get_buildroot_rpms(compose, task_id): continue result.append(i) return sorted(result) + + +class KojiDownloadProxy: + def __init__(self, topdir, topurl, cache_dir, logger): + if not topdir: + self.has_local_access = True + return + + self.cache_dir = cache_dir + self.logger = logger + + self.topdir = topdir + self.topurl = topurl + + self.has_local_access = os.path.isdir(self.topdir) + # This is used for temporary downloaded files. The suffix is unique + # per-process. To prevent threads in the same process from colliding, a + # thread id is added later. + self.unique_suffix = "%s.%s" % (socket.gethostname(), os.getpid()) + self.session = None + if not self.has_local_access: + self.session = requests.Session() + + @classmethod + def from_config(klass, conf, logger): + topdir = None + topurl = None + path_prefix = None + if "koji_profile" in conf: + koji_module = koji.get_profile_module(conf["koji_profile"]) + topdir = koji_module.config.topdir + topurl = koji_module.config.topurl + + path_prefix = topdir.rstrip("/") + "/" + if not os.path.exists(path_prefix): + path_prefix = conf["koji_cache"].rstrip("/") + "/" + return klass(topdir, topurl, path_prefix, logger) + + @util.retry(wait_on=requests.exceptions.RequestException) + def _download(self, url, dest): + """Download file into given location + + :param str url: URL of the file to download + :param str dest: file path to store the result in + :returns: path to the downloaded file (same as dest) or None if the URL + """ + with self.session.get(url, stream=True) as r: + if r.status_code == 404: + self.logger.warning("GET %s NOT FOUND", url) + return None + if r.status_code != 200: + self.logger.error("GET %s %s", url, r.status_code) + r.raise_for_status() + # The exception from here will be retried by the decorator. + + file_size = int(r.headers.get("Content-Length", 0)) + self.logger.info("GET %s OK %s", url, util.format_size(file_size)) + with open(dest, "wb") as f: + shutil.copyfileobj(r.raw, f) + return dest + + def _atomic_download(self, url, dest): + """Atomically download a file + + :param str url: URL of the file to download + :param str dest: file path to store the result in + :returns: path to the downloaded file (same as dest) or None if the URL + return 404. + """ + temp_file = "%s.%s.%s" % (dest, self.unique_suffix, threading.get_ident()) + + # First download to the temporary location. + try: + if self._download(url, temp_file) is None: + # The file was not found. + return None + except Exception: + # Download failed, let's make sure to clean up potentially partial + # temporary file. + try: + os.remove(temp_file) + except Exception: + self.logger.warning("Failed to delete %s", temp_file) + pass + raise + + # Atomically move the temporary file into final location + os.rename(temp_file, dest) + return dest + + def _download_file(self, path): + """Ensure file on Koji volume in ``path`` is present in the local + cache. + + :returns: path to the local file or None if file is not found + """ + url = path.replace(self.topdir, self.topurl) + destination_file = path.replace(self.topdir, self.cache_dir) + util.makedirs(os.path.dirname(destination_file)) + + lock = Lock(destination_file + ".lock") + # Hold the lock for this file for 5 minutes. If another compose needs + # the same file but it's not downloaded yet, the process will wait. + # + # If the download finishes in time, the downloaded file will be used + # here. + # + # If the download takes longer, this process will steal the lock and + # start its own download. + # + # That should not be a problem: the same file will be downloaded and + # then replaced atomically on the filesystem. If the original process + # managed to hardlink the first file already, that hardlink will be + # broken, but that will only result in the same file stored twice. + lock.lifetime = timedelta(minutes=5) + + with lock: + # Check if the file already exists. If yes, return the path. + if os.path.exists(destination_file): + # Update mtime of the file. This covers the case of packages in the + # tag that are not included in the compose. Updating mtime will + # exempt them from cleanup for extra time. + os.utime(destination_file) + return destination_file + + return self._atomic_download(url, destination_file) + + def get_file(self, path): + """ + If path refers to an existing file in Koji, return a valid local path + to it. If no such file exists, return None. + """ + if self.has_local_access: + # We have koji volume mounted locally. No transformation needed for + # the path, just check it exists. + if os.path.exists(path): + return path + return None + else: + # We need to download the file. + return self._download_file(path) diff --git a/requirements.txt b/requirements.txt index 9acc2ad6..4f24c110 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ # Some packages must be installed via dnf/yum first, see doc/contributing.rst dict.sorted dogpile.cache +flufl.lock ; python_version >= '3.0' +flufl.lock < 3.0 ; python_version <= '2.7' funcsigs jsonschema kobo