pungi/tests/test_pkgset_pkgsets.py
Haibo Lin 3eddcfccd8 Fix flake8 complaints - F401
F401 'dnf' imported but unused
F401 'imp' imported but unused
F401 'os' imported but unused
F401 'subprocess' imported but unused
F401 'sys' imported but unused
F401 'yum' imported but unused

JIRA: COMPOSE-4108
Signed-off-by: Haibo Lin <hlin@redhat.com>
2020-02-07 11:48:31 +08:00

697 lines
22 KiB
Python

# -*- coding: utf-8 -*-
import mock
import os
import six
try:
import unittest2 as unittest
except ImportError:
import unittest
import json
import tempfile
import re
from dogpile.cache import make_region
from pungi.phases.pkgset import pkgsets
from tests import helpers
class MockPathInfo(object):
def __init__(self, topdir):
self.topdir = topdir
def build(self, build_info):
return self.topdir
def get_filename(self, rpm_info):
return "{name}@{version}@{release}@{arch}".format(**rpm_info)
def signed(self, rpm_info, sigkey):
return os.path.join("signed", sigkey, self.get_filename(rpm_info))
def rpm(self, rpm_info):
return os.path.join("rpms", self.get_filename(rpm_info))
class MockFile(object):
def __init__(self, path):
if path.startswith("/tmp"):
# Drop /tmp/something/ from path
path = path.split("/", 3)[-1]
self.file_path = path
self.file_name = os.path.basename(path)
self.name, self.version, self.release, self.arch = self.file_name.split("@")
self.sourcerpm = "{0.name}-{0.version}-{0.release}.{0.arch}".format(self)
self.exclusivearch = []
self.excludearch = []
def __hash__(self):
return hash(self.file_path)
def __repr__(self):
return self.file_path
def __eq__(self, other):
try:
return self.file_path == other.file_path
except AttributeError:
return self.file_path == other
def __le__(self, other):
try:
return self.file_path < other.file_path
except AttributeError:
return self.file_path < other
def __lt__(self, other):
return self <= other and self != other
def __ge__(self, other):
return not (self <= other) or self == other
def __gt__(self, other):
return not (self <= other)
class MockFileCache(dict):
"""Mock for kobo.pkgset.FileCache.
It gets data from filename and does not touch filesystem.
"""
def __init__(self, _wrapper):
super(MockFileCache, self).__init__()
self.file_cache = self
def add(self, file_path):
obj = MockFile(file_path)
self[file_path] = obj
return obj
class FakePool(object):
"""This class will be substituted for ReaderPool.
It implements the same interface, but uses only the last added worker to
process all tasks sequentially.
"""
def __init__(self, package_set, logger=None):
self.queue = []
self.worker = None
self.package_set = package_set
def log_warning(self, *args, **kwargs):
pass
@property
def queue_total(self):
return len(self.queue)
def queue_put(self, item):
self.queue.append(item)
def add(self, worker):
self.worker = worker
def start(self):
for i, item in enumerate(self.queue):
self.worker.process(item, i)
def stop(self):
pass
class PkgsetCompareMixin(object):
def assertPkgsetEqual(self, actual, expected):
for k, v1 in expected.items():
self.assertIn(k, actual)
v2 = actual.pop(k)
six.assertCountEqual(self, v1, v2)
self.assertEqual({}, actual)
@mock.patch("pungi.phases.pkgset.pkgsets.ReaderPool", new=FakePool)
@mock.patch("kobo.pkgset.FileCache", new=MockFileCache)
class TestKojiPkgset(PkgsetCompareMixin, helpers.PungiTestCase):
def setUp(self):
super(TestKojiPkgset, self).setUp()
with open(os.path.join(helpers.FIXTURE_DIR, "tagged-rpms.json")) as f:
self.tagged_rpms = json.load(f)
self.path_info = MockPathInfo(self.topdir)
self.koji_wrapper = mock.Mock()
self.koji_wrapper.koji_proxy.listTaggedRPMS.return_value = self.tagged_rpms
self.koji_wrapper.koji_module.pathinfo = self.path_info
def _touch_files(self, filenames):
for filename in filenames:
helpers.touch(os.path.join(self.topdir, filename))
def assertPkgsetEqual(self, actual, expected):
for k, v1 in expected.items():
self.assertIn(k, actual)
v2 = actual.pop(k)
six.assertCountEqual(self, v1, v2)
self.assertEqual({}, actual, msg="Some architectures were missing")
def test_all_arches(self):
self._touch_files(
[
"rpms/pungi@4.1.3@3.fc25@noarch",
"rpms/pungi@4.1.3@3.fc25@src",
"rpms/bash@4.3.42@4.fc24@i686",
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash@4.3.42@4.fc24@src",
"rpms/bash-debuginfo@4.3.42@4.fc24@i686",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
pkgset = pkgsets.KojiPackageSet("pkgset", self.koji_wrapper, [None])
result = pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
self.assertPkgsetEqual(
result,
{
"src": ["rpms/pungi@4.1.3@3.fc25@src", "rpms/bash@4.3.42@4.fc24@src"],
"noarch": ["rpms/pungi@4.1.3@3.fc25@noarch"],
"i686": [
"rpms/bash@4.3.42@4.fc24@i686",
"rpms/bash-debuginfo@4.3.42@4.fc24@i686",
],
"x86_64": [
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
],
},
)
def test_only_one_arch(self):
self._touch_files(
[
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, [None], arches=["x86_64"]
)
result = pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
self.assertPkgsetEqual(
result,
{
"x86_64": [
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
"rpms/bash@4.3.42@4.fc24@x86_64",
]
},
)
def test_find_signed_with_preference(self):
self._touch_files(
[
"signed/cafebabe/bash@4.3.42@4.fc24@x86_64",
"signed/deadbeef/bash@4.3.42@4.fc24@x86_64",
"signed/deadbeef/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, ["cafebabe", "deadbeef"], arches=["x86_64"]
)
result = pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
self.assertPkgsetEqual(
result,
{
"x86_64": [
"signed/cafebabe/bash@4.3.42@4.fc24@x86_64",
"signed/deadbeef/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
},
)
def test_find_signed_fallback_unsigned(self):
self._touch_files(
[
"signed/cafebabe/bash@4.3.42@4.fc24@x86_64",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, ["cafebabe", None], arches=["x86_64"]
)
result = pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
self.assertPkgsetEqual(
result,
{
"x86_64": [
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
"signed/cafebabe/bash@4.3.42@4.fc24@x86_64",
]
},
)
def test_can_not_find_signed_package(self):
pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, ["cafebabe"], arches=["x86_64"]
)
with self.assertRaises(RuntimeError) as ctx:
pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
figure = re.compile(
r"^RPM\(s\) not found for sigs: .+Check log for details.+bash-4\.3\.42-4\.fc24.+bash-debuginfo-4\.3\.42-4\.fc24$",
re.DOTALL,
)
self.assertRegexpMatches(str(ctx.exception), figure)
def test_can_not_find_signed_package_allow_invalid_sigkeys(self):
pkgset = pkgsets.KojiPackageSet(
"pkgset",
self.koji_wrapper,
["cafebabe"],
arches=["x86_64"],
allow_invalid_sigkeys=True,
)
pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
with self.assertRaises(RuntimeError) as ctx:
pkgset.raise_invalid_sigkeys_exception(pkgset.invalid_sigkey_rpms)
figure = re.compile(
r"^RPM\(s\) not found for sigs: .+Check log for details.+bash-4\.3\.42-4\.fc24.+bash-debuginfo-4\.3\.42-4\.fc24$",
re.DOTALL,
)
self.assertRegexpMatches(str(ctx.exception), figure)
def test_can_not_find_any_package(self):
pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, ["cafebabe", None], arches=["x86_64"]
)
with self.assertRaises(RuntimeError) as ctx:
pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
self.assertRegexpMatches(
str(ctx.exception),
r"^RPM\(s\) not found for sigs: .+Check log for details.+",
)
def test_packages_attribute(self):
self._touch_files(
[
"rpms/pungi@4.1.3@3.fc25@noarch",
"rpms/pungi@4.1.3@3.fc25@src",
"rpms/bash@4.3.42@4.fc24@i686",
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash@4.3.42@4.fc24@src",
"rpms/bash-debuginfo@4.3.42@4.fc24@i686",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
pkgset = pkgsets.KojiPackageSet(
"pkgset",
self.koji_wrapper,
[None],
packages=["bash"],
populate_only_packages=True,
)
result = pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
self.assertPkgsetEqual(
result,
{
"src": ["rpms/bash@4.3.42@4.fc24@src"],
"i686": ["rpms/bash@4.3.42@4.fc24@i686"],
"x86_64": ["rpms/bash@4.3.42@4.fc24@x86_64"],
},
)
def test_get_latest_rpms_cache(self):
self._touch_files(
[
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
cache_region = make_region().configure("dogpile.cache.memory")
pkgset = pkgsets.KojiPackageSet(
"pkgset",
self.koji_wrapper,
[None],
arches=["x86_64"],
cache_region=cache_region,
)
# Try calling the populate twice, but expect just single listTaggedRPMs
# call - that means the caching worked.
for i in range(2):
result = pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[
mock.call.listTaggedRPMS(
"f25", event=None, inherit=True, latest=True
)
],
)
self.assertPkgsetEqual(
result,
{
"x86_64": [
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
"rpms/bash@4.3.42@4.fc24@x86_64",
]
},
)
def test_get_latest_rpms_cache_different_id(self):
self._touch_files(
[
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
cache_region = make_region().configure("dogpile.cache.memory")
pkgset = pkgsets.KojiPackageSet(
"pkgset",
self.koji_wrapper,
[None],
arches=["x86_64"],
cache_region=cache_region,
)
# Try calling the populate twice with different event id. It must not
# cache anything.
expected_calls = []
for i in range(2):
expected_calls.append(
mock.call.listTaggedRPMS("f25", event=i, inherit=True, latest=True)
)
result = pkgset.populate("f25", event={"id": i})
self.assertEqual(self.koji_wrapper.koji_proxy.mock_calls, expected_calls)
self.assertPkgsetEqual(
result,
{
"x86_64": [
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
"rpms/bash@4.3.42@4.fc24@x86_64",
]
},
)
def test_extra_builds_attribute(self):
self._touch_files(
[
"rpms/pungi@4.1.3@3.fc25@noarch",
"rpms/pungi@4.1.3@3.fc25@src",
"rpms/bash@4.3.42@4.fc24@i686",
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash@4.3.42@4.fc24@src",
"rpms/bash-debuginfo@4.3.42@4.fc24@i686",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
]
)
# Return "pungi" RPMs and builds using "get_latest_rpms" which gets
# them from Koji multiCall.
extra_rpms = [rpm for rpm in self.tagged_rpms[0] if rpm["name"] == "pungi"]
extra_builds = [
build for build in self.tagged_rpms[1] if build["package_name"] == "pungi"
]
self.koji_wrapper.retrying_multicall_map.side_effect = [
extra_builds,
[extra_rpms],
]
# Do not return "pungi" RPMs and builds using the listTaggedRPMs, so
# we can be sure "pungi" gets into compose using the `extra_builds`.
self.koji_wrapper.koji_proxy.listTaggedRPMS.return_value = [
[rpm for rpm in self.tagged_rpms[0] if rpm["name"] != "pungi"],
[b for b in self.tagged_rpms[1] if b["package_name"] != "pungi"],
]
pkgset = pkgsets.KojiPackageSet(
"pkgset", self.koji_wrapper, [None], extra_builds=["pungi-4.1.3-3.fc25"]
)
result = pkgset.populate("f25")
self.assertEqual(
self.koji_wrapper.koji_proxy.mock_calls,
[mock.call.listTaggedRPMS("f25", event=None, inherit=True, latest=True)],
)
self.assertPkgsetEqual(
result,
{
"src": ["rpms/pungi@4.1.3@3.fc25@src", "rpms/bash@4.3.42@4.fc24@src"],
"noarch": ["rpms/pungi@4.1.3@3.fc25@noarch"],
"i686": [
"rpms/bash@4.3.42@4.fc24@i686",
"rpms/bash-debuginfo@4.3.42@4.fc24@i686",
],
"x86_64": [
"rpms/bash@4.3.42@4.fc24@x86_64",
"rpms/bash-debuginfo@4.3.42@4.fc24@x86_64",
],
},
)
@mock.patch("kobo.pkgset.FileCache", new=MockFileCache)
class TestMergePackageSets(PkgsetCompareMixin, unittest.TestCase):
def test_merge_in_another_arch(self):
first = pkgsets.PackageSetBase("first", [None])
second = pkgsets.PackageSetBase("second", [None])
for name in ["rpms/pungi@4.1.3@3.fc25@noarch", "rpms/pungi@4.1.3@3.fc25@src"]:
pkg = first.file_cache.add(name)
first.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
for name in ["rpms/bash@4.3.42@4.fc24@i686"]:
pkg = second.file_cache.add(name)
second.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
first.merge(second, "i386", ["i686"])
self.assertPkgsetEqual(
first.rpms_by_arch,
{
"src": ["rpms/pungi@4.1.3@3.fc25@src"],
"noarch": ["rpms/pungi@4.1.3@3.fc25@noarch"],
"i686": ["rpms/bash@4.3.42@4.fc24@i686"],
},
)
def test_merge_includes_noarch_with_different_exclude_arch(self):
first = pkgsets.PackageSetBase("first", [None])
second = pkgsets.PackageSetBase("second", [None])
pkg = first.file_cache.add("rpms/bash@4.3.42@4.fc24@i686")
first.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkg = second.file_cache.add("rpms/pungi@4.1.3@3.fc25@noarch")
pkg.excludearch = ["x86_64"]
second.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
first.merge(second, "i386", ["i686", "noarch"])
self.assertPkgsetEqual(
first.rpms_by_arch,
{
"i686": ["rpms/bash@4.3.42@4.fc24@i686"],
"noarch": ["rpms/pungi@4.1.3@3.fc25@noarch"],
},
)
def test_merge_excludes_noarch_exclude_arch(self):
first = pkgsets.PackageSetBase("first", [None])
second = pkgsets.PackageSetBase("second", [None])
pkg = first.file_cache.add("rpms/bash@4.3.42@4.fc24@i686")
first.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkg = second.file_cache.add("rpms/pungi@4.1.3@3.fc25@noarch")
pkg.excludearch = ["i686"]
second.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
first.merge(second, "i386", ["i686", "noarch"])
self.assertPkgsetEqual(
first.rpms_by_arch, {"i686": ["rpms/bash@4.3.42@4.fc24@i686"], "noarch": []}
)
def test_merge_excludes_noarch_exclusive_arch(self):
first = pkgsets.PackageSetBase("first", [None])
second = pkgsets.PackageSetBase("second", [None])
pkg = first.file_cache.add("rpms/bash@4.3.42@4.fc24@i686")
first.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkg = second.file_cache.add("rpms/pungi@4.1.3@3.fc25@noarch")
pkg.exclusivearch = ["x86_64"]
second.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
first.merge(second, "i386", ["i686", "noarch"])
self.assertPkgsetEqual(
first.rpms_by_arch, {"i686": ["rpms/bash@4.3.42@4.fc24@i686"], "noarch": []}
)
def test_merge_includes_noarch_with_same_exclusive_arch(self):
first = pkgsets.PackageSetBase("first", [None])
second = pkgsets.PackageSetBase("second", [None])
pkg = first.file_cache.add("rpms/bash@4.3.42@4.fc24@i686")
first.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkg = second.file_cache.add("rpms/pungi@4.1.3@3.fc25@noarch")
pkg.exclusivearch = ["i686"]
second.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
first.merge(second, "i386", ["i686", "noarch"])
self.assertPkgsetEqual(
first.rpms_by_arch,
{
"i686": ["rpms/bash@4.3.42@4.fc24@i686"],
"noarch": ["rpms/pungi@4.1.3@3.fc25@noarch"],
},
)
def test_merge_skips_package_in_cache(self):
first = pkgsets.PackageSetBase("first", [None])
second = pkgsets.PackageSetBase("second", [None])
pkg = first.file_cache.add("rpms/bash@4.3.42@4.fc24@i686")
first.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkg = second.file_cache.add("rpms/bash@4.3.42@4.fc24@i686")
second.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
first.merge(second, "i386", ["i686"])
self.assertPkgsetEqual(
first.rpms_by_arch, {"i686": ["rpms/bash@4.3.42@4.fc24@i686"]}
)
def test_merge_skips_src_without_binary(self):
first = pkgsets.PackageSetBase("first", [None])
second = pkgsets.PackageSetBase("second", [None])
pkg = first.file_cache.add("rpms/bash@4.3.42@4.fc24@i686")
first.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkg = second.file_cache.add("rpms/pungi@4.1.3@3.fc25@src")
second.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
first.merge(second, "i386", ["i686", "src"])
self.assertPkgsetEqual(
first.rpms_by_arch,
{"i686": ["rpms/bash@4.3.42@4.fc24@i686"], "src": [], "nosrc": []},
)
@mock.patch("kobo.pkgset.FileCache", new=MockFileCache)
class TestSaveFileList(unittest.TestCase):
def setUp(self):
fd, self.tmpfile = tempfile.mkstemp()
os.close(fd)
def tearDown(self):
os.unlink(self.tmpfile)
def test_save_arches_alphabetically(self):
pkgset = pkgsets.PackageSetBase("pkgset", [None])
for name in [
"rpms/pungi@4.1.3@3.fc25@x86_64",
"rpms/pungi@4.1.3@3.fc25@src",
"rpms/pungi@4.1.3@3.fc25@ppc64",
]:
pkg = pkgset.file_cache.add(name)
pkgset.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkgset.save_file_list(self.tmpfile)
with open(self.tmpfile) as f:
rpms = f.read().strip().split("\n")
self.assertEqual(
rpms,
[
"rpms/pungi@4.1.3@3.fc25@ppc64",
"rpms/pungi@4.1.3@3.fc25@src",
"rpms/pungi@4.1.3@3.fc25@x86_64",
],
)
def test_save_strip_prefix(self):
pkgset = pkgsets.PackageSetBase("pkgset", [None])
for name in ["rpms/pungi@4.1.3@3.fc25@noarch", "rpms/pungi@4.1.3@3.fc25@src"]:
pkg = pkgset.file_cache.add(name)
pkgset.rpms_by_arch.setdefault(pkg.arch, []).append(pkg)
pkgset.save_file_list(self.tmpfile, remove_path_prefix="rpms/")
with open(self.tmpfile) as f:
rpms = f.read().strip().split("\n")
six.assertCountEqual(
self, rpms, ["pungi@4.1.3@3.fc25@noarch", "pungi@4.1.3@3.fc25@src"]
)