From 2d809d8b537c0d9faab05ee5fe7efb85f48918f3 Mon Sep 17 00:00:00 2001 From: Anderson Toshiyuki Sasaki Date: Fri, 13 Mar 2026 10:53:54 +0100 Subject: [PATCH] refactor: Remove dead code AuthSession.authenticate_agent() authenticate_agent() was superseded by _extract_identity() in action_handler.py, which performs token-based agent authentication directly via AuthSession.get_by_token(). The method, its helper get_session_context(), the module-level _engine global, and the associated unused imports (Session, SessionManager, make_engine) are all removed. SessionController now obtains its database sessions from db_manager.session_context() instead of get_session_context(). The corresponding tests (test_authenticate_agent_success, test_authenticate_agent_inactive_session, test_authenticate_agent_no_session) are also removed. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Anderson Toshiyuki Sasaki --- keylime/models/verifier/auth_session.py | 67 +----------- keylime/web/verifier/session_controller.py | 9 +- test/test_auth_session.py | 113 +-------------------- test/test_session_controller.py | 52 +++++----- 4 files changed, 32 insertions(+), 209 deletions(-) diff --git a/keylime/models/verifier/auth_session.py b/keylime/models/verifier/auth_session.py index 918dfb4..b0b40b0 100644 --- a/keylime/models/verifier/auth_session.py +++ b/keylime/models/verifier/auth_session.py @@ -1,12 +1,8 @@ import base64 import hmac -import threading import uuid -from contextlib import contextmanager from datetime import timedelta -from typing import Any, Dict, Iterator, Optional, Sequence - -from sqlalchemy.orm import Session +from typing import Any, Dict, Optional, Sequence from keylime import config, keylime_logging from keylime.crypto import ( @@ -16,7 +12,6 @@ from keylime.crypto import ( parse_session_token, verify_token_hash, ) -from keylime.db.keylime_db import SessionManager, make_engine from keylime.db.verifier_db import VerfierMain from keylime.models.base import * from keylime.shared_data import get_shared_memory @@ -31,21 +26,6 @@ from keylime.tpm.tpm_main import Tpm logger = 
keylime_logging.init_logging("verifier") -_engine = None -_engine_lock = threading.Lock() -_session_manager = SessionManager() - - -@contextmanager -def get_session_context() -> Iterator[Session]: - global _engine - if _engine is None: - with _engine_lock: - if _engine is None: - _engine = make_engine("cloud_verifier") - with _session_manager.session_context(_engine) as session: - yield session - class AuthSession(PersistableModel): # Explicit attribute declarations for type checkers @@ -244,51 +224,6 @@ class AuthSession(PersistableModel): # Slow path: query database by primary key return cls.get(session_id) # type: ignore[return-value] - @classmethod - def authenticate_agent(cls, token: str): # type: ignore[no-untyped-def] - """Authenticate an agent using their session token. - - Uses indexed database lookup by token hash for performance (O(1) instead of O(n)). - Tokens are hashed before lookup since only hashes are stored in the database. - - Args: - token: The session token to verify - - Returns: - VerfierMain object if authenticated, False otherwise - """ - # Use indexed lookup by token hash (much faster than scanning all sessions) - auth_session = cls.get_by_token(token) - - if not auth_session: - return False - - # Validate session is active - if not getattr(auth_session, "active", False): - return False - - # Validate session hasn't expired - token_expires_at = getattr(auth_session, "token_expires_at", None) - if token_expires_at and token_expires_at < Timestamp.now(): - logger.debug( - "Authentication attempted with expired token for agent '%s' (expired at %s)", - getattr(auth_session, "agent_id", "unknown"), - token_expires_at, - ) - return False - - # Use old engine to query VerfierMain (legacy model) - with get_session_context() as session: - agent = ( - session.query(VerfierMain) - .filter(VerfierMain.agent_id == auth_session.agent_id) # type: ignore[attr-defined] - .one_or_none() - ) - if agent: - session.expunge(agent) # type: ignore[no-untyped-call] 
- - return agent - @classmethod def create( cls, agent: Optional[VerfierMain], data: Dict[str, Any], agent_id: Optional[str] = None diff --git a/keylime/web/verifier/session_controller.py b/keylime/web/verifier/session_controller.py index c8664e2..9a314f2 100644 --- a/keylime/web/verifier/session_controller.py +++ b/keylime/web/verifier/session_controller.py @@ -2,9 +2,8 @@ import base64 from keylime import config, keylime_logging from keylime.db.verifier_db import VerfierMain -from keylime.models.base import Timestamp +from keylime.models.base import Timestamp, db_manager from keylime.models.verifier import AuthSession -from keylime.models.verifier.auth_session import get_session_context from keylime.web.base import Controller logger = keylime_logging.init_logging("verifier") @@ -186,7 +185,7 @@ class SessionController(Controller): # Check if agent exists - this is where we validate enrollment agent = None - with get_session_context() as session: + with db_manager.session_context() as session: agent = session.query(VerfierMain).filter(VerfierMain.agent_id == agent_id).one_or_none() if agent: session.expunge(agent) # type: ignore[no-untyped-call] @@ -384,7 +383,7 @@ class SessionController(Controller): # POST /v3[.:minor]/agents/:agent_id/session def create(self, agent_id, **params): agent = None - with get_session_context() as session: + with db_manager.session_context() as session: agent = session.query(VerfierMain).filter(VerfierMain.agent_id == agent_id).one_or_none() if agent: session.expunge(agent) # type: ignore[no-untyped-call] @@ -410,7 +409,7 @@ class SessionController(Controller): def update(self, agent_id, token, **params): agent = None - with get_session_context() as session: + with db_manager.session_context() as session: agent = session.query(VerfierMain).filter(VerfierMain.agent_id == agent_id).one_or_none() if agent: session.expunge(agent) # type: ignore[no-untyped-call] diff --git a/test/test_auth_session.py b/test/test_auth_session.py index 
2c78547..dd554b6 100644 --- a/test/test_auth_session.py +++ b/test/test_auth_session.py @@ -2,74 +2,15 @@ import base64 import unittest -from contextlib import contextmanager from datetime import timedelta from unittest.mock import MagicMock, PropertyMock, patch from keylime.crypto import generate_session_token, generate_token_salt, hash_token_for_storage from keylime.models.base.types import Timestamp -from keylime.models.verifier.auth_session import AuthSession, get_session_context +from keylime.models.verifier.auth_session import AuthSession from keylime.shared_data import cleanup_global_shared_memory, get_shared_memory -class TestGetSessionContext(unittest.TestCase): - """Test cases for get_session_context context manager.""" - - def _make_mock_session_manager(self, mock_session): - """Create a mock SessionManager whose session_context() mirrors real lifecycle.""" - mock_scoped = MagicMock() - mock_session_manager = MagicMock() - mock_session_manager.make_session.return_value = mock_session - mock_session_manager._scoped_session = mock_scoped # pylint: disable=protected-access - - @contextmanager - def fake_session_context(engine): # pylint: disable=unused-argument - session = mock_session_manager.make_session(engine) - try: - yield session - session.commit() - except Exception: - session.rollback() - raise - finally: - scoped = mock_session_manager._scoped_session # pylint: disable=protected-access - if scoped is not None: - scoped.remove() - - mock_session_manager.session_context = fake_session_context - return mock_session_manager, mock_scoped - - @patch("keylime.models.verifier.auth_session.make_engine") - def test_session_cleanup_on_normal_exit(self, _mock_make_engine): - """Test that session is committed and cleaned up when context manager exits normally.""" - mock_session = MagicMock() - mock_session_manager, mock_scoped = self._make_mock_session_manager(mock_session) - - with patch("keylime.models.verifier.auth_session._engine", None): - with 
patch("keylime.models.verifier.auth_session._session_manager", mock_session_manager): - with get_session_context() as session: - self.assertIs(session, mock_session) - - mock_session.commit.assert_called_once() - mock_scoped.remove.assert_called_once() - - @patch("keylime.models.verifier.auth_session.make_engine") - def test_session_rollback_on_exception(self, _mock_make_engine): - """Test that session is rolled back and cleaned up when an exception occurs.""" - mock_session = MagicMock() - mock_session_manager, mock_scoped = self._make_mock_session_manager(mock_session) - - with patch("keylime.models.verifier.auth_session._engine", None): - with patch("keylime.models.verifier.auth_session._session_manager", mock_session_manager): - with self.assertRaises(RuntimeError): - with get_session_context(): - raise RuntimeError("simulated error") - - mock_session.rollback.assert_called_once() - mock_session.commit.assert_not_called() - mock_scoped.remove.assert_called_once() - - class TestAuthSessionHelpers(unittest.TestCase): """Test cases for AuthSession helper methods.""" @@ -457,58 +398,6 @@ class TestAuthSessionCore(unittest.TestCase): self.assertIn("errors", result) self.assertIn("authentication_supported", result["errors"]) - @patch("keylime.models.verifier.auth_session.get_session_context") - @patch.object(AuthSession, "get_by_token") - def test_authenticate_agent_success(self, mock_get_by_token, mock_get_session): - """Test successful agent authentication with valid token.""" - # Create a mock agent - mock_agent = MagicMock() - mock_agent.agent_id = self.test_agent_id - - # Mock session query - mock_db_session = MagicMock() - mock_db_session.query.return_value.filter.return_value.one_or_none.return_value = mock_agent - mock_get_session.return_value.__enter__ = MagicMock(return_value=mock_db_session) - mock_get_session.return_value.__exit__ = MagicMock(return_value=False) - - # Mock AuthSession.get_by_token to return an active session - mock_auth_session = 
MagicMock() - mock_auth_session.session_id = "550e8400-e29b-41d4-a716-446655440000" - mock_auth_session.active = True - mock_auth_session.agent_id = self.test_agent_id - mock_auth_session.token_expires_at = Timestamp.now() + timedelta(hours=1) - mock_get_by_token.return_value = mock_auth_session - - result = AuthSession.authenticate_agent("test-token") - - # Should return the agent - self.assertIsNotNone(result) - self.assertEqual(result.agent_id, self.test_agent_id) # type: ignore[union-attr] - - @patch.object(AuthSession, "get_by_token") - def test_authenticate_agent_inactive_session(self, mock_get_by_token): - """Test that inactive sessions cannot authenticate.""" - # Mock AuthSession.get_by_token to return an inactive session - mock_auth_session = MagicMock() - mock_auth_session.active = False - mock_get_by_token.return_value = mock_auth_session - - result = AuthSession.authenticate_agent("test-token") - - # Should return False - self.assertFalse(result) - - @patch.object(AuthSession, "get_by_token") - def test_authenticate_agent_no_session(self, mock_get_by_token): - """Test that authentication fails when session doesn't exist.""" - # Mock AuthSession.get_by_token to return None (no session found) - mock_get_by_token.return_value = None - - result = AuthSession.authenticate_agent("test-token") - - # Should return False - self.assertFalse(result) - @patch.object(AuthSession, "empty") def test_create_with_agent(self, mock_empty): """Test AuthSession.create() with an enrolled agent.""" diff --git a/test/test_session_controller.py b/test/test_session_controller.py index eec7fef..f8db8db 100644 --- a/test/test_session_controller.py +++ b/test/test_session_controller.py @@ -272,8 +272,8 @@ class TestSessionControllerUpdateSession(unittest.TestCase): # Verify session was deleted from cache self.assertNotIn(self.test_session_id, self.sessions_cache) - @patch("keylime.web.verifier.session_controller.get_session_context") - def 
test_update_session_agent_not_enrolled(self, mock_get_session): + @patch("keylime.web.verifier.session_controller.db_manager") + def test_update_session_agent_not_enrolled(self, mock_db_manager): """Test update_session with unenrolled agent.""" # Create session in cache now = Timestamp.now() @@ -290,8 +290,8 @@ class TestSessionControllerUpdateSession(unittest.TestCase): # Mock database query to return no agent mock_session = MagicMock() mock_session.query.return_value.filter.return_value.one_or_none.return_value = None - mock_get_session.return_value.__enter__ = MagicMock(return_value=mock_session) - mock_get_session.return_value.__exit__ = MagicMock(return_value=False) + mock_db_manager.session_context.return_value.__enter__ = MagicMock(return_value=mock_session) + mock_db_manager.session_context.return_value.__exit__ = MagicMock(return_value=False) # Call update_session params = { @@ -319,9 +319,9 @@ class TestSessionControllerUpdateSession(unittest.TestCase): body = call_args[1]["body"] self.assertEqual(body["data"]["attributes"]["evaluation"], "fail") - @patch("keylime.web.verifier.session_controller.get_session_context") + @patch("keylime.web.verifier.session_controller.db_manager") @patch("keylime.models.verifier.auth_session.AuthSession.create_from_memory") - def test_update_session_authentication_failed(self, mock_create_from_memory, mock_get_session): + def test_update_session_authentication_failed(self, mock_create_from_memory, mock_db_manager): """Test update_session with failed authentication.""" # Create session in cache now = Timestamp.now() @@ -340,8 +340,8 @@ class TestSessionControllerUpdateSession(unittest.TestCase): mock_agent.agent_id = self.test_agent_id mock_session = MagicMock() mock_session.query.return_value.filter.return_value.one_or_none.return_value = mock_agent - mock_get_session.return_value.__enter__ = MagicMock(return_value=mock_session) - mock_get_session.return_value.__exit__ = MagicMock(return_value=False) + 
mock_db_manager.session_context.return_value.__enter__ = MagicMock(return_value=mock_session) + mock_db_manager.session_context.return_value.__exit__ = MagicMock(return_value=False) # Mock AuthSession.create_from_memory to return errors mock_auth_session = MagicMock() @@ -379,11 +379,11 @@ class TestSessionControllerUpdateSession(unittest.TestCase): call_args = self.controller.send_response.call_args # type: ignore[attr-defined] self.assertEqual(call_args[1]["code"], 401) - @patch("keylime.web.verifier.session_controller.get_session_context") + @patch("keylime.web.verifier.session_controller.db_manager") @patch("keylime.models.verifier.auth_session.AuthSession.create_from_memory") @patch("keylime.models.verifier.auth_session.AuthSession.delete_active_session_for_agent") @patch("keylime.web.verifier.session_controller.config") - def test_update_session_success(self, mock_config, _mock_delete_active, mock_create_from_memory, mock_get_session): + def test_update_session_success(self, mock_config, _mock_delete_active, mock_create_from_memory, mock_db_manager): """Test successful session update.""" # Create session in cache now = Timestamp.now() @@ -405,8 +405,8 @@ class TestSessionControllerUpdateSession(unittest.TestCase): mock_agent.agent_id = self.test_agent_id mock_session = MagicMock() mock_session.query.return_value.filter.return_value.one_or_none.return_value = mock_agent - mock_get_session.return_value.__enter__ = MagicMock(return_value=mock_session) - mock_get_session.return_value.__exit__ = MagicMock(return_value=False) + mock_db_manager.session_context.return_value.__enter__ = MagicMock(return_value=mock_session) + mock_db_manager.session_context.return_value.__exit__ = MagicMock(return_value=False) # Mock config mock_config.getboolean.return_value = False # Don't keep in memory @@ -525,17 +525,17 @@ class TestSessionControllerLegacyEndpoints(unittest.TestCase): self.assertEqual(call_args[0][0], 404) 
@patch("keylime.models.verifier.auth_session.AuthSession.delete_stale") - @patch("keylime.web.verifier.session_controller.get_session_context") + @patch("keylime.web.verifier.session_controller.db_manager") @patch("keylime.models.verifier.auth_session.AuthSession.create") - def test_create_success(self, mock_create, mock_get_session, _mock_delete_stale): + def test_create_success(self, mock_create, mock_db_manager, _mock_delete_stale): """Test successful create endpoint.""" # Mock database query mock_agent = MagicMock() mock_agent.agent_id = self.test_agent_id mock_session = MagicMock() mock_session.query.return_value.filter.return_value.one_or_none.return_value = mock_agent - mock_get_session.return_value.__enter__ = MagicMock(return_value=mock_session) - mock_get_session.return_value.__exit__ = MagicMock(return_value=False) + mock_db_manager.session_context.return_value.__enter__ = MagicMock(return_value=mock_session) + mock_db_manager.session_context.return_value.__exit__ = MagicMock(return_value=False) # Mock AuthSession.create mock_auth_session = MagicMock() @@ -553,14 +553,14 @@ class TestSessionControllerLegacyEndpoints(unittest.TestCase): call_args = self.controller.respond.call_args # type: ignore[attr-defined] self.assertEqual(call_args[0][0], 200) - @patch("keylime.web.verifier.session_controller.get_session_context") - def test_create_agent_not_found(self, mock_get_session): + @patch("keylime.web.verifier.session_controller.db_manager") + def test_create_agent_not_found(self, mock_db_manager): """Test create endpoint with non-existent agent.""" # Mock database query to return None mock_session = MagicMock() mock_session.query.return_value.filter.return_value.one_or_none.return_value = None - mock_get_session.return_value.__enter__ = MagicMock(return_value=mock_session) - mock_get_session.return_value.__exit__ = MagicMock(return_value=False) + mock_db_manager.session_context.return_value.__enter__ = MagicMock(return_value=mock_session) + 
mock_db_manager.session_context.return_value.__exit__ = MagicMock(return_value=False) # Call create params = {"data": {}} @@ -571,17 +571,17 @@ class TestSessionControllerLegacyEndpoints(unittest.TestCase): call_args = self.controller.respond.call_args # type: ignore[attr-defined] self.assertEqual(call_args[0][0], 404) - @patch("keylime.web.verifier.session_controller.get_session_context") + @patch("keylime.web.verifier.session_controller.db_manager") @patch("keylime.models.verifier.auth_session.AuthSession.get_by_token") - def test_update_success(self, mock_get, mock_get_session): + def test_update_success(self, mock_get, mock_db_manager): """Test successful update endpoint.""" # Mock database query mock_agent = MagicMock() mock_agent.agent_id = self.test_agent_id mock_session = MagicMock() mock_session.query.return_value.filter.return_value.one_or_none.return_value = mock_agent - mock_get_session.return_value.__enter__ = MagicMock(return_value=mock_session) - mock_get_session.return_value.__exit__ = MagicMock(return_value=False) + mock_db_manager.session_context.return_value.__enter__ = MagicMock(return_value=mock_session) + mock_db_manager.session_context.return_value.__exit__ = MagicMock(return_value=False) # Mock AuthSession.get_by_token mock_auth_session = MagicMock() @@ -601,9 +601,9 @@ class TestSessionControllerLegacyEndpoints(unittest.TestCase): call_args = self.controller.respond.call_args # type: ignore[attr-defined] self.assertEqual(call_args[0][0], 200) - @patch("keylime.web.verifier.session_controller.get_session_context") + @patch("keylime.web.verifier.session_controller.db_manager") @patch("keylime.models.verifier.auth_session.AuthSession.get_by_token") - def test_update_not_found(self, mock_get, _mock_get_session): + def test_update_not_found(self, mock_get, _mock_db_manager): """Test update endpoint with non-existent session.""" # Mock AuthSession.get_by_token to return None mock_get.return_value = None -- 2.53.0