- Mock of Koji is moved to the separate modules, classes

- Unittests for mock of Koji are moved to the separate
This commit is contained in:
soksanichenko 2022-11-07 19:24:39 +02:00
parent 750499eda1
commit b49ffee06d
7 changed files with 1859 additions and 100 deletions

View File

@ -493,6 +493,478 @@ class KojiPackageSet(PackageSetBase):
return response return response
def get_package_path(self, queue_item):
rpm_info, build_info = queue_item
# Check if this RPM is coming from scratch task. In this case, we already
# know the path.
if "path_from_task" in rpm_info:
return rpm_info["path_from_task"]
pathinfo = self.koji_wrapper.koji_module.pathinfo
paths = []
for sigkey in self.sigkey_ordering:
if not sigkey:
# we're looking for *signed* copies here
continue
sigkey = sigkey.lower()
rpm_path = os.path.join(
pathinfo.build(build_info), pathinfo.signed(rpm_info, sigkey)
)
paths.append(rpm_path)
if os.path.isfile(rpm_path):
return rpm_path
if None in self.sigkey_ordering or "" in self.sigkey_ordering:
# use an unsigned copy (if allowed)
rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info))
paths.append(rpm_path)
if os.path.isfile(rpm_path):
return rpm_path
if self._allow_invalid_sigkeys and rpm_info["name"] not in self.packages:
# use an unsigned copy (if allowed)
rpm_path = os.path.join(pathinfo.build(build_info), pathinfo.rpm(rpm_info))
paths.append(rpm_path)
if os.path.isfile(rpm_path):
self._invalid_sigkey_rpms.append(rpm_info)
return rpm_path
self._invalid_sigkey_rpms.append(rpm_info)
self.log_error(
"RPM %s not found for sigs: %s. Paths checked: %s"
% (rpm_info, self.sigkey_ordering, paths)
)
return None
def populate(self, tag, event=None, inherit=True, include_packages=None):
"""Populate the package set with packages from given tag.
:param event: the Koji event to query at (or latest if not given)
:param inherit: whether to enable tag inheritance
:param include_packages: an iterable of tuples (package name, arch) that should
be included, all others are skipped.
"""
result_rpms = []
result_srpms = []
include_packages = set(include_packages or [])
if type(event) is dict:
event = event["id"]
msg = "Getting latest RPMs (tag: %s, event: %s, inherit: %s)" % (
tag,
event,
inherit,
)
self.log_info("[BEGIN] %s" % msg)
rpms, builds = self.get_latest_rpms(tag, event, inherit=inherit)
extra_rpms, extra_builds = self.get_extra_rpms()
rpms += extra_rpms
builds += extra_builds
extra_builds_by_name = {}
for build_info in extra_builds:
extra_builds_by_name[build_info["name"]] = build_info["build_id"]
builds_by_id = {}
exclude_build_id = []
for build_info in builds:
build_id, build_name = build_info["build_id"], build_info["name"]
if (
build_name in extra_builds_by_name
and build_id != extra_builds_by_name[build_name]
):
exclude_build_id.append(build_id)
else:
builds_by_id.setdefault(build_id, build_info)
# Get extra RPMs from tasks.
rpms += self.get_extra_rpms_from_tasks()
skipped_arches = []
skipped_packages_count = 0
# We need to process binary packages first, and then source packages.
# If we have a list of packages to use, we need to put all source rpms
# names into it. Otherwise if the SRPM name does not occur on the list,
# it would be missing from the package set. Even if it ultimately does
# not end in the compose, we need it to extract ExcludeArch and
# ExclusiveArch for noarch packages.
for rpm_info in itertools.chain(
(rpm for rpm in rpms if not _is_src(rpm)),
(rpm for rpm in rpms if _is_src(rpm)),
):
if rpm_info["build_id"] in exclude_build_id:
continue
if self.arches and rpm_info["arch"] not in self.arches:
if rpm_info["arch"] not in skipped_arches:
self.log_debug("Skipping packages for arch: %s" % rpm_info["arch"])
skipped_arches.append(rpm_info["arch"])
continue
if (
include_packages
and (rpm_info["name"], rpm_info["arch"]) not in include_packages
and rpm_info["arch"] != "src"
):
self.log_debug(
"Skipping %(name)s-%(version)s-%(release)s.%(arch)s" % rpm_info
)
continue
if (
self.populate_only_packages
and self.packages
and rpm_info["name"] not in self.packages
):
skipped_packages_count += 1
continue
build_info = builds_by_id.get(rpm_info["build_id"], None)
if _is_src(rpm_info):
result_srpms.append((rpm_info, build_info))
else:
result_rpms.append((rpm_info, build_info))
if self.populate_only_packages and self.packages:
# Only add the package if we already have some whitelist.
if build_info:
self.packages.add(build_info["name"])
else:
# We have no build info and therefore no Koji package name,
# we can only guess that the Koji package name would be the same
# one as the RPM name.
self.packages.add(rpm_info["name"])
if skipped_packages_count:
self.log_debug(
"Skipped %d packages, not marked as to be "
"included in a compose." % skipped_packages_count
)
result = self.read_packages(result_rpms, result_srpms)
# Check that after reading the packages, every package that is
# included in a compose has the right sigkey.
if self._invalid_sigkey_rpms:
invalid_sigkey_rpms = [
rpm for rpm in self._invalid_sigkey_rpms if rpm["name"] in self.packages
]
if invalid_sigkey_rpms:
self.raise_invalid_sigkeys_exception(invalid_sigkey_rpms)
self.log_info("[DONE ] %s" % msg)
return result
def write_reuse_file(self, compose, include_packages):
"""Write data to files for reusing in future.
:param compose: compose object
:param include_packages: an iterable of tuples (package name, arch) that should
be included.
"""
reuse_file = compose.paths.work.pkgset_reuse_file(self.name)
self.log_info("Writing pkgset reuse file: %s" % reuse_file)
try:
with open(reuse_file, "wb") as f:
pickle.dump(
{
"name": self.name,
"allow_invalid_sigkeys": self._allow_invalid_sigkeys,
"arches": self.arches,
"sigkeys": self.sigkey_ordering,
"packages": self.packages,
"populate_only_packages": self.populate_only_packages,
"rpms_by_arch": self.rpms_by_arch,
"srpms_by_name": self.srpms_by_name,
"extra_builds": self.extra_builds,
"include_packages": include_packages,
},
f,
protocol=pickle.HIGHEST_PROTOCOL,
)
except Exception as e:
self.log_warning("Writing pkgset reuse file failed: %s" % str(e))
def _get_koji_event_from_file(self, event_file):
with open(event_file, "r") as f:
return json.load(f)["id"]
def try_to_reuse(self, compose, tag, inherit=True, include_packages=None):
"""Try to reuse pkgset data of old compose.
:param compose: compose object
:param str tag: koji tag name
:param inherit: whether to enable tag inheritance
:param include_packages: an iterable of tuples (package name, arch) that should
be included.
"""
if not compose.conf["pkgset_allow_reuse"]:
self.log_info("Reusing pkgset data from old compose is disabled.")
return False
self.log_info("Trying to reuse pkgset data of old compose")
if not compose.paths.get_old_compose_topdir():
self.log_debug("No old compose found. Nothing to reuse.")
return False
event_file = os.path.join(
compose.paths.work.topdir(arch="global", create_dir=False), "koji-event"
)
old_event_file = compose.paths.old_compose_path(event_file)
try:
koji_event = self._get_koji_event_from_file(event_file)
old_koji_event = self._get_koji_event_from_file(old_event_file)
except Exception as e:
self.log_debug("Can't read koji event from file: %s" % str(e))
return False
if koji_event != old_koji_event:
self.log_debug(
"Koji event doesn't match, querying changes between event %d and %d"
% (old_koji_event, koji_event)
)
changed = self.koji_proxy.queryHistory(
tables=["tag_listing"], tag=tag, afterEvent=old_koji_event
)
if changed["tag_listing"]:
self.log_debug("Builds under tag %s changed. Can't reuse." % tag)
return False
if inherit:
inherit_tags = self.koji_proxy.getFullInheritance(tag, koji_event)
for t in inherit_tags:
changed = self.koji_proxy.queryHistory(
tables=["tag_listing"],
tag=t["name"],
afterEvent=old_koji_event,
beforeEvent=koji_event + 1,
)
if changed["tag_listing"]:
self.log_debug(
"Builds under inherited tag %s changed. Can't reuse."
% t["name"]
)
return False
repo_dir = compose.paths.work.pkgset_repo(tag, create_dir=False)
old_repo_dir = compose.paths.old_compose_path(repo_dir)
if not old_repo_dir:
self.log_debug("Can't find old repo dir to reuse.")
return False
old_reuse_file = compose.paths.old_compose_path(
compose.paths.work.pkgset_reuse_file(tag)
)
try:
self.log_debug("Loading reuse file: %s" % old_reuse_file)
reuse_data = self.load_old_file_cache(old_reuse_file)
except Exception as e:
self.log_debug("Failed to load reuse file: %s" % str(e))
return False
if (
reuse_data["allow_invalid_sigkeys"] == self._allow_invalid_sigkeys
and reuse_data["packages"] == self.packages
and reuse_data["populate_only_packages"] == self.populate_only_packages
and reuse_data["extra_builds"] == self.extra_builds
and reuse_data["sigkeys"] == self.sigkey_ordering
and reuse_data["include_packages"] == include_packages
):
self.log_info("Copying repo data for reuse: %s" % old_repo_dir)
copy_all(old_repo_dir, repo_dir)
self.reuse = old_repo_dir
self.rpms_by_arch = reuse_data["rpms_by_arch"]
self.srpms_by_name = reuse_data["srpms_by_name"]
if self.old_file_cache:
self.file_cache = self.old_file_cache
return True
else:
self.log_info("Criteria does not match. Nothing to reuse.")
return False
class KojiMockPackageSet(PackageSetBase):
def __init__(
self,
name,
koji_wrapper,
sigkey_ordering,
arches=None,
logger=None,
packages=None,
allow_invalid_sigkeys=False,
populate_only_packages=False,
cache_region=None,
extra_builds=None,
extra_tasks=None,
):
"""
Creates new KojiPackageSet.
:param list sigkey_ordering: Ordered list of sigkey strings. When
getting package from Koji, KojiPackageSet tries to get the package
signed by sigkey from this list. If None or "" appears in this
list, unsigned package is used.
:param list arches: List of arches to get the packages for.
:param logging.Logger logger: Logger instance to use for logging.
:param list packages: List of package names to be used when
`allow_invalid_sigkeys` or `populate_only_packages` is set.
:param bool allow_invalid_sigkeys: When True, packages *not* listed in
the `packages` list are added to KojiPackageSet even if they have
invalid sigkey. This is useful in case Koji tag contains some
unsigned packages, but we know they won't appear in a compose.
When False, all packages in Koji tag must have valid sigkey as
defined in `sigkey_ordering`.
:param bool populate_only_packages. When True, only packages in
`packages` list are added to KojiPackageSet. This can save time
when generating compose from predefined list of packages from big
Koji tag.
When False, all packages from Koji tag are added to KojiPackageSet.
:param dogpile.cache.CacheRegion cache_region: If set, the CacheRegion
will be used to cache the list of RPMs per Koji tag, so next calls
of the KojiPackageSet.populate(...) method won't try fetching it
again.
:param list extra_builds: Extra builds NVRs to get from Koji and include
in the package set.
:param list extra_tasks: Extra RPMs defined as Koji task IDs to get from Koji
and include in the package set. Useful when building testing compose
with RPM scratch builds.
"""
super(KojiPackageSet, self).__init__(
name,
sigkey_ordering=sigkey_ordering,
arches=arches,
logger=logger,
allow_invalid_sigkeys=allow_invalid_sigkeys,
)
self.koji_wrapper = koji_wrapper
# Names of packages to look for in the Koji tag.
self.packages = set(packages or [])
self.populate_only_packages = populate_only_packages
self.cache_region = cache_region
self.extra_builds = extra_builds or []
self.extra_tasks = extra_tasks or []
self.reuse = None
def __getstate__(self):
result = self.__dict__.copy()
result["koji_profile"] = self.koji_wrapper.profile
del result["koji_wrapper"]
del result["_logger"]
if "cache_region" in result:
del result["cache_region"]
return result
def __setstate__(self, data):
koji_profile = data.pop("koji_profile")
self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiWrapper(koji_profile)
self._logger = None
self.__dict__.update(data)
@property
def koji_proxy(self):
return self.koji_wrapper.koji_proxy
def get_extra_rpms(self):
if not self.extra_builds:
return [], []
rpms = []
builds = []
builds = self.koji_wrapper.retrying_multicall_map(
self.koji_proxy, self.koji_proxy.getBuild, list_of_args=self.extra_builds
)
rpms_in_builds = self.koji_wrapper.retrying_multicall_map(
self.koji_proxy,
self.koji_proxy.listBuildRPMs,
list_of_args=self.extra_builds,
)
rpms = []
for rpms_in_build in rpms_in_builds:
rpms += rpms_in_build
return rpms, builds
def get_extra_rpms_from_tasks(self):
"""
Returns manually constructed RPM infos from the Koji tasks defined
in `self.extra_tasks`.
:rtype: list
:return: List with RPM infos defined as dicts with following keys:
- name, version, release, arch, src - as returned by parse_nvra.
- path_from_task - Full path to RPM on /mnt/koji.
- build_id - Always set to None.
"""
if not self.extra_tasks:
return []
# Get the IDs of children tasks - these are the tasks containing
# the resulting RPMs.
children_tasks = self.koji_wrapper.retrying_multicall_map(
self.koji_proxy,
self.koji_proxy.getTaskChildren,
list_of_args=self.extra_tasks,
)
children_task_ids = []
for tasks in children_tasks:
children_task_ids += [t["id"] for t in tasks]
# Get the results of these children tasks.
results = self.koji_wrapper.retrying_multicall_map(
self.koji_proxy,
self.koji_proxy.getTaskResult,
list_of_args=children_task_ids,
)
rpms = []
for result in results:
rpms += result.get("rpms", [])
rpms += result.get("srpms", [])
rpm_infos = []
for rpm in rpms:
rpm_info = kobo.rpmlib.parse_nvra(os.path.basename(rpm))
rpm_info["path_from_task"] = os.path.join(
self.koji_wrapper.koji_module.pathinfo.work(), rpm
)
rpm_info["build_id"] = None
rpm_infos.append(rpm_info)
return rpm_infos
def get_latest_rpms(self, tag, event, inherit=True):
if not tag:
return [], []
response = None
if self.cache_region:
cache_key = "KojiPackageSet.get_latest_rpms_%s_%s_%s" % (
str(tag),
str(event),
str(inherit),
)
try:
response = self.cache_region.get(cache_key)
except Exception:
pass
if not response:
response = self.koji_proxy.listTaggedRPMS(
tag, event=event, inherit=inherit, latest=True
)
if self.cache_region:
try:
self.cache_region.set(cache_key, response)
except Exception:
pass
return response
def get_package_path(self, queue_item): def get_package_path(self, queue_item):
rpm_info, build_info = queue_item rpm_info, build_info = queue_item

