python-blivet/0020-Wipe-end-partition-before-creating-it-as-well-as-the-start.patch
2025-05-30 13:40:40 +02:00

386 lines
16 KiB
Diff

From 025cc54c04132a056ba863ecdb1d05f3465632a5 Mon Sep 17 00:00:00 2001
From: Vojtech Trefny <vtrefny@redhat.com>
Date: Tue, 27 May 2025 15:21:23 +0200
Subject: [PATCH 1/3] Add some basic partitioning storage tests
This supplements the existing tests which use sparse files. These
new test cases actually run do_it() and check the result after
reset. More test cases will follow.
Related: RHEL-93967
---
.../devices_test/partition_test.py | 148 ++++++++++++++++++
1 file changed, 148 insertions(+)
diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py
index 73da87b43..30e6a0151 100644
--- a/tests/storage_tests/devices_test/partition_test.py
+++ b/tests/storage_tests/devices_test/partition_test.py
@@ -5,6 +5,7 @@
from unittest.mock import patch
+import blivet
from blivet.devices import DiskFile
from blivet.devices import PartitionDevice
from blivet.devicelibs.gpt import gpt_part_uuid_for_mountpoint
@@ -13,6 +14,8 @@
from blivet.size import Size
from blivet.util import sparsetmpfile
+from ..storagetestcase import StorageTestCase
+
class PartitionDeviceTestCase(unittest.TestCase):
@@ -266,3 +269,148 @@ def test_dev_part_type_gpt_autodiscover(self):
flags.gpt_discoverable_partitions = True
self.assertEqual(device.part_type_uuid,
gpt_part_uuid_for_mountpoint("/home"))
+
+
+class PartitionTestCase(StorageTestCase):
+
+    def setUp(self):
+        """ Create a Blivet instance restricted to the virtual test disks """
+        super().setUp()
+
+        # self.vdevs holds device paths, the disk filter is given their basenames
+        disks = [os.path.basename(vdev) for vdev in self.vdevs]
+        self.storage = blivet.Blivet()
+        self.storage.exclusive_disks = disks
+        self.storage.reset()
+
+        # make sure only the targetcli disks are in the devicetree
+        for disk in self.storage.disks:
+            # assertIn prints the unexpected path on failure, assertTrue would not
+            self.assertIn(disk.path, self.vdevs)
+            # every test expects to start from empty, unformatted disks
+            self.assertIsNone(disk.format.type)
+            self.assertFalse(disk.children)
+
+    def _clean_up(self):
+        """ Remove everything found on the virtual test disks """
+        # NOTE(review): if StorageTestCase defines its own _clean_up, it is not
+        # chained from here -- confirm that is intended
+        self.storage.reset()
+        for dev in self.storage.disks:
+            if dev.path in self.vdevs:
+                self.storage.recursive_remove(dev)
+                continue
+            # refuse to touch anything that is not one of the test disks
+            raise RuntimeError("Disk %s found in devicetree but not in disks created for tests" % dev.name)
+
+        # apply the removals
+        self.storage.do_it()
+
+    def test_msdos_basic(self):
+        """ Create four primary partitions on an msdos disklabel and check them after reset """
+        disk_path = self.vdevs[0]
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        self.assertIsNotNone(disk)
+
+        label = blivet.formats.get_format("disklabel", label_type="msdos")
+        self.storage.format_device(disk, label)
+
+        # request four 100 MiB primary partitions
+        for _ in range(4):
+            request = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],
+                                                 primary=True)
+            self.storage.create_device(request)
+
+        blivet.partitioning.do_partitioning(self.storage)
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # look the disk up again after reset() and check what is now on it
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        self.assertIsNotNone(disk)
+        self.assertEqual(disk.format.type, "disklabel")
+        self.assertEqual(disk.format.label_type, "msdos")
+        self.assertIsNotNone(disk.format.parted_disk)
+        self.assertIsNotNone(disk.format.parted_device)
+        self.assertEqual(len(disk.format.partitions), 4)
+        self.assertEqual(len(disk.format.primary_partitions), 4)
+        self.assertEqual(len(disk.children), 4)
+
+        # the partitions are expected at <disk path>1 .. <disk path>4
+        for number in range(1, 5):
+            part = self.storage.devicetree.get_device_by_path(disk_path + str(number))
+            self.assertIsNotNone(part)
+            self.assertEqual(part.type, "partition")
+            self.assertEqual(part.disk, disk)
+            self.assertEqual(part.size, Size("100 MiB"))
+            self.assertTrue(part.is_primary)
+            self.assertFalse(part.is_extended)
+            self.assertFalse(part.is_logical)
+            self.assertIsNotNone(part.parted_partition)
+
+    def test_msdos_extended(self):
+        """ Create primary, extended and four logical partitions on an msdos disklabel """
+        disk_path = self.vdevs[0]
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        self.assertIsNotNone(disk)
+
+        label = blivet.formats.get_format("disklabel", label_type="msdos")
+        self.storage.format_device(disk, label)
+
+        primary = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+        self.storage.create_device(primary)
+
+        extended = self.storage.new_partition(size=Size("1 GiB"), parents=[disk],
+                                              part_type=parted.PARTITION_EXTENDED)
+        self.storage.create_device(extended)
+
+        # first partitioning pass: the primary and the extended partition
+        blivet.partitioning.do_partitioning(self.storage)
+
+        for _ in range(4):
+            logical = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],
+                                                 part_type=parted.PARTITION_LOGICAL)
+            self.storage.create_device(logical)
+
+        # second partitioning pass: the four logical partitions
+        blivet.partitioning.do_partitioning(self.storage)
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # look the disk up again after reset() and check what is now on it
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        self.assertIsNotNone(disk)
+        self.assertEqual(disk.format.type, "disklabel")
+        self.assertEqual(disk.format.label_type, "msdos")
+        self.assertIsNotNone(disk.format.parted_disk)
+        self.assertIsNotNone(disk.format.parted_device)
+        # 1 primary + 1 extended + 4 logical partitions were requested
+        self.assertEqual(len(disk.format.partitions), 6)
+        self.assertEqual(len(disk.format.primary_partitions), 1)
+        self.assertEqual(len(disk.children), 6)
+
+        # the logical partitions are expected at <disk path>5 .. <disk path>8
+        for number in range(5, 9):
+            part = self.storage.devicetree.get_device_by_path(disk_path + str(number))
+            self.assertIsNotNone(part)
+            self.assertEqual(part.type, "partition")
+            self.assertEqual(part.disk, disk)
+            self.assertEqual(part.size, Size("100 MiB"))
+            self.assertFalse(part.is_primary)
+            self.assertFalse(part.is_extended)
+            self.assertTrue(part.is_logical)
+            self.assertIsNotNone(part.parted_partition)
+
+    def test_gpt_basic(self):
+        """ Create four partitions on a gpt disklabel and check them after reset """
+        disk_path = self.vdevs[0]
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        self.assertIsNotNone(disk)
+
+        label = blivet.formats.get_format("disklabel", label_type="gpt")
+        self.storage.format_device(disk, label)
+
+        # request four 100 MiB partitions
+        for _ in range(4):
+            request = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+            self.storage.create_device(request)
+
+        blivet.partitioning.do_partitioning(self.storage)
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # look the disk up again after reset() and check what is now on it
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        self.assertIsNotNone(disk)
+        self.assertEqual(disk.format.type, "disklabel")
+        self.assertEqual(disk.format.label_type, "gpt")
+        self.assertIsNotNone(disk.format.parted_disk)
+        self.assertIsNotNone(disk.format.parted_device)
+        self.assertEqual(len(disk.format.partitions), 4)
+        self.assertEqual(len(disk.format.primary_partitions), 4)
+        self.assertEqual(len(disk.children), 4)
+
+        # the partitions are expected at <disk path>1 .. <disk path>4
+        for number in range(1, 5):
+            part = self.storage.devicetree.get_device_by_path(disk_path + str(number))
+            self.assertIsNotNone(part)
+            self.assertEqual(part.type, "partition")
+            self.assertEqual(part.disk, disk)
+            self.assertEqual(part.size, Size("100 MiB"))
+            self.assertTrue(part.is_primary)
+            self.assertFalse(part.is_extended)
+            self.assertFalse(part.is_logical)
+            self.assertIsNotNone(part.parted_partition)
From ab6261adbdfedc26c6b0712a42a3dd9169cabb38 Mon Sep 17 00:00:00 2001
From: Vojtech Trefny <vtrefny@redhat.com>
Date: Tue, 27 May 2025 14:10:49 +0200
Subject: [PATCH 2/3] Wipe end partition before creating it as well as the
start
We are currently overwriting the start of the newly created partition
with zeroes to remove any filesystem metadata that might occupy
the space. This extends this functionality to the end of the partition
to remove 1.0 MD metadata that might be there.
Resolves: RHEL-93967
---
blivet/devices/partition.py | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/blivet/devices/partition.py b/blivet/devices/partition.py
index 89470d9fb..fc9a97be7 100644
--- a/blivet/devices/partition.py
+++ b/blivet/devices/partition.py
@@ -659,7 +659,7 @@ def _wipe(self):
""" Wipe the partition metadata.
Assumes that the partition metadata is located at the start
- of the partition and occupies no more than 1 MiB.
+ and end of the partition and occupies no more than 1 MiB.
Erases in block increments. Erases the smallest number of blocks
such that at least 1 MiB is erased or the whole partition is
@@ -692,6 +692,24 @@ def _wipe(self):
# things to settle.
udev.settle()
+ if count >= part_len:
+ # very small partition, we wiped it completely already
+ return
+
+ # now do the end of the partition as well (RAID 1.0 metadata)
+ end = self.parted_partition.geometry.end
+ cmd = ["dd", "if=/dev/zero", "of=%s" % device, "bs=%d" % bs,
+ "seek=%d" % (end - count), "count=%d" % count]
+ try:
+ util.run_program(cmd)
+ except OSError as e:
+ log.error(str(e))
+ finally:
+ # If a udev device is created with the watch option, then
+ # a change uevent is synthesized and we need to wait for
+ # things to settle.
+ udev.settle()
+
def _create(self):
""" Create the device. """
log_method_call(self, self.name, status=self.status)
From 936cccdf67e3ee612399bd3f0f8b383ca118ce9b Mon Sep 17 00:00:00 2001
From: Vojtech Trefny <vtrefny@redhat.com>
Date: Wed, 28 May 2025 11:01:14 +0200
Subject: [PATCH 3/3] tests: Add tests for wiping stale metadata from new
partitions
Related: RHEL-93967
---
.../devices_test/partition_test.py | 119 ++++++++++++++++++
1 file changed, 119 insertions(+)
diff --git a/tests/storage_tests/devices_test/partition_test.py b/tests/storage_tests/devices_test/partition_test.py
index 30e6a0151..87e4b1155 100644
--- a/tests/storage_tests/devices_test/partition_test.py
+++ b/tests/storage_tests/devices_test/partition_test.py
@@ -1,6 +1,7 @@
import os
import unittest
from uuid import UUID
+import blivet.deviceaction
import parted
from unittest.mock import patch
@@ -414,3 +415,121 @@ def test_gpt_basic(self):
self.assertFalse(part.is_extended)
self.assertFalse(part.is_logical)
self.assertIsNotNone(part.parted_partition)
+
+    def _partition_wipe_check(self):
+        """ Check that partition 1 has no signature and ext4 on partition 2 is intact """
+
+        def probed_type(path):
+            # ask blkid directly so the result does not depend on the devicetree
+            out = blivet.util.capture_output(["blkid", "-p", "-sTYPE", "-ovalue", path])
+            return out.strip()
+
+        first_path = self.vdevs[0] + "1"
+        second_path = self.vdevs[0] + "2"
+
+        # first partition: no format in the devicetree, no signature on the device
+        part1 = self.storage.devicetree.get_device_by_path(first_path)
+        self.assertIsNotNone(part1)
+        self.assertIsNone(part1.format.type)
+
+        self.assertEqual(probed_type(first_path), "")
+
+        # second partition: ext4 is still there and passes a filesystem check
+        part2 = self.storage.devicetree.get_device_by_path(second_path)
+        self.assertIsNotNone(part2)
+        self.assertEqual(part2.format.type, "ext4")
+
+        try:
+            part2.format.do_check()
+        except blivet.errors.FSError as e:
+            self.fail("Partition wipe corrupted filesystem on an adjacent partition: %s" % str(e))
+
+        self.assertEqual(probed_type(second_path), "ext4")
+
+    def test_partition_wipe_ext(self):
+        """ Check that stale filesystem metadata are wiped when a partition is created over them """
+        disk_path = self.vdevs[0]
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        self.assertIsNotNone(disk)
+
+        label = blivet.formats.get_format("disklabel", label_type="gpt")
+        self.storage.format_device(disk, label)
+
+        # two ext4 partitions: a 100 MiB one and a growable one after it
+        first = self.storage.new_partition(size=Size("100 MiB"), parents=[disk],
+                                           fmt=blivet.formats.get_format("ext4"))
+        self.storage.create_device(first)
+
+        second = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True,
+                                            fmt=blivet.formats.get_format("ext4"))
+        self.storage.create_device(second)
+
+        blivet.partitioning.do_partitioning(self.storage)
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # destroy only the first partition device, no format removal is scheduled,
+        # so the ext4 signature is meant to stay behind on the disk
+        first = self.storage.devicetree.get_device_by_path(disk_path + "1")
+        destroy = blivet.deviceaction.ActionDestroyDevice(first)
+        self.storage.devicetree.actions.add(destroy)
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # request the first partition again, this time without a format
+        disk = self.storage.devicetree.get_device_by_path(disk_path)
+        first = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+        self.storage.create_device(first)
+
+        blivet.partitioning.do_partitioning(self.storage)
+
+        # XXX PartitionDevice._post_create calls wipefs on the partition; the dd wipe
+        # done in _pre_create is what is being tested, so wipefs must not run
+        def _skip_post_create():
+            return None
+
+        first._post_create = _skip_post_create
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # the ext4 signature must be gone from part1 and untouched on part2
+        self._partition_wipe_check()
+
+    def test_partition_wipe_mdraid(self):
+        """ Check that any stray RAID metadata are removed before creating a partition """
+        disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+        self.assertIsNotNone(disk)
+
+        self.storage.format_device(disk, blivet.formats.get_format("disklabel", label_type="gpt"))
+
+        # create two partitions, one empty, one with ext4
+        part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+        self.storage.create_device(part1)
+
+        part2 = self.storage.new_partition(size=Size("1 MiB"), parents=[disk], grow=True,
+                                           fmt=blivet.formats.get_format("ext4"))
+        self.storage.create_device(part2)
+
+        blivet.partitioning.do_partitioning(self.storage)
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # create MD RAID with metadata 1.0 on the first partition; mdadm is run
+        # directly, so the devicetree does not know about the array
+        ret = blivet.util.run_program(["mdadm", "--create", "blivetMDTest", "--level=linear",
+                                       "--metadata=1.0", "--raid-devices=1", "--force", part1.path])
+        self.assertEqual(ret, 0, "Failed to create RAID array for partition wipe test")
+        # stop the array again, only its on-disk metadata are needed
+        ret = blivet.util.run_program(["mdadm", "--stop", "/dev/md/blivetMDTest"])
+        self.assertEqual(ret, 0, "Failed to stop RAID array for partition wipe test")
+
+        # now remove the partition without removing the array first
+        part1 = self.storage.devicetree.get_device_by_path(self.vdevs[0] + "1")
+        ac = blivet.deviceaction.ActionDestroyDevice(part1)
+        self.storage.devicetree.actions.add(ac)
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # create the first partition again (without format)
+        disk = self.storage.devicetree.get_device_by_path(self.vdevs[0])
+        part1 = self.storage.new_partition(size=Size("100 MiB"), parents=[disk])
+        self.storage.create_device(part1)
+
+        blivet.partitioning.do_partitioning(self.storage)
+
+        # XXX PartitionDevice._post_create calls wipefs on the partition, we want to check that
+        # the _pre_create dd wipe works so we need to skip the _post_create wipefs call
+        part1._post_create = lambda: None
+
+        self.storage.do_it()
+        self.storage.reset()
+
+        # make sure the mdmember signature is not present on part1 (and ext4 is untouched on part2)
+        self._partition_wipe_check()