From ea5e414464a8c897c732f117eec1f4a98df1ee6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Hozza?= Date: Mon, 17 Apr 2023 17:30:24 +0200 Subject: [PATCH 1/3] Test: port assembler tests from unittest to pytest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port assembler tests from unittest to pytest. In addition, use parametrized tests when testing various filesystems and various combinations. This is important to be able to selectively skip the test if a specific filesystem is not supported by the kernel (e.g. btrfs is not supported on RHEL). Skipping a unittest subtest is not possible, which is the motivation to move away from it and use only pytest. Test output is now also much nicer for parametrized test cases. Signed-off-by: Tomáš Hozza --- test/run/test_assemblers.py | 467 +++++++++++++++++++----------------- 1 file changed, 241 insertions(+), 226 deletions(-) diff --git a/test/run/test_assemblers.py b/test/run/test_assemblers.py index 5a31a17..30638e6 100644 --- a/test/run/test_assemblers.py +++ b/test/run/test_assemblers.py @@ -8,7 +8,8 @@ import json import os import subprocess import tempfile -import unittest + +import pytest from osbuild import loop @@ -17,237 +18,230 @@ from .. 
import test MEBIBYTE = 1024 * 1024 -@unittest.skipUnless(test.TestBase.have_test_data(), "no test-data access") -@unittest.skipUnless(test.TestBase.can_bind_mount(), "root-only") -class TestAssemblers(test.TestBase): - @classmethod - def setUpClass(cls): - super().setUpClass() +@pytest.fixture(name="osbuild") +def osbuild_fixture(): + yield test.OSBuild() + + +def assertImageFile(filename, fmt, expected_size=None): + info = json.loads(subprocess.check_output(["qemu-img", "info", "--output", "json", filename])) + assert info["format"] == fmt + assert info["virtual-size"] == expected_size - def setUp(self): - self.osbuild = test.OSBuild() - @contextlib.contextmanager - def run_assembler(self, osb, name, options, output_path): - with open(os.path.join(self.locate_test_data(), - "manifests/filesystem.json"), +def assertFilesystem(device, uuid, fstype, tree): + output = subprocess.check_output(["blkid", "--output", "export", device], encoding="utf8") + blkid = dict(line.split("=") for line in output.strip().split("\n")) + assert blkid["UUID"] == uuid + assert blkid["TYPE"] == fstype + + with mount(device) as target_tree: + diff = test.TestBase.tree_diff(tree, target_tree) + if fstype == 'ext4': + added_files = ["/lost+found"] + else: + added_files = [] + assert diff["added_files"] == added_files + assert diff["deleted_files"] == [] + assert diff["differences"] == {} + + +def assertGRUB2(device, l1hash, l2hash, size): + m1 = hashlib.sha256() + m2 = hashlib.sha256() + with open(device, "rb") as d: + sectors = d.read(size) + assert len(sectors) == size + m1.update(sectors[:440]) + m2.update(sectors[512:size]) + assert m1.hexdigest() == l1hash + assert m2.hexdigest() == l2hash + + +def assertPartitionTable(ptable, label, uuid, n_partitions, boot_partition=None): + assert ptable["label"] == label + assert ptable["id"][2:] == uuid[:8] + assert len(ptable["partitions"]) == n_partitions + + if boot_partition: + bootable = [p.get("bootable", False) for p in ptable["partitions"]] 
+ assert bootable.count(True) == 1 + assert bootable.index(True) + 1 == boot_partition + + +def read_partition_table(device): + sfdisk = json.loads(subprocess.check_output(["sfdisk", "--json", device])) + ptable = sfdisk["partitiontable"] + assert ptable is not None + return ptable + + +@pytest.mark.skipif(not test.TestBase.have_tree_diff(), reason="tree-diff missing") +@pytest.mark.skipif(not test.TestBase.have_test_data(), reason="no test-data access") +@pytest.mark.skipif(not test.TestBase.can_bind_mount(), reason="root-only") +@pytest.mark.parametrize("fs_type", ["ext4", "xfs", "btrfs"]) +def test_rawfs(osbuild, fs_type): + options = { + "filename": "image.raw", + "root_fs_uuid": "016a1cda-5182-4ab3-bf97-426b00b74eb0", + "size": 1024 * MEBIBYTE, + "fs_type": fs_type, + } + with osbuild as osb: + with run_assembler(osb, "org.osbuild.rawfs", options, "image.raw") as (tree, image): + assertImageFile(image, "raw", options["size"]) + assertFilesystem(image, options["root_fs_uuid"], fs_type, tree) + + +@pytest.mark.skipif(not test.TestBase.have_tree_diff(), reason="tree-diff missing") +@pytest.mark.skipif(not test.TestBase.have_test_data(), reason="no test-data access") +@pytest.mark.skipif(not test.TestBase.can_bind_mount(), reason="root-only") +def test_ostree(osbuild): + with osbuild as osb: + with open(os.path.join(test.TestBase.locate_test_data(), + "manifests/fedora-ostree-commit.json"), encoding="utf8") as f: manifest = json.load(f) - manifest["pipeline"] = dict( - manifest["pipeline"], - assembler={"name": name, "options": options} - ) - data = json.dumps(manifest) - - treeid = osb.treeid_from_manifest(data) - assert treeid + data = json.dumps(manifest) with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir: - osb.compile(data, output_dir=output_dir, exports=["assembler", "tree"]) - tree = os.path.join(output_dir, "tree") - yield tree, os.path.join(output_dir, "assembler", output_path) - - def assertImageFile(self, filename, fmt, expected_size=None): 
- info = json.loads(subprocess.check_output(["qemu-img", "info", "--output", "json", filename])) - self.assertEqual(info["format"], fmt) - self.assertEqual(info["virtual-size"], expected_size) - - def assertFilesystem(self, device, uuid, fstype, tree): - output = subprocess.check_output(["blkid", "--output", "export", device], encoding="utf8") - blkid = dict(line.split("=") for line in output.strip().split("\n")) - self.assertEqual(blkid["UUID"], uuid) - self.assertEqual(blkid["TYPE"], fstype) - - with mount(device) as target_tree: - diff = self.tree_diff(tree, target_tree) - if fstype == 'ext4': - added_files = ["/lost+found"] - else: - added_files = [] - self.assertEqual(diff["added_files"], added_files) - self.assertEqual(diff["deleted_files"], []) - self.assertEqual(diff["differences"], {}) - - def assertGRUB2(self, device, l1hash, l2hash, size): - m1 = hashlib.sha256() - m2 = hashlib.sha256() - with open(device, "rb") as d: - sectors = d.read(size) - self.assertEqual(len(sectors), size) - m1.update(sectors[:440]) - m2.update(sectors[512:size]) - self.assertEqual(m1.hexdigest(), l1hash) - self.assertEqual(m2.hexdigest(), l2hash) - - def assertPartitionTable(self, ptable, label, uuid, n_partitions, boot_partition=None): - self.assertEqual(ptable["label"], label) - self.assertEqual(ptable["id"][2:], uuid[:8]) - self.assertEqual(len(ptable["partitions"]), n_partitions) - - if boot_partition: - bootable = [p.get("bootable", False) for p in ptable["partitions"]] - self.assertEqual(bootable.count(True), 1) - self.assertEqual(bootable.index(True) + 1, boot_partition) - - def read_partition_table(self, device): - sfdisk = json.loads(subprocess.check_output(["sfdisk", "--json", device])) - ptable = sfdisk["partitiontable"] - self.assertIsNotNone(ptable) - return ptable - - @unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing") - def test_rawfs(self): - for fs_type in ["ext4", "xfs", "btrfs"]: - with self.subTest(fs_type=fs_type): - print(f" 
{fs_type}", flush=True) - options = { - "filename": "image.raw", - "root_fs_uuid": "016a1cda-5182-4ab3-bf97-426b00b74eb0", - "size": 1024 * MEBIBYTE, - "fs_type": fs_type, - } - with self.osbuild as osb: - with self.run_assembler(osb, "org.osbuild.rawfs", options, "image.raw") as (tree, image): - self.assertImageFile(image, "raw", options["size"]) - self.assertFilesystem(image, options["root_fs_uuid"], fs_type, tree) - - @unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing") - def test_ostree(self): - with self.osbuild as osb: - with open(os.path.join(self.locate_test_data(), - "manifests/fedora-ostree-commit.json"), - encoding="utf8") as f: - manifest = json.load(f) - - data = json.dumps(manifest) - with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir: - result = osb.compile(data, output_dir=output_dir, exports=["ostree-commit"]) - compose_file = os.path.join(output_dir, "ostree-commit", "compose.json") - repo = os.path.join(output_dir, "ostree-commit", "repo") - - with open(compose_file, encoding="utf8") as f: - compose = json.load(f) - commit_id = compose["ostree-commit"] - ref = compose["ref"] - rpmostree_inputhash = compose["rpm-ostree-inputhash"] - os_version = compose["ostree-version"] - assert commit_id - assert ref - assert rpmostree_inputhash - assert os_version - self.assertIn("metadata", result) - metadata = result["metadata"] - commit = metadata["ostree-commit"] - info = commit["org.osbuild.ostree.commit"] - self.assertIn("compose", info) - self.assertEqual(compose, info["compose"]) - - md = subprocess.check_output( - [ - "ostree", - "show", - "--repo", repo, - "--print-metadata-key=rpmostree.inputhash", - commit_id - ], encoding="utf8").strip() - self.assertEqual(md, f"'{rpmostree_inputhash}'") - - md = subprocess.check_output( - [ - "ostree", - "show", - "--repo", repo, - "--print-metadata-key=version", - commit_id - ], encoding="utf8").strip() - self.assertEqual(md, f"'{os_version}'") - - 
@unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing") - def test_qemu(self): - loctl = loop.LoopControl() - with self.osbuild as osb: - for fmt in ["raw", "raw.xz", "qcow2", "vmdk", "vdi"]: - for fs_type in ["ext4", "xfs", "btrfs"]: - with self.subTest(fmt=fmt, fs_type=fs_type): - print(f" {fmt} {fs_type}", flush=True) - options = { - "format": fmt, - "filename": f"image.{fmt}", - "ptuuid": "b2c09a39-db93-44c5-846a-81e06b1dc162", - "root_fs_uuid": "aff010e9-df95-4f81-be6b-e22317251033", - "size": 1024 * MEBIBYTE, - "root_fs_type": fs_type, - } - with self.run_assembler(osb, - "org.osbuild.qemu", - options, - f"image.{fmt}") as (tree, image): - if fmt == "raw.xz": - subprocess.run(["unxz", "--keep", "--force", image], check=True) - image = image[:-3] - fmt = "raw" - self.assertImageFile(image, fmt, options["size"]) - with open_image(loctl, image, fmt) as (target, device): - ptable = self.read_partition_table(device) - self.assertPartitionTable(ptable, - "dos", - options["ptuuid"], - 1, - boot_partition=1) - if fs_type == "btrfs": - l2hash = "919aad44d37aa9fdbb8cb1bbd8ce2a44e64aee76f4dceb805eaab041b7f62348" - elif fs_type == "xfs": - l2hash = "1729f531281e4c3cbcde2a39b587c9dd5334ea1335bb860905556d5b73603de6" - else: - l2hash = "24c3ad6be9a5687d5140e0bf66d25953c4f0c7eeb6aaced4cc64685f5b3cfa9e" - self.assertGRUB2(device, - "26e3327c6b5ac9b5e21d8b86f19ff7cb4d12fb2d0406713f936997d9d89de3ee", - l2hash, - 1024 * 1024) - - p1 = ptable["partitions"][0] - ssize = ptable.get("sectorsize", 512) - start, size = p1["start"] * ssize, p1["size"] * ssize - with loop_open(loctl, target, offset=start, size=size) as dev: - self.assertFilesystem(dev, options["root_fs_uuid"], fs_type, tree) - - @unittest.skipUnless(test.TestBase.have_tree_diff(), "tree-diff missing") - def test_tar(self): - cases = [ - ("tree.tar.gz", None, ["application/x-tar"]), - ("tree.tar.gz", "gzip", ["application/x-gzip", "application/gzip"]) - ] - with self.osbuild as osb: - for filename, 
compression, expected_mimetypes in cases: - options = {"filename": filename} - if compression: - options["compression"] = compression - with self.run_assembler(osb, - "org.osbuild.tar", - options, - filename) as (tree, image): - output = subprocess.check_output(["file", "--mime-type", image], encoding="utf8") - _, mimetype = output.strip().split(": ") # "filename: mimetype" - self.assertIn(mimetype, expected_mimetypes) - - if compression: - continue - - # In the non-compression case, we verify the tree's content - with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: - args = [ - "tar", - "--numeric-owner", - "--selinux", - "--acls", - "--xattrs", "--xattrs-include", "*", - "-xaf", image, - "-C", tmp] - subprocess.check_output(args, encoding="utf8") - diff = self.tree_diff(tree, tmp) - self.assertEqual(diff["added_files"], []) - self.assertEqual(diff["deleted_files"], []) - self.assertEqual(diff["differences"], {}) + result = osb.compile(data, output_dir=output_dir, exports=["ostree-commit"]) + compose_file = os.path.join(output_dir, "ostree-commit", "compose.json") + repo = os.path.join(output_dir, "ostree-commit", "repo") + + with open(compose_file, encoding="utf8") as f: + compose = json.load(f) + commit_id = compose["ostree-commit"] + ref = compose["ref"] + rpmostree_inputhash = compose["rpm-ostree-inputhash"] + os_version = compose["ostree-version"] + assert commit_id + assert ref + assert rpmostree_inputhash + assert os_version + assert "metadata" in result + metadata = result["metadata"] + commit = metadata["ostree-commit"] + info = commit["org.osbuild.ostree.commit"] + assert "compose" in info + assert info["compose"] == compose + + md = subprocess.check_output( + [ + "ostree", + "show", + "--repo", repo, + "--print-metadata-key=rpmostree.inputhash", + commit_id + ], encoding="utf8").strip() + assert md == f"'{rpmostree_inputhash}'" + + md = subprocess.check_output( + [ + "ostree", + "show", + "--repo", repo, + "--print-metadata-key=version", + commit_id 
+ ], encoding="utf8").strip() + assert md == f"'{os_version}'" + + +@pytest.mark.skipif(not test.TestBase.have_tree_diff(), reason="tree-diff missing") +@pytest.mark.skipif(not test.TestBase.have_test_data(), reason="no test-data access") +@pytest.mark.skipif(not test.TestBase.can_bind_mount(), reason="root-only") +@pytest.mark.parametrize( + "fmt,fs_type", + [("raw", "ext4"), ("raw", "xfs"), ("raw", "btrfs"), + ("raw.xz", "ext4"), ("raw.xz", "xfs"), ("raw.xz", "btrfs"), + ("qcow2", "ext4"), ("qcow2", "xfs"), ("qcow2", "btrfs"), + ("vmdk", "ext4"), ("vmdk", "xfs"), ("vmdk", "btrfs"), + ("vdi", "ext4"), ("vdi", "xfs"), ("vdi", "btrfs")] +) +def test_qemu(osbuild, fmt, fs_type): + loctl = loop.LoopControl() + with osbuild as osb: + options = { + "format": fmt, + "filename": f"image.{fmt}", + "ptuuid": "b2c09a39-db93-44c5-846a-81e06b1dc162", + "root_fs_uuid": "aff010e9-df95-4f81-be6b-e22317251033", + "size": 1024 * MEBIBYTE, + "root_fs_type": fs_type, + } + with run_assembler(osb, + "org.osbuild.qemu", + options, + f"image.{fmt}") as (tree, image): + if fmt == "raw.xz": + subprocess.run(["unxz", "--keep", "--force", image], check=True) + image = image[:-3] + fmt = "raw" + assertImageFile(image, fmt, options["size"]) + with open_image(loctl, image, fmt) as (target, device): + ptable = read_partition_table(device) + assertPartitionTable(ptable, + "dos", + options["ptuuid"], + 1, + boot_partition=1) + if fs_type == "btrfs": + l2hash = "919aad44d37aa9fdbb8cb1bbd8ce2a44e64aee76f4dceb805eaab041b7f62348" + elif fs_type == "xfs": + l2hash = "1729f531281e4c3cbcde2a39b587c9dd5334ea1335bb860905556d5b73603de6" + else: + l2hash = "24c3ad6be9a5687d5140e0bf66d25953c4f0c7eeb6aaced4cc64685f5b3cfa9e" + assertGRUB2(device, + "26e3327c6b5ac9b5e21d8b86f19ff7cb4d12fb2d0406713f936997d9d89de3ee", + l2hash, + 1024 * 1024) + + p1 = ptable["partitions"][0] + ssize = ptable.get("sectorsize", 512) + start, size = p1["start"] * ssize, p1["size"] * ssize + with loop_open(loctl, target, 
offset=start, size=size) as dev: + assertFilesystem(dev, options["root_fs_uuid"], fs_type, tree) + + +@pytest.mark.skipif(not test.TestBase.have_tree_diff(), reason="tree-diff missing") +@pytest.mark.skipif(not test.TestBase.have_test_data(), reason="no test-data access") +@pytest.mark.skipif(not test.TestBase.can_bind_mount(), reason="root-only") +@pytest.mark.parametrize( + "filename,compression,expected_mimetypes", + [("tree.tar.gz", None, ["application/x-tar"]), + ("tree.tar.gz", "gzip", ["application/x-gzip", "application/gzip"])] +) +def test_tar(osbuild, filename, compression, expected_mimetypes): + with osbuild as osb: + options = {"filename": filename} + if compression: + options["compression"] = compression + with run_assembler(osb, + "org.osbuild.tar", + options, + filename) as (tree, image): + output = subprocess.check_output(["file", "--mime-type", image], encoding="utf8") + _, mimetype = output.strip().split(": ") # "filename: mimetype" + assert mimetype in expected_mimetypes + + if compression: + return + + # In the non-compression case, we verify the tree's content + with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp: + args = [ + "tar", + "--numeric-owner", + "--selinux", + "--acls", + "--xattrs", "--xattrs-include", "*", + "-xaf", image, + "-C", tmp] + subprocess.check_output(args, encoding="utf8") + diff = test.TestBase.tree_diff(tree, tmp) + assert diff["added_files"] == [] + assert diff["deleted_files"] == [] + assert diff["differences"] == {} @contextlib.contextmanager @@ -296,3 +290,24 @@ def open_image(ctl, image, fmt): with loop_open(ctl, target, offset=0, size=size) as dev: yield target, dev + + +@contextlib.contextmanager +def run_assembler(osb, name, options, output_path): + with open(os.path.join(test.TestBase.locate_test_data(), + "manifests/filesystem.json"), + encoding="utf8") as f: + manifest = json.load(f) + manifest["pipeline"] = dict( + manifest["pipeline"], + assembler={"name": name, "options": options} + ) + data = 
json.dumps(manifest) + + treeid = osb.treeid_from_manifest(data) + assert treeid + + with tempfile.TemporaryDirectory(dir="/var/tmp") as output_dir: + osb.compile(data, output_dir=output_dir, exports=["assembler", "tree"]) + tree = os.path.join(output_dir, "tree") + yield tree, os.path.join(output_dir, "assembler", output_path) -- 2.39.2