diff --git a/0013-fix-malformed-certs-workaround.patch b/0013-fix-malformed-certs-workaround.patch index 6cd5ff5..05c3f41 100644 --- a/0013-fix-malformed-certs-workaround.patch +++ b/0013-fix-malformed-certs-workaround.patch @@ -1,428 +1,1265 @@ -From 266ce8caed568e20e0521f93a0c03798ca9915f7 Mon Sep 17 00:00:00 2001 -From: Anderson Toshiyuki Sasaki -Date: Wed, 20 Aug 2025 13:40:36 +0200 -Subject: [PATCH 1/2] models: Do not re-encode certificate stored in DB - -Modify the Certificate class to store the original certificate bits to -the database instead of storing the possibly re-encoded certificate. - -The python-cryptography ASN.1 parser is strict and does not accept -malformed certificates. This makes it impossible to use some affected -devices, notably TPM certificates from Nuvoton. To workaround this, the -certificate is re-encoded using pyasn1, but this effectively modify the -certificate making its signature invalid. To avoid storing invalid -certificates to the database, the original bits of the certificate (as -received from the agent) are cached and later used when storing the -certificate into the database. - -Assisted-by: Claude 4 Sonnet -Signed-off-by: Anderson Toshiyuki Sasaki - -fix ---- - keylime/models/base/types/certificate.py | 65 ++++++- - keylime/models/registrar/registrar_agent.py | 6 + - test/test_certificate_preservation.py | 197 ++++++++++++++++++++ - 3 files changed, 264 insertions(+), 4 deletions(-) - create mode 100644 test/test_certificate_preservation.py - +diff --git a/keylime/certificate_wrapper.py b/keylime/certificate_wrapper.py +new file mode 100644 +index 0000000..899a19a +--- /dev/null ++++ b/keylime/certificate_wrapper.py +@@ -0,0 +1,99 @@ ++""" ++X.509 Certificate wrapper that preserves original bytes for malformed certificates. ++ ++This module provides a wrapper around cryptography.x509.Certificate that preserves ++the original certificate bytes when the certificate required pyasn1 re-encoding ++due to ASN.1 DER non-compliance. This ensures signature validity is maintained ++throughout the database lifecycle. ++""" ++ ++import base64 ++from typing import Any, Dict, Optional ++ ++import cryptography.x509 ++from cryptography.hazmat.primitives.serialization import Encoding ++ ++ ++class CertificateWrapper: ++ """ ++ A wrapper around cryptography.x509.Certificate that preserves original bytes ++ when malformed certificates require pyasn1 re-encoding. ++ ++ This class wraps a cryptography.x509.Certificate and adds the ability ++ to store the original certificate bytes when the certificate was malformed ++ and required re-encoding using pyasn1. This ensures that signature validation ++ works correctly even for certificates that don't strictly follow ASN.1 DER. ++ """ ++ ++ def __init__(self, cert: cryptography.x509.Certificate, original_bytes: Optional[bytes] = None): ++ """ ++ Initialize the wrapper certificate. ++ ++ :param cert: The cryptography.x509.Certificate object ++ :param original_bytes: The original DER bytes if certificate was re-encoded, None otherwise ++ """ ++ self._cert = cert ++ self._original_bytes = original_bytes ++ ++ def __getattr__(self, name: str) -> Any: ++ """Delegate attribute access to the wrapped certificate.""" ++ return getattr(self._cert, name) ++ ++ def __setstate__(self, state: Dict[str, Any]) -> None: ++ """Support for pickling.""" ++ self.__dict__.update(state) ++ ++ def __getstate__(self) -> Dict[str, Any]: ++ """Support for pickling.""" ++ return self.__dict__ ++ ++ @property ++ def has_original_bytes(self) -> bool: ++ """Check if this certificate has preserved original bytes.""" ++ return self._original_bytes is not None ++ ++ @property ++ def original_bytes(self) -> Optional[bytes]: ++ """Return the preserved original bytes if available.""" ++ return self._original_bytes ++ ++ def public_bytes(self, encoding: Encoding) -> bytes: ++ """ ++ Return certificate bytes, using original bytes when available. ++ ++ For certificates with preserved original bytes, this method always uses ++ the original DER bytes to maintain signature validity. For PEM encoding, ++ it converts the original DER bytes to PEM format. ++ """ ++ if self.has_original_bytes: ++ if encoding == Encoding.DER: ++ return self._original_bytes # type: ignore[return-value] ++ if encoding == Encoding.PEM: ++ # Convert original DER bytes to PEM format ++ der_b64 = base64.b64encode(self._original_bytes).decode("utf-8") # type: ignore[arg-type] ++ # Split into 64-character lines per PEM specification (RFC 1421) ++ lines = [der_b64[i : i + 64] for i in range(0, len(der_b64), 64)] ++ # Create PEM format with proper headers ++ pem_content = "\n".join(["-----BEGIN CERTIFICATE-----"] + lines + ["-----END CERTIFICATE-----"]) + "\n" ++ return pem_content.encode("utf-8") ++ ++ # For certificates without original bytes, use standard method ++ return self._cert.public_bytes(encoding) ++ ++ # Delegate common certificate methods to maintain full compatibility ++ def __str__(self) -> str: ++ return f"CertificateWrapper(subject={self._cert.subject})" ++ ++ def __repr__(self) -> str: ++ return f"CertificateWrapper(subject={self._cert.subject}, has_original_bytes={self.has_original_bytes})" ++ ++ ++def wrap_certificate(cert: cryptography.x509.Certificate, original_bytes: Optional[bytes] = None) -> CertificateWrapper: ++ """ ++ Factory function to create a wrapped certificate. ++ ++ :param cert: The cryptography.x509.Certificate object ++ :param original_bytes: The original DER bytes if certificate was re-encoded ++ :returns: Wrapped certificate that preserves original bytes ++ """ ++ return CertificateWrapper(cert, original_bytes) diff --git a/keylime/models/base/types/certificate.py b/keylime/models/base/types/certificate.py -index 2c27603ba..551f73b1c 100644 +index 0f03169..f6cdd48 100644 --- a/keylime/models/base/types/certificate.py +++ b/keylime/models/base/types/certificate.py -@@ -10,6 +10,7 @@ - from pyasn1.error import PyAsn1Error - from pyasn1_modules import pem as pyasn1_pem +@@ -12,6 +12,7 @@ from pyasn1_modules import pem as pyasn1_pem from pyasn1_modules import rfc2459 as pyasn1_rfc2459 -+from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.types import Text ++from keylime.certificate_wrapper import CertificateWrapper, wrap_certificate from keylime.models.base.type import ModelType -@@ -82,6 +83,8 @@ def _schema(self): + + +@@ -78,19 +79,20 @@ class Certificate(ModelType): + cert = Certificate().cast("-----BEGIN CERTIFICATE-----\nMIIE...") + """ + +- IncomingValue = Union[cryptography.x509.Certificate, bytes, str, None] ++ IncomingValue = Union[cryptography.x509.Certificate, CertificateWrapper, bytes, str, None] def __init__(self) -> None: super().__init__(Text) -+ # Instance-level cache for original certificate bytes when re-encoding is needed -+ self._original_bytes_cache: Optional[bytes] = None - def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate: - """Loads a binary x509 certificate encoded using ASN.1 DER as a ``cryptography.x509.Certificate`` object. This -@@ -112,8 +115,11 @@ def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate: +- def _load_der_cert(self, der_cert_data: bytes) -> cryptography.x509.Certificate: +- """Loads a binary x509 certificate encoded using ASN.1 DER as a ``cryptography.x509.Certificate`` object. This ++ def _load_der_cert(self, der_cert_data: bytes) -> CertificateWrapper: ++ """Loads a binary x509 certificate encoded using ASN.1 DER as a ``CertificateWrapper`` object. This + method does not require strict adherence to ASN.1 DER thereby making it possible to accept certificates which do + not follow every detail of the spec (this is the case for a number of TPM certs) [1,2]. + + It achieves this by first using the strict parser provided by python-cryptography. If that fails, it decodes the + certificate and re-encodes it using the more-forgiving pyasn1 library. The re-encoded certificate is then +- re-parsed by python-cryptography. ++ re-parsed by python-cryptography. For malformed certificates requiring re-encoding, the original bytes are ++ preserved in the wrapper to maintain signature validity. + + This method is equivalent to the ``cert_utils.x509_der_cert`` function but does not produce a warning when the + backup parser is used, allowing this condition to be optionally detected and handled by the model where +@@ -106,24 +108,28 @@ class Certificate(ModelType): + + :raises: :class:`SubstrateUnderrunError`: cert could not be deserialized even using the fallback pyasn1 parser + +- :returns: A ``cryptography.x509.Certificate`` object ++ :returns: A ``CertificateWrapper`` object + """ + try: - return cryptography.x509.load_der_x509_certificate(der_cert_data) +- return cryptography.x509.load_der_x509_certificate(der_cert_data) ++ cert = cryptography.x509.load_der_x509_certificate(der_cert_data) ++ return wrap_certificate(cert, None) except Exception: -+ # Store original bytes before re-encoding to preserve signature validity pyasn1_cert = pyasn1_decoder.decode(der_cert_data, asn1Spec=pyasn1_rfc2459.Certificate())[0] - return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) + cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) -+ self._original_bytes_cache = base64.b64encode(der_cert_data) -+ return cert ++ # Preserve the original bytes when re-encoding is necessary ++ return wrap_certificate(cert, der_cert_data) - def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate: +- def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate: ++ def _load_pem_cert(self, pem_cert_data: str) -> CertificateWrapper: """Loads a text x509 certificate encoded using PEM (Base64ed DER with header and footer) as a -@@ -135,7 +141,7 @@ def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate: +- ``cryptography.x509.Certificate`` object. This method does not require strict adherence to ASN.1 DER thereby ++ ``CertificateWrapper`` object. This method does not require strict adherence to ASN.1 DER thereby + making it possible to accept certificates which do not follow every detail of the spec (this is the case for + a number of TPM certs) [1,2]. + + It achieves this by first using the strict parser provided by python-cryptography. If that fails, it decodes the + certificate and re-encodes it using the more-forgiving pyasn1 library. The re-encoded certificate is then +- re-parsed by python-cryptography. ++ re-parsed by python-cryptography. For malformed certificates requiring re-encoding, the original DER bytes are ++ preserved in the wrapper to maintain signature validity. + + This method is equivalent to the ``cert_utils.x509_der_cert`` function but does not produce a warning when the + backup parser is used, allowing this condition to be optionally detected and handled by the model where +@@ -135,19 +141,24 @@ class Certificate(ModelType): [2] https://github.com/pyca/cryptography/issues/7189 [3] https://github.com/keylime/keylime/issues/1559 - :param der_cert_data: the DER bytes of the certificate -+ :param pem_cert_data: the PEM bytes of the certificate ++ :param pem_cert_data: the PEM text of the certificate :raises: :class:`SubstrateUnderrunError`: cert could not be deserialized even using the fallback pyasn1 parser -@@ -146,8 +152,13 @@ def _load_pem_cert(self, pem_cert_data: str) -> cryptography.x509.Certificate: - return cryptography.x509.load_pem_x509_certificate(pem_cert_data.encode("utf-8")) +- :returns: A ``cryptography.x509.Certificate`` object ++ :returns: A ``CertificateWrapper`` object + """ + + try: +- return cryptography.x509.load_pem_x509_certificate(pem_cert_data.encode("utf-8")) ++ cert = cryptography.x509.load_pem_x509_certificate(pem_cert_data.encode("utf-8")) ++ return wrap_certificate(cert, None) except Exception: der_data = pyasn1_pem.readPemFromFile(io.StringIO(pem_cert_data)) -+ # Store original DER bytes before re-encoding to preserve signature validity pyasn1_cert = pyasn1_decoder.decode(der_data, asn1Spec=pyasn1_rfc2459.Certificate())[0] - return cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) + cert = cryptography.x509.load_der_x509_certificate(pyasn1_encoder.encode(pyasn1_cert)) -+ # Only store if we have valid DER bytes (not empty string) -+ if isinstance(der_data, bytes) and der_data: -+ self._original_bytes_cache = base64.b64encode(der_data) -+ return cert ++ # Only preserve original bytes if we have valid DER data ++ original_bytes = der_data if isinstance(der_data, bytes) and der_data else None ++ # Preserve the original bytes when re-encoding is necessary ++ return wrap_certificate(cert, original_bytes) def infer_encoding(self, value: IncomingValue) -> Optional[str]: """Tries to infer the certificate encoding from the given value based on the data type and other surface-level -@@ -269,7 +280,13 @@ def _dump(self, value: IncomingValue) -> Optional[str]: +@@ -159,15 +170,21 @@ class Certificate(ModelType): + :returns: ``"der"`` when the value appears to be DER encoded + :returns: ``"pem"`` when the value appears to be PEM encoded + :returns: ``"base64"`` when the value appears to be Base64(DER) encoded (without PEM headers) ++ :returns: ``"wrapped"`` when the value is already a ``CertificateWrapper`` object + :returns: ``"decoded"`` when the value is already a ``cryptography.x509.Certificate`` object ++ :returns: ``"disabled"`` when the value is the string "disabled" + :returns: ``None`` when the encoding cannot be inferred + """ + # pylint: disable=no-else-return + +- if isinstance(value, cryptography.x509.Certificate): ++ if isinstance(value, CertificateWrapper): ++ return "wrapped" ++ elif isinstance(value, cryptography.x509.Certificate): + return "decoded" + elif isinstance(value, bytes): + return "der" ++ elif isinstance(value, str) and value == "disabled": ++ return "disabled" + elif isinstance(value, str) and value.startswith("-----BEGIN CERTIFICATE-----"): + return "pem" + elif isinstance(value, str): +@@ -190,19 +207,25 @@ class Certificate(ModelType): + :param value: The value in DER, Base64(DER), or PEM format (or an already deserialized certificate object) + + :returns: ``"True"`` if the value can be deserialized by python-cryptography and is ASN.1 DER compliant ++ :returns: ``"True"`` if the value is the string "disabled" (considered compliant as it's a valid field value) + :returns: ``"False"`` if the value cannot be deserialized by python-cryptography + :returns: ``None`` if the value is already a deserialized certificate of type ``cryptography.x509.Certificate`` + """ + + try: + encoding_inf = self.infer_encoding(value) ++ if encoding_inf == "wrapped": ++ # For CertificateWrapper objects, check if they have original bytes (indicating re-encoding was needed) ++ return not value.has_original_bytes # type: ignore[union-attr] + if encoding_inf == "decoded": + return None ++ if encoding_inf == "disabled": ++ return True + + if encoding_inf == "der": + cryptography.x509.load_der_x509_certificate(value) # type: ignore[reportArgumentType, arg-type] + elif encoding_inf == "pem": +- cryptography.x509.load_pem_x509_certificate(value) # type: ignore[reportArgumentType, arg-type] ++ cryptography.x509.load_pem_x509_certificate(value.encode("utf-8")) # type: ignore[reportArgumentType, arg-type, union-attr] + elif encoding_inf == "base64": + der_value = base64.b64decode(value, validate=True) # type: ignore[reportArgumentType, arg-type] + cryptography.x509.load_der_x509_certificate(der_value) +@@ -213,25 +236,27 @@ class Certificate(ModelType): + + return True + +- def cast(self, value: IncomingValue) -> Optional[cryptography.x509.Certificate]: ++ def cast(self, value: IncomingValue) -> Optional[CertificateWrapper]: + """Tries to interpret the given value as an X.509 certificate and convert it to a +- ``cryptography.x509.Certificate`` object. Values which do not require conversion are returned unchanged. ++ ``CertificateWrapper`` object. Values which do not require conversion are returned unchanged. + + :param value: The value to convert (may be in DER, Base64(DER), or PEM format) + + :raises: :class:`TypeError`: ``value`` is of an unexpected data type + :raises: :class:`ValueError`: ``value`` does not contain data which is interpretable as a certificate + +- :returns: A ``cryptography.x509.Certificate`` object or None if an empty value is given ++ :returns: A ``CertificateWrapper`` object or None if an empty value is given + """ + + if not value: + return None + + encoding_inf = self.infer_encoding(value) ++ if encoding_inf == "wrapped": ++ return value # type: ignore[return-value] + if encoding_inf == "decoded": +- return value # type: ignore[reportReturnType, return-value] +- ++ # Wrap raw cryptography certificate without original bytes ++ return wrap_certificate(value, None) # type: ignore[arg-type] + if encoding_inf == "der": + try: + return self._load_der_cert(value) # type: ignore[reportArgumentType, arg-type] +@@ -271,7 +296,6 @@ class Certificate(ModelType): if not cert: return None - # Save as Base64-encoded value (without the PEM "BEGIN" and "END" header/footer for efficiency) -+ # Check if we have original bytes preserved for this certificate (when re-encoding was needed) -+ original_bytes = self._original_bytes_cache -+ if original_bytes is not None: -+ # Use original bytes to preserve signature validity -+ return original_bytes.decode("utf-8") -+ -+ # Use standard encoding for ASN.1-compliant certificates return base64.b64encode(cert.public_bytes(Encoding.DER)).decode("utf-8") def render(self, value: IncomingValue) -> Optional[str]: -@@ -279,9 +296,49 @@ def render(self, value: IncomingValue) -> Optional[str]: +@@ -281,9 +305,8 @@ class Certificate(ModelType): if not cert: return None -+ # Check if we have original bytes preserved for this certificate (when re-encoding was needed) -+ original_bytes = self._original_bytes_cache -+ if original_bytes is not None: -+ # Use original bytes to preserve signature validity -+ # The original bytes are expected to be base64 encoded DER -+ return "\n".join( -+ ["-----BEGIN CERTIFICATE-----", original_bytes.decode("utf-8"), "-----END CERTIFICATE-----"] -+ ) -+ - # Render certificate in PEM format +- # Render certificate in PEM format return cert.public_bytes(Encoding.PEM).decode("utf-8") # type: ignore[no-any-return] -+ def db_load(self, value: Optional[str], _dialect: Dialect) -> Optional[cryptography.x509.Certificate]: -+ """Load certificate from database, preserving original bytes when re-encoding is needed. -+ -+ This method ensures that when reading certificates from the database, we preserve -+ the original bytes (which may have been stored from a previous malformed certificate) -+ rather than losing them during re-parsing. -+ """ -+ if not value: -+ return None -+ -+ # Decode the Base64 value from database - these are the original bytes we want to preserve -+ try: -+ original_der_bytes = base64.b64decode(value, validate=True) -+ except (binascii.Error, ValueError): -+ # If Base64 decoding fails, fall back to standard loading -+ return self.cast(value) -+ -+ # Cast to certificate object (may trigger re-encoding if still malformed) -+ cert = self.cast(original_der_bytes) -+ -+ return cert -+ -+ def has_original_bytes(self) -> bool: -+ """Check if this certificate has preserved original bytes""" -+ return self._original_bytes_cache is not None -+ -+ def get_original_bytes(self) -> Optional[bytes]: -+ """Return the preserved original bytes of the base64-encoded certificate -+ if the certificate was malformed and it was stored""" -+ return self._original_bytes_cache -+ @property def native_type(self) -> type: - return cryptography.x509.Certificate +- return cryptography.x509.Certificate ++ return CertificateWrapper diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py -index 560c18838..58dd0854c 100644 +index b232049..680316b 100644 --- a/keylime/models/registrar/registrar_agent.py +++ b/keylime/models/registrar/registrar_agent.py -@@ -357,6 +357,12 @@ def render(self, only=None): +@@ -1,7 +1,6 @@ + import base64 + import hmac - # When operating in pull mode, ekcert is encoded as Base64 instead of PEM - if output.get("ekcert"): -+ if self.ekcert.has_original_bytes: -+ original_bytes = self.ekcert.get_original_bytes() -+ if original_bytes is not None: -+ # In case the cert was malformed, the original cert is -+ # cached base64-encoded -+ output["ekcert"] = original_bytes.decode("utf-8") - output["ekcert"] = base64.b64encode(self.ekcert.public_bytes(Encoding.DER)).decode("utf-8") +-import cryptography.x509 + from cryptography.hazmat.primitives.asymmetric import ec, rsa + from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat - return output -diff --git a/test/test_certificate_preservation.py b/test/test_certificate_preservation.py +@@ -116,35 +115,35 @@ class RegistrarAgent(PersistableModel): + if not cert_utils.verify_cert(cert, trust_store, cert_type): + self._add_error(cert_field, "must contain a certificate issued by a CA present in the trust store") + +- def _check_cert_compliance(self, cert_field, raw_cert): ++ def _check_cert_compliance(self, cert_field): + new_cert = self.changes.get(cert_field) + old_cert = self.values.get(cert_field) + + # If the certificate field has not been changed, no need to perform check +- if not raw_cert or not new_cert: ++ if not new_cert: ++ return True ++ ++ # If the certificate field is set as "disabled" (for mtls_cert) ++ if new_cert == "disabled": + return True + + # If the new certificate value is the same as the old certificate value, no need to perform check +- if ( +- isinstance(new_cert, cryptography.x509.Certificate) +- and isinstance(old_cert, cryptography.x509.Certificate) +- and new_cert.public_bytes(Encoding.DER) == old_cert.public_bytes(Encoding.DER) +- ): ++ if old_cert and new_cert.public_bytes(Encoding.DER) == old_cert.public_bytes(Encoding.DER): + return True + +- compliant = Certificate().asn1_compliant(raw_cert) ++ compliant = Certificate().asn1_compliant(new_cert) + + if not compliant: + if config.get("registrar", "malformed_cert_action") == "reject": +- self._add_error(cert_field, Certificate().generate_error_msg(raw_cert)) ++ self._add_error(cert_field, Certificate().generate_error_msg(new_cert)) + + return compliant + +- def _check_all_cert_compliance(self, data): ++ def _check_all_cert_compliance(self): + non_compliant_certs = [] + + for field_name in ("ekcert", "iak_cert", "idevid_cert", "mtls_cert"): +- if not self._check_cert_compliance(field_name, data.get(field_name)): ++ if not self._check_cert_compliance(field_name): + non_compliant_certs.append(f"'{field_name}'") + + if not non_compliant_certs: +@@ -291,7 +290,7 @@ class RegistrarAgent(PersistableModel): + # Ensure either an EK or IAK/IDevID is present, depending on configuration + self._check_root_identity_presence() + # Handle certificates which are not fully compliant with ASN.1 DER +- self._check_all_cert_compliance(data) ++ self._check_all_cert_compliance() + + # Basic validation of values + self.validate_required(["aik_tpm"]) +diff --git a/test/test_certificate_modeltype.py b/test/test_certificate_modeltype.py new file mode 100644 -index 000000000..1ac981891 +index 0000000..335ae0f --- /dev/null -+++ b/test/test_certificate_preservation.py ++++ b/test/test_certificate_modeltype.py @@ -0,0 +1,197 @@ +""" -+Unit tests for certificate original bytes preservation functionality. ++Unit tests for the Certificate ModelType class. + -+Tests that the Certificate type correctly preserves original certificate bytes -+when malformed certificates require pyasn1 re-encoding, ensuring signatures -+remain valid throughout the database lifecycle. ++This module tests the certificate model type functionality including ++encoding inference and ASN.1 compliance checking. +""" + +import base64 -+import os -+import sys +import unittest -+from unittest.mock import Mock + -+# Add the parent directory to sys.path to import from local keylime -+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -+# Imported after path manipulation to get local version -+from keylime.models.base.types.certificate import Certificate # pylint: disable=wrong-import-position ++import cryptography.x509 ++from cryptography.hazmat.primitives.serialization import Encoding ++ ++from keylime.certificate_wrapper import CertificateWrapper, wrap_certificate ++from keylime.models.base.types.certificate import Certificate + + -+class TestCertificatePreservation(unittest.TestCase): -+ """Test certificate original bytes preservation functionality.""" -+ -+ # pylint: disable=protected-access # Tests need access to internal cache for validation ++class TestCertificateModelType(unittest.TestCase): ++ """Test cases for Certificate ModelType class.""" + + def setUp(self): + """Set up test fixtures.""" -+ # Real malformed TPM certificate from /tmp/malformed_cert.txt -+ # This is a Nuvoton TPM EK certificate that requires pyasn1 re-encoding -+ malformed_cert_multiline = """MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv -+biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u -+MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG -+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO -+kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq -+ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D -+ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e -++GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB -+NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT -+B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF -+Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG -+A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF -+BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051 -+dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2 -+a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps=""" -+ # Normalize the Base64 (remove newlines and spaces) -+ self.malformed_cert_b64 = "".join(malformed_cert_multiline.split()) -+ # Decode to get original DER bytes -+ self.malformed_cert_der = base64.b64decode(self.malformed_cert_b64) -+ # Create Certificate instance for testing + self.cert_type = Certificate() + -+ def test_malformed_certificate_parsing(self): -+ """Test that malformed certificate can be parsed using pyasn1 fallback.""" -+ # This malformed certificate should trigger pyasn1 re-encoding -+ cert = self.cert_type.cast(self.malformed_cert_der) ++ # Compliant certificate for testing (loads fine with python-cryptography) ++ self.compliant_cert_pem = """-----BEGIN CERTIFICATE----- ++MIIClzCCAX+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAPMQ0wCwYDVQQDDARUZXN0 ++MB4XDTI1MDkxMTEyNDU1MVoXDTI2MDkxMTEyNDU1MVowDzENMAsGA1UEAwwEVGVz ++dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAO2V27HsMnKczHCaLgf9 ++FtxuorvkA5OMkz6KsW1eyryHr0TJ801prLpeNnMZ3U4pqLMqocMc7T2KO6nPZJxO ++7zRzehyo9pBBVO4pUR1QMGoTWuJQbqNieDQ4V9dW67N5wp/UWEkK6CNNd6aXjswb ++dVaDbIfDL8hMX6Lil3+pTysRWGqjRvBGJxS9r/mYRAvbz1JHPjfegSc0uxnUE+qZ ++SrbWa3TN82LX6jw6tKk0Z3CcPJC6QN+ijCxxAoHyLRYUIgZbAKe/FGRbjO0fuW11 ++L7TcE1k3eaC7RkvotIaCOW/RMOkwKu1MbCzFEA2YRYf9covEwdItzI4FE++ZJrsz ++LhUCAwEAaTANBgkqhkiG9w0BAQsFAAOCAQEAeqqJT0LnmAluAjrsCSK/eYYjwjhZ ++aKMi/iBO10zfb+GvT4yqEL5gnuWxJEx4TTcDww1clvOC1EcPUZFaKR3GIBGy0ZgJ ++zGCfg+sC6liyZ+4PSWSJHD2dT5N3IGp4/hPsrhKnVb9fYbRc0Bc5VHeS9QQoSJDH ++f9EbxCcwdErVllRter29OZCb4XnEEbTqLIKRYVrbsu/t4C+vzi0tmKg5HZXf9PMo ++D28zJGsCAr8sKW/iUKObqDOHEn56lk12NTJmJmi+g6rEikk/0czJlRjSGnJQLjUg ++d4wslruibXBsLPtJw2c6vTC2SV2F1PXwy5j1OKU+D6nxaaItQvWADEjcTg== ++-----END CERTIFICATE-----""" + -+ # Should successfully parse despite being malformed -+ self.assertIsNotNone(cert) ++ # Malformed certificate that requires pyasn1 re-encoding (fails with python-cryptography) ++ self.malformed_cert_b64 = ( ++ "MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv" ++ "biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u" ++ "MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG" ++ "9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO" ++ "kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq" ++ "ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D" ++ "ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e" ++ "+GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB" ++ "NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT" ++ "B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF" ++ "Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG" ++ "A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF" ++ "BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051" ++ "dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2" ++ "a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps=" ++ ) + -+ # Should have preserved original bytes in cache -+ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type] ++ # Load certificates for testing ++ self.compliant_cert = cryptography.x509.load_pem_x509_certificate(self.compliant_cert_pem.encode()) ++ self.malformed_cert_der = base64.b64decode(self.malformed_cert_b64) + -+ # Cached bytes should be the original DER bytes -+ cached_bytes = self.cert_type._original_bytes_cache.decode("utf-8") # type: ignore[arg-type] -+ self.assertEqual(cached_bytes, self.malformed_cert_b64) ++ def test_infer_encoding_wrapped_certificate(self): ++ """Test that CertificateWrapper objects are identified as 'wrapped'.""" ++ wrapped_cert = wrap_certificate(self.compliant_cert, None) ++ encoding = self.cert_type.infer_encoding(wrapped_cert) ++ self.assertEqual(encoding, "wrapped") + -+ def test_asn1_compliance_detection(self): -+ """Test that malformed certificate is detected as non-ASN.1 compliant.""" -+ # This certificate should not be ASN.1 DER compliant -+ is_compliant = self.cert_type.asn1_compliant(self.malformed_cert_der) -+ self.assertFalse(is_compliant) ++ def test_infer_encoding_raw_certificate(self): ++ """Test that raw cryptography.x509.Certificate objects are identified as 'decoded'.""" ++ encoding = self.cert_type.infer_encoding(self.compliant_cert) ++ self.assertEqual(encoding, "decoded") + -+ # Base64 version should also be non-compliant -+ is_compliant_b64 = self.cert_type.asn1_compliant(self.malformed_cert_b64) -+ self.assertFalse(is_compliant_b64) ++ def test_infer_encoding_der_bytes(self): ++ """Test that DER bytes are identified as 'der'.""" ++ der_bytes = self.compliant_cert.public_bytes(Encoding.DER) ++ encoding = self.cert_type.infer_encoding(der_bytes) ++ self.assertEqual(encoding, "der") + -+ def test_dump_preserves_original_bytes(self): -+ """Test that _dump() returns original bytes for malformed certificates.""" -+ # Parse malformed certificate (triggers pyasn1 and caching) -+ cert = self.cert_type.cast(self.malformed_cert_der) ++ def test_infer_encoding_pem_string(self): ++ """Test that PEM strings are identified as 'pem'.""" ++ encoding = self.cert_type.infer_encoding(self.compliant_cert_pem) ++ self.assertEqual(encoding, "pem") + -+ # Dump should return original bytes as Base64 -+ dumped = self.cert_type._dump(cert) # pylint: disable=protected-access -+ self.assertEqual(dumped, self.malformed_cert_b64) ++ def test_infer_encoding_base64_string(self): ++ """Test that Base64 strings are identified as 'base64'.""" ++ encoding = self.cert_type.infer_encoding(self.malformed_cert_b64) ++ self.assertEqual(encoding, "base64") + -+ # Verify round-trip preservation -+ self.assertIsNotNone(dumped) -+ restored_bytes = base64.b64decode(dumped) # type: ignore[arg-type] -+ self.assertEqual(restored_bytes, self.malformed_cert_der) ++ def test_infer_encoding_none_for_invalid(self): ++ """Test that invalid types return None.""" ++ encoding = self.cert_type.infer_encoding(12345) # type: ignore[arg-type] # Testing invalid type ++ self.assertIsNone(encoding) + -+ def test_db_load_preserves_original_bytes(self): -+ """Test that db_load() preserves original bytes when reading from database.""" -+ mock_dialect = Mock() ++ def test_asn1_compliant_wrapped_without_original_bytes(self): ++ """Test that CertificateWrapper without original bytes is ASN.1 compliant.""" ++ wrapped_cert = wrap_certificate(self.compliant_cert, None) ++ compliant = self.cert_type.asn1_compliant(wrapped_cert) ++ self.assertTrue(compliant) + -+ # Simulate loading from database with our malformed certificate -+ cert = self.cert_type.db_load(self.malformed_cert_b64, mock_dialect) ++ def test_asn1_compliant_wrapped_with_original_bytes(self): ++ """Test that CertificateWrapper with original bytes is not ASN.1 compliant.""" ++ wrapped_cert = wrap_certificate(self.compliant_cert, b"fake_original_bytes") ++ compliant = self.cert_type.asn1_compliant(wrapped_cert) ++ self.assertFalse(compliant) + -+ # Should successfully load certificate -+ self.assertIsNotNone(cert) ++ def test_asn1_compliant_raw_certificate(self): ++ """Test that raw cryptography.x509.Certificate returns None (already decoded).""" ++ compliant = self.cert_type.asn1_compliant(self.compliant_cert) ++ self.assertIsNone(compliant) + -+ # Should have preserved original bytes in cache -+ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type] ++ def test_asn1_compliant_pem_strings(self): ++ """Test ASN.1 compliance checking on PEM strings.""" ++ # The regular certificate and TPM certificate from test_registrar_db.py are actually ASN.1 compliant ++ # and can be loaded directly by python-cryptography without requiring pyasn1 re-encoding ++ compliant_regular = self.cert_type.asn1_compliant(self.compliant_cert_pem) ++ # Only test one certificate since both are the same type (ASN.1 compliant) + -+ # Cached bytes should be the original DER bytes, base64-encoded -+ if self.cert_type._original_bytes_cache: -+ cached_bytes = self.cert_type._original_bytes_cache.decode("utf-8") -+ self.assertEqual(cached_bytes, self.malformed_cert_b64) ++ # Should be ASN.1 compliant (True) since it loads fine with python-cryptography ++ self.assertTrue(compliant_regular) + -+ def test_database_round_trip_preservation(self): -+ """Test complete database round-trip preserves original bytes.""" -+ mock_dialect = Mock() ++ def test_asn1_compliant_der_and_base64(self): ++ """Test ASN.1 compliance checking on DER and Base64 formats.""" ++ # Test DER bytes - regular certificate should be compliant ++ der_bytes = self.compliant_cert.public_bytes(Encoding.DER) ++ compliant_der = self.cert_type.asn1_compliant(der_bytes) ++ self.assertTrue(compliant_der) + -+ # Step 1: Initial parsing (simulates agent registration) -+ cert1 = self.cert_type.cast(self.malformed_cert_der) -+ self.assertIsNotNone(cert1) -+ self.assertTrue(self.cert_type.has_original_bytes()) # type: ignore[arg-type] ++ # Test Base64 string - regular certificate should be compliant ++ b64_string = base64.b64encode(der_bytes).decode("utf-8") ++ compliant_b64 = self.cert_type.asn1_compliant(b64_string) ++ self.assertTrue(compliant_b64) + -+ # Step 2: Store to database simulation -+ db_value = self.cert_type._dump(cert1) # pylint: disable=protected-access -+ self.assertEqual(db_value, self.malformed_cert_b64) ++ def test_asn1_compliant_malformed_certificate(self): ++ """Test ASN.1 compliance checking on a truly malformed certificate.""" ++ # Test the malformed certificate that requires pyasn1 re-encoding ++ compliant = self.cert_type.asn1_compliant(self.malformed_cert_b64) ++ self.assertFalse(compliant) # Should be non-compliant since it needs pyasn1 fallback + -+ # Step 3: Create fresh Certificate instance (simulates new session) -+ fresh_cert_type = Certificate() -+ self.assertEqual(fresh_cert_type._original_bytes_cache, None) # pylint: disable=protected-access ++ def test_asn1_compliant_invalid_data(self): ++ """Test that invalid certificate data is not ASN.1 compliant.""" ++ compliant = self.cert_type.asn1_compliant("invalid_certificate_data") ++ self.assertFalse(compliant) + -+ # Step 4: Load from database -+ cert2 = fresh_cert_type.db_load(db_value, mock_dialect) -+ self.assertIsNotNone(cert2) -+ self.assertTrue(fresh_cert_type.has_original_bytes()) # type: ignore[arg-type] ++ def test_cast_wrapped_certificate(self): ++ """Test that CertificateWrapper objects are returned unchanged.""" ++ wrapped_cert = wrap_certificate(self.compliant_cert, None) ++ result = self.cert_type.cast(wrapped_cert) ++ self.assertIs(result, wrapped_cert) + -+ # Step 5: Verify original bytes preserved across round-trip -+ self.assertTrue(fresh_cert_type._original_bytes_cache) -+ cached_bytes = fresh_cert_type._original_bytes_cache.decode("utf-8") # type: ignore[arg-type] -+ self.assertEqual(cached_bytes, self.malformed_cert_b64) ++ def test_cast_raw_certificate_to_wrapped(self): ++ """Test that raw certificates are wrapped without original bytes.""" ++ result = self.cert_type.cast(self.compliant_cert) ++ self.assertIsInstance(result, CertificateWrapper) ++ assert result is not None # For type checker ++ self.assertFalse(result.has_original_bytes) + -+ # Step 6: Subsequent dump should still use original bytes -+ second_dump = fresh_cert_type._dump(cert2) # pylint: disable=protected-access -+ self.assertEqual(second_dump, self.malformed_cert_b64) ++ def test_cast_pem_strings(self): ++ """Test casting PEM strings to CertificateWrapper.""" ++ # Test regular certificate - should be ASN.1 compliant, no original bytes needed ++ result_regular = self.cert_type.cast(self.compliant_cert_pem) ++ self.assertIsInstance(result_regular, CertificateWrapper) ++ assert result_regular is not None # For type checker ++ self.assertFalse(result_regular.has_original_bytes) + -+ def test_compliant_certificate_no_caching(self): -+ """Test that ASN.1-compliant certificates don't use caching.""" -+ # Create a simple ASN.1-compliant certificate for testing -+ # We'll use a well-formed Base64 string that represents valid DER -+ compliant_cert_b64 = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwJKW9Q==" # Truncated for test ++ # Note: Only testing compliant certificate since we now use one consistent certificate for all compliant scenarios + -+ try: -+ # Try to parse - may fail due to truncation, but that's ok for this test -+ cert = self.cert_type.cast(compliant_cert_b64) -+ if cert: # Only test if parsing succeeded -+ # Should not have cached original bytes -+ self.assertFalse(self.cert_type.has_original_bytes()) -+ self.assertEqual(self.cert_type._original_bytes_cache, None) -+ except (ValueError, Exception): -+ # Expected for truncated certificate - just verify no caching occurred -+ self.assertEqual(self.cert_type._original_bytes_cache, None) ++ def test_cast_malformed_certificate(self): ++ """Test casting the malformed certificate that requires pyasn1 re-encoding.""" ++ result = self.cert_type.cast(self.malformed_cert_b64) ++ self.assertIsInstance(result, CertificateWrapper) ++ assert result is not None # For type checker ++ # Malformed certificate should have original bytes since it needs re-encoding ++ self.assertTrue(result.has_original_bytes) + -+ def test_db_load_with_none_value(self): -+ """Test that db_load() handles None values correctly.""" -+ mock_dialect = Mock() ++ def test_cast_der_bytes(self): ++ """Test casting DER bytes to CertificateWrapper.""" ++ der_bytes = self.compliant_cert.public_bytes(Encoding.DER) ++ result = self.cert_type.cast(der_bytes) ++ self.assertIsInstance(result, CertificateWrapper) + -+ result = self.cert_type.db_load(None, mock_dialect) ++ def test_cast_none_value(self): ++ """Test that None values return None.""" ++ result = self.cert_type.cast(None) + self.assertIsNone(result) -+ self.assertEqual(self.cert_type._original_bytes_cache, None) + -+ def test_db_load_with_invalid_base64(self): -+ """Test that db_load() handles invalid Base64 gracefully.""" -+ mock_dialect = Mock() -+ -+ # Invalid Base64 string -+ invalid_b64 = "not_valid_base64!!!" -+ -+ # Should not raise exception, but may return None or attempt fallback -+ try: -+ _ = self.cert_type.db_load(invalid_b64, mock_dialect) -+ # Result may be None or may attempt to parse the string directly -+ # The important thing is it doesn't crash -+ except Exception: -+ # Some exceptions are expected for invalid data -+ # We just want to ensure it's handled gracefully -+ pass -+ -+ def test_render_method_unaffected(self): -+ """Test that render() method works normally with cached certificates.""" -+ cert = self.cert_type.cast(self.malformed_cert_der) -+ self.assertIsNotNone(cert) -+ -+ # render() should return PEM format (doesn't use cached bytes) -+ rendered = self.cert_type.render(cert) -+ -+ # Should be PEM format -+ if rendered: # Only test if rendering succeeded -+ self.assertIn("-----BEGIN CERTIFICATE-----", rendered) -+ self.assertIn("-----END CERTIFICATE-----", rendered) ++ def test_cast_empty_string(self): ++ """Test that empty strings return None.""" ++ result = self.cert_type.cast("") ++ self.assertIsNone(result) ++ ++ ++if __name__ == "__main__": ++ unittest.main() +diff --git a/test/test_certificate_wrapper.py b/test/test_certificate_wrapper.py +new file mode 100644 +index 0000000..6b47260 +--- /dev/null ++++ b/test/test_certificate_wrapper.py +@@ -0,0 +1,385 @@ ++""" ++Unit tests for the CertificateWrapper class. ++ ++This module tests the certificate wrapper functionality that preserves original bytes ++for malformed certificates requiring pyasn1 re-encoding. ++""" ++ ++import base64 ++import subprocess ++import tempfile ++import unittest ++from unittest.mock import Mock ++ ++import cryptography.x509 ++from cryptography.hazmat.primitives.serialization import Encoding ++from pyasn1.codec.der import decoder as pyasn1_decoder ++from pyasn1.codec.der import encoder as pyasn1_encoder ++from pyasn1_modules import rfc2459 as pyasn1_rfc2459 ++ ++from keylime.certificate_wrapper import CertificateWrapper, wrap_certificate ++ ++ ++class TestCertificateWrapper(unittest.TestCase): ++ """Test cases for CertificateWrapper class.""" ++ ++ def setUp(self): ++ """Set up test fixtures.""" ++ # Malformed certificate (Base64 encoded) that requires pyasn1 re-encoding ++ # This is a real TPM certificate that doesn't strictly follow ASN.1 DER rules ++ self.malformed_cert_b64 = ( ++ "MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv" ++ "biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u" ++ "MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG" ++ "9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO" ++ "kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq" ++ "ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D" ++ "ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e" ++ "+GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB" ++ "NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT" ++ "B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF" ++ "Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG" ++ "A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF" ++ "BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051" ++ "dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2" ++ "a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps=" ++ ) ++ self.malformed_cert_der = base64.b64decode(self.malformed_cert_b64) ++ ++ # Create a mock certificate for testing ++ self.mock_cert = Mock(spec=cryptography.x509.Certificate) ++ self.mock_cert.subject = Mock() ++ self.mock_cert.subject.__str__ = Mock(return_value="CN=Test Certificate") ++ self.mock_cert.public_bytes.return_value = b"mock_der_data" ++ ++ def test_init_without_original_bytes(self): ++ """Test wrapper initialization without original bytes.""" ++ wrapper = CertificateWrapper(self.mock_cert) ++ ++ # Test through public interface ++ self.assertFalse(wrapper.has_original_bytes) ++ self.assertIsNone(wrapper.original_bytes) ++ # Test delegation works ++ self.assertEqual(wrapper.subject, self.mock_cert.subject) ++ ++ def test_init_with_original_bytes(self): ++ """Test wrapper initialization with original bytes.""" ++ original_data = b"original_certificate_data" ++ wrapper = CertificateWrapper(self.mock_cert, original_data) ++ ++ # Test through public interface ++ self.assertTrue(wrapper.has_original_bytes) ++ self.assertEqual(wrapper.original_bytes, original_data) ++ # Test delegation works ++ self.assertEqual(wrapper.subject, self.mock_cert.subject) ++ ++ def test_getattr_delegation(self): ++ """Test that attributes are properly delegated to the wrapped certificate.""" ++ wrapper = CertificateWrapper(self.mock_cert) ++ ++ # Access an attribute that should be delegated ++ result = wrapper.subject ++ self.assertEqual(result, self.mock_cert.subject) ++ ++ def test_public_bytes_der_without_original(self): ++ """Test public_bytes DER encoding without original bytes.""" ++ wrapper = CertificateWrapper(self.mock_cert) ++ ++ result = wrapper.public_bytes(Encoding.DER) ++ ++ self.mock_cert.public_bytes.assert_called_once_with(Encoding.DER) ++ self.assertEqual(result, b"mock_der_data") ++ ++ def test_public_bytes_der_with_original(self): ++ """Test public_bytes DER encoding with original bytes.""" ++ original_data = b"original_certificate_data" ++ wrapper = CertificateWrapper(self.mock_cert, original_data) ++ ++ result = wrapper.public_bytes(Encoding.DER) ++ ++ # Should return original bytes, not call the wrapped certificate ++ self.mock_cert.public_bytes.assert_not_called() ++ self.assertEqual(result, original_data) ++ ++ def test_public_bytes_pem_without_original(self): ++ """Test public_bytes PEM encoding without original bytes.""" ++ self.mock_cert.public_bytes.return_value = b"-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n" ++ wrapper = CertificateWrapper(self.mock_cert) ++ ++ result = wrapper.public_bytes(Encoding.PEM) ++ ++ self.mock_cert.public_bytes.assert_called_once_with(Encoding.PEM) ++ self.assertEqual(result, b"-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n") ++ ++ def test_public_bytes_pem_with_original(self): ++ """Test public_bytes PEM encoding with original bytes.""" ++ original_data = self.malformed_cert_der ++ wrapper = CertificateWrapper(self.mock_cert, original_data) ++ ++ result = wrapper.public_bytes(Encoding.PEM) ++ ++ # Should not call the wrapped certificate's method ++ self.mock_cert.public_bytes.assert_not_called() ++ ++ # Result should be PEM format derived from original bytes ++ self.assertIsInstance(result, bytes) ++ result_str = result.decode("utf-8") ++ self.assertTrue(result_str.startswith("-----BEGIN CERTIFICATE-----")) ++ self.assertTrue(result_str.endswith("-----END CERTIFICATE-----\n")) ++ ++ # Verify that the PEM content can be converted back to the original DER ++ pem_lines = result_str.strip().split("\n") ++ pem_content = "".join(pem_lines[1:-1]) # Remove headers and join ++ recovered_der = base64.b64decode(pem_content) ++ self.assertEqual(recovered_der, original_data) ++ ++ def test_pem_line_length_compliance(self): ++ """Test that PEM output follows RFC 1421 line length requirements (64 chars).""" ++ original_data = self.malformed_cert_der ++ wrapper = CertificateWrapper(self.mock_cert, original_data) ++ ++ result = wrapper.public_bytes(Encoding.PEM) ++ result_str = result.decode("utf-8") ++ ++ lines = result_str.strip().split("\n") ++ # Check that content lines (excluding headers) are max 64 chars ++ for line in lines[1:-1]: # Skip header and footer ++ self.assertLessEqual(len(line), 64) ++ ++ def test_str_representation(self): ++ """Test string representation of the wrapper.""" ++ wrapper = CertificateWrapper(self.mock_cert) ++ ++ result = str(wrapper) ++ ++ expected = f"CertificateWrapper(subject={self.mock_cert.subject})" ++ self.assertEqual(result, expected) ++ ++ def test_repr_representation_without_original(self): ++ """Test repr representation without original bytes.""" ++ wrapper = CertificateWrapper(self.mock_cert) ++ ++ result = repr(wrapper) ++ ++ expected = f"CertificateWrapper(subject={self.mock_cert.subject}, has_original_bytes=False)" ++ self.assertEqual(result, expected) ++ ++ def test_repr_representation_with_original(self): ++ """Test repr representation with original bytes.""" ++ original_data = b"original_data" ++ wrapper = CertificateWrapper(self.mock_cert, original_data) ++ ++ result = repr(wrapper) ++ ++ expected = f"CertificateWrapper(subject={self.mock_cert.subject}, has_original_bytes=True)" ++ self.assertEqual(result, expected) ++ ++ def test_pickling_support(self): ++ """Test that the wrapper supports pickling operations.""" ++ original_data = b"test_data" ++ wrapper = CertificateWrapper(self.mock_cert, original_data) ++ ++ # Test getstate ++ state = wrapper.__getstate__() ++ self.assertIsInstance(state, dict) ++ self.assertIn("_cert", state) ++ self.assertIn("_original_bytes", state) ++ ++ # Test setstate ++ new_wrapper = CertificateWrapper(Mock(), None) ++ new_wrapper.__setstate__(state) ++ # Verify state was restored correctly through public interface ++ self.assertTrue(new_wrapper.has_original_bytes) ++ self.assertEqual(new_wrapper.original_bytes, original_data) ++ ++ def test_wrap_certificate_function_without_original(self): ++ """Test the wrap_certificate factory function without original bytes.""" ++ wrapper = wrap_certificate(self.mock_cert) ++ ++ self.assertIsInstance(wrapper, CertificateWrapper) ++ self.assertFalse(wrapper.has_original_bytes) ++ self.assertIsNone(wrapper.original_bytes) ++ ++ def test_wrap_certificate_function_with_original(self): ++ """Test the wrap_certificate factory function with original bytes.""" ++ original_data = b"original_certificate_data" ++ wrapper = wrap_certificate(self.mock_cert, original_data) ++ ++ self.assertIsInstance(wrapper, CertificateWrapper) ++ self.assertTrue(wrapper.has_original_bytes) ++ self.assertEqual(wrapper.original_bytes, original_data) ++ ++ def test_real_malformed_certificate_handling(self): ++ """Test with a real malformed certificate that requires pyasn1 re-encoding.""" ++ # This test simulates the scenario where a malformed certificate is processed ++ ++ # Mock the scenario where cryptography fails but pyasn1 succeeds ++ mock_reencoded_cert = Mock(spec=cryptography.x509.Certificate) ++ mock_reencoded_cert.subject = Mock() ++ mock_reencoded_cert.subject.__str__ = Mock(return_value="CN=Nuvoton TPM") ++ ++ # Create wrapper as if it came from the certificate loading process ++ wrapper = wrap_certificate(mock_reencoded_cert, self.malformed_cert_der) ++ ++ # Test that original bytes are preserved ++ self.assertTrue(wrapper.has_original_bytes) ++ self.assertEqual(wrapper.original_bytes, self.malformed_cert_der) ++ ++ # Test DER output uses original bytes ++ der_output = wrapper.public_bytes(Encoding.DER) ++ self.assertEqual(der_output, self.malformed_cert_der) ++ ++ # Test PEM output is derived from original bytes ++ pem_output = wrapper.public_bytes(Encoding.PEM) ++ self.assertIsInstance(pem_output, bytes) ++ ++ # Verify PEM can be converted back to original DER ++ pem_str = pem_output.decode("utf-8") ++ lines = pem_str.strip().split("\n") ++ content = "".join(lines[1:-1]) ++ recovered_der = base64.b64decode(content) ++ self.assertEqual(recovered_der, self.malformed_cert_der) ++ ++ def test_unsupported_encoding_fallback(self): ++ """Test that unsupported encoding types fall back to wrapped certificate.""" ++ # Create a custom encoding that's not DER or PEM ++ custom_encoding = Mock() ++ custom_encoding.name = "CUSTOM" ++ ++ original_data = b"original_data" ++ wrapper = CertificateWrapper(self.mock_cert, original_data) ++ ++ # Should fall back to wrapped certificate for unknown encoding ++ wrapper.public_bytes(custom_encoding) ++ self.mock_cert.public_bytes.assert_called_once_with(custom_encoding) ++ ++ def test_malformed_certificate_cryptography_failure_and_verification(self): ++ """ ++ Comprehensive test demonstrating that the malformed certificate: ++ 1. Fails to load with python-cryptography ++ 2. Can be verified with OpenSSL ++ 3. Is successfully handled by our wrapper after pyasn1 re-encoding ++ """ ++ # Test 1: Demonstrate that python-cryptography fails to load the malformed certificate ++ with self.assertRaises(Exception) as context: ++ cryptography.x509.load_der_x509_certificate(self.malformed_cert_der) ++ ++ # The specific exception type may vary, but it should fail ++ self.assertIsInstance(context.exception, Exception) ++ ++ # Test 2: Demonstrate that pyasn1 can handle the malformed certificate ++ try: ++ # Decode and re-encode using pyasn1 (simulating what the Certificate type does) ++ pyasn1_cert = pyasn1_decoder.decode(self.malformed_cert_der, asn1Spec=pyasn1_rfc2459.Certificate())[0] ++ reencoded_der = pyasn1_encoder.encode(pyasn1_cert) ++ ++ # Now cryptography should be able to load the re-encoded certificate ++ reencoded_cert = cryptography.x509.load_der_x509_certificate(reencoded_der) ++ self.assertIsNotNone(reencoded_cert) ++ ++ except Exception as e: ++ self.fail(f"pyasn1 should handle the malformed certificate, but got: {e}") ++ ++ # Test 3: Verify that our wrapper preserves the original bytes correctly ++ wrapper = wrap_certificate(reencoded_cert, self.malformed_cert_der) ++ ++ # The wrapper should preserve original bytes ++ self.assertTrue(wrapper.has_original_bytes) ++ self.assertEqual(wrapper.original_bytes, self.malformed_cert_der) ++ ++ # DER output should use original bytes ++ der_output = wrapper.public_bytes(Encoding.DER) ++ self.assertEqual(der_output, self.malformed_cert_der) ++ ++ # PEM output should be derived from original bytes ++ pem_output = wrapper.public_bytes(Encoding.PEM) ++ pem_str = pem_output.decode("utf-8") ++ ++ # Verify PEM format is correct ++ self.assertTrue(pem_str.startswith("-----BEGIN CERTIFICATE-----")) ++ self.assertTrue(pem_str.endswith("-----END CERTIFICATE-----\n")) ++ ++ # Test 4: Demonstrate OpenSSL can verify the certificate structure ++ # (Even without the root CA, OpenSSL should be able to parse the certificate) ++ try: ++ with tempfile.NamedTemporaryFile(mode="wb", suffix=".der", delete=False) as temp_file: ++ temp_file.write(self.malformed_cert_der) ++ temp_file.flush() ++ ++ # Use OpenSSL to parse the certificate (should succeed) ++ result = subprocess.run( ++ ["openssl", "x509", "-in", temp_file.name, "-inform", "DER", "-text", "-noout"], ++ capture_output=True, ++ text=True, ++ check=False, ++ ) ++ ++ # OpenSSL should successfully parse the certificate ++ self.assertEqual(result.returncode, 0) ++ self.assertIn("Nuvoton TPM Root CA 2111", result.stdout) ++ self.assertIn("Certificate:", result.stdout) ++ ++ except (subprocess.CalledProcessError, FileNotFoundError) as e: ++ # Skip if OpenSSL is not available, but don't fail the test ++ self.skipTest(f"OpenSSL not available for verification test: {e}") ++ ++ # Test 5: Verify certificate details are accessible through wrapper ++ # The subject should be empty (as shown in the OpenSSL output) ++ self.assertEqual(len(reencoded_cert.subject), 0) ++ ++ # The issuer should contain Nuvoton information ++ issuer_attrs = {} ++ for attr in reencoded_cert.issuer: ++ # Use dotted string representation to avoid accessing private _name ++ oid_name = attr.oid.dotted_string ++ if oid_name == "2.5.4.3": # Common Name OID ++ issuer_attrs["commonName"] = attr.value ++ self.assertIn("commonName", issuer_attrs) ++ self.assertEqual(issuer_attrs["commonName"], "Nuvoton TPM Root CA 2111") ++ ++ # Test 6: Demonstrate that even re-encoded certificates may have parsing issues ++ # This shows why preserving original bytes is crucial ++ try: ++ # Try to access extensions - this may fail due to malformed ASN.1 ++ extensions = list(reencoded_cert.extensions) ++ # If it succeeds, verify it has the expected Subject Alternative Name ++ # Subject Alternative Name OID is 2.5.29.17 ++ has_subject_alt_name = any(ext.oid.dotted_string == "2.5.29.17" for ext in extensions) ++ self.assertTrue(has_subject_alt_name, "EK certificate should have Subject Alternative Name extension") ++ except (ValueError, Exception) as e: ++ # This is actually expected for malformed certificates! ++ # Even after pyasn1 re-encoding, some parsing issues may remain ++ self.assertIn("parsing asn1", str(e).lower(), f"Expected ASN.1 parsing error, got: {e}") ++ # This demonstrates why our wrapper preserves original bytes - ++ # they maintain signature validity even when parsing has issues ++ ++ def test_certificate_chain_verification_simulation(self): ++ """ ++ Test that simulates certificate chain verification where original bytes matter. ++ This demonstrates why preserving original bytes is crucial for signature validation. ++ """ ++ # Create a wrapper with the malformed certificate ++ mock_reencoded_cert = Mock(spec=cryptography.x509.Certificate) ++ mock_reencoded_cert.subject = Mock() ++ mock_reencoded_cert.public_key.return_value = Mock() ++ ++ wrapper = wrap_certificate(mock_reencoded_cert, self.malformed_cert_der) ++ ++ # Simulate signature verification scenario ++ # In real verification, the signature is computed over the exact DER bytes ++ original_bytes_for_verification = wrapper.public_bytes(Encoding.DER) ++ ++ # Should get the original malformed bytes (preserving signature validity) ++ self.assertEqual(original_bytes_for_verification, self.malformed_cert_der) ++ ++ # If we didn't preserve original bytes, we'd get re-encoded bytes which would ++ # invalidate the signature even though the certificate content is the same ++ mock_reencoded_cert.public_bytes.return_value = b"reencoded_different_bytes" ++ ++ # Verify that using the wrapper gets original bytes, not re-encoded bytes ++ self.assertNotEqual(original_bytes_for_verification, b"reencoded_different_bytes") ++ self.assertEqual(original_bytes_for_verification, self.malformed_cert_der) ++ ++ ++if __name__ == "__main__": ++ unittest.main() +diff --git a/test/test_registrar_agent_cert_compliance.py b/test/test_registrar_agent_cert_compliance.py +new file mode 100644 +index 0000000..ede9b9f +--- /dev/null ++++ b/test/test_registrar_agent_cert_compliance.py +@@ -0,0 +1,289 @@ ++""" ++Integration tests for RegistrarAgent certificate compliance functionality. ++ ++This module tests the simplified certificate compliance checking methods ++to ensure they work correctly with the new CertificateWrapper-based approach. ++""" ++ ++import types ++import unittest ++from unittest.mock import Mock, patch ++ ++import cryptography.x509 ++ ++from keylime.certificate_wrapper import wrap_certificate ++from keylime.models.base.types.certificate import Certificate ++from keylime.models.registrar.registrar_agent import RegistrarAgent ++ ++ ++class TestRegistrarAgentCertCompliance(unittest.TestCase): ++ """Test cases for RegistrarAgent certificate compliance methods.""" ++ ++ # pylint: disable=protected-access,not-callable # Testing protected methods and dynamic method binding ++ ++ def setUp(self): ++ """Set up test fixtures.""" ++ # Create a test certificate ++ self.valid_cert_pem = """-----BEGIN CERTIFICATE----- ++MIIEnzCCA4egAwIBAgIEMV64bDANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJE ++RTEQMA4GA1UECBMHQmF2YXJpYTEhMB8GA1UEChMYSW5maW5lb24gVGVjaG5vbG9n ++aWVzIEFHMQwwCgYDVQQLEwNBSU0xGzAZBgNVBAMTEklGWCBUUE0gRUsgUm9vdCBD ++QTAeFw0wNTEwMjAxMzQ3NDNaFw0yNTEwMjAxMzQ3NDNaMHcxCzAJBgNVBAYTAkRF ++MQ8wDQYDVQQIEwZTYXhvbnkxITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2ll ++cyBBRzEMMAoGA1UECxMDQUlNMSYwJAYDVQQDEx1JRlggVFBNIEVLIEludGVybWVk ++aWF0ZSBDQSAwMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALftPhYN ++t4rE+JnU/XOPICbOBLvfo6iA7nuq7zf4DzsAWBdsZEdFJQfaK331ihG3IpQnlQ2i ++YtDim289265f0J4OkPFpKeFU27CsfozVaNUm6UR/uzwA8ncxFc3iZLRMRNLru/Al ++VG053ULVDQMVx2iwwbBSAYO9pGiGbk1iMmuZaSErMdb9v0KRUyZM7yABiyDlM3cz ++UQX5vLWV0uWqxdGoHwNva5u3ynP9UxPTZWHZOHE6+14rMzpobs6Ww2RR8BgF96rh ++4rRAZEl8BXhwiQq4STvUXkfvdpWH4lzsGcDDtrB6Nt3KvVNvsKz+b07Dk+Xzt+EH ++NTf3Byk2HlvX+scCAwEAAaOCATswggE3MB0GA1UdDgQWBBQ4k8292HPEIzMV4bE7 ++qWoNI8wQxzAOBgNVHQ8BAf8EBAMCAgQwEgYDVR0TAQH/BAgwBgEB/wIBADBYBgNV ++HSABAf8ETjBMMEoGC2CGSAGG+EUBBy8BMDswOQYIKwYBBQUHAgEWLWh0dHA6Ly93 ++d3cudmVyaXNpZ24uY29tL3JlcG9zaXRvcnkvaW5kZXguaHRtbDCBlwYDVR0jBIGP ++MIGMgBRW65FEhWPWcrOu1EWWC/eUDlRCpqFxpG8wbTELMAkGA1UEBhMCREUxEDAO ++BgNVBAgTB0JhdmFyaWExITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2llcyBB ++RzEMMAoGA1UECxMDQUlNMRswGQYDVQQDExJJRlggVFBNIEVLIFJvb3QgQ0GCAQMw ++DQYJKoZIhvcNAQEFBQADggEBABJ1+Ap3rNlxZ0FW0aIgdzktbNHlvXWNxFdYIBbM ++OKjmbOos0Y4O60eKPu259XmMItCUmtbzF3oKYXq6ybARUT2Lm+JsseMF5VgikSlU ++BJALqpKVjwAds81OtmnIQe2LSu4xcTSavpsL4f52cUAu/maMhtSgN9mq5roYptq9 ++DnSSDZrX4uYiMPl//rBaNDBflhJ727j8xo9CCohF3yQUoQm7coUgbRMzyO64yMIO ++3fhb+Vuc7sNwrMOz3VJN14C3JMoGgXy0c57IP/kD5zGRvljKEvrRC2I147+fPeLS ++DueRMS6lblvRKiZgmGAg7YaKOkOaEmVDMQ+fTo2Po7hI5wc= ++-----END CERTIFICATE-----""" ++ ++ self.valid_cert = cryptography.x509.load_pem_x509_certificate(self.valid_cert_pem.encode()) ++ ++ # Malformed certificate that actually requires pyasn1 re-encoding ++ self.malformed_cert_b64 = ( ++ "MIIDUjCCAvegAwIBAgILAI5xYHQ14nH5hdYwCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3Rv" ++ "biBUUE0gUm9vdCBDQSAyMTExMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9u" ++ "MAkGA1UEBhMCVFcwHhcNMTkwNzIzMTcxNTEzWhcNMzkwNzE5MTcxNTEzWjAAMIIBIjANBgkqhkiG" ++ "9w0BAQEFAAOCAQ8AMIIBCgKCAQEAk8kCj7srY/Zlvm1795fVXdyX44w5qsd1m5VywMDgSOavzPKO" ++ "kgbHgQNx6Ak5+4Q43EJ/5qsaDBv59F8W7K69maUwcMNq1xpuq0V/LiwgJVAtc3CdvlxtwQrn7+Uq" ++ "ieIGf+i8sGxpeUCSmYHJPTHNHqjQnvUtdGoy/+WO0i7WsAvX3k/gHHr4p58a8urjJ1RG2Lk1g48D" ++ "ESwl+D7atQEPWzgjr6vK/s5KpLrn7M+dh97TUbG1510AOWBPP35MtT8IZbqC4hs2Ol16gT1M3a9e" ++ "+GaMZkItLUwV76vKDNEgTZG8M1C9OItA/xwzlfXbPepzpxWb4kzHS4qZoQtl4vBZrQIDAQABo4IB" ++ "NjCCATIwUAYDVR0RAQH/BEYwRKRCMEAxPjAUBgVngQUCARMLaWQ6NEU1NDQzMDAwEAYFZ4EFAgIT" ++ "B05QQ1Q3NXgwFAYFZ4EFAgMTC2lkOjAwMDcwMDAyMAwGA1UdEwEB/wQCMAAwEAYDVR0lBAkwBwYF" ++ "Z4EFCAEwHwYDVR0jBBgwFoAUI/TiKtO+N0pEl3KVSqKDrtdSVy4wDgYDVR0PAQH/BAQDAgUgMCIG" ++ "A1UdCQQbMBkwFwYFZ4EFAhAxDjAMDAMyLjACAQACAgCKMGkGCCsGAQUFBwEBBF0wWzBZBggrBgEF" ++ "BQcwAoZNaHR0cHM6Ly93d3cubnV2b3Rvbi5jb20vc2VjdXJpdHkvTlRDLVRQTS1FSy1DZXJ0L051" ++ "dm90b24gVFBNIFJvb3QgQ0EgMjExMS5jZXIwCgYIKoZIzj0EAwIDSQAwRgIhAPHOFiBDZd0dfml2" ++ "a/KlPFhmX7Ahpd0Wq11ZUW1/ixviAiEAlex8BB5nsR6w8QrANwCxc7fH/YnbjXfMCFiWzeZH7ps=" ++ ) ++ ++ # Create wrapped certificates for testing using Certificate type to ensure proper behavior ++ cert_type = Certificate() ++ ++ # Create compliant certificate (no original bytes needed) ++ self.compliant_wrapped_cert = wrap_certificate(self.valid_cert, None) ++ ++ # Create non-compliant certificate using the malformed cert data ++ self.non_compliant_wrapped_cert = cert_type.cast(self.malformed_cert_b64) ++ ++ def create_mock_registrar_agent(self): ++ """Create a mock RegistrarAgent with necessary attributes.""" ++ agent = Mock() ++ agent.changes = {} ++ agent.values = {} ++ agent._add_error = Mock() ++ ++ # Bind the actual methods to the mock instance ++ agent._check_cert_compliance = types.MethodType(RegistrarAgent._check_cert_compliance, agent) ++ agent._check_all_cert_compliance = types.MethodType(RegistrarAgent._check_all_cert_compliance, agent) ++ ++ return agent ++ ++ def test_check_cert_compliance_no_new_cert(self): ++ """Test _check_cert_compliance when no new certificate is provided.""" ++ agent = self.create_mock_registrar_agent() ++ agent.changes = {} # No new certificate ++ ++ result = agent._check_cert_compliance("ekcert") ++ self.assertTrue(result) ++ agent._add_error.assert_not_called() ++ ++ def test_check_cert_compliance_same_cert(self): ++ """Test _check_cert_compliance when new cert is same as old cert.""" ++ agent = self.create_mock_registrar_agent() ++ agent.changes = {"ekcert": self.compliant_wrapped_cert} ++ agent.values = {"ekcert": self.compliant_wrapped_cert} ++ ++ result = agent._check_cert_compliance("ekcert") ++ self.assertTrue(result) ++ agent._add_error.assert_not_called() ++ ++ def test_check_cert_compliance_different_cert_same_der(self): ++ """Test _check_cert_compliance when certificates have same DER bytes.""" ++ agent = self.create_mock_registrar_agent() ++ # Create two different wrapper objects but with same underlying certificate ++ cert1 = wrap_certificate(self.valid_cert, None) ++ cert2 = wrap_certificate(self.valid_cert, None) ++ ++ agent.changes = {"ekcert": cert1} ++ agent.values = {"ekcert": cert2} ++ ++ result = agent._check_cert_compliance("ekcert") ++ self.assertTrue(result) ++ agent._add_error.assert_not_called() ++ ++ @patch("keylime.config.get") ++ def test_check_cert_compliance_compliant_cert(self, mock_config): ++ """Test _check_cert_compliance with ASN.1 compliant certificate.""" ++ mock_config.return_value = "warn" # Default action ++ ++ agent = self.create_mock_registrar_agent() ++ agent.changes = {"ekcert": self.compliant_wrapped_cert} ++ agent.values = {} # No old certificate ++ ++ result = agent._check_cert_compliance("ekcert") ++ self.assertTrue(result) ++ agent._add_error.assert_not_called() ++ ++ @patch("keylime.config.get") ++ def test_check_cert_compliance_non_compliant_cert_warn(self, mock_config): ++ """Test _check_cert_compliance with non-compliant certificate (warn mode).""" ++ mock_config.return_value = "warn" # Warn action ++ ++ agent = self.create_mock_registrar_agent() ++ agent.changes = {"ekcert": self.non_compliant_wrapped_cert} ++ agent.values = {} # No old certificate ++ ++ result = agent._check_cert_compliance("ekcert") ++ self.assertFalse(result) ++ agent._add_error.assert_not_called() # Should not add error in warn mode ++ ++ @patch("keylime.config.get") ++ def test_check_cert_compliance_non_compliant_cert_reject(self, mock_config): ++ """Test _check_cert_compliance with non-compliant certificate (reject mode).""" ++ mock_config.return_value = "reject" # Reject action ++ ++ agent = self.create_mock_registrar_agent() ++ agent.changes = {"ekcert": self.non_compliant_wrapped_cert} ++ agent.values = {} # No old certificate ++ ++ result = agent._check_cert_compliance("ekcert") ++ self.assertFalse(result) ++ agent._add_error.assert_called_once() # Should add error in reject mode ++ ++ @patch("keylime.config.get") ++ def test_check_all_cert_compliance_no_non_compliant(self, mock_config): ++ """Test _check_all_cert_compliance when all certificates are compliant.""" ++ mock_config.return_value = "warn" ++ ++ agent = self.create_mock_registrar_agent() ++ agent.changes = { ++ "ekcert": self.compliant_wrapped_cert, ++ "iak_cert": self.compliant_wrapped_cert, ++ } ++ agent.values = {} ++ ++ # Should not raise any exceptions or log warnings ++ with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger: ++ agent._check_all_cert_compliance() ++ mock_logger.warning.assert_not_called() ++ mock_logger.error.assert_not_called() ++ ++ @patch("keylime.config.get") ++ def test_check_all_cert_compliance_with_non_compliant_warn(self, mock_config): ++ """Test _check_all_cert_compliance with non-compliant certificates (warn mode).""" ++ mock_config.return_value = "warn" ++ ++ agent = self.create_mock_registrar_agent() ++ agent.changes = { ++ "ekcert": self.non_compliant_wrapped_cert, ++ "iak_cert": self.compliant_wrapped_cert, ++ "idevid_cert": self.non_compliant_wrapped_cert, ++ } ++ agent.values = {} ++ ++ with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger: ++ agent._check_all_cert_compliance() ++ # Should log warning for non-compliant certificates ++ mock_logger.warning.assert_called_once() ++ format_string = mock_logger.warning.call_args[0][0] ++ cert_names = mock_logger.warning.call_args[0][1] ++ self.assertIn("Certificate(s) %s may not conform", format_string) ++ self.assertEqual("'ekcert' and 'idevid_cert'", cert_names) ++ ++ @patch("keylime.config.get") ++ def test_check_all_cert_compliance_with_non_compliant_reject(self, mock_config): ++ """Test _check_all_cert_compliance with non-compliant certificates (reject mode).""" ++ mock_config.return_value = "reject" ++ ++ agent = self.create_mock_registrar_agent() ++ agent.changes = { ++ "ekcert": self.non_compliant_wrapped_cert, ++ "mtls_cert": self.non_compliant_wrapped_cert, ++ } ++ agent.values = {} ++ ++ with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger: ++ agent._check_all_cert_compliance() ++ # Should log error for non-compliant certificates ++ mock_logger.error.assert_called_once() ++ format_string = mock_logger.error.call_args[0][0] ++ cert_names = mock_logger.error.call_args[0][1] ++ self.assertIn("Certificate(s) %s may not conform", format_string) ++ self.assertIn("were rejected due to config", format_string) ++ self.assertEqual("'ekcert' and 'mtls_cert'", cert_names) ++ ++ @patch("keylime.config.get") ++ def test_check_all_cert_compliance_ignore_mode(self, mock_config): ++ """Test _check_all_cert_compliance with ignore mode.""" ++ mock_config.return_value = "ignore" ++ ++ agent = self.create_mock_registrar_agent() ++ agent.changes = { ++ "ekcert": self.non_compliant_wrapped_cert, ++ "iak_cert": self.non_compliant_wrapped_cert, ++ } ++ agent.values = {} ++ ++ with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger: ++ agent._check_all_cert_compliance() ++ # Should not log anything in ignore mode ++ mock_logger.warning.assert_not_called() ++ mock_logger.error.assert_not_called() ++ ++ def test_check_all_cert_compliance_single_non_compliant(self): ++ """Test _check_all_cert_compliance message formatting for single certificate.""" ++ agent = self.create_mock_registrar_agent() ++ agent.changes = {"ekcert": self.non_compliant_wrapped_cert} ++ agent.values = {} ++ ++ with patch("keylime.config.get", return_value="warn"): ++ with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger: ++ agent._check_all_cert_compliance() ++ # Should format message correctly for single certificate ++ format_string = mock_logger.warning.call_args[0][0] ++ cert_names = mock_logger.warning.call_args[0][1] ++ self.assertIn("Certificate(s) %s may not conform", format_string) ++ self.assertEqual("'ekcert'", cert_names) ++ self.assertNotIn(" and", cert_names) # Should not have "and" for single cert ++ ++ def test_field_names_coverage(self): ++ """Test that all expected certificate field names are checked.""" ++ agent = self.create_mock_registrar_agent() ++ agent.changes = { ++ "ekcert": self.non_compliant_wrapped_cert, ++ "iak_cert": self.non_compliant_wrapped_cert, ++ "idevid_cert": self.non_compliant_wrapped_cert, ++ "mtls_cert": self.non_compliant_wrapped_cert, ++ } ++ agent.values = {} ++ ++ with patch("keylime.config.get", return_value="warn"): ++ with patch("keylime.models.registrar.registrar_agent.logger") as mock_logger: ++ agent._check_all_cert_compliance() ++ # Should check all four certificate fields ++ format_string = mock_logger.warning.call_args[0][0] ++ cert_names = mock_logger.warning.call_args[0][1] ++ self.assertIn("Certificate(s) %s may not conform", format_string) ++ expected_names = "'ekcert', 'iak_cert', 'idevid_cert' and 'mtls_cert'" ++ self.assertEqual(expected_names, cert_names) + + +if __name__ == "__main__": + unittest.main() - -From 4a4084ec081bf3b8caf2d8450db6a86bdbb45c77 Mon Sep 17 00:00:00 2001 -From: Anderson Toshiyuki Sasaki -Date: Mon, 25 Aug 2025 18:19:44 +0200 -Subject: [PATCH 2/2] registrar_agent: Use pyasn1 to parse PEM - -The models.types.Certificate render() method returns a PEM formatted -certificate. - -When the certificate is malformed, the original certificate is returned -by the render method(). - -Use pyasn1 to extract the DER content from the certificate to avoid -failing to parse due to python-cryptography strict ASN.1 parser. - -Signed-off-by: Anderson Toshiyuki Sasaki ---- - keylime/models/registrar/registrar_agent.py | 17 +++++++++-------- - 1 file changed, 9 insertions(+), 8 deletions(-) - -diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py -index 58dd0854c..1d7d6ea05 100644 ---- a/keylime/models/registrar/registrar_agent.py -+++ b/keylime/models/registrar/registrar_agent.py -@@ -1,9 +1,11 @@ - import base64 - import hmac -+import io - - import cryptography.x509 - from cryptography.hazmat.primitives.asymmetric import ec, rsa - from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat -+from pyasn1_modules import pem as pyasn1_pem - - from keylime import cert_utils, config, crypto, keylime_logging - from keylime.models.base import ( -@@ -356,13 +358,12 @@ def render(self, only=None): - output = super().render(only) - - # When operating in pull mode, ekcert is encoded as Base64 instead of PEM -- if output.get("ekcert"): -- if self.ekcert.has_original_bytes: -- original_bytes = self.ekcert.get_original_bytes() -- if original_bytes is not None: -- # In case the cert was malformed, the original cert is -- # cached base64-encoded -- output["ekcert"] = original_bytes.decode("utf-8") -- output["ekcert"] = base64.b64encode(self.ekcert.public_bytes(Encoding.DER)).decode("utf-8") -+ ekcert = output.get("ekcert") -+ if ekcert: -+ der_data = pyasn1_pem.readPemFromFile(io.StringIO(ekcert)) -+ if der_data: -+ output["ekcert"] = base64.b64encode(der_data).decode("utf-8") -+ else: -+ self._add_error("ekcert", "Failed to parse certificate") - - return output diff --git a/keylime.spec b/keylime.spec index ceb6885..0ba90ce 100644 --- a/keylime.spec +++ b/keylime.spec @@ -9,7 +9,7 @@ Name: keylime Version: 7.12.1 -Release: 11%{?dist}.1 +Release: 11%{?dist}.2 Summary: Open source TPM software for Bootstrapping and Maintaining Trust URL: https://github.com/keylime/keylime @@ -434,6 +434,10 @@ fi %license LICENSE %changelog +* Mon Sep 15 2025 Anderson Toshiyuki Sasaki - 7.12.1-11.2 +- Properly fix the malformed certificate workaround + Resolves: RHEL-111244 + * Mon Aug 18 2025 Sergio Correia - 7.12.1-11 - Fix for revocation notifier not closing TLS session correctly Resolves: RHEL-109656