c87d299a20
When probing lookasides for platform definition, we need to make sure it works for repos specified as HTTP URLs. Createrepo doesn't seem to automatically download the repodata, so we have to help it. JIRA: COMPOSE-3958 Signed-off-by: Lubomír Sedlář <lsedlar@redhat.com>
900 lines
37 KiB
Python
900 lines
37 KiB
Python
# -*- coding: utf-8 -*-

# Standard library.
import argparse
import os
import shutil
import subprocess
import sys
import tempfile

# Third-party test dependencies.
import mock
import six

# Prefer the unittest2 backport when it is installed; otherwise fall back to
# the standard library implementation.
try:
    import unittest2 as unittest
except ImportError:
    import unittest

# Make the parent directory of this file importable. This must run before the
# ``pungi`` and ``tests`` imports below.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from pungi import compose
from pungi import util

from tests.helpers import touch, PungiTestCase, mk_boom
|
|
|
|
|
|
class TestGitRefResolver(unittest.TestCase):
    """Tests for ``util.resolve_git_url``, ``util.resolve_git_ref`` and
    ``util.GitUrlResolver``.

    ``pungi.util.run`` is mocked wherever a ``git ls-remote`` call is expected,
    so no network access happens.
    """

    # Remote repository used in every ``git ls-remote`` expectation.
    REPO = "https://git.example.com/repo.git"

    def _assert_ls_remote_once(self, run, ref):
        """Assert ``run`` was called exactly once as ``git ls-remote REPO ref``."""
        run.assert_called_once_with(
            ["git", "ls-remote", self.REPO, ref], universal_newlines=True
        )

    @mock.patch("pungi.util.run")
    def test_successful_resolve(self, run):
        """``#HEAD`` is replaced by the hash reported by the remote."""
        run.return_value = (0, "CAFEBABE\tHEAD\n")

        resolved = util.resolve_git_url(
            "https://git.example.com/repo.git?somedir#HEAD"
        )

        self.assertEqual(
            resolved, "https://git.example.com/repo.git?somedir#CAFEBABE"
        )
        self._assert_ls_remote_once(run, "HEAD")

    @mock.patch("pungi.util.run")
    def test_successful_resolve_branch(self, run):
        """``origin/<branch>`` is looked up as ``refs/heads/<branch>``."""
        run.return_value = (0, "CAFEBABE\trefs/heads/f24\n")

        resolved = util.resolve_git_url(
            "https://git.example.com/repo.git?somedir#origin/f24"
        )

        self.assertEqual(
            resolved, "https://git.example.com/repo.git?somedir#CAFEBABE"
        )
        self._assert_ls_remote_once(run, "refs/heads/f24")

    def test_resolve_ref_with_commit_id(self):
        """A 40 character ref is returned unchanged."""
        commit = "a" * 40
        resolved = util.resolve_git_ref("https://git.example.com/repo.git", commit)
        self.assertEqual(resolved, "a" * 40)

    @mock.patch("pungi.util.run")
    def test_resolve_ref_multiple_matches(self, run):
        """With a local and a remote-tracking match, the first hash is used."""
        run.return_value = (
            0,
            "CAFEBABE\trefs/heads/master\nBABECAFE\trefs/remotes/origin/master",
        )

        resolved = util.resolve_git_ref("https://git.example.com/repo.git", "master")

        self.assertEqual(resolved, "CAFEBABE")
        self._assert_ls_remote_once(run, "master")

    @mock.patch("pungi.util.run")
    def test_resolve_ref_with_remote_head(self, run):
        """``HEAD`` plus ``refs/remotes/origin/HEAD`` resolves to the first hash."""
        run.return_value = (
            0,
            "CAFEBABE\tHEAD\nBABECAFE\trefs/remotes/origin/HEAD",
        )

        resolved = util.resolve_git_ref("https://git.example.com/repo.git", "HEAD")

        self.assertEqual(resolved, "CAFEBABE")
        self._assert_ls_remote_once(run, "HEAD")

    @mock.patch("pungi.util.run")
    def test_resolve_missing_spec(self, run):
        """A URL without a fragment is returned as-is and nothing is executed."""
        resolved = util.resolve_git_url("https://git.example.com/repo.git")

        self.assertEqual(resolved, "https://git.example.com/repo.git")
        self.assertEqual(run.mock_calls, [])

    @mock.patch("pungi.util.run")
    def test_resolve_non_head_spec(self, run):
        """A ``#some-tag`` fragment is left alone and nothing is executed."""
        resolved = util.resolve_git_url("https://git.example.com/repo.git#some-tag")

        self.assertEqual(resolved, "https://git.example.com/repo.git#some-tag")
        self.assertEqual(run.mock_calls, [])

    @mock.patch("pungi.util.run")
    def test_resolve_ambiguous(self, run):
        """Two different refs in the output raise ``RuntimeError``."""
        run.return_value = (0, "CAFEBABE\tF11\nDEADBEEF\tF10\n")

        with self.assertRaises(RuntimeError):
            util.resolve_git_url("https://git.example.com/repo.git?somedir#HEAD")

        self._assert_ls_remote_once(run, "HEAD")

    @mock.patch("pungi.util.run")
    def test_resolve_keep_empty_query_string(self, run):
        """An empty query string (``?#HEAD``) survives resolution."""
        run.return_value = (0, "CAFEBABE\tHEAD\n")

        resolved = util.resolve_git_url("https://git.example.com/repo.git?#HEAD")

        self._assert_ls_remote_once(run, "HEAD")
        self.assertEqual(resolved, "https://git.example.com/repo.git?#CAFEBABE")

    @mock.patch("pungi.util.run")
    def test_resolve_strip_git_plus_prefix(self, run):
        """``git+`` is dropped for ``ls-remote`` but kept in the returned URL."""
        run.return_value = (0, "CAFEBABE\tHEAD\n")

        resolved = util.resolve_git_url("git+https://git.example.com/repo.git#HEAD")

        self._assert_ls_remote_once(run, "HEAD")
        self.assertEqual(resolved, "git+https://git.example.com/repo.git#CAFEBABE")

    @mock.patch("pungi.util.run")
    def test_resolve_no_branch_in_remote(self, run):
        """Empty ``ls-remote`` output raises with a 'ref does not exist' message."""
        run.return_value = (0, "")

        with self.assertRaises(RuntimeError) as ctx:
            util.resolve_git_url(
                "https://git.example.com/repo.git?somedir#origin/my-branch"
            )

        self._assert_ls_remote_once(run, "refs/heads/my-branch")
        self.assertIn("ref does not exist in remote repo", str(ctx.exception))

    @mock.patch("time.sleep")
    @mock.patch("pungi.util.run")
    def test_retry(self, run, sleep):
        """A failing first attempt is retried after ``sleep(30)``."""
        run.side_effect = [RuntimeError("Boom"), (0, "CAFEBABE\tHEAD\n")]

        resolved = util.resolve_git_url(
            "https://git.example.com/repo.git?somedir#HEAD"
        )

        self.assertEqual(
            resolved, "https://git.example.com/repo.git?somedir#CAFEBABE"
        )
        self.assertEqual(sleep.call_args_list, [mock.call(30)])
        ls_remote = mock.call(
            ["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
            universal_newlines=True,
        )
        self.assertEqual(run.call_args_list, [ls_remote] * 2)

    @mock.patch("pungi.util.resolve_git_ref")
    @mock.patch("pungi.util.resolve_git_url")
    def test_resolver_offline(self, mock_resolve_url, mock_resolve_ref):
        """An offline resolver returns the URL untouched without resolving."""
        resolver = util.GitUrlResolver(offline=True)

        outcome = resolver("http://example.com/repo.git#HEAD")

        self.assertEqual(outcome, "http://example.com/repo.git#HEAD")
        self.assertEqual(mock_resolve_url.call_args_list, [])
        self.assertEqual(mock_resolve_ref.call_args_list, [])

    @mock.patch("pungi.util.resolve_git_ref")
    @mock.patch("pungi.util.resolve_git_url")
    def test_resolver_offline_branch(self, mock_resolve_url, mock_resolve_ref):
        """An offline resolver given a branch returns the branch name itself."""
        resolver = util.GitUrlResolver(offline=True)

        outcome = resolver("http://example.com/repo.git", "master")

        self.assertEqual(outcome, "master")
        self.assertEqual(mock_resolve_url.call_args_list, [])
        self.assertEqual(mock_resolve_ref.call_args_list, [])

    @mock.patch("pungi.util.resolve_git_ref")
    @mock.patch("pungi.util.resolve_git_url")
    def test_resolver_caches_calls(self, mock_resolve_url, mock_resolve_ref):
        """Each distinct URL or (URL, ref) pair is resolved only once."""
        url1 = "http://example.com/repo.git#HEAD"
        url2 = "http://example.com/repo.git#master"
        url3 = "http://example.com/repo.git"
        ref1 = "foo"
        ref2 = "bar"
        mock_resolve_url.side_effect = ["1", "2"]
        mock_resolve_ref.side_effect = ["cafe", "beef"]
        resolver = util.GitUrlResolver()

        # (arguments, expected result) in the exact order they are issued;
        # repeated entries must be served from the cache.
        lookups = [
            ((url1,), "1"),
            ((url1,), "1"),
            ((url3, ref1), "cafe"),
            ((url3, ref2), "beef"),
            ((url2,), "2"),
            ((url3, ref1), "cafe"),
            ((url1,), "1"),
            ((url3, ref2), "beef"),
            ((url2,), "2"),
            ((url3, ref2), "beef"),
        ]
        for args, expected in lookups:
            self.assertEqual(resolver(*args), expected)

        self.assertEqual(
            mock_resolve_url.call_args_list, [mock.call(url1), mock.call(url2)]
        )
        self.assertEqual(
            mock_resolve_ref.call_args_list,
            [mock.call(url3, ref1), mock.call(url3, ref2)],
        )

    @mock.patch("pungi.util.resolve_git_url")
    def test_resolver_caches_failure(self, mock_resolve):
        """A failed resolution is re-raised on repeat without a second call."""
        url = "http://example.com/repo.git#HEAD"
        mock_resolve.side_effect = mk_boom(util.GitUrlResolveError, "failed")
        resolver = util.GitUrlResolver()

        for _ in range(2):
            with self.assertRaises(util.GitUrlResolveError):
                resolver(url)

        self.assertEqual(mock_resolve.call_args_list, [mock.call(url)])
|
|
|
|
|
|
class TestGetVariantData(unittest.TestCase):
    """Tests for ``util.get_variant_data`` using variants mocked by ``uid``."""

    def test_get_simple(self):
        """A scalar value under a matching pattern comes back as a list."""
        variant = mock.Mock(uid="Client")
        conf = {"foo": {"^Client$": 1}}

        self.assertEqual(util.get_variant_data(conf, "foo", variant), [1])

    def test_get_make_list(self):
        """Values from every matching pattern are combined into one list."""
        variant = mock.Mock(uid="Client")
        conf = {"foo": {"^Client$": [1, 2], "^.*$": 3}}

        found = util.get_variant_data(conf, "foo", variant)

        # Order is not asserted, only the contents.
        six.assertCountEqual(self, found, [1, 2, 3])

    def test_not_matching_arch(self):
        """A variant uid matching no pattern produces an empty list."""
        variant = mock.Mock(uid="Server")
        conf = {"foo": {"^Client$": [1, 2]}}

        self.assertEqual(util.get_variant_data(conf, "foo", variant), [])

    def test_handle_missing_config(self):
        """A missing configuration key produces an empty list."""
        variant = mock.Mock(uid="Client")

        self.assertEqual(util.get_variant_data({}, "foo", variant), [])

    def test_get_save_pattern(self):
        """Patterns that matched are recorded in the ``keys`` set."""
        variant = mock.Mock(uid="Client")
        conf = {"foo": {"^Client$": 1, "^NotClient$": 2}}
        matched = set()

        found = util.get_variant_data(conf, "foo", variant, keys=matched)

        self.assertEqual(found, [1])
        self.assertEqual(matched, {"^Client$"})
|
|
|
|
|
|
class TestVolumeIdGenerator(unittest.TestCase):
    """Tests for ``util.get_volid`` and ``util._apply_substitutions``.

    ``pungi.compose.ComposeInfo`` is mocked in each test; every compose is
    created in a throw-away temporary directory.
    """

    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def _assert_volids(self, ci, restricted, cases):
        """Check ``util.get_volid`` for every ``(keys, expected)`` pair.

        :param ci: the mocked ``ComposeInfo`` class
        :param restricted: value stored under ``restricted_volid`` in the config
        :param cases: iterable of ``(format keys, expected volume id)`` tuples;
            the keys are joined into a single ``%(key)s-%(key)s-...`` format
        """
        for keys, expected in cases:
            volid_format = "-".join("%(" + key + ")s" for key in keys)
            conf = {
                "release_short": "rel_short2",
                "release_version": "6.0",
                "image_volid_formats": [volid_format],
                "image_volid_layered_product_formats": [],
                "volume_id_substitutions": {},
                "restricted_volid": restricted,
            }
            variant = mock.Mock(uid="Server", type="variant")

            info = ci.return_value
            info.compose.respin = 2
            info.compose.id = "compose_id"
            info.compose.date = "20160107"
            info.compose.type = "nightly"
            info.compose.type_suffix = ".n"
            info.compose.label = "RC-1.0"
            info.compose.label_major_version = "1"

            # The expected ids contain the release values from the
            # configuration ("rel_short2", "6.0"), not these.
            info.release.version = "3.0"
            info.release.short = "rel_short"

            c = compose.Compose(conf, self.tmp_dir)

            volid = util.get_volid(c, "x86_64", variant, disc_type=False)

            self.assertEqual(volid, expected)

    @mock.patch("pungi.compose.ComposeInfo")
    def test_get_volid(self, ci):
        """Every supported format key is substituted; dots are kept."""
        self._assert_volids(
            ci,
            False,
            [
                (
                    ["arch", "compose_id", "date", "disc_type"],
                    "x86_64-compose_id-20160107-",
                ),
                (
                    ["label", "label_major_version", "release_short", "respin"],
                    "RC-1.0-1-rel_short2-2",
                ),
                (
                    ["type", "type_suffix", "variant", "version"],
                    "nightly-.n-Server-6.0",
                ),
            ],
        )

    @mock.patch("pungi.compose.ComposeInfo")
    def test_get_restricted_volid(self, ci):
        """With ``restricted_volid`` set, dots appear as dashes in the result."""
        self._assert_volids(
            ci,
            True,
            [
                (
                    ["arch", "compose_id", "date", "disc_type"],
                    "x86_64-compose_id-20160107-",
                ),
                (
                    ["label", "label_major_version", "release_short", "respin"],
                    "RC-1-0-1-rel_short2-2",
                ),
                (
                    ["type", "type_suffix", "variant", "version"],
                    "nightly--n-Server-6-0",
                ),
            ],
        )

    @mock.patch("pungi.compose.ComposeInfo")
    def test_get_volid_too_long(self, ci):
        """If every format is too long, the error names all attempted ids."""
        first_candidate = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"  # 34 chars
        second_candidate = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"  # 33 chars
        conf = {
            "release_short": "rel_short2",
            "release_version": "6.0",
            "image_volid_formats": [first_candidate, second_candidate],
            "image_volid_layered_product_formats": [],
            "volume_id_substitutions": {},
        }
        variant = mock.Mock(uid="Server", type="variant")
        c = compose.Compose(conf, self.tmp_dir)

        with self.assertRaises(ValueError) as ctx:
            util.get_volid(c, "x86_64", variant, disc_type=False)

        self.assertIn(second_candidate, str(ctx.exception))
        self.assertIn(first_candidate, str(ctx.exception))

    @mock.patch("pungi.compose.ComposeInfo")
    def test_apply_substitutions(self, ci):
        """Configured substitutions are applied to a volume id."""
        cases = [
            (
                "Fedora-WorkstationOstree-ostree-x86_64-rawhide",
                "Fedora-WS-ostree-x86_64-rawhide",
            ),
            (
                "Fedora-WorkstationOstree-ostree-x86_64-Rawhide",
                "Fedora-WS-ostree-x86_64-rawh",
            ),
            ("x86_64-compose_id-20160107", "x86_64-compose_id-20160107"),
            ("x86_64-compose_id-20160107-Alpha", "x86_64-compose_id-20160107-A"),
            # These test the case where one substitution is a subset
            # of the other, but sorts alphabetically ahead of it, to
            # make sure we're correctly sorting by length
            ("Fedora-zzzaaaaaazzz-Rawhide", "Fedora-zzz-rawh"),
            ("Fedora-aaaaaa-Rawhide", "Fedora-aaa-rawh"),
        ]
        for volid, expected in cases:
            conf = {
                "volume_id_substitutions": {
                    "Rawhide": "rawh",
                    "WorkstationOstree": "WS",
                    "Workstation": "WS",
                    "Alpha": "A",
                    "zzzaaaaaazzz": "zzz",
                    "aaaaaa": "aaa",
                }
            }
            c = compose.Compose(conf, self.tmp_dir)

            transformed_volid = util._apply_substitutions(c, volid)

            self.assertEqual(transformed_volid, expected)
|
|
|
|
|
|
class TestFindOldCompose(unittest.TestCase):
    """Tests for ``util.find_old_compose`` against a temporary directory tree."""

    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def _path(self, name):
        """Return ``name`` placed directly under the temporary directory."""
        return self.tmp_dir + "/" + name

    def test_finds_single(self):
        """A single FINISHED compose is returned."""
        touch(self._path("Fedora-Rawhide-20160229.0/STATUS"), "FINISHED")

        found = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")

        self.assertEqual(found, self._path("Fedora-Rawhide-20160229.0"))

    def test_ignores_in_progress(self):
        """A compose whose STATUS is STARTED is not returned."""
        touch(self._path("Fedora-Rawhide-20160229.0/STATUS"), "STARTED")

        found = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")

        self.assertIsNone(found)

    def test_only_considers_allowed_status(self):
        """``allowed_statuses`` limits which STATUS values qualify."""
        touch(self._path("Fedora-Rawhide-20160229.0/STATUS"), "FINISHED")

        found = util.find_old_compose(
            self.tmp_dir, "Fedora", "Rawhide", "", allowed_statuses=["DOOMED"]
        )

        self.assertIsNone(found)

    def test_finds_latest(self):
        """Of several candidates, the newest acceptable compose is chosen."""
        statuses = [
            ("Fedora-Rawhide-20160228.0", "DOOMED"),
            ("Fedora-Rawhide-20160229.0", "FINISHED"),
            ("Fedora-Rawhide-20160229.1", "FINISHED_INCOMPLETE"),
        ]
        for compose_id, status in statuses:
            touch(self._path(compose_id + "/STATUS"), status)

        found = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")

        self.assertEqual(found, self._path("Fedora-Rawhide-20160229.1"))

    def test_find_correct_type(self):
        """``-updates`` and ``-updates-testing`` composes are told apart."""
        touch(self._path("Fedora-26-updates-20160229.0/STATUS"), "FINISHED")
        touch(self._path("Fedora-26-updates-testing-20160229.0/STATUS"), "FINISHED")

        found = util.find_old_compose(self.tmp_dir, "Fedora", "26", "-updates")
        self.assertEqual(found, self._path("Fedora-26-updates-20160229.0"))

        found = util.find_old_compose(
            self.tmp_dir, "Fedora", "26", "-updates-testing"
        )
        self.assertEqual(found, self._path("Fedora-26-updates-testing-20160229.0"))

    def test_find_latest_with_two_digit_respin(self):
        """Respin 10 is chosen over respin 9."""
        touch(self._path("Fedora-Rawhide-20160228.n.9/STATUS"), "FINISHED")
        touch(self._path("Fedora-Rawhide-20160228.n.10/STATUS"), "FINISHED")

        found = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")

        self.assertEqual(found, self._path("Fedora-Rawhide-20160228.n.10"))

    def test_finds_ignores_other_files(self):
        """Plain files, odd STATUS entries and other releases are skipped."""
        # A regular file where a compose directory would be.
        touch(self._path("Fedora-Rawhide-20160229.0"), "not a compose")
        # STATUS is a directory here, not a file.
        touch(self._path("Fedora-Rawhide-20160228.0/STATUS/file"), "also not a compose")
        # Different version and different short name.
        touch(self._path("Fedora-24-20160229.0/STATUS"), "FINISHED")
        touch(self._path("Another-Rawhide-20160229.0/STATUS"), "FINISHED")

        found = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")

        self.assertIsNone(found)

    def test_search_in_file(self):
        """Searching inside a regular file returns ``None``."""
        touch(self._path("file"))

        found = util.find_old_compose(self._path("file"), "Fedora", "Rawhide", "")

        self.assertIsNone(found)

    def test_skips_symlink(self):
        """A symlink named like a compose is not followed."""
        os.symlink(self.tmp_dir, self._path("Fedora-Rawhide-20160229.0"))

        found = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")

        self.assertIsNone(found)

    def test_finds_layered_product(self):
        """Base product short name and version are part of the compose name."""
        touch(self._path("Fedora-Rawhide-Base-1-20160229.0/STATUS"), "FINISHED")

        found = util.find_old_compose(
            self.tmp_dir,
            "Fedora",
            "Rawhide",
            "",
            base_product_short="Base",
            base_product_version="1",
        )

        self.assertEqual(found, self._path("Fedora-Rawhide-Base-1-20160229.0"))
|
|
|
|
|
|
class TestHelpers(PungiTestCase):
    """Tests for ``util.process_args`` and ``util.makedirs``.

    ``self.topdir`` is used as the scratch directory; it is not defined in
    this class (presumably provided by ``PungiTestCase`` - verify there).
    """

    def test_process_args(self):
        """The format is applied per value; ``None`` and ``[]`` give ``[]``."""
        fmt = "--opt=%s"
        expectations = [
            (None, []),
            ([], []),
            (["foo", "bar"], ["--opt=foo", "--opt=bar"]),
            ("foo", ["--opt=foo"]),
        ]
        for value, expected in expectations:
            self.assertEqual(util.process_args(fmt, value), expected)

    def test_makedirs(self):
        """Missing intermediate directories are created."""
        target = self.topdir + "/foo/bar/baz"

        util.makedirs(target)

        self.assertTrue(os.path.isdir(target))

    def test_makedirs_on_existing(self):
        """Calling ``makedirs`` on an existing directory does not raise."""
        target = self.topdir + "/foo/bar/baz"
        os.makedirs(target)

        try:
            util.makedirs(target)
        except OSError:
            self.fail("makedirs raised exception on existing directory")
|
|
|
|
|
|
class TestLevenshtein(unittest.TestCase):
    """Tests for the edit distance computed by ``util.levenshtein``."""

    def _assert_distance(self, first, second, expected):
        """Assert ``util.levenshtein(first, second)`` equals ``expected``."""
        self.assertEqual(util.levenshtein(first, second), expected)

    def test_edit_dist_empty_str(self):
        """Two empty strings are at distance 0."""
        self._assert_distance("", "", 0)

    def test_edit_dist_same_str(self):
        """Identical strings are at distance 0."""
        self._assert_distance("aaa", "aaa", 0)

    def test_edit_dist_one_change(self):
        """A single differing character gives distance 1."""
        self._assert_distance("aab", "aaa", 1)

    def test_edit_dist_different_words(self):
        """'kitten' and 'sitting' are at distance 3."""
        self._assert_distance("kitten", "sitting", 3)
|
|
|
|
|
|
class TestRecursiveFileList(unittest.TestCase):
    """Tests for ``util.recursive_file_list`` on a temporary directory."""

    def setUp(self):
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def _assert_listing(self, relative_paths):
        """Create ``relative_paths`` under the temp dir and check the listing.

        Both the expected and the actual lists are sorted before comparison,
        so the order in which files are reported is not asserted.
        """
        expected_files = sorted(relative_paths)
        for relative in expected_files:
            touch(os.path.join(self.tmp_dir, relative))

        actual_files = sorted(util.recursive_file_list(self.tmp_dir))

        self.assertEqual(expected_files, actual_files)

    def test_flat_file_list(self):
        """Build a directory containing files and assert they are listed."""
        self._assert_listing(["file1", "file2", "file3"])

    def test_nested_file_list(self):
        """Build a nested directory tree and assert the files are listed."""
        self._assert_listing(["file1", "subdir/file2", "sub/subdir/file3"])
|
|
|
|
|
|
class TestTempFiles(unittest.TestCase):
    """Tests for the ``util.temp_dir`` context manager."""

    def test_temp_dir_ok(self):
        """The directory exists inside the block and is gone afterwards."""
        with util.temp_dir() as scratch:
            self.assertTrue(os.path.isdir(scratch))

        self.assertFalse(os.path.exists(scratch))

    def test_temp_dir_fail(self):
        """The directory is removed even when the block raises."""
        with self.assertRaises(RuntimeError):
            with util.temp_dir() as scratch:
                self.assertTrue(os.path.isdir(scratch))
                raise RuntimeError("BOOM")

        self.assertFalse(os.path.exists(scratch))

    def test_temp_dir_in_non_existing_dir(self):
        """A missing parent passed as ``dir`` is created and left in place."""
        with util.temp_dir() as playground:
            parent = os.path.join(playground, "missing")

            with util.temp_dir(dir=parent) as scratch:
                self.assertTrue(os.path.isdir(scratch))

            # Only the inner temporary directory is cleaned up here.
            self.assertTrue(os.path.isdir(parent))
            self.assertFalse(os.path.exists(scratch))
|
|
|
|
|
|
class TestUnmountCmd(unittest.TestCase):
    """Tests for ``util.run_unmount_cmd`` with ``subprocess.Popen`` mocked."""

    def _fakeProc(self, ret, err='', out=''):
        """Return a mock process with the given exit code and output.

        ``communicate()`` on the mock returns ``(out, err)``.
        """
        fake = mock.Mock(returncode=ret)
        fake.communicate.return_value = (out, err)
        return fake

    @staticmethod
    def _popen_call(cmd, stderr=subprocess.PIPE):
        """Build the ``Popen`` call expected for ``cmd``.

        stdout is always a pipe; ``stderr`` defaults to a pipe and is
        ``subprocess.STDOUT`` for the diagnostic commands.
        """
        return mock.call(
            cmd, stderr=stderr, stdout=subprocess.PIPE, universal_newlines=True
        )

    @mock.patch("subprocess.Popen")
    def test_unmount_cmd_success(self, mockPopen):
        """A command that exits with 0 is executed exactly once."""
        cmd = "unmount"
        mockPopen.side_effect = [self._fakeProc(0, "")]

        util.run_unmount_cmd(cmd)

        self.assertEqual(mockPopen.call_args_list, [self._popen_call(cmd)])

    @mock.patch("subprocess.Popen")
    def test_unmount_cmd_fail_other_reason(self, mockPopen):
        """A failure other than 'busy' raises immediately, without retrying."""
        cmd = "unmount"
        mockPopen.side_effect = [self._fakeProc(1, "It is broken")]

        with self.assertRaises(RuntimeError) as ctx:
            util.run_unmount_cmd(cmd)

        self.assertEqual(
            str(ctx.exception),
            "Unhandled error when running 'unmount': 'It is broken'",
        )
        self.assertEqual(mockPopen.call_args_list, [self._popen_call(cmd)])

    @mock.patch("time.sleep")
    @mock.patch("subprocess.Popen")
    def test_unmount_cmd_fail_then_retry(self, mockPopen, mock_sleep):
        """'Device or resource busy' is retried until the command succeeds."""
        cmd = "unmount"
        busy = "Device or resource busy"
        mockPopen.side_effect = [
            self._fakeProc(1, busy),
            self._fakeProc(1, busy),
            self._fakeProc(0, ""),
        ]

        util.run_unmount_cmd(cmd)

        self.assertEqual(mockPopen.call_args_list, [self._popen_call(cmd)] * 3)
        self.assertEqual(mock_sleep.call_args_list, [mock.call(0), mock.call(1)])

    @mock.patch("time.sleep")
    @mock.patch("subprocess.Popen")
    def test_unmount_cmd_fail_then_retry_and_fail(self, mockPopen, mock_sleep):
        """After ``max_retries`` busy failures a ``RuntimeError`` is raised."""
        cmd = "unmount"
        busy = "Device or resource busy"
        mockPopen.side_effect = [self._fakeProc(1, busy) for _ in range(3)]

        with self.assertRaises(RuntimeError) as ctx:
            util.run_unmount_cmd(cmd, max_retries=3)

        self.assertEqual(mockPopen.call_args_list, [self._popen_call(cmd)] * 3)
        self.assertEqual(
            mock_sleep.call_args_list, [mock.call(0), mock.call(1), mock.call(2)]
        )
        self.assertEqual(
            str(ctx.exception), "Failed to run 'unmount': Device or resource busy."
        )

    @mock.patch("time.sleep")
    @mock.patch("subprocess.Popen")
    def test_fusermount_fail_then_retry_and_fail_with_debug(self, mockPopen, mock_sleep):
        """With a logger and path, diagnostics run after the final failure."""
        logger = mock.Mock()
        busy = "Device or resource busy"
        mockPopen.side_effect = [
            # Three failed unmount attempts ...
            self._fakeProc(1, busy),
            self._fakeProc(1, busy),
            self._fakeProc(1, busy),
            # ... followed by the ls, fuser and lsof diagnostics.
            self._fakeProc(0, out="list of files"),
            self._fakeProc(0, out="It is very busy"),
            self._fakeProc(1, out="lsof output"),
        ]

        with self.assertRaises(RuntimeError) as ctx:
            util.run_unmount_cmd(
                ["fusermount", "-u", "/path"],
                path="/path",
                max_retries=3,
                logger=logger,
            )

        cmd = ["fusermount", "-u", "/path"]
        expected = [
            self._popen_call(cmd),
            self._popen_call(cmd),
            self._popen_call(cmd),
            self._popen_call(["ls", "-lA", "/path"], stderr=subprocess.STDOUT),
            self._popen_call(["fuser", "-vm", "/path"], stderr=subprocess.STDOUT),
            self._popen_call(["lsof", "+D", "/path"], stderr=subprocess.STDOUT),
        ]
        self.assertEqual(mockPopen.call_args_list, expected)
        self.assertEqual(
            mock_sleep.call_args_list, [mock.call(0), mock.call(1), mock.call(2)]
        )
        self.assertEqual(
            str(ctx.exception),
            "Failed to run ['fusermount', '-u', '/path']: Device or resource busy.",
        )

        log_format = "`%s` exited with %s and following output:\n%s"
        self.assertEqual(
            logger.mock_calls,
            [
                mock.call.debug(log_format, "ls -lA /path", 0, "list of files"),
                mock.call.debug(log_format, "fuser -vm /path", 0, "It is very busy"),
                mock.call.debug(log_format, "lsof +D /path", 1, "lsof output"),
            ],
        )
|
|
|
|
|
|
class TranslatePathTestCase(unittest.TestCase):
    """Tests for ``util.translate_path`` with a mocked compose object."""

    @staticmethod
    def _compose_with(mapping):
        """Return a mock compose whose ``translate_paths`` is ``mapping``."""
        return mock.Mock(conf={"translate_paths": mapping})

    def test_does_nothing_without_config(self):
        """With no mappings the path comes back unchanged."""
        fake_compose = self._compose_with([])

        translated = util.translate_path(
            fake_compose, "/mnt/koji/compose/rawhide/XYZ"
        )

        self.assertEqual(translated, "/mnt/koji/compose/rawhide/XYZ")

    def test_translates_prefix(self):
        """A matching prefix is replaced by its configured URL."""
        fake_compose = self._compose_with([("/mnt/koji", "http://example.com")])

        translated = util.translate_path(
            fake_compose, "/mnt/koji/compose/rawhide/XYZ"
        )

        self.assertEqual(translated, "http://example.com/compose/rawhide/XYZ")

    def test_does_not_translate_not_matching(self):
        """A path outside every configured prefix is not translated."""
        fake_compose = self._compose_with([("/mnt/koji", "http://example.com")])

        translated = util.translate_path(
            fake_compose, "/mnt/fedora_koji/compose/rawhide/XYZ"
        )

        self.assertEqual(translated, "/mnt/fedora_koji/compose/rawhide/XYZ")
|
|
|
|
|
|
class GetRepoFuncsTestCase(unittest.TestCase):
    """Tests for ``util.get_repo_url(s)`` and ``util.get_repo_dict(s)``."""

    @mock.patch("pungi.compose.ComposeInfo")
    def setUp(self, ci):
        """Create a compose under a temp dir that translates to example.com."""
        self.tmp_dir = tempfile.mkdtemp()
        conf = {"translate_paths": [(self.tmp_dir, "http://example.com")]}

        info = ci.return_value.compose
        info.respin = 0
        info.id = "RHEL-8.0-20180101.n.0"
        info.date = "20160101"
        info.type = "nightly"
        info.type_suffix = ".n"
        info.label = "RC-1.0"
        info.label_major_version = "1"

        compose_dir = os.path.join(self.tmp_dir, ci.return_value.compose.id)
        self.compose = compose.Compose(conf, compose_dir)
        self.compose.all_variants = {
            "Server": mock.Mock(uid="Server", type="variant"),
            "Client": mock.Mock(uid="Client", type="variant"),
        }

    def tearDown(self):
        shutil.rmtree(self.tmp_dir)

    def test_get_repo_url_from_normal_url(self):
        """A plain URL is returned unchanged."""
        resolved = util.get_repo_url(self.compose, "http://example.com/repo")
        self.assertEqual(resolved, "http://example.com/repo")

    def test_get_repo_url_from_path(self):
        """A local path goes through the ``translate_paths`` mapping."""
        local_repo = os.path.join(self.tmp_dir, "repo")
        resolved = util.get_repo_url(self.compose, local_repo)
        self.assertEqual(resolved, "http://example.com/repo")

    def test_get_repo_url_from_variant_uid(self):
        """A variant uid expands to that variant's ``$basearch/os`` URL."""
        resolved = util.get_repo_url(self.compose, "Server")
        self.assertEqual(
            resolved,
            "http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os",
        )

    def test_get_repo_url_from_repo_dict(self):
        """The ``baseurl`` of a repo dict is resolved like a bare value."""
        resolved = util.get_repo_url(
            self.compose, {"baseurl": "http://example.com/repo"}
        )
        self.assertEqual(resolved, "http://example.com/repo")

        resolved = util.get_repo_url(self.compose, {"baseurl": "Server"})
        self.assertEqual(
            resolved,
            "http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os",
        )

    def test_get_repo_urls(self):
        """A mixed list of repos resolves element by element, in order."""
        repos = [
            "http://example.com/repo",
            "Server",
            {"baseurl": "Client"},
            {"baseurl": "ftp://example.com/linux/repo"},
        ]
        expect = [
            "http://example.com/repo",
            "http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os",
            "http://example.com/RHEL-8.0-20180101.n.0/compose/Client/$basearch/os",
            "ftp://example.com/linux/repo",
        ]

        self.assertEqual(util.get_repo_urls(self.compose, repos), expect)

    def test_get_repo_dict_from_normal_url(self):
        """A URL yields a dict whose ``name`` has slashes turned into ``_``."""
        expect = {
            "name": "http:__example.com_repo",
            "baseurl": "http://example.com/repo",
        }
        self.assertEqual(util.get_repo_dict("http://example.com/repo"), expect)

    def test_get_repo_dict_from_variant_uid(self):
        """A bare variant uid yields an empty dict."""
        # this repo format is deprecated
        self.assertEqual(util.get_repo_dict("Server"), {})

    def test_get_repo_dict_from_repo_dict(self):
        """A dict whose ``baseurl`` is a variant uid yields an empty dict."""
        # this repo format is deprecated
        self.assertEqual(util.get_repo_dict({"baseurl": "Server"}), {})

    def test_get_repo_dicts(self):
        """Deprecated variant-uid entries are left out of the result."""
        given = [
            "http://example.com/repo",
            # this repo format is deprecated (and will not be included into
            # final repo_dict)
            "Server",
            # this repo format is deprecated
            {"baseurl": "Client"},
            {"baseurl": "ftp://example.com/linux/repo"},
            {"name": "testrepo", "baseurl": "ftp://example.com/linux/repo"},
        ]
        expect = [
            {
                "name": "http:__example.com_repo",
                "baseurl": "http://example.com/repo",
            },
            {
                "name": "ftp:__example.com_linux_repo",
                "baseurl": "ftp://example.com/linux/repo",
            },
            {"name": "testrepo", "baseurl": "ftp://example.com/linux/repo"},
        ]

        self.assertEqual(util.get_repo_dicts(given), expect)
|
|
|
|
|
|
class TestVersionGenerator(unittest.TestCase):
    """Tests for ``util.version_generator`` with a mocked compose."""

    def setUp(self):
        """Build a mock compose: version 8, date 20160101, respin 0."""
        base_info = mock.MagicMock()
        base_info.respin = 0
        base_info.id = "RHEL-8.0-20180101.0"
        base_info.release.version = "8"
        base_info.type = "nightly"
        base_info.type_suffix = ""
        base_info.label = "RC-1.0"
        base_info.label_major_version = "1"

        self.compose = mock.MagicMock()
        self.compose.ci_base = base_info
        self.compose.compose_respin = 0
        self.compose.compose_date = "20160101"

    def test_unknown_generator(self):
        """An unrecognised ``!`` generator name raises ``RuntimeError``."""
        fake_compose = mock.Mock()

        with self.assertRaises(RuntimeError) as ctx:
            util.version_generator(fake_compose, "!GIMME_VERSION")

        self.assertEqual(
            str(ctx.exception), "Unknown version generator '!GIMME_VERSION'"
        )

    def test_passthrough_value(self):
        """A plain version string is returned unchanged."""
        fake_compose = mock.Mock()
        self.assertEqual(util.version_generator(fake_compose, "1.2.3"), "1.2.3")

    def test_passthrough_none(self):
        """``None`` is returned unchanged."""
        fake_compose = mock.Mock()
        self.assertEqual(util.version_generator(fake_compose, None), None)

    def test_release_from_version_date_respin(self):
        """``!VERSION_FROM_VERSION_DATE_RESPIN`` gives ``version.date.respin``."""
        generated = util.version_generator(
            self.compose, "!VERSION_FROM_VERSION_DATE_RESPIN"
        )
        self.assertEqual(generated, "8.20160101.0")

    def test_release_from_date_respin(self):
        """``!RELEASE_FROM_DATE_RESPIN`` gives ``date.respin``."""
        generated = util.version_generator(self.compose, "!RELEASE_FROM_DATE_RESPIN")
        self.assertEqual(generated, "20160101.0")

    def test_version_from_version(self):
        """``!VERSION_FROM_VERSION`` gives the release version."""
        generated = util.version_generator(self.compose, "!VERSION_FROM_VERSION")
        self.assertEqual(generated, "8")
|
|
|
|
|
|
class TestTZOffset(unittest.TestCase):
    """Tests for ``util.get_tz_offset`` with the ``time`` module patched.

    ``time.timezone`` and ``time.altzone`` are seconds *west* of UTC, so a
    positive value corresponds to a negative formatted offset. The mocked
    ``tm_isdst`` says whether DST is in effect at the mocked "now".
    """

    @mock.patch('time.daylight', new=False)
    @mock.patch('time.altzone', new=7200)
    @mock.patch('time.timezone', new=3600)
    @mock.patch('time.localtime', new=lambda: mock.Mock(tm_isdst=0))
    def test_zone_without_dst(self):
        """No DST defined: the offset matches ``time.timezone`` (3600 s)."""
        self.assertEqual(util.get_tz_offset(), "-01:00")

    @mock.patch('time.daylight', new=True)
    @mock.patch('time.altzone', new=7200)
    @mock.patch('time.timezone', new=3600)
    @mock.patch('time.localtime', new=lambda: mock.Mock(tm_isdst=0))
    def test_with_inactive_dst(self):
        """DST defined but not in effect (``tm_isdst=0``).

        The expected offset matches ``time.timezone`` (3600 s), not
        ``time.altzone``.
        """
        self.assertEqual(util.get_tz_offset(), "-01:00")

    @mock.patch('time.daylight', new=True)
    @mock.patch('time.altzone', new=-9000)
    @mock.patch('time.timezone', new=-3600)
    @mock.patch('time.localtime', new=lambda: mock.Mock(tm_isdst=1))
    def test_with_active_dst(self):
        """DST defined and in effect (``tm_isdst=1``).

        The expected offset matches ``time.altzone`` (-9000 s, i.e. +02:30).
        """
        self.assertEqual(util.get_tz_offset(), "+02:30")

    @mock.patch('time.daylight', new=False)
    @mock.patch('time.altzone', new=0)
    @mock.patch('time.timezone', new=0)
    @mock.patch('time.localtime', new=lambda: mock.Mock(tm_isdst=0))
    def test_utc(self):
        """A zero offset is formatted with an explicit plus sign."""
        self.assertEqual(util.get_tz_offset(), "+00:00")
|
|
|
|
|
|
class TestParseKojiEvent(PungiTestCase):
    """Tests for ``util.parse_koji_event``."""

    def test_number(self):
        """A numeric string is converted to an integer."""
        self.assertEqual(util.parse_koji_event("1234"), 1234)

    def test_correct_path(self):
        """A compose path yields the ``id`` stored in its koji-event file."""
        event_file = os.path.join(self.topdir, "work/global/koji-event")
        touch(event_file, '{"id": 19769058, "ts": 1527641311.22855}')

        self.assertEqual(util.parse_koji_event(self.topdir), 19769058)

    def test_bad_path(self):
        """A directory without a koji-event file is rejected."""
        with self.assertRaises(argparse.ArgumentTypeError):
            util.parse_koji_event(self.topdir)
|
|
|
|
|
|
class TestCopyAll(PungiTestCase):
    """Tests for how ``util.copy_all`` handles symbolic links."""

    def setUp(self):
        """Create an empty ``src`` directory; ``dst`` is left uncreated."""
        super(TestCopyAll, self).setUp()
        self.src = os.path.join(self.topdir, "src")
        self.dst = os.path.join(self.topdir, "dst")
        util.makedirs(self.src)

    def test_preserve_symlink(self):
        """A relative symlink is copied as a symlink, next to its target."""
        touch(os.path.join(self.src, "target"))
        os.symlink("target", os.path.join(self.src, "symlink"))

        util.copy_all(self.src, self.dst)

        copied_link = os.path.join(self.dst, "symlink")
        self.assertTrue(os.path.isfile(os.path.join(self.dst, "target")))
        self.assertTrue(os.path.islink(copied_link))
        self.assertEqual(os.readlink(copied_link), "target")

    def test_copy_broken_symlink(self):
        """A dangling symlink is copied as-is."""
        os.symlink("broken", os.path.join(self.src, "symlink"))

        util.copy_all(self.src, self.dst)

        copied_link = os.path.join(self.dst, "symlink")
        self.assertTrue(os.path.islink(copied_link))
        self.assertEqual(os.readlink(copied_link), "broken")
|
|
|
|
|
|
@mock.patch("six.moves.urllib.request.urlretrieve")
class TestAsLocalFile(PungiTestCase):
    """Tests for the ``util.as_local_file`` context manager.

    ``urlretrieve`` is patched for every test method, so nothing is
    actually downloaded.
    """

    def test_local_file(self, urlretrieve):
        """A local path is yielded unchanged and nothing is downloaded."""
        with util.as_local_file("/tmp/foo") as local_name:
            self.assertEqual(local_name, "/tmp/foo")

        self.assertEqual(urlretrieve.call_args_list, [])

    def test_http(self, urlretrieve):
        """An HTTP URL is fetched to a file that is removed on exit."""
        url = "http://example.com/repodata/repomd.xml"

        def fake_retrieve(requested_url):
            # Stand-in for urlretrieve: check the URL, create the
            # "downloaded" file and return a (filename, headers) pair.
            self.assertEqual(url, requested_url)
            self.filename = os.path.join(self.topdir, "my-file")
            touch(self.filename)
            return self.filename, {}

        urlretrieve.side_effect = fake_retrieve

        with util.as_local_file(url) as local_name:
            self.assertEqual(local_name, self.filename)
            self.assertTrue(os.path.exists(self.filename))

        self.assertFalse(os.path.exists(self.filename))
|