# -*- coding: utf-8 -*-

import http.server
import os
import random
import shutil
import tempfile
import threading
import unittest
from unittest import mock

from kobo.shortcuts import run
from parameterized import parameterized

from pungi.wrappers import scm
from tests.helpers import touch, GIT_WITH_CREDS


class SCMBaseTest(unittest.TestCase):
    """
    Common base for the SCM wrapper tests.

    Provides a temporary destination directory (``self.destdir``) that is
    removed after each test, and a helper to verify what ended up in it.
    """

    def setUp(self):
        self.destdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.destdir)

    def assertStructure(self, returned, expected):
        """
        Check that ``returned`` matches ``expected`` (order is ignored) and
        that the regular files under ``self.destdir`` are exactly the
        ``expected`` paths, relative to ``self.destdir``.
        """
        # Check we returned the correct files
        self.assertCountEqual(returned, expected)

        # Each file must exist
        for f in expected:
            self.assertTrue(os.path.isfile(os.path.join(self.destdir, f)))

        # Only expected files should exist
        found = [
            os.path.relpath(os.path.join(root, f), self.destdir)
            for root, _dirs, files in os.walk(self.destdir)
            for f in files
        ]
        self.assertCountEqual(expected, found)


class FileSCMTestCase(SCMBaseTest):
    """Tests for the ``file`` backend, exercised against a real directory."""

    def setUp(self):
        """
        Prepares a source structure and destination directory.

        srcdir
         +- in_root
         +- subdir
             +- first
             +- second
        """
        super().setUp()
        self.srcdir = tempfile.mkdtemp()
        touch(os.path.join(self.srcdir, "in_root"))
        touch(os.path.join(self.srcdir, "subdir", "first"))
        touch(os.path.join(self.srcdir, "subdir", "second"))

    def tearDown(self):
        super().tearDown()
        shutil.rmtree(self.srcdir)

    def test_get_file_by_name(self):
        file = os.path.join(self.srcdir, "in_root")
        retval = scm.get_file_from_scm(file, self.destdir)
        self.assertStructure(retval, ["in_root"])

    def test_get_file_by_dict(self):
        retval = scm.get_file_from_scm(
            {
                "scm": "file",
                "repo": None,
                "file": os.path.join(self.srcdir, "subdir", "first"),
            },
            self.destdir,
        )
        self.assertStructure(retval, ["first"])

    def test_get_dir_by_name(self):
        retval = scm.get_dir_from_scm(os.path.join(self.srcdir, "subdir"), self.destdir)
        self.assertStructure(retval, ["first", "second"])

    def test_get_dir_by_dict(self):
        retval = scm.get_dir_from_scm(
            {"scm": "file", "repo": None, "dir": os.path.join(self.srcdir, "subdir")},
            self.destdir,
        )
        self.assertStructure(retval, ["first", "second"])

    def test_get_missing_file(self):
        with self.assertRaises(RuntimeError) as ctx:
            scm.get_file_from_scm(
                {"scm": "file", "repo": None, "file": "this-is-really-not-here.txt"},
                self.destdir,
            )
        self.assertIn("No files matched", str(ctx.exception))

    def test_get_missing_dir(self):
        with self.assertRaises(RuntimeError) as ctx:
            scm.get_dir_from_scm(
                {"scm": "file", "repo": None, "dir": "this-is-really-not-here"},
                self.destdir,
            )
        self.assertIn("No directories matched", str(ctx.exception))


CREDENTIALS_CONFIG = {"credential_helper": "!ch"}