View File

@ -23,18 +23,12 @@ from itertools import groupby
from kobo.rpmlib import parse_nvra from kobo.rpmlib import parse_nvra
from kobo.shortcuts import force_list from kobo.shortcuts import force_list
from typing import (
Dict,
AnyStr,
List,
Tuple,
Set,
)
import pungi.wrappers.kojiwrapper import pungi.wrappers.kojiwrapper
from pungi.wrappers.comps import CompsWrapper from pungi.wrappers.comps import CompsWrapper
from pungi.wrappers.mbs import MBSWrapper from pungi.wrappers.mbs import MBSWrapper
import pungi.phases.pkgset.pkgsets import pungi.phases.pkgset.pkgsets
from pungi.arch import getBaseArch
from pungi.util import retry, get_arch_variant_data, get_variant_data from pungi.util import retry, get_arch_variant_data, get_variant_data
from pungi.module_util import Modulemd from pungi.module_util import Modulemd
@ -227,13 +221,18 @@ def _add_module_to_variant(
if archive["btype"] != "module": if archive["btype"] != "module":
# Skip non module archives # Skip non module archives
continue continue
typedir = koji_wrapper.koji_module.pathinfo.typedir(build, archive["btype"])
filename = archive["filename"] filename = archive["filename"]
file_path = os.path.join( file_path = os.path.join(typedir, filename)
koji_wrapper.koji_module.pathinfo.topdir, try:
'modules', # If there are two dots, the arch is in the middle. MBS uploads
build['arch'], # files with actual architecture in the filename, but Pungi deals
build['extra']['typeinfo']['module']['content_koji_tag'] # in basearch. This assumes that each arch in the build maps to a
) # unique basearch.
_, arch, _ = filename.split(".")
filename = "modulemd.%s.txt" % getBaseArch(arch)
except ValueError:
pass
mmds[filename] = file_path mmds[filename] = file_path
if len(mmds) <= 1: if len(mmds) <= 1:
@ -491,15 +490,16 @@ def filter_by_whitelist(compose, module_builds, input_modules, expected_modules)
info.get("context"), info.get("context"),
) )
nvr_patterns.add((pattern, spec["name"])) nvr_patterns.add((pattern, spec["name"]))
modules_to_keep = [] modules_to_keep = []
for mb in sorted(module_builds, key=lambda i: i['name']): for mb in module_builds:
# Split release from the build into version and context # Split release from the build into version and context
ver, ctx = mb["release"].split(".") ver, ctx = mb["release"].split(".")
# Values in `mb` are from Koji build. There's nvr and name, version and # Values in `mb` are from Koji build. There's nvr and name, version and
# release. The input pattern specifies modular name, stream, version # release. The input pattern specifies modular name, stream, version
# and context. # and context.
for (n, s, v, c), spec in sorted(nvr_patterns): for (n, s, v, c), spec in nvr_patterns:
if ( if (
# We always have a name and stream... # We always have a name and stream...
mb["name"] == n mb["name"] == n
@ -511,49 +511,11 @@ def filter_by_whitelist(compose, module_builds, input_modules, expected_modules)
): ):
modules_to_keep.append(mb) modules_to_keep.append(mb)
expected_modules.discard(spec) expected_modules.discard(spec)
break
return modules_to_keep return modules_to_keep
def _filter_expected_modules(
variant_name: AnyStr,
variant_arches: List[AnyStr],
expected_modules: Set[AnyStr],
filtered_modules: List[Tuple[AnyStr, Dict[AnyStr, List[AnyStr]]]],
) -> set:
"""
Function filters out all modules which are listed in Pungi config.
Those modules can be absent in koji env so we must remove it from
the expected modules list otherwise Pungi will fail
"""
for variant_regexp, filters_dict in filtered_modules:
for arch, modules in filters_dict.items():
arch = '.*' if arch == '*' else arch
variant_regexp = '.*' if variant_regexp == '*' else variant_regexp
modules = ['.*' if module == '*' else module for module in modules]
cond1 = re.findall(
variant_regexp,
variant_name,
)
cond2 = any(
re.findall(
arch,
variant_arch,
) for variant_arch in variant_arches
)
if cond1 and cond2:
expected_modules = {
expected_module for expected_module in expected_modules if
not any(
re.findall(
filtered_module,
expected_module,
) for filtered_module in modules
)
}
return expected_modules
def _get_modules_from_koji_tags( def _get_modules_from_koji_tags(
compose, koji_wrapper, event_id, variant, variant_tags, tag_to_mmd compose, koji_wrapper, event_id, variant, variant_tags, tag_to_mmd
): ):
@ -574,13 +536,7 @@ def _get_modules_from_koji_tags(
] ]
# Get set of configured module names for this variant. If nothing is # Get set of configured module names for this variant. If nothing is
# configured, the set is empty. # configured, the set is empty.
expected_modules = [] expected_modules = set(spec["name"] for spec in variant.get_modules())
for spec in variant.get_modules():
name, stream = spec['name'].split(':')
expected_modules.append(
':'.join((name, stream.replace('-', '_')))
)
expected_modules = set(expected_modules)
# Find out all modules in every variant and add their Koji tags # Find out all modules in every variant and add their Koji tags
# to variant and variant_tags list. # to variant and variant_tags list.
koji_proxy = koji_wrapper.koji_proxy koji_proxy = koji_wrapper.koji_proxy
@ -675,12 +631,7 @@ def _get_modules_from_koji_tags(
# needed in createrepo phase where metadata is exposed by # needed in createrepo phase where metadata is exposed by
# productmd # productmd
variant.module_uid_to_koji_tag[nsvc] = module_tag variant.module_uid_to_koji_tag[nsvc] = module_tag
expected_modules = _filter_expected_modules(
variant_name=variant.name,
variant_arches=variant.arches,
expected_modules=expected_modules,
filtered_modules=compose.conf['filter_modules'],
)
if expected_modules: if expected_modules:
# There are some module names that were listed in configuration and not # There are some module names that were listed in configuration and not
# found in any tag... # found in any tag...

View File

@ -0,0 +1,931 @@
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <https://gnu.org/licenses/>.
import os
import json
import re
import functools
from fnmatch import fnmatch
from itertools import groupby
from kobo.rpmlib import parse_nvra
from kobo.shortcuts import force_list
from typing import (
Dict,
AnyStr,
List,
Tuple,
Set,
)
import pungi.wrappers.kojiwrapper
from pungi.wrappers.comps import CompsWrapper
from pungi.wrappers.mbs import MBSWrapper
import pungi.phases.pkgset.pkgsets
from pungi.util import retry, get_arch_variant_data, get_variant_data
from pungi.module_util import Modulemd
from pungi.phases.pkgset.common import MaterializedPackageSet, get_all_arches
from pungi.phases.gather import get_packages_to_gather
import pungi.phases.pkgset.source
def variant_dict_from_str(compose, module_str):
"""
Method which parses module NVR string, defined in a variants file and returns
a module info dictionary instead.
For more information about format of module_str, read:
https://pagure.io/modularity/blob/master/f/source/development/
building-modules/naming-policy.rst
Pungi supports N:S, N:S:V and N:S:V:C.
Attributes:
compose: compose for which the variant_dict is generated
module_str: string, the NV(R) of module defined in a variants file.
"""
# The new format can be distinguished by colon in module_str, because
# there is not module in Fedora with colon in a name or stream and it is
# now disallowed to create one. So if colon is there, it must be new
# naming policy format.
if module_str.find(":") != -1:
module_info = {}
nsv = module_str.split(":")
if len(nsv) > 4:
raise ValueError(
'Module string "%s" is not recognized. '
"Only NAME:STREAM[:VERSION[:CONTEXT]] is allowed."
)
if len(nsv) > 3:
module_info["context"] = nsv[3]
if len(nsv) > 2:
module_info["version"] = nsv[2]
if len(nsv) > 1:
module_info["stream"] = nsv[1]
module_info["name"] = nsv[0]
return module_info
else:
# Fallback to previous old format with '-' delimiter.
compose.log_warning(
"Variant file uses old format of module definition with '-'"
"delimiter, please switch to official format defined by "
"Modules Naming Policy."
)
module_info = {}
# The regex is matching a string which should represent the release number
# of a module. The release number is in format: "%Y%m%d%H%M%S"
release_regex = re.compile(r"^(\d){14}$")
section_start = module_str.rfind("-")
module_str_first_part = module_str[section_start + 1 :]
if release_regex.match(module_str_first_part):
module_info["version"] = module_str_first_part
module_str = module_str[:section_start]
section_start = module_str.rfind("-")
module_info["stream"] = module_str[section_start + 1 :]
else:
module_info["stream"] = module_str_first_part
module_info["name"] = module_str[:section_start]
return module_info
@retry(wait_on=IOError)
def get_koji_modules(compose, koji_wrapper, event, module_info_str):
"""
:param koji_wrapper: koji wrapper instance
:param event: event at which to perform the query
:param module_info_str: str, mmd or module dict
:return final list of module_info which pass repoclosure
"""
koji_proxy = koji_wrapper.koji_proxy
module_info = variant_dict_from_str(compose, module_info_str)
# We need to format the query string to koji reguirements. The
# transformation to NVR for use in Koji has to match what MBS is doing when
# importing the build.
query_str = "%s-%s-%s.%s" % (
module_info["name"],
module_info["stream"].replace("-", "_"),
module_info.get("version", "*"),
module_info.get("context", "*"),
)
query_str = query_str.replace("*.*", "*")
koji_builds = koji_proxy.search(query_str, "build", "glob")
modules = []
for build in koji_builds:
md = koji_proxy.getBuild(build["id"])
if md["completion_ts"] > event["ts"]:
# The build finished after the event at which we are limited to,
# ignore it.
compose.log_debug(
"Module build %s is too new, ignoring it." % build["name"]
)
continue
if not md["extra"]:
continue
try:
md["tag"] = md["extra"]["typeinfo"]["module"]["content_koji_tag"]
# Store module versioning information into the dict, but make sure
# not to overwrite any existing keys.
md["module_stream"] = md["extra"]["typeinfo"]["module"]["stream"]
md["module_version"] = int(md["extra"]["typeinfo"]["module"]["version"])
md["module_context"] = md["extra"]["typeinfo"]["module"]["context"]
except KeyError:
continue
if md["state"] == pungi.wrappers.kojiwrapper.KOJI_BUILD_DELETED:
compose.log_debug(
"Module build %s has been deleted, ignoring it." % build["name"]
)
continue
modules.append(md)
if not modules:
raise ValueError(
"No module build found for %r (queried for %r)"
% (module_info_str, query_str)
)
# If there is version provided, then all modules with that version will go
# in. In case version is missing, we will find the latest version and
# include all modules with that version.
if not module_info.get("version"):
# select all found modules with latest version
sorted_modules = sorted(
modules, key=lambda item: item["module_version"], reverse=True
)
latest_version = sorted_modules[0]["module_version"]
modules = [
module for module in modules if latest_version == module["module_version"]
]
return modules
class PkgsetSourceKojiMock(pungi.phases.pkgset.source.PkgsetSourceBase):
enabled = True
def __call__(self):
compose = self.compose
koji_profile = compose.conf["koji_profile"]
self.koji_wrapper = pungi.wrappers.kojiwrapper.KojiMockWrapper(
koji_profile
)
# path prefix must contain trailing '/'
path_prefix = self.koji_wrapper.koji_module.config.topdir.rstrip("/") + "/"
package_sets = get_pkgset_from_koji(
self.compose, self.koji_wrapper, path_prefix
)
return (package_sets, path_prefix)
def get_pkgset_from_koji(compose, koji_wrapper, path_prefix):
event_info = get_koji_event_info(compose, koji_wrapper)
return populate_global_pkgset(compose, koji_wrapper, path_prefix, event_info)
def _add_module_to_variant(
koji_wrapper, variant, build, add_to_variant_modules=False, compose=None
):
"""
Adds module defined by Koji build info to variant.
:param Variant variant: Variant to add the module to.
:param int: build id
:param bool add_to_variant_modules: Adds the modules also to
variant.modules.
:param compose: Compose object to get filters from
"""
mmds = {}
archives = koji_wrapper.koji_proxy.listArchives(build["id"])
for archive in archives:
if archive["btype"] != "module":
# Skip non module archives
continue
filename = archive["filename"]
file_path = os.path.join(
koji_wrapper.koji_module.pathinfo.topdir,
'modules',
build['arch'],
build['extra']['typeinfo']['module']['content_koji_tag']
)
mmds[filename] = file_path
if len(mmds) <= 1:
# There was only one modulemd file. This means the build is rather old
# and final modulemd files were not uploaded. Such modules are no
# longer supported and should be rebuilt. Let's skip it.
return
info = build["extra"]["typeinfo"]["module"]
nsvc = "%(name)s:%(stream)s:%(version)s:%(context)s" % info
added = False
for arch in variant.arches:
if _is_filtered_out(compose, variant, arch, info["name"], info["stream"]):
compose.log_debug("Module %s is filtered from %s.%s", nsvc, variant, arch)
continue
try:
mmd = Modulemd.ModuleStream.read_file(
mmds["modulemd.%s.txt" % arch], strict=True
)
variant.arch_mmds.setdefault(arch, {})[nsvc] = mmd
added = True
except KeyError:
# There is no modulemd for this arch. This could mean an arch was
# added to the compose after the module was built. We don't want to
# process this, let's skip this module.
pass
if not added:
# The module is filtered on all arches of this variant.
return None
if add_to_variant_modules:
variant.modules.append({"name": nsvc, "glob": False})
return nsvc
def _add_extra_modules_to_variant(
compose, koji_wrapper, variant, extra_modules, variant_tags, tag_to_mmd
):
for nsvc in extra_modules:
msg = "Adding extra module build '%s' to variant '%s'" % (nsvc, variant)
compose.log_info(msg)
nsvc_info = nsvc.split(":")
if len(nsvc_info) != 4:
raise ValueError("Module %s does not in N:S:V:C format" % nsvc)
koji_build = koji_wrapper.koji_proxy.getBuild(
"%s-%s-%s.%s" % tuple(nsvc_info), True
)
added = _add_module_to_variant(
koji_wrapper, variant, koji_build, compose=compose
)
if not added:
compose.log_warning("%s - Failed" % msg)
continue
tag = koji_build["extra"]["typeinfo"]["module"]["content_koji_tag"]
variant_tags[variant].append(tag)
tag_to_mmd.setdefault(tag, {})
for arch in variant.arch_mmds:
try:
mmd = variant.arch_mmds[arch][nsvc]
except KeyError:
# Module was filtered from here
continue
tag_to_mmd[tag].setdefault(arch, set()).add(mmd)
if tag_to_mmd[tag]:
compose.log_info(
"Extra module '%s' in variant '%s' will use Koji tag '%s'"
% (nsvc, variant, tag)
)
# Store mapping NSVC --> koji_tag into variant. This is needed
# in createrepo phase where metadata is exposed by producmd
variant.module_uid_to_koji_tag[nsvc] = tag
def _add_scratch_modules_to_variant(
compose, variant, scratch_modules, variant_tags, tag_to_mmd
):
if compose.compose_type != "test" and scratch_modules:
compose.log_warning("Only test composes could include scratch module builds")
return
mbs = MBSWrapper(compose.conf["mbs_api_url"])
for nsvc in scratch_modules:
module_build = mbs.get_module_build_by_nsvc(nsvc)
if not module_build:
continue
try:
final_modulemd = mbs.final_modulemd(module_build["id"])
except Exception:
compose.log_error("Unable to get modulemd for build %s" % module_build)
raise
tag = module_build["koji_tag"]
variant_tags[variant].append(tag)
tag_to_mmd.setdefault(tag, {})
for arch in variant.arches:
try:
mmd = Modulemd.ModuleStream.read_string(
final_modulemd[arch], strict=True
)
variant.arch_mmds.setdefault(arch, {})[nsvc] = mmd
except KeyError:
continue
tag_to_mmd[tag].setdefault(arch, set()).add(mmd)
if tag_to_mmd[tag]:
compose.log_info(
"Module '%s' in variant '%s' will use Koji tag '%s' "
"(as a result of querying module '%s')",
nsvc,
variant,
tag,
module_build["name"],
)
# Store mapping NSVC --> koji_tag into variant. This is needed
# in createrepo phase where metadata is exposed by productmd
variant.module_uid_to_koji_tag[nsvc] = tag
def _is_filtered_out(compose, variant, arch, module_name, module_stream):
"""Check if module with given name and stream is filter out from this stream."""
if not compose:
return False
for filter in get_arch_variant_data(compose.conf, "filter_modules", arch, variant):
if ":" not in filter:
name_filter = filter
stream_filter = "*"
else:
name_filter, stream_filter = filter.split(":", 1)
if fnmatch(module_name, name_filter) and fnmatch(module_stream, stream_filter):
return True
return False
def _get_modules_from_koji(
compose, koji_wrapper, event, variant, variant_tags, tag_to_mmd
):
"""
Loads modules for given `variant` from koji `session`, adds them to
the `variant` and also to `variant_tags` dict.
:param Compose compose: Compose for which the modules are found.
:param koji_wrapper: We will obtain koji session from the wrapper.
:param Variant variant: Variant with modules to find.
:param dict variant_tags: Dict populated by this method. Key is `variant`
and value is list of Koji tags to get the RPMs from.
"""
# Find out all modules in every variant and add their Koji tags
# to variant and variant_tags list.
for module in variant.get_modules():
koji_modules = get_koji_modules(compose, koji_wrapper, event, module["name"])
for koji_module in koji_modules:
nsvc = _add_module_to_variant(
koji_wrapper, variant, koji_module, compose=compose
)
if not nsvc:
continue
tag = koji_module["tag"]
variant_tags[variant].append(tag)
tag_to_mmd.setdefault(tag, {})
for arch in variant.arch_mmds:
try:
mmd = variant.arch_mmds[arch][nsvc]
except KeyError:
# Module was filtered from here
continue
tag_to_mmd[tag].setdefault(arch, set()).add(mmd)
if tag_to_mmd[tag]:
compose.log_info(
"Module '%s' in variant '%s' will use Koji tag '%s' "
"(as a result of querying module '%s')",
nsvc,
variant,
tag,
module["name"],
)
# Store mapping NSVC --> koji_tag into variant. This is needed
# in createrepo phase where metadata is exposed by producmd
variant.module_uid_to_koji_tag[nsvc] = tag
def filter_inherited(koji_proxy, event, module_builds, top_tag):
"""Look at the tag inheritance and keep builds only from the topmost tag.
Using latest=True for listTagged() call would automatically do this, but it
does not understand streams, so we have to reimplement it here.
"""
inheritance = [
tag["name"] for tag in koji_proxy.getFullInheritance(top_tag, event=event["id"])
]
def keyfunc(mb):
return (mb["name"], mb["version"])
result = []
# Group modules by Name-Stream
for _, builds in groupby(sorted(module_builds, key=keyfunc), keyfunc):
builds = list(builds)
# For each N-S combination find out which tags it's in
available_in = set(build["tag_name"] for build in builds)
# And find out which is the topmost tag
for tag in [top_tag] + inheritance:
if tag in available_in:
break
# And keep only builds from that topmost tag
result.extend(build for build in builds if build["tag_name"] == tag)
return result
def filter_by_whitelist(compose, module_builds, input_modules, expected_modules):
"""
Exclude modules from the list that do not match any pattern specified in
input_modules. Order may not be preserved. The last argument is a set of
module patterns that are expected across module tags. When a matching
module is found, the corresponding pattern is removed from the set.
"""
nvr_patterns = set()
for spec in input_modules:
# Do not do any filtering in case variant wants all the modules. Also
# empty the set of remaining expected modules, as the check does not
# really make much sense here.
if spec["name"] == "*":
expected_modules.clear()
return module_builds
info = variant_dict_from_str(compose, spec["name"])
pattern = (
info["name"],
info["stream"].replace("-", "_"),
info.get("version"),
info.get("context"),
)
nvr_patterns.add((pattern, spec["name"]))
modules_to_keep = []
for mb in sorted(module_builds, key=lambda i: i['name']):
# Split release from the build into version and context
ver, ctx = mb["release"].split(".")
# Values in `mb` are from Koji build. There's nvr and name, version and
# release. The input pattern specifies modular name, stream, version
# and context.
for (n, s, v, c), spec in sorted(nvr_patterns):
if (
# We always have a name and stream...
mb["name"] == n
and mb["version"] == s
# ...but version and context can be missing, in which case we
# don't want to check them.
and (not v or ver == v)
and (not c or ctx == c)
):
modules_to_keep.append(mb)
expected_modules.discard(spec)
return modules_to_keep
def _filter_expected_modules(
variant_name: AnyStr,
variant_arches: List[AnyStr],
expected_modules: Set[AnyStr],
filtered_modules: List[Tuple[AnyStr, Dict[AnyStr, List[AnyStr]]]],
) -> set:
"""
Function filters out all modules which are listed in Pungi config.
Those modules can be absent in koji env so we must remove it from
the expected modules list otherwise Pungi will fail
"""
for variant_regexp, filters_dict in filtered_modules:
for arch, modules in filters_dict.items():
arch = '.*' if arch == '*' else arch
variant_regexp = '.*' if variant_regexp == '*' else variant_regexp
modules = ['.*' if module == '*' else module for module in modules]
cond1 = re.findall(
variant_regexp,
variant_name,
)
cond2 = any(
re.findall(
arch,
variant_arch,
) for variant_arch in variant_arches
)
if cond1 and cond2:
expected_modules = {
expected_module for expected_module in expected_modules if
not any(
re.findall(
filtered_module,
expected_module,
) for filtered_module in modules
)
}
return expected_modules
def _get_modules_from_koji_tags(
compose, koji_wrapper, event_id, variant, variant_tags, tag_to_mmd
):
"""
Loads modules for given `variant` from Koji, adds them to
the `variant` and also to `variant_tags` dict.
:param Compose compose: Compose for which the modules are found.
:param KojiWrapper koji_wrapper: Koji wrapper.
:param dict event_id: Koji event ID.
:param Variant variant: Variant with modules to find.
:param dict variant_tags: Dict populated by this method. Key is `variant`
and value is list of Koji tags to get the RPMs from.
"""
# Compose tags from configuration
compose_tags = [
{"name": tag} for tag in force_list(compose.conf["pkgset_koji_module_tag"])
]
# Get set of configured module names for this variant. If nothing is
# configured, the set is empty.
expected_modules = []
for spec in variant.get_modules():
name, stream = spec['name'].split(':')
expected_modules.append(
':'.join((name, stream.replace('-', '_')))
)
expected_modules = set(expected_modules)
# Find out all modules in every variant and add their Koji tags
# to variant and variant_tags list.
koji_proxy = koji_wrapper.koji_proxy
for modular_koji_tag in variant.get_modular_koji_tags() + compose_tags:
tag = modular_koji_tag["name"]
# List all the modular builds in the modular Koji tag.
# We cannot use latest=True here, because we need to get all the
# available streams of all modules. The stream is represented as
# "release" in Koji build and with latest=True, Koji would return
# only builds with highest release.
module_builds = koji_proxy.listTagged(
tag, event=event_id["id"], inherit=True, type="module"
)
# Filter out builds inherited from non-top tag
module_builds = filter_inherited(koji_proxy, event_id, module_builds, tag)
# Apply whitelist of modules if specified.
variant_modules = variant.get_modules()
if variant_modules:
module_builds = filter_by_whitelist(
compose, module_builds, variant_modules, expected_modules
)
# Find the latest builds of all modules. This does following:
# - Sorts the module_builds descending by Koji NVR (which maps to NSV
# for modules). Split release into modular version and context, and
# treat version as numeric.
# - Groups the sorted module_builds by NV (NS in modular world).
# In each resulting `ns_group`, the first item is actually build
# with the latest version (because the list is still sorted by NVR).
# - Groups the `ns_group` again by "release" ("version" in modular
# world) to just get all the "contexts" of the given NSV. This is
# stored in `nsv_builds`.
# - The `nsv_builds` contains the builds representing all the contexts
# of the latest version for give name-stream, so add them to
# `latest_builds`.
def _key(build):
ver, ctx = build["release"].split(".", 1)
return build["name"], build["version"], int(ver), ctx
latest_builds = []
module_builds = sorted(module_builds, key=_key, reverse=True)
for ns, ns_builds in groupby(
module_builds, key=lambda x: ":".join([x["name"], x["version"]])
):
for nsv, nsv_builds in groupby(
ns_builds, key=lambda x: x["release"].split(".")[0]
):
latest_builds += list(nsv_builds)
break
# For each latest modular Koji build, add it to variant and
# variant_tags.
for build in latest_builds:
# Get the Build from Koji to get modulemd and module_tag.
build = koji_proxy.getBuild(build["build_id"])
module_tag = (
build.get("extra", {})
.get("typeinfo", {})
.get("module", {})
.get("content_koji_tag", "")
)
variant_tags[variant].append(module_tag)
nsvc = _add_module_to_variant(
koji_wrapper, variant, build, True, compose=compose
)
if not nsvc:
continue
tag_to_mmd.setdefault(module_tag, {})
for arch in variant.arch_mmds:
try:
mmd = variant.arch_mmds[arch][nsvc]
except KeyError:
# Module was filtered from here
continue
tag_to_mmd[module_tag].setdefault(arch, set()).add(mmd)
if tag_to_mmd[module_tag]:
compose.log_info(
"Module %s in variant %s will use Koji tag %s.",
nsvc,
variant,
module_tag,
)
# Store mapping module-uid --> koji_tag into variant. This is
# needed in createrepo phase where metadata is exposed by
# productmd
variant.module_uid_to_koji_tag[nsvc] = module_tag
expected_modules = _filter_expected_modules(
variant_name=variant.name,
variant_arches=variant.arches,
expected_modules=expected_modules,
filtered_modules=compose.conf['filter_modules'],
)
if expected_modules:
# There are some module names that were listed in configuration and not
# found in any tag...
raise RuntimeError(
"Configuration specified patterns (%s) that don't match "
"any modules in the configured tags." % ", ".join(expected_modules)
)
def populate_global_pkgset(compose, koji_wrapper, path_prefix, event):
all_arches = get_all_arches(compose)
# List of compose tags from which we create this compose
compose_tags = []
# List of compose_tags per variant
variant_tags = {}
# In case we use "nodeps" gather_method, we might know the final list of
# packages which will end up in the compose even now, so instead of reading
# all the packages from Koji tag, we can just cherry-pick the ones which
# are really needed to do the compose and save lot of time and resources
# here. This only works if we are not creating bootable images. Those could
# include packages that are not in the compose.
packages_to_gather, groups = get_packages_to_gather(
compose, include_arch=False, include_prepopulated=True
)
if groups:
comps = CompsWrapper(compose.paths.work.comps())
for group in groups:
packages_to_gather += comps.get_packages(group)
if compose.conf["gather_method"] == "nodeps" and not compose.conf.get(
"buildinstall_method"
):
populate_only_packages_to_gather = True
else:
populate_only_packages_to_gather = False
# In case we use "deps" gather_method, there might be some packages in
# the Koji tag which are not signed with proper sigkey. However, these
# packages might never end up in a compose depending on which packages
# from the Koji tag are requested how the deps are resolved in the end.
# In this case, we allow even packages with invalid sigkeys to be returned
# by PKGSET phase and later, the gather phase checks its results and if
# there are some packages with invalid sigkeys, it raises an exception.
allow_invalid_sigkeys = compose.conf["gather_method"] == "deps"
tag_to_mmd = {}
pkgset_koji_tags = force_list(compose.conf.get("pkgset_koji_tag", []))
for variant in compose.all_variants.values():
variant_tags[variant] = []
# Get the modules from Koji tag
modular_koji_tags = variant.get_modular_koji_tags()
if (variant.modules or modular_koji_tags) and not Modulemd:
raise ValueError(
"pygobject module or libmodulemd library is not installed, "
"support for modules is disabled, but compose contains "
"modules."
)
if modular_koji_tags or (
compose.conf["pkgset_koji_module_tag"] and variant.modules
):
# List modules tagged in particular tags.
_get_modules_from_koji_tags(
compose, koji_wrapper, event, variant, variant_tags, tag_to_mmd
)
elif variant.modules:
# Search each module in Koji separately. Tagging does not come into
# play here.
_get_modules_from_koji(
compose, koji_wrapper, event, variant, variant_tags, tag_to_mmd
)
extra_modules = get_variant_data(
compose.conf, "pkgset_koji_module_builds", variant
)
if extra_modules:
_add_extra_modules_to_variant(
compose, koji_wrapper, variant, extra_modules, variant_tags, tag_to_mmd
)
variant_scratch_modules = get_variant_data(
compose.conf, "pkgset_scratch_modules", variant
)
if variant_scratch_modules:
_add_scratch_modules_to_variant(
compose, variant, variant_scratch_modules, variant_tags, tag_to_mmd
)
# Ensure that every tag added to `variant_tags` is added also to
# `compose_tags`.
for variant_tag in variant_tags[variant]:
if variant_tag not in compose_tags:
compose_tags.append(variant_tag)
variant_tags[variant].extend(pkgset_koji_tags)
# Add global tag(s) if supplied.
compose_tags.extend(pkgset_koji_tags)
inherit = compose.conf["pkgset_koji_inherit"]
inherit_modules = compose.conf["pkgset_koji_inherit_modules"]
pkgsets = []
# Get package set for each compose tag and merge it to global package
# list. Also prepare per-variant pkgset, because we do not have list
# of binary RPMs in module definition - there is just list of SRPMs.
for compose_tag in compose_tags:
compose.log_info("Loading package set for tag %s", compose_tag)
if compose_tag in pkgset_koji_tags:
extra_builds = force_list(compose.conf.get("pkgset_koji_builds", []))
extra_tasks = force_list(compose.conf.get("pkgset_koji_scratch_tasks", []))
else:
extra_builds = []
extra_tasks = []
pkgset = pungi.phases.pkgset.pkgsets.KojiMockPackageSet(
compose_tag,
koji_wrapper,
compose.conf["sigkeys"],
logger=compose._logger,
arches=all_arches,
packages=packages_to_gather,
allow_invalid_sigkeys=allow_invalid_sigkeys,
populate_only_packages=populate_only_packages_to_gather,
cache_region=compose.cache_region,
extra_builds=extra_builds,
extra_tasks=extra_tasks,
)
# Check if we have cache for this tag from previous compose. If so, use
# it.
old_cache_path = compose.paths.old_compose_path(
compose.paths.work.pkgset_file_cache(compose_tag)
)
if old_cache_path:
pkgset.set_old_file_cache(
pungi.phases.pkgset.pkgsets.KojiPackageSet.load_old_file_cache(
old_cache_path
)
)
is_traditional = compose_tag in compose.conf.get("pkgset_koji_tag", [])
should_inherit = inherit if is_traditional else inherit_modules
# If we're processing a modular tag, we have an exact list of
# packages that will be used. This is basically a workaround for
# tagging working on build level, not rpm level. A module tag may
# build a package but not want it included. This should include
# only packages that are actually in modules. It's possible two
# module builds will use the same tag, particularly a -devel module
# is sharing a tag with its regular version.
# The ultimate goal of the mapping is to avoid a package built in modular
# tag to be used as a dependency of some non-modular package.
modular_packages = set()
for variant in compose.all_variants.values():
for nsvc, modular_tag in variant.module_uid_to_koji_tag.items():
if modular_tag != compose_tag:
# Not current tag, skip it
continue
for arch_modules in variant.arch_mmds.values():
try:
module = arch_modules[nsvc]
except KeyError:
# The module was filtered out
continue
for rpm_nevra in module.get_rpm_artifacts():
nevra = parse_nvra(rpm_nevra)
modular_packages.add((nevra["name"], nevra["arch"]))
pkgset.try_to_reuse(
compose,
compose_tag,
inherit=should_inherit,
include_packages=modular_packages,
)
if pkgset.reuse is None:
pkgset.populate(
compose_tag,
event,
inherit=should_inherit,
include_packages=modular_packages,
)
for variant in compose.all_variants.values():
if compose_tag in variant_tags[variant]:
# If it's a modular tag, store the package set for the module.
for nsvc, koji_tag in variant.module_uid_to_koji_tag.items():
if compose_tag == koji_tag:
# It should not be needed,
# we can get package sets by name.
variant.nsvc_to_pkgset[nsvc] = pkgset
# Optimization for case where we have just single compose
# tag - we do not have to merge in this case...
variant.pkgsets.add(compose_tag)
pkgset.write_reuse_file(compose, include_packages=modular_packages)
pkgsets.append(pkgset)
# Create MaterializedPackageSets.
partials = []
for pkgset in pkgsets:
partials.append(
functools.partial(
MaterializedPackageSet.create,
compose,
pkgset,
path_prefix,
mmd=tag_to_mmd.get(pkgset.name),
)
)
return MaterializedPackageSet.create_many(partials)
def get_koji_event_info(compose, koji_wrapper):
event_file = os.path.join(compose.paths.work.topdir(arch="global"), "koji-event")
compose.log_info("Getting koji event")
result = get_koji_event_raw(koji_wrapper, compose.koji_event, event_file)
if compose.koji_event:
compose.log_info(
"Setting koji event to a custom value: %s" % compose.koji_event
)
else:
compose.log_info("Koji event: %s" % result["id"])
return result
def get_koji_event_raw(koji_wrapper, event_id, event_file):
if event_id:
koji_event = koji_wrapper.koji_proxy.getEvent(event_id)
else:
koji_event = koji_wrapper.koji_proxy.getLastEvent()
with open(event_file, "w") as f:
json.dump(koji_event, f)
return koji_event

View File

@ -58,14 +58,9 @@ class KojiWrapper(object):
value = getattr(self.koji_module.config, key, None) value = getattr(self.koji_module.config, key, None)
if value is not None: if value is not None:
session_opts[key] = value session_opts[key] = value
if real_koji: self.koji_proxy = koji.ClientSession(
self.koji_proxy = koji.ClientSession( self.koji_module.config.server, session_opts
self.koji_module.config.server, session_opts )
)
else:
self.koji_proxy = KojiMock(
packages_dir=self.koji_module.config.topdir,
modules_dir=os.path.join(self.koji_module.config.topdir, 'modules'))
def login(self): def login(self):
"""Authenticate to the hub.""" """Authenticate to the hub."""
@ -822,6 +817,35 @@ class KojiWrapper(object):
return self.multicall_map(*args, **kwargs) return self.multicall_map(*args, **kwargs)
class KojiMockWrapper(object):
lock = threading.Lock()
def __init__(self, profile):
self.profile = profile
with self.lock:
self.koji_module = koji.get_profile_module(profile)
session_opts = {}
for key in (
"timeout",
"keepalive",
"max_retries",
"retry_interval",
"anon_retry",
"offline_retry",
"offline_retry_interval",
"debug",
"debug_xmlrpc",
"serverca",
"use_fast_upload",
):
value = getattr(self.koji_module.config, key, None)
if value is not None:
session_opts[key] = value
self.koji_proxy = KojiMock(
packages_dir=self.koji_module.config.topdir,
modules_dir=os.path.join(self.koji_module.config.topdir, 'modules'))
def get_buildroot_rpms(compose, task_id): def get_buildroot_rpms(compose, task_id):
"""Get build root RPMs - either from runroot or local""" """Get build root RPMs - either from runroot or local"""
result = [] result = []

View File

@ -49,7 +49,7 @@ class KojiWrapperBaseTestCase(unittest.TestCase):
) )
) )
self.koji_profile = koji.get_profile_module.return_value self.koji_profile = koji.get_profile_module.return_value
self.koji = KojiWrapper("custom-koji", real_koji=True) self.koji = KojiWrapper("custom-koji")
def tearDown(self): def tearDown(self):
os.remove(self.tmpfile) os.remove(self.tmpfile)

