Add support for not having koji volume mounted locally

With this patch, Pungi can be configured with a local directory to be
used as a cache for RPMs, and it will download packages from Koji over
HTTP instead of reading them from filesystem directly.

The files from the cache can then be hardlink as usual.

There is locking in place to avoid different composes running at the
same time to step on each other.

This is now supported for RPMs only, be it real builds or scratch
builds.

Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
This commit is contained in:
Lubomír Sedlář 2022-12-12 09:23:31 +01:00 committed by lsedlar
parent b6296bdfcd
commit 631bb01d8f
10 changed files with 313 additions and 24 deletions

View File

@ -19,6 +19,7 @@ Contents:
scm_support scm_support
messaging messaging
gathering gathering
koji
comps comps
contributing contributing
testing testing

103
doc/koji.rst Normal file
View File

@ -0,0 +1,103 @@
======================
Getting data from koji
======================
When Pungi is configured to get packages from a Koji tag, it somehow needs to
access the actual RPM files.
Historically, this required the storage used by Koji to be directly available
on the host where Pungi was running. This was usually achieved by using NFS for
the Koji volume, and mounting it on the compose host.
The compose could be created directly on the same volume. In such case the
packages would be hardlinked, significantly reducing space consumption.
The compose could also be created on a different storage, in which case the
packages would either need to be copied over or symlinked. Using symlinks
requires that anything that accesses the compose (e.g. a download server) would
also need to mount the Koji volume in the same location.
There is also a risk with symlinks that the package in Koji can change (due to
being resigned for example), which would invalidate composes linking to it.
Using Koji without direct mount
===============================
It is possible now to run a compose from a Koji tag without direct access to
Koji storage.
Pungi can download the packages over HTTP protocol, store them in a local
cache, and consume them from there.
The local cache has similar structure to what is on the Koji volume.
When Pungi needs some package, it has a path on Koji volume. It will replace
the ``topdir`` with the cache location. If such file exists, it will be used.
If it doesn't exist, it will be downloaded from Koji (by replacing the
``topdir`` with ``topurl``).
::
Koji path /mnt/koji/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm
Koji URL https://kojipkgs.fedoraproject.org/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm
Local path /mnt/compose/cache/packages/foo/1/1.fc38/data/signed/abcdef/noarch/foo-1-1.fc38.noarch.rpm
The packages can be hardlinked from this cache directory.
Cleanup
-------
While the approach above allows each RPM to be downloaded only once, it will
eventually result in the Koji volume being mirrored locally. Most of the
packages will however no longer be needed.
There is a script ``pungi-cache-cleanup`` that can help with that. It can find
and remove files from the cache that are no longer needed.
A file is no longer needed if it has a single link (meaning it is only in the
cache, not in any compose), and it has mtime older than a given threshold.
It doesn't make sense to delete files that are hardlinked in an existing
compose as it would not save any space anyway.
The mtime check is meant to preserve files that are downloaded but not actually
used in a compose, like a subpackage that is not included in any variant. Every
time its existence in the local cache is checked, the mtime is updated.
Race conditions?
----------------
It should be safe to have multiple compose hosts share the same storage volume
for generated composes and local cache.
If a cache file is accessed and it exists, there's no risk of race condition.
If two composes need the same file at the same time and it is not present yet,
one of them will take a lock on it and start downloading. The other will wait
until the download is finished.
The lock is only valid for a set amount of time (5 minutes) to avoid issues
where the downloading process is killed in a way that blocks it from releasing
the lock.
If the file is large and network slow, the limit may not be enough finish
downloading. In that case the second process will steal the lock while the
first process is still downloading. This will result in the same file being
downloaded twice.
When the first process finishes the download, it will put the file into the
local cache location. When the second process finishes, it will atomically
replace it, but since it's the same file it will be the same file.
If the first compose already managed to hardlink the file before it gets
replaced, there will be two copies of the file present locally.
Caveats
-------
There is no integrity checking. Ideally Koji should provide checksums for the
RPMs that would be verified after downloading. This is not yet available.

View File

