Support compressing single files.

Modify imgutils.compress to allow the "rootdir" argument to be either a
directory or a single file to add to an archive.

(cherry picked from commit c585b91422)
(cherry picked from commit 5308e7bfec)
This commit is contained in:
David Shea 2019-03-29 11:42:28 -04:00 committed by Brian C. Lane
parent c74e50d99d
commit b52877b63c
2 changed files with 33 additions and 10 deletions

View File

@ -36,8 +36,8 @@ from pylorax.executils import runcmd, runcmd_output
######## Functions for making container images (cpio, tar, squashfs) ##########
def compress(command, rootdir, outfile, compression="xz", compressargs=None):
'''Make a compressed archive of the given rootdir.
def compress(command, root, outfile, compression="xz", compressargs=None):
'''Make a compressed archive of the given rootdir or file.
command is a list of the archiver commands to run
compression should be "xz", "gzip", "lzma", "bzip2", or None.
compressargs will be used on the compression commandline.'''
@ -60,12 +60,23 @@ def compress(command, rootdir, outfile, compression="xz", compressargs=None):
compression = "pbzip2"
compressargs.insert(0, "-p%d" % multiprocessing.cpu_count())
logger.debug("find %s -print0 |%s | %s %s > %s", rootdir, " ".join(command),
compression, " ".join(compressargs), outfile)
find, archive, comp = None, None, None
try:
find = Popen(["find", ".", "-print0"], stdout=PIPE, cwd=rootdir)
archive = Popen(command, stdin=find.stdout, stdout=PIPE, cwd=rootdir)
if os.path.isdir(root):
logger.debug("find %s -print0 |%s | %s %s > %s", root, " ".join(command),
compression, " ".join(compressargs), outfile)
find = Popen(["find", ".", "-print0"], stdout=PIPE, cwd=root)
archive = Popen(command, stdin=find.stdout, stdout=PIPE, cwd=root)
else:
logger.debug("echo %s |%s | %s %s > %s", root, " ".join(command),
compression, " ".join(compressargs), outfile)
archive = Popen(command, stdin=PIPE, stdout=PIPE, cwd=os.path.dirname(root))
archive.stdin.write(os.path.basename(root).encode("utf-8") + b"\0")
archive.stdin.close()
comp = Popen([compression] + compressargs,
stdin=archive.stdout, stdout=open(outfile, "wb"))
comp.wait()
@ -76,18 +87,18 @@ def compress(command, rootdir, outfile, compression="xz", compressargs=None):
list(p.kill() for p in (find, archive, comp) if p)
return 1
def mkcpio(rootdir, outfile, compression="xz", compressargs=None):
def mkcpio(root, outfile, compression="xz", compressargs=None):
compressargs = compressargs or ["-9"]
return compress(["cpio", "--null", "--quiet", "-H", "newc", "-o"],
rootdir, outfile, compression, compressargs)
root, outfile, compression, compressargs)
def mktar(rootdir, outfile, compression="xz", compressargs=None, selinux=True):
def mktar(root, outfile, compression="xz", compressargs=None, selinux=True):
compressargs = compressargs or ["-9"]
tar_cmd = ["tar", "--no-recursion"]
if selinux:
tar_cmd += ["--selinux", "--acls", "--xattrs"]
tar_cmd += ["-cf-", "--null", "-T-"]
return compress(tar_cmd, rootdir, outfile, compression, compressargs)
return compress(tar_cmd, root, outfile, compression, compressargs)
def mksquashfs(rootdir, outfile, compression="default", compressargs=None):
'''Make a squashfs image containing the given rootdir.'''

View File

@ -17,6 +17,7 @@
import glob
import os
import parted
import tarfile
import tempfile
import unittest
@ -154,6 +155,17 @@ class ImgUtilsTest(unittest.TestCase):
file_details = get_file_magic(disk_img.name)
self.assertTrue(magic in file_details, (compression, magic, file_details))
def mktar_single_file_test(self):
with tempfile.NamedTemporaryFile(prefix="lorax.test.disk.") as disk_img,\
tempfile.NamedTemporaryFile(prefix="lorax.test.input.") as input_file:
mktar(input_file.name, disk_img.name, compression=None)
self.assertTrue(os.path.exists(disk_img.name))
self.assertTrue(tarfile.is_tarfile(disk_img.name))
with tarfile.TarFile(disk_img.name) as t:
self.assertEqual(t.getnames(), [os.path.basename(input_file.name)])
def mksquashfs_test(self):
"""Test mksquashfs function"""
with tempfile.TemporaryDirectory(prefix="lorax.test.") as work_dir: