pungi/pungi/wrappers/kojiwrapper.py

1103 lines
38 KiB
Python

# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <https://gnu.org/licenses/>.
import contextlib
import os
import re
import socket
import shutil
import time
import threading
import requests
import koji
from kobo.shortcuts import run, force_list
import six
from six.moves import configparser, shlex_quote
import six.moves.xmlrpc_client as xmlrpclib
from flufl.lock import Lock
from datetime import timedelta
from .kojimock import KojiMock
from .. import util
from ..arch_utils import getBaseArch
KOJI_BUILD_DELETED = koji.BUILD_STATES["DELETED"]
class KojiWrapper(object):
lock = threading.Lock()
def __init__(self, compose):
self.compose = compose
try:
self.profile = self.compose.conf["koji_profile"]
except KeyError:
raise RuntimeError("Koji profile must be configured")
with self.lock:
self.koji_module = koji.get_profile_module(self.profile)
session_opts = {}
for key in (
"timeout",
"keepalive",
"max_retries",
"retry_interval",
"anon_retry",
"offline_retry",
"offline_retry_interval",
"debug",
"debug_xmlrpc",
"serverca",
"use_fast_upload",
):
value = getattr(self.koji_module.config, key, None)
if value is not None:
session_opts[key] = value
self.koji_proxy = koji.ClientSession(
self.koji_module.config.server, session_opts
)
# This retry should be removed once https://pagure.io/koji/issue/3170 is
# fixed and released.
@util.retry(wait_on=(xmlrpclib.ProtocolError, koji.GenericError))
def login(self):
"""Authenticate to the hub."""
auth_type = self.koji_module.config.authtype
if auth_type == "ssl" or (
os.path.isfile(os.path.expanduser(self.koji_module.config.cert))
and auth_type is None
):
self.koji_proxy.ssl_login(
os.path.expanduser(self.koji_module.config.cert),
os.path.expanduser(self.koji_module.config.ca),
os.path.expanduser(self.koji_module.config.serverca),
)
elif auth_type == "kerberos":
self.koji_proxy.gssapi_login(
getattr(self.koji_module.config, "principal", None),
getattr(self.koji_module.config, "keytab", None),
)
else:
raise RuntimeError("Unsupported authentication type in Koji")
def _get_cmd(self, *args):
return ["koji", "--profile=%s" % self.profile] + list(args)
def get_runroot_cmd(
self,
target,
arch,
command,
quiet=False,
use_shell=True,
channel=None,
packages=None,
mounts=None,
weight=None,
new_chroot=False,
chown_paths=None,
):
cmd = self._get_cmd("runroot", "--nowait", "--task-id")
if quiet:
cmd.append("--quiet")
if new_chroot:
cmd.append("--new-chroot")
if use_shell:
cmd.append("--use-shell")
if channel:
cmd.append("--channel-override=%s" % channel)
if weight:
cmd.append("--weight=%s" % int(weight))
for package in packages or []:
cmd.append("--package=%s" % package)
for mount in mounts or []:
# directories are *not* created here
cmd.append("--mount=%s" % mount)
# IMPORTANT: all --opts have to be provided *before* args
cmd.append(target)
# i686 -> i386 etc.
arch = getBaseArch(arch)
cmd.append(arch)
if isinstance(command, list):
command = " ".join([shlex_quote(i) for i in command])
# HACK: remove rpmdb and yum cache
command = (
"rm -f /var/lib/rpm/__db*; rm -rf /var/cache/yum/*; set -x; " + command
)
if chown_paths:
paths = " ".join(shlex_quote(pth) for pth in chown_paths)
command += " ; EXIT_CODE=$?"
# Make the files world readable
command += " ; chmod -R a+r %s" % paths
# and owned by the same user that is running the process
command += " ; chown -R %d %s" % (os.getuid(), paths)
# Exit with code of main command
command += " ; exit $EXIT_CODE"
cmd.append(command)
return cmd
def get_pungi_buildinstall_cmd(
self,
target,
arch,
args,
channel=None,
packages=None,
mounts=None,
weight=None,
chown_uid=None,
):
cmd = self._get_cmd("pungi-buildinstall", "--nowait", "--task-id")
if channel:
cmd.append("--channel-override=%s" % channel)
if weight:
cmd.append("--weight=%s" % int(weight))
for package in packages or []:
cmd.append("--package=%s" % package)
for mount in mounts or []:
# directories are *not* created here
cmd.append("--mount=%s" % mount)
if chown_uid:
cmd.append("--chown-uid=%s" % chown_uid)
# IMPORTANT: all --opts have to be provided *before* args
cmd.append(target)
# i686 -> i386 etc.
arch = getBaseArch(arch)
cmd.append(arch)
for k, v in args.items():
if v:
if isinstance(v, bool):
cmd.append(k)
elif isinstance(v, int):
cmd.append("%s=%d" % (k, v))
else:
for arg in force_list(v):
cmd.append("%s=%s" % (k, arg))
return cmd
def get_pungi_ostree_cmd(
self,
target,
arch,
args,
channel=None,
packages=None,
mounts=None,
weight=None,
):
cmd = self._get_cmd("pungi-ostree", "--nowait", "--task-id")
if channel:
cmd.append("--channel-override=%s" % channel)
if weight:
cmd.append("--weight=%s" % int(weight))
for package in packages or []:
cmd.append("--package=%s" % package)
for mount in mounts or []:
# directories are *not* created here
cmd.append("--mount=%s" % mount)
# IMPORTANT: all --opts have to be provided *before* args
cmd.append(target)
# i686 -> i386 etc.
arch = getBaseArch(arch)
cmd.append(arch)
for k, v in args.items():
if v:
if isinstance(v, bool):
cmd.append(k)
elif isinstance(v, int):
cmd.append("%s=%d" % (k, v))
else:
for arg in force_list(v):
cmd.append("%s=%s" % (k, arg))
return cmd
@contextlib.contextmanager
def get_koji_cmd_env(self):
"""Get environment variables for running a koji command.
Buffering is disabled - if the compose is aborted while koji tasks are
running, we can be left with empty log files. That complicates
debugging.
If we are authenticated with a keytab, we need a fresh credentials
cache to avoid possible race condition.
"""
env = os.environ.copy()
if getattr(self.koji_module.config, "keytab", None):
with util.temp_dir(prefix="krb_ccache") as tempdir:
env["KRB5CCNAME"] = "DIR:%s" % tempdir
env["PYTHONUNBUFFERED"] = "1"
yield env
else:
env["PYTHONUNBUFFERED"] = "1"
yield env
def run_runroot_cmd(self, command, log_file=None):
"""Run koji runroot command and wait for results.
:param list command: runroot command returned by self.get_runroot_cmd()
:param str log_file: save logs to log_file
:return dict: {"retcode": 0, "output": "", "task_id": 1}
"""
task_id = None
with self.get_koji_cmd_env() as env:
retcode, output = run(
command,
can_fail=True,
logfile=log_file,
show_cmd=True,
env=env,
buffer_size=-1,
universal_newlines=True,
)
# Look for first line that contains only a number. This is the ID of
# the new task. Usually this should be the first line, but there may be
# warnings before it.
for line in output.splitlines():
match = re.search(r"^(\d+)$", line)
if match:
task_id = int(match.groups()[0])
break
if not task_id:
raise RuntimeError(
"Could not find task ID in output. Command '%s' returned '%s'."
% (" ".join(command), output)
)
self.save_task_id(task_id)
retcode, output = self._wait_for_task(task_id, logfile=log_file)
return {
"retcode": retcode,
"output": output,
"task_id": task_id,
}
def get_image_build_cmd(
self, config_options, conf_file_dest, wait=True, scratch=False
):
"""
@param config_options
@param conf_file_dest - a destination in compose workdir for
the conf file to be written
@param wait=True
@param scratch=False
"""
# Usage: koji image-build [options] <name> <version> <target> <install-tree-url> <arch> [<arch>...] # noqa: E501
sub_command = "image-build"
# The minimum set of options
min_options = (
"name",
"version",
"target",
"install_tree",
"arches",
"format",
"kickstart",
"ksurl",
"distro",
)
assert set(min_options).issubset(
set(config_options["image-build"].keys())
), "image-build requires at least %s got '%s'" % (
", ".join(min_options),
config_options,
)
cfg_parser = configparser.ConfigParser()
for section, opts in config_options.items():
cfg_parser.add_section(section)
for option, value in opts.items():
if isinstance(value, list):
value = ",".join(value)
if not isinstance(value, six.string_types):
# Python 3 configparser will reject non-string values.
value = str(value)
cfg_parser.set(section, option, value)
fd = open(conf_file_dest, "w")
cfg_parser.write(fd)
fd.close()
cmd = self._get_cmd(sub_command, "--config=%s" % conf_file_dest)
if wait:
cmd.append("--wait")
if scratch:
cmd.append("--scratch")
return cmd
def get_live_media_cmd(self, options, wait=True):
# Usage: koji spin-livemedia [options] <name> <version> <target> <arch> <kickstart-file> # noqa: E501
cmd = self._get_cmd("spin-livemedia")
for key in ("name", "version", "target", "arch", "ksfile"):
if key not in options:
raise ValueError('Expected options to have key "%s"' % key)
cmd.append(options[key])
if "install_tree" not in options:
raise ValueError('Expected options to have key "install_tree"')
cmd.append("--install-tree=%s" % options["install_tree"])
for repo in options.get("repo", []):
cmd.append("--repo=%s" % repo)
if options.get("scratch"):
cmd.append("--scratch")
if options.get("skip_tag"):
cmd.append("--skip-tag")
if "ksurl" in options:
cmd.append("--ksurl=%s" % options["ksurl"])
if "release" in options:
cmd.append("--release=%s" % options["release"])
if "can_fail" in options:
cmd.append("--can-fail=%s" % ",".join(options["can_fail"]))
if options.get("nomacboot"):
cmd.append("--nomacboot")
if wait:
cmd.append("--wait")
return cmd
def get_create_image_cmd(
self,
name,
version,
target,
arch,
ks_file,
repos,
image_type="live",
image_format=None,
release=None,
wait=True,
archive=False,
specfile=None,
ksurl=None,
):
# Usage: koji spin-livecd [options] <name> <version> <target> <arch> <kickstart-file> # noqa: E501
# Usage: koji spin-appliance [options] <name> <version> <target> <arch> <kickstart-file> # noqa: E501
# Examples:
# * name: RHEL-7.0
# * name: Satellite-6.0.1-RHEL-6
# ** -<type>.<arch>
# * version: YYYYMMDD[.n|.t].X
# * release: 1
cmd = self._get_cmd()
if image_type == "live":
cmd.append("spin-livecd")
elif image_type == "appliance":
cmd.append("spin-appliance")
else:
raise ValueError("Invalid image type: %s" % image_type)
if not archive:
cmd.append("--scratch")
cmd.append("--noprogress")
if wait:
cmd.append("--wait")
else:
cmd.append("--nowait")
if specfile:
cmd.append("--specfile=%s" % specfile)
if ksurl:
cmd.append("--ksurl=%s" % ksurl)
if isinstance(repos, list):
for repo in repos:
cmd.append("--repo=%s" % repo)
else:
cmd.append("--repo=%s" % repos)
if image_format:
if image_type != "appliance":
raise ValueError("Format can be specified only for appliance images'")
supported_formats = ["raw", "qcow", "qcow2", "vmx"]
if image_format not in supported_formats:
raise ValueError(
"Format is not supported: %s. Supported formats: %s"
% (image_format, " ".join(sorted(supported_formats)))
)
cmd.append("--format=%s" % image_format)
if release is not None:
cmd.append("--release=%s" % release)
# IMPORTANT: all --opts have to be provided *before* args
# Usage:
# koji spin-livecd [options] <name> <version> <target> <arch> <kickstart-file>
cmd.append(name)
cmd.append(version)
cmd.append(target)
# i686 -> i386 etc.
arch = getBaseArch(arch)
cmd.append(arch)
cmd.append(ks_file)
return cmd
def _has_connection_error(self, output):
"""Checks if output indicates connection error."""
return re.search("error: failed to connect\n$", output)
def _has_offline_error(self, output):
"""Check if output indicates server offline."""
return re.search("koji: ServerOffline:", output)
def _wait_for_task(self, task_id, logfile=None, max_retries=None):
"""Tries to wait for a task to finish. On connection error it will
retry with `watch-task` command.
"""
cmd = self._get_cmd("watch-task", str(task_id))
attempt = 0
while True:
retcode, output = run(
cmd, can_fail=True, logfile=logfile, universal_newlines=True
)
if retcode == 0 or not (
self._has_connection_error(output) or self._has_offline_error(output)
):
# Task finished for reason other than connection error
# or server offline error.
return retcode, output
attempt += 1
if max_retries and attempt >= max_retries:
break
time.sleep(attempt * 10)
raise RuntimeError(
"Failed to wait for task %s. Too many connection errors." % task_id
)
def run_blocking_cmd(self, command, log_file=None, max_retries=None):
"""
Run a blocking koji command. Returns a dict with output of the command,
its exit code and parsed task id. This method will block until the
command finishes.
"""
with self.get_koji_cmd_env() as env:
retcode, output = run(
command,
can_fail=True,
show_cmd=True,
logfile=log_file,
env=env,
buffer_size=-1,
universal_newlines=True,
)
match = re.search(r"Created task: (\d+)", output)
if not match:
raise RuntimeError(
"Could not find task ID in output. Command '%s' returned '%s'."
% (" ".join(command), output)
)
task_id = int(match.groups()[0])
self.save_task_id(task_id)
if retcode != 0 and (
self._has_connection_error(output) or self._has_offline_error(output)
):
retcode, output = self._wait_for_task(
task_id, logfile=log_file, max_retries=max_retries
)
return {
"retcode": retcode,
"output": output,
"task_id": task_id,
}
def watch_task(self, task_id, log_file=None, max_retries=None):
"""Watch and wait for a task to finish.
:param int task_id: ID of koji task.
:param str log_file: Path to log file.
:param int max_retries: Max times to retry when error occurs,
no limits by default.
"""
if log_file:
task_url = os.path.join(
self.koji_module.config.weburl, "taskinfo?taskID=%d" % task_id
)
with open(log_file, "a") as f:
f.write("Task URL: %s\n" % task_url)
retcode, _ = self._wait_for_task(
task_id, logfile=log_file, max_retries=max_retries
)
return retcode
def get_image_paths(self, task_id, callback=None):
"""
Given an image task in Koji, get a mapping from arches to a list of
paths to results of the task.
If callback is given, it will be called once with arch of every failed
subtask.
"""
result = {}
# task = self.koji_proxy.getTaskInfo(task_id, request=True)
children_tasks = self.koji_proxy.getTaskChildren(task_id, request=True)
for child_task in children_tasks:
if child_task["method"] not in [
"createImage",
"createLiveMedia",
"createAppliance",
]:
continue
if child_task["state"] != koji.TASK_STATES["CLOSED"]:
# The subtask is failed, which can happen with the can_fail
# option. If given, call the callback, and go to next child.
if callback:
callback(child_task["arch"])
continue
is_scratch = child_task["request"][-1].get("scratch", False)
task_result = self.koji_proxy.getTaskResult(child_task["id"])
if is_scratch:
topdir = os.path.join(
self.koji_module.pathinfo.work(),
self.koji_module.pathinfo.taskrelpath(child_task["id"]),
)
else:
build = self.koji_proxy.getImageBuild(
"%(name)s-%(version)s-%(release)s" % task_result
)
build["name"] = task_result["name"]
build["version"] = task_result["version"]
build["release"] = task_result["release"]
build["arch"] = task_result["arch"]
topdir = self.koji_module.pathinfo.imagebuild(build)
for i in task_result["files"]:
result.setdefault(task_result["arch"], []).append(
os.path.join(topdir, i)
)
return result
def get_image_path(self, task_id):
result = []
task_info_list = []
task_info_list.append(self.koji_proxy.getTaskInfo(task_id, request=True))
task_info_list.extend(self.koji_proxy.getTaskChildren(task_id, request=True))
# scan parent and child tasks for certain methods
task_info = None
for i in task_info_list:
if i["method"] in ("createAppliance", "createLiveCD", "createImage"):
task_info = i
break
scratch = task_info["request"][-1].get("scratch", False)
task_result = self.koji_proxy.getTaskResult(task_info["id"])
task_result.pop("rpmlist", None)
if scratch:
topdir = os.path.join(
self.koji_module.pathinfo.work(),
self.koji_module.pathinfo.taskrelpath(task_info["id"]),
)
else:
build = self.koji_proxy.getImageBuild(
"%(name)s-%(version)s-%(release)s" % task_result
)
build["name"] = task_result["name"]
build["version"] = task_result["version"]
build["release"] = task_result["release"]
build["arch"] = task_result["arch"]
topdir = self.koji_module.pathinfo.imagebuild(build)
for i in task_result["files"]:
result.append(os.path.join(topdir, i))
return result
def get_wrapped_rpm_path(self, task_id, srpm=False):
result = []
task_info_list = []
task_info_list.extend(self.koji_proxy.getTaskChildren(task_id, request=True))
# scan parent and child tasks for certain methods
task_info = None
for i in task_info_list:
if i["method"] in ("wrapperRPM"):
task_info = i
break
# Get results of wrapperRPM task
# {'buildroot_id': 2479520,
# 'logs': ['checkout.log', 'root.log', 'state.log', 'build.log'],
# 'rpms': ['foreman-discovery-image-2.1.0-2.el7sat.noarch.rpm'],
# 'srpm': 'foreman-discovery-image-2.1.0-2.el7sat.src.rpm'}
task_result = self.koji_proxy.getTaskResult(task_info["id"])
# Get koji dir with results (rpms, srpms, logs, ...)
topdir = os.path.join(
self.koji_module.pathinfo.work(),
self.koji_module.pathinfo.taskrelpath(task_info["id"]),
)
# TODO: Maybe use different approach for non-scratch
# builds - see get_image_path()
# Get list of filenames that should be returned
result_files = task_result["rpms"]
if srpm:
result_files += [task_result["srpm"]]
# Prepare list with paths to the required files
for i in result_files:
result.append(os.path.join(topdir, i))
return result
def get_signed_wrapped_rpms_paths(self, task_id, sigkey, srpm=False):
result = []
parent_task = self.koji_proxy.getTaskInfo(task_id, request=True)
task_info_list = []
task_info_list.extend(self.koji_proxy.getTaskChildren(task_id, request=True))
# scan parent and child tasks for certain methods
task_info = None
for i in task_info_list:
if i["method"] in ("wrapperRPM"):
task_info = i
break
# Check parent_task if it's scratch build
scratch = parent_task["request"][-1].get("scratch", False)
if scratch:
raise RuntimeError("Scratch builds cannot be signed!")
# Get results of wrapperRPM task
# {'buildroot_id': 2479520,
# 'logs': ['checkout.log', 'root.log', 'state.log', 'build.log'],
# 'rpms': ['foreman-discovery-image-2.1.0-2.el7sat.noarch.rpm'],
# 'srpm': 'foreman-discovery-image-2.1.0-2.el7sat.src.rpm'}
task_result = self.koji_proxy.getTaskResult(task_info["id"])
# Get list of filenames that should be returned
result_files = task_result["rpms"]
if srpm:
result_files += [task_result["srpm"]]
# Prepare list with paths to the required files
for i in result_files:
rpminfo = self.koji_proxy.getRPM(i)
build = self.koji_proxy.getBuild(rpminfo["build_id"])
path = os.path.join(
self.koji_module.pathinfo.build(build),
self.koji_module.pathinfo.signed(rpminfo, sigkey),
)
result.append(path)
return result
def get_build_nvrs(self, task_id):
builds = self.koji_proxy.listBuilds(taskID=task_id)
return [build.get("nvr") for build in builds if build.get("nvr")]
def multicall_map(
self, koji_session, koji_session_fnc, list_of_args=None, list_of_kwargs=None
):
"""
Calls the `koji_session_fnc` using Koji multicall feature N times based on
the list of arguments passed in `list_of_args` and `list_of_kwargs`.
Returns list of responses sorted the same way as input args/kwargs.
In case of error, the error message is logged and None is returned.
For example to get the package ids of "httpd" and "apr" packages:
ids = multicall_map(session, session.getPackageID, ["httpd", "apr"])
# ids is now [280, 632]
:param KojiSessions koji_session: KojiSession to use for multicall.
:param object koji_session_fnc: Python object representing the
KojiSession method to call.
:param list list_of_args: List of args which are passed to each
call of koji_session_fnc.
:param list list_of_kwargs: List of kwargs which are passed to
each call of koji_session_fnc.
"""
if list_of_args is None and list_of_kwargs is None:
raise ValueError("One of list_of_args or list_of_kwargs must be set.")
if list_of_args is not None and not isinstance(list_of_args, list):
raise ValueError("list_of_args must be list or None.")
if list_of_kwargs is not None and not isinstance(list_of_kwargs, list):
raise ValueError("list_of_kwargs must be list or None.")
if list_of_kwargs is None:
list_of_kwargs = [{}] * len(list_of_args)
if list_of_args is None:
list_of_args = [[]] * len(list_of_kwargs)
if len(list_of_args) != len(list_of_kwargs):
raise ValueError(
"Length of list_of_args and list_of_kwargs must be the same."
)
koji_session.multicall = True
for args, kwargs in zip(list_of_args, list_of_kwargs):
if not isinstance(args, list):
args = [args]
if not isinstance(kwargs, dict):
raise ValueError("Every item in list_of_kwargs must be a dict")
koji_session_fnc(*args, **kwargs)
responses = koji_session.multiCall(strict=True)
if not responses:
return None
if not isinstance(responses, list):
raise ValueError(
"Fault element was returned for multicall of method %r: %r"
% (koji_session_fnc, responses)
)
results = []
# For the response specification, see
# https://web.archive.org/web/20060624230303/http://www.xmlrpc.com/discuss/msgReader$1208?mode=topic # noqa: E501
# Relevant part of this:
# Multicall returns an array of responses. There will be one response
# for each call in the original array. The result will either be
# a one-item array containing the result value,
# or a struct of the form found inside the standard <fault> element.
for response, args, kwargs in zip(responses, list_of_args, list_of_kwargs):
if isinstance(response, list):
if not response:
raise ValueError(
"Empty list returned for multicall of method %r with args %r, %r" # noqa: E501
% (koji_session_fnc, args, kwargs)
)
results.append(response[0])
else:
raise ValueError(
"Unexpected data returned for multicall of method %r with args %r, %r: %r" # noqa: E501
% (koji_session_fnc, args, kwargs, response)
)
return results
@util.retry(wait_on=(xmlrpclib.ProtocolError, koji.GenericError))
def retrying_multicall_map(self, *args, **kwargs):
"""
Retrying version of multicall_map. This tries to retry the Koji call
in case of koji.GenericError or xmlrpclib.ProtocolError.
Please refer to koji_multicall_map for further specification of arguments.
"""
return self.multicall_map(*args, **kwargs)
def save_task_id(self, task_id):
"""Save task id by creating a file using task_id as file name
:param int task_id: ID of koji task
"""
log_dir = self.compose.paths.log.koji_tasks_dir()
with open(os.path.join(log_dir, str(task_id)), "w"):
pass
class KojiMockWrapper(object):
lock = threading.Lock()
def __init__(self, compose, all_arches):
self.all_arches = all_arches
self.compose = compose
try:
self.profile = self.compose.conf["koji_profile"]
except KeyError:
raise RuntimeError("Koji profile must be configured")
with self.lock:
self.koji_module = koji.get_profile_module(self.profile)
session_opts = {}
for key in (
"timeout",
"keepalive",
"max_retries",
"retry_interval",
"anon_retry",
"offline_retry",
"offline_retry_interval",
"debug",
"debug_xmlrpc",
"serverca",
"use_fast_upload",
):
value = getattr(self.koji_module.config, key, None)
if value is not None:
session_opts[key] = value
self.koji_proxy = KojiMock(
packages_dir=self.koji_module.config.topdir,
modules_dir=os.path.join(
self.koji_module.config.topdir,
'modules',
),
all_arches=self.all_arches,
)
def get_buildroot_rpms(compose, task_id):
"""Get build root RPMs - either from runroot or local"""
result = []
if task_id:
# runroot
koji = KojiWrapper(compose)
buildroot_infos = koji.koji_proxy.listBuildroots(taskID=task_id)
if not buildroot_infos:
children_tasks = koji.koji_proxy.getTaskChildren(task_id)
for child_task in children_tasks:
buildroot_infos = koji.koji_proxy.listBuildroots(
taskID=child_task["id"]
)
if buildroot_infos:
break
buildroot_info = buildroot_infos[-1]
data = koji.koji_proxy.listRPMs(componentBuildrootID=buildroot_info["id"])
for rpm_info in data:
fmt = "%(nvr)s.%(arch)s"
result.append(fmt % rpm_info)
else:
# local
retcode, output = run(
"rpm -qa --qf='%{name}-%{version}-%{release}.%{arch}\n'",
universal_newlines=True,
)
for i in output.splitlines():
if not i:
continue
result.append(i)
return sorted(result)
class KojiDownloadProxy:
def __init__(self, topdir, topurl, cache_dir, logger):
if not topdir:
self.has_local_access = True
return
self.cache_dir = cache_dir
self.logger = logger
self.topdir = topdir
self.topurl = topurl
self.has_local_access = os.path.isdir(self.topdir)
# This is used for temporary downloaded files. The suffix is unique
# per-process. To prevent threads in the same process from colliding, a
# thread id is added later.
self.unique_suffix = "%s.%s" % (socket.gethostname(), os.getpid())
self.session = None
if not self.has_local_access:
self.session = requests.Session()
@classmethod
def from_config(klass, conf, logger):
topdir = None
topurl = None
path_prefix = None
if "koji_profile" in conf:
koji_module = koji.get_profile_module(conf["koji_profile"])
topdir = koji_module.config.topdir
topurl = koji_module.config.topurl
path_prefix = topdir.rstrip("/") + "/"
if not os.path.exists(path_prefix):
path_prefix = conf["koji_cache"].rstrip("/") + "/"
return klass(topdir, topurl, path_prefix, logger)
@util.retry(wait_on=requests.exceptions.RequestException)
def _download(self, url, dest):
"""Download file into given location
:param str url: URL of the file to download
:param str dest: file path to store the result in
:returns: path to the downloaded file (same as dest) or None if the URL
"""
with self.session.get(url, stream=True) as r:
if r.status_code == 404:
self.logger.warning("GET %s NOT FOUND", url)
return None
if r.status_code != 200:
self.logger.error("GET %s %s", url, r.status_code)
r.raise_for_status()
# The exception from here will be retried by the decorator.
file_size = int(r.headers.get("Content-Length", 0))
self.logger.info("GET %s OK %s", url, util.format_size(file_size))
with open(dest, "wb") as f:
shutil.copyfileobj(r.raw, f)
return dest
def _delete(self, path):
"""Try to delete file at given path and ignore errors."""
try:
os.remove(path)
except Exception:
self.logger.warning("Failed to delete %s", path)
def _atomic_download(self, url, dest, validator):
"""Atomically download a file
:param str url: URL of the file to download
:param str dest: file path to store the result in
:returns: path to the downloaded file (same as dest) or None if the URL
return 404.
"""
temp_file = "%s.%s.%s" % (dest, self.unique_suffix, threading.get_ident())
# First download to the temporary location.
try:
if self._download(url, temp_file) is None:
# The file was not found.
return None
except Exception:
# Download failed, let's make sure to clean up potentially partial
# temporary file.
self._delete(temp_file)
raise
# Check if the temporary file is correct (assuming we were provided a
# validator function).
try:
if validator:
validator(temp_file)
except Exception:
# Validation failed. Let's delete the problematic file and re-raise
# the exception.
self._delete(temp_file)
raise
# Atomically move the temporary file into final location
os.rename(temp_file, dest)
return dest
def _download_file(self, path, validator):
"""Ensure file on Koji volume in ``path`` is present in the local
cache.
:returns: path to the local file or None if file is not found
"""
url = path.replace(self.topdir, self.topurl)
destination_file = path.replace(self.topdir, self.cache_dir)
util.makedirs(os.path.dirname(destination_file))
lock = Lock(destination_file + ".lock")
# Hold the lock for this file for 5 minutes. If another compose needs
# the same file but it's not downloaded yet, the process will wait.
#
# If the download finishes in time, the downloaded file will be used
# here.
#
# If the download takes longer, this process will steal the lock and
# start its own download.
#
# That should not be a problem: the same file will be downloaded and
# then replaced atomically on the filesystem. If the original process
# managed to hardlink the first file already, that hardlink will be
# broken, but that will only result in the same file stored twice.
lock.lifetime = timedelta(minutes=5)
with lock:
# Check if the file already exists. If yes, return the path.
if os.path.exists(destination_file):
# Update mtime of the file. This covers the case of packages in the
# tag that are not included in the compose. Updating mtime will
# exempt them from cleanup for extra time.
os.utime(destination_file)
return destination_file
return self._atomic_download(url, destination_file, validator)
def get_file(self, path, validator=None):
"""
If path refers to an existing file in Koji, return a valid local path
to it. If no such file exists, return None.
:param validator: A callable that will be called with the path to the
downloaded file if and only if the file was actually downloaded.
Any exception raised from there will be abort the download and be
propagated.
"""
if self.has_local_access:
# We have koji volume mounted locally. No transformation needed for
# the path, just check it exists.
if os.path.exists(path):
return path
return None
else:
# We need to download the file.
return self._download_file(path, validator)