@ -836,6 +836,7 @@ def make_schema():
"cts_oidc_client_id": {"type": "string"}, "cts_oidc_client_id": {"type": "string"},
"koji_profile": {"type": "string"}, "koji_profile": {"type": "string"},
"koji_event": {"type": "number"}, "koji_event": {"type": "number"},
"koji_cache": {"type": "string"},
"pkgset_koji_tag": {"$ref": "#/definitions/strings"}, "pkgset_koji_tag": {"$ref": "#/definitions/strings"},
"pkgset_koji_builds": {"$ref": "#/definitions/strings"}, "pkgset_koji_builds": {"$ref": "#/definitions/strings"},
"pkgset_koji_scratch_tasks": {"$ref": "#/definitions/strings"}, "pkgset_koji_scratch_tasks": {"$ref": "#/definitions/strings"},

View File

@ -39,6 +39,7 @@ from dogpile.cache import make_region
from pungi.graph import SimpleAcyclicOrientedGraph from pungi.graph import SimpleAcyclicOrientedGraph
from pungi.wrappers.variants import VariantsXmlParser from pungi.wrappers.variants import VariantsXmlParser
from pungi.paths import Paths from pungi.paths import Paths
from pungi.wrappers.kojiwrapper import KojiDownloadProxy
from pungi.wrappers.scm import get_file_from_scm from pungi.wrappers.scm import get_file_from_scm
from pungi.util import ( from pungi.util import (
makedirs, makedirs,
@ -409,6 +410,8 @@ class Compose(kobo.log.LoggingBase):
else: else:
self.cache_region = make_region().configure("dogpile.cache.null") self.cache_region = make_region().configure("dogpile.cache.null")
self.koji_downloader = KojiDownloadProxy.from_config(self.conf, self._logger)
get_compose_info = staticmethod(get_compose_info) get_compose_info = staticmethod(get_compose_info)
write_compose_info = staticmethod(write_compose_info) write_compose_info = staticmethod(write_compose_info)
get_compose_dir = staticmethod(get_compose_dir) get_compose_dir = staticmethod(get_compose_dir)

View File

@ -644,9 +644,10 @@ def _make_lookaside_repo(compose, variant, arch, pkg_map, package_sets=None):
compose.paths.work.topdir(arch="global"), "download" compose.paths.work.topdir(arch="global"), "download"
) )
+ "/", + "/",
"koji": lambda: pungi.wrappers.kojiwrapper.KojiWrapper( "koji": lambda: compose.conf.get(
compose "koji_cache",
).koji_module.config.topdir.rstrip("/") pungi.wrappers.kojiwrapper.KojiWrapper(compose).koji_module.config.topdir,
).rstrip("/")
+ "/", + "/",
} }
path_prefix = prefixes[compose.conf["pkgset_source"]]() path_prefix = prefixes[compose.conf["pkgset_source"]]()

View File

