145c3adbef
If Koji pungi-buildinstall is used, then the buildinstall results are stored in the `output_dir` dir, but in "results" and "logs" subdirectories. We need to move them to final_output_dir. Signed-off-by: Jan Kaluza <jkaluza@redhat.com>
1057 lines
39 KiB
Python
1057 lines
39 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
import mock
|
|
import os
|
|
|
|
try:
|
|
import unittest2 as unittest
|
|
except ImportError:
|
|
import unittest
|
|
import tempfile
|
|
import shutil
|
|
import subprocess
|
|
import six
|
|
|
|
from pungi import compose
|
|
from pungi import util
|
|
|
|
from tests.helpers import touch, PungiTestCase, mk_boom
|
|
|
|
|
|
class TestGitRefResolver(unittest.TestCase):
|
|
@mock.patch("pungi.util.run")
|
|
def test_successful_resolve(self, run):
|
|
run.return_value = (0, "CAFEBABE\tHEAD\n")
|
|
|
|
url = util.resolve_git_url("https://git.example.com/repo.git?somedir#HEAD")
|
|
|
|
self.assertEqual(url, "https://git.example.com/repo.git?somedir#CAFEBABE")
|
|
run.assert_called_once_with(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
|
|
universal_newlines=True,
|
|
)
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_successful_resolve_branch(self, run):
|
|
run.return_value = (0, "CAFEBABE\trefs/heads/f24\n")
|
|
|
|
url = util.resolve_git_url(
|
|
"https://git.example.com/repo.git?somedir#origin/f24"
|
|
)
|
|
|
|
self.assertEqual(url, "https://git.example.com/repo.git?somedir#CAFEBABE")
|
|
run.assert_called_once_with(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "refs/heads/f24"],
|
|
universal_newlines=True,
|
|
)
|
|
|
|
def test_resolve_ref_with_commit_id(self):
|
|
ref = util.resolve_git_ref("https://git.example.com/repo.git", "a" * 40)
|
|
self.assertEqual(ref, "a" * 40)
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_ref_multiple_matches(self, run):
|
|
run.return_value = (
|
|
0,
|
|
"CAFEBABE\trefs/heads/master\nBABECAFE\trefs/remotes/origin/master",
|
|
)
|
|
|
|
ref = util.resolve_git_ref("https://git.example.com/repo.git", "master")
|
|
|
|
self.assertEqual(ref, "CAFEBABE")
|
|
run.assert_called_once_with(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "master"],
|
|
universal_newlines=True,
|
|
)
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_ref_with_remote_head(self, run):
|
|
run.return_value = (0, "CAFEBABE\tHEAD\nBABECAFE\trefs/remotes/origin/HEAD")
|
|
|
|
ref = util.resolve_git_ref("https://git.example.com/repo.git", "HEAD")
|
|
|
|
self.assertEqual(ref, "CAFEBABE")
|
|
run.assert_called_once_with(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
|
|
universal_newlines=True,
|
|
)
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_missing_spec(self, run):
|
|
url = util.resolve_git_url("https://git.example.com/repo.git")
|
|
|
|
self.assertEqual(url, "https://git.example.com/repo.git")
|
|
self.assertEqual(run.mock_calls, [])
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_non_head_spec(self, run):
|
|
url = util.resolve_git_url("https://git.example.com/repo.git#some-tag")
|
|
|
|
self.assertEqual(url, "https://git.example.com/repo.git#some-tag")
|
|
self.assertEqual(run.mock_calls, [])
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_ambiguous(self, run):
|
|
run.return_value = (0, "CAFEBABE\tF11\nDEADBEEF\tF10\n")
|
|
|
|
with self.assertRaises(RuntimeError):
|
|
util.resolve_git_url("https://git.example.com/repo.git?somedir#HEAD")
|
|
|
|
run.assert_called_once_with(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
|
|
universal_newlines=True,
|
|
)
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_keep_empty_query_string(self, run):
|
|
run.return_value = (0, "CAFEBABE\tHEAD\n")
|
|
|
|
url = util.resolve_git_url("https://git.example.com/repo.git?#HEAD")
|
|
|
|
run.assert_called_once_with(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
|
|
universal_newlines=True,
|
|
)
|
|
self.assertEqual(url, "https://git.example.com/repo.git?#CAFEBABE")
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_strip_git_plus_prefix(self, run):
|
|
run.return_value = (0, "CAFEBABE\tHEAD\n")
|
|
|
|
url = util.resolve_git_url("git+https://git.example.com/repo.git#HEAD")
|
|
|
|
run.assert_called_once_with(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
|
|
universal_newlines=True,
|
|
)
|
|
self.assertEqual(url, "git+https://git.example.com/repo.git#CAFEBABE")
|
|
|
|
@mock.patch("pungi.util.run")
|
|
def test_resolve_no_branch_in_remote(self, run):
|
|
run.return_value = (0, "")
|
|
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
util.resolve_git_url(
|
|
"https://git.example.com/repo.git?somedir#origin/my-branch"
|
|
)
|
|
|
|
run.assert_called_once_with(
|
|
[
|
|
"git",
|
|
"ls-remote",
|
|
"https://git.example.com/repo.git",
|
|
"refs/heads/my-branch",
|
|
],
|
|
universal_newlines=True,
|
|
)
|
|
self.assertIn("ref does not exist in remote repo", str(ctx.exception))
|
|
|
|
@mock.patch("time.sleep")
|
|
@mock.patch("pungi.util.run")
|
|
def test_retry(self, run, sleep):
|
|
run.side_effect = [RuntimeError("Boom"), (0, "CAFEBABE\tHEAD\n")]
|
|
|
|
url = util.resolve_git_url("https://git.example.com/repo.git?somedir#HEAD")
|
|
|
|
self.assertEqual(url, "https://git.example.com/repo.git?somedir#CAFEBABE")
|
|
self.assertEqual(sleep.call_args_list, [mock.call(30)])
|
|
self.assertEqual(
|
|
run.call_args_list,
|
|
[
|
|
mock.call(
|
|
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
|
|
universal_newlines=True,
|
|
)
|
|
]
|
|
* 2,
|
|
)
|
|
|
|
@mock.patch("pungi.util.resolve_git_ref")
|
|
@mock.patch("pungi.util.resolve_git_url")
|
|
def test_resolver_offline(self, mock_resolve_url, mock_resolve_ref):
|
|
resolver = util.GitUrlResolver(offline=True)
|
|
self.assertEqual(
|
|
resolver("http://example.com/repo.git#HEAD"),
|
|
"http://example.com/repo.git#HEAD",
|
|
)
|
|
self.assertEqual(mock_resolve_url.call_args_list, [])
|
|
self.assertEqual(mock_resolve_ref.call_args_list, [])
|
|
|
|
@mock.patch("pungi.util.resolve_git_ref")
|
|
@mock.patch("pungi.util.resolve_git_url")
|
|
def test_resolver_offline_branch(self, mock_resolve_url, mock_resolve_ref):
|
|
resolver = util.GitUrlResolver(offline=True)
|
|
self.assertEqual(
|
|
resolver("http://example.com/repo.git", "master"), "master",
|
|
)
|
|
self.assertEqual(mock_resolve_url.call_args_list, [])
|
|
self.assertEqual(mock_resolve_ref.call_args_list, [])
|
|
|
|
@mock.patch("pungi.util.resolve_git_ref")
|
|
@mock.patch("pungi.util.resolve_git_url")
|
|
def test_resolver_caches_calls(self, mock_resolve_url, mock_resolve_ref):
|
|
url1 = "http://example.com/repo.git#HEAD"
|
|
url2 = "http://example.com/repo.git#master"
|
|
url3 = "http://example.com/repo.git"
|
|
ref1 = "foo"
|
|
ref2 = "bar"
|
|
mock_resolve_url.side_effect = ["1", "2"]
|
|
mock_resolve_ref.side_effect = ["cafe", "beef"]
|
|
resolver = util.GitUrlResolver()
|
|
self.assertEqual(resolver(url1), "1")
|
|
self.assertEqual(resolver(url1), "1")
|
|
self.assertEqual(resolver(url3, ref1), "cafe")
|
|
self.assertEqual(resolver(url3, ref2), "beef")
|
|
self.assertEqual(resolver(url2), "2")
|
|
self.assertEqual(resolver(url3, ref1), "cafe")
|
|
self.assertEqual(resolver(url1), "1")
|
|
self.assertEqual(resolver(url3, ref2), "beef")
|
|
self.assertEqual(resolver(url2), "2")
|
|
self.assertEqual(resolver(url3, ref2), "beef")
|
|
self.assertEqual(
|
|
mock_resolve_url.call_args_list, [mock.call(url1), mock.call(url2)]
|
|
)
|
|
self.assertEqual(
|
|
mock_resolve_ref.call_args_list,
|
|
[mock.call(url3, ref1), mock.call(url3, ref2)],
|
|
)
|
|
|
|
@mock.patch("pungi.util.resolve_git_url")
|
|
def test_resolver_caches_failure(self, mock_resolve):
|
|
url = "http://example.com/repo.git#HEAD"
|
|
mock_resolve.side_effect = mk_boom(util.GitUrlResolveError, "failed")
|
|
resolver = util.GitUrlResolver()
|
|
with self.assertRaises(util.GitUrlResolveError):
|
|
resolver(url)
|
|
with self.assertRaises(util.GitUrlResolveError):
|
|
resolver(url)
|
|
self.assertEqual(mock_resolve.call_args_list, [mock.call(url)])
|
|
|
|
|
|
class TestGetVariantData(unittest.TestCase):
|
|
def test_get_simple(self):
|
|
conf = {"foo": {"^Client$": 1}}
|
|
result = util.get_variant_data(conf, "foo", mock.Mock(uid="Client"))
|
|
self.assertEqual(result, [1])
|
|
|
|
def test_get_make_list(self):
|
|
conf = {"foo": {"^Client$": [1, 2], "^.*$": 3}}
|
|
result = util.get_variant_data(conf, "foo", mock.Mock(uid="Client"))
|
|
six.assertCountEqual(self, result, [1, 2, 3])
|
|
|
|
def test_not_matching_arch(self):
|
|
conf = {"foo": {"^Client$": [1, 2]}}
|
|
result = util.get_variant_data(conf, "foo", mock.Mock(uid="Server"))
|
|
self.assertEqual(result, [])
|
|
|
|
def test_handle_missing_config(self):
|
|
result = util.get_variant_data({}, "foo", mock.Mock(uid="Client"))
|
|
self.assertEqual(result, [])
|
|
|
|
def test_get_save_pattern(self):
|
|
conf = {"foo": {"^Client$": 1, "^NotClient$": 2}}
|
|
patterns = set()
|
|
result = util.get_variant_data(
|
|
conf, "foo", mock.Mock(uid="Client"), keys=patterns
|
|
)
|
|
self.assertEqual(result, [1])
|
|
self.assertEqual(patterns, set(["^Client$"]))
|
|
|
|
|
|
class TestVolumeIdGenerator(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp_dir)
|
|
|
|
@mock.patch("pungi.compose.ComposeInfo")
|
|
def test_get_volid(self, ci):
|
|
all_keys = [
|
|
(
|
|
["arch", "compose_id", "date", "disc_type"],
|
|
"x86_64-compose_id-20160107-",
|
|
),
|
|
(
|
|
["label", "label_major_version", "release_short", "respin"],
|
|
"RC-1.0-1-rel_short2-2",
|
|
),
|
|
(["type", "type_suffix", "variant", "version"], "nightly-.n-Server-6.0"),
|
|
]
|
|
for keys, expected in all_keys:
|
|
format = "-".join(["%(" + k + ")s" for k in keys])
|
|
conf = {
|
|
"release_short": "rel_short2",
|
|
"release_version": "6.0",
|
|
"image_volid_formats": [format],
|
|
"image_volid_layered_product_formats": [],
|
|
"volume_id_substitutions": {},
|
|
"restricted_volid": False,
|
|
}
|
|
variant = mock.Mock(uid="Server", type="variant")
|
|
ci.return_value.compose.respin = 2
|
|
ci.return_value.compose.id = "compose_id"
|
|
ci.return_value.compose.date = "20160107"
|
|
ci.return_value.compose.type = "nightly"
|
|
ci.return_value.compose.type_suffix = ".n"
|
|
ci.return_value.compose.label = "RC-1.0"
|
|
ci.return_value.compose.label_major_version = "1"
|
|
|
|
ci.return_value.release.version = "3.0"
|
|
ci.return_value.release.short = "rel_short"
|
|
|
|
c = compose.Compose(conf, self.tmp_dir)
|
|
|
|
volid = util.get_volid(c, "x86_64", variant, disc_type=False)
|
|
|
|
self.assertEqual(volid, expected)
|
|
|
|
@mock.patch("pungi.compose.ComposeInfo")
|
|
def test_get_restricted_volid(self, ci):
|
|
all_keys = [
|
|
(
|
|
["arch", "compose_id", "date", "disc_type"],
|
|
"x86_64-compose_id-20160107-",
|
|
),
|
|
(
|
|
["label", "label_major_version", "release_short", "respin"],
|
|
"RC-1-0-1-rel_short2-2",
|
|
),
|
|
(["type", "type_suffix", "variant", "version"], "nightly--n-Server-6-0"),
|
|
]
|
|
for keys, expected in all_keys:
|
|
format = "-".join(["%(" + k + ")s" for k in keys])
|
|
conf = {
|
|
"release_short": "rel_short2",
|
|
"release_version": "6.0",
|
|
"image_volid_formats": [format],
|
|
"image_volid_layered_product_formats": [],
|
|
"volume_id_substitutions": {},
|
|
"restricted_volid": True,
|
|
}
|
|
variant = mock.Mock(uid="Server", type="variant")
|
|
ci.return_value.compose.respin = 2
|
|
ci.return_value.compose.id = "compose_id"
|
|
ci.return_value.compose.date = "20160107"
|
|
ci.return_value.compose.type = "nightly"
|
|
ci.return_value.compose.type_suffix = ".n"
|
|
ci.return_value.compose.label = "RC-1.0"
|
|
ci.return_value.compose.label_major_version = "1"
|
|
|
|
ci.return_value.release.version = "3.0"
|
|
ci.return_value.release.short = "rel_short"
|
|
|
|
c = compose.Compose(conf, self.tmp_dir)
|
|
|
|
volid = util.get_volid(c, "x86_64", variant, disc_type=False)
|
|
|
|
self.assertEqual(volid, expected)
|
|
|
|
@mock.patch("pungi.compose.ComposeInfo")
|
|
def test_get_volid_too_long(self, ci):
|
|
conf = {
|
|
"release_short": "rel_short2",
|
|
"release_version": "6.0",
|
|
"image_volid_formats": [
|
|
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", # 34 chars
|
|
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", # 33 chars
|
|
],
|
|
"image_volid_layered_product_formats": [],
|
|
"volume_id_substitutions": {},
|
|
}
|
|
variant = mock.Mock(uid="Server", type="variant")
|
|
c = compose.Compose(conf, self.tmp_dir)
|
|
|
|
with self.assertRaises(ValueError) as ctx:
|
|
util.get_volid(c, "x86_64", variant, disc_type=False)
|
|
|
|
self.assertIn("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", str(ctx.exception))
|
|
self.assertIn("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", str(ctx.exception))
|
|
|
|
@mock.patch("pungi.compose.ComposeInfo")
|
|
def test_apply_substitutions(self, ci):
|
|
all_keys = [
|
|
(
|
|
"Fedora-WorkstationOstree-ostree-x86_64-rawhide",
|
|
"Fedora-WS-ostree-x86_64-rawhide",
|
|
),
|
|
(
|
|
"Fedora-WorkstationOstree-ostree-x86_64-Rawhide",
|
|
"Fedora-WS-ostree-x86_64-rawh",
|
|
),
|
|
("x86_64-compose_id-20160107", "x86_64-compose_id-20160107"),
|
|
("x86_64-compose_id-20160107-Alpha", "x86_64-compose_id-20160107-A"),
|
|
# These test the case where one substitution is a subset
|
|
# of the other, but sorts alphabetically ahead of it, to
|
|
# make sure we're correctly sorting by length
|
|
("Fedora-zzzaaaaaazzz-Rawhide", "Fedora-zzz-rawh"),
|
|
("Fedora-aaaaaa-Rawhide", "Fedora-aaa-rawh"),
|
|
]
|
|
for volid, expected in all_keys:
|
|
conf = {
|
|
"volume_id_substitutions": {
|
|
"Rawhide": "rawh",
|
|
"WorkstationOstree": "WS",
|
|
"Workstation": "WS",
|
|
"Alpha": "A",
|
|
"zzzaaaaaazzz": "zzz",
|
|
"aaaaaa": "aaa",
|
|
}
|
|
}
|
|
c = compose.Compose(conf, self.tmp_dir)
|
|
transformed_volid = util._apply_substitutions(c, volid)
|
|
self.assertEqual(transformed_volid, expected)
|
|
|
|
|
|
class TestFindOldCompose(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp_dir)
|
|
|
|
def test_finds_single(self):
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "FINISHED")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
|
|
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-20160229.0")
|
|
|
|
def test_ignores_in_progress(self):
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "STARTED")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
|
|
self.assertIsNone(old)
|
|
|
|
def test_only_considers_allowed_status(self):
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "FINISHED")
|
|
old = util.find_old_compose(
|
|
self.tmp_dir, "Fedora", "Rawhide", "", allowed_statuses=["DOOMED"]
|
|
)
|
|
self.assertIsNone(old)
|
|
|
|
def test_finds_latest(self):
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160228.0/STATUS", "DOOMED")
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "FINISHED")
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.1/STATUS", "FINISHED_INCOMPLETE")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
|
|
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-20160229.1")
|
|
|
|
def test_find_correct_type(self):
|
|
touch(self.tmp_dir + "/Fedora-26-updates-20160229.0/STATUS", "FINISHED")
|
|
touch(self.tmp_dir + "/Fedora-26-updates-testing-20160229.0/STATUS", "FINISHED")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "26", "-updates")
|
|
self.assertEqual(old, self.tmp_dir + "/Fedora-26-updates-20160229.0")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "26", "-updates-testing")
|
|
self.assertEqual(old, self.tmp_dir + "/Fedora-26-updates-testing-20160229.0")
|
|
|
|
def test_find_latest_with_two_digit_respin(self):
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160228.n.9/STATUS", "FINISHED")
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160228.n.10/STATUS", "FINISHED")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
|
|
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-20160228.n.10")
|
|
|
|
def test_finds_ignores_other_files(self):
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0", "not a compose")
|
|
touch(
|
|
self.tmp_dir + "/Fedora-Rawhide-20160228.0/STATUS/file",
|
|
"also not a compose",
|
|
)
|
|
touch(self.tmp_dir + "/Fedora-24-20160229.0/STATUS", "FINISHED")
|
|
touch(self.tmp_dir + "/Another-Rawhide-20160229.0/STATUS", "FINISHED")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
|
|
self.assertIsNone(old)
|
|
|
|
def test_search_in_file(self):
|
|
touch(self.tmp_dir + "/file")
|
|
old = util.find_old_compose(self.tmp_dir + "/file", "Fedora", "Rawhide", "")
|
|
self.assertIsNone(old)
|
|
|
|
def test_skips_symlink(self):
|
|
os.symlink(self.tmp_dir, self.tmp_dir + "/Fedora-Rawhide-20160229.0")
|
|
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
|
|
self.assertIsNone(old)
|
|
|
|
def test_finds_layered_product(self):
|
|
touch(self.tmp_dir + "/Fedora-Rawhide-Base-1-20160229.0/STATUS", "FINISHED")
|
|
old = util.find_old_compose(
|
|
self.tmp_dir,
|
|
"Fedora",
|
|
"Rawhide",
|
|
"",
|
|
base_product_short="Base",
|
|
base_product_version="1",
|
|
)
|
|
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-Base-1-20160229.0")
|
|
|
|
|
|
class TestHelpers(PungiTestCase):
|
|
def test_process_args(self):
|
|
self.assertEqual(util.process_args("--opt=%s", None), [])
|
|
self.assertEqual(util.process_args("--opt=%s", []), [])
|
|
self.assertEqual(
|
|
util.process_args("--opt=%s", ["foo", "bar"]), ["--opt=foo", "--opt=bar"]
|
|
)
|
|
self.assertEqual(util.process_args("--opt=%s", "foo"), ["--opt=foo"])
|
|
|
|
def test_makedirs(self):
|
|
util.makedirs(self.topdir + "/foo/bar/baz")
|
|
self.assertTrue(os.path.isdir(self.topdir + "/foo/bar/baz"))
|
|
|
|
def test_makedirs_on_existing(self):
|
|
os.makedirs(self.topdir + "/foo/bar/baz")
|
|
try:
|
|
util.makedirs(self.topdir + "/foo/bar/baz")
|
|
except OSError:
|
|
self.fail("makedirs raised exception on existing directory")
|
|
|
|
|
|
class TestLevenshtein(unittest.TestCase):
|
|
def test_edit_dist_empty_str(self):
|
|
self.assertEqual(util.levenshtein("", ""), 0)
|
|
|
|
def test_edit_dist_same_str(self):
|
|
self.assertEqual(util.levenshtein("aaa", "aaa"), 0)
|
|
|
|
def test_edit_dist_one_change(self):
|
|
self.assertEqual(util.levenshtein("aab", "aaa"), 1)
|
|
|
|
def test_edit_dist_different_words(self):
|
|
self.assertEqual(util.levenshtein("kitten", "sitting"), 3)
|
|
|
|
|
|
class TestRecursiveFileList(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp_dir)
|
|
|
|
def test_flat_file_list(self):
|
|
"""Build a directory containing files and assert they are listed."""
|
|
expected_files = sorted(["file1", "file2", "file3"])
|
|
for expected_file in [os.path.join(self.tmp_dir, f) for f in expected_files]:
|
|
touch(expected_file)
|
|
|
|
actual_files = sorted(util.recursive_file_list(self.tmp_dir))
|
|
self.assertEqual(expected_files, actual_files)
|
|
|
|
def test_nested_file_list(self):
|
|
"""Build a directory containing files and assert they are listed."""
|
|
expected_files = sorted(["file1", "subdir/file2", "sub/subdir/file3"])
|
|
for expected_file in [os.path.join(self.tmp_dir, f) for f in expected_files]:
|
|
touch(expected_file)
|
|
|
|
actual_files = sorted(util.recursive_file_list(self.tmp_dir))
|
|
self.assertEqual(expected_files, actual_files)
|
|
|
|
|
|
class TestTempFiles(unittest.TestCase):
|
|
def test_temp_dir_ok(self):
|
|
with util.temp_dir() as tmp:
|
|
self.assertTrue(os.path.isdir(tmp))
|
|
self.assertFalse(os.path.exists(tmp))
|
|
|
|
def test_temp_dir_fail(self):
|
|
with self.assertRaises(RuntimeError):
|
|
with util.temp_dir() as tmp:
|
|
self.assertTrue(os.path.isdir(tmp))
|
|
raise RuntimeError("BOOM")
|
|
self.assertFalse(os.path.exists(tmp))
|
|
|
|
def test_temp_dir_in_non_existing_dir(self):
|
|
with util.temp_dir() as playground:
|
|
root = os.path.join(playground, "missing")
|
|
with util.temp_dir(dir=root) as tmp:
|
|
self.assertTrue(os.path.isdir(tmp))
|
|
self.assertTrue(os.path.isdir(root))
|
|
self.assertFalse(os.path.exists(tmp))
|
|
|
|
|
|
class TestUnmountCmd(unittest.TestCase):
|
|
def _fakeProc(self, ret, err="", out=""):
|
|
proc = mock.Mock(returncode=ret)
|
|
proc.communicate.return_value = (out, err)
|
|
return proc
|
|
|
|
@mock.patch("subprocess.Popen")
|
|
def test_unmount_cmd_success(self, mockPopen):
|
|
cmd = "unmount"
|
|
mockPopen.side_effect = [self._fakeProc(0, "")]
|
|
util.run_unmount_cmd(cmd)
|
|
self.assertEqual(
|
|
mockPopen.call_args_list,
|
|
[
|
|
mock.call(
|
|
cmd,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
)
|
|
],
|
|
)
|
|
|
|
@mock.patch("subprocess.Popen")
|
|
def test_unmount_cmd_fail_other_reason(self, mockPopen):
|
|
cmd = "unmount"
|
|
mockPopen.side_effect = [self._fakeProc(1, "It is broken")]
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
util.run_unmount_cmd(cmd)
|
|
self.assertEqual(
|
|
str(ctx.exception), "Unhandled error when running 'unmount': 'It is broken'"
|
|
)
|
|
self.assertEqual(
|
|
mockPopen.call_args_list,
|
|
[
|
|
mock.call(
|
|
cmd,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
)
|
|
],
|
|
)
|
|
|
|
@mock.patch("time.sleep")
|
|
@mock.patch("subprocess.Popen")
|
|
def test_unmount_cmd_fail_then_retry(self, mockPopen, mock_sleep):
|
|
cmd = "unmount"
|
|
mockPopen.side_effect = [
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
self._fakeProc(0, ""),
|
|
]
|
|
util.run_unmount_cmd(cmd)
|
|
self.assertEqual(
|
|
mockPopen.call_args_list,
|
|
[
|
|
mock.call(
|
|
cmd,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
)
|
|
]
|
|
* 3,
|
|
)
|
|
self.assertEqual(mock_sleep.call_args_list, [mock.call(0), mock.call(1)])
|
|
|
|
@mock.patch("time.sleep")
|
|
@mock.patch("subprocess.Popen")
|
|
def test_unmount_cmd_fail_then_retry_and_fail(self, mockPopen, mock_sleep):
|
|
cmd = "unmount"
|
|
mockPopen.side_effect = [
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
]
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
util.run_unmount_cmd(cmd, max_retries=3)
|
|
self.assertEqual(
|
|
mockPopen.call_args_list,
|
|
[
|
|
mock.call(
|
|
cmd,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
)
|
|
]
|
|
* 3,
|
|
)
|
|
self.assertEqual(
|
|
mock_sleep.call_args_list, [mock.call(0), mock.call(1), mock.call(2)]
|
|
)
|
|
self.assertEqual(
|
|
str(ctx.exception), "Failed to run 'unmount': Device or resource busy."
|
|
)
|
|
|
|
@mock.patch("time.sleep")
|
|
@mock.patch("subprocess.Popen")
|
|
def test_fusermount_fail_then_retry_and_fail_with_debug(
|
|
self, mockPopen, mock_sleep
|
|
):
|
|
logger = mock.Mock()
|
|
mockPopen.side_effect = [
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
self._fakeProc(1, "Device or resource busy"),
|
|
self._fakeProc(0, out="list of files"),
|
|
self._fakeProc(0, out="It is very busy"),
|
|
self._fakeProc(1, out="lsof output"),
|
|
]
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
util.run_unmount_cmd(
|
|
["fusermount", "-u", "/path"],
|
|
path="/path",
|
|
max_retries=3,
|
|
logger=logger,
|
|
)
|
|
cmd = ["fusermount", "-u", "/path"]
|
|
expected = [
|
|
mock.call(
|
|
cmd,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
),
|
|
mock.call(
|
|
cmd,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
),
|
|
mock.call(
|
|
cmd,
|
|
stderr=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
),
|
|
mock.call(
|
|
["ls", "-lA", "/path"],
|
|
stderr=subprocess.STDOUT,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
),
|
|
mock.call(
|
|
["fuser", "-vm", "/path"],
|
|
stderr=subprocess.STDOUT,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
),
|
|
mock.call(
|
|
["lsof", "+D", "/path"],
|
|
stderr=subprocess.STDOUT,
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True,
|
|
),
|
|
]
|
|
self.assertEqual(mockPopen.call_args_list, expected)
|
|
self.assertEqual(
|
|
mock_sleep.call_args_list, [mock.call(0), mock.call(1), mock.call(2)]
|
|
)
|
|
self.assertEqual(
|
|
str(ctx.exception),
|
|
"Failed to run ['fusermount', '-u', '/path']: Device or resource busy.",
|
|
)
|
|
self.assertEqual(
|
|
logger.mock_calls,
|
|
[
|
|
mock.call.debug(
|
|
"`%s` exited with %s and following output:\n%s",
|
|
"ls -lA /path",
|
|
0,
|
|
"list of files",
|
|
),
|
|
mock.call.debug(
|
|
"`%s` exited with %s and following output:\n%s",
|
|
"fuser -vm /path",
|
|
0,
|
|
"It is very busy",
|
|
),
|
|
mock.call.debug(
|
|
"`%s` exited with %s and following output:\n%s",
|
|
"lsof +D /path",
|
|
1,
|
|
"lsof output",
|
|
),
|
|
],
|
|
)
|
|
|
|
|
|
class TranslatePathTestCase(unittest.TestCase):
|
|
def test_does_nothing_without_config(self):
|
|
compose = mock.Mock(conf={"translate_paths": []})
|
|
ret = util.translate_path(compose, "/mnt/koji/compose/rawhide/XYZ")
|
|
self.assertEqual(ret, "/mnt/koji/compose/rawhide/XYZ")
|
|
|
|
def test_translates_prefix(self):
|
|
compose = mock.Mock(
|
|
conf={"translate_paths": [("/mnt/koji", "http://example.com")]}
|
|
)
|
|
ret = util.translate_path(compose, "/mnt/koji/compose/rawhide/XYZ")
|
|
self.assertEqual(ret, "http://example.com/compose/rawhide/XYZ")
|
|
|
|
def test_does_not_translate_not_matching(self):
|
|
compose = mock.Mock(
|
|
conf={"translate_paths": [("/mnt/koji", "http://example.com")]}
|
|
)
|
|
ret = util.translate_path(compose, "/mnt/fedora_koji/compose/rawhide/XYZ")
|
|
self.assertEqual(ret, "/mnt/fedora_koji/compose/rawhide/XYZ")
|
|
|
|
|
|
class GetRepoFuncsTestCase(unittest.TestCase):
|
|
@mock.patch("pungi.compose.ComposeInfo")
|
|
def setUp(self, ci):
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
conf = {"translate_paths": [(self.tmp_dir, "http://example.com")]}
|
|
ci.return_value.compose.respin = 0
|
|
ci.return_value.compose.id = "RHEL-8.0-20180101.n.0"
|
|
ci.return_value.compose.date = "20160101"
|
|
ci.return_value.compose.type = "nightly"
|
|
ci.return_value.compose.type_suffix = ".n"
|
|
ci.return_value.compose.label = "RC-1.0"
|
|
ci.return_value.compose.label_major_version = "1"
|
|
|
|
compose_dir = os.path.join(self.tmp_dir, ci.return_value.compose.id)
|
|
self.compose = compose.Compose(conf, compose_dir)
|
|
server_variant = mock.Mock(uid="Server", type="variant")
|
|
client_variant = mock.Mock(uid="Client", type="variant")
|
|
self.compose.all_variants = {
|
|
"Server": server_variant,
|
|
"Client": client_variant,
|
|
}
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tmp_dir)
|
|
|
|
def test_get_repo_url_from_normal_url(self):
|
|
url = util.get_repo_url(self.compose, "http://example.com/repo")
|
|
self.assertEqual(url, "http://example.com/repo")
|
|
|
|
def test_get_repo_url_from_path(self):
|
|
url = util.get_repo_url(self.compose, os.path.join(self.tmp_dir, "repo"))
|
|
self.assertEqual(url, "http://example.com/repo")
|
|
|
|
def test_get_repo_url_from_variant_uid(self):
|
|
url = util.get_repo_url(self.compose, "Server")
|
|
self.assertEqual(
|
|
url, "http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os"
|
|
)
|
|
|
|
def test_get_repo_url_from_repo_dict(self):
|
|
repo = {"baseurl": "http://example.com/repo"}
|
|
url = util.get_repo_url(self.compose, repo)
|
|
self.assertEqual(url, "http://example.com/repo")
|
|
|
|
repo = {"baseurl": "Server"}
|
|
url = util.get_repo_url(self.compose, repo)
|
|
self.assertEqual(
|
|
url, "http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os"
|
|
)
|
|
|
|
def test_get_repo_urls(self):
|
|
repos = [
|
|
"http://example.com/repo",
|
|
"Server",
|
|
{"baseurl": "Client"},
|
|
{"baseurl": "ftp://example.com/linux/repo"},
|
|
]
|
|
|
|
expect = [
|
|
"http://example.com/repo",
|
|
"http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os",
|
|
"http://example.com/RHEL-8.0-20180101.n.0/compose/Client/$basearch/os",
|
|
"ftp://example.com/linux/repo",
|
|
]
|
|
|
|
self.assertEqual(util.get_repo_urls(self.compose, repos), expect)
|
|
|
|
def test_get_repo_dict_from_normal_url(self):
|
|
repo_dict = util.get_repo_dict("http://example.com/repo")
|
|
expect = {
|
|
"name": "http:__example.com_repo",
|
|
"baseurl": "http://example.com/repo",
|
|
}
|
|
self.assertEqual(repo_dict, expect)
|
|
|
|
def test_get_repo_dict_from_variant_uid(self):
|
|
repo_dict = util.get_repo_dict("Server") # this repo format is deprecated
|
|
expect = {}
|
|
self.assertEqual(repo_dict, expect)
|
|
|
|
def test_get_repo_dict_from_repo_dict(self):
|
|
repo = {"baseurl": "Server"} # this repo format is deprecated
|
|
expect = {}
|
|
repo_dict = util.get_repo_dict(repo)
|
|
self.assertEqual(repo_dict, expect)
|
|
|
|
def test_get_repo_dicts(self):
|
|
repos = [
|
|
"http://example.com/repo",
|
|
"Server", # this repo format is deprecated (and will not be included into final repo_dict) # noqa: E501
|
|
{"baseurl": "Client"}, # this repo format is deprecated
|
|
{"baseurl": "ftp://example.com/linux/repo"},
|
|
{"name": "testrepo", "baseurl": "ftp://example.com/linux/repo"},
|
|
]
|
|
expect = [
|
|
{"name": "http:__example.com_repo", "baseurl": "http://example.com/repo"},
|
|
{
|
|
"name": "ftp:__example.com_linux_repo",
|
|
"baseurl": "ftp://example.com/linux/repo",
|
|
},
|
|
{"name": "testrepo", "baseurl": "ftp://example.com/linux/repo"},
|
|
]
|
|
repos = util.get_repo_dicts(repos)
|
|
self.assertEqual(repos, expect)
|
|
|
|
|
|
class TestVersionGenerator(unittest.TestCase):
|
|
def setUp(self):
|
|
ci = mock.MagicMock()
|
|
ci.respin = 0
|
|
ci.id = "RHEL-8.0-20180101.0"
|
|
ci.release.version = "8"
|
|
ci.type = "nightly"
|
|
ci.type_suffix = ""
|
|
ci.label = "RC-1.0"
|
|
ci.label_major_version = "1"
|
|
|
|
self.compose = mock.MagicMock()
|
|
self.compose.ci_base = ci
|
|
self.compose.compose_respin = 0
|
|
self.compose.compose_date = "20160101"
|
|
|
|
def test_unknown_generator(self):
|
|
compose = mock.Mock()
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
util.version_generator(compose, "!GIMME_VERSION")
|
|
|
|
self.assertEqual(
|
|
str(ctx.exception), "Unknown version generator '!GIMME_VERSION'"
|
|
)
|
|
|
|
def test_passthrough_value(self):
|
|
compose = mock.Mock()
|
|
self.assertEqual(util.version_generator(compose, "1.2.3"), "1.2.3")
|
|
|
|
def test_passthrough_none(self):
|
|
compose = mock.Mock()
|
|
self.assertEqual(util.version_generator(compose, None), None)
|
|
|
|
def test_release_from_version_date_respin(self):
|
|
self.assertEqual(
|
|
util.version_generator(self.compose, "!VERSION_FROM_VERSION_DATE_RESPIN"),
|
|
"8.20160101.0",
|
|
)
|
|
|
|
def test_release_from_date_respin(self):
|
|
self.assertEqual(
|
|
util.version_generator(self.compose, "!RELEASE_FROM_DATE_RESPIN"),
|
|
"20160101.0",
|
|
)
|
|
|
|
def test_version_from_version(self):
|
|
self.assertEqual(
|
|
util.version_generator(self.compose, "!VERSION_FROM_VERSION"), "8",
|
|
)
|
|
|
|
|
|
class TestTZOffset(unittest.TestCase):
|
|
@mock.patch("time.daylight", new=False)
|
|
@mock.patch("time.altzone", new=7200)
|
|
@mock.patch("time.timezone", new=3600)
|
|
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=0))
|
|
def test_zone_without_dst(self):
|
|
self.assertEqual(util.get_tz_offset(), "-01:00")
|
|
|
|
@mock.patch("time.daylight", new=True)
|
|
@mock.patch("time.altzone", new=7200)
|
|
@mock.patch("time.timezone", new=3600)
|
|
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=0))
|
|
def test_with_active_dst(self):
|
|
self.assertEqual(util.get_tz_offset(), "-01:00")
|
|
|
|
@mock.patch("time.daylight", new=True)
|
|
@mock.patch("time.altzone", new=-9000)
|
|
@mock.patch("time.timezone", new=-3600)
|
|
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=1))
|
|
def test_with_inactive_dst(self):
|
|
self.assertEqual(util.get_tz_offset(), "+02:30")
|
|
|
|
@mock.patch("time.daylight", new=False)
|
|
@mock.patch("time.altzone", new=0)
|
|
@mock.patch("time.timezone", new=0)
|
|
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=0))
|
|
def test_utc(self):
|
|
self.assertEqual(util.get_tz_offset(), "+00:00")
|
|
|
|
|
|
class TestParseKojiEvent(PungiTestCase):
|
|
def test_number(self):
|
|
self.assertEqual(util.parse_koji_event("1234"), 1234)
|
|
|
|
def test_correct_path(self):
|
|
touch(
|
|
os.path.join(self.topdir, "work/global/koji-event"),
|
|
'{"id": 19769058, "ts": 1527641311.22855}',
|
|
)
|
|
|
|
self.assertEqual(util.parse_koji_event(self.topdir), 19769058)
|
|
|
|
def test_bad_path(self):
|
|
with self.assertRaises(argparse.ArgumentTypeError):
|
|
util.parse_koji_event(self.topdir)
|
|
|
|
|
|
class TestCopyAll(PungiTestCase):
|
|
def setUp(self):
|
|
super(TestCopyAll, self).setUp()
|
|
self.src = os.path.join(self.topdir, "src")
|
|
self.dst = os.path.join(self.topdir, "dst")
|
|
util.makedirs(self.src)
|
|
|
|
def test_preserve_symlink(self):
|
|
touch(os.path.join(self.src, "target"))
|
|
os.symlink("target", os.path.join(self.src, "symlink"))
|
|
|
|
util.copy_all(self.src, self.dst)
|
|
|
|
self.assertTrue(os.path.isfile(os.path.join(self.dst, "target")))
|
|
self.assertTrue(os.path.islink(os.path.join(self.dst, "symlink")))
|
|
self.assertEqual(os.readlink(os.path.join(self.dst, "symlink")), "target")
|
|
|
|
def test_copy_broken_symlink(self):
|
|
os.symlink("broken", os.path.join(self.src, "symlink"))
|
|
|
|
util.copy_all(self.src, self.dst)
|
|
|
|
self.assertTrue(os.path.islink(os.path.join(self.dst, "symlink")))
|
|
self.assertEqual(os.readlink(os.path.join(self.dst, "symlink")), "broken")
|
|
|
|
|
|
class TestMoveAll(PungiTestCase):
|
|
def setUp(self):
|
|
super(TestMoveAll, self).setUp()
|
|
self.src = os.path.join(self.topdir, "src")
|
|
self.dst = os.path.join(self.topdir, "dst")
|
|
util.makedirs(self.src)
|
|
|
|
def test_move_all(self):
|
|
touch(os.path.join(self.src, "target"))
|
|
util.move_all(self.src, self.dst)
|
|
|
|
self.assertTrue(os.path.isfile(os.path.join(self.dst, "target")))
|
|
self.assertTrue(os.path.exists(os.path.join(self.src)))
|
|
self.assertFalse(os.path.isfile(os.path.join(self.src, "target")))
|
|
|
|
def test_move_all_rm_src_dir(self):
|
|
touch(os.path.join(self.src, "target"))
|
|
util.move_all(self.src, self.dst, rm_src_dir=True)
|
|
|
|
self.assertTrue(os.path.isfile(os.path.join(self.dst, "target")))
|
|
self.assertFalse(os.path.exists(os.path.join(self.src)))
|
|
self.assertFalse(os.path.isfile(os.path.join(self.src, "target")))
|
|
|
|
|
|
@mock.patch("six.moves.urllib.request.urlretrieve")
|
|
class TestAsLocalFile(PungiTestCase):
|
|
def test_local_file(self, urlretrieve):
|
|
with util.as_local_file("/tmp/foo") as fn:
|
|
self.assertEqual(fn, "/tmp/foo")
|
|
self.assertEqual(urlretrieve.call_args_list, [])
|
|
|
|
def test_http(self, urlretrieve):
|
|
url = "http://example.com/repodata/repomd.xml"
|
|
|
|
def my_mock(url_):
|
|
self.assertEqual(url, url_)
|
|
self.filename = os.path.join(self.topdir, "my-file")
|
|
touch(self.filename)
|
|
return self.filename, {}
|
|
|
|
urlretrieve.side_effect = my_mock
|
|
|
|
with util.as_local_file(url) as fn:
|
|
self.assertEqual(fn, self.filename)
|
|
self.assertTrue(os.path.exists(self.filename))
|
|
self.assertFalse(os.path.exists(self.filename))
|