pungi/tests/test_scm.py
Haibo Lin 3eddcfccd8 Fix flake8 complaints - F401
F401 'dnf' imported but unused
F401 'imp' imported but unused
F401 'os' imported but unused
F401 'subprocess' imported but unused
F401 'sys' imported but unused
F401 'yum' imported but unused

JIRA: COMPOSE-4108
Signed-off-by: Haibo Lin <hlin@redhat.com>
2020-02-07 11:48:31 +08:00

549 lines
18 KiB
Python

# -*- coding: utf-8 -*-
import mock
try:
import unittest2 as unittest
except ImportError:
import unittest
import shutil
import tempfile
import os
import six
from pungi.wrappers import scm
from tests.helpers import touch
class SCMBaseTest(unittest.TestCase):
def setUp(self):
self.destdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.destdir)
def assertStructure(self, returned, expected):
# Check we returned the correct files
six.assertCountEqual(self, returned, expected)
# Each file must exist
for f in expected:
self.assertTrue(os.path.isfile(os.path.join(self.destdir, f)))
# Only expected files should exist
found = []
for root, dirs, files in os.walk(self.destdir):
for f in files:
p = os.path.relpath(os.path.join(root, f), self.destdir)
found.append(p)
six.assertCountEqual(self, expected, found)
class FileSCMTestCase(SCMBaseTest):
def setUp(self):
"""
Prepares a source structure and destination directory.
srcdir
+- in_root
+- subdir
+- first
+- second
"""
super(FileSCMTestCase, self).setUp()
self.srcdir = tempfile.mkdtemp()
touch(os.path.join(self.srcdir, "in_root"))
touch(os.path.join(self.srcdir, "subdir", "first"))
touch(os.path.join(self.srcdir, "subdir", "second"))
def tearDown(self):
super(FileSCMTestCase, self).tearDown()
shutil.rmtree(self.srcdir)
def test_get_file_by_name(self):
file = os.path.join(self.srcdir, "in_root")
retval = scm.get_file_from_scm(file, self.destdir)
self.assertStructure(retval, ["in_root"])
def test_get_file_by_dict(self):
retval = scm.get_file_from_scm(
{
"scm": "file",
"repo": None,
"file": os.path.join(self.srcdir, "subdir", "first"),
},
self.destdir,
)
self.assertStructure(retval, ["first"])
def test_get_dir_by_name(self):
retval = scm.get_dir_from_scm(os.path.join(self.srcdir, "subdir"), self.destdir)
self.assertStructure(retval, ["first", "second"])
def test_get_dir_by_dict(self):
retval = scm.get_dir_from_scm(
{"scm": "file", "repo": None, "dir": os.path.join(self.srcdir, "subdir")},
self.destdir,
)
self.assertStructure(retval, ["first", "second"])
def test_get_missing_file(self):
with self.assertRaises(RuntimeError) as ctx:
scm.get_file_from_scm(
{"scm": "file", "repo": None, "file": "this-is-really-not-here.txt"},
self.destdir,
)
self.assertIn("No files matched", str(ctx.exception))
def test_get_missing_dir(self):
with self.assertRaises(RuntimeError) as ctx:
scm.get_dir_from_scm(
{"scm": "file", "repo": None, "dir": "this-is-really-not-here"},
self.destdir,
)
self.assertIn("No directories matched", str(ctx.exception))
class GitSCMTestCase(SCMBaseTest):
def assertCalls(self, mock_run, url, branch, command=None):
command = [command] if command else []
self.assertEqual(
[call[0][0] for call in mock_run.call_args_list],
[
["git", "init"],
["git", "fetch", "--depth=1", url, branch],
["git", "checkout", "FETCH_HEAD"],
]
+ command,
)
@mock.patch("pungi.wrappers.scm.run")
def test_get_file(self, run):
def process(cmd, workdir=None, **kwargs):
touch(os.path.join(workdir, "some_file.txt"))
touch(os.path.join(workdir, "other_file.txt"))
run.side_effect = process
retval = scm.get_file_from_scm(
{
"scm": "git",
"repo": "git://example.com/git/repo.git",
"file": "some_file.txt",
},
self.destdir,
)
self.assertStructure(retval, ["some_file.txt"])
self.assertCalls(run, "git://example.com/git/repo.git", "master")
@mock.patch("pungi.wrappers.scm.run")
def test_get_file_fetch_fails(self, run):
url = "git://example.com/git/repo.git"
def process(cmd, workdir=None, **kwargs):
if "fetch" in cmd:
exc = RuntimeError()
exc.output = ""
raise exc
touch(os.path.join(workdir, "some_file.txt"))
touch(os.path.join(workdir, "other_file.txt"))
run.side_effect = process
retval = scm.get_file_from_scm(
{"scm": "git", "repo": url, "file": "some_file.txt"}, self.destdir
)
self.assertStructure(retval, ["some_file.txt"])
self.assertEqual(
[call[0][0] for call in run.call_args_list],
[
["git", "init"],
["git", "fetch", "--depth=1", url, "master"],
["git", "remote", "add", "origin", url],
["git", "remote", "update", "origin"],
["git", "checkout", "master"],
],
)
@mock.patch("pungi.wrappers.scm.run")
def test_get_file_generated_by_command(self, run):
def process(cmd, workdir=None, **kwargs):
if cmd[0] == "git":
touch(os.path.join(workdir, "some_file.txt"))
return 0, ""
run.side_effect = process
retval = scm.get_file_from_scm(
{
"scm": "git",
"repo": "git://example.com/git/repo.git",
"file": "some_file.txt",
"command": "make",
},
self.destdir,
)
self.assertStructure(retval, ["some_file.txt"])
self.assertCalls(run, "git://example.com/git/repo.git", "master", "make")
@mock.patch("pungi.wrappers.scm.run")
def test_get_file_and_fail_to_generate(self, run):
def process(cmd, workdir=None, **kwargs):
if cmd[0] == "git":
touch(os.path.join(workdir, "some_file.txt"))
return 0, "output"
return 1, "output"
run.side_effect = process
with self.assertRaises(RuntimeError) as ctx:
scm.get_file_from_scm(
{
"scm": "git",
"repo": "git://example.com/git/repo.git",
"file": "some_file.txt",
"command": "make",
},
self.destdir,
)
self.assertEqual(str(ctx.exception), "'make' failed with exit code 1")
@mock.patch("pungi.wrappers.scm.run")
def test_get_dir(self, run):
def process(cmd, workdir=None, **kwargs):
touch(os.path.join(workdir, "subdir", "first"))
touch(os.path.join(workdir, "subdir", "second"))
run.side_effect = process
retval = scm.get_dir_from_scm(
{"scm": "git", "repo": "git://example.com/git/repo.git", "dir": "subdir"},
self.destdir,
)
self.assertStructure(retval, ["first", "second"])
self.assertCalls(run, "git://example.com/git/repo.git", "master")
@mock.patch("pungi.wrappers.scm.run")
def test_get_dir_and_generate(self, run):
def process(cmd, workdir=None, **kwargs):
if cmd[0] == "git":
touch(os.path.join(workdir, "subdir", "first"))
touch(os.path.join(workdir, "subdir", "second"))
return 0, ""
run.side_effect = process
retval = scm.get_dir_from_scm(
{
"scm": "git",
"repo": "git://example.com/git/repo.git",
"dir": "subdir",
"command": "make",
},
self.destdir,
)
self.assertStructure(retval, ["first", "second"])
self.assertCalls(run, "git://example.com/git/repo.git", "master", "make")
class RpmSCMTestCase(SCMBaseTest):
def setUp(self):
super(RpmSCMTestCase, self).setUp()
self.tmpdir = tempfile.mkdtemp()
self.exploded = set()
self.rpms = [self.tmpdir + "/whatever.rpm", self.tmpdir + "/another.rpm"]
self.numbered = [
self.tmpdir + x
for x in ["/one1.rpm", "/one2.rpm", "/two1.rpm", "/two2.rpm"]
]
for rpm in self.rpms + self.numbered:
touch(rpm)
def tearDown(self):
super(RpmSCMTestCase, self).tearDown()
shutil.rmtree(self.tmpdir)
def _explode_rpm(self, path, dest):
self.exploded.add(path)
touch(os.path.join(dest, "some-file.txt"))
touch(os.path.join(dest, "subdir", "foo.txt"))
touch(os.path.join(dest, "subdir", "bar.txt"))
def _explode_multiple(self, path, dest):
self.exploded.add(path)
cnt = len(self.exploded)
touch(os.path.join(dest, "some-file-%d.txt" % cnt))
touch(os.path.join(dest, "subdir-%d" % cnt, "foo-%d.txt" % cnt))
touch(os.path.join(dest, "common", "foo-%d.txt" % cnt))
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_file(self, explode):
explode.side_effect = self._explode_rpm
retval = scm.get_file_from_scm(
{"scm": "rpm", "repo": self.rpms[0], "file": "some-file.txt"}, self.destdir
)
self.assertStructure(retval, ["some-file.txt"])
self.assertEqual(self.exploded, set([self.rpms[0]]))
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_more_files(self, explode):
explode.side_effect = self._explode_rpm
retval = scm.get_file_from_scm(
{
"scm": "rpm",
"repo": self.rpms[0],
"file": ["some-file.txt", "subdir/foo.txt"],
},
self.destdir,
)
self.assertStructure(retval, ["some-file.txt", "foo.txt"])
self.assertEqual(self.exploded, set([self.rpms[0]]))
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_whole_dir(self, explode):
explode.side_effect = self._explode_rpm
retval = scm.get_dir_from_scm(
{"scm": "rpm", "repo": self.rpms[0], "dir": "subdir"}, self.destdir
)
self.assertStructure(retval, ["subdir/foo.txt", "subdir/bar.txt"])
self.assertEqual(self.exploded, set([self.rpms[0]]))
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_dir_contents(self, explode):
explode.side_effect = self._explode_rpm
retval = scm.get_dir_from_scm(
{"scm": "rpm", "repo": self.rpms[0], "dir": "subdir/"}, self.destdir
)
self.assertStructure(retval, ["foo.txt", "bar.txt"])
self.assertEqual(self.exploded, set([self.rpms[0]]))
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_files_from_two_rpms(self, explode):
explode.side_effect = self._explode_multiple
retval = scm.get_file_from_scm(
{
"scm": "rpm",
"repo": self.rpms,
"file": ["some-file-1.txt", "some-file-2.txt"],
},
self.destdir,
)
self.assertStructure(retval, ["some-file-1.txt", "some-file-2.txt"])
six.assertCountEqual(self, self.exploded, self.rpms)
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_files_from_glob_rpms(self, explode):
explode.side_effect = self._explode_multiple
retval = scm.get_file_from_scm(
{
"scm": "rpm",
"file": "some-file-*.txt",
"repo": [self.tmpdir + "/one*.rpm", self.tmpdir + "/two*.rpm"],
},
self.destdir,
)
self.assertStructure(
retval,
[
"some-file-1.txt",
"some-file-2.txt",
"some-file-3.txt",
"some-file-4.txt",
],
)
six.assertCountEqual(self, self.exploded, self.numbered)
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_dir_from_two_rpms(self, explode):
explode.side_effect = self._explode_multiple
retval = scm.get_dir_from_scm(
{"scm": "rpm", "repo": self.rpms, "dir": "common"}, self.destdir
)
self.assertStructure(retval, ["common/foo-1.txt", "common/foo-2.txt"])
six.assertCountEqual(self, self.exploded, self.rpms)
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
def test_get_dir_from_glob_rpms(self, explode):
explode.side_effect = self._explode_multiple
retval = scm.get_dir_from_scm(
{
"scm": "rpm",
"dir": "common/",
"repo": [self.tmpdir + "/one*.rpm", self.tmpdir + "/two*.rpm"],
},
self.destdir,
)
self.assertStructure(
retval, ["foo-1.txt", "foo-2.txt", "foo-3.txt", "foo-4.txt"]
)
six.assertCountEqual(self, self.exploded, self.numbered)
class CvsSCMTestCase(SCMBaseTest):
@mock.patch("pungi.wrappers.scm.run")
def test_get_file(self, run):
commands = []
def process(cmd, workdir=None, **kwargs):
fname = cmd[-1]
touch(os.path.join(workdir, fname))
commands.append(" ".join(cmd))
run.side_effect = process
retval = scm.get_file_from_scm(
{"scm": "cvs", "repo": "http://example.com/cvs", "file": "some_file.txt"},
self.destdir,
)
self.assertStructure(retval, ["some_file.txt"])
self.assertEqual(
commands,
["/usr/bin/cvs -q -d http://example.com/cvs export -r HEAD some_file.txt"],
)
@mock.patch("pungi.wrappers.scm.run")
def test_get_dir(self, run):
commands = []
def process(cmd, workdir=None, **kwargs):
fname = cmd[-1]
touch(os.path.join(workdir, fname, "first"))
touch(os.path.join(workdir, fname, "second"))
commands.append(" ".join(cmd))
run.side_effect = process
retval = scm.get_dir_from_scm(
{"scm": "cvs", "repo": "http://example.com/cvs", "dir": "subdir"},
self.destdir,
)
self.assertStructure(retval, ["first", "second"])
self.assertEqual(
commands,
["/usr/bin/cvs -q -d http://example.com/cvs export -r HEAD subdir"],
)
@mock.patch("pungi.wrappers.scm.urlretrieve")
@mock.patch("pungi.wrappers.scm.KojiWrapper")
class KojiSCMTestCase(SCMBaseTest):
def test_without_koji_profile(self, KW, dl):
compose = mock.Mock(conf={})
with self.assertRaises(RuntimeError) as ctx:
scm.get_file_from_scm(
{"scm": "koji", "repo": "my-build-1.0-2", "file": "*"},
self.destdir,
compose=compose,
)
self.assertIn("Koji profile must be configured", str(ctx.exception))
self.assertEqual(KW.mock_calls, [])
self.assertEqual(dl.mock_calls, [])
def test_doesnt_get_dirs(self, KW, dl):
compose = mock.Mock(conf={"koji_profile": "koji"})
with self.assertRaises(RuntimeError) as ctx:
scm.get_dir_from_scm(
{"scm": "koji", "repo": "my-build-1.0-2", "dir": "*"},
self.destdir,
compose=compose,
)
self.assertIn("Only files can be exported", str(ctx.exception))
self.assertEqual(KW.mock_calls, [mock.call("koji")])
self.assertEqual(dl.mock_calls, [])
def _setup_koji_wrapper(self, KW, build_id, files):
KW.return_value.koji_module.config.topdir = "/mnt/koji"
KW.return_value.koji_module.config.topurl = "http://koji.local/koji"
KW.return_value.koji_module.pathinfo.typedir.return_value = "/mnt/koji/images"
buildinfo = {"build_id": build_id}
KW.return_value.koji_proxy.getBuild.return_value = buildinfo
KW.return_value.koji_proxy.listArchives.return_value = [
{"filename": f, "btype": "image"} for f in files
]
KW.return_value.koji_proxy.listTagged.return_value = [buildinfo]
def test_get_from_build(self, KW, dl):
compose = mock.Mock(conf={"koji_profile": "koji"})
def download(src, dst):
touch(dst)
dl.side_effect = download
self._setup_koji_wrapper(KW, 123, ["abc.out", "abc.tar"])
retval = scm.get_file_from_scm(
{"scm": "koji", "repo": "my-build-1.0-2", "file": "*.tar"},
self.destdir,
compose=compose,
)
self.assertStructure(retval, ["abc.tar"])
self.assertEqual(
KW.mock_calls,
[
mock.call("koji"),
mock.call().koji_proxy.getBuild("my-build-1.0-2"),
mock.call().koji_proxy.listArchives(123),
mock.call().koji_module.pathinfo.typedir({"build_id": 123}, "image"),
],
)
self.assertEqual(
dl.call_args_list,
[mock.call("http://koji.local/koji/images/abc.tar", mock.ANY)],
)
def test_get_from_latest_build(self, KW, dl):
compose = mock.Mock(conf={"koji_profile": "koji"})
def download(src, dst):
touch(dst)
dl.side_effect = download
self._setup_koji_wrapper(KW, 123, ["abc.out", "abc.tar"])
retval = scm.get_file_from_scm(
{"scm": "koji", "repo": "my-build", "file": "*.tar", "branch": "images"},
self.destdir,
compose=compose,
)
self.assertStructure(retval, ["abc.tar"])
self.assertEqual(
KW.mock_calls,
[
mock.call("koji"),
mock.call().koji_proxy.listTagged(
"images", package="my-build", latest=True
),
mock.call().koji_proxy.listArchives(123),
mock.call().koji_module.pathinfo.typedir({"build_id": 123}, "image"),
],
)
self.assertEqual(
dl.call_args_list,
[mock.call("http://koji.local/koji/images/abc.tar", mock.ANY)],
)