@ -354,6 +354,7 @@ class KojiPackageSet(PackageSetBase):
extra_tasks=None, extra_tasks=None,
signed_packages_retries=0, signed_packages_retries=0,
signed_packages_wait=30, signed_packages_wait=30,
downloader=None,
): ):
""" """
Creates new KojiPackageSet. Creates new KojiPackageSet.
@ -408,6 +409,8 @@ class KojiPackageSet(PackageSetBase):
self.signed_packages_retries = signed_packages_retries self.signed_packages_retries = signed_packages_retries
self.signed_packages_wait = signed_packages_wait self.signed_packages_wait = signed_packages_wait
self.downloader = downloader
def __getstate__(self): def __getstate__(self):
result = self.__dict__.copy() result = self.__dict__.copy()
del result["koji_wrapper"] del result["koji_wrapper"]
@ -526,7 +529,7 @@ class KojiPackageSet(PackageSetBase):
# Check if this RPM is coming from scratch task. In this case, we already # Check if this RPM is coming from scratch task. In this case, we already
# know the path. # know the path.
if "path_from_task" in rpm_info: if "path_from_task" in rpm_info:
return rpm_info["path_from_task"] return self.downloader.get_file(rpm_info["path_from_task"])
pathinfo = self.koji_wrapper.koji_module.pathinfo pathinfo = self.koji_wrapper.koji_module.pathinfo
paths = [] paths = []
@ -543,8 +546,9 @@ class KojiPackageSet(PackageSetBase):
) )
if rpm_path not in paths: if rpm_path not in paths:
paths.append(rpm_path) paths.append(rpm_path)
if os.path.isfile(rpm_path): path = self.downloader.get_file(rpm_path)
return rpm_path if path:
return path
# No signed copy was found, wait a little and try again. # No signed copy was found, wait a little and try again.
attempts_left -= 1 attempts_left -= 1
@ -557,16 +561,18 @@ class KojiPackageSet(PackageSetBase):
# use an unsigned copy (if allowed) # use an unsigned copy (if allowed)
rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info)) rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info))
paths.append(rpm_path) paths.append(rpm_path)
if os.path.isfile(rpm_path): path = self.downloader.get_file(rpm_path)
return rpm_path if path:
return path
if self._allow_invalid_sigkeys and rpm_info["name"] not in self.packages: if self._allow_invalid_sigkeys and rpm_info["name"] not in self.packages:
# use an unsigned copy (if allowed) # use an unsigned copy (if allowed)
rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info)) rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info))
paths.append(rpm_path) paths.append(rpm_path)
if os.path.isfile(rpm_path): path = self.downloader.get_file(rpm_path)
if path:
self._invalid_sigkey_rpms.append(rpm_info) self._invalid_sigkey_rpms.append(rpm_info)
return rpm_path return path
self._invalid_sigkey_rpms.append(rpm_info) self._invalid_sigkey_rpms.append(rpm_info)
self.log_error( self.log_error(

View File

@ -193,17 +193,13 @@ class PkgsetSourceKoji(pungi.phases.pkgset.source.PkgsetSourceBase):
def __call__(self): def __call__(self):
compose = self.compose compose = self.compose
self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(compose) self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(compose)
# path prefix must contain trailing '/' package_sets = get_pkgset_from_koji(self.compose, self.koji_wrapper)
path_prefix = self.koji_wrapper.koji_module.config.topdir.rstrip("/") + "/" return (package_sets, self.compose.koji_downloader.cache_dir)
package_sets = get_pkgset_from_koji(
self.compose, self.koji_wrapper, path_prefix
)
return (package_sets, path_prefix)
def get_pkgset_from_koji(compose, koji_wrapper, path_prefix): def get_pkgset_from_koji(compose, koji_wrapper):
event_info = get_koji_event_info(compose, koji_wrapper) event_info = get_koji_event_info(compose, koji_wrapper)
return populate_global_pkgset(compose, koji_wrapper, path_prefix, event_info) return populate_global_pkgset(compose, koji_wrapper, event_info)
def _add_module_to_variant( def _add_module_to_variant(
@ -232,7 +228,7 @@ def _add_module_to_variant(
continue continue
typedir = koji_wrapper.koji_module.pathinfo.typedir(build, archive["btype"]) typedir = koji_wrapper.koji_module.pathinfo.typedir(build, archive["btype"])
filename = archive["filename"] filename = archive["filename"]
file_path = os.path.join(typedir, filename) file_path = compose.koji_downloader.get_file(os.path.join(typedir, filename))
try: try:
# If there are two dots, the arch is in the middle. MBS uploads # If there are two dots, the arch is in the middle. MBS uploads
# files with actual architecture in the filename, but Pungi deals # files with actual architecture in the filename, but Pungi deals
@ -400,7 +396,13 @@ def _is_filtered_out(compose, variant, arch, module_name, module_stream):
def _get_modules_from_koji( def _get_modules_from_koji(
compose, koji_wrapper, event, variant, variant_tags, tag_to_mmd, exclude_module_ns compose,
koji_wrapper,
event,
variant,
variant_tags,
tag_to_mmd,
exclude_module_ns,
): ):
""" """
Loads modules for given `variant` from koji `session`, adds them to Loads modules for given `variant` from koji `session`, adds them to
@ -675,7 +677,7 @@ def _get_modules_from_koji_tags(
) )
def populate_global_pkgset(compose, koji_wrapper, path_prefix, event): def populate_global_pkgset(compose, koji_wrapper, event):
all_arches = get_all_arches(compose) all_arches = get_all_arches(compose)
# List of compose tags from which we create this compose # List of compose tags from which we create this compose
@ -769,7 +771,12 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event):
if extra_modules: if extra_modules:
_add_extra_modules_to_variant( _add_extra_modules_to_variant(
compose, koji_wrapper, variant, extra_modules, variant_tags, tag_to_mmd compose,
koji_wrapper,
variant,
extra_modules,
variant_tags,
tag_to_mmd,
) )
variant_scratch_modules = get_variant_data( variant_scratch_modules = get_variant_data(
@ -826,6 +833,7 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event):
cache_region=compose.cache_region, cache_region=compose.cache_region,
signed_packages_retries=compose.conf["signed_packages_retries"], signed_packages_retries=compose.conf["signed_packages_retries"],
signed_packages_wait=compose.conf["signed_packages_wait"], signed_packages_wait=compose.conf["signed_packages_wait"],
downloader=compose.koji_downloader,
**kwargs **kwargs
) )
@ -912,7 +920,7 @@ def populate_global_pkgset(compose, koji_wrapper, path_prefix, event):
MaterializedPackageSet.create, MaterializedPackageSet.create,
compose, compose,
pkgset, pkgset,
path_prefix, compose.koji_downloader.cache_dir,
mmd=tag_to_mmd.get(pkgset.name), mmd=tag_to_mmd.get(pkgset.name),
) )
) )