class GitSCMTestCase(SCMBaseTest):
    """
    Tests for the ``git`` backend with ``pungi.wrappers.scm.run`` mocked out,
    so no real git command is executed.
    """

    def tearDown(self):
        # Always run the base cleanup, even if removing the per-process
        # directory below fails; otherwise self.destdir would be leaked.
        try:
            shutil.rmtree("/tmp/pungi-temp-git-repos-%s" % os.getpid())
        finally:
            super().tearDown()

    def assertCalls(self, mock_run, url, branch, command=None, with_creds=False):
        """
        Check that ``mock_run`` was called with ``git init``, a shallow fetch
        of ``branch`` from ``url`` and a checkout of ``FETCH_HEAD``,
        optionally followed by ``command``.

        With ``with_creds`` set, the fetch is expected to be prefixed with
        ``GIT_WITH_CREDS`` instead of plain ``git``.
        """
        git = GIT_WITH_CREDS if with_creds else ["git"]
        command = [command] if command else []
        self.assertEqual(
            [call[0][0] for call in mock_run.call_args_list],
            [
                ["git", "init"],
                git + ["fetch", "--depth=1", url, branch],
                ["git", "checkout", "FETCH_HEAD"],
            ]
            + command,
        )

    @parameterized.expand([("without_creds", {}), ("with_creds", CREDENTIALS_CONFIG)])
    def test_get_file(self, _name, config):
        def process(cmd, workdir=None, **kwargs):
            touch(os.path.join(workdir, "some_file.txt"))
            touch(os.path.join(workdir, "other_file.txt"))

        with mock.patch("pungi.wrappers.scm.run") as run:
            run.side_effect = process
            retval = scm.get_file_from_scm(
                {
                    "scm": "git",
                    "repo": "git://example.com/git/repo.git",
                    "file": "some_file.txt",
                    "options": config,
                },
                self.destdir,
            )
        self.assertStructure(retval, ["some_file.txt"])
        self.assertCalls(
            run, "git://example.com/git/repo.git", "master", with_creds=bool(config)
        )

    @mock.patch("pungi.wrappers.scm.run")
    def test_get_file_function(self, run):
        compose = mock.Mock(conf={})

        def process(cmd, workdir=None, **kwargs):
            touch(os.path.join(workdir, "some_file.txt"))
            touch(os.path.join(workdir, "other_file.txt"))

        run.side_effect = process
        # Already an absolute path inside destdir; no further joining needed.
        destination = os.path.join(self.destdir, "other_file.txt")
        retval = scm.get_file(
            {
                "scm": "git",
                "repo": "git://example.com/git/repo.git",
                "file": "other_file.txt",
            },
            destination,
            compose=compose,
        )
        self.assertEqual(retval, destination)
        self.assertCalls(run, "git://example.com/git/repo.git", "master")

    @parameterized.expand([("without_creds", {}), ("with_creds", CREDENTIALS_CONFIG)])
    def test_get_file_fetch_fails(self, _name, config):
        url = "git://example.com/git/repo.git"
        git = GIT_WITH_CREDS if config else ["git"]

        def process(cmd, workdir=None, **kwargs):
            # Simulate a failing fetch; every other command succeeds.
            if "fetch" in cmd:
                exc = RuntimeError()
                exc.output = ""
                raise exc
            touch(os.path.join(workdir, "some_file.txt"))
            touch(os.path.join(workdir, "other_file.txt"))

        with mock.patch("pungi.wrappers.scm.run") as run:
            run.side_effect = process
            retval = scm.get_file_from_scm(
                {"scm": "git", "repo": url, "file": "some_file.txt", "options": config},
                self.destdir,
            )
        self.assertStructure(retval, ["some_file.txt"])
        self.assertEqual(
            [call[0][0] for call in run.call_args_list],
            [
                ["git", "init"],
                git + ["fetch", "--depth=1", url, "master"],
                ["git", "init"],
                ["git", "remote", "add", "origin", url],
                git + ["remote", "update", "origin"],
                ["git", "checkout", "master"],
            ],
        )

    @mock.patch("pungi.wrappers.scm.run")
    def test_get_file_generated_by_command(self, run):
        def process(cmd, workdir=None, **kwargs):
            if cmd[0] == "git":
                touch(os.path.join(workdir, "some_file.txt"))
            return 0, ""

        run.side_effect = process
        retval = scm.get_file_from_scm(
            {
                "scm": "git",
                "repo": "git://example.com/git/repo.git",
                "file": "some_file.txt",
                "command": "make",
            },
            self.destdir,
        )
        self.assertStructure(retval, ["some_file.txt"])
        self.assertCalls(run, "git://example.com/git/repo.git", "master", "make")

    @mock.patch("pungi.wrappers.scm.run")
    def test_get_file_and_fail_to_generate(self, run):
        def process(cmd, workdir=None, **kwargs):
            if cmd[0] == "git":
                touch(os.path.join(workdir, "some_file.txt"))
                return 0, "output"
            return 1, "output"

        run.side_effect = process
        with self.assertRaises(RuntimeError) as ctx:
            scm.get_file_from_scm(
                {
                    "scm": "git",
                    "repo": "git://example.com/git/repo.git",
                    "file": "some_file.txt",
                    "command": "make",
                },
                self.destdir,
            )
        self.assertEqual(str(ctx.exception), "'make' failed with exit code 1")

    @parameterized.expand([("without_creds", {}), ("with_creds", CREDENTIALS_CONFIG)])
    def test_get_dir(self, _name, config):
        def process(cmd, workdir=None, **kwargs):
            touch(os.path.join(workdir, "subdir", "first"))
            touch(os.path.join(workdir, "subdir", "second"))

        with mock.patch("pungi.wrappers.scm.run") as run:
            run.side_effect = process
            retval = scm.get_dir_from_scm(
                {
                    "scm": "git",
                    "repo": "git://example.com/git/repo.git",
                    "dir": "subdir",
                    "options": config,
                },
                self.destdir,
            )
        self.assertStructure(retval, ["first", "second"])
        self.assertCalls(
            run, "git://example.com/git/repo.git", "master", with_creds=bool(config)
        )

    @mock.patch("pungi.wrappers.scm.run")
    def test_get_dir_and_generate(self, run):
        def process(cmd, workdir=None, **kwargs):
            if cmd[0] == "git":
                touch(os.path.join(workdir, "subdir", "first"))
                touch(os.path.join(workdir, "subdir", "second"))
            return 0, ""

        run.side_effect = process
        retval = scm.get_dir_from_scm(
            {
                "scm": "git",
                "repo": "git://example.com/git/repo.git",
                "dir": "subdir",
                "command": "make",
            },
            self.destdir,
        )
        self.assertStructure(retval, ["first", "second"])
        self.assertCalls(run, "git://example.com/git/repo.git", "master", "make")


