3cf16eb42d
Fixes: https://pagure.io/pungi/issue/1045 JIRA: COMPOSE-2946 Signed-off-by: Haibo Lin <hlin@redhat.com>
282 lines
9.1 KiB
Python
282 lines
9.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import difflib
|
|
import errno
|
|
import imp
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
from collections import defaultdict
|
|
|
|
import mock
|
|
import six
|
|
from kobo.rpmlib import parse_nvr
|
|
|
|
try:
|
|
import unittest2 as unittest
|
|
except ImportError:
|
|
import unittest
|
|
|
|
from pungi.util import get_arch_variant_data
|
|
from pungi import paths, checks
|
|
from pungi.module_util import Modulemd
|
|
|
|
|
|
class BaseTestCase(unittest.TestCase):
|
|
|
|
def assertFilesEqual(self, fn1, fn2):
|
|
with open(fn1, 'rb') as f1:
|
|
lines1 = f1.read().decode('utf-8').splitlines()
|
|
with open(fn2, 'rb') as f2:
|
|
lines2 = f2.read().decode('utf-8').splitlines()
|
|
diff = '\n'.join(difflib.unified_diff(lines1, lines2,
|
|
fromfile='EXPECTED', tofile='ACTUAL'))
|
|
self.assertEqual(diff, '', 'Files differ:\n' + diff)
|
|
|
|
def assertFileContent(self, fn, expected):
|
|
with open(fn, 'rb') as f:
|
|
lines = f.read().decode('utf-8').splitlines()
|
|
diff = '\n'.join(difflib.unified_diff(
|
|
lines, expected.splitlines(), fromfile='EXPECTED', tofile='ACTUAL')
|
|
)
|
|
self.assertEqual(diff, '', 'Files differ:\n' + diff)
|
|
|
|
|
|
class PungiTestCase(BaseTestCase):
|
|
def setUp(self):
|
|
self.topdir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
try:
|
|
shutil.rmtree(self.topdir)
|
|
except OSError as err:
|
|
if err.errno != errno.ENOENT:
|
|
raise
|
|
|
|
def assertValidConfig(self, conf):
|
|
self.assertEqual(checks.validate(conf, offline=True), ([], []))
|
|
|
|
def _make_pkgset_phase(self, names):
|
|
pkgsets = []
|
|
for name in names:
|
|
pkgset = mock.Mock(paths={})
|
|
pkgset.name = name
|
|
for arch in ("x86_64", "amd64"):
|
|
pkgset.paths[arch] = os.path.join(
|
|
self.topdir, "work", arch, "repo", name
|
|
)
|
|
pkgsets.append(pkgset)
|
|
return mock.Mock(package_sets=pkgsets)
|
|
|
|
|
|
class MockVariant(mock.Mock):
|
|
def __init__(self, is_empty=False, name=None, *args, **kwargs):
|
|
super(MockVariant, self).__init__(*args, is_empty=is_empty, **kwargs)
|
|
self.parent = kwargs.get('parent', None)
|
|
self.arch_mmds = {}
|
|
self.module_uid_to_koji_tag = {}
|
|
self.variants = {}
|
|
self.pkgsets = set()
|
|
self.modules = None
|
|
self.name = name
|
|
self.nsvc_to_pkgset = defaultdict(lambda: mock.Mock(rpms_by_arch={}))
|
|
|
|
def __str__(self):
|
|
return self.uid
|
|
|
|
def get_variants(self, arch=None, types=None):
|
|
return [v for v in list(self.variants.values())
|
|
if (not arch or arch in v.arches) and (not types or v.type in types)]
|
|
|
|
def get_modules(self, arch=None, types=None):
|
|
return []
|
|
|
|
def get_modular_koji_tags(self, arch=None, types=None):
|
|
return []
|
|
|
|
def add_fake_module(self, nsvc, rpm_nvrs=None, with_artifacts=False, mmd_arch=None):
|
|
if not Modulemd:
|
|
# No support for modules
|
|
return
|
|
name, stream, version, context = nsvc.split(":")
|
|
module_stream = Modulemd.ModuleStreamV2.new(name, stream)
|
|
module_stream.set_version(int(version))
|
|
module_stream.set_context(context)
|
|
module_stream.set_summary("foo")
|
|
module_stream.set_description("foo")
|
|
module_stream.add_module_license("GPL")
|
|
|
|
if rpm_nvrs:
|
|
for rpm_nvr in rpm_nvrs:
|
|
rpm_name = parse_nvr(rpm_nvr)["name"]
|
|
component = Modulemd.ComponentRpm.new(rpm_nvr)
|
|
component.set_name(rpm_name)
|
|
component.set_rationale("Needed for test")
|
|
module_stream.add_component(component)
|
|
if with_artifacts:
|
|
module_stream.add_rpm_artifact(rpm_nvr)
|
|
|
|
if self.modules is None:
|
|
self.modules = []
|
|
self.modules.append(":".join([name, stream, version]))
|
|
if mmd_arch:
|
|
nsvc = module_stream.get_nsvc()
|
|
self.arch_mmds.setdefault(mmd_arch, {})[nsvc] = module_stream
|
|
return module_stream
|
|
|
|
|
|
class IterableMock(mock.Mock):
|
|
def __iter__(self):
|
|
return iter([])
|
|
|
|
|
|
class DummyCompose(object):
|
|
def __init__(self, topdir, config):
|
|
self.supported = True
|
|
self.compose_date = '20151203'
|
|
self.compose_type_suffix = '.t'
|
|
self.compose_type = 'test'
|
|
self.compose_respin = 0
|
|
self.compose_id = 'Test-20151203.0.t'
|
|
self.compose_label = None
|
|
self.compose_label_major_version = None
|
|
self.image_release = '20151203.t.0'
|
|
self.image_version = '25'
|
|
self.ci_base = mock.Mock(
|
|
release_id='Test-1.0',
|
|
release=mock.Mock(
|
|
short='test',
|
|
version='1.0',
|
|
is_layered=False,
|
|
type_suffix=''
|
|
),
|
|
)
|
|
self.topdir = topdir
|
|
self.conf = load_config(PKGSET_REPOS, **config)
|
|
checks.validate(self.conf, offline=True)
|
|
self.paths = paths.Paths(self)
|
|
self.has_comps = True
|
|
self.variants = {
|
|
'Server': MockVariant(uid='Server', arches=['x86_64', 'amd64'],
|
|
type='variant', id='Server', name='Server'),
|
|
'Client': MockVariant(uid='Client', arches=['amd64'],
|
|
type='variant', id='Client', name='Client'),
|
|
'Everything': MockVariant(uid='Everything', arches=['x86_64', 'amd64'],
|
|
type='variant', id='Everything', name='Everything'),
|
|
}
|
|
self.all_variants = self.variants.copy()
|
|
|
|
# for PhaseLoggerMixin
|
|
self._logger = mock.Mock(name="compose._logger")
|
|
self._logger.handlers = [mock.Mock()]
|
|
|
|
self.log_info = mock.Mock()
|
|
self.log_error = mock.Mock()
|
|
self.log_debug = mock.Mock()
|
|
self.log_warning = mock.Mock()
|
|
self.get_image_name = mock.Mock(return_value='image-name')
|
|
self.image = mock.Mock(
|
|
path='Client/i386/iso/image.iso', can_fail=False, size=123, _max_size=None,
|
|
)
|
|
self.im = mock.Mock(images={'Client': {'amd64': [self.image]}})
|
|
self.old_composes = []
|
|
self.config_dir = '/home/releng/config'
|
|
self.notifier = None
|
|
self.attempt_deliverable = mock.Mock()
|
|
self.fail_deliverable = mock.Mock()
|
|
self.require_deliverable = mock.Mock()
|
|
self.should_create_yum_database = True
|
|
self.cache_region = None
|
|
|
|
def setup_optional(self):
|
|
self.all_variants['Server-optional'] = MockVariant(
|
|
uid='Server-optional', arches=['x86_64'], type='optional')
|
|
self.all_variants['Server-optional'].parent = self.variants['Server']
|
|
self.variants['Server'].variants['optional'] = self.all_variants['Server-optional']
|
|
|
|
def setup_addon(self):
|
|
self.all_variants['Server-HA'] = MockVariant(
|
|
uid='Server-HA', arches=['x86_64'], type='addon', is_empty=False)
|
|
self.all_variants['Server-HA'].parent = self.variants['Server']
|
|
self.variants['Server'].variants['HA'] = self.all_variants['Server-HA']
|
|
|
|
def get_variants(self, arch=None, types=None):
|
|
return [v for v in list(self.all_variants.values())
|
|
if (not arch or arch in v.arches) and (not types or v.type in types)]
|
|
|
|
def can_fail(self, variant, arch, deliverable):
|
|
failable = get_arch_variant_data(self.conf, 'failable_deliverables', arch, variant)
|
|
return deliverable in failable
|
|
|
|
def get_arches(self):
|
|
result = set()
|
|
for variant in list(self.variants.values()):
|
|
result |= set(variant.arches)
|
|
return sorted(result)
|
|
|
|
def mkdtemp(self, suffix="", prefix="tmp"):
|
|
return tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=self.topdir)
|
|
|
|
|
|
def touch(path, content=None):
|
|
"""Helper utility that creates an dummy file in given location. Directories
|
|
will be created."""
|
|
content = content or (path + '\n')
|
|
try:
|
|
os.makedirs(os.path.dirname(path))
|
|
except OSError:
|
|
pass
|
|
if not isinstance(content, six.binary_type):
|
|
content = content.encode()
|
|
with open(path, 'wb') as f:
|
|
f.write(content)
|
|
return path
|
|
|
|
|
|
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
|
|
|
|
|
|
def copy_fixture(fixture_name, dest):
|
|
src = os.path.join(FIXTURE_DIR, fixture_name)
|
|
touch(dest)
|
|
shutil.copy2(src, dest)
|
|
|
|
|
|
def boom(*args, **kwargs):
|
|
raise Exception('BOOM')
|
|
|
|
|
|
def mk_boom(cls=Exception, msg='BOOM'):
|
|
def b(*args, **kwargs):
|
|
raise cls(msg)
|
|
return b
|
|
|
|
|
|
PKGSET_REPOS = dict(
|
|
pkgset_source='repos',
|
|
pkgset_repos={},
|
|
)
|
|
|
|
BASE_CONFIG = dict(
|
|
release_short='test',
|
|
release_name='Test',
|
|
release_version='1.0',
|
|
variants_file='variants.xml',
|
|
createrepo_checksum='sha256',
|
|
gather_method='deps',
|
|
)
|
|
|
|
|
|
def load_config(data={}, **kwargs):
|
|
conf = dict()
|
|
conf.update(BASE_CONFIG)
|
|
conf.update(data)
|
|
conf.update(kwargs)
|
|
return conf
|
|
|
|
|
|
def fake_run_in_threads(func, params, threads=None):
|
|
"""Like run_in_threads from Kobo, but actually runs tasks serially."""
|
|
for num, param in enumerate(params):
|
|
func(None, param, num)
|