From 51a9c28bc6f67295a6f5250d5fdc39d3428115ce Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Thu, 8 Nov 2018 15:48:53 -0800 Subject: [PATCH] Add tests for partitioned disk images The setup for this is a bit complex, so it really ends up testing things twice. (cherry picked from commit fb4e6f25882cefd3b23e63a099d2d0d1d8fd02b8) --- Dockerfile.test | 1 + tests/pylorax/test_imgutils.py | 115 ++++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/Dockerfile.test b/Dockerfile.test index e4699f5c..fd19746c 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -14,6 +14,7 @@ RUN dnf -y install \ python3-nose \ python3-pocketlint \ python3-pylint \ + python3-pyparted \ python3-pytoml \ python3-semantic_version \ python3-sphinx \ diff --git a/tests/pylorax/test_imgutils.py b/tests/pylorax/test_imgutils.py index 20f9dc7b..feee83eb 100644 --- a/tests/pylorax/test_imgutils.py +++ b/tests/pylorax/test_imgutils.py @@ -14,7 +14,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . # +import glob import os +import parted import tempfile import unittest @@ -23,6 +25,7 @@ from pylorax.executils import runcmd from pylorax.imgutils import mkcpio, mktar, mksquashfs, mksparse, mkqcow2, loop_attach, loop_detach from pylorax.imgutils import get_loop_name, LoopDev, dm_attach, dm_detach, DMDev, Mount from pylorax.imgutils import mkdosimg, mkext4img, mkbtrfsimg, mkhfsimg, default_image_name +from pylorax.imgutils import mount, umount, kpartx_disk_img, PartitionMount, mkfsimage_from_disk from pylorax.sysutils import joinpaths def mkfakerootdir(rootdir): @@ -42,6 +45,76 @@ def mkfakerootdir(rootdir): os.makedirs(joinpaths(rootdir, os.path.dirname(f))) open(joinpaths(rootdir, f), "w").write("I AM FAKE FILE %s" % f.upper()) +def mkfakebootdir(bootdir): + """Populate a fake /boot directory with a kernel and initrd + + :param bootdir: An existing directory to create files/dirs under + :type bootdir: str + """ + open(joinpaths(bootdir, "vmlinuz-4.18.13-200.fc28.x86_64"), "w").write("I AM A FAKE KERNEL") + open(joinpaths(bootdir, "initramfs-4.18.13-200.fc28.x86_64.img"), "w").write("I AM A FAKE INITRD") + +def mkfakediskimg(disk_img): + """Create a fake partitioned disk image + + :param disk_img: Full path to a partitioned disk image + :type disk_img: str + :returns: True if it was successful, False if something went wrong + + Include /boot, swap, and / partitions with fake kernel and /etc/passwd + """ + try: + mksparse(disk_img, 42 * 1024**2) + # Make a /boot, / and swap partitions on it + dev = parted.getDevice(disk_img) + disk = parted.freshDisk(dev, "gpt") + + # (start, length, flags, name) + for start, length, flags, name in [ + ( 1024**2, 1024**2, None, "boot"), + (2*1024**2, 2*1024**2, parted.PARTITION_SWAP, "swap"), + (4*1024**2, 38*1024**2, None, "root")]: + geo = parted.Geometry(device=dev, start=start//dev.sectorSize, length=length//dev.sectorSize) + part = parted.Partition(disk=disk, type=parted.PARTITION_NORMAL, geometry=geo) + part.getPedPartition().set_name(name) + disk.addPartition(partition=part) + if flags: + part.setFlag(flags) + disk.commit() + os.sync() + except parted.PartedException: + return False + + # Mount the disk's partitions + loop_devs = kpartx_disk_img(disk_img) + + try: + # Format the partitions + runcmd(["mkfs.ext4", "/dev/mapper/" + loop_devs[0][0]]) + runcmd(["mkswap", "/dev/mapper/" + loop_devs[1][0]]) + runcmd(["mkfs.ext4", "/dev/mapper/" + loop_devs[2][0]]) + + # Mount the boot partition and make a fake kernel and initrd + boot_mnt = mount("/dev/mapper/" + loop_devs[0][0]) + try: + mkfakebootdir(boot_mnt) + finally: + umount(boot_mnt) + + # Mount the / partition and make a fake / filesystem with /etc/passwd + root_mnt = mount("/dev/mapper/" + loop_devs[2][0]) + try: + mkfakerootdir(root_mnt) + finally: + umount(root_mnt) + except Exception: + return False + finally: + # Remove the disk's mounted partitions + runcmd(["kpartx", "-d", "-s", disk_img]) + + return True + class ImgUtilsTest(unittest.TestCase): def mkcpio_test(self): """Test mkcpio function""" @@ -179,7 +252,8 @@ class ImgUtilsTest(unittest.TestCase): with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir: with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img: mkfakerootdir(work_dir) - mkext4img(work_dir, disk_img.name) + graft = {"/etc/yum.repos.d/": "./tests/pylorax/repos/single.repo"} + mkext4img(work_dir, disk_img.name, graft=graft) self.assertTrue(os.path.exists(disk_img.name)) file_details = get_file_magic(disk_img.name) self.assertTrue("ext2 filesystem" in file_details, file_details) @@ -212,3 +286,42 @@ class ImgUtilsTest(unittest.TestCase): filename = default_image_name(compression, "foobar") self.assertTrue(filename.endswith(suffix)) + @unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers") + def partition_mount_test(self): + """Test PartitionMount context manager (requires loop)""" + with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img: + self.assertTrue(mkfakediskimg(disk_img.name)) + # Make sure it can mount the / with /etc/passwd + with PartitionMount(disk_img.name) as img_mount: + self.assertTrue(img_mount is not None) + self.assertTrue(os.path.isdir(img_mount.mount_dir)) + self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "/etc/passwd"))) + + # Make sure submount works + with PartitionMount(disk_img.name, submount="/a-sub-mount/") as img_mount: + self.assertTrue(img_mount is not None) + self.assertTrue(os.path.isdir(img_mount.mount_dir)) + self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "/etc/passwd"))) + + # Make sure it can mount the /boot partition with a custom mount_ok function + def mount_ok(mount_dir): + kernels = glob.glob(joinpaths(mount_dir, "vmlinuz-*")) + return len(kernels) > 0 + + with PartitionMount(disk_img.name, mount_ok=mount_ok) as img_mount: + self.assertTrue(img_mount is not None) + self.assertTrue(os.path.isdir(img_mount.mount_dir)) + self.assertFalse(os.path.exists(joinpaths(img_mount.mount_dir, "/etc/passwd"))) + self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "vmlinuz-4.18.13-200.fc28.x86_64"))) + self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "initramfs-4.18.13-200.fc28.x86_64.img"))) + + @unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers") + def mkfsimage_from_disk_test(self): + """Test creating a fsimage from the / partition of a disk image""" + with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img: + self.assertTrue(mkfakediskimg(disk_img.name)) + with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as fs_img: + mkfsimage_from_disk(disk_img.name, fs_img.name) + self.assertTrue(os.path.exists(fs_img.name)) + file_details = get_file_magic(fs_img.name) + self.assertTrue("ext2 filesystem" in file_details, file_details)