class GitSCMTestCaseRealBase(SCMBaseTest):
    """
    Base for tests that run against a real git repository.

    ``setUp`` creates a repository in ``self.gitRepositoryLocation`` with a
    single commit containing ``some_file.txt`` and ``other_file.txt``.
    ``self.files`` maps each file's absolute path to its random content.
    """

    def setUp(self):
        super().setUp()
        self.compose = mock.Mock(conf={})
        self.gitRepositoryLocation = tempfile.mkdtemp()
        git_dir = os.path.join(self.gitRepositoryLocation, ".git")
        # Point git explicitly at the repository for every command below.
        repo_args = [
            "--git-dir=%s" % git_dir,
            "--work-tree=%s" % self.gitRepositoryLocation,
        ]
        run(
            ["git"] + repo_args + ["-c", "init.defaultBranch=master", "init"],
            workdir=self.gitRepositoryLocation,
        )

        fileOneLocation = os.path.join(self.gitRepositoryLocation, "some_file.txt")
        fileTwoLocation = os.path.join(self.gitRepositoryLocation, "other_file.txt")
        self.files = {
            fileOneLocation: str(random.randrange(100000000000000000000)),
            fileTwoLocation: str(random.randrange(100000000000000000000)),
        }
        for fileLocation, fileContents in self.files.items():
            with open(fileLocation, "w") as fileHandle:
                fileHandle.write(fileContents)

        run(
            ["git"] + repo_args + ["add", "some_file.txt", "other_file.txt"],
            workdir=self.gitRepositoryLocation,
        )
        # Must set the user.name and user.email, otherwise an error may be returned.
        run(
            [
                "git",
                "-c",
                "user.name=Pungi Test Engineer",
                "-c",
                "user.email=ptestengineer@example.com",
            ]
            + repo_args
            + ["commit", "-m", "Initial commit"],
            workdir=self.gitRepositoryLocation,
        )

    def tearDown(self):
        super().tearDown()
        shutil.rmtree(self.gitRepositoryLocation)


