keylime/0014-Fix-registrar-duplicate-UUID-vulnerability.patch

1189 lines
51 KiB
Diff

From 2da614d212f58071f54c883ee368ffac4bc5e6b4 Mon Sep 17 00:00:00 2001
From: Sergio Correia <scorreia@redhat.com>
Date: Tue, 9 Dec 2025 12:12:22 +0000
Subject: [PATCH 14/14] Fix registrar duplicate UUID vulnerability
Backport upstream PR#1825
Signed-off-by: Sergio Correia <scorreia@redhat.com>
---
keylime/cmd/registrar.py | 6 +
keylime/models/registrar/registrar_agent.py | 116 +++++
keylime/shared_data.py | 6 +
keylime/web/registrar/agents_controller.py | 98 +++-
test/test_agents_controller.py | 513 ++++++++++++++++++++
test/test_registrar_tpm_identity.py | 342 +++++++++++++
6 files changed, 1071 insertions(+), 10 deletions(-)
create mode 100644 test/test_agents_controller.py
create mode 100644 test/test_registrar_tpm_identity.py
diff --git a/keylime/cmd/registrar.py b/keylime/cmd/registrar.py
index 584275a..2e2b25e 100644
--- a/keylime/cmd/registrar.py
+++ b/keylime/cmd/registrar.py
@@ -5,6 +5,7 @@ import cryptography
from keylime import config, keylime_logging
from keylime.common.migrations import apply
from keylime.models import da_manager, db_manager
+from keylime.shared_data import initialize_shared_memory
from keylime.web import RegistrarServer
logger = keylime_logging.init_logging("registrar")
@@ -47,6 +48,11 @@ def main() -> None:
# Prepare backend for durable attestation, if configured
da_manager.make_backend("registrar")
+ # Initialize shared memory for cross-process synchronization
+ # CRITICAL: Must be called before server.start_multi() to ensure all forked
+ # worker processes share the same manager instance for agent registration locks
+ initialize_shared_memory()
+
# Start HTTP server
server = RegistrarServer()
server.start_multi()
diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py
index fc7e1be..e26ae41 100644
--- a/keylime/models/registrar/registrar_agent.py
+++ b/keylime/models/registrar/registrar_agent.py
@@ -65,8 +65,14 @@ class RegistrarAgent(PersistableModel):
def empty(cls):
agent = super().empty()
agent.provider_keys = {}
+ object.__setattr__(agent, "_tpm_identity_violation", False)
return agent
+ @property
+ def has_tpm_identity_violation(self):
+ """Returns True if a TPM identity violation was detected during validation."""
+ return getattr(self, "_tpm_identity_violation", False)
+
def _check_key_against_cert(self, tpm_key_field, cert_field):
# If neither key nor certificate is being updated, no need to check
if tpm_key_field not in self.changes and cert_field not in self.changes:
@@ -139,6 +145,111 @@ class RegistrarAgent(PersistableModel):
return compliant
+ def _check_tpm_identity_immutable(self):
+ """
+ Checks that TPM identity fields are not being changed during re-registration.
+
+ This prevents an attacker from registering with the same UUID but a different TPM,
+ which would allow them to impersonate the original agent and bypass attestation.
+
+ Checked fields (EK-based identity only):
+ - ek_tpm: Endorsement Key (primary TPM identity)
+ - ekcert: EK Certificate (binds EK to TPM manufacturer)
+ - aik_tpm: Attestation Key (bound to EK via MakeCredential/ActivateCredential)
+
+ Note: IAK/IDevID fields are NOT checked and can change on re-registration.
+
+ This check only applies to existing agents (those loaded from the database).
+ New agents created via RegistrarAgent.empty() have no committed values and are
+ allowed to set identity fields during initial registration.
+
+ If the agent needs to be registered with a new TPM (e.g., hardware replacement),
+ the old agent record must be explicitly deleted first.
+ """
+ # Define TPM identity fields that must remain immutable once set
+ # Only checking EK-based identity (ek_tpm, ekcert, aik_tpm)
+ # IAK/IDevID fields (iak_tpm, iak_cert, idevid_tpm, idevid_cert) are not checked
+ identity_fields = ["ek_tpm", "ekcert", "aik_tpm"]
+
+ # Only check for existing agents (those loaded from database)
+ # New agents created via empty() will have no committed values
+ if not self.committed:
+ return
+
+ # Track which fields have been changed
+ changed_fields = []
+
+ for field_name in identity_fields:
+ # Skip fields that are not being changed in this update
+ if field_name not in self.changes:
+ continue
+
+ # Get the old (committed/database) and new (proposed) values
+ old_value = self.committed.get(field_name)
+ new_value = self.changes.get(field_name)
+
+ # Allow setting a previously unset field (e.g., adding EK cert later)
+ if old_value is None:
+ continue
+
+ # Reject attempts to remove an already-set identity field
+ if new_value is None:
+ changed_fields.append(field_name)
+ continue
+
+ # Compare values based on field type
+ if field_name == "ekcert":
+ # For certificates, compare the actual certificate bytes
+ # Note: We compare full certificate, not just public key, because:
+ # 1. Policy: reject if the certificate changed, even if it carries the same public key
+ # 2. Certificate contains more than just key (issuer, validity period, etc.)
+ # 3. Different cert for same key could indicate compromise or unauthorized replacement
+ try:
+ old_cert_bytes = old_value.public_bytes(Encoding.DER)
+ new_cert_bytes = new_value.public_bytes(Encoding.DER)
+
+ if old_cert_bytes != new_cert_bytes:
+ changed_fields.append(field_name)
+ except Exception:
+ # If we can't extract certificate bytes, treat as changed to be safe
+ changed_fields.append(field_name)
+ else:
+ # For TPM keys (ek_tpm, aik_tpm), compare as binary data
+ # These are Binary(persist_as=String) fields, so they could be bytes or base64 strings
+ try:
+ old_bytes = old_value if isinstance(old_value, bytes) else base64.b64decode(old_value)
+ new_bytes = new_value if isinstance(new_value, bytes) else base64.b64decode(new_value)
+
+ if old_bytes != new_bytes:
+ changed_fields.append(field_name)
+ except Exception:
+ # If comparison fails (e.g., invalid base64), treat as changed to be safe
+ changed_fields.append(field_name)
+
+ # If any TPM identity fields were changed, this is a security violation
+ if changed_fields:
+ # Set flag to indicate TPM identity violation occurred
+ object.__setattr__(self, "_tpm_identity_violation", True)
+
+ # Log security warning for audit trail
+ # Include agent_id and changed fields, but NOT the actual TPM values (sensitive data)
+ logger.warning(
+ "SECURITY: Rejected attempt to re-register agent '%s' with different TPM identity. "
+ "Changed fields: %s. This indicates a potential UUID spoofing attack. "
+ "The existing agent must be deleted before registering with a new TPM. "
+ "If this is unexpected, investigate for compromise.",
+ self.agent_id,
+ ", ".join(changed_fields),
+ )
+
+ # Add validation error to prevent registration
+ # Using "agent_id" field for the error because it's the UUID that's being improperly reused
+ self._add_error(
+ "agent_id",
+ f"cannot re-register with different TPM identity. Changed fields: {', '.join(changed_fields)}. "
+ "To register this UUID with a new TPM, delete the existing agent record first.",
+ )
+
def _check_all_cert_compliance(self):
non_compliant_certs = []
@@ -280,6 +391,11 @@ class RegistrarAgent(PersistableModel):
+ ["port", "mtls_cert"],
)
+ # SECURITY CHECK: Verify TPM identity is not being changed on re-registration
+ # This must happen after cast_changes() (so we have new values to compare)
+ # and runs before other validation; it only records an error (no early return), so the checks below still run
+ self._check_tpm_identity_immutable()
+
# Log info about received EK or IAK/IDevID
self._log_root_identity()
# Verify EK as valid
diff --git a/keylime/shared_data.py b/keylime/shared_data.py
index 23a3d81..a415496 100644
--- a/keylime/shared_data.py
+++ b/keylime/shared_data.py
@@ -58,6 +58,12 @@ class FlatDictView:
with self._lock:
return self._store.get(self._make_key(key), default)
+ def pop(self, key: Any, default: Any = None) -> Any:
+ """Remove and return value for key, or default if key not present."""
+ flat_key = self._make_key(key)
+ with self._lock:
+ return self._store.pop(flat_key, default)
+
def keys(self) -> List[Any]:
"""Return keys in this namespace."""
prefix = f"dict:{self._namespace}:"
diff --git a/keylime/web/registrar/agents_controller.py b/keylime/web/registrar/agents_controller.py
index 9be2ef9..f2246de 100644
--- a/keylime/web/registrar/agents_controller.py
+++ b/keylime/web/registrar/agents_controller.py
@@ -1,5 +1,8 @@
+from sqlalchemy.exc import IntegrityError
+
from keylime import keylime_logging
from keylime.models import RegistrarAgent
+from keylime.shared_data import get_shared_memory
from keylime.web.base import Controller
logger = keylime_logging.init_logging("registrar")
@@ -28,16 +31,91 @@ class AgentsController(Controller):
# POST /v2[.:minor]/agents/[:agent_id]
def create(self, agent_id, **params):
- agent = RegistrarAgent.get(agent_id) or RegistrarAgent.empty() # type: ignore[no-untyped-call]
- agent.update({"agent_id": agent_id, **params})
- challenge = agent.produce_ak_challenge()
-
- if not challenge or not agent.changes_valid:
- self.log_model_errors(agent, logger)
- self.respond(400, "Could not register agent with invalid data")
- return
-
- agent.commit_changes()
+ """Register a new agent or re-register an existing agent.
+
+ For new agents, this:
+ 1. Validates TPM identity (EK/AIK or IAK/IDevID)
+ 2. Generates an AK challenge encrypted with the EK
+ 3. Stores agent record in pending state
+ 4. Returns challenge blob to agent
+
+ For existing agents (re-registration with same UUID):
+ 1. Verifies TPM identity has not changed (security check)
+ 2. If identity changed: rejects with 403 Forbidden
+ 3. If identity same: allows re-registration (e.g., after agent restart)
+
+ Security: Re-registration with a different TPM is forbidden to prevent
+ UUID spoofing attacks where an attacker could impersonate a legitimate
+ agent by reusing its UUID.
+
+ Race condition protection: Uses per-agent locks from SharedDataManager to prevent
+ race conditions between concurrent registration requests for the same agent_id.
+ This ensures the check-validate-commit sequence is atomic. Additionally, database
+ constraint violations (e.g., duplicate UUIDs from concurrent requests) are caught
+ and returned as 403 Forbidden.
+ """
+ # Get shared memory manager and per-agent lock storage
+ shared_mem = get_shared_memory()
+ agent_locks = shared_mem.get_or_create_dict("agent_registration_locks")
+
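+ # Get or create a lock specific to this agent_id (NOTE(review): this check-then-set is not itself atomic - confirm two workers cannot create distinct locks for one agent_id)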
+ if agent_id not in agent_locks:
+ agent_locks[agent_id] = shared_mem.manager.Lock()
+
+ agent_lock = agent_locks[agent_id]
+
+ # CRITICAL SECTION: Acquire lock to make check-validate-commit atomic
+ with agent_lock:
+ # Step 1: Load existing agent or create new one (inside lock)
+ agent = RegistrarAgent.get(agent_id) or RegistrarAgent.empty() # type: ignore[no-untyped-call]
+
+ # Step 2: Update agent with new data and validate (inside lock)
+ agent.update({"agent_id": agent_id, **params})
+
+ # Step 3: Check for TPM identity change security violation
+ # Use explicit flag instead of fragile string matching for security check
+ if not agent.changes_valid and agent.has_tpm_identity_violation:
+ # Log the validation errors (includes security warning)
+ self.log_model_errors(agent, logger)
+
+ # Return 403 Forbidden
+ # 403 indicates a policy violation, not a malformed request
+ self.respond(403, "Agent re-registration with different TPM identity is forbidden for security reasons")
+ return
+
+ # Step 4: Generate AK challenge (inside lock)
+ challenge = agent.produce_ak_challenge()
+
+ # Step 5: Check for any validation errors or challenge generation failure
+ if not challenge or not agent.changes_valid:
+ self.log_model_errors(agent, logger)
+ self.respond(400, "Could not register agent with invalid data")
+ return
+
+ # Step 6: Commit to database (inside lock)
+ # This ensures no other request can modify the agent between validation and commit
+ try:
+ agent.commit_changes()
+ except IntegrityError as e:
+ # Database constraint violation - most likely duplicate agent_id
+ # This can happen if two requests try to register the same new UUID simultaneously
+ # and both pass validation before either commits (database race condition)
+ logger.warning(
+ "SECURITY: Agent registration failed due to database constraint violation for agent_id '%s'. "
+ "This UUID may already be registered by a concurrent request or the agent already exists. "
+ "Database error: %s",
+ agent_id,
+ str(e),
+ )
+ self.respond(
+ 403,
+ f"Agent with UUID '{agent_id}' cannot be registered. "
+ "This UUID is already in use or a concurrent registration is in progress.",
+ )
+ return
+
+ # Lock released - safe to respond to client
+ # Return challenge blob for agent to decrypt
self.respond(200, "Success", {"blob": challenge})
# DELETE /v2[.:minor]/agents/:agent_id/
diff --git a/test/test_agents_controller.py b/test/test_agents_controller.py
new file mode 100644
index 0000000..898d8f0
--- /dev/null
+++ b/test/test_agents_controller.py
@@ -0,0 +1,513 @@
+"""Unit tests for AgentsController (registrar).
+
+Tests the registrar's agent registration endpoints, including the
+security fix that prevents UUID spoofing via re-registration with
+a different TPM identity.
+"""
+
+# type: ignore - Controller methods are dynamically bound
+
+import unittest
+from typing import cast
+from unittest.mock import MagicMock, patch
+
+from sqlalchemy.exc import IntegrityError
+
+from keylime.web.registrar.agents_controller import AgentsController
+
+
+class TestAgentsControllerIndex(unittest.TestCase):
+ """Test cases for AgentsController.index()."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ mock_action_handler = MagicMock()
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
+ self.controller.respond = MagicMock()
+
+ @patch("keylime.models.RegistrarAgent.all_ids")
+ def test_index_success(self, mock_all_ids):
+ """Test successful retrieval of all agent IDs."""
+ mock_all_ids.return_value = ["agent-1", "agent-2", "agent-3"]
+
+ self.controller.index()
+
+ self.controller.respond.assert_called_once_with(200, "Success", {"uuids": ["agent-1", "agent-2", "agent-3"]}) # type: ignore[attr-defined]
+
+
+class TestAgentsControllerShow(unittest.TestCase):
+ """Test cases for AgentsController.show()."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ mock_action_handler = MagicMock()
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
+ self.controller.respond = MagicMock()
+ self.test_agent_id = "test-agent-123"
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_show_not_found(self, mock_get):
+ """Test show with non-existent agent."""
+ mock_get.return_value = None
+
+ self.controller.show(self.test_agent_id)
+
+ self.controller.respond.assert_called_once_with(404, f"Agent with ID '{self.test_agent_id}' not found") # type: ignore[attr-defined]
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_show_not_active(self, mock_get):
+ """Test show with inactive agent."""
+ mock_agent = MagicMock()
+ mock_agent.active = False
+ mock_get.return_value = mock_agent
+
+ self.controller.show(self.test_agent_id)
+
+ self.controller.respond.assert_called_once_with( # type: ignore[attr-defined]
+ 404, f"Agent with ID '{self.test_agent_id}' has not been activated"
+ )
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_show_success(self, mock_get):
+ """Test successful show of active agent."""
+ mock_agent = MagicMock()
+ mock_agent.active = True
+ mock_agent.render.return_value = {"agent_id": self.test_agent_id, "active": True}
+ mock_get.return_value = mock_agent
+
+ self.controller.show(self.test_agent_id)
+
+ self.controller.respond.assert_called_once_with( # type: ignore[attr-defined]
+ 200, "Success", {"agent_id": self.test_agent_id, "active": True}
+ )
+
+
+class TestAgentsControllerCreate(unittest.TestCase):
+ """Test cases for AgentsController.create() - the main registration endpoint."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ mock_action_handler = MagicMock()
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
+ self.controller.respond = MagicMock()
+ self.controller.log_model_errors = MagicMock()
+ self.test_agent_id = "test-agent-123"
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_create_new_agent_success(self, mock_get):
+ """Test successful registration of a new agent."""
+ # Mock that agent doesn't exist yet
+ mock_get.return_value = None
+
+ # Create mock agent that will be returned by empty()
+ mock_agent = MagicMock()
+ mock_agent.changes_valid = True
+ mock_agent.errors = {}
+ mock_agent.produce_ak_challenge.return_value = "challenge_blob_data"
+
+ # Patch RegistrarAgent.empty to return our mock
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify agent was updated with params
+ mock_agent.update.assert_called_once_with({"agent_id": self.test_agent_id, **params})
+
+ # Verify challenge was generated
+ mock_agent.produce_ak_challenge.assert_called_once()
+
+ # Verify agent was saved
+ mock_agent.commit_changes.assert_called_once()
+
+ # Verify 200 response with challenge
+ self.controller.respond.assert_called_once_with(200, "Success", {"blob": "challenge_blob_data"}) # type: ignore[attr-defined]
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_create_reregistration_same_tpm_identity(self, mock_get):
+ """Test successful re-registration with same TPM identity."""
+ # Mock existing agent
+ mock_existing_agent = MagicMock()
+ mock_existing_agent.changes_valid = True
+ mock_existing_agent.errors = {}
+ mock_existing_agent.produce_ak_challenge.return_value = "challenge_blob_data"
+ mock_get.return_value = mock_existing_agent
+
+ params = {"ek_tpm": "same_ek_key", "aik_tpm": "same_aik_key"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify agent was updated
+ mock_existing_agent.update.assert_called_once_with({"agent_id": self.test_agent_id, **params})
+
+ # Verify challenge was generated
+ mock_existing_agent.produce_ak_challenge.assert_called_once()
+
+ # Verify agent was saved
+ mock_existing_agent.commit_changes.assert_called_once()
+
+ # Verify 200 response
+ self.controller.respond.assert_called_once_with(200, "Success", {"blob": "challenge_blob_data"}) # type: ignore[attr-defined]
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_create_reregistration_different_tpm_identity_forbidden(self, mock_get):
+ """Test re-registration with different TPM identity is rejected with 403.
+
+ This is the key security fix: preventing UUID spoofing by rejecting
+ attempts to re-register an agent with a different TPM identity.
+ """
+ # Mock existing agent
+ mock_existing_agent = MagicMock()
+ mock_existing_agent.changes_valid = False # Validation failed
+ # Simulate the error added by _check_tpm_identity_immutable
+ mock_existing_agent.errors = {
+ "agent_id": [
+ "Agent re-registration attempted with different TPM identity (changed fields: ek_tpm). "
+ "This is a security violation - the same agent UUID cannot be reused with a different TPM."
+ ]
+ }
+ mock_get.return_value = mock_existing_agent
+
+ params = {"ek_tpm": "different_ek_key", "aik_tpm": "same_aik_key"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify agent was updated (which triggers validation)
+ mock_existing_agent.update.assert_called_once_with({"agent_id": self.test_agent_id, **params})
+
+ # Verify errors were logged
+ self.controller.log_model_errors.assert_called_once() # type: ignore[attr-defined]
+
+ # Verify 403 Forbidden response (not 400!)
+ self.controller.respond.assert_called_once_with( # type: ignore[attr-defined]
+ 403, "Agent re-registration with different TPM identity is forbidden for security reasons"
+ )
+
+ # Verify agent was NOT saved
+ mock_existing_agent.commit_changes.assert_not_called()
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_create_invalid_data_other_validation_error(self, mock_get):
+ """Test registration with other validation errors returns 400."""
+ # Mock agent with validation errors (not TPM identity related)
+ mock_agent = MagicMock()
+ mock_agent.changes_valid = False
+ # Error not related to TPM identity
+ mock_agent.errors = {"ek_tpm": ["must be a valid TPM2B_PUBLIC structure"]}
+ mock_agent.has_tpm_identity_violation = False # Not a TPM identity violation
+ mock_agent.produce_ak_challenge.return_value = None
+ mock_get.return_value = None
+
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
+ params = {"ek_tpm": "invalid_ek_format"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify errors were logged
+ self.controller.log_model_errors.assert_called_once() # type: ignore[attr-defined]
+
+ # Verify 400 Bad Request (not 403)
+ self.controller.respond.assert_called_once_with(400, "Could not register agent with invalid data") # type: ignore[attr-defined]
+
+ # Verify agent was NOT saved
+ mock_agent.commit_changes.assert_not_called()
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_create_challenge_generation_failure(self, mock_get):
+ """Test registration fails if challenge generation fails."""
+ # Mock agent where challenge generation fails
+ mock_agent = MagicMock()
+ mock_agent.changes_valid = True
+ mock_agent.errors = {}
+ mock_agent.produce_ak_challenge.return_value = None # Challenge generation failed
+ mock_get.return_value = None
+
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify errors were logged
+ self.controller.log_model_errors.assert_called_once() # type: ignore[attr-defined]
+
+ # Verify 400 response
+ self.controller.respond.assert_called_once_with(400, "Could not register agent with invalid data") # type: ignore[attr-defined]
+
+ # Verify agent was NOT saved
+ mock_agent.commit_changes.assert_not_called()
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_create_validation_error_with_agent_id_but_not_tpm_identity(self, mock_get):
+ """Test that agent_id errors unrelated to TPM identity get 400, not 403."""
+ # Mock agent with agent_id error, but not about TPM identity
+ mock_agent = MagicMock()
+ mock_agent.changes_valid = False
+ mock_agent.errors = {"agent_id": ["must be a valid UUID format"]} # Not about TPM identity
+ mock_agent.has_tpm_identity_violation = False # Not a TPM identity violation
+ mock_agent.produce_ak_challenge.return_value = None
+ mock_get.return_value = None
+
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify 400 Bad Request (not 403) because it's not a TPM identity violation
+ self.controller.respond.assert_called_once_with(400, "Could not register agent with invalid data") # type: ignore[attr-defined]
+
+
+class TestAgentsControllerDelete(unittest.TestCase):
+ """Test cases for AgentsController.delete()."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ mock_action_handler = MagicMock()
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
+ self.controller.respond = MagicMock()
+ self.test_agent_id = "test-agent-123"
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_delete_not_found(self, mock_get):
+ """Test delete with non-existent agent."""
+ mock_get.return_value = None
+
+ self.controller.delete(self.test_agent_id)
+
+ self.controller.respond.assert_called_once_with(404, f"Agent with ID '{self.test_agent_id}' not found") # type: ignore[attr-defined]
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_delete_success(self, mock_get):
+ """Test successful agent deletion."""
+ mock_agent = MagicMock()
+ mock_get.return_value = mock_agent
+
+ self.controller.delete(self.test_agent_id)
+
+ # Verify agent was deleted
+ mock_agent.delete.assert_called_once()
+
+ # Verify 200 response
+ self.controller.respond.assert_called_once_with(200, "Success") # type: ignore[attr-defined]
+
+
+class TestAgentsControllerActivate(unittest.TestCase):
+ """Test cases for AgentsController.activate()."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ mock_action_handler = MagicMock()
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
+ self.controller.respond = MagicMock()
+ self.test_agent_id = "test-agent-123"
+ self.test_auth_tag = "valid_auth_tag"
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_activate_not_found(self, mock_get):
+ """Test activate with non-existent agent."""
+ mock_get.return_value = None
+
+ self.controller.activate(self.test_agent_id, self.test_auth_tag)
+
+ self.controller.respond.assert_called_once_with(404, f"Agent with ID '{self.test_agent_id}' not found") # type: ignore[attr-defined]
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_activate_success(self, mock_get):
+ """Test successful agent activation."""
+ mock_agent = MagicMock()
+ mock_agent.verify_ak_response.return_value = True # Auth tag is valid
+ mock_get.return_value = mock_agent
+
+ self.controller.activate(self.test_agent_id, self.test_auth_tag)
+
+ # Verify auth tag was verified
+ mock_agent.verify_ak_response.assert_called_once_with(self.test_auth_tag)
+
+ # Verify agent was saved
+ mock_agent.commit_changes.assert_called_once()
+
+ # Verify 200 response
+ self.controller.respond.assert_called_once_with(200, "Success") # type: ignore[attr-defined]
+
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_activate_invalid_auth_tag(self, mock_get):
+ """Test activation with invalid auth tag."""
+ mock_agent = MagicMock()
+ mock_agent.verify_ak_response.return_value = False # Auth tag is invalid
+ mock_get.return_value = mock_agent
+
+ self.controller.activate(self.test_agent_id, self.test_auth_tag)
+
+ # Verify auth tag was verified
+ mock_agent.verify_ak_response.assert_called_once_with(self.test_auth_tag)
+
+ # Verify agent was deleted (due to failed activation)
+ mock_agent.delete.assert_called_once()
+
+ # Verify agent was NOT saved
+ mock_agent.commit_changes.assert_not_called()
+
+ # Verify 400 response with detailed error message
+ self.controller.respond.assert_called_once() # type: ignore[attr-defined]
+ call_args = self.controller.respond.call_args # type: ignore[attr-defined]
+ self.assertEqual(call_args[0][0], 400)
+ self.assertIn(self.test_auth_tag, call_args[0][1])
+ self.assertIn(self.test_agent_id, call_args[0][1])
+ self.assertIn("deleted", call_args[0][1])
+
+
+class TestAgentsControllerConcurrency(unittest.TestCase):
+ """Test cases for concurrent registration TOCTOU race condition protection."""
+
+ def setUp(self):
+ """Set up test fixtures."""
+ mock_action_handler = MagicMock()
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
+ self.controller.respond = MagicMock()
+ self.controller.log_model_errors = MagicMock()
+ self.test_agent_id = "concurrent-test-agent"
+
+ @patch("keylime.web.registrar.agents_controller.get_shared_memory")
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_concurrent_registration_uses_locking(self, mock_get, mock_shared_mem):
+ """Test that concurrent registration attempts use per-agent locking.
+
+ This test verifies that the locking mechanism is invoked to prevent
+ TOCTOU race conditions during concurrent registration.
+ """
+ # Mock shared memory manager with lock support
+ mock_manager = MagicMock()
+ mock_lock = MagicMock()
+ mock_lock.__enter__ = MagicMock(return_value=None)
+ mock_lock.__exit__ = MagicMock(return_value=None)
+ mock_manager.Lock.return_value = mock_lock
+
+ mock_agent_locks = MagicMock()
+ mock_agent_locks.__contains__ = MagicMock(return_value=False)
+ mock_agent_locks.__setitem__ = MagicMock()
+ mock_agent_locks.__getitem__ = MagicMock(return_value=mock_lock)
+
+ mock_shared_mem.return_value.get_or_create_dict.return_value = mock_agent_locks
+ mock_shared_mem.return_value.manager = mock_manager
+
+ # Mock agent that doesn't exist yet (new registration)
+ mock_get.return_value = None
+
+ mock_agent = MagicMock()
+ mock_agent.changes_valid = True
+ mock_agent.errors = {}
+ mock_agent.produce_ak_challenge.return_value = "challenge_blob"
+
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify lock was acquired and released
+ mock_lock.__enter__.assert_called_once()
+ mock_lock.__exit__.assert_called_once()
+
+ # Verify successful registration
+ mock_agent.commit_changes.assert_called_once()
+ self.controller.respond.assert_called_once_with(200, "Success", {"blob": "challenge_blob"}) # type: ignore[attr-defined]
+
+ @patch("keylime.web.registrar.agents_controller.get_shared_memory")
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_different_agents_use_different_locks(self, mock_get, mock_shared_mem):
+ """Test that different agent_ids use different locks for parallel registration.
+
+ This ensures that registrations for different agents don't block each other,
+ only concurrent registrations for the same agent_id are serialized.
+ """
+ # Mock shared memory manager
+ mock_manager = MagicMock()
+
+ def mock_lock_factory():
+ """Return a new lock each time."""
+ return MagicMock()
+
+ mock_manager.Lock.side_effect = mock_lock_factory
+
+ mock_agent_locks = {}
+
+ def mock_getitem(_self, key): # pylint: disable=unused-argument
+ return mock_agent_locks.get(key)
+
+ def mock_setitem(_self, key, value): # pylint: disable=unused-argument
+ mock_agent_locks[key] = value
+
+ def mock_contains(_self, key): # pylint: disable=unused-argument
+ return key in mock_agent_locks
+
+ mock_locks_dict = MagicMock()
+ mock_locks_dict.__contains__ = mock_contains
+ mock_locks_dict.__setitem__ = mock_setitem
+ mock_locks_dict.__getitem__ = mock_getitem
+
+ mock_shared_mem.return_value.get_or_create_dict.return_value = mock_locks_dict
+ mock_shared_mem.return_value.manager = mock_manager
+
+ # Register two different agents
+ mock_get.return_value = None
+
+ for agent_id in ["agent-a", "agent-b"]:
+ mock_agent = MagicMock()
+ mock_agent.changes_valid = True
+ mock_agent.errors = {}
+ mock_agent.produce_ak_challenge.return_value = f"challenge_{agent_id}"
+
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
+ self.controller.respond = MagicMock() # Reset for each call
+ params = {"ek_tpm": f"ek_{agent_id}", "aik_tpm": f"aik_{agent_id}"}
+ self.controller.create(agent_id, **params)
+
+ # Verify that two different locks were created (one per agent)
+ self.assertEqual(len(mock_agent_locks), 2)
+ self.assertIn("agent-a", mock_agent_locks)
+ self.assertIn("agent-b", mock_agent_locks)
+ # Verify they are different lock objects
+ self.assertIsNot(mock_agent_locks["agent-a"], mock_agent_locks["agent-b"])
+
+ @patch("keylime.web.registrar.agents_controller.get_shared_memory")
+ @patch("keylime.models.RegistrarAgent.get")
+ def test_concurrent_new_registration_database_constraint_violation(self, mock_get, mock_shared_mem):
+ """Test that database constraint violations during concurrent new agent registration return 403.
+
+ This handles the edge case where two requests both create empty agents for the same UUID,
+ both pass validation, but the second commit fails with IntegrityError due to duplicate
+ primary key. This should return 403 Forbidden, not 500 Internal Server Error.
+ """
+ # Mock shared memory manager with lock support
+ mock_manager = MagicMock()
+ mock_lock = MagicMock()
+ mock_lock.__enter__ = MagicMock(return_value=None)
+ mock_lock.__exit__ = MagicMock(return_value=None)
+ mock_manager.Lock.return_value = mock_lock
+
+ mock_agent_locks = MagicMock()
+ mock_agent_locks.__contains__ = MagicMock(return_value=False)
+ mock_agent_locks.__setitem__ = MagicMock()
+ mock_agent_locks.__getitem__ = MagicMock(return_value=mock_lock)
+
+ mock_shared_mem.return_value.get_or_create_dict.return_value = mock_agent_locks
+ mock_shared_mem.return_value.manager = mock_manager
+
+ # Mock agent that doesn't exist yet (new registration)
+ mock_get.return_value = None
+
+ mock_agent = MagicMock()
+ mock_agent.changes_valid = True
+ mock_agent.errors = {}
+ mock_agent.produce_ak_challenge.return_value = "challenge_blob"
+
+ # Simulate IntegrityError during commit (duplicate primary key)
+ # IntegrityError(statement, params, orig) where orig is the original exception
+ orig_exception = Exception("UNIQUE constraint failed: registrarmain.agent_id")
+ mock_agent.commit_changes.side_effect = IntegrityError("INSERT INTO registrarmain ...", None, orig_exception)
+
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
+ self.controller.create(self.test_agent_id, **params)
+
+ # Verify 403 Forbidden response (not 500)
+ self.controller.respond.assert_called_once() # type: ignore[attr-defined]
+ call_args = self.controller.respond.call_args # type: ignore[attr-defined]
+ self.assertEqual(call_args[0][0], 403)
+ self.assertIn(self.test_agent_id, call_args[0][1])
+ self.assertIn("already in use", call_args[0][1])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/test_registrar_tpm_identity.py b/test/test_registrar_tpm_identity.py
new file mode 100644
index 0000000..2fc69b2
--- /dev/null
+++ b/test/test_registrar_tpm_identity.py
@@ -0,0 +1,342 @@
+"""
+Unit tests for RegistrarAgent TPM identity immutability security check.
+
+This module tests the _check_tpm_identity_immutable() method which prevents
+UUID spoofing attacks by rejecting re-registration attempts with different TPM identities.
+"""
+
+import base64
+import types
+import unittest
+from unittest.mock import Mock
+
+import cryptography.x509
+
+from keylime.certificate_wrapper import wrap_certificate
+from keylime.models.registrar.registrar_agent import RegistrarAgent
+
+
+class TestRegistrarAgentTPMIdentity(unittest.TestCase):
+ """Test cases for RegistrarAgent TPM identity immutability."""
+
+ # pylint: disable=protected-access # Testing protected methods
+ # pylint: disable=not-callable # False positive: methods bound via types.MethodType are callable
+
+ def setUp(self):
+ """Set up test fixtures."""
+ # EK certificate (used for testing certificate comparison)
+ self.ek_cert_pem = """-----BEGIN CERTIFICATE-----
+MIIEnzCCA4egAwIBAgIEMV64bDANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJE
+RTEQMA4GA1UECBMHQmF2YXJpYTEhMB8GA1UEChMYSW5maW5lb24gVGVjaG5vbG9n
+aWVzIEFHMQwwCgYDVQQLEwNBSU0xGzAZBgNVBAMTEklGWCBUUE0gRUsgUm9vdCBD
+QTAeFw0wNTEwMjAxMzQ3NDNaFw0yNTEwMjAxMzQ3NDNaMHcxCzAJBgNVBAYTAkRF
+MQ8wDQYDVQQIEwZTYXhvbnkxITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2ll
+cyBBRzEMMAoGA1UECxMDQUlNMSYwJAYDVQQDEx1JRlggVFBNIEVLIEludGVybWVk
+aWF0ZSBDQSAwMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALftPhYN
+t4rE+JnU/XOPICbOBLvfo6iA7nuq7zf4DzsAWBdsZEdFJQfaK331ihG3IpQnlQ2i
+YtDim289265f0J4OkPFpKeFU27CsfozVaNUm6UR/uzwA8ncxFc3iZLRMRNLru/Al
+VG053ULVDQMVx2iwwbBSAYO9pGiGbk1iMmuZaSErMdb9v0KRUyZM7yABiyDlM3cz
+UQX5vLWV0uWqxdGoHwNva5u3ynP9UxPTZWHZOHE6+14rMzpobs6Ww2RR8BgF96rh
+4rRAZEl8BXhwiQq4STvUXkfvdpWH4lzsGcDDtrB6Nt3KvVNvsKz+b07Dk+Xzt+EH
+NTf3Byk2HlvX+scCAwEAAaOCATswggE3MB0GA1UdDgQWBBQ4k8292HPEIzMV4bE7
+qWoNI8wQxzAOBgNVHQ8BAf8EBAMCAgQwEgYDVR0TAQH/BAgwBgEB/wIBADBYBgNV
+HSABAf8ETjBMMEoGC2CGSAGG+EUBBy8BMDswOQYIKwYBBQUHAgEWLWh0dHA6Ly93
+d3cudmVyaXNpZ24uY29tL3JlcG9zaXRvcnkvaW5kZXguaHRtbDCBlwYDVR0jBIGP
+MIGMgBRW65FEhWPWcrOu1EWWC/eUDlRCpqFxpG8wbTELMAkGA1UEBhMCREUxEDAO
+BgNVBAgTB0JhdmFyaWExITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2llcyBB
+RzEMMAoGA1UECxMDQUlNMRswGQYDVQQDExJJRlggVFBNIEVLIFJvb3QgQ0GCAQMw
+DQYJKoZIhvcNAQEFBQADggEBABJ1+Ap3rNlxZ0FW0aIgdzktbNHlvXWNxFdYIBbM
+OKjmbOos0Y4O60eKPu259XmMItCUmtbzF3oKYXq6ybARUT2Lm+JsseMF5VgikSlU
+BJALqpKVjwAds81OtmnIQe2LSu4xcTSavpsL4f52cUAu/maMhtSgN9mq5roYptq9
+DnSSDZrX4uYiMPl//rBaNDBflhJ727j8xo9CCohF3yQUoQm7coUgbRMzyO64yMIO
+3fhb+Vuc7sNwrMOz3VJN14C3JMoGgXy0c57IP/kD5zGRvljKEvrRC2I147+fPeLS
+DueRMS6lblvRKiZgmGAg7YaKOkOaEmVDMQ+fTo2Po7hI5wc=
+-----END CERTIFICATE-----"""
+
+ # Create wrapped cert from real certificate
+ self.ek_cert = cryptography.x509.load_pem_x509_certificate(self.ek_cert_pem.encode())
+ self.ek_cert_wrapped = wrap_certificate(self.ek_cert, None)
+
+ # Create a different cert mock that returns different DER bytes
+ self.different_ek_cert_wrapped = Mock()
+ self.different_ek_cert_wrapped.public_bytes = Mock(return_value=b"DIFFERENT_CERTIFICATE_DER_BYTES_FOR_TESTING")
+
+        # Sample TPM keys (arbitrary raw bytes, not base64, for simplicity in tests)
+ self.ek_tpm_1 = b"EK_TPM_KEY_NUMBER_ONE_SAMPLE_DATA"
+ self.ek_tpm_2 = b"EK_TPM_KEY_NUMBER_TWO_DIFFERENT_"
+ self.aik_tpm_1 = b"AIK_TPM_KEY_NUMBER_ONE_SAMPLE_DATA"
+ self.aik_tpm_2 = b"AIK_TPM_KEY_NUMBER_TWO_DIFFERENT_"
+
+ # IAK/IDevID keys for testing that they are not checked
+ self.iak_tpm_1 = b"IAK_TPM_KEY_NUMBER_ONE"
+ self.iak_tpm_2 = b"IAK_TPM_KEY_NUMBER_TWO"
+
+ def create_mock_registrar_agent(self, agent_id="test-agent-uuid"):
+ """Create a mock RegistrarAgent with necessary attributes."""
+ agent = Mock()
+ agent.agent_id = agent_id
+ agent.changes = {}
+ agent.values = {}
+ agent.committed = {}
+ agent._add_error = Mock()
+ agent.errors = {}
+
+ # Bind the actual method to the mock instance
+ agent._check_tpm_identity_immutable = types.MethodType(RegistrarAgent._check_tpm_identity_immutable, agent)
+
+ return agent
+
+ def test_new_agent_no_committed_values(self):
+ """Test that new agents (no committed values) are not checked."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {} # New agent, no previous values
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should not add any errors for new agents
+ agent._add_error.assert_not_called()
+
+ def test_reregistration_same_tpm_all_fields_identical(self):
+ """Test re-registration with identical TPM identity passes."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_1, # Same
+ "ekcert": self.ek_cert_wrapped, # Same
+ "aik_tpm": self.aik_tpm_1, # Same
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should not add any errors
+ agent._add_error.assert_not_called()
+
+ def test_reregistration_different_ek_tpm(self):
+ """Test re-registration with different EK TPM is rejected."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_2, # DIFFERENT
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should add error for agent_id field
+ agent._add_error.assert_called_once()
+ call_args = agent._add_error.call_args
+ self.assertEqual(call_args[0][0], "agent_id")
+ self.assertIn("different TPM identity", call_args[0][1])
+ self.assertIn("ek_tpm", call_args[0][1])
+
+ def test_reregistration_different_aik_tpm(self):
+ """Test re-registration with different AIK TPM is rejected."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_2, # DIFFERENT
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should add error for agent_id field
+ agent._add_error.assert_called_once()
+ call_args = agent._add_error.call_args
+ self.assertEqual(call_args[0][0], "agent_id")
+ self.assertIn("different TPM identity", call_args[0][1])
+ self.assertIn("aik_tpm", call_args[0][1])
+
+ def test_reregistration_different_ekcert(self):
+ """Test re-registration with different EK certificate is rejected."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.different_ek_cert_wrapped, # DIFFERENT
+ "aik_tpm": self.aik_tpm_1,
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should add error for agent_id field
+ agent._add_error.assert_called_once()
+ call_args = agent._add_error.call_args
+ self.assertEqual(call_args[0][0], "agent_id")
+ self.assertIn("different TPM identity", call_args[0][1])
+ self.assertIn("ekcert", call_args[0][1])
+
+ def test_reregistration_multiple_fields_changed(self):
+ """Test re-registration with multiple fields changed lists all of them."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_2, # DIFFERENT
+ "ekcert": self.different_ek_cert_wrapped, # DIFFERENT
+ "aik_tpm": self.aik_tpm_2, # DIFFERENT
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should add error listing all changed fields
+ agent._add_error.assert_called_once()
+ call_args = agent._add_error.call_args
+ self.assertEqual(call_args[0][0], "agent_id")
+ error_message = call_args[0][1]
+ self.assertIn("ek_tpm", error_message)
+ self.assertIn("ekcert", error_message)
+ self.assertIn("aik_tpm", error_message)
+
+ def test_adding_ekcert_to_existing_agent(self):
+ """Test that adding EK cert to existing agent (without cert) is allowed."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": None, # Previously no cert
+ "aik_tpm": self.aik_tpm_1,
+ }
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped, # NOW adding cert
+ "aik_tpm": self.aik_tpm_1,
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should not add any errors - adding cert is allowed
+ agent._add_error.assert_not_called()
+
+ def test_removing_ek_tpm_rejected(self):
+ """Test that removing an existing EK TPM is rejected."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ agent.changes = {
+ "ek_tpm": None, # Trying to remove
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should add error
+ agent._add_error.assert_called_once()
+ call_args = agent._add_error.call_args
+ self.assertIn("ek_tpm", call_args[0][1])
+
+ def test_iak_idevid_changes_not_checked(self):
+ """Test that IAK/IDevID field changes are NOT checked (allowed)."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ "iak_tpm": self.iak_tpm_1,
+ "idevid_tpm": b"IDEVID_OLD",
+ }
+ agent.changes = {
+ "ek_tpm": self.ek_tpm_1, # Same
+ "ekcert": self.ek_cert_wrapped, # Same
+ "aik_tpm": self.aik_tpm_1, # Same
+ "iak_tpm": self.iak_tpm_2, # DIFFERENT - but not checked
+ "idevid_tpm": b"IDEVID_NEW", # DIFFERENT - but not checked
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should not add any errors - IAK/IDevID are not checked
+ agent._add_error.assert_not_called()
+
+ def test_only_changed_fields_are_checked(self):
+ """Test that only fields in changes dict are checked."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ # Only updating IP, not touching TPM identity fields
+ agent.changes = {
+ "ip": "192.168.1.100",
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should not add any errors - no identity fields changed
+ agent._add_error.assert_not_called()
+
+ def test_base64_encoded_tpm_keys(self):
+ """Test that base64-encoded TPM keys are properly compared."""
+ agent = self.create_mock_registrar_agent()
+
+        # Simulate new values arriving as base64 strings while committed values are raw bytes
+ ek_b64 = base64.b64encode(self.ek_tpm_1).decode("utf-8")
+ aik_b64 = base64.b64encode(self.aik_tpm_1).decode("utf-8")
+
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1, # As bytes
+ "aik_tpm": self.aik_tpm_1, # As bytes
+ }
+ agent.changes = {
+ "ek_tpm": ek_b64, # As base64 string
+ "aik_tpm": aik_b64, # As base64 string
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should not add any errors - should handle both formats
+ agent._add_error.assert_not_called()
+
+ def test_partial_update_only_one_field(self):
+        """Test that a partial update changing only one TPM identity field is rejected."""
+ agent = self.create_mock_registrar_agent()
+ agent.committed = {
+ "ek_tpm": self.ek_tpm_1,
+ "ekcert": self.ek_cert_wrapped,
+ "aik_tpm": self.aik_tpm_1,
+ }
+ # Only changing AIK in this update
+ agent.changes = {
+ "aik_tpm": self.aik_tpm_2, # DIFFERENT
+ }
+
+ agent._check_tpm_identity_immutable()
+
+ # Should add error for the changed field
+ agent._add_error.assert_called_once()
+ call_args = agent._add_error.call_args
+ self.assertIn("aik_tpm", call_args[0][1])
+
+
+if __name__ == "__main__":
+ unittest.main()
--
2.47.3