1189 lines
51 KiB
Diff
1189 lines
51 KiB
Diff
From 2da614d212f58071f54c883ee368ffac4bc5e6b4 Mon Sep 17 00:00:00 2001
|
|
From: Sergio Correia <scorreia@redhat.com>
|
|
Date: Tue, 9 Dec 2025 12:12:22 +0000
|
|
Subject: [PATCH 14/14] Fix registrar duplicate UUID vulnerability
|
|
|
|
Backport upstream PR#1825
|
|
|
|
Signed-off-by: Sergio Correia <scorreia@redhat.com>
|
|
---
|
|
keylime/cmd/registrar.py | 6 +
|
|
keylime/models/registrar/registrar_agent.py | 116 +++++
|
|
keylime/shared_data.py | 6 +
|
|
keylime/web/registrar/agents_controller.py | 98 +++-
|
|
test/test_agents_controller.py | 513 ++++++++++++++++++++
|
|
test/test_registrar_tpm_identity.py | 342 +++++++++++++
|
|
6 files changed, 1071 insertions(+), 10 deletions(-)
|
|
create mode 100644 test/test_agents_controller.py
|
|
create mode 100644 test/test_registrar_tpm_identity.py
|
|
|
|
diff --git a/keylime/cmd/registrar.py b/keylime/cmd/registrar.py
|
|
index 584275a..2e2b25e 100644
|
|
--- a/keylime/cmd/registrar.py
|
|
+++ b/keylime/cmd/registrar.py
|
|
@@ -5,6 +5,7 @@ import cryptography
|
|
from keylime import config, keylime_logging
|
|
from keylime.common.migrations import apply
|
|
from keylime.models import da_manager, db_manager
|
|
+from keylime.shared_data import initialize_shared_memory
|
|
from keylime.web import RegistrarServer
|
|
|
|
logger = keylime_logging.init_logging("registrar")
|
|
@@ -47,6 +48,11 @@ def main() -> None:
|
|
# Prepare backend for durable attestation, if configured
|
|
da_manager.make_backend("registrar")
|
|
|
|
+ # Initialize shared memory for cross-process synchronization
|
|
+ # CRITICAL: Must be called before server.start_multi() to ensure all forked
|
|
+ # worker processes share the same manager instance for agent registration locks
|
|
+ initialize_shared_memory()
|
|
+
|
|
# Start HTTP server
|
|
server = RegistrarServer()
|
|
server.start_multi()
|
|
diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py
|
|
index fc7e1be..e26ae41 100644
|
|
--- a/keylime/models/registrar/registrar_agent.py
|
|
+++ b/keylime/models/registrar/registrar_agent.py
|
|
@@ -65,8 +65,14 @@ class RegistrarAgent(PersistableModel):
|
|
def empty(cls):
|
|
agent = super().empty()
|
|
agent.provider_keys = {}
|
|
+ object.__setattr__(agent, "_tpm_identity_violation", False)
|
|
return agent
|
|
|
|
+ @property
|
|
+ def has_tpm_identity_violation(self):
|
|
+ """Returns True if a TPM identity violation was detected during validation."""
|
|
+ return getattr(self, "_tpm_identity_violation", False)
|
|
+
|
|
def _check_key_against_cert(self, tpm_key_field, cert_field):
|
|
# If neither key nor certificate is being updated, no need to check
|
|
if tpm_key_field not in self.changes and cert_field not in self.changes:
|
|
@@ -139,6 +145,111 @@ class RegistrarAgent(PersistableModel):
|
|
|
|
return compliant
|
|
|
|
+ def _check_tpm_identity_immutable(self):
|
|
+ """
|
|
+ Checks that TPM identity fields are not being changed during re-registration.
|
|
+
|
|
+ This prevents an attacker from registering with the same UUID but a different TPM,
|
|
+ which would allow them to impersonate the original agent and bypass attestation.
|
|
+
|
|
+ Checked fields (EK-based identity only):
|
|
+ - ek_tpm: Endorsement Key (primary TPM identity)
|
|
+ - ekcert: EK Certificate (binds EK to TPM manufacturer)
|
|
+ - aik_tpm: Attestation Key (bound to EK via MakeCredential/ActivateCredential)
|
|
+
|
|
+ Note: IAK/IDevID fields are NOT checked and can change on re-registration.
|
|
+
|
|
+ This check only applies to existing agents (those loaded from the database).
|
|
+ New agents created via RegistrarAgent.empty() have no committed values and are
|
|
+ allowed to set identity fields during initial registration.
|
|
+
|
|
+ If the agent needs to be registered with a new TPM (e.g., hardware replacement),
|
|
+ the old agent record must be explicitly deleted first.
|
|
+ """
|
|
+ # Define TPM identity fields that must remain immutable once set
|
|
+ # Only checking EK-based identity (ek_tpm, ekcert, aik_tpm)
|
|
+ # IAK/IDevID fields (iak_tpm, iak_cert, idevid_tpm, idevid_cert) are not checked
|
|
+ identity_fields = ["ek_tpm", "ekcert", "aik_tpm"]
|
|
+
|
|
+ # Only check for existing agents (those loaded from database)
|
|
+ # New agents created via empty() will have no committed values
|
|
+ if not self.committed:
|
|
+ return
|
|
+
|
|
+ # Track which fields have been changed
|
|
+ changed_fields = []
|
|
+
|
|
+ for field_name in identity_fields:
|
|
+ # Skip fields that are not being changed in this update
|
|
+ if field_name not in self.changes:
|
|
+ continue
|
|
+
|
|
+ # Get the old (committed/database) and new (proposed) values
|
|
+ old_value = self.committed.get(field_name)
|
|
+ new_value = self.changes.get(field_name)
|
|
+
|
|
+ # Allow setting a previously unset field (e.g., adding EK cert later)
|
|
+ if old_value is None:
|
|
+ continue
|
|
+
|
|
+ # Reject attempts to remove an already-set identity field
|
|
+ if new_value is None:
|
|
+ changed_fields.append(field_name)
|
|
+ continue
|
|
+
|
|
+ # Compare values based on field type
|
|
+ if field_name == "ekcert":
|
|
+ # For certificates, compare the actual certificate bytes
|
|
+ # Note: We compare full certificate, not just public key, because:
|
|
+ # 1. User requirement: reject if certificate changed even if same public key
|
|
+ # 2. Certificate contains more than just key (issuer, validity period, etc.)
|
|
+ # 3. Different cert for same key could indicate compromise or unauthorized replacement
|
|
+ try:
|
|
+ old_cert_bytes = old_value.public_bytes(Encoding.DER)
|
|
+ new_cert_bytes = new_value.public_bytes(Encoding.DER)
|
|
+
|
|
+ if old_cert_bytes != new_cert_bytes:
|
|
+ changed_fields.append(field_name)
|
|
+ except Exception:
|
|
+ # If we can't extract certificate bytes, treat as changed to be safe
|
|
+ changed_fields.append(field_name)
|
|
+ else:
|
|
+ # For TPM keys (ek_tpm, aik_tpm), compare as binary data
|
|
+ # These are Binary(persist_as=String) fields, so they could be bytes or base64 strings
|
|
+ try:
|
|
+ old_bytes = old_value if isinstance(old_value, bytes) else base64.b64decode(old_value)
|
|
+ new_bytes = new_value if isinstance(new_value, bytes) else base64.b64decode(new_value)
|
|
+
|
|
+ if old_bytes != new_bytes:
|
|
+ changed_fields.append(field_name)
|
|
+ except Exception:
|
|
+ # If comparison fails (e.g., invalid base64), treat as changed to be safe
|
|
+ changed_fields.append(field_name)
|
|
+
|
|
+ # If any TPM identity fields were changed, this is a security violation
|
|
+ if changed_fields:
|
|
+ # Set flag to indicate TPM identity violation occurred
|
|
+ object.__setattr__(self, "_tpm_identity_violation", True)
|
|
+
|
|
+ # Log security warning for audit trail
|
|
+ # Include agent_id and changed fields, but NOT the actual TPM values (sensitive data)
|
|
+ logger.warning(
|
|
+ "SECURITY: Rejected attempt to re-register agent '%s' with different TPM identity. "
|
|
+ "Changed fields: %s. This indicates a potential UUID spoofing attack. "
|
|
+ "The existing agent must be deleted before registering with a new TPM. "
|
|
+ "If this is unexpected, investigate for compromise.",
|
|
+ self.agent_id,
|
|
+ ", ".join(changed_fields),
|
|
+ )
|
|
+
|
|
+ # Add validation error to prevent registration
|
|
+ # Using "agent_id" field for the error because it's the UUID that's being improperly reused
|
|
+ self._add_error(
|
|
+ "agent_id",
|
|
+ f"cannot re-register with different TPM identity. Changed fields: {', '.join(changed_fields)}. "
|
|
+ "To register this UUID with a new TPM, delete the existing agent record first.",
|
|
+ )
|
|
+
|
|
def _check_all_cert_compliance(self):
|
|
non_compliant_certs = []
|
|
|
|
@@ -280,6 +391,11 @@ class RegistrarAgent(PersistableModel):
|
|
+ ["port", "mtls_cert"],
|
|
)
|
|
|
|
+ # SECURITY CHECK: Verify TPM identity is not being changed on re-registration
|
|
+ # This must happen after cast_changes() (so we have new values to compare)
|
|
+ # but before other validation (so we reject immediately without processing further)
|
|
+ self._check_tpm_identity_immutable()
|
|
+
|
|
# Log info about received EK or IAK/IDevID
|
|
self._log_root_identity()
|
|
# Verify EK as valid
|
|
diff --git a/keylime/shared_data.py b/keylime/shared_data.py
|
|
index 23a3d81..a415496 100644
|
|
--- a/keylime/shared_data.py
|
|
+++ b/keylime/shared_data.py
|
|
@@ -58,6 +58,12 @@ class FlatDictView:
|
|
with self._lock:
|
|
return self._store.get(self._make_key(key), default)
|
|
|
|
+ def pop(self, key: Any, default: Any = None) -> Any:
|
|
+ """Remove and return value for key, or default if key not present."""
|
|
+ flat_key = self._make_key(key)
|
|
+ with self._lock:
|
|
+ return self._store.pop(flat_key, default)
|
|
+
|
|
def keys(self) -> List[Any]:
|
|
"""Return keys in this namespace."""
|
|
prefix = f"dict:{self._namespace}:"
|
|
diff --git a/keylime/web/registrar/agents_controller.py b/keylime/web/registrar/agents_controller.py
|
|
index 9be2ef9..f2246de 100644
|
|
--- a/keylime/web/registrar/agents_controller.py
|
|
+++ b/keylime/web/registrar/agents_controller.py
|
|
@@ -1,5 +1,8 @@
|
|
+from sqlalchemy.exc import IntegrityError
|
|
+
|
|
from keylime import keylime_logging
|
|
from keylime.models import RegistrarAgent
|
|
+from keylime.shared_data import get_shared_memory
|
|
from keylime.web.base import Controller
|
|
|
|
logger = keylime_logging.init_logging("registrar")
|
|
@@ -28,16 +31,91 @@ class AgentsController(Controller):
|
|
|
|
# POST /v2[.:minor]/agents/[:agent_id]
|
|
def create(self, agent_id, **params):
|
|
- agent = RegistrarAgent.get(agent_id) or RegistrarAgent.empty() # type: ignore[no-untyped-call]
|
|
- agent.update({"agent_id": agent_id, **params})
|
|
- challenge = agent.produce_ak_challenge()
|
|
-
|
|
- if not challenge or not agent.changes_valid:
|
|
- self.log_model_errors(agent, logger)
|
|
- self.respond(400, "Could not register agent with invalid data")
|
|
- return
|
|
-
|
|
- agent.commit_changes()
|
|
+ """Register a new agent or re-register an existing agent.
|
|
+
|
|
+ For new agents, this:
|
|
+ 1. Validates TPM identity (EK/AIK or IAK/IDevID)
|
|
+ 2. Generates an AK challenge encrypted with the EK
|
|
+ 3. Stores agent record in pending state
|
|
+ 4. Returns challenge blob to agent
|
|
+
|
|
+ For existing agents (re-registration with same UUID):
|
|
+ 1. Verifies TPM identity has not changed (security check)
|
|
+ 2. If identity changed: rejects with 403 Forbidden
|
|
+ 3. If identity same: allows re-registration (e.g., after agent restart)
|
|
+
|
|
+ Security: Re-registration with a different TPM is forbidden to prevent
|
|
+ UUID spoofing attacks where an attacker could impersonate a legitimate
|
|
+ agent by reusing its UUID.
|
|
+
|
|
+ Race condition protection: Uses per-agent locks from SharedDataManager to prevent
|
|
+ race conditions between concurrent registration requests for the same agent_id.
|
|
+ This ensures the check-validate-commit sequence is atomic. Additionally, database
|
|
+ constraint violations (e.g., duplicate UUIDs from concurrent requests) are caught
|
|
+ and returned as 403 Forbidden.
|
|
+ """
|
|
+ # Get shared memory manager and per-agent lock storage
|
|
+ shared_mem = get_shared_memory()
|
|
+ agent_locks = shared_mem.get_or_create_dict("agent_registration_locks")
|
|
+
|
|
+ # Get or create a lock specific to this agent_id
|
|
+ if agent_id not in agent_locks:
|
|
+ agent_locks[agent_id] = shared_mem.manager.Lock()
|
|
+
|
|
+ agent_lock = agent_locks[agent_id]
|
|
+
|
|
+ # CRITICAL SECTION: Acquire lock to make check-validate-commit atomic
|
|
+ with agent_lock:
|
|
+ # Step 1: Load existing agent or create new one (inside lock)
|
|
+ agent = RegistrarAgent.get(agent_id) or RegistrarAgent.empty() # type: ignore[no-untyped-call]
|
|
+
|
|
+ # Step 2: Update agent with new data and validate (inside lock)
|
|
+ agent.update({"agent_id": agent_id, **params})
|
|
+
|
|
+ # Step 3: Check for TPM identity change security violation
|
|
+ # Use explicit flag instead of fragile string matching for security check
|
|
+ if not agent.changes_valid and agent.has_tpm_identity_violation:
|
|
+ # Log the validation errors (includes security warning)
|
|
+ self.log_model_errors(agent, logger)
|
|
+
|
|
+ # Return 403 Forbidden
|
|
+ # 403 indicates a policy violation, not a malformed request
|
|
+ self.respond(403, "Agent re-registration with different TPM identity is forbidden for security reasons")
|
|
+ return
|
|
+
|
|
+ # Step 4: Generate AK challenge (inside lock)
|
|
+ challenge = agent.produce_ak_challenge()
|
|
+
|
|
+ # Step 5: Check for any validation errors or challenge generation failure
|
|
+ if not challenge or not agent.changes_valid:
|
|
+ self.log_model_errors(agent, logger)
|
|
+ self.respond(400, "Could not register agent with invalid data")
|
|
+ return
|
|
+
|
|
+ # Step 6: Commit to database (inside lock)
|
|
+ # This ensures no other request can modify the agent between validation and commit
|
|
+ try:
|
|
+ agent.commit_changes()
|
|
+ except IntegrityError as e:
|
|
+ # Database constraint violation - most likely duplicate agent_id
|
|
+ # This can happen if two requests try to register the same new UUID simultaneously
|
|
+ # and both pass validation before either commits (database race condition)
|
|
+ logger.warning(
|
|
+ "SECURITY: Agent registration failed due to database constraint violation for agent_id '%s'. "
|
|
+ "This UUID may already be registered by a concurrent request or the agent already exists. "
|
|
+ "Database error: %s",
|
|
+ agent_id,
|
|
+ str(e),
|
|
+ )
|
|
+ self.respond(
|
|
+ 403,
|
|
+ f"Agent with UUID '{agent_id}' cannot be registered. "
|
|
+ "This UUID is already in use or a concurrent registration is in progress.",
|
|
+ )
|
|
+ return
|
|
+
|
|
+ # Lock released - safe to respond to client
|
|
+ # Return challenge blob for agent to decrypt
|
|
self.respond(200, "Success", {"blob": challenge})
|
|
|
|
# DELETE /v2[.:minor]/agents/:agent_id/
|
|
diff --git a/test/test_agents_controller.py b/test/test_agents_controller.py
|
|
new file mode 100644
|
|
index 0000000..898d8f0
|
|
--- /dev/null
|
|
+++ b/test/test_agents_controller.py
|
|
@@ -0,0 +1,513 @@
|
|
+"""Unit tests for AgentsController (registrar).
|
|
+
|
|
+Tests the registrar's agent registration endpoints, including the
|
|
+security fix that prevents UUID spoofing via re-registration with
|
|
+a different TPM identity.
|
|
+"""
|
|
+
|
|
+# type: ignore - Controller methods are dynamically bound
|
|
+
|
|
+import unittest
|
|
+from typing import cast
|
|
+from unittest.mock import MagicMock, patch
|
|
+
|
|
+from sqlalchemy.exc import IntegrityError
|
|
+
|
|
+from keylime.web.registrar.agents_controller import AgentsController
|
|
+
|
|
+
|
|
+class TestAgentsControllerIndex(unittest.TestCase):
|
|
+ """Test cases for AgentsController.index()."""
|
|
+
|
|
+ def setUp(self):
|
|
+ """Set up test fixtures."""
|
|
+ mock_action_handler = MagicMock()
|
|
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
|
|
+ self.controller.respond = MagicMock()
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.all_ids")
|
|
+ def test_index_success(self, mock_all_ids):
|
|
+ """Test successful retrieval of all agent IDs."""
|
|
+ mock_all_ids.return_value = ["agent-1", "agent-2", "agent-3"]
|
|
+
|
|
+ self.controller.index()
|
|
+
|
|
+ self.controller.respond.assert_called_once_with(200, "Success", {"uuids": ["agent-1", "agent-2", "agent-3"]}) # type: ignore[attr-defined]
|
|
+
|
|
+
|
|
+class TestAgentsControllerShow(unittest.TestCase):
|
|
+ """Test cases for AgentsController.show()."""
|
|
+
|
|
+ def setUp(self):
|
|
+ """Set up test fixtures."""
|
|
+ mock_action_handler = MagicMock()
|
|
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
|
|
+ self.controller.respond = MagicMock()
|
|
+ self.test_agent_id = "test-agent-123"
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_show_not_found(self, mock_get):
|
|
+ """Test show with non-existent agent."""
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ self.controller.show(self.test_agent_id)
|
|
+
|
|
+ self.controller.respond.assert_called_once_with(404, f"Agent with ID '{self.test_agent_id}' not found") # type: ignore[attr-defined]
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_show_not_active(self, mock_get):
|
|
+ """Test show with inactive agent."""
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.active = False
|
|
+ mock_get.return_value = mock_agent
|
|
+
|
|
+ self.controller.show(self.test_agent_id)
|
|
+
|
|
+ self.controller.respond.assert_called_once_with( # type: ignore[attr-defined]
|
|
+ 404, f"Agent with ID '{self.test_agent_id}' has not been activated"
|
|
+ )
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_show_success(self, mock_get):
|
|
+ """Test successful show of active agent."""
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.active = True
|
|
+ mock_agent.render.return_value = {"agent_id": self.test_agent_id, "active": True}
|
|
+ mock_get.return_value = mock_agent
|
|
+
|
|
+ self.controller.show(self.test_agent_id)
|
|
+
|
|
+ self.controller.respond.assert_called_once_with( # type: ignore[attr-defined]
|
|
+ 200, "Success", {"agent_id": self.test_agent_id, "active": True}
|
|
+ )
|
|
+
|
|
+
|
|
+class TestAgentsControllerCreate(unittest.TestCase):
|
|
+ """Test cases for AgentsController.create() - the main registration endpoint."""
|
|
+
|
|
+ def setUp(self):
|
|
+ """Set up test fixtures."""
|
|
+ mock_action_handler = MagicMock()
|
|
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
|
|
+ self.controller.respond = MagicMock()
|
|
+ self.controller.log_model_errors = MagicMock()
|
|
+ self.test_agent_id = "test-agent-123"
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_create_new_agent_success(self, mock_get):
|
|
+ """Test successful registration of a new agent."""
|
|
+ # Mock that agent doesn't exist yet
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ # Create mock agent that will be returned by empty()
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.changes_valid = True
|
|
+ mock_agent.errors = {}
|
|
+ mock_agent.produce_ak_challenge.return_value = "challenge_blob_data"
|
|
+
|
|
+ # Patch RegistrarAgent.empty to return our mock
|
|
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
|
|
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify agent was updated with params
|
|
+ mock_agent.update.assert_called_once_with({"agent_id": self.test_agent_id, **params})
|
|
+
|
|
+ # Verify challenge was generated
|
|
+ mock_agent.produce_ak_challenge.assert_called_once()
|
|
+
|
|
+ # Verify agent was saved
|
|
+ mock_agent.commit_changes.assert_called_once()
|
|
+
|
|
+ # Verify 200 response with challenge
|
|
+ self.controller.respond.assert_called_once_with(200, "Success", {"blob": "challenge_blob_data"}) # type: ignore[attr-defined]
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_create_reregistration_same_tpm_identity(self, mock_get):
|
|
+ """Test successful re-registration with same TPM identity."""
|
|
+ # Mock existing agent
|
|
+ mock_existing_agent = MagicMock()
|
|
+ mock_existing_agent.changes_valid = True
|
|
+ mock_existing_agent.errors = {}
|
|
+ mock_existing_agent.produce_ak_challenge.return_value = "challenge_blob_data"
|
|
+ mock_get.return_value = mock_existing_agent
|
|
+
|
|
+ params = {"ek_tpm": "same_ek_key", "aik_tpm": "same_aik_key"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify agent was updated
|
|
+ mock_existing_agent.update.assert_called_once_with({"agent_id": self.test_agent_id, **params})
|
|
+
|
|
+ # Verify challenge was generated
|
|
+ mock_existing_agent.produce_ak_challenge.assert_called_once()
|
|
+
|
|
+ # Verify agent was saved
|
|
+ mock_existing_agent.commit_changes.assert_called_once()
|
|
+
|
|
+ # Verify 200 response
|
|
+ self.controller.respond.assert_called_once_with(200, "Success", {"blob": "challenge_blob_data"}) # type: ignore[attr-defined]
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_create_reregistration_different_tpm_identity_forbidden(self, mock_get):
|
|
+ """Test re-registration with different TPM identity is rejected with 403.
|
|
+
|
|
+ This is the key security fix: preventing UUID spoofing by rejecting
|
|
+ attempts to re-register an agent with a different TPM identity.
|
|
+ """
|
|
+ # Mock existing agent
|
|
+ mock_existing_agent = MagicMock()
|
|
+ mock_existing_agent.changes_valid = False # Validation failed
|
|
+ # Simulate the error added by _check_tpm_identity_immutable
|
|
+ mock_existing_agent.errors = {
|
|
+ "agent_id": [
|
|
+ "Agent re-registration attempted with different TPM identity (changed fields: ek_tpm). "
|
|
+ "This is a security violation - the same agent UUID cannot be reused with a different TPM."
|
|
+ ]
|
|
+ }
|
|
+ mock_get.return_value = mock_existing_agent
|
|
+
|
|
+ params = {"ek_tpm": "different_ek_key", "aik_tpm": "same_aik_key"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify agent was updated (which triggers validation)
|
|
+ mock_existing_agent.update.assert_called_once_with({"agent_id": self.test_agent_id, **params})
|
|
+
|
|
+ # Verify errors were logged
|
|
+ self.controller.log_model_errors.assert_called_once() # type: ignore[attr-defined]
|
|
+
|
|
+ # Verify 403 Forbidden response (not 400!)
|
|
+ self.controller.respond.assert_called_once_with( # type: ignore[attr-defined]
|
|
+ 403, "Agent re-registration with different TPM identity is forbidden for security reasons"
|
|
+ )
|
|
+
|
|
+ # Verify agent was NOT saved
|
|
+ mock_existing_agent.commit_changes.assert_not_called()
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_create_invalid_data_other_validation_error(self, mock_get):
|
|
+ """Test registration with other validation errors returns 400."""
|
|
+ # Mock agent with validation errors (not TPM identity related)
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.changes_valid = False
|
|
+ # Error not related to TPM identity
|
|
+ mock_agent.errors = {"ek_tpm": ["must be a valid TPM2B_PUBLIC structure"]}
|
|
+ mock_agent.has_tpm_identity_violation = False # Not a TPM identity violation
|
|
+ mock_agent.produce_ak_challenge.return_value = None
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
|
|
+ params = {"ek_tpm": "invalid_ek_format"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify errors were logged
|
|
+ self.controller.log_model_errors.assert_called_once() # type: ignore[attr-defined]
|
|
+
|
|
+ # Verify 400 Bad Request (not 403)
|
|
+ self.controller.respond.assert_called_once_with(400, "Could not register agent with invalid data") # type: ignore[attr-defined]
|
|
+
|
|
+ # Verify agent was NOT saved
|
|
+ mock_agent.commit_changes.assert_not_called()
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_create_challenge_generation_failure(self, mock_get):
|
|
+ """Test registration fails if challenge generation fails."""
|
|
+ # Mock agent where challenge generation fails
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.changes_valid = True
|
|
+ mock_agent.errors = {}
|
|
+ mock_agent.produce_ak_challenge.return_value = None # Challenge generation failed
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
|
|
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify errors were logged
|
|
+ self.controller.log_model_errors.assert_called_once() # type: ignore[attr-defined]
|
|
+
|
|
+ # Verify 400 response
|
|
+ self.controller.respond.assert_called_once_with(400, "Could not register agent with invalid data") # type: ignore[attr-defined]
|
|
+
|
|
+ # Verify agent was NOT saved
|
|
+ mock_agent.commit_changes.assert_not_called()
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_create_validation_error_with_agent_id_but_not_tpm_identity(self, mock_get):
|
|
+ """Test that agent_id errors unrelated to TPM identity get 400, not 403."""
|
|
+ # Mock agent with agent_id error, but not about TPM identity
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.changes_valid = False
|
|
+ mock_agent.errors = {"agent_id": ["must be a valid UUID format"]} # Not about TPM identity
|
|
+ mock_agent.has_tpm_identity_violation = False # Not a TPM identity violation
|
|
+ mock_agent.produce_ak_challenge.return_value = None
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
|
|
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify 400 Bad Request (not 403) because it's not a TPM identity violation
|
|
+ self.controller.respond.assert_called_once_with(400, "Could not register agent with invalid data") # type: ignore[attr-defined]
|
|
+
|
|
+
|
|
+class TestAgentsControllerDelete(unittest.TestCase):
|
|
+ """Test cases for AgentsController.delete()."""
|
|
+
|
|
+ def setUp(self):
|
|
+ """Set up test fixtures."""
|
|
+ mock_action_handler = MagicMock()
|
|
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
|
|
+ self.controller.respond = MagicMock()
|
|
+ self.test_agent_id = "test-agent-123"
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_delete_not_found(self, mock_get):
|
|
+ """Test delete with non-existent agent."""
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ self.controller.delete(self.test_agent_id)
|
|
+
|
|
+ self.controller.respond.assert_called_once_with(404, f"Agent with ID '{self.test_agent_id}' not found") # type: ignore[attr-defined]
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_delete_success(self, mock_get):
|
|
+ """Test successful agent deletion."""
|
|
+ mock_agent = MagicMock()
|
|
+ mock_get.return_value = mock_agent
|
|
+
|
|
+ self.controller.delete(self.test_agent_id)
|
|
+
|
|
+ # Verify agent was deleted
|
|
+ mock_agent.delete.assert_called_once()
|
|
+
|
|
+ # Verify 200 response
|
|
+ self.controller.respond.assert_called_once_with(200, "Success") # type: ignore[attr-defined]
|
|
+
|
|
+
|
|
+class TestAgentsControllerActivate(unittest.TestCase):
|
|
+ """Test cases for AgentsController.activate()."""
|
|
+
|
|
+ def setUp(self):
|
|
+ """Set up test fixtures."""
|
|
+ mock_action_handler = MagicMock()
|
|
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
|
|
+ self.controller.respond = MagicMock()
|
|
+ self.test_agent_id = "test-agent-123"
|
|
+ self.test_auth_tag = "valid_auth_tag"
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_activate_not_found(self, mock_get):
|
|
+ """Test activate with non-existent agent."""
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ self.controller.activate(self.test_agent_id, self.test_auth_tag)
|
|
+
|
|
+ self.controller.respond.assert_called_once_with(404, f"Agent with ID '{self.test_agent_id}' not found") # type: ignore[attr-defined]
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_activate_success(self, mock_get):
|
|
+ """Test successful agent activation."""
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.verify_ak_response.return_value = True # Auth tag is valid
|
|
+ mock_get.return_value = mock_agent
|
|
+
|
|
+ self.controller.activate(self.test_agent_id, self.test_auth_tag)
|
|
+
|
|
+ # Verify auth tag was verified
|
|
+ mock_agent.verify_ak_response.assert_called_once_with(self.test_auth_tag)
|
|
+
|
|
+ # Verify agent was saved
|
|
+ mock_agent.commit_changes.assert_called_once()
|
|
+
|
|
+ # Verify 200 response
|
|
+ self.controller.respond.assert_called_once_with(200, "Success") # type: ignore[attr-defined]
|
|
+
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_activate_invalid_auth_tag(self, mock_get):
|
|
+ """Test activation with invalid auth tag."""
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.verify_ak_response.return_value = False # Auth tag is invalid
|
|
+ mock_get.return_value = mock_agent
|
|
+
|
|
+ self.controller.activate(self.test_agent_id, self.test_auth_tag)
|
|
+
|
|
+ # Verify auth tag was verified
|
|
+ mock_agent.verify_ak_response.assert_called_once_with(self.test_auth_tag)
|
|
+
|
|
+ # Verify agent was deleted (due to failed activation)
|
|
+ mock_agent.delete.assert_called_once()
|
|
+
|
|
+ # Verify agent was NOT saved
|
|
+ mock_agent.commit_changes.assert_not_called()
|
|
+
|
|
+ # Verify 400 response with detailed error message
|
|
+ self.controller.respond.assert_called_once() # type: ignore[attr-defined]
|
|
+ call_args = self.controller.respond.call_args # type: ignore[attr-defined]
|
|
+ self.assertEqual(call_args[0][0], 400)
|
|
+ self.assertIn(self.test_auth_tag, call_args[0][1])
|
|
+ self.assertIn(self.test_agent_id, call_args[0][1])
|
|
+ self.assertIn("deleted", call_args[0][1])
|
|
+
|
|
+
|
|
+class TestAgentsControllerConcurrency(unittest.TestCase):
|
|
+ """Test cases for concurrent registration TOCTOU race condition protection."""
|
|
+
|
|
+ def setUp(self):
|
|
+ """Set up test fixtures."""
|
|
+ mock_action_handler = MagicMock()
|
|
+ self.controller = cast(AgentsController, AgentsController(mock_action_handler))
|
|
+ self.controller.respond = MagicMock()
|
|
+ self.controller.log_model_errors = MagicMock()
|
|
+ self.test_agent_id = "concurrent-test-agent"
|
|
+
|
|
+ @patch("keylime.web.registrar.agents_controller.get_shared_memory")
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_concurrent_registration_uses_locking(self, mock_get, mock_shared_mem):
|
|
+ """Test that concurrent registration attempts use per-agent locking.
|
|
+
|
|
+ This test verifies that the locking mechanism is invoked to prevent
|
|
+ TOCTOU race conditions during concurrent registration.
|
|
+ """
|
|
+ # Mock shared memory manager with lock support
|
|
+ mock_manager = MagicMock()
|
|
+ mock_lock = MagicMock()
|
|
+ mock_lock.__enter__ = MagicMock(return_value=None)
|
|
+ mock_lock.__exit__ = MagicMock(return_value=None)
|
|
+ mock_manager.Lock.return_value = mock_lock
|
|
+
|
|
+ mock_agent_locks = MagicMock()
|
|
+ mock_agent_locks.__contains__ = MagicMock(return_value=False)
|
|
+ mock_agent_locks.__setitem__ = MagicMock()
|
|
+ mock_agent_locks.__getitem__ = MagicMock(return_value=mock_lock)
|
|
+
|
|
+ mock_shared_mem.return_value.get_or_create_dict.return_value = mock_agent_locks
|
|
+ mock_shared_mem.return_value.manager = mock_manager
|
|
+
|
|
+ # Mock agent that doesn't exist yet (new registration)
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.changes_valid = True
|
|
+ mock_agent.errors = {}
|
|
+ mock_agent.produce_ak_challenge.return_value = "challenge_blob"
|
|
+
|
|
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
|
|
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify lock was acquired and released
|
|
+ mock_lock.__enter__.assert_called_once()
|
|
+ mock_lock.__exit__.assert_called_once()
|
|
+
|
|
+ # Verify successful registration
|
|
+ mock_agent.commit_changes.assert_called_once()
|
|
+ self.controller.respond.assert_called_once_with(200, "Success", {"blob": "challenge_blob"}) # type: ignore[attr-defined]
|
|
+
|
|
+ @patch("keylime.web.registrar.agents_controller.get_shared_memory")
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_different_agents_use_different_locks(self, mock_get, mock_shared_mem):
|
|
+ """Test that different agent_ids use different locks for parallel registration.
|
|
+
|
|
+ This ensures that registrations for different agents don't block each other,
|
|
+ only concurrent registrations for the same agent_id are serialized.
|
|
+ """
|
|
+ # Mock shared memory manager
|
|
+ mock_manager = MagicMock()
|
|
+
|
|
+ def mock_lock_factory():
|
|
+ """Return a new lock each time."""
|
|
+ return MagicMock()
|
|
+
|
|
+ mock_manager.Lock.side_effect = mock_lock_factory
|
|
+
|
|
+ mock_agent_locks = {}
|
|
+
|
|
+ def mock_getitem(_self, key): # pylint: disable=unused-argument
|
|
+ return mock_agent_locks.get(key)
|
|
+
|
|
+ def mock_setitem(_self, key, value): # pylint: disable=unused-argument
|
|
+ mock_agent_locks[key] = value
|
|
+
|
|
+ def mock_contains(_self, key): # pylint: disable=unused-argument
|
|
+ return key in mock_agent_locks
|
|
+
|
|
+ mock_locks_dict = MagicMock()
|
|
+ mock_locks_dict.__contains__ = mock_contains
|
|
+ mock_locks_dict.__setitem__ = mock_setitem
|
|
+ mock_locks_dict.__getitem__ = mock_getitem
|
|
+
|
|
+ mock_shared_mem.return_value.get_or_create_dict.return_value = mock_locks_dict
|
|
+ mock_shared_mem.return_value.manager = mock_manager
|
|
+
|
|
+ # Register two different agents
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ for agent_id in ["agent-a", "agent-b"]:
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.changes_valid = True
|
|
+ mock_agent.errors = {}
|
|
+ mock_agent.produce_ak_challenge.return_value = f"challenge_{agent_id}"
|
|
+
|
|
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
|
|
+ self.controller.respond = MagicMock() # Reset for each call
|
|
+ params = {"ek_tpm": f"ek_{agent_id}", "aik_tpm": f"aik_{agent_id}"}
|
|
+ self.controller.create(agent_id, **params)
|
|
+
|
|
+ # Verify that two different locks were created (one per agent)
|
|
+ self.assertEqual(len(mock_agent_locks), 2)
|
|
+ self.assertIn("agent-a", mock_agent_locks)
|
|
+ self.assertIn("agent-b", mock_agent_locks)
|
|
+ # Verify they are different lock objects
|
|
+ self.assertIsNot(mock_agent_locks["agent-a"], mock_agent_locks["agent-b"])
|
|
+
|
|
+ @patch("keylime.web.registrar.agents_controller.get_shared_memory")
|
|
+ @patch("keylime.models.RegistrarAgent.get")
|
|
+ def test_concurrent_new_registration_database_constraint_violation(self, mock_get, mock_shared_mem):
|
|
+ """Test that database constraint violations during concurrent new agent registration return 403.
|
|
+
|
|
+ This handles the edge case where two requests both create empty agents for the same UUID,
|
|
+ both pass validation, but the second commit fails with IntegrityError due to duplicate
|
|
+ primary key. This should return 403 Forbidden, not 500 Internal Server Error.
|
|
+ """
|
|
+ # Mock shared memory manager with lock support
|
|
+ mock_manager = MagicMock()
|
|
+ mock_lock = MagicMock()
|
|
+ mock_lock.__enter__ = MagicMock(return_value=None)
|
|
+ mock_lock.__exit__ = MagicMock(return_value=None)
|
|
+ mock_manager.Lock.return_value = mock_lock
|
|
+
|
|
+ mock_agent_locks = MagicMock()
|
|
+ mock_agent_locks.__contains__ = MagicMock(return_value=False)
|
|
+ mock_agent_locks.__setitem__ = MagicMock()
|
|
+ mock_agent_locks.__getitem__ = MagicMock(return_value=mock_lock)
|
|
+
|
|
+ mock_shared_mem.return_value.get_or_create_dict.return_value = mock_agent_locks
|
|
+ mock_shared_mem.return_value.manager = mock_manager
|
|
+
|
|
+ # Mock agent that doesn't exist yet (new registration)
|
|
+ mock_get.return_value = None
|
|
+
|
|
+ mock_agent = MagicMock()
|
|
+ mock_agent.changes_valid = True
|
|
+ mock_agent.errors = {}
|
|
+ mock_agent.produce_ak_challenge.return_value = "challenge_blob"
|
|
+
|
|
+ # Simulate IntegrityError during commit (duplicate primary key)
|
|
+ # IntegrityError(statement, params, orig) where orig is the original exception
|
|
+ orig_exception = Exception("UNIQUE constraint failed: registrarmain.agent_id")
|
|
+ mock_agent.commit_changes.side_effect = IntegrityError("INSERT INTO registrarmain ...", None, orig_exception)
|
|
+
|
|
+ with patch("keylime.models.RegistrarAgent.empty", return_value=mock_agent):
|
|
+ params = {"ek_tpm": "ek_key", "aik_tpm": "aik_key"}
|
|
+ self.controller.create(self.test_agent_id, **params)
|
|
+
|
|
+ # Verify 403 Forbidden response (not 500)
|
|
+ self.controller.respond.assert_called_once() # type: ignore[attr-defined]
|
|
+ call_args = self.controller.respond.call_args # type: ignore[attr-defined]
|
|
+ self.assertEqual(call_args[0][0], 403)
|
|
+ self.assertIn(self.test_agent_id, call_args[0][1])
|
|
+ self.assertIn("already in use", call_args[0][1])
|
|
+
|
|
+
|
|
+if __name__ == "__main__":
|
|
+ unittest.main()
|
|
diff --git a/test/test_registrar_tpm_identity.py b/test/test_registrar_tpm_identity.py
|
|
new file mode 100644
|
|
index 0000000..2fc69b2
|
|
--- /dev/null
|
|
+++ b/test/test_registrar_tpm_identity.py
|
|
@@ -0,0 +1,342 @@
|
|
+"""
|
|
+Unit tests for RegistrarAgent TPM identity immutability security check.
|
|
+
|
|
+This module tests the _check_tpm_identity_immutable() method which prevents
|
|
+UUID spoofing attacks by rejecting re-registration attempts with different TPM identities.
|
|
+"""
|
|
+
|
|
+import base64
|
|
+import types
|
|
+import unittest
|
|
+from unittest.mock import Mock
|
|
+
|
|
+import cryptography.x509
|
|
+
|
|
+from keylime.certificate_wrapper import wrap_certificate
|
|
+from keylime.models.registrar.registrar_agent import RegistrarAgent
|
|
+
|
|
+
|
|
+class TestRegistrarAgentTPMIdentity(unittest.TestCase):
|
|
+ """Test cases for RegistrarAgent TPM identity immutability."""
|
|
+
|
|
+ # pylint: disable=protected-access # Testing protected methods
|
|
+ # pylint: disable=not-callable # False positive: methods bound via types.MethodType are callable
|
|
+
|
|
+ def setUp(self):
|
|
+ """Set up test fixtures."""
|
|
+ # EK certificate (used for testing certificate comparison)
|
|
+ self.ek_cert_pem = """-----BEGIN CERTIFICATE-----
|
|
+MIIEnzCCA4egAwIBAgIEMV64bDANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJE
|
|
+RTEQMA4GA1UECBMHQmF2YXJpYTEhMB8GA1UEChMYSW5maW5lb24gVGVjaG5vbG9n
|
|
+aWVzIEFHMQwwCgYDVQQLEwNBSU0xGzAZBgNVBAMTEklGWCBUUE0gRUsgUm9vdCBD
|
|
+QTAeFw0wNTEwMjAxMzQ3NDNaFw0yNTEwMjAxMzQ3NDNaMHcxCzAJBgNVBAYTAkRF
|
|
+MQ8wDQYDVQQIEwZTYXhvbnkxITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2ll
|
|
+cyBBRzEMMAoGA1UECxMDQUlNMSYwJAYDVQQDEx1JRlggVFBNIEVLIEludGVybWVk
|
|
+aWF0ZSBDQSAwMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALftPhYN
|
|
+t4rE+JnU/XOPICbOBLvfo6iA7nuq7zf4DzsAWBdsZEdFJQfaK331ihG3IpQnlQ2i
|
|
+YtDim289265f0J4OkPFpKeFU27CsfozVaNUm6UR/uzwA8ncxFc3iZLRMRNLru/Al
|
|
+VG053ULVDQMVx2iwwbBSAYO9pGiGbk1iMmuZaSErMdb9v0KRUyZM7yABiyDlM3cz
|
|
+UQX5vLWV0uWqxdGoHwNva5u3ynP9UxPTZWHZOHE6+14rMzpobs6Ww2RR8BgF96rh
|
|
+4rRAZEl8BXhwiQq4STvUXkfvdpWH4lzsGcDDtrB6Nt3KvVNvsKz+b07Dk+Xzt+EH
|
|
+NTf3Byk2HlvX+scCAwEAAaOCATswggE3MB0GA1UdDgQWBBQ4k8292HPEIzMV4bE7
|
|
+qWoNI8wQxzAOBgNVHQ8BAf8EBAMCAgQwEgYDVR0TAQH/BAgwBgEB/wIBADBYBgNV
|
|
+HSABAf8ETjBMMEoGC2CGSAGG+EUBBy8BMDswOQYIKwYBBQUHAgEWLWh0dHA6Ly93
|
|
+d3cudmVyaXNpZ24uY29tL3JlcG9zaXRvcnkvaW5kZXguaHRtbDCBlwYDVR0jBIGP
|
|
+MIGMgBRW65FEhWPWcrOu1EWWC/eUDlRCpqFxpG8wbTELMAkGA1UEBhMCREUxEDAO
|
|
+BgNVBAgTB0JhdmFyaWExITAfBgNVBAoTGEluZmluZW9uIFRlY2hub2xvZ2llcyBB
|
|
+RzEMMAoGA1UECxMDQUlNMRswGQYDVQQDExJJRlggVFBNIEVLIFJvb3QgQ0GCAQMw
|
|
+DQYJKoZIhvcNAQEFBQADggEBABJ1+Ap3rNlxZ0FW0aIgdzktbNHlvXWNxFdYIBbM
|
|
+OKjmbOos0Y4O60eKPu259XmMItCUmtbzF3oKYXq6ybARUT2Lm+JsseMF5VgikSlU
|
|
+BJALqpKVjwAds81OtmnIQe2LSu4xcTSavpsL4f52cUAu/maMhtSgN9mq5roYptq9
|
|
+DnSSDZrX4uYiMPl//rBaNDBflhJ727j8xo9CCohF3yQUoQm7coUgbRMzyO64yMIO
|
|
+3fhb+Vuc7sNwrMOz3VJN14C3JMoGgXy0c57IP/kD5zGRvljKEvrRC2I147+fPeLS
|
|
+DueRMS6lblvRKiZgmGAg7YaKOkOaEmVDMQ+fTo2Po7hI5wc=
|
|
+-----END CERTIFICATE-----"""
|
|
+
|
|
+ # Create wrapped cert from real certificate
|
|
+ self.ek_cert = cryptography.x509.load_pem_x509_certificate(self.ek_cert_pem.encode())
|
|
+ self.ek_cert_wrapped = wrap_certificate(self.ek_cert, None)
|
|
+
|
|
+ # Create a different cert mock that returns different DER bytes
|
|
+ self.different_ek_cert_wrapped = Mock()
|
|
+ self.different_ek_cert_wrapped.public_bytes = Mock(return_value=b"DIFFERENT_CERTIFICATE_DER_BYTES_FOR_TESTING")
|
|
+
|
|
+ # Sample TPM keys (base64 encoded for simplicity in tests)
|
|
+ self.ek_tpm_1 = b"EK_TPM_KEY_NUMBER_ONE_SAMPLE_DATA"
|
|
+ self.ek_tpm_2 = b"EK_TPM_KEY_NUMBER_TWO_DIFFERENT_"
|
|
+ self.aik_tpm_1 = b"AIK_TPM_KEY_NUMBER_ONE_SAMPLE_DATA"
|
|
+ self.aik_tpm_2 = b"AIK_TPM_KEY_NUMBER_TWO_DIFFERENT_"
|
|
+
|
|
+ # IAK/IDevID keys for testing that they are not checked
|
|
+ self.iak_tpm_1 = b"IAK_TPM_KEY_NUMBER_ONE"
|
|
+ self.iak_tpm_2 = b"IAK_TPM_KEY_NUMBER_TWO"
|
|
+
|
|
+ def create_mock_registrar_agent(self, agent_id="test-agent-uuid"):
|
|
+ """Create a mock RegistrarAgent with necessary attributes."""
|
|
+ agent = Mock()
|
|
+ agent.agent_id = agent_id
|
|
+ agent.changes = {}
|
|
+ agent.values = {}
|
|
+ agent.committed = {}
|
|
+ agent._add_error = Mock()
|
|
+ agent.errors = {}
|
|
+
|
|
+ # Bind the actual method to the mock instance
|
|
+ agent._check_tpm_identity_immutable = types.MethodType(RegistrarAgent._check_tpm_identity_immutable, agent)
|
|
+
|
|
+ return agent
|
|
+
|
|
+ def test_new_agent_no_committed_values(self):
|
|
+ """Test that new agents (no committed values) are not checked."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {} # New agent, no previous values
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should not add any errors for new agents
|
|
+ agent._add_error.assert_not_called()
|
|
+
|
|
+ def test_reregistration_same_tpm_all_fields_identical(self):
|
|
+ """Test re-registration with identical TPM identity passes."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_1, # Same
|
|
+ "ekcert": self.ek_cert_wrapped, # Same
|
|
+ "aik_tpm": self.aik_tpm_1, # Same
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should not add any errors
|
|
+ agent._add_error.assert_not_called()
|
|
+
|
|
+ def test_reregistration_different_ek_tpm(self):
|
|
+ """Test re-registration with different EK TPM is rejected."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_2, # DIFFERENT
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should add error for agent_id field
|
|
+ agent._add_error.assert_called_once()
|
|
+ call_args = agent._add_error.call_args
|
|
+ self.assertEqual(call_args[0][0], "agent_id")
|
|
+ self.assertIn("different TPM identity", call_args[0][1])
|
|
+ self.assertIn("ek_tpm", call_args[0][1])
|
|
+
|
|
+ def test_reregistration_different_aik_tpm(self):
|
|
+ """Test re-registration with different AIK TPM is rejected."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_2, # DIFFERENT
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should add error for agent_id field
|
|
+ agent._add_error.assert_called_once()
|
|
+ call_args = agent._add_error.call_args
|
|
+ self.assertEqual(call_args[0][0], "agent_id")
|
|
+ self.assertIn("different TPM identity", call_args[0][1])
|
|
+ self.assertIn("aik_tpm", call_args[0][1])
|
|
+
|
|
+ def test_reregistration_different_ekcert(self):
|
|
+ """Test re-registration with different EK certificate is rejected."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.different_ek_cert_wrapped, # DIFFERENT
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should add error for agent_id field
|
|
+ agent._add_error.assert_called_once()
|
|
+ call_args = agent._add_error.call_args
|
|
+ self.assertEqual(call_args[0][0], "agent_id")
|
|
+ self.assertIn("different TPM identity", call_args[0][1])
|
|
+ self.assertIn("ekcert", call_args[0][1])
|
|
+
|
|
+ def test_reregistration_multiple_fields_changed(self):
|
|
+ """Test re-registration with multiple fields changed lists all of them."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_2, # DIFFERENT
|
|
+ "ekcert": self.different_ek_cert_wrapped, # DIFFERENT
|
|
+ "aik_tpm": self.aik_tpm_2, # DIFFERENT
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should add error listing all changed fields
|
|
+ agent._add_error.assert_called_once()
|
|
+ call_args = agent._add_error.call_args
|
|
+ self.assertEqual(call_args[0][0], "agent_id")
|
|
+ error_message = call_args[0][1]
|
|
+ self.assertIn("ek_tpm", error_message)
|
|
+ self.assertIn("ekcert", error_message)
|
|
+ self.assertIn("aik_tpm", error_message)
|
|
+
|
|
+ def test_adding_ekcert_to_existing_agent(self):
|
|
+ """Test that adding EK cert to existing agent (without cert) is allowed."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": None, # Previously no cert
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped, # NOW adding cert
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should not add any errors - adding cert is allowed
|
|
+ agent._add_error.assert_not_called()
|
|
+
|
|
+ def test_removing_ek_tpm_rejected(self):
|
|
+ """Test that removing an existing EK TPM is rejected."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": None, # Trying to remove
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should add error
|
|
+ agent._add_error.assert_called_once()
|
|
+ call_args = agent._add_error.call_args
|
|
+ self.assertIn("ek_tpm", call_args[0][1])
|
|
+
|
|
+ def test_iak_idevid_changes_not_checked(self):
|
|
+ """Test that IAK/IDevID field changes are NOT checked (allowed)."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ "iak_tpm": self.iak_tpm_1,
|
|
+ "idevid_tpm": b"IDEVID_OLD",
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": self.ek_tpm_1, # Same
|
|
+ "ekcert": self.ek_cert_wrapped, # Same
|
|
+ "aik_tpm": self.aik_tpm_1, # Same
|
|
+ "iak_tpm": self.iak_tpm_2, # DIFFERENT - but not checked
|
|
+ "idevid_tpm": b"IDEVID_NEW", # DIFFERENT - but not checked
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should not add any errors - IAK/IDevID are not checked
|
|
+ agent._add_error.assert_not_called()
|
|
+
|
|
+ def test_only_changed_fields_are_checked(self):
|
|
+ """Test that only fields in changes dict are checked."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ # Only updating IP, not touching TPM identity fields
|
|
+ agent.changes = {
|
|
+ "ip": "192.168.1.100",
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should not add any errors - no identity fields changed
|
|
+ agent._add_error.assert_not_called()
|
|
+
|
|
+ def test_base64_encoded_tpm_keys(self):
|
|
+ """Test that base64-encoded TPM keys are properly compared."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+
|
|
+ # Simulate keys stored as base64 strings (as they might be from database)
|
|
+ ek_b64 = base64.b64encode(self.ek_tpm_1).decode("utf-8")
|
|
+ aik_b64 = base64.b64encode(self.aik_tpm_1).decode("utf-8")
|
|
+
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1, # As bytes
|
|
+ "aik_tpm": self.aik_tpm_1, # As bytes
|
|
+ }
|
|
+ agent.changes = {
|
|
+ "ek_tpm": ek_b64, # As base64 string
|
|
+ "aik_tpm": aik_b64, # As base64 string
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should not add any errors - should handle both formats
|
|
+ agent._add_error.assert_not_called()
|
|
+
|
|
+ def test_partial_update_only_one_field(self):
|
|
+ """Test updating only one TPM field while others remain unchanged."""
|
|
+ agent = self.create_mock_registrar_agent()
|
|
+ agent.committed = {
|
|
+ "ek_tpm": self.ek_tpm_1,
|
|
+ "ekcert": self.ek_cert_wrapped,
|
|
+ "aik_tpm": self.aik_tpm_1,
|
|
+ }
|
|
+ # Only changing AIK in this update
|
|
+ agent.changes = {
|
|
+ "aik_tpm": self.aik_tpm_2, # DIFFERENT
|
|
+ }
|
|
+
|
|
+ agent._check_tpm_identity_immutable()
|
|
+
|
|
+ # Should add error for the changed field
|
|
+ agent._add_error.assert_called_once()
|
|
+ call_args = agent._add_error.call_args
|
|
+ self.assertIn("aik_tpm", call_args[0][1])
|
|
+
|
|
+
|
|
+if __name__ == "__main__":
|
|
+ unittest.main()
|
|
--
|
|
2.47.3
|
|
|