class GitSCMTestCaseReal(GitSCMTestCaseRealBase):
    """Tests of ``scm.get_file`` against a real git repository via file://."""

    def test_get_file_function(self):
        sourceFileLocation = random.choice(list(self.files.keys()))
        sourceFilename = os.path.basename(sourceFileLocation)
        # Already an absolute path inside destdir; no further joining needed.
        destinationFileLocation = os.path.join(self.destdir, "other_file.txt")
        destinationFileActualLocation = scm.get_file(
            {
                "scm": "git",
                "repo": "file:///%s" % self.gitRepositoryLocation,
                "file": sourceFilename,
            },
            destinationFileLocation,
            compose=self.compose,
        )
        self.assertEqual(destinationFileActualLocation, destinationFileLocation)
        self.assertTrue(os.path.isfile(destinationFileActualLocation))

        # Comparing the contents of source to the destination file.
        with open(sourceFileLocation) as sourceFileHandle:
            sourceFileContent = sourceFileHandle.read()
        with open(destinationFileActualLocation) as destinationFileHandle:
            destinationFileContent = destinationFileHandle.read()
        self.assertEqual(sourceFileContent, destinationFileContent)

    def test_get_file_function_with_overwrite(self):
        sourceFileLocation = random.choice(list(self.files.keys()))
        sourceFilename = os.path.basename(sourceFileLocation)
        destinationFileLocation = os.path.join(self.destdir, "other_file.txt")

        # Writing pre-existing content to the file, that should be overwritten
        preExistingContent = "This line should be overwritten."
        with open(destinationFileLocation, "w") as destinationFileHandle:
            destinationFileHandle.write(preExistingContent)

        destinationFileActualLocation = scm.get_file(
            {
                "scm": "git",
                "repo": "file:///%s" % self.gitRepositoryLocation,
                "file": sourceFilename,
            },
            destinationFileLocation,
            compose=self.compose,
            overwrite=True,
        )
        self.assertEqual(destinationFileActualLocation, destinationFileLocation)
        self.assertTrue(os.path.isfile(destinationFileActualLocation))

        # Reading the contents of both files to compare later.
        with open(sourceFileLocation) as sourceFileHandle:
            sourceFileContent = sourceFileHandle.read()
        with open(destinationFileActualLocation) as destinationFileHandle:
            destinationFileContent = destinationFileHandle.read()

        # Ensuring that the file was in fact overwritten
        self.assertNotEqual(preExistingContent, destinationFileContent)
        # Comparing the contents of source to the destination file.
        self.assertEqual(sourceFileContent, destinationFileContent)


