pungi/tests/test_util.py

1068 lines
39 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
import argparse
from unittest import mock
import os
try:
import unittest2 as unittest
except ImportError:
import unittest
import tempfile
import shutil
import subprocess
import six
from pungi import compose
from pungi import util
from tests.helpers import touch, PungiTestCase, mk_boom
class TestGitRefResolver(unittest.TestCase):
@mock.patch("pungi.util.run")
def test_successful_resolve(self, run):
run.return_value = (0, "CAFEBABE\tHEAD\n")
url = util.resolve_git_url("https://git.example.com/repo.git?somedir#HEAD")
self.assertEqual(url, "https://git.example.com/repo.git?somedir#CAFEBABE")
run.assert_called_once_with(
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
universal_newlines=True,
)
@mock.patch("pungi.util.run")
def test_successful_resolve_branch(self, run):
run.return_value = (0, "CAFEBABE\trefs/heads/f24\n")
url = util.resolve_git_url(
"https://git.example.com/repo.git?somedir#origin/f24"
)
self.assertEqual(url, "https://git.example.com/repo.git?somedir#CAFEBABE")
run.assert_called_once_with(
["git", "ls-remote", "https://git.example.com/repo.git", "refs/heads/f24"],
universal_newlines=True,
)
def test_resolve_ref_with_commit_id(self):
ref = util.resolve_git_ref("https://git.example.com/repo.git", "a" * 40)
self.assertEqual(ref, "a" * 40)
@mock.patch("pungi.util.run")
def test_resolve_ref_multiple_matches(self, run):
run.return_value = (
0,
"CAFEBABE\trefs/heads/master\nBABECAFE\trefs/remotes/origin/master",
)
ref = util.resolve_git_ref("https://git.example.com/repo.git", "master")
self.assertEqual(ref, "CAFEBABE")
run.assert_called_once_with(
["git", "ls-remote", "https://git.example.com/repo.git", "master"],
universal_newlines=True,
)
@mock.patch("pungi.util.run")
def test_resolve_ref_with_remote_head(self, run):
run.return_value = (0, "CAFEBABE\tHEAD\nBABECAFE\trefs/remotes/origin/HEAD")
ref = util.resolve_git_ref("https://git.example.com/repo.git", "HEAD")
self.assertEqual(ref, "CAFEBABE")
run.assert_called_once_with(
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
universal_newlines=True,
)
@mock.patch("pungi.util.run")
def test_resolve_missing_spec(self, run):
url = util.resolve_git_url("https://git.example.com/repo.git")
self.assertEqual(url, "https://git.example.com/repo.git")
self.assertEqual(run.mock_calls, [])
@mock.patch("pungi.util.run")
def test_resolve_non_head_spec(self, run):
url = util.resolve_git_url("https://git.example.com/repo.git#some-tag")
self.assertEqual(url, "https://git.example.com/repo.git#some-tag")
self.assertEqual(run.mock_calls, [])
@mock.patch("pungi.util.run")
def test_resolve_ambiguous(self, run):
run.return_value = (0, "CAFEBABE\tF11\nDEADBEEF\tF10\n")
with self.assertRaises(RuntimeError):
util.resolve_git_url("https://git.example.com/repo.git?somedir#HEAD")
run.assert_called_once_with(
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
universal_newlines=True,
)
@mock.patch("pungi.util.run")
def test_resolve_keep_empty_query_string(self, run):
run.return_value = (0, "CAFEBABE\tHEAD\n")
url = util.resolve_git_url("https://git.example.com/repo.git?#HEAD")
run.assert_called_once_with(
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
universal_newlines=True,
)
self.assertEqual(url, "https://git.example.com/repo.git?#CAFEBABE")
@mock.patch("pungi.util.run")
def test_resolve_strip_git_plus_prefix(self, run):
run.return_value = (0, "CAFEBABE\tHEAD\n")
url = util.resolve_git_url("git+https://git.example.com/repo.git#HEAD")
run.assert_called_once_with(
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
universal_newlines=True,
)
self.assertEqual(url, "git+https://git.example.com/repo.git#CAFEBABE")
@mock.patch("pungi.util.run")
def test_resolve_no_branch_in_remote(self, run):
run.return_value = (0, "")
with self.assertRaises(RuntimeError) as ctx:
util.resolve_git_url(
"https://git.example.com/repo.git?somedir#origin/my-branch"
)
run.assert_called_once_with(
[
"git",
"ls-remote",
"https://git.example.com/repo.git",
"refs/heads/my-branch",
],
universal_newlines=True,
)
self.assertIn("ref does not exist in remote repo", str(ctx.exception))
@mock.patch("time.sleep")
@mock.patch("pungi.util.run")
def test_retry(self, run, sleep):
run.side_effect = [RuntimeError("Boom"), (0, "CAFEBABE\tHEAD\n")]
url = util.resolve_git_url("https://git.example.com/repo.git?somedir#HEAD")
self.assertEqual(url, "https://git.example.com/repo.git?somedir#CAFEBABE")
self.assertEqual(sleep.call_args_list, [mock.call(30)])
self.assertEqual(
run.call_args_list,
[
mock.call(
["git", "ls-remote", "https://git.example.com/repo.git", "HEAD"],
universal_newlines=True,
)
]
* 2,
)
@mock.patch("pungi.util.resolve_git_ref")
@mock.patch("pungi.util.resolve_git_url")
def test_resolver_offline(self, mock_resolve_url, mock_resolve_ref):
resolver = util.GitUrlResolver(offline=True)
self.assertEqual(
resolver("http://example.com/repo.git#HEAD"),
"http://example.com/repo.git#HEAD",
)
self.assertEqual(mock_resolve_url.call_args_list, [])
self.assertEqual(mock_resolve_ref.call_args_list, [])
@mock.patch("pungi.util.resolve_git_ref")
@mock.patch("pungi.util.resolve_git_url")
def test_resolver_offline_branch(self, mock_resolve_url, mock_resolve_ref):
resolver = util.GitUrlResolver(offline=True)
self.assertEqual(
resolver("http://example.com/repo.git", "master"),
"master",
)
self.assertEqual(mock_resolve_url.call_args_list, [])
self.assertEqual(mock_resolve_ref.call_args_list, [])
@mock.patch("pungi.util.resolve_git_ref")
@mock.patch("pungi.util.resolve_git_url")
def test_resolver_caches_calls(self, mock_resolve_url, mock_resolve_ref):
url1 = "http://example.com/repo.git#HEAD"
url2 = "http://example.com/repo.git#master"
url3 = "http://example.com/repo.git"
ref1 = "foo"
ref2 = "bar"
mock_resolve_url.side_effect = ["1", "2"]
mock_resolve_ref.side_effect = ["cafe", "beef"]
resolver = util.GitUrlResolver()
self.assertEqual(resolver(url1), "1")
self.assertEqual(resolver(url1), "1")
self.assertEqual(resolver(url3, ref1), "cafe")
self.assertEqual(resolver(url3, ref2), "beef")
self.assertEqual(resolver(url2), "2")
self.assertEqual(resolver(url3, ref1), "cafe")
self.assertEqual(resolver(url1), "1")
self.assertEqual(resolver(url3, ref2), "beef")
self.assertEqual(resolver(url2), "2")
self.assertEqual(resolver(url3, ref2), "beef")
self.assertEqual(
mock_resolve_url.call_args_list, [mock.call(url1), mock.call(url2)]
)
self.assertEqual(
mock_resolve_ref.call_args_list,
[mock.call(url3, ref1), mock.call(url3, ref2)],
)
@mock.patch("pungi.util.resolve_git_url")
def test_resolver_caches_failure(self, mock_resolve):
url = "http://example.com/repo.git#HEAD"
mock_resolve.side_effect = mk_boom(util.GitUrlResolveError, "failed")
resolver = util.GitUrlResolver()
with self.assertRaises(util.GitUrlResolveError):
resolver(url)
with self.assertRaises(util.GitUrlResolveError):
resolver(url)
self.assertEqual(mock_resolve.call_args_list, [mock.call(url)])
class TestGetVariantData(unittest.TestCase):
def test_get_simple(self):
conf = {"foo": {"^Client$": 1}}
result = util.get_variant_data(conf, "foo", mock.Mock(uid="Client"))
self.assertEqual(result, [1])
def test_get_make_list(self):
conf = {"foo": {"^Client$": [1, 2], "^.*$": 3}}
result = util.get_variant_data(conf, "foo", mock.Mock(uid="Client"))
six.assertCountEqual(self, result, [1, 2, 3])
def test_not_matching_arch(self):
conf = {"foo": {"^Client$": [1, 2]}}
result = util.get_variant_data(conf, "foo", mock.Mock(uid="Server"))
self.assertEqual(result, [])
def test_handle_missing_config(self):
result = util.get_variant_data({}, "foo", mock.Mock(uid="Client"))
self.assertEqual(result, [])
def test_get_save_pattern(self):
conf = {"foo": {"^Client$": 1, "^NotClient$": 2}}
patterns = set()
result = util.get_variant_data(
conf, "foo", mock.Mock(uid="Client"), keys=patterns
)
self.assertEqual(result, [1])
self.assertEqual(patterns, set(["^Client$"]))
class TestVolumeIdGenerator(unittest.TestCase):
def setUp(self):
self.tmp_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp_dir)
@mock.patch("pungi.compose.ComposeInfo")
def test_get_volid(self, ci):
all_keys = [
(
["arch", "compose_id", "date", "disc_type"],
"x86_64-compose_id-20160107-",
),
(
["label", "label_major_version", "release_short", "respin"],
"RC-1.0-1-rel_short2-2",
),
(["type", "type_suffix", "variant", "version"], "nightly-.n-Server-6.0"),
]
for keys, expected in all_keys:
format = "-".join(["%(" + k + ")s" for k in keys])
conf = {
"release_short": "rel_short2",
"release_version": "6.0",
"image_volid_formats": [format],
"image_volid_layered_product_formats": [],
"volume_id_substitutions": {},
"restricted_volid": False,
}
variant = mock.Mock(uid="Server", type="variant")
ci.return_value.compose.respin = 2
ci.return_value.compose.id = "compose_id"
ci.return_value.compose.date = "20160107"
ci.return_value.compose.type = "nightly"
ci.return_value.compose.type_suffix = ".n"
ci.return_value.compose.label = "RC-1.0"
ci.return_value.compose.label_major_version = "1"
ci.return_value.release.version = "3.0"
ci.return_value.release.short = "rel_short"
c = compose.Compose(conf, self.tmp_dir)
volid = util.get_volid(c, "x86_64", variant, disc_type=False)
self.assertEqual(volid, expected)
@mock.patch("pungi.compose.ComposeInfo")
def test_get_restricted_volid(self, ci):
all_keys = [
(
["arch", "compose_id", "date", "disc_type"],
"x86_64-compose_id-20160107-",
),
(
["label", "label_major_version", "release_short", "respin"],
"RC-1-0-1-rel_short2-2",
),
(["type", "type_suffix", "variant", "version"], "nightly--n-Server-6-0"),
]
for keys, expected in all_keys:
format = "-".join(["%(" + k + ")s" for k in keys])
conf = {
"release_short": "rel_short2",
"release_version": "6.0",
"image_volid_formats": [format],
"image_volid_layered_product_formats": [],
"volume_id_substitutions": {},
"restricted_volid": True,
}
variant = mock.Mock(uid="Server", type="variant")
ci.return_value.compose.respin = 2
ci.return_value.compose.id = "compose_id"
ci.return_value.compose.date = "20160107"
ci.return_value.compose.type = "nightly"
ci.return_value.compose.type_suffix = ".n"
ci.return_value.compose.label = "RC-1.0"
ci.return_value.compose.label_major_version = "1"
ci.return_value.release.version = "3.0"
ci.return_value.release.short = "rel_short"
c = compose.Compose(conf, self.tmp_dir)
volid = util.get_volid(c, "x86_64", variant, disc_type=False)
self.assertEqual(volid, expected)
@mock.patch("pungi.compose.ComposeInfo")
def test_get_volid_too_long(self, ci):
conf = {
"release_short": "rel_short2",
"release_version": "6.0",
"image_volid_formats": [
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", # 34 chars
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", # 33 chars
],
"image_volid_layered_product_formats": [],
"volume_id_substitutions": {},
}
variant = mock.Mock(uid="Server", type="variant")
c = compose.Compose(conf, self.tmp_dir)
with self.assertRaises(ValueError) as ctx:
util.get_volid(c, "x86_64", variant, disc_type=False)
self.assertIn("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", str(ctx.exception))
self.assertIn("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", str(ctx.exception))
@mock.patch("pungi.compose.ComposeInfo")
def test_apply_substitutions(self, ci):
all_keys = [
(
"Fedora-WorkstationOstree-ostree-x86_64-rawhide",
"Fedora-WS-ostree-x86_64-rawhide",
),
(
"Fedora-WorkstationOstree-ostree-x86_64-Rawhide",
"Fedora-WS-ostree-x86_64-rawh",
),
("x86_64-compose_id-20160107", "x86_64-compose_id-20160107"),
("x86_64-compose_id-20160107-Alpha", "x86_64-compose_id-20160107-A"),
# These test the case where one substitution is a subset
# of the other, but sorts alphabetically ahead of it, to
# make sure we're correctly sorting by length
("Fedora-zzzaaaaaazzz-Rawhide", "Fedora-zzz-rawh"),
("Fedora-aaaaaa-Rawhide", "Fedora-aaa-rawh"),
]
for volid, expected in all_keys:
conf = {
"volume_id_substitutions": {
"Rawhide": "rawh",
"WorkstationOstree": "WS",
"Workstation": "WS",
"Alpha": "A",
"zzzaaaaaazzz": "zzz",
"aaaaaa": "aaa",
}
}
c = compose.Compose(conf, self.tmp_dir)
transformed_volid = util._apply_substitutions(c, volid)
self.assertEqual(transformed_volid, expected)
class TestFindOldCompose(unittest.TestCase):
def setUp(self):
self.tmp_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp_dir)
def test_finds_single(self):
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "FINISHED")
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-20160229.0")
def test_ignores_in_progress(self):
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "STARTED")
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
self.assertIsNone(old)
def test_only_considers_allowed_status(self):
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "FINISHED")
old = util.find_old_compose(
self.tmp_dir, "Fedora", "Rawhide", "", allowed_statuses=["DOOMED"]
)
self.assertIsNone(old)
def test_finds_latest(self):
touch(self.tmp_dir + "/Fedora-Rawhide-20160228.0/STATUS", "DOOMED")
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0/STATUS", "FINISHED")
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.1/STATUS", "FINISHED_INCOMPLETE")
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-20160229.1")
def test_find_correct_type(self):
touch(self.tmp_dir + "/Fedora-26-updates-20160229.0/STATUS", "FINISHED")
touch(self.tmp_dir + "/Fedora-26-updates-testing-20160229.0/STATUS", "FINISHED")
old = util.find_old_compose(self.tmp_dir, "Fedora", "26", "-updates")
self.assertEqual(old, self.tmp_dir + "/Fedora-26-updates-20160229.0")
old = util.find_old_compose(self.tmp_dir, "Fedora", "26", "-updates-testing")
self.assertEqual(old, self.tmp_dir + "/Fedora-26-updates-testing-20160229.0")
def test_find_latest_with_two_digit_respin(self):
touch(self.tmp_dir + "/Fedora-Rawhide-20160228.n.9/STATUS", "FINISHED")
touch(self.tmp_dir + "/Fedora-Rawhide-20160228.n.10/STATUS", "FINISHED")
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-20160228.n.10")
def test_finds_ignores_other_files(self):
touch(self.tmp_dir + "/Fedora-Rawhide-20160229.0", "not a compose")
touch(
self.tmp_dir + "/Fedora-Rawhide-20160228.0/STATUS/file",
"also not a compose",
)
touch(self.tmp_dir + "/Fedora-24-20160229.0/STATUS", "FINISHED")
touch(self.tmp_dir + "/Another-Rawhide-20160229.0/STATUS", "FINISHED")
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
self.assertIsNone(old)
def test_search_in_file(self):
touch(self.tmp_dir + "/file")
old = util.find_old_compose(self.tmp_dir + "/file", "Fedora", "Rawhide", "")
self.assertIsNone(old)
def test_do_not_skip_symlink(self):
touch(self.tmp_dir + "/Fedora-Rawhide-20160228.n.10/STATUS", "FINISHED")
os.symlink(
self.tmp_dir + "/Fedora-Rawhide-20160228.n.10",
self.tmp_dir + "/Fedora-Rawhide-20160229.n.0",
)
old = util.find_old_compose(self.tmp_dir, "Fedora", "Rawhide", "")
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-20160229.n.0")
def test_finds_layered_product(self):
touch(self.tmp_dir + "/Fedora-Rawhide-Base-1-20160229.0/STATUS", "FINISHED")
old = util.find_old_compose(
self.tmp_dir,
"Fedora",
"Rawhide",
"",
base_product_short="Base",
base_product_version="1",
)
self.assertEqual(old, self.tmp_dir + "/Fedora-Rawhide-Base-1-20160229.0")
class TestHelpers(PungiTestCase):
def test_process_args(self):
self.assertEqual(util.process_args("--opt=%s", None), [])
self.assertEqual(util.process_args("--opt=%s", []), [])
self.assertEqual(
util.process_args("--opt=%s", ["foo", "bar"]), ["--opt=foo", "--opt=bar"]
)
self.assertEqual(util.process_args("--opt=%s", "foo"), ["--opt=foo"])
def test_makedirs(self):
util.makedirs(self.topdir + "/foo/bar/baz")
self.assertTrue(os.path.isdir(self.topdir + "/foo/bar/baz"))
def test_makedirs_on_existing(self):
os.makedirs(self.topdir + "/foo/bar/baz")
try:
util.makedirs(self.topdir + "/foo/bar/baz")
except OSError:
self.fail("makedirs raised exception on existing directory")
class TestLevenshtein(unittest.TestCase):
def test_edit_dist_empty_str(self):
self.assertEqual(util.levenshtein("", ""), 0)
def test_edit_dist_same_str(self):
self.assertEqual(util.levenshtein("aaa", "aaa"), 0)
def test_edit_dist_one_change(self):
self.assertEqual(util.levenshtein("aab", "aaa"), 1)
def test_edit_dist_different_words(self):
self.assertEqual(util.levenshtein("kitten", "sitting"), 3)
class TestRecursiveFileList(unittest.TestCase):
def setUp(self):
self.tmp_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmp_dir)
def test_flat_file_list(self):
"""Build a directory containing files and assert they are listed."""
expected_files = sorted(["file1", "file2", "file3"])
for expected_file in [os.path.join(self.tmp_dir, f) for f in expected_files]:
touch(expected_file)
actual_files = sorted(util.recursive_file_list(self.tmp_dir))
self.assertEqual(expected_files, actual_files)
def test_nested_file_list(self):
"""Build a directory containing files and assert they are listed."""
expected_files = sorted(["file1", "subdir/file2", "sub/subdir/file3"])
for expected_file in [os.path.join(self.tmp_dir, f) for f in expected_files]:
touch(expected_file)
actual_files = sorted(util.recursive_file_list(self.tmp_dir))
self.assertEqual(expected_files, actual_files)
class TestTempFiles(unittest.TestCase):
def test_temp_dir_ok(self):
with util.temp_dir() as tmp:
self.assertTrue(os.path.isdir(tmp))
self.assertFalse(os.path.exists(tmp))
def test_temp_dir_fail(self):
with self.assertRaises(RuntimeError):
with util.temp_dir() as tmp:
self.assertTrue(os.path.isdir(tmp))
raise RuntimeError("BOOM")
self.assertFalse(os.path.exists(tmp))
def test_temp_dir_in_non_existing_dir(self):
with util.temp_dir() as playground:
root = os.path.join(playground, "missing")
with util.temp_dir(dir=root) as tmp:
self.assertTrue(os.path.isdir(tmp))
self.assertTrue(os.path.isdir(root))
self.assertFalse(os.path.exists(tmp))
class TestUnmountCmd(unittest.TestCase):
def _fakeProc(self, ret, err="", out=""):
proc = mock.Mock(returncode=ret)
proc.communicate.return_value = (out, err)
return proc
@mock.patch("subprocess.Popen")
def test_unmount_cmd_success(self, mockPopen):
cmd = "unmount"
mockPopen.side_effect = [self._fakeProc(0, "")]
util.run_unmount_cmd(cmd)
self.assertEqual(
mockPopen.call_args_list,
[
mock.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
)
],
)
@mock.patch("subprocess.Popen")
def test_unmount_cmd_fail_other_reason(self, mockPopen):
cmd = "unmount"
mockPopen.side_effect = [self._fakeProc(1, "It is broken")]
with self.assertRaises(RuntimeError) as ctx:
util.run_unmount_cmd(cmd)
self.assertEqual(
str(ctx.exception), "Unhandled error when running 'unmount': 'It is broken'"
)
self.assertEqual(
mockPopen.call_args_list,
[
mock.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
)
],
)
@mock.patch("time.sleep")
@mock.patch("subprocess.Popen")
def test_unmount_cmd_fail_then_retry(self, mockPopen, mock_sleep):
cmd = "unmount"
mockPopen.side_effect = [
self._fakeProc(1, "Device or resource busy"),
self._fakeProc(1, "Device or resource busy"),
self._fakeProc(0, ""),
]
util.run_unmount_cmd(cmd)
self.assertEqual(
mockPopen.call_args_list,
[
mock.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
)
]
* 3,
)
self.assertEqual(mock_sleep.call_args_list, [mock.call(0), mock.call(1)])
@mock.patch("time.sleep")
@mock.patch("subprocess.Popen")
def test_unmount_cmd_fail_then_retry_and_fail(self, mockPopen, mock_sleep):
cmd = "unmount"
mockPopen.side_effect = [
self._fakeProc(1, "Device or resource busy"),
self._fakeProc(1, "Device or resource busy"),
self._fakeProc(1, "Device or resource busy"),
]
with self.assertRaises(RuntimeError) as ctx:
util.run_unmount_cmd(cmd, max_retries=3)
self.assertEqual(
mockPopen.call_args_list,
[
mock.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
)
]
* 3,
)
self.assertEqual(
mock_sleep.call_args_list, [mock.call(0), mock.call(1), mock.call(2)]
)
self.assertEqual(
str(ctx.exception), "Failed to run 'unmount': Device or resource busy."
)
@mock.patch("time.sleep")
@mock.patch("subprocess.Popen")
def test_fusermount_fail_then_retry_and_fail_with_debug(
self, mockPopen, mock_sleep
):
logger = mock.Mock()
mockPopen.side_effect = [
self._fakeProc(1, "Device or resource busy"),
self._fakeProc(1, "Device or resource busy"),
self._fakeProc(1, "Device or resource busy"),
self._fakeProc(0, out="list of files"),
self._fakeProc(0, out="It is very busy"),
self._fakeProc(1, out="lsof output"),
]
with self.assertRaises(RuntimeError) as ctx:
util.run_unmount_cmd(
["fusermount", "-u", "/path"],
path="/path",
max_retries=3,
logger=logger,
)
cmd = ["fusermount", "-u", "/path"]
expected = [
mock.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
),
mock.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
),
mock.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
),
mock.call(
["ls", "-lA", "/path"],
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
universal_newlines=True,
),
mock.call(
["fuser", "-vm", "/path"],
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
universal_newlines=True,
),
mock.call(
["lsof", "+D", "/path"],
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
universal_newlines=True,
),
]
self.assertEqual(mockPopen.call_args_list, expected)
self.assertEqual(
mock_sleep.call_args_list, [mock.call(0), mock.call(1), mock.call(2)]
)
self.assertEqual(
str(ctx.exception),
"Failed to run ['fusermount', '-u', '/path']: Device or resource busy.",
)
self.assertEqual(
logger.mock_calls,
[
mock.call.debug(
"`%s` exited with %s and following output:\n%s",
"ls -lA /path",
0,
"list of files",
),
mock.call.debug(
"`%s` exited with %s and following output:\n%s",
"fuser -vm /path",
0,
"It is very busy",
),
mock.call.debug(
"`%s` exited with %s and following output:\n%s",
"lsof +D /path",
1,
"lsof output",
),
],
)
class TranslatePathTestCase(unittest.TestCase):
def test_does_nothing_without_config(self):
compose = mock.Mock(conf={"translate_paths": []})
ret = util.translate_path(compose, "/mnt/koji/compose/rawhide/XYZ")
self.assertEqual(ret, "/mnt/koji/compose/rawhide/XYZ")
def test_translates_prefix(self):
compose = mock.Mock(
conf={"translate_paths": [("/mnt/koji", "http://example.com")]}
)
ret = util.translate_path(compose, "/mnt/koji/compose/rawhide/XYZ")
self.assertEqual(ret, "http://example.com/compose/rawhide/XYZ")
def test_does_not_translate_not_matching(self):
compose = mock.Mock(
conf={"translate_paths": [("/mnt/koji", "http://example.com")]}
)
ret = util.translate_path(compose, "/mnt/fedora_koji/compose/rawhide/XYZ")
self.assertEqual(ret, "/mnt/fedora_koji/compose/rawhide/XYZ")
class GetRepoFuncsTestCase(unittest.TestCase):
@mock.patch("pungi.compose.ComposeInfo")
def setUp(self, ci):
self.tmp_dir = tempfile.mkdtemp()
conf = {"translate_paths": [(self.tmp_dir, "http://example.com")]}
ci.return_value.compose.respin = 0
ci.return_value.compose.id = "RHEL-8.0-20180101.n.0"
ci.return_value.compose.date = "20160101"
ci.return_value.compose.type = "nightly"
ci.return_value.compose.type_suffix = ".n"
ci.return_value.compose.label = "RC-1.0"
ci.return_value.compose.label_major_version = "1"
compose_dir = os.path.join(self.tmp_dir, ci.return_value.compose.id)
self.compose = compose.Compose(conf, compose_dir)
server_variant = mock.Mock(uid="Server", type="variant")
client_variant = mock.Mock(uid="Client", type="variant")
self.compose.all_variants = {
"Server": server_variant,
"Client": client_variant,
}
def tearDown(self):
shutil.rmtree(self.tmp_dir)
def test_get_repo_url_from_normal_url(self):
url = util.get_repo_url(self.compose, "http://example.com/repo")
self.assertEqual(url, "http://example.com/repo")
def test_get_repo_url_from_path(self):
url = util.get_repo_url(self.compose, os.path.join(self.tmp_dir, "repo"))
self.assertEqual(url, "http://example.com/repo")
def test_get_repo_url_from_variant_uid(self):
url = util.get_repo_url(self.compose, "Server")
self.assertEqual(
url, "http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os"
)
def test_get_repo_url_from_repo_dict(self):
repo = {"baseurl": "http://example.com/repo"}
url = util.get_repo_url(self.compose, repo)
self.assertEqual(url, "http://example.com/repo")
repo = {"baseurl": "Server"}
url = util.get_repo_url(self.compose, repo)
self.assertEqual(
url, "http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os"
)
def test_get_repo_urls(self):
repos = [
"http://example.com/repo",
"Server",
{"baseurl": "Client"},
{"baseurl": "ftp://example.com/linux/repo"},
]
expect = [
"http://example.com/repo",
"http://example.com/RHEL-8.0-20180101.n.0/compose/Server/$basearch/os",
"http://example.com/RHEL-8.0-20180101.n.0/compose/Client/$basearch/os",
"ftp://example.com/linux/repo",
]
self.assertEqual(util.get_repo_urls(self.compose, repos), expect)
def test_get_repo_dict_from_normal_url(self):
repo_dict = util.get_repo_dict("http://example.com/repo")
expect = {
"name": "http:__example.com_repo",
"baseurl": "http://example.com/repo",
}
self.assertEqual(repo_dict, expect)
def test_get_repo_dict_from_variant_uid(self):
repo_dict = util.get_repo_dict("Server") # this repo format is deprecated
expect = {}
self.assertEqual(repo_dict, expect)
def test_get_repo_dict_from_repo_dict(self):
repo = {"baseurl": "Server"} # this repo format is deprecated
expect = {}
repo_dict = util.get_repo_dict(repo)
self.assertEqual(repo_dict, expect)
def test_get_repo_dicts(self):
repos = [
"http://example.com/repo",
"Server", # this repo format is deprecated (and will not be included into final repo_dict) # noqa: E501
{"baseurl": "Client"}, # this repo format is deprecated
{"baseurl": "ftp://example.com/linux/repo"},
{"name": "testrepo", "baseurl": "ftp://example.com/linux/repo"},
]
expect = [
{"name": "http:__example.com_repo", "baseurl": "http://example.com/repo"},
{
"name": "ftp:__example.com_linux_repo",
"baseurl": "ftp://example.com/linux/repo",
},
{"name": "testrepo", "baseurl": "ftp://example.com/linux/repo"},
]
repos = util.get_repo_dicts(repos)
self.assertEqual(repos, expect)
class TestVersionGenerator(unittest.TestCase):
def setUp(self):
ci = mock.MagicMock()
ci.respin = 0
ci.id = "RHEL-8.0-20180101.0"
ci.release.version = "8"
ci.type = "nightly"
ci.type_suffix = ""
ci.label = "RC-1.0"
ci.label_major_version = "1"
self.compose = mock.MagicMock()
self.compose.ci_base = ci
self.compose.compose_respin = 0
self.compose.compose_date = "20160101"
def test_unknown_generator(self):
compose = mock.Mock()
with self.assertRaises(RuntimeError) as ctx:
util.version_generator(compose, "!GIMME_VERSION")
self.assertEqual(
str(ctx.exception), "Unknown version generator '!GIMME_VERSION'"
)
def test_passthrough_value(self):
compose = mock.Mock()
self.assertEqual(util.version_generator(compose, "1.2.3"), "1.2.3")
def test_passthrough_none(self):
compose = mock.Mock()
self.assertEqual(util.version_generator(compose, None), None)
def test_release_from_version_date_respin(self):
self.assertEqual(
util.version_generator(self.compose, "!VERSION_FROM_VERSION_DATE_RESPIN"),
"8.20160101.0",
)
def test_release_from_date_respin(self):
self.assertEqual(
util.version_generator(self.compose, "!RELEASE_FROM_DATE_RESPIN"),
"20160101.0",
)
def test_version_from_version(self):
self.assertEqual(
util.version_generator(self.compose, "!VERSION_FROM_VERSION"),
"8",
)
class TestTZOffset(unittest.TestCase):
@mock.patch("time.daylight", new=False)
@mock.patch("time.altzone", new=7200)
@mock.patch("time.timezone", new=3600)
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=0))
def test_zone_without_dst(self):
self.assertEqual(util.get_tz_offset(), "-01:00")
@mock.patch("time.daylight", new=True)
@mock.patch("time.altzone", new=7200)
@mock.patch("time.timezone", new=3600)
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=0))
def test_with_active_dst(self):
self.assertEqual(util.get_tz_offset(), "-01:00")
@mock.patch("time.daylight", new=True)
@mock.patch("time.altzone", new=-9000)
@mock.patch("time.timezone", new=-3600)
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=1))
def test_with_inactive_dst(self):
self.assertEqual(util.get_tz_offset(), "+02:30")
@mock.patch("time.daylight", new=False)
@mock.patch("time.altzone", new=0)
@mock.patch("time.timezone", new=0)
@mock.patch("time.localtime", new=lambda: mock.Mock(tm_isdst=0))
def test_utc(self):
self.assertEqual(util.get_tz_offset(), "+00:00")
class TestParseKojiEvent(PungiTestCase):
def test_number(self):
self.assertEqual(util.parse_koji_event("1234"), 1234)
def test_correct_path(self):
touch(
os.path.join(self.topdir, "work/global/koji-event"),
'{"id": 19769058, "ts": 1527641311.22855}',
)
self.assertEqual(util.parse_koji_event(self.topdir), 19769058)
def test_bad_path(self):
with self.assertRaises(argparse.ArgumentTypeError):
util.parse_koji_event(self.topdir)
class TestCopyAll(PungiTestCase):
def setUp(self):
super(TestCopyAll, self).setUp()
self.src = os.path.join(self.topdir, "src")
self.dst = os.path.join(self.topdir, "dst")
util.makedirs(self.src)
def test_preserve_symlink(self):
touch(os.path.join(self.src, "target"))
os.symlink("target", os.path.join(self.src, "symlink"))
util.copy_all(self.src, self.dst)
self.assertTrue(os.path.isfile(os.path.join(self.dst, "target")))
self.assertTrue(os.path.islink(os.path.join(self.dst, "symlink")))
self.assertEqual(os.readlink(os.path.join(self.dst, "symlink")), "target")
def test_copy_broken_symlink(self):
os.symlink("broken", os.path.join(self.src, "symlink"))
util.copy_all(self.src, self.dst)
self.assertTrue(os.path.islink(os.path.join(self.dst, "symlink")))
self.assertEqual(os.readlink(os.path.join(self.dst, "symlink")), "broken")
class TestMoveAll(PungiTestCase):
def setUp(self):
super(TestMoveAll, self).setUp()
self.src = os.path.join(self.topdir, "src")
self.dst = os.path.join(self.topdir, "dst")
util.makedirs(self.src)
def test_move_all(self):
touch(os.path.join(self.src, "target"))
util.move_all(self.src, self.dst)
self.assertTrue(os.path.isfile(os.path.join(self.dst, "target")))
self.assertTrue(os.path.exists(os.path.join(self.src)))
self.assertFalse(os.path.isfile(os.path.join(self.src, "target")))
def test_move_all_rm_src_dir(self):
touch(os.path.join(self.src, "target"))
util.move_all(self.src, self.dst, rm_src_dir=True)
self.assertTrue(os.path.isfile(os.path.join(self.dst, "target")))
self.assertFalse(os.path.exists(os.path.join(self.src)))
self.assertFalse(os.path.isfile(os.path.join(self.src, "target")))
@mock.patch("six.moves.urllib.request.urlretrieve")
class TestAsLocalFile(PungiTestCase):
def test_local_file(self, urlretrieve):
with util.as_local_file("/tmp/foo") as fn:
self.assertEqual(fn, "/tmp/foo")
self.assertEqual(urlretrieve.call_args_list, [])
def test_http(self, urlretrieve):
url = "http://example.com/repodata/repomd.xml"
def my_mock(url_):
self.assertEqual(url, url_)
self.filename = os.path.join(self.topdir, "my-file")
touch(self.filename)
return self.filename, {}
urlretrieve.side_effect = my_mock
with util.as_local_file(url) as fn:
self.assertEqual(fn, self.filename)
self.assertTrue(os.path.exists(self.filename))
self.assertFalse(os.path.exists(self.filename))
def test_file_url(self, urlretrieve):
with util.as_local_file("file:///tmp/foo") as fn:
self.assertEqual(fn, "/tmp/foo")
self.assertEqual(urlretrieve.call_args_list, [])