View File

@ -461,6 +461,9 @@ def get_volid(compose, arch, variant=None, disc_type=False, formats=None, **kwar
if not variant_uid and "%(variant)s" in i: if not variant_uid and "%(variant)s" in i:
continue continue
try: try:
# fmt: off
# Black wants to add a comma after kwargs, but that's not valid in
# Python 2.7
args = get_format_substs( args = get_format_substs(
compose, compose,
variant=variant_uid, variant=variant_uid,
@ -472,6 +475,7 @@ def get_volid(compose, arch, variant=None, disc_type=False, formats=None, **kwar
base_product_version=base_product_version, base_product_version=base_product_version,
**kwargs **kwargs
) )
# fmt: on
volid = (i % args).format(**args) volid = (i % args).format(**args)
except KeyError as err: except KeyError as err:
raise RuntimeError( raise RuntimeError(
@ -1146,3 +1150,16 @@ def read_json_file(file_path):
"""A helper function to read a JSON file.""" """A helper function to read a JSON file."""
with open(file_path) as f: with open(file_path) as f:
return json.load(f) return json.load(f)
UNITS = ["", "Ki", "Mi", "Gi", "Ti"]
def format_size(sz):
sz = float(sz)
unit = 0
while sz > 1024:
sz /= 1024
unit += 1
return "%.3g %sB" % (sz, UNITS[unit])

View File

@ -14,17 +14,23 @@
# along with this program; if not, see <https://gnu.org/licenses/>. # along with this program; if not, see <https://gnu.org/licenses/>.
import contextlib
import os import os
import re import re
import socket
import shutil
import time import time
import threading import threading
import contextlib
import requests
import koji import koji
from kobo.shortcuts import run, force_list from kobo.shortcuts import run, force_list
import six import six
from six.moves import configparser, shlex_quote from six.moves import configparser, shlex_quote
import six.moves.xmlrpc_client as xmlrpclib import six.moves.xmlrpc_client as xmlrpclib
from flufl.lock import Lock
from datetime import timedelta
from .. import util from .. import util
from ..arch_utils import getBaseArch from ..arch_utils import getBaseArch
@ -894,3 +900,144 @@ def get_buildroot_rpms(compose, task_id):
continue continue
result.append(i) result.append(i)
return sorted(result) return sorted(result)
class KojiDownloadProxy:
def __init__(self, topdir, topurl, cache_dir, logger):
if not topdir:
self.has_local_access = True
return
self.cache_dir = cache_dir
self.logger = logger
self.topdir = topdir
self.topurl = topurl
self.has_local_access = os.path.isdir(self.topdir)
# This is used for temporary downloaded files. The suffix is unique
# per-process. To prevent threads in the same process from colliding, a
# thread id is added later.
self.unique_suffix = "%s.%s" % (socket.gethostname(), os.getpid())
self.session = None
if not self.has_local_access:
self.session = requests.Session()
@classmethod
def from_config(klass, conf, logger):
topdir = None
topurl = None
path_prefix = None
if "koji_profile" in conf:
koji_module = koji.get_profile_module(conf["koji_profile"])
topdir = koji_module.config.topdir
topurl = koji_module.config.topurl
path_prefix = topdir.rstrip("/") + "/"
if not os.path.exists(path_prefix):
path_prefix = conf["koji_cache"].rstrip("/") + "/"
return klass(topdir, topurl, path_prefix, logger)
@util.retry(wait_on=requests.exceptions.RequestException)
def _download(self, url, dest):
"""Download file into given location
:param str url: URL of the file to download
:param str dest: file path to store the result in
:returns: path to the downloaded file (same as dest) or None if the URL
"""
with self.session.get(url, stream=True) as r:
if r.status_code == 404:
self.logger.warning("GET %s NOT FOUND", url)
return None
if r.status_code != 200:
self.logger.error("GET %s %s", url, r.status_code)
r.raise_for_status()
# The exception from here will be retried by the decorator.
file_size = int(r.headers.get("Content-Length", 0))
self.logger.info("GET %s OK %s", url, util.format_size(file_size))
with open(dest, "wb") as f:
shutil.copyfileobj(r.raw, f)
return dest
def _atomic_download(self, url, dest):
"""Atomically download a file
:param str url: URL of the file to download
:param str dest: file path to store the result in
:returns: path to the downloaded file (same as dest) or None if the URL
return 404.
"""
temp_file = "%s.%s.%s" % (dest, self.unique_suffix, threading.get_ident())
# First download to the temporary location.
try:
if self._download(url, temp_file) is None:
# The file was not found.
return None
except Exception:
# Download failed, let's make sure to clean up potentially partial
# temporary file.
try:
os.remove(temp_file)
except Exception:
self.logger.warning("Failed to delete %s", temp_file)
pass
raise
# Atomically move the temporary file into final location
os.rename(temp_file, dest)
return dest
def _download_file(self, path):
"""Ensure file on Koji volume in ``path`` is present in the local
cache.
:returns: path to the local file or None if file is not found
"""
url = path.replace(self.topdir, self.topurl)
destination_file = path.replace(self.topdir, self.cache_dir)
util.makedirs(os.path.dirname(destination_file))
lock = Lock(destination_file + ".lock")
# Hold the lock for this file for 5 minutes. If another compose needs
# the same file but it's not downloaded yet, the process will wait.
#
# If the download finishes in time, the downloaded file will be used
# here.
#
# If the download takes longer, this process will steal the lock and
# start its own download.
#
# That should not be a problem: the same file will be downloaded and
# then replaced atomically on the filesystem. If the original process
# managed to hardlink the first file already, that hardlink will be
# broken, but that will only result in the same file stored twice.
lock.lifetime = timedelta(minutes=5)
with lock:
# Check if the file already exists. If yes, return the path.
if os.path.exists(destination_file):
# Update mtime of the file. This covers the case of packages in the
# tag that are not included in the compose. Updating mtime will
# exempt them from cleanup for extra time.
os.utime(destination_file)
return destination_file
return self._atomic_download(url, destination_file)
def get_file(self, path):
"""
If path refers to an existing file in Koji, return a valid local path
to it. If no such file exists, return None.
"""
if self.has_local_access:
# We have koji volume mounted locally. No transformation needed for
# the path, just check it exists.
if os.path.exists(path):
return path
return None
else:
# We need to download the file.
return self._download_file(path)

View File

@ -1,6 +1,8 @@
# Some packages must be installed via dnf/yum first, see doc/contributing.rst # Some packages must be installed via dnf/yum first, see doc/contributing.rst
dict.sorted dict.sorted
dogpile.cache dogpile.cache
flufl.lock ; python_version >= '3.0'
flufl.lock < 3.0 ; python_version <= '2.7'
funcsigs funcsigs
jsonschema jsonschema
kobo kobo