b899126b7e
The change allows for setting the parameters as described below to Lorax. Lorax, a program called during the buildInstall phase, creates the SquashFS during the buildInstall phase. The Squash filesystem is present both on the DVD and the BOOT.ISO. squashfs_only --- (str) passes --squashfs-only option. configuration_file --- (str or scm_dict) passes -c option to Lorax. The final goal of this change is to allow for optimization of the installation medium size. This pull request is related to the Fedora change proposal, which is available at this location: https://fedoraproject.org/wiki/Category:Changes/OptimizeSquashFS See the change proposal for more information about the benefits of higher compression ratio. Jira: RHELCMP-693 Signed-off-by: Bohdan Khomutskyi <bkhomuts@redhat.com>
684 lines
23 KiB
Python
684 lines
23 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import mock
|
|
|
|
try:
|
|
import unittest2 as unittest
|
|
except ImportError:
|
|
import unittest
|
|
import shutil
|
|
import tempfile
|
|
import random
|
|
|
|
import os
|
|
import six
|
|
|
|
from pungi.wrappers import scm
|
|
from tests.helpers import touch
|
|
from kobo.shortcuts import run
|
|
|
|
|
|
class SCMBaseTest(unittest.TestCase):
|
|
def setUp(self):
|
|
self.destdir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.destdir)
|
|
|
|
def assertStructure(self, returned, expected):
|
|
# Check we returned the correct files
|
|
six.assertCountEqual(self, returned, expected)
|
|
|
|
# Each file must exist
|
|
for f in expected:
|
|
self.assertTrue(os.path.isfile(os.path.join(self.destdir, f)))
|
|
|
|
# Only expected files should exist
|
|
found = []
|
|
for root, dirs, files in os.walk(self.destdir):
|
|
for f in files:
|
|
p = os.path.relpath(os.path.join(root, f), self.destdir)
|
|
found.append(p)
|
|
six.assertCountEqual(self, expected, found)
|
|
|
|
|
|
class FileSCMTestCase(SCMBaseTest):
|
|
def setUp(self):
|
|
"""
|
|
Prepares a source structure and destination directory.
|
|
|
|
srcdir
|
|
+- in_root
|
|
+- subdir
|
|
+- first
|
|
+- second
|
|
"""
|
|
super(FileSCMTestCase, self).setUp()
|
|
self.srcdir = tempfile.mkdtemp()
|
|
touch(os.path.join(self.srcdir, "in_root"))
|
|
touch(os.path.join(self.srcdir, "subdir", "first"))
|
|
touch(os.path.join(self.srcdir, "subdir", "second"))
|
|
|
|
def tearDown(self):
|
|
super(FileSCMTestCase, self).tearDown()
|
|
shutil.rmtree(self.srcdir)
|
|
|
|
def test_get_file_by_name(self):
|
|
file = os.path.join(self.srcdir, "in_root")
|
|
retval = scm.get_file_from_scm(file, self.destdir)
|
|
self.assertStructure(retval, ["in_root"])
|
|
|
|
def test_get_file_by_dict(self):
|
|
retval = scm.get_file_from_scm(
|
|
{
|
|
"scm": "file",
|
|
"repo": None,
|
|
"file": os.path.join(self.srcdir, "subdir", "first"),
|
|
},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["first"])
|
|
|
|
def test_get_dir_by_name(self):
|
|
retval = scm.get_dir_from_scm(os.path.join(self.srcdir, "subdir"), self.destdir)
|
|
self.assertStructure(retval, ["first", "second"])
|
|
|
|
def test_get_dir_by_dict(self):
|
|
retval = scm.get_dir_from_scm(
|
|
{"scm": "file", "repo": None, "dir": os.path.join(self.srcdir, "subdir")},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["first", "second"])
|
|
|
|
def test_get_missing_file(self):
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
scm.get_file_from_scm(
|
|
{"scm": "file", "repo": None, "file": "this-is-really-not-here.txt"},
|
|
self.destdir,
|
|
)
|
|
|
|
self.assertIn("No files matched", str(ctx.exception))
|
|
|
|
def test_get_missing_dir(self):
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
scm.get_dir_from_scm(
|
|
{"scm": "file", "repo": None, "dir": "this-is-really-not-here"},
|
|
self.destdir,
|
|
)
|
|
|
|
self.assertIn("No directories matched", str(ctx.exception))
|
|
|
|
|
|
class GitSCMTestCase(SCMBaseTest):
|
|
def assertCalls(self, mock_run, url, branch, command=None):
|
|
command = [command] if command else []
|
|
self.assertEqual(
|
|
[call[0][0] for call in mock_run.call_args_list],
|
|
[
|
|
["git", "init"],
|
|
["git", "fetch", "--depth=1", url, branch],
|
|
["git", "checkout", "FETCH_HEAD"],
|
|
]
|
|
+ command,
|
|
)
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_file(self, run):
|
|
def process(cmd, workdir=None, **kwargs):
|
|
touch(os.path.join(workdir, "some_file.txt"))
|
|
touch(os.path.join(workdir, "other_file.txt"))
|
|
|
|
run.side_effect = process
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{
|
|
"scm": "git",
|
|
"repo": "git://example.com/git/repo.git",
|
|
"file": "some_file.txt",
|
|
},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["some_file.txt"])
|
|
self.assertCalls(run, "git://example.com/git/repo.git", "master")
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_file_function(self, run):
|
|
compose = mock.Mock(conf={})
|
|
|
|
def process(cmd, workdir=None, **kwargs):
|
|
touch(os.path.join(workdir, "some_file.txt"))
|
|
touch(os.path.join(workdir, "other_file.txt"))
|
|
|
|
run.side_effect = process
|
|
destination = os.path.join(self.destdir, "other_file.txt")
|
|
retval = scm.get_file(
|
|
{
|
|
"scm": "git",
|
|
"repo": "git://example.com/git/repo.git",
|
|
"file": "other_file.txt",
|
|
},
|
|
os.path.join(self.destdir, destination),
|
|
compose=compose,
|
|
)
|
|
self.assertEqual(retval, destination)
|
|
self.assertCalls(run, "git://example.com/git/repo.git", "master")
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_file_fetch_fails(self, run):
|
|
url = "git://example.com/git/repo.git"
|
|
|
|
def process(cmd, workdir=None, **kwargs):
|
|
if "fetch" in cmd:
|
|
exc = RuntimeError()
|
|
exc.output = ""
|
|
raise exc
|
|
touch(os.path.join(workdir, "some_file.txt"))
|
|
touch(os.path.join(workdir, "other_file.txt"))
|
|
|
|
run.side_effect = process
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{"scm": "git", "repo": url, "file": "some_file.txt"}, self.destdir
|
|
)
|
|
self.assertStructure(retval, ["some_file.txt"])
|
|
self.assertEqual(
|
|
[call[0][0] for call in run.call_args_list],
|
|
[
|
|
["git", "init"],
|
|
[
|
|
"git",
|
|
"fetch",
|
|
"--depth=1",
|
|
"git://example.com/git/repo.git",
|
|
"master",
|
|
],
|
|
["git", "remote", "add", "origin", url],
|
|
["git", "remote", "update", "origin"],
|
|
["git", "checkout", "master"],
|
|
],
|
|
)
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_file_generated_by_command(self, run):
|
|
def process(cmd, workdir=None, **kwargs):
|
|
if cmd[0] == "git":
|
|
touch(os.path.join(workdir, "some_file.txt"))
|
|
return 0, ""
|
|
|
|
run.side_effect = process
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{
|
|
"scm": "git",
|
|
"repo": "git://example.com/git/repo.git",
|
|
"file": "some_file.txt",
|
|
"command": "make",
|
|
},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["some_file.txt"])
|
|
self.assertCalls(run, "git://example.com/git/repo.git", "master", "make")
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_file_and_fail_to_generate(self, run):
|
|
def process(cmd, workdir=None, **kwargs):
|
|
if cmd[0] == "git":
|
|
touch(os.path.join(workdir, "some_file.txt"))
|
|
return 0, "output"
|
|
return 1, "output"
|
|
|
|
run.side_effect = process
|
|
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
scm.get_file_from_scm(
|
|
{
|
|
"scm": "git",
|
|
"repo": "git://example.com/git/repo.git",
|
|
"file": "some_file.txt",
|
|
"command": "make",
|
|
},
|
|
self.destdir,
|
|
)
|
|
|
|
self.assertEqual(str(ctx.exception), "'make' failed with exit code 1")
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_dir(self, run):
|
|
def process(cmd, workdir=None, **kwargs):
|
|
touch(os.path.join(workdir, "subdir", "first"))
|
|
touch(os.path.join(workdir, "subdir", "second"))
|
|
|
|
run.side_effect = process
|
|
|
|
retval = scm.get_dir_from_scm(
|
|
{"scm": "git", "repo": "git://example.com/git/repo.git", "dir": "subdir"},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["first", "second"])
|
|
self.assertCalls(run, "git://example.com/git/repo.git", "master")
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_dir_and_generate(self, run):
|
|
def process(cmd, workdir=None, **kwargs):
|
|
if cmd[0] == "git":
|
|
touch(os.path.join(workdir, "subdir", "first"))
|
|
touch(os.path.join(workdir, "subdir", "second"))
|
|
return 0, ""
|
|
|
|
run.side_effect = process
|
|
|
|
retval = scm.get_dir_from_scm(
|
|
{
|
|
"scm": "git",
|
|
"repo": "git://example.com/git/repo.git",
|
|
"dir": "subdir",
|
|
"command": "make",
|
|
},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["first", "second"])
|
|
self.assertCalls(run, "git://example.com/git/repo.git", "master", "make")
|
|
|
|
|
|
class GitSCMTestCaseReal(SCMBaseTest):
|
|
def setUp(self):
|
|
super(GitSCMTestCaseReal, self).setUp()
|
|
self.compose = mock.Mock(conf={})
|
|
self.gitRepositoryLocation = tempfile.mkdtemp()
|
|
run(
|
|
["git", "-C", self.gitRepositoryLocation, "init"],
|
|
workdir=self.gitRepositoryLocation,
|
|
)
|
|
fileOneLocation = os.path.join(self.gitRepositoryLocation, "some_file.txt")
|
|
fileTwoLocation = os.path.join(self.gitRepositoryLocation, "other_file.txt")
|
|
self.files = {
|
|
fileOneLocation: str(random.randrange(100000000000000000000)),
|
|
fileTwoLocation: str(random.randrange(100000000000000000000)),
|
|
}
|
|
for fileLocation, fileContents in self.files.items():
|
|
with open(fileLocation, "w") as fileHandle:
|
|
fileHandle.write(fileContents)
|
|
run(
|
|
[
|
|
"git",
|
|
"-C",
|
|
self.gitRepositoryLocation,
|
|
"add",
|
|
"some_file.txt",
|
|
"other_file.txt",
|
|
],
|
|
workdir=self.gitRepositoryLocation,
|
|
)
|
|
# Must set the user.name and user.email, otherwise an error may be returned.
|
|
run(
|
|
[
|
|
"git",
|
|
"-c",
|
|
"user.name=Pungi Test Engineer",
|
|
"-c",
|
|
"user.email=ptestengineer@example.com",
|
|
"-C",
|
|
self.gitRepositoryLocation,
|
|
"commit",
|
|
"-m",
|
|
"Initial commit",
|
|
],
|
|
workdir=self.gitRepositoryLocation,
|
|
)
|
|
|
|
def tearDown(self):
|
|
super(GitSCMTestCaseReal, self).tearDown()
|
|
shutil.rmtree(self.gitRepositoryLocation)
|
|
|
|
def test_get_file_function(self):
|
|
sourceFileLocation = random.choice(list(self.files.keys()))
|
|
sourceFilename = os.path.basename(sourceFileLocation)
|
|
destinationFileLocation = os.path.join(self.destdir, "other_file.txt")
|
|
destinationFileActualLocation = scm.get_file(
|
|
{
|
|
"scm": "git",
|
|
"repo": "file:///%s" % self.gitRepositoryLocation,
|
|
"file": sourceFilename,
|
|
},
|
|
os.path.join(self.destdir, destinationFileLocation),
|
|
compose=self.compose,
|
|
)
|
|
self.assertEqual(destinationFileActualLocation, destinationFileLocation)
|
|
self.assertTrue(os.path.isfile(destinationFileActualLocation))
|
|
|
|
# Comparing the contents of source to the destination file.
|
|
with open(sourceFileLocation) as sourceFileHandle:
|
|
sourceFileContent = sourceFileHandle.read()
|
|
with open(destinationFileActualLocation) as destinationFileHandle:
|
|
destinationFileContent = destinationFileHandle.read()
|
|
self.assertEqual(sourceFileContent, destinationFileContent)
|
|
|
|
def test_get_file_function_with_overwrite(self):
|
|
sourceFileLocation = random.choice(list(self.files.keys()))
|
|
sourceFilename = os.path.basename(sourceFileLocation)
|
|
destinationFileLocation = os.path.join(self.destdir, "other_file.txt")
|
|
# Writing pre-existing content to the file, that should be overwritten
|
|
preExistingContent = "This line should be overwritten."
|
|
with open(destinationFileLocation, "w") as destinationFileHandle:
|
|
destinationFileHandle.write(preExistingContent)
|
|
destinationFileActualLocation = scm.get_file(
|
|
{
|
|
"scm": "git",
|
|
"repo": "file:///%s" % self.gitRepositoryLocation,
|
|
"file": sourceFilename,
|
|
},
|
|
os.path.join(self.destdir, destinationFileLocation),
|
|
compose=self.compose,
|
|
overwrite=True,
|
|
)
|
|
self.assertEqual(destinationFileActualLocation, destinationFileLocation)
|
|
self.assertTrue(os.path.isfile(destinationFileActualLocation))
|
|
|
|
# Reading the contents of both files to compare later.
|
|
with open(sourceFileLocation) as sourceFileHandle:
|
|
sourceFileContent = sourceFileHandle.read()
|
|
with open(destinationFileActualLocation) as destinationFileHandle:
|
|
destinationFileContent = destinationFileHandle.read()
|
|
# Ensuring that the file was in fact overwritten
|
|
self.assertNotEqual(preExistingContent, destinationFileContent)
|
|
# Comparing the contents of source to the destination file.
|
|
self.assertEqual(sourceFileContent, destinationFileContent)
|
|
|
|
|
|
class RpmSCMTestCase(SCMBaseTest):
|
|
def setUp(self):
|
|
super(RpmSCMTestCase, self).setUp()
|
|
self.tmpdir = tempfile.mkdtemp()
|
|
self.exploded = set()
|
|
self.rpms = [self.tmpdir + "/whatever.rpm", self.tmpdir + "/another.rpm"]
|
|
self.numbered = [
|
|
self.tmpdir + x
|
|
for x in ["/one1.rpm", "/one2.rpm", "/two1.rpm", "/two2.rpm"]
|
|
]
|
|
for rpm in self.rpms + self.numbered:
|
|
touch(rpm)
|
|
|
|
def tearDown(self):
|
|
super(RpmSCMTestCase, self).tearDown()
|
|
shutil.rmtree(self.tmpdir)
|
|
|
|
def _explode_rpm(self, path, dest):
|
|
self.exploded.add(path)
|
|
touch(os.path.join(dest, "some-file.txt"))
|
|
touch(os.path.join(dest, "subdir", "foo.txt"))
|
|
touch(os.path.join(dest, "subdir", "bar.txt"))
|
|
|
|
def _explode_multiple(self, path, dest):
|
|
self.exploded.add(path)
|
|
cnt = len(self.exploded)
|
|
touch(os.path.join(dest, "some-file-%d.txt" % cnt))
|
|
touch(os.path.join(dest, "subdir-%d" % cnt, "foo-%d.txt" % cnt))
|
|
touch(os.path.join(dest, "common", "foo-%d.txt" % cnt))
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_file(self, explode):
|
|
explode.side_effect = self._explode_rpm
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{"scm": "rpm", "repo": self.rpms[0], "file": "some-file.txt"}, self.destdir
|
|
)
|
|
|
|
self.assertStructure(retval, ["some-file.txt"])
|
|
self.assertEqual(self.exploded, set([self.rpms[0]]))
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_more_files(self, explode):
|
|
explode.side_effect = self._explode_rpm
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{
|
|
"scm": "rpm",
|
|
"repo": self.rpms[0],
|
|
"file": ["some-file.txt", "subdir/foo.txt"],
|
|
},
|
|
self.destdir,
|
|
)
|
|
|
|
self.assertStructure(retval, ["some-file.txt", "foo.txt"])
|
|
self.assertEqual(self.exploded, set([self.rpms[0]]))
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_whole_dir(self, explode):
|
|
explode.side_effect = self._explode_rpm
|
|
|
|
retval = scm.get_dir_from_scm(
|
|
{"scm": "rpm", "repo": self.rpms[0], "dir": "subdir"}, self.destdir
|
|
)
|
|
|
|
self.assertStructure(retval, ["subdir/foo.txt", "subdir/bar.txt"])
|
|
self.assertEqual(self.exploded, set([self.rpms[0]]))
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_dir_contents(self, explode):
|
|
explode.side_effect = self._explode_rpm
|
|
|
|
retval = scm.get_dir_from_scm(
|
|
{"scm": "rpm", "repo": self.rpms[0], "dir": "subdir/"}, self.destdir
|
|
)
|
|
|
|
self.assertStructure(retval, ["foo.txt", "bar.txt"])
|
|
self.assertEqual(self.exploded, set([self.rpms[0]]))
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_files_from_two_rpms(self, explode):
|
|
explode.side_effect = self._explode_multiple
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{
|
|
"scm": "rpm",
|
|
"repo": self.rpms,
|
|
"file": ["some-file-1.txt", "some-file-2.txt"],
|
|
},
|
|
self.destdir,
|
|
)
|
|
|
|
self.assertStructure(retval, ["some-file-1.txt", "some-file-2.txt"])
|
|
six.assertCountEqual(self, self.exploded, self.rpms)
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_files_from_glob_rpms(self, explode):
|
|
explode.side_effect = self._explode_multiple
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{
|
|
"scm": "rpm",
|
|
"file": "some-file-*.txt",
|
|
"repo": [self.tmpdir + "/one*.rpm", self.tmpdir + "/two*.rpm"],
|
|
},
|
|
self.destdir,
|
|
)
|
|
|
|
self.assertStructure(
|
|
retval,
|
|
[
|
|
"some-file-1.txt",
|
|
"some-file-2.txt",
|
|
"some-file-3.txt",
|
|
"some-file-4.txt",
|
|
],
|
|
)
|
|
six.assertCountEqual(self, self.exploded, self.numbered)
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_dir_from_two_rpms(self, explode):
|
|
explode.side_effect = self._explode_multiple
|
|
|
|
retval = scm.get_dir_from_scm(
|
|
{"scm": "rpm", "repo": self.rpms, "dir": "common"}, self.destdir
|
|
)
|
|
|
|
self.assertStructure(retval, ["common/foo-1.txt", "common/foo-2.txt"])
|
|
six.assertCountEqual(self, self.exploded, self.rpms)
|
|
|
|
@mock.patch("pungi.wrappers.scm.explode_rpm_package")
|
|
def test_get_dir_from_glob_rpms(self, explode):
|
|
explode.side_effect = self._explode_multiple
|
|
|
|
retval = scm.get_dir_from_scm(
|
|
{
|
|
"scm": "rpm",
|
|
"dir": "common/",
|
|
"repo": [self.tmpdir + "/one*.rpm", self.tmpdir + "/two*.rpm"],
|
|
},
|
|
self.destdir,
|
|
)
|
|
|
|
self.assertStructure(
|
|
retval, ["foo-1.txt", "foo-2.txt", "foo-3.txt", "foo-4.txt"]
|
|
)
|
|
six.assertCountEqual(self, self.exploded, self.numbered)
|
|
|
|
|
|
class CvsSCMTestCase(SCMBaseTest):
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_file(self, run):
|
|
commands = []
|
|
|
|
def process(cmd, workdir=None, **kwargs):
|
|
fname = cmd[-1]
|
|
touch(os.path.join(workdir, fname))
|
|
commands.append(" ".join(cmd))
|
|
|
|
run.side_effect = process
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{"scm": "cvs", "repo": "http://example.com/cvs", "file": "some_file.txt"},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["some_file.txt"])
|
|
self.assertEqual(
|
|
commands,
|
|
["/usr/bin/cvs -q -d http://example.com/cvs export -r HEAD some_file.txt"],
|
|
)
|
|
|
|
@mock.patch("pungi.wrappers.scm.run")
|
|
def test_get_dir(self, run):
|
|
commands = []
|
|
|
|
def process(cmd, workdir=None, **kwargs):
|
|
fname = cmd[-1]
|
|
touch(os.path.join(workdir, fname, "first"))
|
|
touch(os.path.join(workdir, fname, "second"))
|
|
commands.append(" ".join(cmd))
|
|
|
|
run.side_effect = process
|
|
|
|
retval = scm.get_dir_from_scm(
|
|
{"scm": "cvs", "repo": "http://example.com/cvs", "dir": "subdir"},
|
|
self.destdir,
|
|
)
|
|
self.assertStructure(retval, ["first", "second"])
|
|
|
|
self.assertEqual(
|
|
commands,
|
|
["/usr/bin/cvs -q -d http://example.com/cvs export -r HEAD subdir"],
|
|
)
|
|
|
|
|
|
@mock.patch("pungi.wrappers.scm.urlretrieve")
|
|
@mock.patch("pungi.wrappers.scm.KojiWrapper")
|
|
class KojiSCMTestCase(SCMBaseTest):
|
|
def test_without_koji_profile(self, KW, dl):
|
|
compose = mock.Mock(conf={})
|
|
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
scm.get_file_from_scm(
|
|
{"scm": "koji", "repo": "my-build-1.0-2", "file": "*"},
|
|
self.destdir,
|
|
compose=compose,
|
|
)
|
|
self.assertIn("Koji profile must be configured", str(ctx.exception))
|
|
self.assertEqual(KW.mock_calls, [])
|
|
self.assertEqual(dl.mock_calls, [])
|
|
|
|
def test_doesnt_get_dirs(self, KW, dl):
|
|
compose = mock.Mock(conf={"koji_profile": "koji"})
|
|
|
|
with self.assertRaises(RuntimeError) as ctx:
|
|
scm.get_dir_from_scm(
|
|
{"scm": "koji", "repo": "my-build-1.0-2", "dir": "*"},
|
|
self.destdir,
|
|
compose=compose,
|
|
)
|
|
self.assertIn("Only files can be exported", str(ctx.exception))
|
|
self.assertEqual(KW.mock_calls, [mock.call("koji")])
|
|
self.assertEqual(dl.mock_calls, [])
|
|
|
|
def _setup_koji_wrapper(self, KW, build_id, files):
|
|
KW.return_value.koji_module.config.topdir = "/mnt/koji"
|
|
KW.return_value.koji_module.config.topurl = "http://koji.local/koji"
|
|
KW.return_value.koji_module.pathinfo.typedir.return_value = "/mnt/koji/images"
|
|
buildinfo = {"build_id": build_id}
|
|
KW.return_value.koji_proxy.getBuild.return_value = buildinfo
|
|
KW.return_value.koji_proxy.listArchives.return_value = [
|
|
{"filename": f, "btype": "image"} for f in files
|
|
]
|
|
KW.return_value.koji_proxy.listTagged.return_value = [buildinfo]
|
|
|
|
def test_get_from_build(self, KW, dl):
|
|
compose = mock.Mock(conf={"koji_profile": "koji"})
|
|
|
|
def download(src, dst):
|
|
touch(dst)
|
|
|
|
dl.side_effect = download
|
|
|
|
self._setup_koji_wrapper(KW, 123, ["abc.out", "abc.tar"])
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{"scm": "koji", "repo": "my-build-1.0-2", "file": "*.tar"},
|
|
self.destdir,
|
|
compose=compose,
|
|
)
|
|
self.assertStructure(retval, ["abc.tar"])
|
|
self.assertEqual(
|
|
KW.mock_calls,
|
|
[
|
|
mock.call("koji"),
|
|
mock.call().koji_proxy.getBuild("my-build-1.0-2"),
|
|
mock.call().koji_proxy.listArchives(123),
|
|
mock.call().koji_module.pathinfo.typedir({"build_id": 123}, "image"),
|
|
],
|
|
)
|
|
self.assertEqual(
|
|
dl.call_args_list,
|
|
[mock.call("http://koji.local/koji/images/abc.tar", mock.ANY)],
|
|
)
|
|
|
|
def test_get_from_latest_build(self, KW, dl):
|
|
compose = mock.Mock(conf={"koji_profile": "koji"})
|
|
|
|
def download(src, dst):
|
|
touch(dst)
|
|
|
|
dl.side_effect = download
|
|
|
|
self._setup_koji_wrapper(KW, 123, ["abc.out", "abc.tar"])
|
|
|
|
retval = scm.get_file_from_scm(
|
|
{"scm": "koji", "repo": "my-build", "file": "*.tar", "branch": "images"},
|
|
self.destdir,
|
|
compose=compose,
|
|
)
|
|
self.assertStructure(retval, ["abc.tar"])
|
|
self.assertEqual(
|
|
KW.mock_calls,
|
|
[
|
|
mock.call("koji"),
|
|
mock.call().koji_proxy.listTagged(
|
|
"images", package="my-build", inherit=True, latest=True
|
|
),
|
|
mock.call().koji_proxy.listArchives(123),
|
|
mock.call().koji_module.pathinfo.typedir({"build_id": 123}, "image"),
|
|
],
|
|
)
|
|
self.assertEqual(
|
|
dl.call_args_list,
|
|
[mock.call("http://koji.local/koji/images/abc.tar", mock.ANY)],
|
|
)
|