From c07938143a9906bc0e06e78c818227b4c06f64ad Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Tue, 27 May 2025 15:21:23 +0200 Subject: [PATCH 1/3] Add some basic partitioning storage tests This supplements the existing tests which use sparse files. These new test cases actually run do_it() and check the result after reset. More test cases will follow. Related: RHEL-76917 --- .../devices_test/partition_test.py | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py index 679fded6e..6ad8a8f1a 100644 --- a/tests/storage_tests/devices_test/partition_test.py +++ b/tests/storage_tests/devices_test/partition_test.py @@ -11,12 +11,15 @@ except ImportError: from mock import patch +import blivet from blivet.devices import DiskFile from blivet.devices import PartitionDevice from blivet.formats import get_format from blivet.size import Size from blivet.util import sparsetmpfile +from ..storagetestcase import StorageTestCase + Weighted = namedtuple("Weighted", ["fstype", "mountpoint", "true_funcs", "weight"]) @@ -218,3 +221,148 @@ def test_extended_min_size(self): end_free = (extended_end - logical_end) * sector_size self.assertEqual(extended_device.min_size, extended_device.align_target_size(extended_device.current_size - end_free)) + + +class PartitionTestCase(StorageTestCase): + + def setUp(self): + super().setUp() + + disks = [os.path.basename(vdev) for vdev in self.vdevs] + self.storage = blivet.Blivet() + self.storage.exclusive_disks = disks + self.storage.reset() + + # make sure only the targetcli disks are in the devicetree + for disk in self.storage.disks: + self.assertTrue(disk.path in self.vdevs) + self.assertIsNone(disk.format.type) + self.assertFalse(disk.children) + + def _clean_up(self): + self.storage.reset() + for disk in self.storage.disks: + if disk.path not in self.vdevs: + raise RuntimeError("Disk %s found in devicetree but not in 
disks created for tests" % disk.name) + self.storage.recursive_remove(disk) + + self.storage.do_it() + + def test_msdos_basic(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos")) + + for i in range(4): + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk], + primary=True) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + self.assertEqual(disk.format.type, "disklabel") + self.assertEqual(disk.format.label_type, "msdos") + self.assertIsNotNone(disk.format.parted_disk) + self.assertIsNotNone(disk.format.parted_device) + self.assertEqual(len(disk.format.partitions), 4) + self.assertEqual(len(disk.format.primary_partitions), 4) + self.assertEqual(len(disk.children), 4) + + for i in range(4): + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1)) + self.assertIsNotNone(part) + self.assertEqual(part.type, "partition") + self.assertEqual(part.disk, disk) + self.assertEqual(part.size, Size("100 MiB")) + self.assertTrue(part.is_primary) + self.assertFalse(part.is_extended) + self.assertFalse(part.is_logical) + self.assertIsNotNone(part.parted_partition) + + def test_msdos_extended(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos")) + + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part) + + part = self.storage.new_partition(size=Size("1 GiB"), parents=[disk], + part_type=parted.PARTITION_EXTENDED) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + for i in 
range(4): + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk], + part_type=parted.PARTITION_LOGICAL) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + self.assertEqual(disk.format.type, "disklabel") + self.assertEqual(disk.format.label_type, "msdos") + self.assertIsNotNone(disk.format.parted_disk) + self.assertIsNotNone(disk.format.parted_device) + self.assertEqual(len(disk.format.partitions), 6) + self.assertEqual(len(disk.format.primary_partitions), 1) + self.assertEqual(len(disk.children), 6) + + for i in range(4, 8): + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1)) + self.assertIsNotNone(part) + self.assertEqual(part.type, "partition") + self.assertEqual(part.disk, disk) + self.assertEqual(part.size, Size("100 MiB")) + self.assertFalse(part.is_primary) + self.assertFalse(part.is_extended) + self.assertTrue(part.is_logical) + self.assertIsNotNone(part.parted_partition) + + def test_gpt_basic(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt")) + + for i in range(4): + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + self.assertEqual(disk.format.type, "disklabel") + self.assertEqual(disk.format.label_type, "gpt") + self.assertIsNotNone(disk.format.parted_disk) + self.assertIsNotNone(disk.format.parted_device) + self.assertEqual(len(disk.format.partitions), 4) + self.assertEqual(len(disk.format.primary_partitions), 4) + 
self.assertEqual(len(disk.children), 4) + + for i in range(4): + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1)) + self.assertIsNotNone(part) + self.assertEqual(part.type, "partition") + self.assertEqual(part.disk, disk) + self.assertEqual(part.size, Size("100 MiB")) + self.assertTrue(part.is_primary) + self.assertFalse(part.is_extended) + self.assertFalse(part.is_logical) + self.assertIsNotNone(part.parted_partition) From 1486d2d47d9b757694a3da88ccc13d29d8bb12fd Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Tue, 27 May 2025 14:10:49 +0200 Subject: [PATCH 2/3] Wipe end partition before creating it as well as the start We are currently overwriting the start of the newly created partition with zeroes to remove any filesystem metadata that might occupy the space. This extends this functionality to the end of the partition to remove 1.0 MD metadata that might be there. Resolves: RHEL-76917 --- blivet/devices/partition.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/blivet/devices/partition.py b/blivet/devices/partition.py index 6ae4b8d36..1dac75a5a 100644 --- a/blivet/devices/partition.py +++ b/blivet/devices/partition.py @@ -599,7 +599,7 @@ def _wipe(self): """ Wipe the partition metadata. Assumes that the partition metadata is located at the start - of the partition and occupies no more than 1 MiB. + and end of the partition and occupies no more than 1 MiB. Erases in block increments. Erases the smallest number of blocks such that at least 1 MiB is erased or the whole partition is @@ -632,6 +632,24 @@ def _wipe(self): # things to settle. 
udev.settle() + if count >= part_len: + # very small partition, we wiped it completely already + return + + # now do the end of the partition as well (RAID 1.0 metadata) + end = self.parted_partition.geometry.end + cmd = ["dd", "if=/dev/zero", "of=%s" % device, "bs=%d" % bs, + "seek=%d" % (end - count), "count=%d" % count] + try: + util.run_program(cmd) + except OSError as e: + log.error(str(e)) + finally: + # If a udev device is created with the watch option, then + # a change uevent is synthesized and we need to wait for + # things to settle. + udev.settle() + def _create(self): """ Create the device. """ log_method_call(self, self.name, status=self.status) From f0f78b801fb52425c13d0384f6867bf55839d98f Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Wed, 28 May 2025 11:01:14 +0200 Subject: [PATCH 3/3] tests: Add tests for wiping stale metadata from new partitions Related: RHEL-76917 --- .../devices_test/partition_test.py | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py index 6ad8a8f1a..f4be3aa4c 100644 --- a/tests/storage_tests/devices_test/partition_test.py +++ b/tests/storage_tests/devices_test/partition_test.py @@ -4,6 +4,7 @@ import os import six import unittest +import blivet.deviceaction import parted try: @@ -366,3 +367,121 @@ def test_gpt_basic(self): self.assertFalse(part.is_extended) self.assertFalse(part.is_logical) self.assertIsNotNone(part.parted_partition) + + def _partition_wipe_check(self): + part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + self.assertIsNotNone(part1) + self.assertIsNone(part1.format.type) + + out = blivet.util.capture_output(["blkid", "-p", "-sTYPE", "-ovalue", self.vdevs[0] + "1"]) + self.assertEqual(out.strip(), "") + + part2 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "2") + self.assertIsNotNone(part2) + self.assertEqual(part2.format.type, "ext4") + + try: 
+ part2.format.do_check() + except blivet.errors.FSError as e: + self.fail("Partition wipe corrupted filesystem on an adjacent partition: %s" % str(e)) + + out = blivet.util.capture_output(["blkid", "-p", "-sTYPE", "-ovalue", self.vdevs[0] + "2"]) + self.assertEqual(out.strip(), "ext4") + + def test_partition_wipe_ext(self): + """ Check that any stray filesystem metadata are removed before creating a partition """ + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt")) + + # create two partitions with ext4 + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk], + fmt=blivet.formats.get_format("ext4")) + self.storage.create_device(part1) + + part2 = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True, + fmt=blivet.formats.get_format("ext4")) + self.storage.create_device(part2) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + # remove the first partition (only the partition without removing the format) + part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + ac = blivet.deviceaction.ActionDestroyDevice(part1) + self.storage.devicetree.actions.add(ac) + + self.storage.do_it() + self.storage.reset() + + # create the first partition again (without ext4) + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part1) + + blivet.partitioning.do_partitioning(self.storage) + + # XXX PartitionDevice._post_create calls wipefs on the partition, we want to check that + # the _pre_create dd wipe works so we need to skip the _post_create wipefs call + part1._post_create = lambda: None + + self.storage.do_it() + self.storage.reset() + + # make sure the ext4 signature is not present on part1 (and untouched on part2) + 
self._partition_wipe_check() + + def test_partition_wipe_mdraid(self): + """ Check that any stray RAID metadata are removed before creating a partition """ + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt")) + + # create two partitions, one empty, one with ext4 + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part1) + + part2 = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True, + fmt=blivet.formats.get_format("ext4")) + self.storage.create_device(part2) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + # create MD RAID with metadata 1.0 on the first partition + ret = blivet.util.run_program(["mdadm", "--create", "blivetMDTest", "--level=linear", + "--metadata=1.0", "--raid-devices=1", "--force", part1.path]) + self.assertEqual(ret, 0, "Failed to create RAID array for partition wipe test") + ret = blivet.util.run_program(["mdadm", "--stop", "/dev/md/blivetMDTest"]) + self.assertEqual(ret, 0, "Failed to stop RAID array for partition wipe test") + + # now remove the partition without removing the array first + part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + ac = blivet.deviceaction.ActionDestroyDevice(part1) + self.storage.devicetree.actions.add(ac) + + self.storage.do_it() + self.storage.reset() + + # create the first partition again (without format) + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part1) + + blivet.partitioning.do_partitioning(self.storage) + + # XXX PartitionDevice._post_create calls wipefs on the partition, we want to check that + # the _pre_create dd wipe works so we need to skip the _post_create wipefs call + part1._post_create = 
lambda: None + + self.storage.do_it() + self.storage.reset() + + # make sure the mdmember signature is not present on part1 (and ext4 is untouched on part2) + self._partition_wipe_check()