python-blivet/0037-Wipe-end-partition-before-creating-it-as-well-as-the-start.patch
2025-05-30 13:37:05 +02:00

385 lines
16 KiB
Diff

From c07938143a9906bc0e06e78c818227b4c06f64ad Mon Sep 17 00:00:00 2001
From: Vojtech Trefny <vtrefny@redhat.com>
Date: Tue, 27 May 2025 15:21:23 +0200
Subject: [PATCH 1/3] Add some basic partitioning storage tests
This supplements the existing tests which use sparse files. These
new test cases actually run do_it() and check the result after
reset. More test cases will follow.
Related: RHEL-76917
---
.../devices_test/partition_test.py | 148 ++++++++++++++++++
1 file changed, 148 insertions(+)
diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py
index 679fded6e..6ad8a8f1a 100644
--- a/tests/storage_tests/devices_test/partition_test.py
+++ b/tests/storage_tests/devices_test/partition_test.py
@@ -11,12 +11,15 @@
except ImportError:
from mock import patch
+import blivet
from blivet.devices import DiskFile
from blivet.devices import PartitionDevice
from blivet.formats import get_format
from blivet.size import Size
from blivet.util import sparsetmpfile
+from ..storagetestcase import StorageTestCase
+
Weighted = namedtuple("Weighted", ["fstype", "mountpoint", "true_funcs", "weight"])
@@ -218,3 +221,148 @@ def test_extended_min_size(self):
end_free = (extended_end - logical_end) * sector_size
self.assertEqual(extended_device.min_size,
extended_device.align_target_size(extended_device.current_size - end_free))
+
+
+class PartitionTestCase(StorageTestCase):
+
+ def setUp(self):
+ super().setUp()
+
+ disks = [os.path.basename(vdev) for vdev in self.vdevs]
+ self.storage = blivet.Blivet()
+ self.storage.exclusive_disks = disks
+ self.storage.reset()
+
+ # only the test disks may be in the devicetree; each must be unformatted and have no children
+ for disk in self.storage.disks:
+ self.assertIn(disk.path, self.vdevs)
+ self.assertIsNone(disk.format.type)
+ self.assertFalse(disk.children)
+
+ def _clean_up(self):
+ self.storage.reset()
+ for disk in self.storage.disks:
+ if disk.path not in self.vdevs:
+ raise RuntimeError("Disk %s found in devicetree but not in disks created for tests" % disk.name)
+ self.storage.recursive_remove(disk)
+
+ self.storage.do_it()
+
+ def test_msdos_basic(self):
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+
+ self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos"))
+
+ for i in range(4):
+ part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],
+ primary=True)
+ self.storage.create_device(part)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+ self.assertEqual(disk.format.type, "disklabel")
+ self.assertEqual(disk.format.label_type, "msdos")
+ self.assertIsNotNone(disk.format.parted_disk)
+ self.assertIsNotNone(disk.format.parted_device)
+ self.assertEqual(len(disk.format.partitions), 4)
+ self.assertEqual(len(disk.format.primary_partitions), 4)
+ self.assertEqual(len(disk.children), 4)
+
+ for i in range(4):
+ part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1))
+ self.assertIsNotNone(part)
+ self.assertEqual(part.type, "partition")
+ self.assertEqual(part.disk, disk)
+ self.assertEqual(part.size, Size("100 MiB"))
+ self.assertTrue(part.is_primary)
+ self.assertFalse(part.is_extended)
+ self.assertFalse(part.is_logical)
+ self.assertIsNotNone(part.parted_partition)
+
+ def test_msdos_extended(self):
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+
+ self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="msdos"))
+
+ part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+ self.storage.create_device(part)
+
+ part = self.storage.new_partition(size=Size("1 GiB"), parents=[disk],
+ part_type=parted.PARTITION_EXTENDED)
+ self.storage.create_device(part)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ for i in range(4):
+ part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],
+ part_type=parted.PARTITION_LOGICAL)
+ self.storage.create_device(part)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+ self.assertEqual(disk.format.type, "disklabel")
+ self.assertEqual(disk.format.label_type, "msdos")
+ self.assertIsNotNone(disk.format.parted_disk)
+ self.assertIsNotNone(disk.format.parted_device)
+ self.assertEqual(len(disk.format.partitions), 6)
+ self.assertEqual(len(disk.format.primary_partitions), 1)
+ self.assertEqual(len(disk.children), 6)
+
+ for i in range(4, 8):
+ part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1))
+ self.assertIsNotNone(part)
+ self.assertEqual(part.type, "partition")
+ self.assertEqual(part.disk, disk)
+ self.assertEqual(part.size, Size("100 MiB"))
+ self.assertFalse(part.is_primary)
+ self.assertFalse(part.is_extended)
+ self.assertTrue(part.is_logical)
+ self.assertIsNotNone(part.parted_partition)
+
+ def test_gpt_basic(self):
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+
+ self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt"))
+
+ for i in range(4):
+ part = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],)
+ self.storage.create_device(part)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+ self.assertEqual(disk.format.type, "disklabel")
+ self.assertEqual(disk.format.label_type, "gpt")
+ self.assertIsNotNone(disk.format.parted_disk)
+ self.assertIsNotNone(disk.format.parted_device)
+ self.assertEqual(len(disk.format.partitions), 4)
+ self.assertEqual(len(disk.format.primary_partitions), 4)
+ self.assertEqual(len(disk.children), 4)
+
+ for i in range(4):
+ part = self.storage.devicetree.get_device_by_path(self.vdevs[0] + str(i + 1))
+ self.assertIsNotNone(part)
+ self.assertEqual(part.type, "partition")
+ self.assertEqual(part.disk, disk)
+ self.assertEqual(part.size, Size("100 MiB"))
+ self.assertTrue(part.is_primary)
+ self.assertFalse(part.is_extended)
+ self.assertFalse(part.is_logical)
+ self.assertIsNotNone(part.parted_partition)
From 1486d2d47d9b757694a3da88ccc13d29d8bb12fd Mon Sep 17 00:00:00 2001
From: Vojtech Trefny <vtrefny@redhat.com>
Date: Tue, 27 May 2025 14:10:49 +0200
Subject: [PATCH 2/3] Wipe end partition before creating it as well as the
start
We are currently overwriting the start of the newly created partition
with zeroes to remove any filesystem metadata that might occupy
the space. This extends this functionality to the end of the partition
to remove 1.0 MD metadata that might be there.
Resolves: RHEL-76917
---
blivet/devices/partition.py | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/blivet/devices/partition.py b/blivet/devices/partition.py
index 6ae4b8d36..1dac75a5a 100644
--- a/blivet/devices/partition.py
+++ b/blivet/devices/partition.py
@@ -599,7 +599,7 @@ def _wipe(self):
""" Wipe the partition metadata.
Assumes that the partition metadata is located at the start
- of the partition and occupies no more than 1 MiB.
+ and end of the partition and occupies no more than 1 MiB.
Erases in block increments. Erases the smallest number of blocks
such that at least 1 MiB is erased or the whole partition is
@@ -632,6 +632,24 @@ def _wipe(self):
# things to settle.
udev.settle()
+ if count >= part_len:
+ # very small partition, we wiped it completely already
+ return
+
+ # now do the end of the partition as well (RAID 1.0 metadata)
+ end = self.parted_partition.geometry.end
+ cmd = ["dd", "if=/dev/zero", "of=%s" % device, "bs=%d" % bs,
+ "seek=%d" % (end - count), "count=%d" % count]
+ try:
+ util.run_program(cmd)
+ except OSError as e:
+ log.error(str(e))
+ finally:
+ # If a udev device is created with the watch option, then
+ # a change uevent is synthesized and we need to wait for
+ # things to settle.
+ udev.settle()
+
def _create(self):
""" Create the device. """
log_method_call(self, self.name, status=self.status)
From f0f78b801fb52425c13d0384f6867bf55839d98f Mon Sep 17 00:00:00 2001
From: Vojtech Trefny <vtrefny@redhat.com>
Date: Wed, 28 May 2025 11:01:14 +0200
Subject: [PATCH 3/3] tests: Add tests for wiping stale metadata from new
partitions
Related: RHEL-76917
---
.../devices_test/partition_test.py | 119 ++++++++++++++++++
1 file changed, 119 insertions(+)
diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py
index 6ad8a8f1a..f4be3aa4c 100644
--- a/tests/storage_tests/devices_test/partition_test.py
+++ b/tests/storage_tests/devices_test/partition_test.py
@@ -4,6 +4,7 @@
import os
import six
import unittest
+import blivet.deviceaction
import parted
try:
@@ -366,3 +367,121 @@ def test_gpt_basic(self):
self.assertFalse(part.is_extended)
self.assertFalse(part.is_logical)
self.assertIsNotNone(part.parted_partition)
+
+ def _partition_wipe_check(self):
+ part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1")
+ self.assertIsNotNone(part1)
+ self.assertIsNone(part1.format.type)
+
+ out = blivet.util.capture_output(["blkid", "-p", "-sTYPE", "-ovalue", self.vdevs[0] + "1"])
+ self.assertEqual(out.strip(), "")
+
+ part2 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "2")
+ self.assertIsNotNone(part2)
+ self.assertEqual(part2.format.type, "ext4")
+
+ try:
+ part2.format.do_check()
+ except blivet.errors.FSError as e:
+ self.fail("Partition wipe corrupted filesystem on an adjacent partition: %s" % str(e))
+
+ out = blivet.util.capture_output(["blkid", "-p", "-sTYPE", "-ovalue", self.vdevs[0] + "2"])
+ self.assertEqual(out.strip(), "ext4")
+
+ def test_partition_wipe_ext(self):
+ """ Check that any stray filesystem metadata are removed before creating a partition """
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+
+ self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt"))
+
+ # create two partitions with ext4
+ part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],
+ fmt=blivet.formats.get_format("ext4"))
+ self.storage.create_device(part1)
+
+ part2 = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True,
+ fmt=blivet.formats.get_format("ext4"))
+ self.storage.create_device(part2)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ # remove the first partition (only the partition without removing the format)
+ part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1")
+ ac = blivet.deviceaction.ActionDestroyDevice(part1)
+ self.storage.devicetree.actions.add(ac)
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ # create the first partition again (without ext4)
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+ self.storage.create_device(part1)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ # XXX PartitionDevice._post_create calls wipefs on the partition, we want to check that
+ # the _pre_create dd wipe works so we need to skip the _post_create wipefs call
+ part1._post_create = lambda: None
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ # make sure the ext4 signature is not present on part1 (and untouched on part2)
+ self._partition_wipe_check()
+
+ def test_partition_wipe_mdraid(self):
+ """ Check that any stray RAID metadata are removed before creating a partition """
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ self.assertIsNotNone(disk)
+
+ self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt"))
+
+ # create two partitions, one empty, one with ext4
+ part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+ self.storage.create_device(part1)
+
+ part2 = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True,
+ fmt=blivet.formats.get_format("ext4"))
+ self.storage.create_device(part2)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ # create MD RAID with metadata 1.0 on the first partition
+ ret = blivet.util.run_program(["mdadm", "--create", "blivetMDTest", "--level=linear",
+ "--metadata=1.0", "--raid-devices=1", "--force", part1.path])
+ self.assertEqual(ret, 0, "Failed to create RAID array for partition wipe test")
+ ret = blivet.util.run_program(["mdadm", "--stop", "/dev/md/blivetMDTest"])
+ self.assertEqual(ret, 0, "Failed to stop RAID array for partition wipe test")
+
+ # now remove the partition without removing the array first
+ part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1")
+ ac = blivet.deviceaction.ActionDestroyDevice(part1)
+ self.storage.devicetree.actions.add(ac)
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ # create the first partition again (without format)
+ disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+ part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+ self.storage.create_device(part1)
+
+ blivet.partitioning.do_partitioning(self.storage)
+
+ # XXX PartitionDevice._post_create calls wipefs on the partition, we want to check that
+ # the _pre_create dd wipe works so we need to skip the _post_create wipefs call
+ part1._post_create = lambda: None
+
+ self.storage.do_it()
+ self.storage.reset()
+
+ # make sure the mdmember signature is not present on part1 (and ext4 is untouched on part2)
+ self._partition_wipe_check()