From f7c32aec9c44a176124d982d942391ed3d50e846 Mon Sep 17 00:00:00 2001 From: Sergio Correia Date: Tue, 3 Jun 2025 21:23:09 +0100 Subject: [PATCH 1/6] Make keylime compatible with python 3.9 Signed-off-by: Sergio Correia --- keylime/ima/types.py | 33 ++++---- keylime/models/base/basic_model.py | 4 +- keylime/models/base/basic_model_meta.py | 4 +- keylime/models/base/field.py | 4 +- keylime/models/base/persistable_model.py | 4 +- keylime/models/base/type.py | 4 +- keylime/models/base/types/base64_bytes.py | 4 +- keylime/models/base/types/certificate.py | 92 +++++++++++---------- keylime/models/base/types/dictionary.py | 4 +- keylime/models/base/types/one_of.py | 6 +- keylime/models/registrar/registrar_agent.py | 31 +++---- keylime/policy/create_runtime_policy.py | 2 +- keylime/registrar_client.py | 8 +- keylime/web/base/action_handler.py | 7 +- keylime/web/base/controller.py | 78 ++++++++--------- tox.ini | 10 +++ 16 files changed, 154 insertions(+), 141 deletions(-) diff --git a/keylime/ima/types.py b/keylime/ima/types.py index 99f0aa7..a0fffdf 100644 --- a/keylime/ima/types.py +++ b/keylime/ima/types.py @@ -6,11 +6,6 @@ if sys.version_info >= (3, 8): else: from typing_extensions import Literal, TypedDict -if sys.version_info >= (3, 11): - from typing import NotRequired, Required -else: - from typing_extensions import NotRequired, Required - ### Types for tpm_dm.py RuleAttributeType = Optional[Union[int, str, bool]] @@ -51,7 +46,7 @@ class Rule(TypedDict): class Policies(TypedDict): - version: Required[int] + version: int match_on: MatchKeyType rules: Dict[str, Rule] @@ -60,27 +55,27 @@ class Policies(TypedDict): class RPMetaType(TypedDict): - version: Required[int] - generator: NotRequired[int] - timestamp: NotRequired[str] + version: int + generator: int + timestamp: str class RPImaType(TypedDict): - ignored_keyrings: Required[List[str]] - log_hash_alg: Required[Literal["sha1", "sha256", "sha384", "sha512"]] + ignored_keyrings: List[str] + log_hash_alg: 
Literal["sha1", "sha256", "sha384", "sha512"] dm_policy: Optional[Policies] RuntimePolicyType = TypedDict( "RuntimePolicyType", { - "meta": Required[RPMetaType], - "release": NotRequired[int], - "digests": Required[Dict[str, List[str]]], - "excludes": Required[List[str]], - "keyrings": Required[Dict[str, List[str]]], - "ima": Required[RPImaType], - "ima-buf": Required[Dict[str, List[str]]], - "verification-keys": Required[str], + "meta": RPMetaType, + "release": int, + "digests": Dict[str, List[str]], + "excludes": List[str], + "keyrings": Dict[str, List[str]], + "ima": RPImaType, + "ima-buf": Dict[str, List[str]], + "verification-keys": str, }, ) diff --git a/keylime/models/base/basic_model.py b/keylime/models/base/basic_model.py index 68a126e..6f5de83 100644 --- a/keylime/models/base/basic_model.py +++ b/keylime/models/base/basic_model.py @@ -407,7 +407,9 @@ class BasicModel(ABC, metaclass=BasicModelMeta): if max and length > max: self._add_error(field, msg or f"should be at most {length} {element_type}(s)") - def validate_number(self, field: str, *expressions: tuple[str, int | float], msg: Optional[str] = None) -> None: + def validate_number( + self, field: str, *expressions: tuple[str, Union[int, float]], msg: Optional[str] = None + ) -> None: value = self.values.get(field) if not value: diff --git a/keylime/models/base/basic_model_meta.py b/keylime/models/base/basic_model_meta.py index 353e004..84617d4 100644 --- a/keylime/models/base/basic_model_meta.py +++ b/keylime/models/base/basic_model_meta.py @@ -1,6 +1,6 @@ from abc import ABCMeta from types import MappingProxyType -from typing import Any, Callable, Mapping, TypeAlias, Union +from typing import Any, Callable, Mapping, Union from sqlalchemy.types import TypeEngine @@ -40,7 +40,7 @@ class BasicModelMeta(ABCMeta): # pylint: disable=bad-staticmethod-argument, no-value-for-parameter, using-constant-test - DeclaredFieldType: TypeAlias = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]] + 
DeclaredFieldType = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]] @classmethod def _is_model_class(mcs, cls: type) -> bool: # type: ignore[reportSelfClassParameterName] diff --git a/keylime/models/base/field.py b/keylime/models/base/field.py index 7fb3dcb..d1e3bc3 100644 --- a/keylime/models/base/field.py +++ b/keylime/models/base/field.py @@ -1,6 +1,6 @@ import re from inspect import isclass -from typing import TYPE_CHECKING, Any, Optional, TypeAlias, Union +from typing import TYPE_CHECKING, Any, Optional, Union from sqlalchemy.types import TypeEngine @@ -23,7 +23,7 @@ class ModelField: [2] https://docs.python.org/3/library/functions.html#property """ - DeclaredFieldType: TypeAlias = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]] + DeclaredFieldType = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]] FIELD_NAME_REGEX = re.compile(r"^[A-Za-z_]+[A-Za-z0-9_]*$") diff --git a/keylime/models/base/persistable_model.py b/keylime/models/base/persistable_model.py index 18f7d0d..015d661 100644 --- a/keylime/models/base/persistable_model.py +++ b/keylime/models/base/persistable_model.py @@ -1,4 +1,4 @@ -from typing import Any, Mapping, Optional, Sequence +from typing import Any, Mapping, Optional, Sequence, Union from keylime.models.base.basic_model import BasicModel from keylime.models.base.db import db_manager @@ -165,7 +165,7 @@ class PersistableModel(BasicModel, metaclass=PersistableModelMeta): else: return None - def __init__(self, data: Optional[dict | object] = None, process_associations: bool = True) -> None: + def __init__(self, data: Optional[Union[dict, object]] = None, process_associations: bool = True) -> None: if isinstance(data, type(self).db_mapping): super().__init__({}, process_associations) self._init_from_mapping(data, process_associations) diff --git a/keylime/models/base/type.py b/keylime/models/base/type.py index 2520f72..e4d924c 100644 --- a/keylime/models/base/type.py +++ b/keylime/models/base/type.py 
@@ -1,7 +1,7 @@ from decimal import Decimal from inspect import isclass from numbers import Real -from typing import Any, TypeAlias, Union +from typing import Any, Union from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.types import TypeEngine @@ -99,7 +99,7 @@ class ModelType: you should instead set ``_type_engine`` to ``None`` and override the ``get_db_type`` method. """ - DeclaredTypeEngine: TypeAlias = Union[TypeEngine, type[TypeEngine]] + DeclaredTypeEngine = Union[TypeEngine, type[TypeEngine]] def __init__(self, type_engine: DeclaredTypeEngine) -> None: if isclass(type_engine) and issubclass(type_engine, TypeEngine): diff --git a/keylime/models/base/types/base64_bytes.py b/keylime/models/base/types/base64_bytes.py index b9b4b13..a1eeced 100644 --- a/keylime/models/base/types/base64_bytes.py +++ b/keylime/models/base/types/base64_bytes.py @@ -1,6 +1,6 @@ import base64 import binascii -from typing import Optional, TypeAlias, Union +from typing import Optional, Union from sqlalchemy.types import Text @@ -62,7 +62,7 @@ class Base64Bytes(ModelType): b64_str = Base64Bytes().cast("MIIE...") """ - IncomingValue: TypeAlias = Union[bytes, str, None] + IncomingValue = Union[bytes, str, None] def __init__(self) -> None: super().__init__(Text) diff --git a/keylime/models/base/types/certificate.py b/keylime/models/base/types/certificate.py index 2c27603..0f03169 100644 --- a/keylime/models/base/types/certificate.py +++ b/keylime/models/base/types/certificate.py @@ -1,7 +1,7 @@ import base64 import binascii import io -from typing import Optional, TypeAlias, Union +from typing import Optional, Union import cryptography.x509 from cryptography.hazmat.primitives.serialization import Encoding @@ -78,7 +78,7 @@ class Certificate(ModelType): cert = Certificate().cast("-----BEGIN CERTIFICATE-----\nMIIE...") """ - IncomingValue: TypeAlias = Union[cryptography.x509.Certificate, bytes, str, None] + IncomingValue = Union[cryptography.x509.Certificate, bytes, str, None] 
def __init__(self) -> None: super().__init__(Text) @@ -195,18 +195,19 @@ class Certificate(ModelType): """ try: - match self.infer_encoding(value): - case "decoded": - return None - case "der": - cryptography.x509.load_der_x509_certificate(value) # type: ignore[reportArgumentType, arg-type] - case "pem": - cryptography.x509.load_pem_x509_certificate(value) # type: ignore[reportArgumentType, arg-type] - case "base64": - der_value = base64.b64decode(value, validate=True) # type: ignore[reportArgumentType, arg-type] - cryptography.x509.load_der_x509_certificate(der_value) - case _: - raise Exception + encoding_inf = self.infer_encoding(value) + if encoding_inf == "decoded": + return None + + if encoding_inf == "der": + cryptography.x509.load_der_x509_certificate(value) # type: ignore[reportArgumentType, arg-type] + elif encoding_inf == "pem": + cryptography.x509.load_pem_x509_certificate(value) # type: ignore[reportArgumentType, arg-type] + elif encoding_inf == "base64": + der_value = base64.b64decode(value, validate=True) # type: ignore[reportArgumentType, arg-type] + cryptography.x509.load_der_x509_certificate(der_value) + else: + raise Exception except Exception: return False @@ -227,37 +228,38 @@ class Certificate(ModelType): if not value: return None - match self.infer_encoding(value): - case "decoded": - return value # type: ignore[reportReturnType, return-value] - case "der": - try: - return self._load_der_cert(value) # type: ignore[reportArgumentType, arg-type] - except PyAsn1Error as err: - raise ValueError( - f"value cast to certificate appears DER encoded but cannot be deserialized as such: {value!r}" - ) from err - case "pem": - try: - return self._load_pem_cert(value) # type: ignore[reportArgumentType, arg-type] - except PyAsn1Error as err: - raise ValueError( - f"value cast to certificate appears PEM encoded but cannot be deserialized as such: " - f"'{str(value)}'" - ) from err - case "base64": - try: - return self._load_der_cert(base64.b64decode(value, 
validate=True)) # type: ignore[reportArgumentType, arg-type] - except (binascii.Error, PyAsn1Error) as err: - raise ValueError( - f"value cast to certificate appears Base64 encoded but cannot be deserialized as such: " - f"'{str(value)}'" - ) from err - case _: - raise TypeError( - f"value cast to certificate is of type '{value.__class__.__name__}' but should be one of 'str', " - f"'bytes' or 'cryptography.x509.Certificate': '{str(value)}'" - ) + encoding_inf = self.infer_encoding(value) + if encoding_inf == "decoded": + return value # type: ignore[reportReturnType, return-value] + + if encoding_inf == "der": + try: + return self._load_der_cert(value) # type: ignore[reportArgumentType, arg-type] + except PyAsn1Error as err: + raise ValueError( + f"value cast to certificate appears DER encoded but cannot be deserialized as such: {value!r}" + ) from err + elif encoding_inf == "pem": + try: + return self._load_pem_cert(value) # type: ignore[reportArgumentType, arg-type] + except PyAsn1Error as err: + raise ValueError( + f"value cast to certificate appears PEM encoded but cannot be deserialized as such: " + f"'{str(value)}'" + ) from err + elif encoding_inf == "base64": + try: + return self._load_der_cert(base64.b64decode(value, validate=True)) # type: ignore[reportArgumentType, arg-type] + except (binascii.Error, PyAsn1Error) as err: + raise ValueError( + f"value cast to certificate appears Base64 encoded but cannot be deserialized as such: " + f"'{str(value)}'" + ) from err + else: + raise TypeError( + f"value cast to certificate is of type '{value.__class__.__name__}' but should be one of 'str', " + f"'bytes' or 'cryptography.x509.Certificate': '{str(value)}'" + ) def generate_error_msg(self, _value: IncomingValue) -> str: return "must be a valid X.509 certificate in PEM format or otherwise encoded using Base64" diff --git a/keylime/models/base/types/dictionary.py b/keylime/models/base/types/dictionary.py index 7d9e811..d9ffec3 100644 --- 
a/keylime/models/base/types/dictionary.py +++ b/keylime/models/base/types/dictionary.py @@ -1,5 +1,5 @@ import json -from typing import Optional, TypeAlias, Union +from typing import Optional, Union from sqlalchemy.types import Text @@ -50,7 +50,7 @@ class Dictionary(ModelType): kv_pairs = Dictionary().cast('{"key": "value"}') """ - IncomingValue: TypeAlias = Union[dict, str, None] + IncomingValue = Union[dict, str, None] def __init__(self) -> None: super().__init__(Text) diff --git a/keylime/models/base/types/one_of.py b/keylime/models/base/types/one_of.py index 479d417..faf097d 100644 --- a/keylime/models/base/types/one_of.py +++ b/keylime/models/base/types/one_of.py @@ -1,6 +1,6 @@ from collections import Counter from inspect import isclass -from typing import Any, Optional, TypeAlias, Union +from typing import Any, Optional, Union from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.types import Float, Integer, String, TypeEngine @@ -65,8 +65,8 @@ class OneOf(ModelType): incoming PEM value would not be cast to a certificate object and remain a string. 
""" - Declaration: TypeAlias = Union[str, int, float, ModelType, TypeEngine, type[ModelType], type[TypeEngine]] - PermittedList: TypeAlias = list[Union[str, int, float, ModelType]] + Declaration = Union[str, int, float, ModelType, TypeEngine, type[ModelType], type[TypeEngine]] + PermittedList = list[Union[str, int, float, ModelType]] def __init__(self, *args: Declaration) -> None: # pylint: disable=super-init-not-called diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py index 560c188..b232049 100644 --- a/keylime/models/registrar/registrar_agent.py +++ b/keylime/models/registrar/registrar_agent.py @@ -153,21 +153,22 @@ class RegistrarAgent(PersistableModel): names = ", ".join(non_compliant_certs) names = " and".join(names.rsplit(",", 1)) - match config.get("registrar", "malformed_cert_action"): - case "ignore": - return - case "reject": - logger.error( - "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were rejected due to " - "config ('malformed_cert_action = reject')", - names, - ) - case _: - logger.warning( - "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were re-encoded before " - "parsing by python-cryptography", - names, - ) + cfg = config.get("registrar", "malformed_cert_action") + if cfg == "ignore": + return + + if cfg == "reject": + logger.error( + "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were rejected due to " + "config ('malformed_cert_action = reject')", + names, + ) + else: + logger.warning( + "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were re-encoded before " + "parsing by python-cryptography", + names, + ) def _bind_ak_to_iak(self, iak_attest, iak_sign): # The ak-iak binding should only be verified when either aik_tpm or iak_tpm is changed diff --git a/keylime/policy/create_runtime_policy.py b/keylime/policy/create_runtime_policy.py index 6a412c4..8e1c687 100644 --- 
a/keylime/policy/create_runtime_policy.py +++ b/keylime/policy/create_runtime_policy.py @@ -972,7 +972,7 @@ def create_runtime_policy(args: argparse.Namespace) -> Optional[RuntimePolicyTyp ) abort = True else: - if a not in algorithms.Hash: + if a not in set(algorithms.Hash): if a == SHA256_OR_SM3: algo = a else: diff --git a/keylime/registrar_client.py b/keylime/registrar_client.py index 705ff12..97fbc2a 100644 --- a/keylime/registrar_client.py +++ b/keylime/registrar_client.py @@ -13,12 +13,6 @@ if sys.version_info >= (3, 8): else: from typing_extensions import TypedDict -if sys.version_info >= (3, 11): - from typing import NotRequired -else: - from typing_extensions import NotRequired - - class RegistrarData(TypedDict): ip: Optional[str] port: Optional[str] @@ -27,7 +21,7 @@ class RegistrarData(TypedDict): aik_tpm: str ek_tpm: str ekcert: Optional[str] - provider_keys: NotRequired[Dict[str, str]] + provider_keys: Dict[str, str] logger = keylime_logging.init_logging("registrar_client") diff --git a/keylime/web/base/action_handler.py b/keylime/web/base/action_handler.py index b20de89..e7b5888 100644 --- a/keylime/web/base/action_handler.py +++ b/keylime/web/base/action_handler.py @@ -1,4 +1,5 @@ import re +import sys import time import traceback from inspect import iscoroutinefunction @@ -48,7 +49,11 @@ class ActionHandler(RequestHandler): # Take the list of strings returned by format_exception, where each string ends in a newline and may contain # internal newlines, and split the concatenation of all the strings by newline - message = "".join(traceback.format_exception(err)) + if sys.version_info < (3, 10): + message = "".join(traceback.format_exception(type(err), err, err.__traceback__)) + else: + message = "".join(traceback.format_exception(err)) + lines = message.split("\n") for line in lines: diff --git a/keylime/web/base/controller.py b/keylime/web/base/controller.py index f1ac3c5..153535e 100644 --- a/keylime/web/base/controller.py +++ b/keylime/web/base/controller.py @@ 
-2,7 +2,7 @@ import http.client import json import re from types import MappingProxyType -from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, TypeAlias, Union +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union from tornado.escape import parse_qs_bytes from tornado.httputil import parse_body_arguments @@ -15,14 +15,16 @@ if TYPE_CHECKING: from keylime.models.base.basic_model import BasicModel from keylime.web.base.action_handler import ActionHandler -PathParams: TypeAlias = Mapping[str, str] -QueryParams: TypeAlias = Mapping[str, str | Sequence[str]] -MultipartParams: TypeAlias = Mapping[str, Union[str, bytes, Sequence[str | bytes]]] -FormParams: TypeAlias = Union[QueryParams, MultipartParams] -JSONConvertible: TypeAlias = Union[str, int, float, bool, None, "JSONObjectConvertible", "JSONArrayConvertible"] -JSONObjectConvertible: TypeAlias = Mapping[str, JSONConvertible] -JSONArrayConvertible: TypeAlias = Sequence[JSONConvertible] # pyright: ignore[reportInvalidTypeForm] -Params: TypeAlias = Mapping[str, Union[str, bytes, Sequence[str | bytes], JSONObjectConvertible, JSONArrayConvertible]] +PathParams = Mapping[str, str] +QueryParams = Mapping[str, Union[str, Sequence[str]]] +MultipartParams = Mapping[str, Union[str, bytes, Union[Sequence[str], Sequence[bytes]]]] +FormParams = Union[QueryParams, MultipartParams] +JSONConvertible = Union[str, int, float, bool, None, "JSONObjectConvertible", "JSONArrayConvertible"] +JSONObjectConvertible = Mapping[str, JSONConvertible] +JSONArrayConvertible = Sequence[JSONConvertible] # pyright: ignore[reportInvalidTypeForm] +Params = Mapping[ + str, Union[str, bytes, Union[Sequence[str], Sequence[bytes]], JSONObjectConvertible, JSONArrayConvertible] +] class Controller: @@ -77,7 +79,7 @@ class Controller: VERSION_REGEX = re.compile("^\\/v(\\d+)(?:\\.(\\d+))*") @staticmethod - def decode_url_query(query: str | bytes) -> QueryParams: + def decode_url_query(query: Union[str, bytes]) -> QueryParams:
"""Parses a binary query string (whether from a URL or HTTP body) into a dict of Unicode strings. If multiple instances of the same key are present in the string, their values are collected into a list. @@ -135,8 +137,8 @@ class Controller: @staticmethod def prepare_http_body( - body: Union[str, JSONObjectConvertible | JSONArrayConvertible, Any], content_type: Optional[str] = None - ) -> tuple[Optional[bytes | Any], Optional[str]]: + body: Union[str, Union[JSONObjectConvertible, JSONArrayConvertible], Any], content_type: Optional[str] = None + ) -> tuple[Optional[Union[bytes, Any]], Optional[str]]: """Prepares an object to be included in the body of an HTTP request or response and infers the appropriate media type unless provided. ``body`` will be serialised into JSON if it contains a ``dict`` or ``list`` which is serialisable unless a ``content_type`` other than ``"application/json"`` is provided. @@ -155,32 +157,34 @@ class Controller: if content_type: content_type = content_type.lower().strip() - body_out: Optional[bytes | Any] - content_type_out: Optional[str] - - match (body, content_type): - case (None, _): - body_out = None - content_type_out = content_type - case ("", _): - body_out = b"" - content_type_out = "text/plain; charset=utf-8" - case (_, "text/plain"): + body_out: Optional[Union[bytes, Any]] = body + content_type_out: Optional[str] = content_type + + if body is None: + body_out = None + content_type_out = content_type + elif body == "": + body_out = b"" + content_type_out = "text/plain; charset=utf-8" + else: + if content_type == "text/plain": body_out = str(body).encode("utf-8") content_type_out = "text/plain; charset=utf-8" - case (_, "application/json") if isinstance(body, str): - body_out = body.encode("utf-8") - content_type_out = "application/json" - case (_, "application/json"): - body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8") - content_type_out = "application/json" - case (_, None) if isinstance(body, str): - body_out =
body.encode("utf-8") - content_type_out = "text/plain; charset=utf-8" - case (_, None) if isinstance(body, (dict, list)): - body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8") - content_type_out = "application/json" - case (_, _): + elif content_type == "application/json": + if isinstance(body, str): + body_out = body.encode("utf-8") + content_type_out = "application/json" + else: + body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8") + content_type_out = "application/json" + elif content_type is None: + if isinstance(body, str): + body_out = body.encode("utf-8") + content_type_out = "text/plain; charset=utf-8" + elif isinstance(body, (dict, list)): + body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8") + content_type_out = "application/json" + else: body_out = body content_type_out = content_type @@ -248,7 +252,7 @@ class Controller: self, code: int = 200, status: Optional[str] = None, - data: Optional[JSONObjectConvertible | JSONArrayConvertible] = None, + data: Optional[Union[JSONObjectConvertible, JSONArrayConvertible]] = None, ) -> None: """Converts a Python data structure to JSON and wraps it in the following boilerplate JSON object which is returned by all v2 endpoints: diff --git a/tox.ini b/tox.ini index 031ac54..ce3974c 100644 --- a/tox.ini +++ b/tox.ini @@ -51,3 +51,13 @@ commands = black --diff ./keylime ./test deps = isort commands = isort --diff --check ./keylime ./test + + +[testenv:pylint39] +basepython = python3.9 +deps = + -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt + pylint +commands = bash scripts/check_codestyle.sh +allowlist_externals = bash -- 2.47.1