c585b91422
Modify imgutils.compress to allow the "rootdir" argument to be either a directory or a single file to add to an archive.
340 lines
17 KiB
Python
340 lines
17 KiB
Python
#
|
|
# Copyright (C) 2018 Red Hat, Inc.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
import glob
|
|
import os
|
|
import parted
|
|
import tarfile
|
|
import tempfile
|
|
import unittest
|
|
|
|
from ..lib import get_file_magic
|
|
from pylorax.executils import runcmd
|
|
from pylorax.imgutils import mkcpio, mktar, mksquashfs, mksparse, mkqcow2, loop_attach, loop_detach
|
|
from pylorax.imgutils import get_loop_name, LoopDev, dm_attach, dm_detach, DMDev, Mount
|
|
from pylorax.imgutils import mkdosimg, mkext4img, mkbtrfsimg, mkhfsimg, default_image_name
|
|
from pylorax.imgutils import mount, umount, kpartx_disk_img, PartitionMount, mkfsimage_from_disk
|
|
from pylorax.sysutils import joinpaths
|
|
|
|
def mkfakerootdir(rootdir):
|
|
"""Populate a fake rootdir with a few directories and files
|
|
|
|
:param rootdir: An existing directory to create files/dirs under
|
|
:type rootdir: str
|
|
|
|
Use this for testing the mk* functions that compress a directory tree
|
|
"""
|
|
dirs = ["/root", "/usr/sbin/", "/usr/local/", "/home/bart", "/etc/"]
|
|
files = ["/etc/passwd", "/home/bart/.bashrc", "/root/.bashrc"]
|
|
for d in dirs:
|
|
os.makedirs(joinpaths(rootdir, d))
|
|
for f in files:
|
|
if not os.path.isdir(joinpaths(rootdir, os.path.dirname(f))):
|
|
os.makedirs(joinpaths(rootdir, os.path.dirname(f)))
|
|
open(joinpaths(rootdir, f), "w").write("I AM FAKE FILE %s" % f.upper())
|
|
|
|
def mkfakebootdir(bootdir):
|
|
"""Populate a fake /boot directory with a kernel and initrd
|
|
|
|
:param bootdir: An existing directory to create files/dirs under
|
|
:type bootdir: str
|
|
"""
|
|
open(joinpaths(bootdir, "vmlinuz-4.18.13-200.fc28.x86_64"), "w").write("I AM A FAKE KERNEL")
|
|
open(joinpaths(bootdir, "initramfs-4.18.13-200.fc28.x86_64.img"), "w").write("I AM A FAKE INITRD")
|
|
|
|
def mkfakediskimg(disk_img):
|
|
"""Create a fake partitioned disk image
|
|
|
|
:param disk_img: Full path to a partitioned disk image
|
|
:type disk_img: str
|
|
:returns: True if it was successful, False if something went wrong
|
|
|
|
Include /boot, swap, and / partitions with fake kernel and /etc/passwd
|
|
"""
|
|
try:
|
|
mksparse(disk_img, 42 * 1024**2)
|
|
# Make a /boot, / and swap partitions on it
|
|
dev = parted.getDevice(disk_img)
|
|
disk = parted.freshDisk(dev, "gpt")
|
|
|
|
# (start, length, flags, name)
|
|
for start, length, flags, name in [
|
|
( 1024**2, 1024**2, None, "boot"),
|
|
(2*1024**2, 2*1024**2, parted.PARTITION_SWAP, "swap"),
|
|
(4*1024**2, 38*1024**2, None, "root")]:
|
|
geo = parted.Geometry(device=dev, start=start//dev.sectorSize, length=length//dev.sectorSize)
|
|
part = parted.Partition(disk=disk, type=parted.PARTITION_NORMAL, geometry=geo)
|
|
part.getPedPartition().set_name(name)
|
|
disk.addPartition(partition=part)
|
|
if flags:
|
|
part.setFlag(flags)
|
|
disk.commit()
|
|
os.sync()
|
|
except parted.PartedException:
|
|
return False
|
|
|
|
# Mount the disk's partitions
|
|
loop_devs = kpartx_disk_img(disk_img)
|
|
|
|
try:
|
|
# Format the partitions
|
|
runcmd(["mkfs.ext4", "/dev/mapper/" + loop_devs[0][0]])
|
|
runcmd(["mkswap", "/dev/mapper/" + loop_devs[1][0]])
|
|
runcmd(["mkfs.ext4", "/dev/mapper/" + loop_devs[2][0]])
|
|
|
|
# Mount the boot partition and make a fake kernel and initrd
|
|
boot_mnt = mount("/dev/mapper/" + loop_devs[0][0])
|
|
try:
|
|
mkfakebootdir(boot_mnt)
|
|
finally:
|
|
umount(boot_mnt)
|
|
|
|
# Mount the / partition and make a fake / filesystem with /etc/passwd
|
|
root_mnt = mount("/dev/mapper/" + loop_devs[2][0])
|
|
try:
|
|
mkfakerootdir(root_mnt)
|
|
finally:
|
|
umount(root_mnt)
|
|
except Exception:
|
|
return False
|
|
finally:
|
|
# Remove the disk's mounted partitions
|
|
runcmd(["kpartx", "-d", "-s", disk_img])
|
|
|
|
return True
|
|
|
|
class ImgUtilsTest(unittest.TestCase):
|
|
def mkcpio_test(self):
|
|
"""Test mkcpio function"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
mkcpio(work_dir, disk_img.name, compression=None)
|
|
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("cpio" in file_details, file_details)
|
|
|
|
def mktar_test(self):
|
|
"""Test mktar function"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
mktar(work_dir, disk_img.name, compression=None)
|
|
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("POSIX tar" in file_details, file_details)
|
|
|
|
def compressed_mktar_test(self):
|
|
"""Test compressed mktar function"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
for (compression, magic) in [("xz", "XZ compressed"),
|
|
("lzma", "LZMA compressed"),
|
|
("gzip", "gzip compressed"),
|
|
("bzip2", "bzip2 compressed")]:
|
|
os.unlink(disk_img.name)
|
|
mktar(work_dir, disk_img.name, compression=compression)
|
|
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue(magic in file_details, (compression, magic, file_details))
|
|
|
|
def mktar_single_file_test(self):
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img,\
|
|
tempfile.NamedTemporaryFile(prefix="lorax.test.input.") as input_file:
|
|
mktar(input_file.name, disk_img.name, compression=None)
|
|
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
self.assertTrue(tarfile.is_tarfile(disk_img.name))
|
|
|
|
with tarfile.TarFile(disk_img.name) as t:
|
|
self.assertEqual(t.getnames(), [os.path.basename(input_file.name)])
|
|
|
|
def mksquashfs_test(self):
|
|
"""Test mksquashfs function"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
disk_img.close()
|
|
mksquashfs(work_dir, disk_img.name)
|
|
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("Squashfs" in file_details, file_details)
|
|
|
|
def mksparse_test(self):
|
|
"""Test mksparse function"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mksparse(disk_img.name, 42 * 1024**2)
|
|
self.assertEqual(os.stat(disk_img.name).st_size, 42 * 1024**2)
|
|
|
|
def mkqcow2_test(self):
|
|
"""Test mkqcow2 function"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkqcow2(disk_img.name, 42 * 1024**2)
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("QEMU QCOW" in file_details, file_details)
|
|
self.assertTrue(str(42 * 1024**2) in file_details, file_details)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def loop_test(self):
|
|
"""Test the loop_* functions (requires loop support)"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mksparse(disk_img.name, 42 * 1024**2)
|
|
loop_dev = loop_attach(disk_img.name)
|
|
try:
|
|
self.assertTrue(loop_dev is not None)
|
|
self.assertEqual(loop_dev[5:], get_loop_name(disk_img.name))
|
|
finally:
|
|
loop_detach(loop_dev)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def loop_context_test(self):
|
|
"""Test the LoopDev context manager (requires loop)"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mksparse(disk_img.name, 42 * 1024**2)
|
|
with LoopDev(disk_img.name) as loop_dev:
|
|
self.assertTrue(loop_dev is not None)
|
|
self.assertEqual(loop_dev[5:], get_loop_name(disk_img.name))
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def dm_test(self):
|
|
"""Test the dm_* functions (requires device-mapper support)"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mksparse(disk_img.name, 42 * 1024**2)
|
|
with LoopDev(disk_img.name) as loop_dev:
|
|
self.assertTrue(loop_dev is not None)
|
|
dm_name = dm_attach(loop_dev, 42 * 1024**2)
|
|
try:
|
|
self.assertTrue(dm_name is not None)
|
|
finally:
|
|
dm_detach(dm_name)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def dmdev_test(self):
|
|
"""Test the DMDev context manager (requires device-mapper support)"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mksparse(disk_img.name, 42 * 1024**2)
|
|
with LoopDev(disk_img.name) as loop_dev:
|
|
self.assertTrue(loop_dev is not None)
|
|
with DMDev(loop_dev, 42 * 1024**2) as dm_name:
|
|
self.assertTrue(dm_name is not None)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def mount_test(self):
|
|
"""Test the Mount context manager (requires loop)"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mksparse(disk_img.name, 42 * 1024**2)
|
|
runcmd(["mkfs.ext4", "-L", "Anaconda", "-b", "4096", "-m", "0", disk_img.name])
|
|
with LoopDev(disk_img.name) as loopdev:
|
|
self.assertTrue(loopdev is not None)
|
|
with Mount(loopdev) as mnt:
|
|
self.assertTrue(mnt is not None)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def mkdosimg_test(self):
|
|
"""Test mkdosimg function (requires loop)"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
mkdosimg(work_dir, disk_img.name)
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("FAT " in file_details, file_details)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def mkext4img_test(self):
|
|
"""Test mkext4img function (requires loop)"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
graft = {"/etc/yum.repos.d/": "./tests/pylorax/repos/single.repo"}
|
|
mkext4img(work_dir, disk_img.name, graft=graft)
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("ext2 filesystem" in file_details, file_details)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def mkbtrfsimg_test(self):
|
|
"""Test mkbtrfsimg function (requires loop)"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
mkbtrfsimg(work_dir, disk_img.name)
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("BTRFS Filesystem" in file_details, file_details)
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def mkhfsimg_test(self):
|
|
"""Test mkhfsimg function (requires loop)"""
|
|
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir:
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
mkfakerootdir(work_dir)
|
|
mkhfsimg(work_dir, disk_img.name, label="test")
|
|
self.assertTrue(os.path.exists(disk_img.name))
|
|
file_details = get_file_magic(disk_img.name)
|
|
self.assertTrue("Macintosh HFS" in file_details, file_details)
|
|
|
|
def default_image_name_test(self):
|
|
"""Test default_image_name function"""
|
|
for compression, suffix in [("xz", ".xz"), ("gzip", ".gz"), ("bzip2", ".bz2"), ("lzma", ".lzma")]:
|
|
filename = default_image_name(compression, "foobar")
|
|
self.assertTrue(filename.endswith(suffix))
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def partition_mount_test(self):
|
|
"""Test PartitionMount context manager (requires loop)"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
self.assertTrue(mkfakediskimg(disk_img.name))
|
|
# Make sure it can mount the / with /etc/passwd
|
|
with PartitionMount(disk_img.name) as img_mount:
|
|
self.assertTrue(img_mount is not None)
|
|
self.assertTrue(os.path.isdir(img_mount.mount_dir))
|
|
self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "/etc/passwd")))
|
|
|
|
# Make sure submount works
|
|
with PartitionMount(disk_img.name, submount="/a-sub-mount/") as img_mount:
|
|
self.assertTrue(img_mount is not None)
|
|
self.assertTrue(os.path.isdir(img_mount.mount_dir))
|
|
self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "/etc/passwd")))
|
|
|
|
# Make sure it can mount the /boot partition with a custom mount_ok function
|
|
def mount_ok(mount_dir):
|
|
kernels = glob.glob(joinpaths(mount_dir, "vmlinuz-*"))
|
|
return len(kernels) > 0
|
|
|
|
with PartitionMount(disk_img.name, mount_ok=mount_ok) as img_mount:
|
|
self.assertTrue(img_mount is not None)
|
|
self.assertTrue(os.path.isdir(img_mount.mount_dir))
|
|
self.assertFalse(os.path.exists(joinpaths(img_mount.mount_dir, "/etc/passwd")))
|
|
self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "vmlinuz-4.18.13-200.fc28.x86_64")))
|
|
self.assertTrue(os.path.exists(joinpaths(img_mount.mount_dir, "initramfs-4.18.13-200.fc28.x86_64.img")))
|
|
|
|
@unittest.skipUnless(os.geteuid() == 0 and not os.path.exists("/.in-container"), "requires root privileges, and no containers")
|
|
def mkfsimage_from_disk_test(self):
|
|
"""Test creating a fsimage from the / partition of a disk image"""
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img:
|
|
self.assertTrue(mkfakediskimg(disk_img.name))
|
|
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as fs_img:
|
|
mkfsimage_from_disk(disk_img.name, fs_img.name)
|
|
self.assertTrue(os.path.exists(fs_img.name))
|
|
file_details = get_file_magic(fs_img.name)
|
|
self.assertTrue("ext2 filesystem" in file_details, file_details)
|