From 025cc54c04132a056ba863ecdb1d05f3465632a5 Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Tue, 27 May 2025 15:21:23 +0200 Subject: [PATCH 1/3] Add some basic partitioning storage tests This supplements the existing tests which use sparse files. These new test cases actually run do_it() and check the result after reset. More test cases will follow. Related: RHEL-93967 --- .../devices_test/partition_test.py | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py index 73da87b43..30e6a0151 100644 --- a/tests/storage_tests/devices_test/partition_test.py +++ b/tests/storage_tests/devices_test/partition_test.py @@ -5,6 +5,7 @@ from unittest.mock import patch +import blivet from blivet.devices import DiskFile from blivet.devices import PartitionDevice from blivet.devicelibs.gpt import gpt_part_uuid_for_mountpoint @@ -13,6 +14,8 @@ from blivet.size import Size from blivet.util import sparsetmpfile +from ..storagetestcase import StorageTestCase + class PartitionDeviceTestCase(unittest.TestCase): @@ -266,3 +269,148 @@ def test_dev_part_type_gpt_autodiscover(self): flags.gpt_discoverable_partitions = True self.assertEqual(device.part_type_uuid, gpt_part_uuid_for_mountpoint("/home")) + + +class PartitionTestCase(StorageTestCase): + + def setUp(self): + super().setUp() + + disks = [os.path.basename(vdev) for vdev in self.vdevs] + self.storage = blivet.Blivet() + self.storage.exclusive_disks = disks + self.storage.reset() + + # make sure only the targetcli disks are in the devicetree + for disk in self.storage.disks: + self.assertTrue(disk.path in self.vdevs) + self.assertIsNone(disk.format.type) + self.assertFalse(disk.children) + + def _clean_up(self): + self.storage.reset() + for disk in self.storage.disks: + if disk.path not in self.vdevs: + raise RuntimeError("Disk %s found in devicetree but not in disks created for tests" % disk.name) + 
self.storage.recursive_remove(disk) + + self.storage.do_it() + + def test_msdos_basic(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos")) + + for i in range(4): + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk], + primary=True) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + self.assertEqual(disk.format.type, "disklabel") + self.assertEqual(disk.format.label_type, "msdos") + self.assertIsNotNone(disk.format.parted_disk) + self.assertIsNotNone(disk.format.parted_device) + self.assertEqual(len(disk.format.partitions), 4) + self.assertEqual(len(disk.format.primary_partitions), 4) + self.assertEqual(len(disk.children), 4) + + for i in range(4): + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1)) + self.assertIsNotNone(part) + self.assertEqual(part.type, "partition") + self.assertEqual(part.disk, disk) + self.assertEqual(part.size, Size("100 MiB")) + self.assertTrue(part.is_primary) + self.assertFalse(part.is_extended) + self.assertFalse(part.is_logical) + self.assertIsNotNone(part.parted_partition) + + def test_msdos_extended(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos")) + + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part) + + part = self.storage.new_partition(size=Size("1 GiB"), parents=[disk], + part_type=parted.PARTITION_EXTENDED) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + for i in range(4): + part = 
self.storage.new_partition(size=Size("100 MiB"), parents=[disk], + part_type=parted.PARTITION_LOGICAL) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + self.assertEqual(disk.format.type, "disklabel") + self.assertEqual(disk.format.label_type, "msdos") + self.assertIsNotNone(disk.format.parted_disk) + self.assertIsNotNone(disk.format.parted_device) + self.assertEqual(len(disk.format.partitions), 6) + self.assertEqual(len(disk.format.primary_partitions), 1) + self.assertEqual(len(disk.children), 6) + + for i in range(4, 8): + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1)) + self.assertIsNotNone(part) + self.assertEqual(part.type, "partition") + self.assertEqual(part.disk, disk) + self.assertEqual(part.size, Size("100 MiB")) + self.assertFalse(part.is_primary) + self.assertFalse(part.is_extended) + self.assertTrue(part.is_logical) + self.assertIsNotNone(part.parted_partition) + + def test_gpt_basic(self): + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt")) + + for i in range(4): + part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],) + self.storage.create_device(part) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + self.assertEqual(disk.format.type, "disklabel") + self.assertEqual(disk.format.label_type, "gpt") + self.assertIsNotNone(disk.format.parted_disk) + self.assertIsNotNone(disk.format.parted_device) + self.assertEqual(len(disk.format.partitions), 4) + self.assertEqual(len(disk.format.primary_partitions), 4) + 
self.assertEqual(len(disk.children), 4) + + for i in range(4): + part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1)) + self.assertIsNotNone(part) + self.assertEqual(part.type, "partition") + self.assertEqual(part.disk, disk) + self.assertEqual(part.size, Size("100 MiB")) + self.assertTrue(part.is_primary) + self.assertFalse(part.is_extended) + self.assertFalse(part.is_logical) + self.assertIsNotNone(part.parted_partition) From ab6261adbdfedc26c6b0712a42a3dd9169cabb38 Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Tue, 27 May 2025 14:10:49 +0200 Subject: [PATCH 2/3] Wipe end partition before creating it as well as the start We are currently overwriting the start of the newly created partition with zeroes to remove any filesystem metadata that might occupy the space. This extends this functionality to the end of the partition to remove 1.0 MD metadata that might be there. Resolves: RHEL-93967 --- blivet/devices/partition.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/blivet/devices/partition.py b/blivet/devices/partition.py index 89470d9fb..fc9a97be7 100644 --- a/blivet/devices/partition.py +++ b/blivet/devices/partition.py @@ -659,7 +659,7 @@ def _wipe(self): """ Wipe the partition metadata. Assumes that the partition metadata is located at the start - of the partition and occupies no more than 1 MiB. + and end of the partition and occupies no more than 1 MiB. Erases in block increments. Erases the smallest number of blocks such that at least 1 MiB is erased or the whole partition is @@ -692,6 +692,24 @@ def _wipe(self): # things to settle. 
udev.settle() + if count >= part_len: + # very small partition, we wiped it completely already + return + + # now do the end of the partition as well (RAID 1.0 metadata) + end = self.parted_partition.geometry.end + cmd = ["dd", "if=/dev/zero", "of=%s" % device, "bs=%d" % bs, + "seek=%d" % (end - count), "count=%d" % count] + try: + util.run_program(cmd) + except OSError as e: + log.error(str(e)) + finally: + # If a udev device is created with the watch option, then + # a change uevent is synthesized and we need to wait for + # things to settle. + udev.settle() + def _create(self): """ Create the device. """ log_method_call(self, self.name, status=self.status) From 936cccdf67e3ee612399bd3f0f8b383ca118ce9b Mon Sep 17 00:00:00 2001 From: Vojtech Trefny Date: Wed, 28 May 2025 11:01:14 +0200 Subject: [PATCH 3/3] tests: Add tests for wiping stale metadata from new partitions Related: RHEL-93967 --- .../devices_test/partition_test.py | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py index 30e6a0151..87e4b1155 100644 --- a/tests/storage_tests/devices_test/partition_test.py +++ b/tests/storage_tests/devices_test/partition_test.py @@ -1,6 +1,7 @@ import os import unittest from uuid import UUID +import blivet.deviceaction import parted from unittest.mock import patch @@ -414,3 +415,121 @@ def test_gpt_basic(self): self.assertFalse(part.is_extended) self.assertFalse(part.is_logical) self.assertIsNotNone(part.parted_partition) + + def _partition_wipe_check(self): + part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + self.assertIsNotNone(part1) + self.assertIsNone(part1.format.type) + + out = blivet.util.capture_output(["blkid", "-p", "-sTYPE", "-ovalue", self.vdevs[0] + "1"]) + self.assertEqual(out.strip(), "") + + part2 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "2") + self.assertIsNotNone(part2) + 
self.assertEqual(part2.format.type, "ext4") + + try: + part2.format.do_check() + except blivet.errors.FSError as e: + self.fail("Partition wipe corrupted filesystem on an adjacent partition: %s" % str(e)) + + out = blivet.util.capture_output(["blkid", "-p", "-sTYPE", "-ovalue", self.vdevs[0] + "2"]) + self.assertEqual(out.strip(), "ext4") + + def test_partition_wipe_ext(self): + """ Check that any stray filesystem metadata are removed before creating a partition """ + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt")) + + # create two partitions with ext4 + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk], + fmt=blivet.formats.get_format("ext4")) + self.storage.create_device(part1) + + part2 = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True, + fmt=blivet.formats.get_format("ext4")) + self.storage.create_device(part2) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + # remove the first partition (only the partition without removing the format) + part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + ac = blivet.deviceaction.ActionDestroyDevice(part1) + self.storage.devicetree.actions.add(ac) + + self.storage.do_it() + self.storage.reset() + + # create the first partition again (without ext4) + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part1) + + blivet.partitioning.do_partitioning(self.storage) + + # XXX PartitionDevice._post_create calls wipefs on the partition, we want to check that + # the _pre_create dd wipe works so we need to skip the _post_create wipefs call + part1._post_create = lambda: None + + self.storage.do_it() + self.storage.reset() + + # make sure the ext4 signature is 
not present on part1 (and untouched on part2) + self._partition_wipe_check() + + def test_partition_wipe_mdraid(self): + """ Check that any stray RAID metadata are removed before creating a partition """ + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + self.assertIsNotNone(disk) + + self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt")) + + # create two partitions, one empty, one with ext4 + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part1) + + part2 = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True, + fmt=blivet.formats.get_format("ext4")) + self.storage.create_device(part2) + + blivet.partitioning.do_partitioning(self.storage) + + self.storage.do_it() + self.storage.reset() + + # create MD RAID with metadata 1.0 on the first partition + ret = blivet.util.run_program(["mdadm", "--create", "blivetMDTest", "--level=linear", + "--metadata=1.0", "--raid-devices=1", "--force", part1.path]) + self.assertEqual(ret, 0, "Failed to create RAID array for partition wipe test") + ret = blivet.util.run_program(["mdadm", "--stop", "/dev/md/blivetMDTest"]) + self.assertEqual(ret, 0, "Failed to create RAID array for partition wipe test") + + # now remove the partition without removing the array first + part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1") + ac = blivet.deviceaction.ActionDestroyDevice(part1) + self.storage.devicetree.actions.add(ac) + + self.storage.do_it() + self.storage.reset() + + # create the first partition again (without format) + disk = self.storage.devicetree.get_device_by_path(self.vdevs[0]) + part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk]) + self.storage.create_device(part1) + + blivet.partitioning.do_partitioning(self.storage) + + # XXX PartitionDevice._post_create calls wipefs on the partition, we want to check that + # the _pre_create dd wipe works so we need to skip the 
_post_create wipefs call + part1._post_create = lambda: None + + self.storage.do_it() + self.storage.reset() + + # make sure the mdmember signature is not present on part1 (and ext4 is untouched on part2) + self._partition_wipe_check()