All these are calling subprocess in 'text mode', where it will try to decode stdout/stderr using the default encoding (utf-8 for us). If it doesn't decode, subprocess will raise an exception and kobo doesn't handle it, it just passes it along to us, so things blow up - see https://pagure.io/releng/issue/12474 . To avoid this, let's set `errors="replace"`, which tells the decoder to replace invalid data with ? characters. This way we should get as much of the output as can be read, and no crashes. We also replace `universal_newlines=True` with `text=True` as the latter is shorter, clearer, and what Python 3 subprocess wants us to use, it considers `universal_newlines` to just be a backwards-compatibility thing - "The universal_newlines argument is equivalent to text and is provided for backwards compatibility" Signed-off-by: Adam Williamson <awilliam@redhat.com> Merges: https://pagure.io/pungi/pull-request/1812 (cherry picked from commit 2d16a3af004f61cf41e4eb2e5e694bb46a5d3cda)
		
			
				
	
	
		
			231 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			231 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| import itertools
 | |
| from unittest import mock
 | |
| import os
 | |
| import unittest
 | |
| 
 | |
| from pungi.wrappers import iso
 | |
| 
 | |
| CORRECT_OUTPUT = """dummy.iso:   31ff3e405e26ad01c63b62f6b11d30f6
 | |
| Fragment sums: 6eb92e7bda221d7fe5f19b4d21468c9bf261d84c96d145d96c76444b9cbc
 | |
| Fragment count: 20
 | |
| Supported ISO: no
 | |
| """
 | |
| 
 | |
| INCORRECT_OUTPUT = """This should never happen: File not found"""
 | |
| 
 | |
| XORRISO_LOAD_OUTPUT = """\
 | |
| xorriso 1.5.4 : RockRidge filesystem manipulator, libburnia project.
 | |
| 
 | |
| xorriso : NOTE : Loading ISO image tree from LBA 0
 | |
| xorriso : UPDATE :    7074 nodes read in 1 seconds
 | |
| Drive current: -indev 'dummy.iso'
 | |
| Media current: stdio file, overwriteable
 | |
| Media status : is written , is appendable
 | |
| Boot record  : El Torito , MBR isohybrid cyl-align-off GPT
 | |
| Media summary: 1 session, 5415454 data blocks, 10.3g data, 4086g free
 | |
| Volume id    : 'My volume id'
 | |
| """
 | |
| 
 | |
| # Cached to use in tests that mock os.listdir
 | |
| orig_listdir = os.listdir
 | |
| 
 | |
| 
 | |
| def fake_listdir(pattern, result=None, exc=None):
 | |
|     """Create a function that mocks os.listdir. If the path contains pattern,
 | |
|     result will be returned or exc raised. Otherwise it's normal os.listdir
 | |
|     """
 | |
| 
 | |
|     # The point of this is to avoid issues on Python 2, where apparently
 | |
|     # isdir() is using listdir(), so the mocking is breaking it.
 | |
|     def worker(path):
 | |
|         if isinstance(path, str) and pattern in path:
 | |
|             if exc:
 | |
|                 raise exc
 | |
|             return result
 | |
|         return orig_listdir(path)
 | |
| 
 | |
|     return worker
 | |
| 
 | |
| 
 | |
| class TestIsoUtils(unittest.TestCase):
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_get_implanted_md5_correct(self, mock_run):
 | |
|         mock_run.return_value = (0, CORRECT_OUTPUT)
 | |
|         logger = mock.Mock()
 | |
|         self.assertEqual(
 | |
|             iso.get_implanted_md5("dummy.iso", logger=logger),
 | |
|             "31ff3e405e26ad01c63b62f6b11d30f6",
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             mock_run.call_args_list,
 | |
|             [
 | |
|                 mock.call(
 | |
|                     ["/usr/bin/checkisomd5", "--md5sumonly", "dummy.iso"],
 | |
|                     text=True,
 | |
|                     errors="replace",
 | |
|                 )
 | |
|             ],
 | |
|         )
 | |
|         self.assertEqual(logger.mock_calls, [])
 | |
| 
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_get_implanted_md5_incorrect(self, mock_run):
 | |
|         mock_run.return_value = (0, INCORRECT_OUTPUT)
 | |
|         logger = mock.Mock()
 | |
|         self.assertEqual(iso.get_implanted_md5("dummy.iso", logger=logger), None)
 | |