class GitSCMTestCaseRealSubmodule(GitSCMTestCaseRealBase):
    """Tests obtaining a file that lives in a git submodule."""

    def setUp(self):
        # This gets a little complicated. The test sets up a git repo with a
        # submodule and tries to obtain a file from the submodule. However,
        # submodules over file:// are restricted for security reasons. The test
        # should not modify any global configuration file, so to avoid the
        # issues we instead start a one-off HTTP server to serve the repository
        # on localhost.
        # The server runs in a separate thread.
        super().setUp()
        self.main_repo_path = tempfile.mkdtemp()

        submodule_path = self.gitRepositoryLocation

        run(["git", "update-server-info"], workdir=submodule_path)

        class MyHandler(http.server.SimpleHTTPRequestHandler):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, directory=submodule_path, **kwargs)

        self.httpd = http.server.HTTPServer(("", 0), MyHandler)
        self.httpd.timeout = 1
        self.url = "http://localhost:%s/.git" % self.httpd.server_port
        self.thread_done = False

        def runner():
            # Repeatedly handle a request until the flag is set. The timeout is
            # configured on the self.httpd object.
            while not self.thread_done:
                self.httpd.handle_request()

        self.t = threading.Thread(target=runner)
        self.t.start()
        # Cleanups run even when the rest of setUp fails (tearDown does not),
        # so the non-daemon thread can never keep the test process alive and
        # the listening socket is always closed.
        self.addCleanup(self._stop_server)

        cmds = [
            ["git", "-c", "init.defaultBranch=master", "init"],
            ["git", "submodule", "add", "-b", "master", self.url, "submodule"],
            [
                "git",
                "-c",
                "user.name=Pungi Test Engineer",
                "-c",
                "user.email=ptestengineer@example.com",
                "commit",
                "-am",
                "Add submodule",
            ],
        ]
        for cmd in cmds:
            run(cmd, workdir=self.main_repo_path)

    def _stop_server(self):
        """Stop the serving thread and close the server's listening socket."""
        self.thread_done = True
        # The loop notices the flag once the current handle_request() call
        # returns, which takes at most the configured timeout.
        self.t.join()
        self.httpd.server_close()

    def tearDown(self):
        super().tearDown()
        shutil.rmtree(self.main_repo_path)

    def test_get_file(self):
        sourceFileLocation = random.choice(list(self.files.keys()))
        sourceFilename = os.path.basename(sourceFileLocation)
        destinationFileLocation = os.path.join(self.destdir, "other_file.txt")
        destinationFileActualLocation = scm.get_file(
            {
                "scm": "git",
                "repo": self.main_repo_path,
                "file": os.path.join("submodule", sourceFilename),
            },
            destinationFileLocation,
            compose=self.compose,
        )
        self.assertEqual(destinationFileActualLocation, destinationFileLocation)
        self.assertTrue(os.path.isfile(destinationFileActualLocation))

        # Reading the contents of both files to compare later.
        with open(sourceFileLocation) as sourceFileHandle:
            sourceFileContent = sourceFileHandle.read()
        with open(destinationFileActualLocation) as destinationFileHandle:
            destinationFileContent = destinationFileHandle.read()

        # Comparing the contents of source to the destination file.
        self.assertEqual(sourceFileContent, destinationFileContent)


