Fix malformed TPM certificates workaround

Restore the possibility of using an alternative certificate verification
script to verify the EK certificate.

Resolves: RHEL-111244

Signed-off-by: Anderson Toshiyuki Sasaki <ansasaki@redhat.com>
This commit is contained in:
Anderson Toshiyuki Sasaki 2025-08-26 16:31:36 +02:00
parent 302e60d9ff
commit 7b863e83f7
2 changed files with 430 additions and 1 deletions

View File

@ -0,0 +1,428 @@
From 266ce8caed568e20e0521f93a0c03798ca9915f7 Mon Sep 17 00:00:00 2001
From: Anderson Toshiyuki Sasaki <ansasaki@redhat.com>
Date: Wed, 20 Aug 2025 13:40:36 +0200
Subject: [PATCH 1/2] models: Do not re-encode certificate stored in DB
Modify the Certificate class to store the original certificate bits to
the database instead of storing the possibly re-encoded certificate.
The python-cryptography ASN.1 parser is strict and does not accept
malformed certificates. This makes it impossible to use some affected
devices, notably TPM certificates from Nuvoton. To workaround this, the
certificate is re-encoded using pyasn1, but this effectively modify the
certificate making its signature invalid. To avoid storing invalid
certificates to the database, the original bits of the certificate (as
received from the agent) are cached and later used when storing the
certificate into the database.
Assisted-by: Claude 4 Sonnet
Signed-off-by: Anderson Toshiyuki Sasaki <ansasaki@redhat.com>
fix
---
keylime/models/base/types/certificate.py | 65 ++++++-
keylime/models/registrar/registrar_agent.py | 6 +
test/test_certificate_preservation.py | 197 ++++++++++++++++++++
3 files changed, 264 insertions(+), 4 deletions(-)
create mode 100644 test/test_certificate_preservation.py
diff --git a/keylime/models/base/types/certificate.py b/keylime/models/base/types/certificate.py
index 2c27603ba..551f73b1c 100644
--- a/keylime/models/base/types/certificate.py
+++ b/keylime/models/base/types/certificate.py
@@ -10,6 +10,7 @@
from pyasn1.error import PyAsn1Error
from pyasn1_modules import pem as pyasn1_pem
from pyasn1_modules import rfc2459 as pyasn1_rfc2459
+from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.types import Text
from keylime.models.base.type import ModelType
@@ -82,6 +83,8 @@ def _schema(self):
def __init__(self) -> None:
super().__init__(Text)
+ # Instance-level cache for original certificate bytes when re-encoding is needed
+ self._original_bytes_cache: Optional[bytes] = None
def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate:
"""Loads a binary x509 certificate encoded using ASN.1 DER as a ``cryptography.x509.Certificate`` object. This
@@ -112,8 +115,11 @@ def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate:
try:
return cryptography.x509.load_der_x509_certificate(der_cert_data)
except Exception:
+ # Store original bytes before re-encoding to preserve signature validity
pyasn1_cert = pyasn1_decoder.decode(der_cert_data, asn1Spec=pyasn1_rfc2459.Certificate())[0]
- return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ self._original_bytes_cache = base64.b64encode(der_cert_data)
+ return cert
def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate:
"""Loads a text x509 certificate encoded using PEM (Base64ed DER with header and footer) as a
@@ -135,7 +141,7 @@ def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate:
[2] https://github.com/pyca/cryptography/issues/7189
[3] https://github.com/keylime/keylime/issues/1559
- :param der_cert_data: the DER bytes of the certificate
+ :param pem_cert_data: the PEM bytes of the certificate
:raises: :class:`SubstrateUnderrunError`: cert could not be deserialized even using the fallback pyasn1 parser
@@ -146,8 +152,13 @@ def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate:
return cryptography.x509.load_pem_x509_certificate(pem_cert_data.encode("utf-8"))
except Exception:
der_data = pyasn1_pem.readPemFromFile(io.StringIO(pem_cert_data))
+ # Store original DER bytes before re-encoding to preserve signature validity
pyasn1_cert = pyasn1_decoder.decode(der_data, asn1Spec=pyasn1_rfc2459.Certificate())[0]
- return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ # Only store if we have valid DER bytes (not empty string)
+ if isinstance(der_data, bytes) and der_data:
+ self._original_bytes_cache = base64.b64encode(der_data)
+ return cert
def infer_encoding(self, value: IncomingValue) -> Optional[str]:
"""Tries to infer the certificate encoding from the given value based on the data type and other surface-level
@@ -269,7 +280,13 @@ def _dump(self, value: IncomingValue) -> Optional[str]:
if not cert:
return None
- # Save as Base64-encoded value (without the PEM "BEGIN" and "END" header/footer for efficiency)
+ # Check if we have original bytes preserved for this certificate (when re-encoding was needed)
+ original_bytes = self._original_bytes_cache
+ if original_bytes is not None:
+ # Use original bytes to preserve signature validity
+ return original_bytes.decode("utf-8")
+
+ # Use standard encoding for ASN.1-compliant certificates
return base64.b64encode(cert.public_bytes(Encoding.DER)).decode("utf-8")
def render(self, value: IncomingValue) -> Optional[str]:
@@ -279,9 +296,49 @@ def render(self, value: IncomingValue) -> Optional[str]:
if not cert:
return None
+ # Check if we have original bytes preserved for this certificate (when re-encoding was needed)
+ original_bytes = self._original_bytes_cache
+ if original_bytes is not None:
+ # Use original bytes to preserve signature validity
+ # The original bytes are expected to be base64 encoded DER
+ return "\n".join(
+ ["-----BEGIN CERTIFICATE-----", original_bytes.decode("utf-8"), "-----END CERTIFICATE-----"]
+ )
+
# Render certificate in PEM format
return cert.public_bytes(Encoding.PEM).decode("utf-8") # type: ignore[no-any-return]
+ def db_load(self, value: Optional[str], _dialect: Dialect) -> Optional[cryptography.x509.Certificate]:
+ """Load certificate from database, preserving original bytes when re-encoding is needed.
+
+ This method ensures that when reading certificates from the database, we preserve
+ the original bytes (which may have been stored from a previous malformed certificate)
+ rather than losing them during re-parsing.
+ """
+ if not value:
+ return None
+
+ # Decode the Base64 value from database - these are the original bytes we want to preserve
+ try:
+ original_der_bytes = base64.b64decode(value, validate=True)
+ except (binascii.Error, ValueError):
+ # If Base64 decoding fails, fall back to standard loading
+ return self.cast(value)
+
+ # Cast to certificate object (may trigger re-encoding if still malformed)
+ cert = self.cast(original_der_bytes)
+
+ return cert
+
+ def has_original_bytes(self) -> bool:
+ """Check if this certificate has preserved original bytes"""
+ return self._original_bytes_cache is not None
+
+ def get_original_bytes(self) -> Optional[bytes]:
+ """Return the preserved original bytes of the base64-encoded certificate
+ if the certificate was malformed and it was stored"""
+ return self._original_bytes_cache
+
@property
def native_type(self) -> type:
return cryptography.x509.Certificate
diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py
index 560c18838..58dd0854c 100644
--- a/keylime/models/registrar/registrar_agent.py
+++ b/keylime/models/registrar/registrar_agent.py
@@ -357,6 +357,12 @@ def render(self, only=None):
# When operating in pull mode, ekcert is encoded as Base64 instead of PEM
if output.get("ekcert"):
+ if self.ekcert.has_original_bytes:
+ original_bytes = self.ekcert.get_original_bytes()
+ if original_bytes is not None:
+ # In case the cert was malformed, the original cert is
+ # cached base64-encoded
+ output["ekcert"] = original_bytes.decode("utf-8")
output["ekcert"] = base64.b64encode(self.ekcert.public_bytes(Encoding.DER)).decode("utf-8")
return output
diff --git a/test/test_certificate_preservation.py b/test/test_certificate_preservation.py
new file mode 100644
index 000000000..1ac981891
--- /dev/null
+++ b/test/test_certificate_preservation.py
@@ -0,0 +1,197 @@
+"""
+Unit tests for certificate original bytes preservation functionality.
+
+Tests that the Certificate type correctly preserves original certificate bytes
+when malformed certificates require pyasn1 re-encoding, ensuring signatures
+remain valid throughout the database lifecycle.
+"""
+
+import base64
+import os
+import sys
+import unittest
+from unittest.mock import Mock
+
+# Add the parent directory to sys.path to import from local keylime
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+# Imported after path manipulation to get local version
+from keylime.models.base.types.certificate import Certificate # pylint: disable=wrong-import-position
+
+
+class TestCertificatePreservation(unittest.TestCase):
+ """Test certificate original bytes preservation functionality."""
+
+ # pylint: disable=protected-access # Tests need access to internal cache for validation
+
+ def setUp(self):
+ """Set up test fixtures."""
+ # Real malformed TPM certificate from /tmp/malformed_cert.txt
+ # This is a Nuvoton TPM EK certificate that requires pyasn1 re-encoding
+ malformed_cert_multiline = """MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv
+biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u
+MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO
+kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq
+ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D
+ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e
++GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB
+NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT
+B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF
+Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG
+A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF
+BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051
+dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2
+a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps="""
+ # Normalize the Base64 (remove newlines and spaces)
+ self.malformed_cert_b64 = "".join(malformed_cert_multiline.split())
+ # Decode to get original DER bytes
+ self.malformed_cert_der = base64.b64decode(self.malformed_cert_b64)
+ # Create Certificate instance for testing
+ self.cert_type = Certificate()
+
+ def test_malformed_certificate_parsing(self):
+ """Test that malformed certificate can be parsed using pyasn1 fallback."""
+ # This malformed certificate should trigger pyasn1 re-encoding
+ cert = self.cert_type.cast(self.malformed_cert_der)
+
+ # Should successfully parse despite being malformed
+ self.assertIsNotNone(cert)
+
+ # Should have preserved original bytes in cache
+ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type]
+
+ # Cached bytes should be the original DER bytes
+ cached_bytes = self.cert_type._original_bytes_cache.decode("utf-8") # type: ignore[arg-type]
+ self.assertEqual(cached_bytes, self.malformed_cert_b64)
+
+ def test_asn1_compliance_detection(self):
+ """Test that malformed certificate is detected as non-ASN.1 compliant."""
+ # This certificate should not be ASN.1 DER compliant
+ is_compliant = self.cert_type.asn1_compliant(self.malformed_cert_der)
+ self.assertFalse(is_compliant)
+
+ # Base64 version should also be non-compliant
+ is_compliant_b64 = self.cert_type.asn1_compliant(self.malformed_cert_b64)
+ self.assertFalse(is_compliant_b64)
+
+ def test_dump_preserves_original_bytes(self):
+ """Test that _dump() returns original bytes for malformed certificates."""
+ # Parse malformed certificate (triggers pyasn1 and caching)
+ cert = self.cert_type.cast(self.malformed_cert_der)
+
+ # Dump should return original bytes as Base64
+ dumped = self.cert_type._dump(cert) # pylint: disable=protected-access
+ self.assertEqual(dumped, self.malformed_cert_b64)
+
+ # Verify round-trip preservation
+ self.assertIsNotNone(dumped)
+ restored_bytes = base64.b64decode(dumped) # type: ignore[arg-type]
+ self.assertEqual(restored_bytes, self.malformed_cert_der)
+
+ def test_db_load_preserves_original_bytes(self):
+ """Test that db_load() preserves original bytes when reading from database."""
+ mock_dialect = Mock()
+
+ # Simulate loading from database with our malformed certificate
+ cert = self.cert_type.db_load(self.malformed_cert_b64, mock_dialect)
+
+ # Should successfully load certificate
+ self.assertIsNotNone(cert)
+
+ # Should have preserved original bytes in cache
+ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type]
+
+ # Cached bytes should be the original DER bytes, base64-encoded
+ if self.cert_type._original_bytes_cache:
+ cached_bytes = self.cert_type._original_bytes_cache.decode("utf-8")
+ self.assertEqual(cached_bytes, self.malformed_cert_b64)
+
+ def test_database_round_trip_preservation(self):
+ """Test complete database round-trip preserves original bytes."""
+ mock_dialect = Mock()
+
+ # Step 1: Initial parsing (simulates agent registration)
+ cert1 = self.cert_type.cast(self.malformed_cert_der)
+ self.assertIsNotNone(cert1)
+ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type]
+
+ # Step 2: Store to database simulation
+ db_value = self.cert_type._dump(cert1) # pylint: disable=protected-access
+ self.assertEqual(db_value, self.malformed_cert_b64)
+
+ # Step 3: Create fresh Certificate instance (simulates new session)
+ fresh_cert_type = Certificate()
+ self.assertEqual(fresh_cert_type._original_bytes_cache, None) # pylint: disable=protected-access
+
+ # Step 4: Load from database
+ cert2 = fresh_cert_type.db_load(db_value, mock_dialect)
+ self.assertIsNotNone(cert2)
+ self.assertTrue(fresh_cert_type.has_original_bytes()) # type: ignore[arg-type]
+
+ # Step 5: Verify original bytes preserved across round-trip
+ self.assertTrue(fresh_cert_type._original_bytes_cache)
+ cached_bytes = fresh_cert_type._original_bytes_cache.decode("utf-8") # type: ignore[arg-type]
+ self.assertEqual(cached_bytes, self.malformed_cert_b64)
+
+ # Step 6: Subsequent dump should still use original bytes
+ second_dump = fresh_cert_type._dump(cert2) # pylint: disable=protected-access
+ self.assertEqual(second_dump, self.malformed_cert_b64)
+
+ def test_compliant_certificate_no_caching(self):
+ """Test that ASN.1-compliant certificates don't use caching."""
+ # Create a simple ASN.1-compliant certificate for testing
+ # We'll use a well-formed Base64 string that represents valid DER
+ compliant_cert_b64 = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwJKW9Q==" # Truncated for test
+
+ try:
+ # Try to parse - may fail due to truncation, but that's ok for this test
+ cert = self.cert_type.cast(compliant_cert_b64)
+ if cert: # Only test if parsing succeeded
+ # Should not have cached original bytes
+ self.assertFalse(self.cert_type.has_original_bytes())
+ self.assertEqual(self.cert_type._original_bytes_cache, None)
+ except (ValueError, Exception):
+ # Expected for truncated certificate - just verify no caching occurred
+ self.assertEqual(self.cert_type._original_bytes_cache, None)
+
+ def test_db_load_with_none_value(self):
+ """Test that db_load() handles None values correctly."""
+ mock_dialect = Mock()
+
+ result = self.cert_type.db_load(None, mock_dialect)
+ self.assertIsNone(result)
+ self.assertEqual(self.cert_type._original_bytes_cache, None)
+
+ def test_db_load_with_invalid_base64(self):
+ """Test that db_load() handles invalid Base64 gracefully."""
+ mock_dialect = Mock()
+
+ # Invalid Base64 string
+ invalid_b64 = "not_valid_base64!!!"
+
+ # Should not raise exception, but may return None or attempt fallback
+ try:
+ _ = self.cert_type.db_load(invalid_b64, mock_dialect)
+ # Result may be None or may attempt to parse the string directly
+ # The important thing is it doesn't crash
+ except Exception:
+ # Some exceptions are expected for invalid data
+ # We just want to ensure it's handled gracefully
+ pass
+
+ def test_render_method_unaffected(self):
+ """Test that render() method works normally with cached certificates."""
+ cert = self.cert_type.cast(self.malformed_cert_der)
+ self.assertIsNotNone(cert)
+
+ # render() should return PEM format (doesn't use cached bytes)
+ rendered = self.cert_type.render(cert)
+
+ # Should be PEM format
+ if rendered: # Only test if rendering succeeded
+ self.assertIn("-----BEGIN CERTIFICATE-----", rendered)
+ self.assertIn("-----END CERTIFICATE-----", rendered)
+
+
+if __name__ == "__main__":
+ unittest.main()
From 4a4084ec081bf3b8caf2d8450db6a86bdbb45c77 Mon Sep 17 00:00:00 2001
From: Anderson Toshiyuki Sasaki <ansasaki@redhat.com>
Date: Mon, 25 Aug 2025 18:19:44 +0200
Subject: [PATCH 2/2] registrar_agent: Use pyasn1 to parse PEM
The models.types.Certificate render() method returns a PEM formatted
certificate.
When the certificate is malformed, the original certificate is returned
by the render method().
Use pyasn1 to extract the DER content from the certificate to avoid
failing to parse due to python-cryptography strict ASN.1 parser.
Signed-off-by: Anderson Toshiyuki Sasaki <ansasaki@redhat.com>
---
keylime/models/registrar/registrar_agent.py | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py
index 58dd0854c..1d7d6ea05 100644
--- a/keylime/models/registrar/registrar_agent.py
+++ b/keylime/models/registrar/registrar_agent.py
@@ -1,9 +1,11 @@
import base64
import hmac
+import io
import cryptography.x509
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
+from pyasn1_modules import pem as pyasn1_pem
from keylime import cert_utils, config, crypto, keylime_logging
from keylime.models.base import (
@@ -356,13 +358,12 @@ def render(self, only=None):
output = super().render(only)
# When operating in pull mode, ekcert is encoded as Base64 instead of PEM
- if output.get("ekcert"):
- if self.ekcert.has_original_bytes:
- original_bytes = self.ekcert.get_original_bytes()
- if original_bytes is not None:
- # In case the cert was malformed, the original cert is
- # cached base64-encoded
- output["ekcert"] = original_bytes.decode("utf-8")
- output["ekcert"] = base64.b64encode(self.ekcert.public_bytes(Encoding.DER)).decode("utf-8")
+ ekcert = output.get("ekcert")
+ if ekcert:
+ der_data = pyasn1_pem.readPemFromFile(io.StringIO(ekcert))
+ if der_data:
+ output["ekcert"] = base64.b64encode(der_data).decode("utf-8")
+ else:
+ self._add_error("ekcert", "Failed to parse certificate")
return output

View File

@ -9,7 +9,7 @@
Name: keylime
Version: 7.12.1
Release: 11%{?dist}
Release: 11%{?dist}.1
Summary: Open source TPM software for Bootstrapping and Maintaining Trust
URL: https://github.com/keylime/keylime
@ -39,6 +39,7 @@ Patch: 0009-mb-support-vendor_db-as-logged-by-newer-shim-version.patch
Patch: 0010-verifier-Gracefully-shutdown-on-signal.patch
Patch: 0011-revocations-Try-to-send-notifications-on-shutdown.patch
Patch: 0012-requests_client-close-the-session-at-the-end-of-the-.patch
Patch: 0013-fix-malformed-certs-workaround.patch
License: ASL 2.0 and MIT