import Oracle_OSS keylime-7.12.1-11.el9_7.5

This commit is contained in:
AlmaLinux RelEng Bot 2026-05-05 14:31:48 -04:00
parent e40691c181
commit 4e822a53cf
2 changed files with 961 additions and 1 deletions

View File

@ -0,0 +1,950 @@
From 1d3c529c753efa6420da051be23eb5df387e17ee Mon Sep 17 00:00:00 2001
From: Sergio Correia <scorreia@redhat.com>
Date: Fri, 17 Apr 2026 09:27:23 +0100
Subject: [PATCH 17/17] Backport tenant version negotiation mechanism
Original PRs:
- https://github.com/keylime/keylime/pull/1838
- https://github.com/keylime/keylime/pull/1845
Signed-off-by: Sergio Correia <scorreia@redhat.com>
---
keylime/api_version.py | 41 ++++-
keylime/cloud_verifier_tornado.py | 93 ++++++++--
keylime/registrar_client.py | 63 ++++++-
keylime/tenant.py | 280 +++++++++++++++++++++++++-----
test/test_api_version.py | 65 +++++++
5 files changed, 470 insertions(+), 72 deletions(-)
diff --git a/keylime/api_version.py b/keylime/api_version.py
index 1936260..de383c5 100644
--- a/keylime/api_version.py
+++ b/keylime/api_version.py
@@ -1,6 +1,6 @@
import re
from logging import Logger
-from typing import Dict, List, Union
+from typing import Dict, List, Optional, Union
from packaging import version
@@ -34,6 +34,45 @@ def all_versions() -> List[str]:
return VERSIONS.copy()
+def negotiate_version(
+ remote_versions: Union[str, List[str]],
+ local_versions: Optional[List[str]] = None,
+ raise_on_error: bool = False,
+) -> Optional[str]:
+ """
+ Negotiate highest API version supported by both local and remote components.
+
+ Args:
+ remote_versions: Single version string or list from remote component
+ local_versions: Versions supported locally (default: all_versions())
+ raise_on_error: If True, raise ValueError when no compatible version found.
+ If False (default), return None.
+
+ Returns:
+ Highest mutually supported version, or None if incompatible and raise_on_error=False
+
+ Raises:
+ ValueError: If no compatible version found and raise_on_error=True
+ """
+ if local_versions is None:
+ local_versions = all_versions()
+
+ if isinstance(remote_versions, str):
+ remote_versions = [remote_versions]
+
+ common = set(remote_versions) & set(local_versions)
+
+ if not common:
+ if raise_on_error:
+ raise ValueError(
+ f"No compatible API version found. "
+ f"Local supports: {local_versions}, Remote supports: {remote_versions}"
+ )
+ return None
+
+ return max(common, key=version.parse)
+
+
def is_supported_version(version_type: VersionType) -> bool:
try:
v_obj = version.parse(str(version_type))
diff --git a/keylime/cloud_verifier_tornado.py b/keylime/cloud_verifier_tornado.py
index 67ba8af..81db22f 100644
--- a/keylime/cloud_verifier_tornado.py
+++ b/keylime/cloud_verifier_tornado.py
@@ -518,6 +518,22 @@ class AgentsHandler(BaseHandler):
logger.warning("POST returning 400 response. Expected non zero content length.")
else:
json_body = json.loads(self.request.body)
+
+ # Validate supported_version from tenant
+ supported_version = json_body.get("supported_version")
+ if supported_version:
+ if not keylime_api_version.is_supported_version(supported_version):
+ logger.warning(
+ "Agent %s requested API version %s which is not supported by verifier. "
+ "Verifier supports: %s. Will attempt version negotiation on first contact.",
+ agent_id,
+ supported_version,
+ keylime_api_version.all_versions(),
+ )
+ supported_version = keylime_api_version.current_version()
+ else:
+ supported_version = keylime_api_version.current_version()
+
agent_data = {
"v": json_body.get("v", None),
"ip": json_body["cloudagent_ip"],
@@ -531,7 +547,7 @@ class AgentsHandler(BaseHandler):
"accept_tpm_hash_algs": json_body["accept_tpm_hash_algs"],
"accept_tpm_encryption_algs": json_body["accept_tpm_encryption_algs"],
"accept_tpm_signing_algs": json_body["accept_tpm_signing_algs"],
- "supported_version": json_body["supported_version"],
+ "supported_version": supported_version,
"ak_tpm": json_body["ak_tpm"],
"mtls_cert": json_body.get("mtls_cert", None),
"hash_alg": "",
@@ -1422,7 +1438,11 @@ class MbpolicyHandler(BaseHandler):
async def update_agent_api_version(agent: Dict[str, Any], timeout: float = 60.0) -> Union[Dict[str, Any], None]:
+ """
+ Query agent's /version endpoint and negotiate compatible API version.
+ """
agent_id = agent["agent_id"]
+ old_version = agent.get("supported_version")
logger.info("Agent %s API version bump detected, trying to update stored API version", agent_id)
kwargs = {}
@@ -1447,31 +1467,67 @@ async def update_agent_api_version(agent: Dict[str, Any], timeout: float = 60.0)
try:
json_response = json.loads(response.body)
- new_version = json_response["results"]["supported_version"]
- old_version = agent["supported_version"]
- # Only update the API version to use if it is supported by the verifier
- if new_version in keylime_api_version.all_versions():
- new_version_tuple = str_to_version(new_version)
- old_version_tuple = str_to_version(old_version)
+ # Try new format first (list of versions)
+ agent_versions = json_response["results"].get("supported_versions")
- assert new_version_tuple, f"Agent {agent_id} version {new_version} is invalid"
- assert old_version_tuple, f"Agent {agent_id} version {old_version} is invalid"
+ # Fall back to old format (single version)
+ if agent_versions is None:
+ agent_versions = json_response["results"].get("supported_version")
- # Check that the new version is greater than current version
- if new_version_tuple <= old_version_tuple:
- logger.warning(
- "Agent %s API version %s is lower or equal to previous version %s",
+ if agent_versions:
+ # Negotiate compatible version
+ negotiated = keylime_api_version.negotiate_version(agent_versions)
+
+ if negotiated is None:
+ logger.error(
+ "No compatible API version between verifier and agent %s. "
+ "Agent supports: %s, Verifier supports: %s",
agent_id,
- new_version,
- old_version,
+ agent_versions,
+ keylime_api_version.all_versions(),
)
return None
- logger.info("Agent %s new API version %s is supported", agent_id, new_version)
+ # Check if version actually changed
+ if negotiated == old_version:
+ logger.debug("Agent %s already using negotiated version %s", agent_id, negotiated)
+ return agent
+
+ # Validate negotiated version
+ if not keylime_api_version.validate_version(negotiated):
+ logger.error("Negotiated version %s for agent %s is invalid", negotiated, agent_id)
+ return None
+
+ # Check that the negotiated version is greater than current version (prevent downgrade)
+ negotiated_tuple = str_to_version(negotiated)
+ if not negotiated_tuple:
+ logger.error("Agent %s negotiated version %s is invalid", agent_id, negotiated)
+ return None
+
+ # Only check for downgrade if there was a previous version and successful attestation.
+ # If attestation_count == 0, the stored version might be a fallback guess from the tenant,
+ # not a version the agent actually supported, so we allow the "downgrade".
+ attestation_count = agent.get("attestation_count", 0)
+ if old_version is not None and attestation_count > 0:
+ old_version_tuple = str_to_version(old_version)
+ if not old_version_tuple:
+ logger.error("Agent %s stored version %s is invalid", agent_id, old_version)
+ return None
+
+ if negotiated_tuple <= old_version_tuple:
+ logger.warning(
+ "Agent %s API version %s is lower or equal to previous version %s",
+ agent_id,
+ negotiated,
+ old_version,
+ )
+ return None
+
+ logger.info("Agent %s new API version %s is supported", agent_id, negotiated)
with session_context() as session:
- agent["supported_version"] = new_version
+ agent["supported_version"] = negotiated
# Remove keys that should not go to the DB
agent_db = dict(agent)
@@ -1481,8 +1537,9 @@ async def update_agent_api_version(agent: Dict[str, Any], timeout: float = 60.0)
session.query(VerfierMain).filter_by(agent_id=agent_id).update(agent_db) # pyright: ignore
# session.commit() is automatically called by context manager
+
else:
- logger.warning("Agent %s new API version %s is not supported", agent_id, new_version)
+ logger.warning("Agent %s did not provide version information", agent_id)
return None
except SQLAlchemyError as e:
diff --git a/keylime/registrar_client.py b/keylime/registrar_client.py
index 97fbc2a..e32707b 100644
--- a/keylime/registrar_client.py
+++ b/keylime/registrar_client.py
@@ -25,7 +25,7 @@ class RegistrarData(TypedDict):
logger = keylime_logging.init_logging("registrar_client")
-api_version = keylime_api_version.current_version()
+API_VERSION = keylime_api_version.current_version()
MANDATORY_FIELDS = ["aik_tpm", "regcount", "ek_tpm", "ip", "port"]
@@ -38,24 +38,61 @@ def check_mandatory_fields(results: Dict[str, Any]) -> bool:
return True
+def getVersions(
+ registrar_ip: str,
+ registrar_port: str,
+ tls_context: Optional[ssl.SSLContext],
+) -> Optional[Dict[str, Any]]:
+ """
+ Fetch supported API versions from the registrar's /version endpoint.
+
+ :returns: JSON structure containing version info, or None on failure
+ """
+ try:
+ client = RequestsClient(f"{bracketize_ipv6(registrar_ip)}:{registrar_port}", True, tls_context=tls_context)
+ response = client.get("/version")
+
+ if response.status_code != 200:
+ logger.warning("Failed to get versions from registrar: %s", response.status_code)
+ return None
+
+ response_body: Dict[str, Any] = response.json()
+ if "results" not in response_body:
+ logger.warning("Unexpected response format from registrar /version endpoint")
+ return None
+
+ results: Dict[str, Any] = response_body["results"]
+ return results
+
+ except Exception as e:
+ logger.warning("Error fetching versions from registrar: %s", e)
+ return None
+
+
def getData(
- registrar_ip: str, registrar_port: str, agent_id: str, tls_context: Optional[ssl.SSLContext]
+ registrar_ip: str,
+ registrar_port: str,
+ agent_id: str,
+ tls_context: Optional[ssl.SSLContext],
+ api_version: Optional[str] = None,
) -> Optional[RegistrarData]:
"""
Get the agent data from the registrar.
This is called by the tenant code
+ :param api_version: Optional API version to use. If None, uses current version.
:returns: JSON structure containing the agent data
"""
# make absolutely sure you don't ask for data that contains AIK keys unauthenticated
if not tls_context:
raise Exception("It is unsafe to use this interface to query AIKs without server-authenticated TLS.")
+ version = api_version or API_VERSION
response = None
try:
client = RequestsClient(f"{bracketize_ipv6(registrar_ip)}:{registrar_port}", True, tls_context=tls_context)
- response = client.get(f"/v{api_version}/agents/{agent_id}")
+ response = client.get(f"/v{version}/agents/{agent_id}")
response_body = response.json()
if response.status_code == 404:
@@ -106,18 +143,23 @@ def getData(
def doRegistrarDelete(
- registrar_ip: str, registrar_port: str, agent_id: str, tls_context: Optional[ssl.SSLContext]
+ registrar_ip: str,
+ registrar_port: str,
+ agent_id: str,
+ tls_context: Optional[ssl.SSLContext],
+ api_version: Optional[str] = None,
) -> Dict[str, Any]:
"""
Delete the given agent from the registrar.
This is called by the tenant code
+ :param api_version: Optional API version to use. If None, uses current version.
:returns: The request response body
"""
-
+ version = api_version or API_VERSION
client = RequestsClient(f"{bracketize_ipv6(registrar_ip)}:{registrar_port}", True, tls_context=tls_context)
- response = client.delete(f"/v{api_version}/agents/{agent_id}")
+ response = client.delete(f"/v{version}/agents/{agent_id}")
response_body: Dict[str, Any] = response.json()
if response.status_code == 200:
@@ -130,17 +172,22 @@ def doRegistrarDelete(
def doRegistrarList(
- registrar_ip: str, registrar_port: str, tls_context: Optional[ssl.SSLContext]
+ registrar_ip: str,
+ registrar_port: str,
+ tls_context: Optional[ssl.SSLContext],
+ api_version: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""
Get the list of registered agents from the registrar.
This is called by the tenant code
+ :param api_version: Optional API version to use. If None, uses current version.
:returns: The request response body
"""
+ version = api_version or API_VERSION
client = RequestsClient(f"{bracketize_ipv6(registrar_ip)}:{registrar_port}", True, tls_context=tls_context)
- response = client.get(f"/v{api_version}/agents/")
+ response = client.get(f"/v{version}/agents/")
response_body: Dict[str, Any] = response.json()
if response.status_code != 200:
diff --git a/keylime/tenant.py b/keylime/tenant.py
index feb46dc..08b8a51 100644
--- a/keylime/tenant.py
+++ b/keylime/tenant.py
@@ -54,6 +54,7 @@ class Tenant:
registrar_data: Optional[registrar_client.RegistrarData] = None
api_version: Optional[str] = None
+ push_model: bool = False
# uuid_service_generate_locally = None
agent_uuid: str = ""
@@ -76,7 +77,10 @@ class Tenant:
mb_policy = None
mb_policy_name: str = ""
- supported_version: Optional[str] = None
+ supported_version: Optional[str] = None # Deprecated: use agent_api_version
+ agent_api_version: Optional[str] = None
+ verifier_api_version: Optional[str] = None
+ registrar_api_version: Optional[str] = None
client_cert = None
client_key = None
@@ -177,6 +181,103 @@ class Tenant:
if self.registrar_ip:
self.registrar_fid_str = f"{self.registrar_fid_str} ({self.registrar_ip}:{self.registrar_port})"
+ def _fetch_server_versions(
+ self, base_url: str, tls_context: Optional[ssl.SSLContext], server_name: str
+ ) -> Optional[List[str]]:
+ """Fetch supported versions from a server's /version endpoint."""
+ try:
+ client = RequestsClient(base_url, True, tls_context=tls_context)
+ response = client.get("/version", timeout=self.request_timeout)
+
+ if response.status_code == 410:
+ logger.debug("%s returned 410 (push mode), falling back to current version", server_name)
+ return None
+
+ if response.status_code != 200:
+ logger.warning("Failed to get versions from %s: %s", server_name, response.status_code)
+ return None
+
+ response_body: Dict[str, Any] = response.json()
+ if "results" not in response_body or "supported_versions" not in response_body["results"]:
+ logger.warning("Unexpected response format from %s /version endpoint", server_name)
+ return None
+
+ versions: List[str] = response_body["results"]["supported_versions"]
+ return versions
+
+ except Exception as e:
+ logger.warning("Error fetching versions from %s: %s", server_name, e)
+ return None
+
+ def _negotiate_server_version(self, server_versions: Optional[List[str]], server_name: str) -> str:
+ """Negotiate API version with a server.
+
+ In pull mode, API version 3.0+ is excluded since it is for push attestation only.
+ In push mode, all versions including 3.0 are available.
+ """
+ if self.push_model:
+ local_versions = None
+ else:
+ local_versions = [v for v in keylime_api_version.all_versions() if keylime_api_version.major(v) < 3]
+
+ if server_versions is None:
+ if self.api_version is None:
+ raise UserError("Tenant API version is not set")
+ logger.debug("Cannot negotiate version with %s, using current version %s", server_name, self.api_version)
+ return self.api_version
+
+ try:
+ negotiated = keylime_api_version.negotiate_version(server_versions, local_versions, raise_on_error=True)
+ if negotiated is None:
+ raise UserError(f"Failed to negotiate API version with {server_name}")
+ logger.debug("Negotiated API version %s with %s", negotiated, server_name)
+ return negotiated
+ except ValueError:
+ supported = local_versions if local_versions is not None else keylime_api_version.all_versions()
+ raise UserError(
+ f"No compatible API version with {server_name}. "
+ f"Tenant supports: {supported}, "
+ f"Server supports: {server_versions}"
+ ) from None
+
+ def negotiate_verifier_version(self) -> None:
+ """Fetch and negotiate API version with the verifier."""
+ server_versions = self._fetch_server_versions(self.verifier_base_url, self.tls_context, "verifier")
+ self.verifier_api_version = self._negotiate_server_version(server_versions, "verifier")
+ logger.info("Using API version %s for verifier communication", self.verifier_api_version)
+
+ def negotiate_registrar_version(self) -> None:
+ """Fetch and negotiate API version with the registrar."""
+ if not self.registrar_ip or not self.registrar_port:
+ raise UserError("registrar_ip and registrar_port must be configured for version negotiation")
+
+ base_url = f"{bracketize_ipv6(self.registrar_ip)}:{self.registrar_port}"
+ server_versions = self._fetch_server_versions(base_url, self.tls_context, "registrar")
+ self.registrar_api_version = self._negotiate_server_version(server_versions, "registrar")
+ logger.info("Using API version %s for registrar communication", self.registrar_api_version)
+
+ def ensure_verifier_version(self) -> str:
+ """Ensure verifier API version is negotiated and return it.
+
+ Performs lazy negotiation: only negotiates if not already done.
+ """
+ if self.verifier_api_version is None:
+ self.negotiate_verifier_version()
+ if self.verifier_api_version is None:
+ raise UserError("Failed to negotiate API version with verifier")
+ return self.verifier_api_version
+
+ def ensure_registrar_version(self) -> str:
+ """Ensure registrar API version is negotiated and return it.
+
+ Performs lazy negotiation: only negotiates if not already done.
+ """
+ if self.registrar_api_version is None:
+ self.negotiate_registrar_version()
+ if self.registrar_api_version is None:
+ raise UserError("Failed to negotiate API version with registrar")
+ return self.registrar_api_version
+
def init_add(self, args: Dict[str, Any]) -> None:
"""Set up required values. Command line options can overwrite these config values
@@ -192,7 +293,11 @@ class Tenant:
if not self.registrar_ip or not self.registrar_port:
raise UserError("registrar_ip and registrar_port have both to be set in the configuration")
self.registrar_data = registrar_client.getData(
- self.registrar_ip, self.registrar_port, self.agent_uuid, self.tls_context
+ self.registrar_ip,
+ self.registrar_port,
+ self.agent_uuid,
+ self.tls_context,
+ api_version=self.ensure_registrar_version(),
)
if self.registrar_data is None:
@@ -218,13 +323,15 @@ class Tenant:
self.set_full_id_str()
- # Auto-detection for API version
- self.supported_version = args["supported_version"]
+ # Auto-detection for agent API version
+ self.agent_api_version = args.get("agent_api_version")
+ self.supported_version = self.agent_api_version
# Default to 1.0 if the agent did not send a mTLS certificate
- if self.registrar_data.get("mtls_cert", None) is None and self.supported_version is None:
- self.supported_version = "1.0"
- else:
- # Try to connect to the agent to get supported version
+ if self.registrar_data.get("mtls_cert", None) is None and self.agent_api_version is None:
+ self.agent_api_version = "1.0"
+ # Try to contact agent to get API version if in pull-mode and we have contact info
+ # Skip in push-mode since agent will send attestations to verifier directly
+ elif not self.push_model and self.agent_ip is not None and self.agent_port is not None:
if self.registrar_data["mtls_cert"] == "disabled":
self.enable_agent_mtls = False
logger.warning(
@@ -269,18 +376,66 @@ class Tenant:
if res and res.status_code == 200:
try:
data = res.json()
- api_version = data["results"]["supported_version"]
- if keylime_api_version.validate_version(api_version) and self.supported_version is None:
- self.supported_version = api_version
+
+ # Try new format first (list of versions)
+ agent_versions = data["results"].get("supported_versions")
+
+ # Fall back to old format (single version) for backward compatibility
+ if agent_versions is None:
+ agent_versions = data["results"].get("supported_version")
+
+ if agent_versions:
+ negotiated = keylime_api_version.negotiate_version(agent_versions)
+
+ if negotiated is None:
+ logger.error(
+ "No compatible API version between tenant and agent %s. "
+ "Agent supports: %s, Tenant supports: %s",
+ self.agent_uuid,
+ agent_versions,
+ keylime_api_version.all_versions(),
+ )
+ raise UserError(
+ f"Agent {self.agent_uuid} has no compatible API version. "
+ f"Agent supports: {agent_versions}, "
+ f"Tenant supports: {keylime_api_version.all_versions()}"
+ )
+
+ if keylime_api_version.validate_version(negotiated) and self.agent_api_version is None:
+ self.agent_api_version = negotiated
+ logger.info(
+ "Negotiated API version %s with agent %s",
+ negotiated,
+ self.agent_uuid,
+ )
+ elif not keylime_api_version.validate_version(negotiated):
+ logger.warning(
+ "Negotiated version %s is invalid, using current: %s",
+ negotiated,
+ keylime_api_version.current_version(),
+ )
+ if self.agent_api_version is None:
+ self.agent_api_version = keylime_api_version.current_version()
else:
- logger.warning("API version provided by the agent is not valid")
- except (TypeError, KeyError):
- pass
+ logger.warning("Agent did not provide version information")
- if self.supported_version is None:
- api_version = keylime_api_version.current_version()
- logger.warning("Could not detect supported API version. Defaulting to %s", api_version)
- self.supported_version = api_version
+ except (TypeError, KeyError) as e:
+ logger.warning("Failed to parse agent version response: %s", e)
+
+ if self.agent_api_version is None:
+ fallback_version = keylime_api_version.current_version()
+ logger.warning(
+ "Could not detect supported API version. Defaulting to %s (push_model=%s, agent_ip=%s, agent_port=%s)",
+ fallback_version,
+ self.push_model,
+ self.agent_ip,
+ self.agent_port,
+ )
+ self.agent_api_version = fallback_version
+
+ # Keep supported_version in sync for backward compatibility
+ self.supported_version = self.agent_api_version
+ logger.info("Using API version %s for agent communication", self.agent_api_version)
# Now set the cv_agent_ip
if "cv_agent_ip" in args and args["cv_agent_ip"] is not None:
@@ -516,7 +671,7 @@ class Tenant:
quote,
self.registrar_data["aik_tpm"],
hash_alg=hash_alg,
- compressed=(self.supported_version == "1.0"),
+ compressed=(self.agent_api_version == "1.0"),
)
if failure:
if self.registrar_data["regcount"] > 1:
@@ -601,12 +756,14 @@ class Tenant:
"accept_tpm_signing_algs": self.accept_tpm_signing_algs,
"ak_tpm": self.registrar_data["aik_tpm"],
"mtls_cert": self.registrar_data.get("mtls_cert", None),
- "supported_version": self.supported_version,
+ "supported_version": self.agent_api_version,
}
json_message = json.dumps(data)
do_cv = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
response = do_cv.post(
- (f"/v{self.api_version}/agents/{self.agent_uuid}"), data=json_message, timeout=self.request_timeout
+ (f"/v{self.ensure_verifier_version()}/agents/{self.agent_uuid}"),
+ data=json_message,
+ timeout=self.request_timeout,
)
if response.status_code == 503:
@@ -671,7 +828,9 @@ class Tenant:
do_cvstatus = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = do_cvstatus.get((f"/v{self.api_version}/agents/{self.agent_uuid}"), timeout=self.request_timeout)
+ response = do_cvstatus.get(
+ (f"/v{self.ensure_verifier_version()}/agents/{self.agent_uuid}"), timeout=self.request_timeout
+ )
response_json = Tenant._jsonify_response(response, print_response=False, raise_except=True)
@@ -728,7 +887,9 @@ class Tenant:
self.set_full_id_str()
- response = do_cvstatus.get(f"/v{self.api_version}/agents/?verifier={verifier_id}", timeout=self.request_timeout)
+ response = do_cvstatus.get(
+ f"/v{self.ensure_verifier_version()}/agents/?verifier={verifier_id}", timeout=self.request_timeout
+ )
response_json = Tenant._jsonify_response(response, print_response=False, raise_except=True)
@@ -776,7 +937,8 @@ class Tenant:
self.set_full_id_str()
response = do_cvstatus.get(
- f"/v{self.api_version}/agents/?bulk={True}&verifier={verifier_id}", timeout=self.request_timeout
+ f"/v{self.ensure_verifier_version()}/agents/?bulk={True}&verifier={verifier_id}",
+ timeout=self.request_timeout,
)
response_json = Tenant._jsonify_response(response, print_response=False)
@@ -823,7 +985,9 @@ class Tenant:
self.set_full_id_str()
do_cvdelete = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = do_cvdelete.delete(f"/v{self.api_version}/agents/{self.agent_uuid}", timeout=self.request_timeout)
+ response = do_cvdelete.delete(
+ f"/v{self.ensure_verifier_version()}/agents/{self.agent_uuid}", timeout=self.request_timeout
+ )
response_json = Tenant._jsonify_response(response, print_response=False, raise_except=True)
@@ -894,7 +1058,13 @@ class Tenant:
self.set_full_id_str()
- agent_info = registrar_client.getData(self.registrar_ip, self.registrar_port, self.agent_uuid, self.tls_context)
+ agent_info = registrar_client.getData(
+ self.registrar_ip,
+ self.registrar_port,
+ self.agent_uuid,
+ self.tls_context,
+ api_version=self.ensure_registrar_version(),
+ )
if not agent_info:
logger.info(
@@ -945,7 +1115,10 @@ class Tenant:
self.set_full_id_str()
response = registrar_client.doRegistrarList(
- self.registrar_ip, self.registrar_port, tls_context=self.tls_context
+ self.registrar_ip,
+ self.registrar_port,
+ tls_context=self.tls_context,
+ api_version=self.ensure_registrar_version(),
)
# Marked for deletion (need to modify the code on CI tests)
@@ -964,7 +1137,11 @@ class Tenant:
raise UserError("registrar_ip and registrar_port have both to be set in the configuration")
response = registrar_client.doRegistrarDelete(
- self.registrar_ip, self.registrar_port, self.agent_uuid, tls_context=self.tls_context
+ self.registrar_ip,
+ self.registrar_port,
+ self.agent_uuid,
+ tls_context=self.tls_context,
+ api_version=self.ensure_registrar_version(),
)
return response
@@ -1014,7 +1191,7 @@ class Tenant:
do_cvreactivate = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
response = do_cvreactivate.put(
- f"/v{self.api_version}/agents/{self.agent_uuid}/reactivate",
+ f"/v{self.ensure_verifier_version()}/agents/{self.agent_uuid}/reactivate",
data=b"",
timeout=self.request_timeout,
)
@@ -1039,7 +1216,7 @@ class Tenant:
def do_cvstop(self) -> None:
"""Stop declared active agent"""
- params = f"/v{self.api_version}/agents/{self.agent_uuid}/stop"
+ params = f"/v{self.ensure_verifier_version()}/agents/{self.agent_uuid}/stop"
do_cvstop = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
response = do_cvstop.put(params, data=b"", timeout=self.request_timeout)
@@ -1073,7 +1250,7 @@ class Tenant:
# Note: We need a specific retry handler (perhaps in common), no point having localised unless we have too.
while True:
try:
- params = f"/v{self.supported_version}/quotes/identity?nonce=%s" % (self.nonce)
+ params = f"/v{self.agent_api_version}/quotes/identity?nonce=%s" % (self.nonce)
cloudagent_base_url = f"{bracketize_ipv6(self.agent_ip)}:{self.agent_port}"
if self.enable_agent_mtls and self.registrar_data and self.registrar_data["mtls_cert"]:
@@ -1165,7 +1342,7 @@ class Tenant:
data["payload"] = self.payload.decode("utf-8")
# post encrypted U back to CloudAgent
- params = f"/v{self.supported_version}/keys/ukey"
+ params = f"/v{self.agent_api_version}/keys/ukey"
cloudagent_base_url = f"{bracketize_ipv6(self.agent_ip)}:{self.agent_port}"
if self.enable_agent_mtls and self.registrar_data and self.registrar_data["mtls_cert"]:
@@ -1209,14 +1386,14 @@ class Tenant:
tls_context=self.agent_tls_context,
) as do_verify:
response = do_verify.get(
- f"/v{self.supported_version}/keys/verify?challenge={challenge}",
+ f"/v{self.agent_api_version}/keys/verify?challenge={challenge}",
timeout=self.request_timeout,
)
else:
logger.warning("Connecting to %s without using mTLS!", self.agent_fid_str)
do_verify = RequestsClient(cloudagent_base_url, tls_enabled=False)
response = do_verify.get(
- f"/v{self.supported_version}/keys/verify?challenge={challenge}", timeout=self.request_timeout
+ f"/v{self.agent_api_version}/keys/verify?challenge={challenge}", timeout=self.request_timeout
)
response_json = Tenant._jsonify_response(response, print_response=False, raise_except=True)
@@ -1320,7 +1497,7 @@ class Tenant:
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
response = cv_client.post(
- f"/v{self.api_version}/allowlists/{self.runtime_policy_name}", data=body, timeout=self.request_timeout
+ f"/v{self.ensure_verifier_version()}/allowlists/{self.runtime_policy_name}", data=body, timeout=self.request_timeout
)
response_json = Tenant._jsonify_response(response)
@@ -1332,7 +1509,7 @@ class Tenant:
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
response = cv_client.put(
- f"/v{self.api_version}/allowlists/{self.runtime_policy_name}", data=body, timeout=self.request_timeout
+ f"/v{self.ensure_verifier_version()}/allowlists/{self.runtime_policy_name}", data=body, timeout=self.request_timeout
)
response_json = Tenant._jsonify_response(response)
@@ -1343,7 +1520,7 @@ class Tenant:
if not name:
raise UserError("--allowlist_name or --runtime_policy_name is required to delete a runtime policy")
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = cv_client.delete(f"/v{self.api_version}/allowlists/{name}", timeout=self.request_timeout)
+ response = cv_client.delete(f"/v{self.ensure_verifier_version()}/allowlists/{name}", timeout=self.request_timeout)
response_json = Tenant._jsonify_response(response)
if response.status_code >= 400:
@@ -1353,7 +1530,7 @@ class Tenant:
if not name:
raise UserError("--allowlist_name or --runtime_policy_name is required to show a runtime policy")
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = cv_client.get(f"/v{self.api_version}/allowlists/{name}", timeout=self.request_timeout)
+ response = cv_client.get(f"/v{self.ensure_verifier_version()}/allowlists/{name}", timeout=self.request_timeout)
print(f"Show allowlist command response: {response.status_code}.")
response_json = Tenant._jsonify_response(response)
@@ -1362,7 +1539,7 @@ class Tenant:
def do_list_runtime_policy(self) -> None:
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = cv_client.get(f"/v{self.api_version}/allowlists/", timeout=self.request_timeout)
+ response = cv_client.get(f"/v{self.ensure_verifier_version()}/allowlists/", timeout=self.request_timeout)
print(f"list command response: {response.status_code}.")
response_json = Tenant._jsonify_response(response)
@@ -1393,7 +1570,7 @@ class Tenant:
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
response = cv_client.post(
- f"/v{self.api_version}/mbpolicies/{self.mb_policy_name}", data=body, timeout=self.request_timeout
+ f"/v{self.ensure_verifier_version()}/mbpolicies/{self.mb_policy_name}", data=body, timeout=self.request_timeout
)
response_json = Tenant._jsonify_response(response)
@@ -1405,7 +1582,7 @@ class Tenant:
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
response = cv_client.put(
- f"/v{self.api_version}/mbpolicies/{self.mb_policy_name}", data=body, timeout=self.request_timeout
+ f"/v{self.ensure_verifier_version()}/mbpolicies/{self.mb_policy_name}", data=body, timeout=self.request_timeout
)
response_json = Tenant._jsonify_response(response)
@@ -1416,7 +1593,7 @@ class Tenant:
if not name:
raise UserError("--mb_policy_name is required to delete a runtime policy")
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = cv_client.delete(f"/v{self.api_version}/mbpolicies/{name}", timeout=self.request_timeout)
+ response = cv_client.delete(f"/v{self.ensure_verifier_version()}/mbpolicies/{name}", timeout=self.request_timeout)
response_json = Tenant._jsonify_response(response)
if response.status_code >= 400:
@@ -1426,7 +1603,7 @@ class Tenant:
if not name:
raise UserError("--mb_policy_name is required to show a runtime policy")
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = cv_client.get(f"/v{self.api_version}/mbpolicies/{name}", timeout=self.request_timeout)
+ response = cv_client.get(f"/v{self.ensure_verifier_version()}/mbpolicies/{name}", timeout=self.request_timeout)
print(f"showmbpolicy command response: {response.status_code}.")
response_json = Tenant._jsonify_response(response)
@@ -1435,7 +1612,7 @@ class Tenant:
def do_list_mb_policy(self) -> None: # pylint: disable=unused-argument
cv_client = RequestsClient(self.verifier_base_url, True, tls_context=self.tls_context)
- response = cv_client.get(f"/v{self.api_version}/mbpolicies/", timeout=self.request_timeout)
+ response = cv_client.get(f"/v{self.ensure_verifier_version()}/mbpolicies/", timeout=self.request_timeout)
print(f"listmbpolicy command response: {response.status_code}.")
response_json = Tenant._jsonify_response(response)
@@ -1688,16 +1865,29 @@ def main() -> None:
default=None,
help="The name of the measure boot policy to operate with",
)
+ parser.add_argument(
+ "--agent-api-version",
+ default=None,
+ action="store",
+ dest="agent_api_version",
+ help="API version to use for agent communication. Detected automatically by default",
+ )
parser.add_argument(
"--supported-version",
default=None,
action="store",
dest="supported_version",
- help="API version that is supported by the agent. Detected automatically by default",
+ help="DEPRECATED: Use --agent-api-version instead. API version to use for agent communication",
)
args = parser.parse_args()
+ # Handle deprecated --supported-version argument
+ if args.supported_version is not None:
+ logger.warning("--supported-version is deprecated. Use --agent-api-version instead.")
+ if args.agent_api_version is None:
+ args.agent_api_version = args.supported_version
+
argerr, argerrmsg = options.get_opts_error(args)
if argerr:
parser.error(argerrmsg)
diff --git a/test/test_api_version.py b/test/test_api_version.py
index 5389aaa..e1d834a 100644
--- a/test/test_api_version.py
+++ b/test/test_api_version.py
@@ -56,6 +56,71 @@ class APIVersion_Test(unittest.TestCase):
with self.subTest(description):
self.assertEqual(api_version.is_supported_version(version), supported, description)
+ def test_negotiate_version_with_string(self):
+ """Test negotiate_version with a single version string."""
+ result = api_version.negotiate_version("2.0")
+ self.assertEqual(result, "2.0")
+
+ result = api_version.negotiate_version("99.0")
+ self.assertIsNone(result)
+
+ def test_negotiate_version_with_list(self):
+ """Test negotiate_version with a list of versions."""
+ result = api_version.negotiate_version(["1.0", "2.0", "2.1"])
+ self.assertEqual(result, "2.1")
+
+ result = api_version.negotiate_version(["1.0", "2.0", "99.0"])
+ self.assertEqual(result, "2.0")
+
+ result = api_version.negotiate_version(["98.0", "99.0"])
+ self.assertIsNone(result)
+
+ def test_negotiate_version_returns_highest(self):
+ """Test that negotiate_version returns the highest common version."""
+ result = api_version.negotiate_version(["1.0", "2.0", "2.1", "2.2", "2.3"])
+ self.assertEqual(result, "2.3")
+
+ result = api_version.negotiate_version(["1.0", "2.0"])
+ self.assertEqual(result, "2.0")
+
+ def test_negotiate_version_with_custom_local_versions(self):
+ """Test negotiate_version with custom local_versions."""
+ local_versions = ["1.0", "2.0", "2.1", "2.2", "2.3"]
+ result = api_version.negotiate_version(["2.3", "3.0"], local_versions)
+ self.assertEqual(result, "2.3")
+
+ result = api_version.negotiate_version(["3.0"], local_versions)
+ self.assertIsNone(result)
+
+ def test_negotiate_version_raise_on_error(self):
+ """Test negotiate_version with raise_on_error=True."""
+ with self.assertRaises(ValueError) as context:
+ api_version.negotiate_version(["99.0"], raise_on_error=True)
+ self.assertIn("No compatible API version", str(context.exception))
+
+ result = api_version.negotiate_version(["2.0"], raise_on_error=True)
+ self.assertEqual(result, "2.0")
+
+ def test_negotiate_version_empty_list(self):
+ """Test negotiate_version with empty list."""
+ result = api_version.negotiate_version([])
+ self.assertIsNone(result)
+
+ with self.assertRaises(ValueError):
+ api_version.negotiate_version([], raise_on_error=True)
+
+ def test_negotiate_version_proper_version_comparison(self):
+ """Test that version comparison is numeric, not string-based (2.10 > 2.2)."""
+ local_versions = ["2.2", "2.10"]
+ remote_versions = ["2.2", "2.10"]
+ result = api_version.negotiate_version(remote_versions, local_versions)
+ self.assertEqual(result, "2.10", "2.10 should be greater than 2.2")
+
+ local_versions = ["1.0", "2.1", "2.2", "2.9", "2.10", "2.11"]
+ remote_versions = ["2.2", "2.9", "2.10"]
+ result = api_version.negotiate_version(remote_versions, local_versions)
+ self.assertEqual(result, "2.10", "2.10 should be the highest common version")
+
if __name__ == "__main__":
unittest.main()
--
2.52.0

View File

@ -9,7 +9,7 @@
Name: keylime
Version: 7.12.1
Release: 11%{?dist}.4
Release: 11%{?dist}.5
Summary: Open source TPM software for Bootstrapping and Maintaining Trust
URL: https://github.com/keylime/keylime
@ -51,6 +51,12 @@ Patch: 0015-Fix-registrar-duplicate-UUID-vulnerability.patch
# CVE-2026-1709
Patch: 0016-CVE-2026-1709.patch
# Tenant version negotiation.
# Backport from:
# - https://github.com/keylime/keylime/pull/1838
# - https://github.com/keylime/keylime/pull/1845
Patch: 0017-Backport-tenant-version-negotiation-mechanism.patch
License: ASL 2.0 and MIT
BuildRequires: git-core
@ -444,6 +450,10 @@ fi
%license LICENSE
%changelog
* Fri Apr 17 2026 Sergio Correia <scorreia@redhat.com> - 7.12.1-11.5
- Add API version negotiation to keylime_tenant
Resolves: RHEL-154784
* Tue Feb 03 2026 Anderson Toshiyuki Sasaki <ansasaki@redhat.com> - 7.12.1-11.4
- CVE-2026-1709: Registrar authentication bypass
Resolves: RHEL-145390