class RpmSCMTestCase(SCMBaseTest):
    """
    Tests for the ``rpm`` backend with ``explode_rpm_package`` mocked out.

    ``self.exploded`` collects the package paths the mock was called with.
    """

    def setUp(self):
        super().setUp()
        self.tmpdir = tempfile.mkdtemp()
        self.exploded = set()
        self.rpms = [self.tmpdir + "/whatever.rpm", self.tmpdir + "/another.rpm"]
        self.numbered = [
            self.tmpdir + x
            for x in ["/one1.rpm", "/one2.rpm", "/two1.rpm", "/two2.rpm"]
        ]
        for rpm in self.rpms + self.numbered:
            touch(rpm)

    def tearDown(self):
        super().tearDown()
        shutil.rmtree(self.tmpdir)

    def _explode_rpm(self, path, dest):
        """Record ``path`` and create the same fixed set of files in ``dest``."""
        self.exploded.add(path)
        touch(os.path.join(dest, "some-file.txt"))
        touch(os.path.join(dest, "subdir", "foo.txt"))
        touch(os.path.join(dest, "subdir", "bar.txt"))

    def _explode_multiple(self, path, dest):
        """
        Record ``path`` and create files in ``dest`` whose names are numbered
        by how many distinct packages have been recorded so far.
        """
        self.exploded.add(path)
        cnt = len(self.exploded)
        touch(os.path.join(dest, "some-file-%d.txt" % cnt))
        touch(os.path.join(dest, "subdir-%d" % cnt, "foo-%d.txt" % cnt))
        touch(os.path.join(dest, "common", "foo-%d.txt" % cnt))

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_file(self, explode):
        explode.side_effect = self._explode_rpm

        retval = scm.get_file_from_scm(
            {"scm": "rpm", "repo": self.rpms[0], "file": "some-file.txt"}, self.destdir
        )

        self.assertStructure(retval, ["some-file.txt"])
        self.assertEqual(self.exploded, {self.rpms[0]})

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_more_files(self, explode):
        explode.side_effect = self._explode_rpm

        retval = scm.get_file_from_scm(
            {
                "scm": "rpm",
                "repo": self.rpms[0],
                "file": ["some-file.txt", "subdir/foo.txt"],
            },
            self.destdir,
        )

        self.assertStructure(retval, ["some-file.txt", "foo.txt"])
        self.assertEqual(self.exploded, {self.rpms[0]})

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_whole_dir(self, explode):
        explode.side_effect = self._explode_rpm

        retval = scm.get_dir_from_scm(
            {"scm": "rpm", "repo": self.rpms[0], "dir": "subdir"}, self.destdir
        )

        self.assertStructure(retval, ["subdir/foo.txt", "subdir/bar.txt"])
        self.assertEqual(self.exploded, {self.rpms[0]})

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_dir_contents(self, explode):
        explode.side_effect = self._explode_rpm

        retval = scm.get_dir_from_scm(
            {"scm": "rpm", "repo": self.rpms[0], "dir": "subdir/"}, self.destdir
        )

        self.assertStructure(retval, ["foo.txt", "bar.txt"])
        self.assertEqual(self.exploded, {self.rpms[0]})

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_files_from_two_rpms(self, explode):
        explode.side_effect = self._explode_multiple

        retval = scm.get_file_from_scm(
            {
                "scm": "rpm",
                "repo": self.rpms,
                "file": ["some-file-1.txt", "some-file-2.txt"],
            },
            self.destdir,
        )

        self.assertStructure(retval, ["some-file-1.txt", "some-file-2.txt"])
        self.assertCountEqual(self.exploded, self.rpms)

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_files_from_glob_rpms(self, explode):
        explode.side_effect = self._explode_multiple

        retval = scm.get_file_from_scm(
            {
                "scm": "rpm",
                "file": "some-file-*.txt",
                "repo": [self.tmpdir + "/one*.rpm", self.tmpdir + "/two*.rpm"],
            },
            self.destdir,
        )

        self.assertStructure(
            retval,
            [
                "some-file-1.txt",
                "some-file-2.txt",
                "some-file-3.txt",
                "some-file-4.txt",
            ],
        )
        self.assertCountEqual(self.exploded, self.numbered)

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_dir_from_two_rpms(self, explode):
        explode.side_effect = self._explode_multiple

        retval = scm.get_dir_from_scm(
            {"scm": "rpm", "repo": self.rpms, "dir": "common"}, self.destdir
        )

        self.assertStructure(retval, ["common/foo-1.txt", "common/foo-2.txt"])
        self.assertCountEqual(self.exploded, self.rpms)

    @mock.patch("pungi.wrappers.scm.explode_rpm_package")
    def test_get_dir_from_glob_rpms(self, explode):
        explode.side_effect = self._explode_multiple

        retval = scm.get_dir_from_scm(
            {
                "scm": "rpm",
                "dir": "common/",
                "repo": [self.tmpdir + "/one*.rpm", self.tmpdir + "/two*.rpm"],
            },
            self.destdir,
        )

        self.assertStructure(
            retval, ["foo-1.txt", "foo-2.txt", "foo-3.txt", "foo-4.txt"]
        )
        self.assertCountEqual(self.exploded, self.numbered)