View File

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import ddt as ddt
import mock import mock
import os import os
import six import six
@ -133,6 +133,7 @@ class PkgsetCompareMixin(object):
self.assertEqual({}, actual) self.assertEqual({}, actual)
@ddt.ddt
@mock.patch("pungi.phases.pkgset.pkgsets.ReaderPool", new=FakePool) @mock.patch("pungi.phases.pkgset.pkgsets.ReaderPool", new=FakePool)
@mock.patch("kobo.pkgset.FileCache", new=MockFileCache) @mock.patch("kobo.pkgset.FileCache", new=MockFileCache)
class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase): class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
@ -158,7 +159,11 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
six.assertCountEqual(self, v1, v2) six.assertCountEqual(self, v1, v2)
self.assertEqual({}, actual, msg="Some architectures were missing") self.assertEqual({}, actual, msg="Some architectures were missing")
def test_all_arches(self): @ddt.data(
pkgsets.KojiMockPackageSet,
pkgsets.KojiPackageSet,
)
def test_all_arches(self, package_set):
self._touch_files( self._touch_files(
[ [
"rpms/pungi@4.1.3@3.fc25@noarch", "rpms/pungi@4.1.3@3.fc25@noarch",
@ -171,7 +176,7 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
] ]
) )
pkgset = pkgsets.KojiPackageSet("pkgset", self.koji_wrapper, [None]) pkgset = package_set("pkgset", self.koji_wrapper, [None])
result = pkgset.populate("f25") result = pkgset.populate("f25")
@ -196,7 +201,11 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
}, },
) )
def test_only_one_arch(self): @ddt.data(
pkgsets.KojiPackageSet,
pkgsets.KojiMockPackageSet,
)
def test_only_one_arch(self, package_set):
self._touch_files( self._touch_files(
[ [
"rpms/bash@4.3.42@4.fc24@x86_64", "rpms/bash@4.3.42@4.fc24@x86_64",
@ -204,7 +213,7 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
] ]
) )
pkgset = pkgsets.KojiPackageSet( pkgset = package_set(
"pkgset", self.koji_wrapper, [None], arches=["x86_64"] "pkgset", self.koji_wrapper, [None], arches=["x86_64"]
) )
@ -225,7 +234,6 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
}, },
) )
@unittest.skip('Unneeded after lenix patch')
def test_find_signed_with_preference(self): def test_find_signed_with_preference(self):
self._touch_files( self._touch_files(
[ [
@ -256,7 +264,6 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
}, },
) )
@unittest.skip('Unneeded after lenix patch')
def test_find_signed_fallback_unsigned(self): def test_find_signed_fallback_unsigned(self):
self._touch_files( self._touch_files(
[ [
@ -286,7 +293,6 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
}, },
) )
@unittest.skip('Unneeded after lenix patch')
def test_can_not_find_signed_package(self): def test_can_not_find_signed_package(self):
pkgset = pkgsets.KojiPackageSet( pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, ["cafebabe"], arches=["x86_64"] "pkgset", self.koji_wrapper, ["cafebabe"], arches=["x86_64"]
@ -306,7 +312,6 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
) )
self.assertRegex(str(ctx.exception), figure) self.assertRegex(str(ctx.exception), figure)
@unittest.skip('Unneeded after lenix patch')
def test_can_not_find_signed_package_allow_invalid_sigkeys(self): def test_can_not_find_signed_package_allow_invalid_sigkeys(self):
pkgset = pkgsets.KojiPackageSet( pkgset = pkgsets.KojiPackageSet(
"pkgset", "pkgset",
@ -332,7 +337,6 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
) )
self.assertRegex(str(ctx.exception), figure) self.assertRegex(str(ctx.exception), figure)
@unittest.skip('Unneeded after lenix patch')
def test_can_not_find_any_package(self): def test_can_not_find_any_package(self):
pkgset = pkgsets.KojiPackageSet( pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, ["cafebabe", None], arches=["x86_64"] "pkgset", self.koji_wrapper, ["cafebabe", None], arches=["x86_64"]
@ -351,7 +355,11 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
r"^RPM\(s\) not found for sigs: .+Check log for details.+", r"^RPM\(s\) not found for sigs: .+Check log for details.+",
) )
def test_packages_attribute(self): @ddt.data(
pkgsets.KojiPackageSet,
pkgsets.KojiMockPackageSet,
)
def test_packages_attribute(self, package_set):
self._touch_files( self._touch_files(
[ [
"rpms/pungi@4.1.3@3.fc25@noarch", "rpms/pungi@4.1.3@3.fc25@noarch",
@ -364,7 +372,7 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
] ]
) )
pkgset = pkgsets.KojiPackageSet( pkgset = package_set(
"pkgset", "pkgset",
self.koji_wrapper, self.koji_wrapper,
[None], [None],
@ -388,8 +396,12 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
}, },
) )
def test_get_extra_rpms_from_tasks(self): @ddt.data(
pkgset = pkgsets.KojiPackageSet( pkgsets.KojiPackageSet,
pkgsets.KojiMockPackageSet,
)
def test_get_extra_rpms_from_tasks(self, package_set):
pkgset = package_set(
"pkgset", "pkgset",
self.koji_wrapper, self.koji_wrapper,
[None], [None],
@ -455,7 +467,11 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
rpms = pkgset.get_extra_rpms_from_tasks() rpms = pkgset.get_extra_rpms_from_tasks()
self.assertEqual(rpms, expected_rpms) self.assertEqual(rpms, expected_rpms)
def test_get_latest_rpms_cache(self): @ddt.data(
pkgsets.KojiMockPackageSet,
pkgsets.KojiPackageSet,
)
def test_get_latest_rpms_cache(self, package_set):
self._touch_files( self._touch_files(
[ [
"rpms/bash@4.3.42@4.fc24@x86_64", "rpms/bash@4.3.42@4.fc24@x86_64",
@ -464,7 +480,7 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
) )
cache_region = make_region().configure("dogpile.cache.memory") cache_region = make_region().configure("dogpile.cache.memory")
pkgset = pkgsets.KojiPackageSet( pkgset = package_set(
"pkgset", "pkgset",
self.koji_wrapper, self.koji_wrapper,
[None], [None],
@ -494,7 +510,11 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
}, },
) )
def test_get_latest_rpms_cache_different_id(self): @ddt.data(
pkgsets.KojiMockPackageSet,
pkgsets.KojiPackageSet,
)
def test_get_latest_rpms_cache_different_id(self, package_set):
self._touch_files( self._touch_files(
[ [
"rpms/bash@4.3.42@4.fc24@x86_64", "rpms/bash@4.3.42@4.fc24@x86_64",
@ -503,7 +523,7 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
) )
cache_region = make_region().configure("dogpile.cache.memory") cache_region = make_region().configure("dogpile.cache.memory")
pkgset = pkgsets.KojiPackageSet( pkgset = package_set(
"pkgset", "pkgset",
self.koji_wrapper, self.koji_wrapper,
[None], [None],
@ -530,7 +550,11 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
}, },
) )
def test_extra_builds_attribute(self): @ddt.data(
pkgsets.KojiMockPackageSet,
pkgsets.KojiPackageSet,
)
def test_extra_builds_attribute(self, package_set):
self._touch_files( self._touch_files(
[ [
"rpms/pungi@4.1.3@3.fc25@noarch", "rpms/pungi@4.1.3@3.fc25@noarch",
@ -561,7 +585,7 @@ class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
[b for b in self.tagged_rpms[1] if b["package_name"] != "pungi"], [b for b in self.tagged_rpms[1] if b["package_name"] != "pungi"],
] ]
pkgset = pkgsets.KojiPackageSet( pkgset = package_set(
"pkgset", self.koji_wrapper, [None], extra_builds=["pungi-4.1.3-3.fc25"] "pkgset", self.koji_wrapper, [None], extra_builds=["pungi-4.1.3-3.fc25"]
) )
@ -786,6 +810,203 @@ class TestReuseKojiPkgset(helpers.PungiTestCase):
self.assertEqual(self.pkgset.file_cache, self.pkgset.old_file_cache) self.assertEqual(self.pkgset.file_cache, self.pkgset.old_file_cache)
class TestReuseKojiMockPkgset(helpers.PungiTestCase):
def setUp(self):
super(TestReuseKojiMockPkgset, self).setUp()
self.old_compose_dir = tempfile.mkdtemp()
self.old_compose = helpers.DummyCompose(self.old_compose_dir, {})
self.compose = helpers.DummyCompose(
self.topdir, {"old_composes": os.path.dirname(self.old_compose_dir)}
)
self.koji_wrapper = mock.Mock()
self.tag = "test-tag"
self.inherited_tag = "inherited-test-tag"
self.pkgset = pkgsets.KojiMockPackageSet(
self.tag, self.koji_wrapper, [None], arches=["x86_64"]
)
self.pkgset.log_debug = mock.Mock()
self.pkgset.log_info = mock.Mock()
def assert_not_reuse(self):
self.assertIsNone(getattr(self.pkgset, "reuse", None))
def test_resue_no_old_compose_found(self):
self.pkgset.try_to_reuse(self.compose, self.tag)
self.pkgset.log_info.assert_called_once_with(
"Trying to reuse pkgset data of old compose"
)
self.pkgset.log_debug.assert_called_once_with(
"No old compose found. Nothing to reuse."
)
self.assert_not_reuse()
@mock.patch.object(helpers.paths.Paths, "get_old_compose_topdir")
def test_reuse_read_koji_event_file_failed(self, mock_old_topdir):
mock_old_topdir.return_value = self.old_compose_dir
self.pkgset._get_koji_event_from_file = mock.Mock(
side_effect=Exception("unknown error")
)
self.pkgset.try_to_reuse(self.compose, self.tag)
self.pkgset.log_debug.assert_called_once_with(
"Can't read koji event from file: unknown error"
)
self.assert_not_reuse()
@mock.patch.object(helpers.paths.Paths, "get_old_compose_topdir")
def test_reuse_build_under_tag_changed(self, mock_old_topdir):
mock_old_topdir.return_value = self.old_compose_dir
self.pkgset._get_koji_event_from_file = mock.Mock(side_effect=[3, 1])
self.koji_wrapper.koji_proxy.queryHistory.return_value = {"tag_listing": [{}]}
self.pkgset.try_to_reuse(self.compose, self.tag)
self.assertEqual(
self.pkgset.log_debug.mock_calls,
[
mock.call(
"Koji event doesn't match, querying changes between event 1 and 3"
),
mock.call("Builds under tag %s changed. Can't reuse." % self.tag),
],
)
self.assert_not_reuse()
@mock.patch.object(helpers.paths.Paths, "get_old_compose_topdir")
def test_reuse_build_under_inherited_tag_changed(self, mock_old_topdir):
mock_old_topdir.return_value = self.old_compose_dir
self.pkgset._get_koji_event_from_file = mock.Mock(side_effect=[3, 1])
self.koji_wrapper.koji_proxy.queryHistory.side_effect = [
{"tag_listing": []},
{"tag_listing": [{}]},
]
self.koji_wrapper.koji_proxy.getFullInheritance.return_value = [
{"name": self.inherited_tag}
]
self.pkgset.try_to_reuse(self.compose, self.tag)
self.assertEqual(
self.pkgset.log_debug.mock_calls,
[
mock.call(
"Koji event doesn't match, querying changes between event 1 and 3"
),
mock.call(
"Builds under inherited tag %s changed. Can't reuse."
% self.inherited_tag
),
],
)
self.assert_not_reuse()
@mock.patch("pungi.paths.os.path.exists", return_value=True)
@mock.patch.object(helpers.paths.Paths, "get_old_compose_topdir")
def test_reuse_failed_load_reuse_file(self, mock_old_topdir, mock_exists):
mock_old_topdir.return_value = self.old_compose_dir
self.pkgset._get_koji_event_from_file = mock.Mock(side_effect=[3, 1])
self.koji_wrapper.koji_proxy.queryHistory.return_value = {"tag_listing": []}
self.koji_wrapper.koji_proxy.getFullInheritance.return_value = []
self.pkgset.load_old_file_cache = mock.Mock(
side_effect=Exception("unknown error")
)
self.pkgset.try_to_reuse(self.compose, self.tag)
self.assertEqual(
self.pkgset.log_debug.mock_calls,
[
mock.call(
"Koji event doesn't match, querying changes between event 1 and 3"
),
mock.call(
"Loading reuse file: %s"
% os.path.join(
self.old_compose_dir,
"work/global",
"pkgset_%s_reuse.pickle" % self.tag,
)
),
mock.call("Failed to load reuse file: unknown error"),
],
)
self.assert_not_reuse()
@mock.patch("pungi.paths.os.path.exists", return_value=True)
@mock.patch.object(helpers.paths.Paths, "get_old_compose_topdir")
def test_reuse_criteria_not_match(self, mock_old_topdir, mock_exists):
mock_old_topdir.return_value = self.old_compose_dir
self.pkgset._get_koji_event_from_file = mock.Mock(side_effect=[3, 1])
self.koji_wrapper.koji_proxy.queryHistory.return_value = {"tag_listing": []}
self.koji_wrapper.koji_proxy.getFullInheritance.return_value = []
self.pkgset.load_old_file_cache = mock.Mock(
return_value={"allow_invalid_sigkeys": True}
)
self.pkgset.try_to_reuse(self.compose, self.tag)
self.assertEqual(
self.pkgset.log_debug.mock_calls,
[
mock.call(
"Koji event doesn't match, querying changes between event 1 and 3"
),
mock.call(
"Loading reuse file: %s"
% os.path.join(
self.old_compose_dir,
"work/global",
"pkgset_%s_reuse.pickle" % self.tag,
)
),
],
)
self.assertEqual(
self.pkgset.log_info.mock_calls,
[
mock.call("Trying to reuse pkgset data of old compose"),
mock.call("Criteria does not match. Nothing to reuse."),
],
)
self.assert_not_reuse()
@mock.patch("pungi.phases.pkgset.pkgsets.copy_all")
@mock.patch("pungi.paths.os.path.exists", return_value=True)
@mock.patch.object(helpers.paths.Paths, "get_old_compose_topdir")
def test_reuse_pkgset(self, mock_old_topdir, mock_exists, mock_copy_all):
mock_old_topdir.return_value = self.old_compose_dir
self.pkgset._get_koji_event_from_file = mock.Mock(side_effect=[3, 1])
self.koji_wrapper.koji_proxy.queryHistory.return_value = {"tag_listing": []}
self.koji_wrapper.koji_proxy.getFullInheritance.return_value = []
self.pkgset.load_old_file_cache = mock.Mock(
return_value={
"allow_invalid_sigkeys": self.pkgset._allow_invalid_sigkeys,
"packages": self.pkgset.packages,
"populate_only_packages": self.pkgset.populate_only_packages,
"extra_builds": self.pkgset.extra_builds,
"sigkeys": self.pkgset.sigkey_ordering,
"include_packages": None,
"rpms_by_arch": mock.Mock(),
"srpms_by_name": mock.Mock(),
}
)
self.pkgset.old_file_cache = mock.Mock()
self.pkgset.try_to_reuse(self.compose, self.tag)
old_repo_dir = os.path.join(self.old_compose_dir, "work/global/repo", self.tag)
self.assertEqual(
self.pkgset.log_info.mock_calls,
[
mock.call("Trying to reuse pkgset data of old compose"),
mock.call("Copying repo data for reuse: %s" % old_repo_dir),
],
)
self.assertEqual(old_repo_dir, self.pkgset.reuse)
self.assertEqual(self.pkgset.file_cache, self.pkgset.old_file_cache)
@mock.patch("kobo.pkgset.FileCache", new=MockFileCache) @mock.patch("kobo.pkgset.FileCache", new=MockFileCache)
class TestMergePackageSets(PkgsetCompareMixin, unittest.TestCase): class TestMergePackageSets(PkgsetCompareMixin, unittest.TestCase):
def test_merge_in_another_arch(self): def test_merge_in_another_arch(self):

View File

@ -15,7 +15,7 @@ except ImportError:
import unittest import unittest
from unittest import mock from unittest import mock
from pungi.phases.pkgset.sources import source_koji from pungi.phases.pkgset.sources import source_koji, source_kojimock
from tests import helpers from tests import helpers
from pungi.module_util import Modulemd from pungi.module_util import Modulemd
@ -687,11 +687,171 @@ class MockModule(object):
return self.path == other.path return self.path == other.path
# TODO: multiarch support was removed from modules
# and will be added by https://cloudlinux.atlassian.net/browse/LNX-108
@mock.patch("pungi.module_util.Modulemd.ModuleStream.read_file", new=MockModule) @mock.patch("pungi.module_util.Modulemd.ModuleStream.read_file", new=MockModule)
@unittest.skipIf(Modulemd is None, "Skipping tests, no module support") @unittest.skipIf(Modulemd is None, "Skipping tests, no module support")
class TestAddModuleToVariant(helpers.PungiTestCase): class TestAddModuleToVariant(helpers.PungiTestCase):
def setUp(self):
super(TestAddModuleToVariant, self).setUp()
self.koji = mock.Mock()
self.koji.koji_module.pathinfo.typedir.return_value = "/koji"
files = ["modulemd.x86_64.txt", "modulemd.armv7hl.txt", "modulemd.txt"]
self.koji.koji_proxy.listArchives.return_value = [
{"btype": "module", "filename": fname} for fname in files
] + [{"btype": "foo"}]
self.buildinfo = {
"id": 1234,
"extra": {
"typeinfo": {
"module": {
"name": "module",
"stream": "master",
"version": "20190318",
"context": "abcdef",
},
},
},
}
def test_adding_module(self):
variant = mock.Mock(arches=["armhfp", "x86_64"], arch_mmds={}, modules=[])
source_koji._add_module_to_variant(self.koji, variant, self.buildinfo)
self.assertEqual(
variant.arch_mmds,
{
"armhfp": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.armv7hl.txt"
),
},
"x86_64": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.x86_64.txt"
),
},
},
)
self.assertEqual(variant.modules, [])
def test_adding_module_to_existing(self):
variant = mock.Mock(
arches=["armhfp", "x86_64"],
arch_mmds={
"x86_64": {"m1:latest:20190101:cafe": MockModule("/koji/m1.x86_64.txt")}
},
modules=[{"name": "m1:latest-20190101:cafe", "glob": False}],
)
source_koji._add_module_to_variant(self.koji, variant, self.buildinfo)
self.assertEqual(
variant.arch_mmds,
{
"armhfp": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.armv7hl.txt"
),
},
"x86_64": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.x86_64.txt"
),
"m1:latest:20190101:cafe": MockModule("/koji/m1.x86_64.txt"),
},
},
)
self.assertEqual(
variant.modules, [{"name": "m1:latest-20190101:cafe", "glob": False}]
)
def test_adding_module_with_add_module(self):
variant = mock.Mock(arches=["armhfp", "x86_64"], arch_mmds={}, modules=[])
source_koji._add_module_to_variant(
self.koji, variant, self.buildinfo, add_to_variant_modules=True
)
self.assertEqual(
variant.arch_mmds,
{
"armhfp": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.armv7hl.txt"
),
},
"x86_64": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.x86_64.txt"
),
},
},
)
self.assertEqual(
variant.modules, [{"name": "module:master:20190318:abcdef", "glob": False}]
)
def test_adding_module_to_existing_with_add_module(self):
variant = mock.Mock(
arches=["armhfp", "x86_64"],
arch_mmds={
"x86_64": {"m1:latest:20190101:cafe": MockModule("/koji/m1.x86_64.txt")}
},
modules=[{"name": "m1:latest-20190101:cafe", "glob": False}],
)
source_koji._add_module_to_variant(
self.koji, variant, self.buildinfo, add_to_variant_modules=True
)
self.assertEqual(
variant.arch_mmds,
{
"armhfp": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.armv7hl.txt"
),
},
"x86_64": {
"module:master:20190318:abcdef": MockModule(
"/koji/modulemd.x86_64.txt"
),
"m1:latest:20190101:cafe": MockModule("/koji/m1.x86_64.txt"),
},
},
)
self.assertEqual(
variant.modules,
[
{"name": "m1:latest-20190101:cafe", "glob": False},
{"name": "module:master:20190318:abcdef", "glob": False},
],
)
def test_adding_module_but_filtered(self):
compose = helpers.DummyCompose(
self.topdir, {"filter_modules": [(".*", {"*": ["module:*"]})]}
)
variant = mock.Mock(
arches=["armhfp", "x86_64"], arch_mmds={}, modules=[], uid="Variant"
)
nsvc = source_koji._add_module_to_variant(
self.koji,
variant,
self.buildinfo,
add_to_variant_modules=True,
compose=compose,
)
self.assertIsNone(nsvc)
self.assertEqual(variant.arch_mmds, {})
self.assertEqual(variant.modules, [])
@mock.patch("pungi.module_util.Modulemd.ModuleStream.read_file", new=MockModule)
@unittest.skipIf(Modulemd is None, "Skipping tests, no module support")
class TestAddModuleToVariantForKojiMock(helpers.PungiTestCase):
def setUp(self): def setUp(self):
super(TestAddModuleToVariant, self).setUp() super(TestAddModuleToVariant, self).setUp()
self.koji = mock.Mock() self.koji = mock.Mock()
@ -1084,7 +1244,7 @@ class TestSourceKoji(unittest.TestCase):
filtered_modules: List[Tuple[AnyStr, Dict[AnyStr, List[AnyStr]]]], filtered_modules: List[Tuple[AnyStr, Dict[AnyStr, List[AnyStr]]]],
expected_result: Set[AnyStr], expected_result: Set[AnyStr],
) -> None: ) -> None:
real_result = source_koji._filter_expected_modules( real_result = source_kojimock._filter_expected_modules(
variant_name=variant_name, variant_name=variant_name,
variant_arches=variant_arches, variant_arches=variant_arches,
expected_modules=expected_modules, expected_modules=expected_modules,