diff --git a/0013-fix-malformed-certs-workaround.patch b/0013-fix-malformed-certs-workaround.patch new file mode 100644 index 0000000..6cd5ff5 --- /dev/null +++ b/0013-fix-malformed-certs-workaround.patch @@ -0,0 +1,428 @@ +From 266ce8caed568e20e0521f93a0c03798ca9915f7 Mon Sep 17 00:00:00 2001 +From: Anderson Toshiyuki Sasaki +Date: Wed, 20 Aug 2025 13:40:36 +0200 +Subject: [PATCH 1/2] models: Do not re-encode certificate stored in DB + +Modify the Certificate class to store the original certificate bits to +the database instead of storing the possibly re-encoded certificate. + +The python-cryptography ASN.1 parser is strict and does not accept +malformed certificates. This makes it impossible to use some affected +devices, notably TPM certificates from Nuvoton. To workaround this, the +certificate is re-encoded using pyasn1, but this effectively modify the +certificate making its signature invalid. To avoid storing invalid +certificates to the database, the original bits of the certificate (as +received from the agent) are cached and later used when storing the +certificate into the database. + +Assisted-by: Claude 4 Sonnet +Signed-off-by: Anderson Toshiyuki Sasaki + +fix +--- + keylime/models/base/types/certificate.py | 65 ++++++- + keylime/models/registrar/registrar_agent.py | 6 + + test/test_certificate_preservation.py | 197 ++++++++++++++++++++ + 3 files changed, 264 insertions(+), 4 deletions(-) + create mode 100644 test/test_certificate_preservation.py + +diff --git a/keylime/models/base/types/certificate.py b/keylime/models/base/types/certificate.py +index 2c27603ba..551f73b1c 100644 +--- a/keylime/models/base/types/certificate.py ++++ b/keylime/models/base/types/certificate.py +@@ -10,6 +10,7 @@ + from pyasn1.error import PyAsn1Error + from pyasn1_modules import pem as pyasn1_pem + from pyasn1_modules import rfc2459 as pyasn1_rfc2459 ++from sqlalchemy.engine.interfaces import Dialect + from sqlalchemy.types import Text + + from keylime.models.base.type import ModelType +@@ -82,6 +83,8 @@ def _schema(self): + + def __init__(self) -> None: + super().__init__(Text) ++ # Instance-level cache for original certificate bytes when re-encoding is needed ++ self._original_bytes_cache: Optional[bytes] = None + + def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate: + """Loads a binary x509 certificate encoded using ASN.1 DER as a ``cryptography.x509.Certificate`` object. This +@@ -112,8 +115,11 @@ def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate: + try: + return cryptography.x509.load_der_x509_certificate(der_cert_data) + except Exception: ++ # Store original bytes before re-encoding to preserve signature validity + pyasn1_cert = pyasn1_decoder.decode(der_cert_data, asn1Spec=pyasn1_rfc2459.Certificate())[0] +- return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) ++ cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) ++ self._original_bytes_cache = base64.b64encode(der_cert_data) ++ return cert + + def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate: + """Loads a text x509 certificate encoded using PEM (Base64ed DER with header and footer) as a +@@ -135,7 +141,7 @@ def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate: + [2] https://github.com/pyca/cryptography/issues/7189 + [3] https://github.com/keylime/keylime/issues/1559 + +- :param der_cert_data: the DER bytes of the certificate ++ :param pem_cert_data: the PEM bytes of the certificate + + :raises: :class:`SubstrateUnderrunError`: cert could not be deserialized even using the fallback pyasn1 parser + +@@ -146,8 +152,13 @@ def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate: + return cryptography.x509.load_pem_x509_certificate(pem_cert_data.encode("utf-8")) + except Exception: + der_data = pyasn1_pem.readPemFromFile(io.StringIO(pem_cert_data)) ++ # Store original DER bytes before re-encoding to preserve signature validity + pyasn1_cert = pyasn1_decoder.decode(der_data, asn1Spec=pyasn1_rfc2459.Certificate())[0] +- return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) ++ cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) ++ # Only store if we have valid DER bytes (not empty string) ++ if isinstance(der_data, bytes) and der_data: ++ self._original_bytes_cache = base64.b64encode(der_data) ++ return cert + + def infer_encoding(self, value: IncomingValue) -> Optional[str]: + """Tries to infer the certificate encoding from the given value based on the data type and other surface-level +@@ -269,7 +280,13 @@ def _dump(self, value: IncomingValue) -> Optional[str]: + if not cert: + return None + +- # Save as Base64-encoded value (without the PEM "BEGIN" and "END" header/footer for efficiency) ++ # Check if we have original bytes preserved for this certificate (when re-encoding was needed) ++ original_bytes = self._original_bytes_cache ++ if original_bytes is not None: ++ # Use original bytes to preserve signature validity ++ return original_bytes.decode("utf-8") ++ ++ # Use standard encoding for ASN.1-compliant certificates + return base64.b64encode(cert.public_bytes(Encoding.DER)).decode("utf-8") + + def render(self, value: IncomingValue) -> Optional[str]: +@@ -279,9 +296,49 @@ def render(self, value: IncomingValue) -> Optional[str]: + if not cert: + return None + ++ # Check if we have original bytes preserved for this certificate (when re-encoding was needed) ++ original_bytes = self._original_bytes_cache ++ if original_bytes is not None: ++ # Use original bytes to preserve signature validity ++ # The original bytes are expected to be base64 encoded DER ++ return "\n".join( ++ ["-----BEGIN CERTIFICATE-----", original_bytes.decode("utf-8"), "-----END CERTIFICATE-----"] ++ ) ++ + # Render certificate in PEM format + return cert.public_bytes(Encoding.PEM).decode("utf-8") # type: ignore[no-any-return] + ++ def db_load(self, value: Optional[str], _dialect: Dialect) -> Optional[cryptography.x509.Certificate]: ++ """Load certificate from database, preserving original bytes when re-encoding is needed. ++ ++ This method ensures that when reading certificates from the database, we preserve ++ the original bytes (which may have been stored from a previous malformed certificate) ++ rather than losing them during re-parsing. ++ """ ++ if not value: ++ return None ++ ++ # Decode the Base64 value from database - these are the original bytes we want to preserve ++ try: ++ original_der_bytes = base64.b64decode(value, validate=True) ++ except (binascii.Error, ValueError): ++ # If Base64 decoding fails, fall back to standard loading ++ return self.cast(value) ++ ++ # Cast to certificate object (may trigger re-encoding if still malformed) ++ cert = self.cast(original_der_bytes) ++ ++ return cert ++ ++ def has_original_bytes(self) -> bool: ++ """Check if this certificate has preserved original bytes""" ++ return self._original_bytes_cache is not None ++ ++ def get_original_bytes(self) -> Optional[bytes]: ++ """Return the preserved original bytes of the base64-encoded certificate ++ if the certificate was malformed and it was stored""" ++ return self._original_bytes_cache ++ + @property + def native_type(self) -> type: + return cryptography.x509.Certificate +diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py +index 560c18838..58dd0854c 100644 +--- a/keylime/models/registrar/registrar_agent.py ++++ b/keylime/models/registrar/registrar_agent.py +@@ -357,6 +357,12 @@ def render(self, only=None): + + # When operating in pull mode, ekcert is encoded as Base64 instead of PEM + if output.get("ekcert"): ++ if self.ekcert.has_original_bytes: ++ original_bytes = self.ekcert.get_original_bytes() ++ if original_bytes is not None: ++ # In case the cert was malformed, the original cert is ++ # cached base64-encoded ++ output["ekcert"] = original_bytes.decode("utf-8") + output["ekcert"] = base64.b64encode(self.ekcert.public_bytes(Encoding.DER)).decode("utf-8") + + return output +diff --git a/test/test_certificate_preservation.py b/test/test_certificate_preservation.py +new file mode 100644 +index 000000000..1ac981891 +--- /dev/null ++++ b/test/test_certificate_preservation.py +@@ -0,0 +1,197 @@ ++""" ++Unit tests for certificate original bytes preservation functionality. ++ ++Tests that the Certificate type correctly preserves original certificate bytes ++when malformed certificates require pyasn1 re-encoding, ensuring signatures ++remain valid throughout the database lifecycle. ++""" ++ ++import base64 ++import os ++import sys ++import unittest ++from unittest.mock import Mock ++ ++# Add the parent directory to sys.path to import from local keylime ++sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) ++# Imported after path manipulation to get local version ++from keylime.models.base.types.certificate import Certificate # pylint: disable=wrong-import-position ++ ++ ++class TestCertificatePreservation(unittest.TestCase): ++ """Test certificate original bytes preservation functionality.""" ++ ++ # pylint: disable=protected-access # Tests need access to internal cache for validation ++ ++ def setUp(self): ++ """Set up test fixtures.""" ++ # Real malformed TPM certificate from /tmp/malformed_cert.txt ++ # This is a Nuvoton TPM EK certificate that requires pyasn1 re-encoding ++ malformed_cert_multiline = """MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv ++biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u ++MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG ++9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO ++kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq ++ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D ++ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e +++GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB ++NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT ++B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF ++Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG ++A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF ++BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051 ++dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2 ++a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps=""" ++ # Normalize the Base64 (remove newlines and spaces) ++ self.malformed_cert_b64 = "".join(malformed_cert_multiline.split()) ++ # Decode to get original DER bytes ++ self.malformed_cert_der = base64.b64decode(self.malformed_cert_b64) ++ # Create Certificate instance for testing ++ self.cert_type = Certificate() ++ ++ def test_malformed_certificate_parsing(self): ++ """Test that malformed certificate can be parsed using pyasn1 fallback.""" ++ # This malformed certificate should trigger pyasn1 re-encoding ++ cert = self.cert_type.cast(self.malformed_cert_der) ++ ++ # Should successfully parse despite being malformed ++ self.assertIsNotNone(cert) ++ ++ # Should have preserved original bytes in cache ++ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type] ++ ++ # Cached bytes should be the original DER bytes ++ cached_bytes = self.cert_type._original_bytes_cache.decode("utf-8") # type: ignore[arg-type] ++ self.assertEqual(cached_bytes, self.malformed_cert_b64) ++ ++ def test_asn1_compliance_detection(self): ++ """Test that malformed certificate is detected as non-ASN.1 compliant.""" ++ # This certificate should not be ASN.1 DER compliant ++ is_compliant = self.cert_type.asn1_compliant(self.malformed_cert_der) ++ self.assertFalse(is_compliant) ++ ++ # Base64 version should also be non-compliant ++ is_compliant_b64 = self.cert_type.asn1_compliant(self.malformed_cert_b64) ++ self.assertFalse(is_compliant_b64) ++ ++ def test_dump_preserves_original_bytes(self): ++ """Test that _dump() returns original bytes for malformed certificates.""" ++ # Parse malformed certificate (triggers pyasn1 and caching) ++ cert = self.cert_type.cast(self.malformed_cert_der) ++ ++ # Dump should return original bytes as Base64 ++ dumped = self.cert_type._dump(cert) # pylint: disable=protected-access ++ self.assertEqual(dumped, self.malformed_cert_b64) ++ ++ # Verify round-trip preservation ++ self.assertIsNotNone(dumped) ++ restored_bytes = base64.b64decode(dumped) # type: ignore[arg-type] ++ self.assertEqual(restored_bytes, self.malformed_cert_der) ++ ++ def test_db_load_preserves_original_bytes(self): ++ """Test that db_load() preserves original bytes when reading from database.""" ++ mock_dialect = Mock() ++ ++ # Simulate loading from database with our malformed certificate ++ cert = self.cert_type.db_load(self.malformed_cert_b64, mock_dialect) ++ ++ # Should successfully load certificate ++ self.assertIsNotNone(cert) ++ ++ # Should have preserved original bytes in cache ++ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type] ++ ++ # Cached bytes should be the original DER bytes, base64-encoded ++ if self.cert_type._original_bytes_cache: ++ cached_bytes = self.cert_type._original_bytes_cache.decode("utf-8") ++ self.assertEqual(cached_bytes, self.malformed_cert_b64) ++ ++ def test_database_round_trip_preservation(self): ++ """Test complete database round-trip preserves original bytes.""" ++ mock_dialect = Mock() ++ ++ # Step 1: Initial parsing (simulates agent registration) ++ cert1 = self.cert_type.cast(self.malformed_cert_der) ++ self.assertIsNotNone(cert1) ++ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type] ++ ++ # Step 2: Store to database simulation ++ db_value = self.cert_type._dump(cert1) # pylint: disable=protected-access ++ self.assertEqual(db_value, self.malformed_cert_b64) ++ ++ # Step 3: Create fresh Certificate instance (simulates new session) ++ fresh_cert_type = Certificate() ++ self.assertEqual(fresh_cert_type._original_bytes_cache, None) # pylint: disable=protected-access ++ ++ # Step 4: Load from database ++ cert2 = fresh_cert_type.db_load(db_value, mock_dialect) ++ self.assertIsNotNone(cert2) ++ self.assertTrue(fresh_cert_type.has_original_bytes()) # type: ignore[arg-type] ++ ++ # Step 5: Verify original bytes preserved across round-trip ++ self.assertTrue(fresh_cert_type._original_bytes_cache) ++ cached_bytes = fresh_cert_type._original_bytes_cache.decode("utf-8") # type: ignore[arg-type] ++ self.assertEqual(cached_bytes, self.malformed_cert_b64) ++ ++ # Step 6: Subsequent dump should still use original bytes ++ second_dump = fresh_cert_type._dump(cert2) # pylint: disable=protected-access ++ self.assertEqual(second_dump, self.malformed_cert_b64) ++ ++ def test_compliant_certificate_no_caching(self): ++ """Test that ASN.1-compliant certificates don't use caching.""" ++ # Create a simple ASN.1-compliant certificate for testing ++ # We'll use a well-formed Base64 string that represents valid DER ++ compliant_cert_b64 = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwJKW9Q==" # Truncated for test ++ ++ try: ++ # Try to parse - may fail due to truncation, but that's ok for this test ++ cert = self.cert_type.cast(compliant_cert_b64) ++ if cert: # Only test if parsing succeeded ++ # Should not have cached original bytes ++ self.assertFalse(self.cert_type.has_original_bytes()) ++ self.assertEqual(self.cert_type._original_bytes_cache, None) ++ except (ValueError, Exception): ++ # Expected for truncated certificate - just verify no caching occurred ++ self.assertEqual(self.cert_type._original_bytes_cache, None) ++ ++ def test_db_load_with_none_value(self): ++ """Test that db_load() handles None values correctly.""" ++ mock_dialect = Mock() ++ ++ result = self.cert_type.db_load(None, mock_dialect) ++ self.assertIsNone(result) ++ self.assertEqual(self.cert_type._original_bytes_cache, None) ++ ++ def test_db_load_with_invalid_base64(self): ++ """Test that db_load() handles invalid Base64 gracefully.""" ++ mock_dialect = Mock() ++ ++ # Invalid Base64 string ++ invalid_b64 = "not_valid_base64!!!" ++ ++ # Should not raise exception, but may return None or attempt fallback ++ try: ++ _ = self.cert_type.db_load(invalid_b64, mock_dialect) ++ # Result may be None or may attempt to parse the string directly ++ # The important thing is it doesn't crash ++ except Exception: ++ # Some exceptions are expected for invalid data ++ # We just want to ensure it's handled gracefully ++ pass ++ ++ def test_render_method_unaffected(self): ++ """Test that render() method works normally with cached certificates.""" ++ cert = self.cert_type.cast(self.malformed_cert_der) ++ self.assertIsNotNone(cert) ++ ++ # render() should return PEM format (doesn't use cached bytes) ++ rendered = self.cert_type.render(cert) ++ ++ # Should be PEM format ++ if rendered: # Only test if rendering succeeded ++ self.assertIn("-----BEGIN CERTIFICATE-----", rendered) ++ self.assertIn("-----END CERTIFICATE-----", rendered) ++ ++ ++if __name__ == "__main__": ++ unittest.main() + +From 4a4084ec081bf3b8caf2d8450db6a86bdbb45c77 Mon Sep 17 00:00:00 2001 +From: Anderson Toshiyuki Sasaki +Date: Mon, 25 Aug 2025 18:19:44 +0200 +Subject: [PATCH 2/2] registrar_agent: Use pyasn1 to parse PEM + +The models.types.Certificate render() method returns a PEM formatted +certificate. + +When the certificate is malformed, the original certificate is returned +by the render method(). + +Use pyasn1 to extract the DER content from the certificate to avoid +failing to parse due to python-cryptography strict ASN.1 parser. + +Signed-off-by: Anderson Toshiyuki Sasaki +--- + keylime/models/registrar/registrar_agent.py | 17 +++++++++-------- + 1 file changed, 9 insertions(+), 8 deletions(-) + +diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py +index 58dd0854c..1d7d6ea05 100644 +--- a/keylime/models/registrar/registrar_agent.py ++++ b/keylime/models/registrar/registrar_agent.py +@@ -1,9 +1,11 @@ + import base64 + import hmac ++import io + + import cryptography.x509 + from cryptography.hazmat.primitives.asymmetric import ec, rsa + from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat ++from pyasn1_modules import pem as pyasn1_pem + + from keylime import cert_utils, config, crypto, keylime_logging + from keylime.models.base import ( +@@ -356,13 +358,12 @@ def render(self, only=None): + output = super().render(only) + + # When operating in pull mode, ekcert is encoded as Base64 instead of PEM +- if output.get("ekcert"): +- if self.ekcert.has_original_bytes: +- original_bytes = self.ekcert.get_original_bytes() +- if original_bytes is not None: +- # In case the cert was malformed, the original cert is +- # cached base64-encoded +- output["ekcert"] = original_bytes.decode("utf-8") +- output["ekcert"] = base64.b64encode(self.ekcert.public_bytes(Encoding.DER)).decode("utf-8") ++ ekcert = output.get("ekcert") ++ if ekcert: ++ der_data = pyasn1_pem.readPemFromFile(io.StringIO(ekcert)) ++ if der_data: ++ output["ekcert"] = base64.b64encode(der_data).decode("utf-8") ++ else: ++ self._add_error("ekcert", "Failed to parse certificate") + + return output diff --git a/keylime.spec b/keylime.spec index a31de45..ceb6885 100644 --- a/keylime.spec +++ b/keylime.spec @@ -9,7 +9,7 @@ Name: keylime Version: 7.12.1 -Release: 11%{?dist} +Release: 11%{?dist}.1 Summary: Open source TPM software for Bootstrapping and Maintaining Trust URL: https://github.com/keylime/keylime @@ -39,6 +39,7 @@ Patch: 0009-mb-support-vendor_db-as-logged-by-newer-shim-version.patch Patch: 0010-verifier-Gracefully-shutdown-on-signal.patch Patch: 0011-revocations-Try-to-send-notifications-on-shutdown.patch Patch: 0012-requests_client-close-the-session-at-the-end-of-the-.patch +Patch: 0013-fix-malformed-certs-workaround.patch License: ASL 2.0 and MIT