class CvsSCMTestCase(SCMBaseTest):
    """Tests for the ``cvs`` backend with ``pungi.wrappers.scm.run`` mocked."""

    @mock.patch("pungi.wrappers.scm.run")
    def test_get_file(self, run):
        commands = []

        def process(cmd, workdir=None, **kwargs):
            # The last argument of the command is the exported path.
            fname = cmd[-1]
            touch(os.path.join(workdir, fname))
            commands.append(" ".join(cmd))

        run.side_effect = process

        retval = scm.get_file_from_scm(
            {"scm": "cvs", "repo": "http://example.com/cvs", "file": "some_file.txt"},
            self.destdir,
        )
        self.assertStructure(retval, ["some_file.txt"])
        self.assertEqual(
            commands,
            ["/usr/bin/cvs -q -d http://example.com/cvs export -r HEAD some_file.txt"],
        )

    @mock.patch("pungi.wrappers.scm.run")
    def test_get_dir(self, run):
        commands = []

        def process(cmd, workdir=None, **kwargs):
            # The last argument of the command is the exported path.
            fname = cmd[-1]
            touch(os.path.join(workdir, fname, "first"))
            touch(os.path.join(workdir, fname, "second"))
            commands.append(" ".join(cmd))

        run.side_effect = process

        retval = scm.get_dir_from_scm(
            {"scm": "cvs", "repo": "http://example.com/cvs", "dir": "subdir"},
            self.destdir,
        )
        self.assertStructure(retval, ["first", "second"])

        self.assertEqual(
            commands,
            ["/usr/bin/cvs -q -d http://example.com/cvs export -r HEAD subdir"],
        )


@mock.patch("pungi.wrappers.scm.urlretrieve")
class KojiSCMTestCase(SCMBaseTest):
    """
    Tests for the ``koji`` backend.

    ``urlretrieve`` is patched for every test method and passed in as the
    last argument (``dl``).
    """

    def test_without_koji_profile(self, dl):
        compose = mock.Mock(conf={})

        with self.assertRaises(RuntimeError) as ctx:
            scm.get_file_from_scm(
                {"scm": "koji", "repo": "my-build-1.0-2", "file": "*"},
                self.destdir,
                compose=compose,
            )
        self.assertIn("Koji profile must be configured", str(ctx.exception))
        self.assertEqual(dl.mock_calls, [])

    @mock.patch("pungi.wrappers.scm.KojiWrapper")
    def test_doesnt_get_dirs(self, KW, dl):
        compose = mock.Mock(conf={"koji_profile": "koji"})

        with self.assertRaises(RuntimeError) as ctx:
            scm.get_dir_from_scm(
                {"scm": "koji", "repo": "my-build-1.0-2", "dir": "*"},
                self.destdir,
                compose=compose,
            )
        self.assertIn("Only files can be exported", str(ctx.exception))
        self.assertEqual(KW.mock_calls, [mock.call(compose)])
        self.assertEqual(dl.mock_calls, [])

    def _setup_koji_wrapper(self, KW, build_id, files):
        """
        Configure the mocked ``KojiWrapper`` class ``KW`` so that its instance
        reports a single build with id ``build_id`` whose archives are the
        given ``files`` (all of build type ``image``).
        """
        KW.return_value.koji_module.config.topdir = "/mnt/koji"
        KW.return_value.koji_module.config.topurl = "http://koji.local/koji"
        KW.return_value.koji_module.pathinfo.typedir.return_value = "/mnt/koji/images"
        buildinfo = {"build_id": build_id}
        KW.return_value.koji_proxy.getBuild.return_value = buildinfo
        KW.return_value.koji_proxy.listArchives.return_value = [
            {"filename": f, "btype": "image"} for f in files
        ]
        KW.return_value.koji_proxy.listTagged.return_value = [buildinfo]

    @mock.patch("pungi.wrappers.scm.KojiWrapper")
    def test_get_from_build(self, KW, dl):
        compose = mock.Mock(conf={"koji_profile": "koji"})

        def download(src, dst):
            touch(dst)

        dl.side_effect = download

        self._setup_koji_wrapper(KW, 123, ["abc.out", "abc.tar"])

        retval = scm.get_file_from_scm(
            {"scm": "koji", "repo": "my-build-1.0-2", "file": "*.tar"},
            self.destdir,
            compose=compose,
        )
        self.assertStructure(retval, ["abc.tar"])
        self.assertEqual(
            KW.mock_calls,
            [
                mock.call(compose),
                mock.call().koji_proxy.getBuild("my-build-1.0-2"),
                mock.call().koji_proxy.listArchives(123),
                mock.call().koji_module.pathinfo.typedir({"build_id": 123}, "image"),
            ],
        )
        self.assertEqual(
            dl.call_args_list,
            [mock.call("http://koji.local/koji/images/abc.tar", mock.ANY)],
        )

    @mock.patch("pungi.wrappers.scm.KojiWrapper")
    def test_get_from_latest_build(self, KW, dl):
        compose = mock.Mock(conf={"koji_profile": "koji"})

        def download(src, dst):
            touch(dst)

        dl.side_effect = download

        self._setup_koji_wrapper(KW, 123, ["abc.out", "abc.tar"])

        retval = scm.get_file_from_scm(
            {"scm": "koji", "repo": "my-build", "file": "*.tar", "branch": "images"},
            self.destdir,
            compose=compose,
        )
        self.assertStructure(retval, ["abc.tar"])
        self.assertEqual(
            KW.mock_calls,
            [
                mock.call(compose),
                mock.call().koji_proxy.listTagged(
                    "images", package="my-build", inherit=True, latest=True
                ),
                mock.call().koji_proxy.listArchives(123),
                mock.call().koji_module.pathinfo.typedir({"build_id": 123}, "image"),
            ],
        )
        self.assertEqual(
            dl.call_args_list,
            [mock.call("http://koji.local/koji/images/abc.tar", mock.ANY)],
        )


