keylime/0013-fix-malformed-certs-workaround.patch
Anderson Toshiyuki Sasaki 552b797963
Properly fix malformed TPM certificates workaround
The previous fix attempt on commit 7b863e8 was incorrect. This is a
replacement for that fix.

Restore the possibility of using an alternative certificate verification
script to verify the EK certificate.

Resolves: RHEL-111244

Signed-off-by: Anderson Toshiyuki Sasaki <ansasaki@redhat.com>
2025-09-15 16:23:37 +02:00

1266 lines
61 KiB
Diff

diff --git a/keylime/certificate_wrapper.py b/keylime/certificate_wrapper.py
new file mode 100644
index 0000000..899a19a
--- /dev/null
+++ b/keylime/certificate_wrapper.py
@@ -0,0 +1,99 @@
+"""
+X.509 Certificate wrapper that preserves original bytes for malformed certificates.
+
+This module provides a wrapper around cryptography.x509.Certificate that preserves
+the original certificate bytes when the certificate required pyasn1 re-encoding
+due to ASN.1 DER non-compliance. This ensures signature validity is maintained
+throughout the database lifecycle.
+"""
+
+import base64
+from typing import Any, Dict, Optional
+
+import cryptography.x509
+from cryptography.hazmat.primitives.serialization import Encoding
+
+
+class CertificateWrapper:
+ """
+ A wrapper around cryptography.x509.Certificate that preserves original bytes
+ when malformed certificates require pyasn1 re-encoding.
+
+ This class wraps a cryptography.x509.Certificate and adds the ability
+ to store the original certificate bytes when the certificate was malformed
+ and required re-encoding using pyasn1. This ensures that signature validation
+ works correctly even for certificates that don't strictly follow ASN.1 DER.
+ """
+
+ def __init__(self, cert: cryptography.x509.Certificate, original_bytes: Optional[bytes] = None):
+ """
+ Initialize the wrapper certificate.
+
+ :param cert: The cryptography.x509.Certificate object
+ :param original_bytes: The original DER bytes if certificate was re-encoded, None otherwise
+ """
+ self._cert = cert
+ self._original_bytes = original_bytes
+
+ def __getattr__(self, name: str) -> Any:
+ """Delegate attribute access to the wrapped certificate."""
+ return getattr(self._cert, name)
+
+ def __setstate__(self, state: Dict[str, Any]) -> None:
+ """Support for pickling."""
+ self.__dict__.update(state)
+
+ def __getstate__(self) -> Dict[str, Any]:
+ """Support for pickling."""
+ return self.__dict__
+
+ @property
+ def has_original_bytes(self) -> bool:
+ """Check if this certificate has preserved original bytes."""
+ return self._original_bytes is not None
+
+ @property
+ def original_bytes(self) -> Optional[bytes]:
+ """Return the preserved original bytes if available."""
+ return self._original_bytes
+
+ def public_bytes(self, encoding: Encoding) -> bytes:
+ """
+ Return certificate bytes, using original bytes when available.
+
+ For certificates with preserved original bytes, this method always uses
+ the original DER bytes to maintain signature validity. For PEM encoding,
+ it converts the original DER bytes to PEM format.
+ """
+ if self.has_original_bytes:
+ if encoding == Encoding.DER:
+ return self._original_bytes # type: ignore[return-value]
+ if encoding == Encoding.PEM:
+ # Convert original DER bytes to PEM format
+ der_b64 = base64.b64encode(self._original_bytes).decode("utf-8") # type: ignore[arg-type]
+ # Split into 64-character lines as required for PEM textual encoding (RFC 7468; historically RFC 1421)
+ lines = [der_b64[i : i + 64] for i in range(0, len(der_b64), 64)]
+ # Create PEM format with proper headers
+ pem_content = "\n".join(["-----BEGIN CERTIFICATE-----"] + lines + ["-----END CERTIFICATE-----"]) + "\n"
+ return pem_content.encode("utf-8")
+
+ # For certificates without original bytes, use standard method
+ return self._cert.public_bytes(encoding)
+
+ # String representations of the wrapper; other attribute access is delegated to the wrapped certificate by __getattr__
+ def __str__(self) -> str:
+ return f"CertificateWrapper(subject={self._cert.subject})"
+
+ def __repr__(self) -> str:
+ return f"CertificateWrapper(subject={self._cert.subject}, has_original_bytes={self.has_original_bytes})"
+
+
+def wrap_certificate(cert: cryptography.x509.Certificate, original_bytes: Optional[bytes] = None) -> CertificateWrapper:
+ """
+ Factory function to create a wrapped certificate.
+
+ :param cert: The cryptography.x509.Certificate object
+ :param original_bytes: The original DER bytes if certificate was re-encoded
+ :returns: Wrapped certificate that preserves original bytes
+ """
+ return CertificateWrapper(cert, original_bytes)
diff --git a/keylime/models/base/types/certificate.py b/keylime/models/base/types/certificate.py
index 0f03169..f6cdd48 100644
--- a/keylime/models/base/types/certificate.py
+++ b/keylime/models/base/types/certificate.py
@@ -12,6 +12,7 @@ from pyasn1_modules import pem as pyasn1_pem
from pyasn1_modules import rfc2459 as pyasn1_rfc2459
from sqlalchemy.types import Text
+from keylime.certificate_wrapper import CertificateWrapper, wrap_certificate
from keylime.models.base.type import ModelType
@@ -78,19 +79,20 @@ class Certificate(ModelType):
cert = Certificate().cast("-----BEGIN CERTIFICATE-----\nMIIE...")
"""
- IncomingValue = Union[cryptography.x509.Certificate, bytes, str, None]
+ IncomingValue = Union[cryptography.x509.Certificate, CertificateWrapper, bytes, str, None]
def __init__(self) -> None:
super().__init__(Text)
- def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate:
- """Loads a binary x509 certificate encoded using ASN.1 DER as a ``cryptography.x509.Certificate`` object. This
+ def _load_der_cert(self, der_cert_data: bytes) -> CertificateWrapper:
+ """Loads a binary x509 certificate encoded using ASN.1 DER as a ``CertificateWrapper`` object. This
method does not require strict adherence to ASN.1 DER thereby making it possible to accept certificates which do
not follow every detail of the spec (this is the case for a number of TPM certs) [1,2].
It achieves this by first using the strict parser provided by python-cryptography. If that fails, it decodes the
certificate and re-encodes it using the more-forgiving pyasn1 library. The re-encoded certificate is then
- re-parsed by python-cryptography.
+ re-parsed by python-cryptography. For malformed certificates requiring re-encoding, the original bytes are
+ preserved in the wrapper to maintain signature validity.
This method is equivalent to the ``cert_utils.x509_der_cert`` function but does not produce a warning when the
backup parser is used, allowing this condition to be optionally detected and handled by the model where
@@ -106,24 +108,28 @@ class Certificate(ModelType):
:raises: :class:`SubstrateUnderrunError`: cert could not be deserialized even using the fallback pyasn1 parser
- :returns: A ``cryptography.x509.Certificate`` object
+ :returns: A ``CertificateWrapper`` object
"""
try:
- return cryptography.x509.load_der_x509_certificate(der_cert_data)
+ cert = cryptography.x509.load_der_x509_certificate(der_cert_data)
+ return wrap_certificate(cert, None)
except Exception:
pyasn1_cert = pyasn1_decoder.decode(der_cert_data, asn1Spec=pyasn1_rfc2459.Certificate())[0]
- return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ # Preserve the original bytes when re-encoding is necessary
+ return wrap_certificate(cert, der_cert_data)
- def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate:
+ def _load_pem_cert(self, pem_cert_data: str) -> CertificateWrapper:
"""Loads a text x509 certificate encoded using PEM (Base64ed DER with header and footer) as a
- ``cryptography.x509.Certificate`` object. This method does not require strict adherence to ASN.1 DER thereby
+ ``CertificateWrapper`` object. This method does not require strict adherence to ASN.1 DER thereby
making it possible to accept certificates which do not follow every detail of the spec (this is the case for
a number of TPM certs) [1,2].
It achieves this by first using the strict parser provided by python-cryptography. If that fails, it decodes the
certificate and re-encodes it using the more-forgiving pyasn1 library. The re-encoded certificate is then
- re-parsed by python-cryptography.
+ re-parsed by python-cryptography. For malformed certificates requiring re-encoding, the original DER bytes are
+ preserved in the wrapper to maintain signature validity.
This method is equivalent to the ``cert_utils.x509_der_cert`` function but does not produce a warning when the
backup parser is used, allowing this condition to be optionally detected and handled by the model where
@@ -135,19 +141,24 @@ class Certificate(ModelType):
[2] https://github.com/pyca/cryptography/issues/7189
[3] https://github.com/keylime/keylime/issues/1559
- :param der_cert_data: the DER bytes of the certificate
+ :param pem_cert_data: the PEM text of the certificate
:raises: :class:`SubstrateUnderrunError`: cert could not be deserialized even using the fallback pyasn1 parser
- :returns: A ``cryptography.x509.Certificate`` object
+ :returns: A ``CertificateWrapper`` object
"""
try:
- return cryptography.x509.load_pem_x509_certificate(pem_cert_data.encode("utf-8"))
+ cert = cryptography.x509.load_pem_x509_certificate(pem_cert_data.encode("utf-8"))
+ return wrap_certificate(cert, None)
except Exception:
der_data = pyasn1_pem.readPemFromFile(io.StringIO(pem_cert_data))
pyasn1_cert = pyasn1_decoder.decode(der_data, asn1Spec=pyasn1_rfc2459.Certificate())[0]
- return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert))
+ # Only preserve original bytes if we have valid DER data
+ original_bytes = der_data if isinstance(der_data, bytes) and der_data else None
+ # Preserve the original bytes when re-encoding is necessary
+ return wrap_certificate(cert, original_bytes)
def infer_encoding(self, value: IncomingValue) -> Optional[str]:
"""Tries to infer the certificate encoding from the given value based on the data type and other surface-level
@@ -159,15 +170,21 @@ class Certificate(ModelType):
:returns: ``"der"`` when the value appears to be DER encoded
:returns: ``"pem"`` when the value appears to be PEM encoded
:returns: ``"base64"`` when the value appears to be Base64(DER) encoded (without PEM headers)
+ :returns: ``"wrapped"`` when the value is already a ``CertificateWrapper`` object
:returns: ``"decoded"`` when the value is already a ``cryptography.x509.Certificate`` object
+ :returns: ``"disabled"`` when the value is the string "disabled"
:returns: ``None`` when the encoding cannot be inferred
"""
# pylint: disable=no-else-return
- if isinstance(value, cryptography.x509.Certificate):
+ if isinstance(value, CertificateWrapper):
+ return "wrapped"
+ elif isinstance(value, cryptography.x509.Certificate):
return "decoded"
elif isinstance(value, bytes):
return "der"
+ elif isinstance(value, str) and value == "disabled":
+ return "disabled"
elif isinstance(value, str) and value.startswith("-----BEGIN CERTIFICATE-----"):
return "pem"
elif isinstance(value, str):
@@ -190,19 +207,25 @@ class Certificate(ModelType):
:param value: The value in DER, Base64(DER), or PEM format (or an already deserialized certificate object)
:returns: ``"True"`` if the value can be deserialized by python-cryptography and is ASN.1 DER compliant
+ :returns: ``"True"`` if the value is the string "disabled" (considered compliant as it's a valid field value)
:returns: ``"False"`` if the value cannot be deserialized by python-cryptography
:returns: ``None`` if the value is already a deserialized certificate of type ``cryptography.x509.Certificate``
"""
try:
encoding_inf = self.infer_encoding(value)
+ if encoding_inf == "wrapped":
+ # For CertificateWrapper objects, check if they have original bytes (indicating re-encoding was needed)
+ return not value.has_original_bytes # type: ignore[union-attr]
if encoding_inf == "decoded":
return None
+ if encoding_inf == "disabled":
+ return True
if encoding_inf == "der":
cryptography.x509.load_der_x509_certificate(value) # type: ignore[reportArgumentType, arg-type]
elif encoding_inf == "pem":
- cryptography.x509.load_pem_x509_certificate(value) # type: ignore[reportArgumentType, arg-type]
+ cryptography.x509.load_pem_x509_certificate(value.encode("utf-8")) # type: ignore[reportArgumentType, arg-type, union-attr]
elif encoding_inf == "base64":
der_value = base64.b64decode(value, validate=True) # type: ignore[reportArgumentType, arg-type]
cryptography.x509.load_der_x509_certificate(der_value)
@@ -213,25 +236,27 @@ class Certificate(ModelType):
return True
- def cast(self, value: IncomingValue) -> Optional[cryptography.x509.Certificate]:
+ def cast(self, value: IncomingValue) -> Optional[CertificateWrapper]:
"""Tries to interpret the given value as an X.509 certificate and convert it to a
- ``cryptography.x509.Certificate`` object. Values which do not require conversion are returned unchanged.
+ ``CertificateWrapper`` object. Values which do not require conversion are returned unchanged.
:param value: The value to convert (may be in DER, Base64(DER), or PEM format)
:raises: :class:`TypeError`: ``value`` is of an unexpected data type
:raises: :class:`ValueError`: ``value`` does not contain data which is interpretable as a certificate
- :returns: A ``cryptography.x509.Certificate`` object or None if an empty value is given
+ :returns: A ``CertificateWrapper`` object or None if an empty value is given
"""
if not value:
return None
encoding_inf = self.infer_encoding(value)
+ if encoding_inf == "wrapped":
+ return value # type: ignore[return-value]
if encoding_inf == "decoded":
- return value # type: ignore[reportReturnType, return-value]
-
+ # Wrap raw cryptography certificate without original bytes
+ return wrap_certificate(value, None) # type: ignore[arg-type]
if encoding_inf == "der":
try:
return self._load_der_cert(value) # type: ignore[reportArgumentType, arg-type]
@@ -271,7 +296,6 @@ class Certificate(ModelType):
if not cert:
return None
- # Save as Base64-encoded value (without the PEM "BEGIN" and "END" header/footer for efficiency)
return base64.b64encode(cert.public_bytes(Encoding.DER)).decode("utf-8")
def render(self, value: IncomingValue) -> Optional[str]:
@@ -281,9 +305,8 @@ class Certificate(ModelType):
if not cert:
return None
- # Render certificate in PEM format
return cert.public_bytes(Encoding.PEM).decode("utf-8") # type: ignore[no-any-return]
@property
def native_type(self) -> type:
- return cryptography.x509.Certificate
+ return CertificateWrapper
diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py
index b232049..680316b 100644
--- a/keylime/models/registrar/registrar_agent.py
+++ b/keylime/models/registrar/registrar_agent.py
@@ -1,7 +1,6 @@
import base64
import hmac
-import cryptography.x509
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
@@ -116,35 +115,35 @@ class RegistrarAgent(PersistableModel):
if not cert_utils.verify_cert(cert, trust_store, cert_type):
self._add_error(cert_field, "must contain a certificate issued by a CA present in the trust store")
- def _check_cert_compliance(self, cert_field, raw_cert):
+ def _check_cert_compliance(self, cert_field):
new_cert = self.changes.get(cert_field)
old_cert = self.values.get(cert_field)
# If the certificate field has not been changed, no need to perform check
- if not raw_cert or not new_cert:
+ if not new_cert:
+ return True
+
+ # If the certificate field is set to "disabled" (for mtls_cert), no need to perform check
+ if new_cert == "disabled":
return True
# If the new certificate value is the same as the old certificate value, no need to perform check
- if (
- isinstance(new_cert, cryptography.x509.Certificate)
- and isinstance(old_cert, cryptography.x509.Certificate)
- and new_cert.public_bytes(Encoding.DER) == old_cert.public_bytes(Encoding.DER)
- ):
+ if old_cert and new_cert.public_bytes(Encoding.DER) == old_cert.public_bytes(Encoding.DER):
return True
- compliant = Certificate().asn1_compliant(raw_cert)
+ compliant = Certificate().asn1_compliant(new_cert)
if not compliant:
if config.get("registrar", "malformed_cert_action") == "reject":
- self._add_error(cert_field, Certificate().generate_error_msg(raw_cert))
+ self._add_error(cert_field, Certificate().generate_error_msg(new_cert))
return compliant
- def _check_all_cert_compliance(self, data):
+ def _check_all_cert_compliance(self):
non_compliant_certs = []
for field_name in ("ekcert", "iak_cert", "idevid_cert", "mtls_cert"):
- if not self._check_cert_compliance(field_name, data.get(field_name)):
+ if not self._check_cert_compliance(field_name):
non_compliant_certs.append(f"'{field_name}'")
if not non_compliant_certs:
@@ -291,7 +290,7 @@ class RegistrarAgent(PersistableModel):
# Ensure either an EK or IAK/IDevID is present, depending on configuration
self._check_root_identity_presence()
# Handle certificates which are not fully compliant with ASN.1 DER
- self._check_all_cert_compliance(data)
+ self._check_all_cert_compliance()
# Basic validation of values
self.validate_required(["aik_tpm"])
diff --git a/test/test_certificate_modeltype.py b/test/test_certificate_modeltype.py
new file mode 100644
index 0000000..335ae0f
--- /dev/null
+++ b/test/test_certificate_modeltype.py
@@ -0,0 +1,197 @@
+"""
+Unit tests for the Certificate ModelType class.
+
+This module tests the certificate model type functionality including
+encoding inference and ASN.1 compliance checking.
+"""
+
+import base64
+import unittest
+
+import cryptography.x509
+from cryptography.hazmat.primitives.serialization import Encoding
+
+from keylime.certificate_wrapper import CertificateWrapper, wrap_certificate
+from keylime.models.base.types.certificate import Certificate
+
+
+class TestCertificateModelType(unittest.TestCase):
+ """Test cases for Certificate ModelType class."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ self.cert_type = Certificate()
+
+ # Compliant certificate for testing (loads fine with python-cryptography)
+ self.compliant_cert_pem = """-----BEGIN CERTIFICATE-----
+MIIClzCCAX+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAPMQ0wCwYDVQQDDARUZXN0
+MB4XDTI1MDkxMTEyNDU1MVoXDTI2MDkxMTEyNDU1MVowDzENMAsGA1UEAwwEVGVz
+dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAO2V27HsMnKczHCaLgf9
+FtxuorvkA5OMkz6KsW1eyryHr0TJ801prLpeNnMZ3U4pqLMqocMc7T2KO6nPZJxO
+7zRzehyo9pBBVO4pUR1QMGoTWuJQbqNieDQ4V9dW67N5wp/UWEkK6CNNd6aXjswb
+dVaDbIfDL8hMX6Lil3+pTysRWGqjRvBGJxS9r/mYRAvbz1JHPjfegSc0uxnUE+qZ
+SrbWa3TN82LX6jw6tKk0Z3CcPJC6QN+ijCxxAoHyLRYUIgZbAKe/FGRbjO0fuW11
+L7TcE1k3eaC7RkvotIaCOW/RMOkwKu1MbCzFEA2YRYf9covEwdItzI4FE++ZJrsz
+LhUCAwEAaTANBgkqhkiG9w0BAQsFAAOCAQEAeqqJT0LnmAluAjrsCSK/eYYjwjhZ
+aKMi/iBO10zfb+GvT4yqEL5gnuWxJEx4TTcDww1clvOC1EcPUZFaKR3GIBGy0ZgJ
+zGCfg+sC6liyZ+4PSWSJHD2dT5N3IGp4/hPsrhKnVb9fYbRc0Bc5VHeS9QQoSJDH
+f9EbxCcwdErVllRter29OZCb4XnEEbTqLIKRYVrbsu/t4C+vzi0tmKg5HZXf9PMo
+D28zJGsCAr8sKW/iUKObqDOHEn56lk12NTJmJmi+g6rEikk/0czJlRjSGnJQLjUg
+d4wslruibXBsLPtJw2c6vTC2SV2F1PXwy5j1OKU+D6nxaaItQvWADEjcTg==
+-----END CERTIFICATE-----"""
+
+ # Malformed certificate that requires pyasn1 re-encoding (fails with python-cryptography)
+ self.malformed_cert_b64 = (
+ "MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv"
+ "biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u"
+ "MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG"
+ "9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO"
+ "kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq"
+ "ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D"
+ "ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e"
+ "+GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB"
+ "NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT"
+ "B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF"
+ "Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG"
+ "A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF"
+ "BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051"
+ "dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2"
+ "a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps="
+ )
+
+ # Load certificates for testing
+ self.compliant_cert = cryptography.x509.load_pem_x509_certificate(self.compliant_cert_pem.encode())
+ self.malformed_cert_der = base64.b64decode(self.malformed_cert_b64)
+
+ def test_infer_encoding_wrapped_certificate(self):
+ """Test that CertificateWrapper objects are identified as 'wrapped'."""
+ wrapped_cert = wrap_certificate(self.compliant_cert, None)
+ encoding = self.cert_type.infer_encoding(wrapped_cert)
+ self.assertEqual(encoding, "wrapped")
+
+ def test_infer_encoding_raw_certificate(self):
+ """Test that raw cryptography.x509.Certificate objects are identified as 'decoded'."""
+ encoding = self.cert_type.infer_encoding(self.compliant_cert)
+ self.assertEqual(encoding, "decoded")
+
+ def test_infer_encoding_der_bytes(self):
+ """Test that DER bytes are identified as 'der'."""
+ der_bytes = self.compliant_cert.public_bytes(Encoding.DER)
+ encoding = self.cert_type.infer_encoding(der_bytes)
+ self.assertEqual(encoding, "der")
+
+ def test_infer_encoding_pem_string(self):
+ """Test that PEM strings are identified as 'pem'."""
+ encoding = self.cert_type.infer_encoding(self.compliant_cert_pem)
+ self.assertEqual(encoding, "pem")
+
+ def test_infer_encoding_base64_string(self):
+ """Test that Base64 strings are identified as 'base64'."""
+ encoding = self.cert_type.infer_encoding(self.malformed_cert_b64)
+ self.assertEqual(encoding, "base64")
+
+ def test_infer_encoding_none_for_invalid(self):
+ """Test that invalid types return None."""
+ encoding = self.cert_type.infer_encoding(12345) # type: ignore[arg-type] # Testing invalid type
+ self.assertIsNone(encoding)
+
+ def test_asn1_compliant_wrapped_without_original_bytes(self):
+ """Test that CertificateWrapper without original bytes is ASN.1 compliant."""
+ wrapped_cert = wrap_certificate(self.compliant_cert, None)
+ compliant = self.cert_type.asn1_compliant(wrapped_cert)
+ self.assertTrue(compliant)
+
+ def test_asn1_compliant_wrapped_with_original_bytes(self):
+ """Test that CertificateWrapper with original bytes is not ASN.1 compliant."""
+ wrapped_cert = wrap_certificate(self.compliant_cert, b"fake_original_bytes")
+ compliant = self.cert_type.asn1_compliant(wrapped_cert)
+ self.assertFalse(compliant)
+
+ def test_asn1_compliant_raw_certificate(self):
+ """Test that raw cryptography.x509.Certificate returns None (already decoded)."""
+ compliant = self.cert_type.asn1_compliant(self.compliant_cert)
+ self.assertIsNone(compliant)
+
+ def test_asn1_compliant_pem_strings(self):
+ """Test ASN.1 compliance checking on PEM strings."""
+ # The compliant fixture certificate from setUp can be loaded directly by python-cryptography
+ # without requiring pyasn1 re-encoding
+ compliant_regular = self.cert_type.asn1_compliant(self.compliant_cert_pem)
+ # A single compliant PEM certificate is enough to exercise the PEM code path
+
+ # Should be ASN.1 compliant (True) since it loads fine with python-cryptography
+ self.assertTrue(compliant_regular)
+
+ def test_asn1_compliant_der_and_base64(self):
+ """Test ASN.1 compliance checking on DER and Base64 formats."""
+ # Test DER bytes - regular certificate should be compliant
+ der_bytes = self.compliant_cert.public_bytes(Encoding.DER)
+ compliant_der = self.cert_type.asn1_compliant(der_bytes)
+ self.assertTrue(compliant_der)
+
+ # Test Base64 string - regular certificate should be compliant
+ b64_string = base64.b64encode(der_bytes).decode("utf-8")
+ compliant_b64 = self.cert_type.asn1_compliant(b64_string)
+ self.assertTrue(compliant_b64)
+
+ def test_asn1_compliant_malformed_certificate(self):
+ """Test ASN.1 compliance checking on a truly malformed certificate."""
+ # Test the malformed certificate that requires pyasn1 re-encoding
+ compliant = self.cert_type.asn1_compliant(self.malformed_cert_b64)
+ self.assertFalse(compliant) # Should be non-compliant since it needs pyasn1 fallback
+
+ def test_asn1_compliant_invalid_data(self):
+ """Test that invalid certificate data is not ASN.1 compliant."""
+ compliant = self.cert_type.asn1_compliant("invalid_certificate_data")
+ self.assertFalse(compliant)
+
+ def test_cast_wrapped_certificate(self):
+ """Test that CertificateWrapper objects are returned unchanged."""
+ wrapped_cert = wrap_certificate(self.compliant_cert, None)
+ result = self.cert_type.cast(wrapped_cert)
+ self.assertIs(result, wrapped_cert)
+
+ def test_cast_raw_certificate_to_wrapped(self):
+ """Test that raw certificates are wrapped without original bytes."""
+ result = self.cert_type.cast(self.compliant_cert)
+ self.assertIsInstance(result, CertificateWrapper)
+ assert result is not None # For type checker
+ self.assertFalse(result.has_original_bytes)
+
+ def test_cast_pem_strings(self):
+ """Test casting PEM strings to CertificateWrapper."""
+ # Test regular certificate - should be ASN.1 compliant, no original bytes needed
+ result_regular = self.cert_type.cast(self.compliant_cert_pem)
+ self.assertIsInstance(result_regular, CertificateWrapper)
+ assert result_regular is not None # For type checker
+ self.assertFalse(result_regular.has_original_bytes)
+
+ # Note: Only testing compliant certificate since we now use one consistent certificate for all compliant scenarios
+
+ def test_cast_malformed_certificate(self):
+ """Test casting the malformed certificate that requires pyasn1 re-encoding."""
+ result = self.cert_type.cast(self.malformed_cert_b64)
+ self.assertIsInstance(result, CertificateWrapper)
+ assert result is not None # For type checker
+ # Malformed certificate should have original bytes since it needs re-encoding
+ self.assertTrue(result.has_original_bytes)
+
+ def test_cast_der_bytes(self):
+ """Test casting DER bytes to CertificateWrapper."""
+ der_bytes = self.compliant_cert.public_bytes(Encoding.DER)
+ result = self.cert_type.cast(der_bytes)
+ self.assertIsInstance(result, CertificateWrapper)
+
+ def test_cast_none_value(self):
+ """Test that None values return None."""
+ result = self.cert_type.cast(None)
+ self.assertIsNone(result)
+
+ def test_cast_empty_string(self):
+ """Test that empty strings return None."""
+ result = self.cert_type.cast("")
+ self.assertIsNone(result)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/test_certificate_wrapper.py b/test/test_certificate_wrapper.py
new file mode 100644
index 0000000..6b47260
--- /dev/null
+++ b/test/test_certificate_wrapper.py
@@ -0,0 +1,385 @@
+"""
+Unit tests for the CertificateWrapper class.
+
+This module tests the certificate wrapper functionality that preserves original bytes
+for malformed certificates requiring pyasn1 re-encoding.
+"""
+
+import base64
+import subprocess
+import tempfile
+import unittest
+from unittest.mock import Mock
+
+import cryptography.x509
+from cryptography.hazmat.primitives.serialization import Encoding
+from pyasn1.codec.der import decoder as pyasn1_decoder
+from pyasn1.codec.der import encoder as pyasn1_encoder
+from pyasn1_modules import rfc2459 as pyasn1_rfc2459
+
+from keylime.certificate_wrapper import CertificateWrapper, wrap_certificate
+
+
+class TestCertificateWrapper(unittest.TestCase):
+ """Test cases for CertificateWrapper class."""
+
+    def setUp(self):
+        """Prepare the malformed certificate bytes and the mocked certificate shared by the tests."""
+        # Real TPM certificate (Base64) that breaks strict ASN.1 DER rules and needs pyasn1 re-encoding
+        self.malformed_cert_b64 = (
+            "MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv"
+            "biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u"
+            "MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG"
+            "9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO"
+            "kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq"
+            "ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D"
+            "ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e"
+            "+GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB"
+            "NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT"
+            "B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF"
+            "Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG"
+            "A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF"
+            "BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051"
+            "dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2"
+            "a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps="
+        )
+        self.malformed_cert_der = base64.b64decode(self.malformed_cert_b64)
+
+        # Mocked certificate with a fixed subject string and fixed encoded bytes
+        cert = Mock(spec=cryptography.x509.Certificate)
+        cert.subject = Mock()
+        cert.subject.__str__ = Mock(return_value="CN=Test Certificate")
+        cert.public_bytes.return_value = b"mock_der_data"
+        self.mock_cert = cert
+
+    def test_init_without_original_bytes(self):
+        """A wrapper built from only a certificate reports no original bytes."""
+        wrapped = CertificateWrapper(self.mock_cert)
+
+        # Checked via the public interface only
+        self.assertIsNone(wrapped.original_bytes)
+        self.assertFalse(wrapped.has_original_bytes)
+        # Attribute access is forwarded to the wrapped certificate
+        self.assertEqual(wrapped.subject, self.mock_cert.subject)
+
+    def test_init_with_original_bytes(self):
+        """A wrapper built with original bytes exposes exactly those bytes."""
+        preserved = b"original_certificate_data"
+        wrapped = CertificateWrapper(self.mock_cert, preserved)
+
+        # Checked via the public interface only
+        self.assertEqual(wrapped.original_bytes, preserved)
+        self.assertTrue(wrapped.has_original_bytes)
+        # Attribute access is forwarded to the wrapped certificate
+        self.assertEqual(wrapped.subject, self.mock_cert.subject)
+
+    def test_getattr_delegation(self):
+        """Attribute lookups on the wrapper are forwarded to the wrapped certificate."""
+        wrapped = CertificateWrapper(self.mock_cert)
+
+        # ``subject`` is read through the wrapper and compared with the mock's own attribute
+        delegated = wrapped.subject
+        self.assertEqual(delegated, self.mock_cert.subject)
+
+    def test_public_bytes_der_without_original(self):
+        """Without original bytes, DER output comes from the wrapped certificate."""
+        wrapped = CertificateWrapper(self.mock_cert)
+
+        der = wrapped.public_bytes(Encoding.DER)
+
+        self.assertEqual(der, b"mock_der_data")
+        self.mock_cert.public_bytes.assert_called_once_with(Encoding.DER)
+
+    def test_public_bytes_der_with_original(self):
+        """With original bytes, DER output is the original bytes themselves."""
+        preserved = b"original_certificate_data"
+        wrapped = CertificateWrapper(self.mock_cert, preserved)
+
+        der = wrapped.public_bytes(Encoding.DER)
+
+        # The preserved bytes are returned and the wrapped certificate is never consulted
+        self.assertEqual(der, preserved)
+        self.mock_cert.public_bytes.assert_not_called()
+
+    def test_public_bytes_pem_without_original(self):
+        """Without original bytes, PEM output comes from the wrapped certificate."""
+        canned_pem = b"-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n"
+        self.mock_cert.public_bytes.return_value = canned_pem
+        wrapped = CertificateWrapper(self.mock_cert)
+
+        pem = wrapped.public_bytes(Encoding.PEM)
+        self.assertEqual(pem, canned_pem)
+        self.mock_cert.public_bytes.assert_called_once_with(Encoding.PEM)
+
+    def test_public_bytes_pem_with_original(self):
+        """With original bytes, PEM output is built from them and decodes back to them."""
+        preserved = self.malformed_cert_der
+        wrapped = CertificateWrapper(self.mock_cert, preserved)
+
+        pem = wrapped.public_bytes(Encoding.PEM)
+
+        # The wrapped certificate's own encoder must stay untouched
+        self.mock_cert.public_bytes.assert_not_called()
+
+        # Output is bytes framed by the standard PEM certificate markers
+        self.assertIsInstance(pem, bytes)
+        pem_text = pem.decode("utf-8")
+        self.assertTrue(pem_text.startswith("-----BEGIN CERTIFICATE-----"))
+        self.assertTrue(pem_text.endswith("-----END CERTIFICATE-----\n"))
+
+        # Round trip: drop the marker lines, join the Base64 body, decode, compare with the input
+        body_lines = pem_text.strip().split("\n")[1:-1]
+        payload = "".join(body_lines)
+        decoded = base64.b64decode(payload)
+        self.assertEqual(decoded, preserved)
+
+    def test_pem_line_length_compliance(self):
+        """PEM body lines stay within the 64-character limit from RFC 1421."""
+        preserved = self.malformed_cert_der
+        wrapped = CertificateWrapper(self.mock_cert, preserved)
+
+        pem_text = wrapped.public_bytes(Encoding.PEM).decode("utf-8")
+        all_lines = pem_text.strip().split("\n")
+
+        # First and last lines are the BEGIN/END markers; only the Base64 body is measured
+        body_lines = all_lines[1:-1]
+        for body_line in body_lines:
+            self.assertLessEqual(len(body_line), 64)
+
+    def test_str_representation(self):
+        """str() of a wrapper names the class and the certificate subject."""
+        wrapped = CertificateWrapper(self.mock_cert)
+
+        wanted = f"CertificateWrapper(subject={self.mock_cert.subject})"
+
+        rendered = str(wrapped)
+        self.assertEqual(rendered, wanted)
+
+    def test_repr_representation_without_original(self):
+        """repr() reports has_original_bytes=False when nothing was preserved."""
+        wrapped = CertificateWrapper(self.mock_cert)
+
+        wanted = f"CertificateWrapper(subject={self.mock_cert.subject}, has_original_bytes=False)"
+
+        rendered = repr(wrapped)
+        self.assertEqual(rendered, wanted)
+
+    def test_repr_representation_with_original(self):
+        """repr() reports has_original_bytes=True when original bytes were preserved."""
+        preserved = b"original_data"
+        wrapped = CertificateWrapper(self.mock_cert, preserved)
+
+        wanted = f"CertificateWrapper(subject={self.mock_cert.subject}, has_original_bytes=True)"
+
+        rendered = repr(wrapped)
+        self.assertEqual(rendered, wanted)
+
+    def test_pickling_support(self):
+        """State exported by __getstate__ can be loaded into another wrapper via __setstate__."""
+        preserved = b"test_data"
+        source = CertificateWrapper(self.mock_cert, preserved)
+
+        # Export: the state is a dict carrying both private fields
+        exported = source.__getstate__()
+        self.assertIsInstance(exported, dict)
+        for key in ("_cert", "_original_bytes"):
+            self.assertIn(key, exported)
+
+        # Import: load the exported state into a wrapper created with different contents
+        target = CertificateWrapper(Mock(), None)
+        target.__setstate__(exported)
+        # The restored wrapper is inspected through its public interface
+        self.assertTrue(target.has_original_bytes)
+        self.assertEqual(target.original_bytes, preserved)
+
+    def test_wrap_certificate_function_without_original(self):
+        """wrap_certificate() with only a certificate returns a wrapper holding no original bytes."""
+        wrapped = wrap_certificate(self.mock_cert)
+
+        self.assertIsInstance(wrapped, CertificateWrapper)
+        self.assertIsNone(wrapped.original_bytes)
+        self.assertFalse(wrapped.has_original_bytes)
+
+    def test_wrap_certificate_function_with_original(self):
+        """wrap_certificate() with original bytes returns a wrapper exposing those bytes."""
+        preserved = b"original_certificate_data"
+        wrapped = wrap_certificate(self.mock_cert, preserved)
+
+        self.assertIsInstance(wrapped, CertificateWrapper)
+        self.assertEqual(wrapped.original_bytes, preserved)
+        self.assertTrue(wrapped.has_original_bytes)
+
+    def test_real_malformed_certificate_handling(self):
+        """The real malformed certificate bytes survive wrapping unchanged in DER and PEM form."""
+        # Models the case where the malformed certificate has already been re-encoded
+
+        # A mock stands in for the certificate object produced after pyasn1 re-encoding
+        reencoded = Mock(spec=cryptography.x509.Certificate)
+        reencoded.subject = Mock()
+        reencoded.subject.__str__ = Mock(return_value="CN=Nuvoton TPM")
+
+        # Wrap it together with the untouched bytes, as the loading code would
+        wrapped = wrap_certificate(reencoded, self.malformed_cert_der)
+
+        # The untouched bytes are retained
+        self.assertTrue(wrapped.has_original_bytes)
+        self.assertEqual(wrapped.original_bytes, self.malformed_cert_der)
+
+        # DER serialization returns the untouched bytes
+        der = wrapped.public_bytes(Encoding.DER)
+        self.assertEqual(der, self.malformed_cert_der)
+
+        # PEM serialization is produced as bytes
+        pem = wrapped.public_bytes(Encoding.PEM)
+        self.assertIsInstance(pem, bytes)
+
+        # Round trip: strip the marker lines, decode the Base64 body, compare with the input
+        pem_text = pem.decode("utf-8")
+        body_lines = pem_text.strip().split("\n")[1:-1]
+        payload = "".join(body_lines)
+        decoded = base64.b64decode(payload)
+        self.assertEqual(decoded, self.malformed_cert_der)
+
+    def test_unsupported_encoding_fallback(self):
+        """Encodings other than DER and PEM are passed through to the wrapped certificate."""
+        preserved = b"original_data"
+        wrapped = CertificateWrapper(self.mock_cert, preserved)
+
+        # A mock object plays the role of an encoding that is neither DER nor PEM
+        unknown_encoding = Mock()
+        unknown_encoding.name = "CUSTOM"
+
+        # Even with original bytes present, the request must reach the wrapped certificate
+        wrapped.public_bytes(unknown_encoding)
+        self.mock_cert.public_bytes.assert_called_once_with(unknown_encoding)
+
+    def test_malformed_certificate_cryptography_failure_and_verification(self):
+        """
+        Comprehensive test demonstrating that the malformed certificate:
+        1. Fails to load with python-cryptography
+        2. Can be parsed by OpenSSL
+        3. Is successfully handled by our wrapper after pyasn1 re-encoding
+        """
+        # Test 1: Demonstrate that python-cryptography fails to load the malformed certificate.
+        # The specific exception type may vary, so only the failure itself is asserted;
+        # checking that the caught object is an Exception would add nothing.
+        with self.assertRaises(Exception):
+            cryptography.x509.load_der_x509_certificate(self.malformed_cert_der)
+
+        # Test 2: Demonstrate that pyasn1 can handle the malformed certificate
+        try:
+            # Decode and re-encode using pyasn1 (simulating what the Certificate type does)
+            pyasn1_cert = pyasn1_decoder.decode(self.malformed_cert_der, asn1Spec=pyasn1_rfc2459.Certificate())[0]
+            reencoded_der = pyasn1_encoder.encode(pyasn1_cert)
+
+            # Now cryptography should be able to load the re-encoded certificate
+            reencoded_cert = cryptography.x509.load_der_x509_certificate(reencoded_der)
+        except Exception as e:
+            self.fail(f"pyasn1 should handle the malformed certificate, but got: {e}")
+        self.assertIsNotNone(reencoded_cert)
+
+        # Test 3: Verify that our wrapper preserves the original bytes correctly
+        wrapper = wrap_certificate(reencoded_cert, self.malformed_cert_der)
+
+        # The wrapper should preserve original bytes
+        self.assertTrue(wrapper.has_original_bytes)
+        self.assertEqual(wrapper.original_bytes, self.malformed_cert_der)
+
+        # DER output should use original bytes
+        der_output = wrapper.public_bytes(Encoding.DER)
+        self.assertEqual(der_output, self.malformed_cert_der)
+
+        # PEM output should be derived from original bytes
+        pem_output = wrapper.public_bytes(Encoding.PEM)
+        pem_str = pem_output.decode("utf-8")
+
+        # Verify PEM format is correct
+        self.assertTrue(pem_str.startswith("-----BEGIN CERTIFICATE-----"))
+        self.assertTrue(pem_str.endswith("-----END CERTIFICATE-----\n"))
+
+        # Test 4: Demonstrate OpenSSL can parse the certificate structure
+        # (Even without the root CA, OpenSSL should be able to parse the certificate)
+        try:
+            # The temporary file is removed automatically when the "with" block exits,
+            # so OpenSSL has to run while the block is still open
+            with tempfile.NamedTemporaryFile(mode="wb", suffix=".der") as temp_file:
+                temp_file.write(self.malformed_cert_der)
+                temp_file.flush()
+
+                result = subprocess.run(
+                    ["openssl", "x509", "-in", temp_file.name, "-inform", "DER", "-text", "-noout"],
+                    capture_output=True,
+                    text=True,
+                    check=False,
+                )
+        except FileNotFoundError as e:
+            # Raised when the openssl executable is missing: skip instead of failing.
+            # CalledProcessError cannot occur here because check=False is used.
+            self.skipTest(f"OpenSSL not available for verification test: {e}")
+
+        # OpenSSL should successfully parse the certificate
+        self.assertEqual(result.returncode, 0)
+        self.assertIn("Nuvoton TPM Root CA 2111", result.stdout)
+        self.assertIn("Certificate:", result.stdout)
+
+        # Test 5: Verify certificate details are accessible through the re-encoded certificate
+        # The subject should be empty (as shown in the OpenSSL output)
+        self.assertEqual(len(reencoded_cert.subject), 0)
+
+        # The issuer should contain Nuvoton information
+        issuer_attrs = {}
+        for attr in reencoded_cert.issuer:
+            # Use dotted string representation to avoid accessing private _name
+            oid_name = attr.oid.dotted_string
+            if oid_name == "2.5.4.3":  # Common Name OID
+                issuer_attrs["commonName"] = attr.value
+        self.assertIn("commonName", issuer_attrs)
+        self.assertEqual(issuer_attrs["commonName"], "Nuvoton TPM Root CA 2111")
+
+        # Test 6: Demonstrate that even re-encoded certificates may have parsing issues
+        # This shows why preserving original bytes is crucial
+        try:
+            # Try to access extensions - this may fail due to malformed ASN.1
+            extensions = list(reencoded_cert.extensions)
+        except ValueError as e:
+            # This is actually expected for malformed certificates!
+            # Even after pyasn1 re-encoding, some parsing issues may remain
+            self.assertIn("parsing asn1", str(e).lower(), f"Expected ASN.1 parsing error, got: {e}")
+            # This demonstrates why our wrapper preserves original bytes -
+            # they maintain signature validity even when parsing has issues
+        else:
+            # Parsing succeeded, so verify the expected Subject Alternative Name (OID 2.5.29.17).
+            # Kept outside the "try" so a failed assertion is reported as itself.
+            has_subject_alt_name = any(ext.oid.dotted_string == "2.5.29.17" for ext in extensions)
+            self.assertTrue(has_subject_alt_name, "EK certificate should have Subject Alternative Name extension")
+
+    def test_certificate_chain_verification_simulation(self):
+        """
+        Simulate a chain-verification step that depends on the exact certificate bytes.
+        The bytes handed to a verifier must be the original ones, not a re-encoding.
+        """
+        # Mock of the re-encoded certificate, wrapped together with the malformed original bytes
+        reencoded = Mock(spec=cryptography.x509.Certificate)
+        reencoded.public_key.return_value = Mock()
+        reencoded.subject = Mock()
+
+        wrapped = wrap_certificate(reencoded, self.malformed_cert_der)
+
+        # A signature is computed over the exact DER bytes, so these are the bytes
+        # a real verification would consume
+        bytes_to_verify = wrapped.public_bytes(Encoding.DER)
+
+        # They must be the original malformed bytes, which keeps the signature valid
+        self.assertEqual(bytes_to_verify, self.malformed_cert_der)
+
+        # Had the original bytes been dropped, the re-encoded form below would be returned
+        # and the signature would no longer match even though the content is the same
+        reencoded.public_bytes.return_value = b"reencoded_different_bytes"
+
+        # The bytes obtained from the wrapper are the original ones, not the re-encoded ones
+        self.assertNotEqual(bytes_to_verify, b"reencoded_different_bytes")
+        self.assertEqual(bytes_to_verify, self.malformed_cert_der)
+
+
+if __name__ == "__main__":
+ unittest.main()  # run this module's tests when it is executed as a script
diff --git a/test/test_registrar_agent_cert_compliance.py b/test/test_registrar_agent_cert_compliance.py
new file mode 100644
index 0000000..ede9b9f
--- /dev/null
+++ b/test/test_registrar_agent_cert_compliance.py
@@ -0,0 +1,289 @@
+"""
+Integration tests for RegistrarAgent certificate compliance functionality.
+
+This module tests the simplified certificate compliance checking methods
+to ensure they work correctly with the new CertificateWrapper-based approach.
+"""
+
+import types
+import unittest
+from unittest.mock import Mock, patch
+
+import cryptography.x509
+
+from keylime.certificate_wrapper import wrap_certificate
+from keylime.models.base.types.certificate import Certificate
+from keylime.models.registrar.registrar_agent import RegistrarAgent
+
+
+class TestRegistrarAgentCertCompliance(unittest.TestCase):
+ """Test cases for RegistrarAgent certificate compliance methods."""
+
+ # pylint: disable=protected-access,not-callable # Testing protected methods and dynamic method binding
+
+    def setUp(self):
+        """Build the compliant and non-compliant wrapped certificates used by the tests."""
+        # PEM text of the certificate used as the compliant fixture
+        self.valid_cert_pem = """-----BEGIN CERTIFICATE-----
+MIIEnzCCA4egAwIBAgIEMV64bDANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJE
+RTEQMA4GA1UECBMHQmF2YXJpYTEhMB8GA1UEChMYSW5maW5lb24gVGVjaG5vbG9n
+aWVzIEFHMQwwCgYDVQQLEwNBSU0xGzAZBgNVBAMTEklGWCBUUE0gRUsgUm9vdCBD
+QTAeFw0wNTEwMjAxMzQ3NDNaFw0yNTEwMjAxMzQ3NDNaMHcxCzAJBgNVBAYTAkRF
+MQ8wDQYDVQQIEwZTYXhvbnkxITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2ll
+cyBBRzEMMAoGA1UECxMDQUlNMSYwJAYDVQQDEx1JRlggVFBNIEVLIEludGVybWVk
+aWF0ZSBDQSAwMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALftPhYN
+t4rE+JnU/XOPICbOBLvfo6iA7nuq7zf4DzsAWBdsZEdFJQfaK331ihG3IpQnlQ2i
+YtDim289265f0J4OkPFpKeFU27CsfozVaNUm6UR/uzwA8ncxFc3iZLRMRNLru/Al
+VG053ULVDQMVx2iwwbBSAYO9pGiGbk1iMmuZaSErMdb9v0KRUyZM7yABiyDlM3cz
+UQX5vLWV0uWqxdGoHwNva5u3ynP9UxPTZWHZOHE6+14rMzpobs6Ww2RR8BgF96rh
+4rRAZEl8BXhwiQq4STvUXkfvdpWH4lzsGcDDtrB6Nt3KvVNvsKz+b07Dk+Xzt+EH
+NTf3Byk2HlvX+scCAwEAAaOCATswggE3MB0GA1UdDgQWBBQ4k8292HPEIzMV4bE7
+qWoNI8wQxzAOBgNVHQ8BAf8EBAMCAgQwEgYDVR0TAQH/BAgwBgEB/wIBADBYBgNV
+HSABAf8ETjBMMEoGC2CGSAGG+EUBBy8BMDswOQYIKwYBBQUHAgEWLWh0dHA6Ly93
+d3cudmVyaXNpZ24uY29tL3JlcG9zaXRvcnkvaW5kZXguaHRtbDCBlwYDVR0jBIGP
+MIGMgBRW65FEhWPWcrOu1EWWC/eUDlRCpqFxpG8wbTELMAkGA1UEBhMCREUxEDAO
+BgNVBAgTB0JhdmFyaWExITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2llcyBB
+RzEMMAoGA1UECxMDQUlNMRswGQYDVQQDExJJRlggVFBNIEVLIFJvb3QgQ0GCAQMw
+DQYJKoZIhvcNAQEFBQADggEBABJ1+Ap3rNlxZ0FW0aIgdzktbNHlvXWNxFdYIBbM
+OKjmbOos0Y4O60eKPu259XmMItCUmtbzF3oKYXq6ybARUT2Lm+JsseMF5VgikSlU
+BJALqpKVjwAds81OtmnIQe2LSu4xcTSavpsL4f52cUAu/maMhtSgN9mq5roYptq9
+DnSSDZrX4uYiMPl//rBaNDBflhJ727j8xo9CCohF3yQUoQm7coUgbRMzyO64yMIO
+3fhb+Vuc7sNwrMOz3VJN14C3JMoGgXy0c57IP/kD5zGRvljKEvrRC2I147+fPeLS
+DueRMS6lblvRKiZgmGAg7YaKOkOaEmVDMQ+fTo2Po7hI5wc=
+-----END CERTIFICATE-----"""
+
+        pem_bytes = self.valid_cert_pem.encode()
+        self.valid_cert = cryptography.x509.load_pem_x509_certificate(pem_bytes)
+
+        # Compliant fixture: wrapped without original bytes, since none are needed
+        self.compliant_wrapped_cert = wrap_certificate(self.valid_cert, None)
+
+        # Malformed certificate (Base64) that actually requires pyasn1 re-encoding
+        self.malformed_cert_b64 = (
+            "MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv"
+            "biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u"
+            "MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG"
+            "9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO"
+            "kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq"
+            "ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D"
+            "ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e"
+            "+GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB"
+            "NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT"
+            "B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF"
+            "Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG"
+            "A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF"
+            "BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051"
+            "dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2"
+            "a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps="
+        )
+
+        # Non-compliant fixture: produced by the Certificate type itself from the
+        # malformed data, so the wrapper is built the same way the model builds it
+        cert_type = Certificate()
+        self.non_compliant_wrapped_cert = cert_type.cast(self.malformed_cert_b64)
+
+    def create_mock_registrar_agent(self):
+        """Return a Mock agent carrying the real RegistrarAgent compliance methods."""
+        agent = Mock()
+        agent._add_error = Mock()
+        agent.changes, agent.values = {}, {}
+
+        # Attach the genuine implementations so they run against the mock's attributes
+        for method_name in ("_check_cert_compliance", "_check_all_cert_compliance"):
+            unbound = getattr(RegistrarAgent, method_name)
+            setattr(agent, method_name, types.MethodType(unbound, agent))
+
+        return agent
+
+    def test_check_cert_compliance_no_new_cert(self):
+        """With no pending certificate change, the check passes and records no error."""
+        agent = self.create_mock_registrar_agent()
+        agent.changes = {}  # nothing new to validate
+
+        outcome = agent._check_cert_compliance("ekcert")
+        agent._add_error.assert_not_called()
+        self.assertTrue(outcome)
+
+    def test_check_cert_compliance_same_cert(self):
+        """When the pending certificate is the stored one, the check passes without error."""
+        agent = self.create_mock_registrar_agent()
+        agent.values = {"ekcert": self.compliant_wrapped_cert}
+        agent.changes = {"ekcert": self.compliant_wrapped_cert}
+
+        outcome = agent._check_cert_compliance("ekcert")
+        agent._add_error.assert_not_called()
+        self.assertTrue(outcome)
+
+    def test_check_cert_compliance_different_cert_same_der(self):
+        """Distinct wrapper objects around the same certificate pass the check without error."""
+        agent = self.create_mock_registrar_agent()
+        # Two separate wrappers built from one underlying certificate, hence identical DER bytes
+        incoming = wrap_certificate(self.valid_cert, None)
+        stored = wrap_certificate(self.valid_cert, None)
+
+        agent.values = {"ekcert": stored}
+        agent.changes = {"ekcert": incoming}
+
+        outcome = agent._check_cert_compliance("ekcert")
+        agent._add_error.assert_not_called()
+        self.assertTrue(outcome)
+
+    @patch("keylime.config.get")
+    def test_check_cert_compliance_compliant_cert(self, mock_config):
+        """An ASN.1 compliant certificate passes the check and records no error."""
+        mock_config.return_value = "warn"  # every patched config lookup yields "warn"
+
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}  # nothing stored previously
+        agent.changes = {"ekcert": self.compliant_wrapped_cert}
+
+        outcome = agent._check_cert_compliance("ekcert")
+        agent._add_error.assert_not_called()
+        self.assertTrue(outcome)
+
+    @patch("keylime.config.get")
+    def test_check_cert_compliance_non_compliant_cert_warn(self, mock_config):
+        """In warn mode a non-compliant certificate fails the check but records no error."""
+        mock_config.return_value = "warn"  # configured action: warn
+
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}  # nothing stored previously
+        agent.changes = {"ekcert": self.non_compliant_wrapped_cert}
+
+        outcome = agent._check_cert_compliance("ekcert")
+        agent._add_error.assert_not_called()  # warn mode must not register an error
+        self.assertFalse(outcome)
+
+    @patch("keylime.config.get")
+    def test_check_cert_compliance_non_compliant_cert_reject(self, mock_config):
+        """In reject mode a non-compliant certificate fails the check and records one error."""
+        mock_config.return_value = "reject"  # configured action: reject
+
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}  # nothing stored previously
+        agent.changes = {"ekcert": self.non_compliant_wrapped_cert}
+
+        outcome = agent._check_cert_compliance("ekcert")
+        agent._add_error.assert_called_once()  # reject mode must register exactly one error
+        self.assertFalse(outcome)
+
+    @patch("keylime.config.get")
+    def test_check_all_cert_compliance_no_non_compliant(self, mock_config):
+        """When every pending certificate is compliant, nothing is logged."""
+        mock_config.return_value = "warn"
+
+        compliant = self.compliant_wrapped_cert
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}
+        agent.changes = {"ekcert": compliant, "iak_cert": compliant}
+
+        # The module logger is replaced so that any log call can be detected;
+        # the check itself must complete without raising
+        with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger:
+            agent._check_all_cert_compliance()
+        # Neither a warning nor an error may have been emitted
+        mock_logger.error.assert_not_called()
+        mock_logger.warning.assert_not_called()
+
+    @patch("keylime.config.get")
+    def test_check_all_cert_compliance_with_non_compliant_warn(self, mock_config):
+        """In warn mode the non-compliant field names are reported in a single warning."""
+        mock_config.return_value = "warn"
+
+        bad, good = self.non_compliant_wrapped_cert, self.compliant_wrapped_cert
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}
+        agent.changes = {"ekcert": bad, "iak_cert": good, "idevid_cert": bad}
+
+        with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger:
+            agent._check_all_cert_compliance()
+
+        # Exactly one warning is expected, naming only the non-compliant fields
+        mock_logger.warning.assert_called_once()
+        logged_args = mock_logger.warning.call_args[0]
+        format_string = logged_args[0]
+        cert_names = logged_args[1]
+        # First positional argument is the format string, second the joined field names
+        self.assertIn("Certificate(s) %s may not conform", format_string)
+        self.assertEqual("'ekcert' and 'idevid_cert'", cert_names)
+
+    @patch("keylime.config.get")
+    def test_check_all_cert_compliance_with_non_compliant_reject(self, mock_config):
+        """In reject mode the non-compliant field names are reported in a single error."""
+        mock_config.return_value = "reject"
+
+        bad = self.non_compliant_wrapped_cert
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}
+        agent.changes = {"ekcert": bad, "mtls_cert": bad}
+
+        with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger:
+            agent._check_all_cert_compliance()
+
+        # Exactly one error is expected, naming the rejected fields
+        mock_logger.error.assert_called_once()
+        logged_args = mock_logger.error.call_args[0]
+        format_string = logged_args[0]
+        cert_names = logged_args[1]
+        self.assertIn("Certificate(s) %s may not conform", format_string)
+        self.assertIn("were rejected due to config", format_string)
+        self.assertEqual("'ekcert' and 'mtls_cert'", cert_names)
+
+    @patch("keylime.config.get")
+    def test_check_all_cert_compliance_ignore_mode(self, mock_config):
+        """In ignore mode non-compliant certificates produce no log output."""
+        mock_config.return_value = "ignore"
+
+        bad = self.non_compliant_wrapped_cert
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}
+        agent.changes = {"ekcert": bad, "iak_cert": bad}
+
+        # The module logger is replaced so that any log call can be detected
+        with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger:
+            agent._check_all_cert_compliance()
+
+        # Ignore mode must stay silent at both warning and error level
+        mock_logger.error.assert_not_called()
+        mock_logger.warning.assert_not_called()
+
+    def test_check_all_cert_compliance_single_non_compliant(self):
+        """A single non-compliant field is reported on its own, without an "and" joiner."""
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}
+        agent.changes = {"ekcert": self.non_compliant_wrapped_cert}
+
+        with patch("keylime.config.get", return_value="warn"):
+            with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger:
+                agent._check_all_cert_compliance()
+        # The warning's positional arguments are the format string and the field names
+        logged_args = mock_logger.warning.call_args[0]
+        format_string, cert_names = logged_args[0], logged_args[1]
+        self.assertIn("Certificate(s) %s may not conform", format_string)
+        self.assertEqual("'ekcert'", cert_names)
+        self.assertNotIn(" and", cert_names)  # one name only, so no joiner is expected
+
+    def test_field_names_coverage(self):
+        """All four certificate fields are inspected and named in the warning."""
+        agent = self.create_mock_registrar_agent()
+        agent.values = {}
+        # Every certificate field carries the non-compliant fixture
+        field_names = ("ekcert", "iak_cert", "idevid_cert", "mtls_cert")
+        agent.changes = {name: self.non_compliant_wrapped_cert for name in field_names}
+
+        with patch("keylime.config.get", return_value="warn"):
+            with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger:
+                agent._check_all_cert_compliance()
+
+        # The warning's positional arguments are the format string and the field names
+        logged_args = mock_logger.warning.call_args[0]
+        format_string, cert_names = logged_args[0], logged_args[1]
+
+        # All four names must appear, comma-separated with "and" before the last one
+        wanted_names = "'ekcert', 'iak_cert', 'idevid_cert' and 'mtls_cert'"
+        self.assertIn("Certificate(s) %s may not conform", format_string)
+        self.assertEqual(wanted_names, cert_names)
+
+
+if __name__ == "__main__":
+ unittest.main()  # run this module's tests when it is executed as a script