|         self.assertEqual(
 | |
|             mock_run.call_args_list,
 | |
|             [
 | |
|                 mock.call(
 | |
|                     ["/usr/bin/checkisomd5", "--md5sumonly", "dummy.iso"],
 | |
|                     text=True,
 | |
|                     errors="replace",
 | |
|                 )
 | |
|             ],
 | |
|         )
 | |
|         self.assertTrue(len(logger.mock_calls) > 0)
 | |
| 
 | |
|     @mock.patch("pungi.util.run_unmount_cmd")
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_mount_iso(self, mock_run, mock_unmount):
 | |
|         # first tuple is return value for command 'which guestmount'
 | |
|         # value determines type of the mount/unmount
 | |
|         # command ('1' - guestmount is not available)
 | |
|         # for approach as a root, pair commands mount-umount are used
 | |
|         mock_run.side_effect = [(1, ""), (0, "")]
 | |
|         with iso.mount("dummy") as temp_dir:
 | |
|             self.assertTrue(os.path.isdir(temp_dir))
 | |
|         self.assertEqual(len(mock_run.call_args_list), 2)
 | |
|         mount_call_str = str(mock_run.call_args_list[1])
 | |
|         self.assertTrue(mount_call_str.startswith("call(['mount'"))
 | |
|         self.assertEqual(len(mock_unmount.call_args_list), 1)
 | |
|         unmount_call_str = str(mock_unmount.call_args_list[0])
 | |
|         self.assertTrue(unmount_call_str.startswith("call(['umount'"))
 | |
|         self.assertFalse(os.path.isdir(temp_dir))
 | |
| 
 | |
|     @mock.patch("pungi.util.rmtree")
 | |
|     @mock.patch("os.listdir", new=fake_listdir("guestfs", ["root"]))
 | |
|     @mock.patch("pungi.util.run_unmount_cmd")
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_guestmount(self, mock_run, mock_unmount, mock_rmtree):
 | |
|         # first tuple is return value for command 'which guestmount'
 | |
|         # value determines type of the mount/unmount
 | |
|         # command ('0' - guestmount is available)
 | |
|         # for approach as a non-root, pair commands guestmount-fusermount are used
 | |
|         mock_run.side_effect = [(0, ""), (0, "")]
 | |
|         with iso.mount("dummy") as temp_dir:
 | |
|             self.assertTrue(os.path.isdir(temp_dir))
 | |
|         self.assertEqual(len(mock_run.call_args_list), 2)
 | |
|         mount_call_str = str(mock_run.call_args_list[1])
 | |
|         self.assertTrue(mount_call_str.startswith("call(['guestmount'"))
 | |
|         self.assertEqual(len(mock_unmount.call_args_list), 1)
 | |
|         unmount_call_str = str(mock_unmount.call_args_list[0])
 | |
|         self.assertTrue(unmount_call_str.startswith("call(['fusermount'"))
 | |
|         self.assertFalse(os.path.isdir(temp_dir))
 | |
| 
 | |
|     @mock.patch("pungi.util.rmtree")
 | |
|     @mock.patch("os.listdir", new=fake_listdir("guestfs", []))
 | |
|     @mock.patch("pungi.util.run_unmount_cmd")
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_guestmount_cleans_up_cache(self, mock_run, mock_unmount, mock_rmtree):
 | |
|         # first tuple is return value for command 'which guestmount'
 | |
|         # value determines type of the mount/unmount
 | |
|         # command ('0' - guestmount is available)
 | |
|         # for approach as a non-root, pair commands guestmount-fusermount are used
 | |
|         mock_run.side_effect = [(0, ""), (0, "")]
 | |
|         with iso.mount("dummy") as temp_dir:
 | |
|             self.assertTrue(os.path.isdir(temp_dir))
 | |
|         self.assertEqual(len(mock_run.call_args_list), 2)
 | |
|         mount_call_str = str(mock_run.call_args_list[1])
 | |
|         self.assertTrue(mount_call_str.startswith("call(['guestmount'"))
 | |
|         self.assertEqual(len(mock_unmount.call_args_list), 1)
 | |
|         unmount_call_str = str(mock_unmount.call_args_list[0])
 | |
|         self.assertTrue(unmount_call_str.startswith("call(['fusermount'"))
 | |
|         self.assertFalse(os.path.isdir(temp_dir))
 | |
| 
 | |
|     @mock.patch("pungi.util.rmtree")
 | |
|     @mock.patch("os.listdir", new=fake_listdir("guestfs", OSError("No such file")))
 | |
|     @mock.patch("pungi.util.run_unmount_cmd")
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_guestmount_handles_missing_cache(
 | |
|         self, mock_run, mock_unmount, mock_rmtree
 | |
|     ):
 | |