IMAGE_URL = "example.com/image"


class ContainerImageScmWrapperTest(SCMBaseTest):
    """Tests for the ``container-image`` backend with ``run`` mocked out."""

    def test_get_dir_is_not_implemented(self):
        with self.assertRaises(RuntimeError):
            scm.get_dir_from_scm(
                {"scm": "container-image", "repo": IMAGE_URL, "dir": ""}, self.destdir
            )

    @parameterized.expand(
        [
            ("x86_64", "amd64"),
            ("aarch64", "arm64"),
            ("s390x", "s390x"),
        ]
    )
    @mock.patch("pungi.wrappers.scm.run")
    def test_get_file(self, real_arch, translated_arch, mock_run):
        scm.get_file_from_scm(
            {
                "scm": "container-image",
                "repo": IMAGE_URL + ":latest",
                "file": "",
                "target": "subdir",
            },
            self.destdir,
            arch=real_arch,
        )
        scm.get_file_from_scm(
            {
                "scm": "container-image",
                "repo": IMAGE_URL + ":prev",
                "file": "",
                "target": "subdir",
            },
            self.destdir,
            arch=real_arch,
        )
        self.assertCountEqual(
            mock_run.mock_calls,
            [
                mock.call(
                    [
                        "skopeo",
                        f"--override-arch={translated_arch}",
                        "copy",
                        IMAGE_URL + ":latest",
                        f"oci:{self.destdir}",
                        "--remove-signatures",
                    ],
                    can_fail=False,
                ),
                mock.call(
                    [
                        "skopeo",
                        f"--override-arch={translated_arch}",
                        "copy",
                        IMAGE_URL + ":prev",
                        f"oci:{self.destdir}",
                        "--remove-signatures",
                    ],
                    can_fail=False,
                ),
            ],
        )