|         # first tuple is return value for command 'which guestmount'
 | |
|         # value determines type of the mount/unmount
 | |
|         # command ('0' - guestmount is available)
 | |
|         # for approach as a non-root, pair commands guestmount-fusermount are used
 | |
|         mock_run.side_effect = [(0, ""), (0, "")]
 | |
|         with iso.mount("dummy") as temp_dir:
 | |
|             self.assertTrue(os.path.isdir(temp_dir))
 | |
|         self.assertEqual(len(mock_run.call_args_list), 2)
 | |
|         mount_call_str = str(mock_run.call_args_list[1])
 | |
|         self.assertTrue(mount_call_str.startswith("call(['guestmount'"))
 | |
|         self.assertEqual(len(mock_unmount.call_args_list), 1)
 | |
|         unmount_call_str = str(mock_unmount.call_args_list[0])
 | |
|         self.assertTrue(unmount_call_str.startswith("call(['fusermount'"))
 | |
|         self.assertFalse(os.path.isdir(temp_dir))
 | |
| 
 | |
|     @mock.patch("pungi.util.run_unmount_cmd")
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_mount_iso_always_unmounts(self, mock_run, mock_unmount):
 | |
|         mock_run.side_effect = [(1, ""), (0, "")]
 | |
|         try:
 | |
|             with iso.mount("dummy") as temp_dir:
 | |
|                 self.assertTrue(os.path.isdir(temp_dir))
 | |
|                 raise RuntimeError()
 | |
|         except RuntimeError:
 | |
|             pass
 | |
|         self.assertEqual(len(mock_run.call_args_list), 2)
 | |
|         self.assertEqual(len(mock_unmount.call_args_list), 1)
 | |
|         self.assertFalse(os.path.isdir(temp_dir))
 | |
| 
 | |
|     @mock.patch("pungi.util.run_unmount_cmd")
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_mount_iso_raises_on_error(self, mock_run, mock_unmount):
 | |
|         log = mock.Mock()
 | |
|         mock_run.side_effect = [(1, ""), (1, "Boom")]
 | |
|         with self.assertRaises(RuntimeError):
 | |
|             with iso.mount("dummy", logger=log) as temp_dir:
 | |
|                 self.assertTrue(os.path.isdir(temp_dir))
 | |
|         self.assertEqual(len(mock_run.call_args_list), 2)
 | |
|         self.assertEqual(len(mock_unmount.call_args_list), 0)
 | |
|         self.assertEqual(len(log.mock_calls), 1)
 | |
| 
 | |
|     @mock.patch("pungi.wrappers.iso.run")
 | |
|     def test_get_volume_id_xorriso(self, mock_run):
 | |
|         mock_run.return_value = (0, XORRISO_LOAD_OUTPUT)
 | |
|         self.assertEqual(iso.get_volume_id("/dummy.iso", True), "My volume id")
 | |
| 
 | |
| 
 | |
| class TestCmpGraftPoints(unittest.TestCase):
 | |
|     def assertSorted(self, *args):
 | |
|         """Tests that all permutations of arguments yield the same sorted results."""
 | |
|         for perm in itertools.permutations(args):
 | |
|             self.assertEqual(sorted(perm, key=iso.graft_point_sort_key), list(args))
 | |
| 
 | |
|     def test_eq(self):
 | |
|         self.assertSorted("pkgs/foo.rpm", "pkgs/foo.rpm")
 | |
| 
 | |
|     def test_rpms_sorted_alphabetically(self):
 | |
|         self.assertSorted("pkgs/bar.rpm", "pkgs/foo.rpm")
 | |
| 
 | |
|     def test_images_sorted_alphabetically(self):
 | |
|         self.assertSorted("aaa.img", "images/foo", "isolinux/foo")
 | |
| 
 | |
|     def test_other_files_sorted_alphabetically(self):
 | |
|         self.assertSorted("bar.txt", "foo.txt")
 | |
| 
 | |
|     def test_rpms_after_images(self):
 | |
|         self.assertSorted("foo.ins", "bar.rpm")
 | |
| 
 | |
|     def test_other_after_images(self):
 | |
|         self.assertSorted("EFI/anything", "zzz.txt")
 | |
| 
 | |
|     def test_rpm_after_other(self):
 | |
|         self.assertSorted("bbb.txt", "aaa.rpm")
 | |
| 
 | |
|     def test_all_kinds(self):
 | |
|         self.assertSorted("etc/file", "ppc/file", "c.txt", "d.txt", "a.rpm", "b.rpm")
 |