629 lines
26 KiB
Diff
629 lines
26 KiB
Diff
From f7c32aec9c44a176124d982d942391ed3d50e846 Mon Sep 17 00:00:00 2001
|
|
From: Sergio Correia <scorreia@redhat.com>
|
|
Date: Tue, 3 Jun 2025 21:23:09 +0100
|
|
Subject: [PATCH 1/6] Make keylime compatible with python 3.9
|
|
|
|
Signed-off-by: Sergio Correia <scorreia@redhat.com>
|
|
---
|
|
keylime/ima/types.py | 33 ++++----
|
|
keylime/models/base/basic_model.py | 4 +-
|
|
keylime/models/base/basic_model_meta.py | 4 +-
|
|
keylime/models/base/field.py | 4 +-
|
|
keylime/models/base/persistable_model.py | 4 +-
|
|
keylime/models/base/type.py | 4 +-
|
|
keylime/models/base/types/base64_bytes.py | 4 +-
|
|
keylime/models/base/types/certificate.py | 92 +++++++++++----------
|
|
keylime/models/base/types/dictionary.py | 4 +-
|
|
keylime/models/base/types/one_of.py | 6 +-
|
|
keylime/models/registrar/registrar_agent.py | 31 +++----
|
|
keylime/policy/create_runtime_policy.py | 2 +-
|
|
keylime/registrar_client.py | 8 +-
|
|
keylime/web/base/action_handler.py | 7 +-
|
|
keylime/web/base/controller.py | 78 ++++++++---------
|
|
tox.ini | 10 +++
|
|
16 files changed, 154 insertions(+), 141 deletions(-)
|
|
|
|
diff --git a/keylime/ima/types.py b/keylime/ima/types.py
|
|
index 99f0aa7..a0fffdf 100644
|
|
--- a/keylime/ima/types.py
|
|
+++ b/keylime/ima/types.py
|
|
@@ -6,11 +6,6 @@ if sys.version_info >= (3, 8):
|
|
else:
|
|
from typing_extensions import Literal, TypedDict
|
|
|
|
-if sys.version_info >= (3, 11):
|
|
- from typing import NotRequired, Required
|
|
-else:
|
|
- from typing_extensions import NotRequired, Required
|
|
-
|
|
### Types for tpm_dm.py
|
|
|
|
RuleAttributeType = Optional[Union[int, str, bool]]
|
|
@@ -51,7 +46,7 @@ class Rule(TypedDict):
|
|
|
|
|
|
class Policies(TypedDict):
|
|
- version: Required[int]
|
|
+ version: int
|
|
match_on: MatchKeyType
|
|
rules: Dict[str, Rule]
|
|
|
|
@@ -60,27 +55,27 @@ class Policies(TypedDict):
|
|
|
|
|
|
class RPMetaType(TypedDict):
|
|
- version: Required[int]
|
|
- generator: NotRequired[int]
|
|
- timestamp: NotRequired[str]
|
|
+ version: int
|
|
+ generator: int
|
|
+ timestamp: str
|
|
|
|
|
|
class RPImaType(TypedDict):
|
|
- ignored_keyrings: Required[List[str]]
|
|
- log_hash_alg: Required[Literal["sha1", "sha256", "sha384", "sha512"]]
|
|
+ ignored_keyrings: List[str]
|
|
+ log_hash_alg: Literal["sha1", "sha256", "sha384", "sha512"]
|
|
dm_policy: Optional[Policies]
|
|
|
|
|
|
RuntimePolicyType = TypedDict(
|
|
"RuntimePolicyType",
|
|
{
|
|
- "meta": Required[RPMetaType],
|
|
- "release": NotRequired[int],
|
|
- "digests": Required[Dict[str, List[str]]],
|
|
- "excludes": Required[List[str]],
|
|
- "keyrings": Required[Dict[str, List[str]]],
|
|
- "ima": Required[RPImaType],
|
|
- "ima-buf": Required[Dict[str, List[str]]],
|
|
- "verification-keys": Required[str],
|
|
+ "meta": RPMetaType,
|
|
+ "release": int,
|
|
+ "digests": Dict[str, List[str]],
|
|
+ "excludes": List[str],
|
|
+ "keyrings": Dict[str, List[str]],
|
|
+ "ima": RPImaType,
|
|
+ "ima-buf": Dict[str, List[str]],
|
|
+ "verification-keys": str,
|
|
},
|
|
)
|
|
diff --git a/keylime/models/base/basic_model.py b/keylime/models/base/basic_model.py
|
|
index 68a126e..6f5de83 100644
|
|
--- a/keylime/models/base/basic_model.py
|
|
+++ b/keylime/models/base/basic_model.py
|
|
@@ -407,7 +407,9 @@ class BasicModel(ABC, metaclass=BasicModelMeta):
|
|
if max and length > max:
|
|
self._add_error(field, msg or f"should be at most {length} {element_type}(s)")
|
|
|
|
- def validate_number(self, field: str, *expressions: tuple[str, int | float], msg: Optional[str] = None) -> None:
|
|
+ def validate_number(
|
|
+ self, field: str, *expressions: tuple[str, Union[int, float]], msg: Optional[str] = None
|
|
+ ) -> None:
|
|
value = self.values.get(field)
|
|
|
|
if not value:
|
|
diff --git a/keylime/models/base/basic_model_meta.py b/keylime/models/base/basic_model_meta.py
|
|
index 353e004..84617d4 100644
|
|
--- a/keylime/models/base/basic_model_meta.py
|
|
+++ b/keylime/models/base/basic_model_meta.py
|
|
@@ -1,6 +1,6 @@
|
|
from abc import ABCMeta
|
|
from types import MappingProxyType
|
|
-from typing import Any, Callable, Mapping, TypeAlias, Union
|
|
+from typing import Any, Callable, Mapping, Union
|
|
|
|
from sqlalchemy.types import TypeEngine
|
|
|
|
@@ -40,7 +40,7 @@ class BasicModelMeta(ABCMeta):
|
|
|
|
# pylint: disable=bad-staticmethod-argument, no-value-for-parameter, using-constant-test
|
|
|
|
- DeclaredFieldType: TypeAlias = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]]
|
|
+ DeclaredFieldType = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]]
|
|
|
|
@classmethod
|
|
def _is_model_class(mcs, cls: type) -> bool: # type: ignore[reportSelfClassParameterName]
|
|
diff --git a/keylime/models/base/field.py b/keylime/models/base/field.py
|
|
index 7fb3dcb..d1e3bc3 100644
|
|
--- a/keylime/models/base/field.py
|
|
+++ b/keylime/models/base/field.py
|
|
@@ -1,6 +1,6 @@
|
|
import re
|
|
from inspect import isclass
|
|
-from typing import TYPE_CHECKING, Any, Optional, TypeAlias, Union
|
|
+from typing import TYPE_CHECKING, Any, Optional, Union
|
|
|
|
from sqlalchemy.types import TypeEngine
|
|
|
|
@@ -23,7 +23,7 @@ class ModelField:
|
|
[2] https://docs.python.org/3/library/functions.html#property
|
|
"""
|
|
|
|
- DeclaredFieldType: TypeAlias = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]]
|
|
+ DeclaredFieldType = Union[ModelType, TypeEngine, type[ModelType], type[TypeEngine]]
|
|
|
|
FIELD_NAME_REGEX = re.compile(r"^[A-Za-z_]+[A-Za-z0-9_]*$")
|
|
|
|
diff --git a/keylime/models/base/persistable_model.py b/keylime/models/base/persistable_model.py
|
|
index 18f7d0d..015d661 100644
|
|
--- a/keylime/models/base/persistable_model.py
|
|
+++ b/keylime/models/base/persistable_model.py
|
|
@@ -1,4 +1,4 @@
|
|
-from typing import Any, Mapping, Optional, Sequence
|
|
+from typing import Any, Mapping, Optional, Sequence, Union
|
|
|
|
from keylime.models.base.basic_model import BasicModel
|
|
from keylime.models.base.db import db_manager
|
|
@@ -165,7 +165,7 @@ class PersistableModel(BasicModel, metaclass=PersistableModelMeta):
|
|
else:
|
|
return None
|
|
|
|
- def __init__(self, data: Optional[dict | object] = None, process_associations: bool = True) -> None:
|
|
+ def __init__(self, data: Optional[Union[dict, object]] = None, process_associations: bool = True) -> None:
|
|
if isinstance(data, type(self).db_mapping):
|
|
super().__init__({}, process_associations)
|
|
self._init_from_mapping(data, process_associations)
|
|
diff --git a/keylime/models/base/type.py b/keylime/models/base/type.py
|
|
index 2520f72..e4d924c 100644
|
|
--- a/keylime/models/base/type.py
|
|
+++ b/keylime/models/base/type.py
|
|
@@ -1,7 +1,7 @@
|
|
from decimal import Decimal
|
|
from inspect import isclass
|
|
from numbers import Real
|
|
-from typing import Any, TypeAlias, Union
|
|
+from typing import Any, Union
|
|
|
|
from sqlalchemy.engine.interfaces import Dialect
|
|
from sqlalchemy.types import TypeEngine
|
|
@@ -99,7 +99,7 @@ class ModelType:
|
|
you should instead set ``_type_engine`` to ``None`` and override the ``get_db_type`` method.
|
|
"""
|
|
|
|
- DeclaredTypeEngine: TypeAlias = Union[TypeEngine, type[TypeEngine]]
|
|
+ DeclaredTypeEngine = Union[TypeEngine, type[TypeEngine]]
|
|
|
|
def __init__(self, type_engine: DeclaredTypeEngine) -> None:
|
|
if isclass(type_engine) and issubclass(type_engine, TypeEngine):
|
|
diff --git a/keylime/models/base/types/base64_bytes.py b/keylime/models/base/types/base64_bytes.py
|
|
index b9b4b13..a1eeced 100644
|
|
--- a/keylime/models/base/types/base64_bytes.py
|
|
+++ b/keylime/models/base/types/base64_bytes.py
|
|
@@ -1,6 +1,6 @@
|
|
import base64
|
|
import binascii
|
|
-from typing import Optional, TypeAlias, Union
|
|
+from typing import Optional, Union
|
|
|
|
from sqlalchemy.types import Text
|
|
|
|
@@ -62,7 +62,7 @@ class Base64Bytes(ModelType):
|
|
b64_str = Base64Bytes().cast("MIIE...")
|
|
"""
|
|
|
|
- IncomingValue: TypeAlias = Union[bytes, str, None]
|
|
+ IncomingValue = Union[bytes, str, None]
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__(Text)
|
|
diff --git a/keylime/models/base/types/certificate.py b/keylime/models/base/types/certificate.py
|
|
index 2c27603..0f03169 100644
|
|
--- a/keylime/models/base/types/certificate.py
|
|
+++ b/keylime/models/base/types/certificate.py
|
|
@@ -1,7 +1,7 @@
|
|
import base64
|
|
import binascii
|
|
import io
|
|
-from typing import Optional, TypeAlias, Union
|
|
+from typing import Optional, Union
|
|
|
|
import cryptography.x509
|
|
from cryptography.hazmat.primitives.serialization import Encoding
|
|
@@ -78,7 +78,7 @@ class Certificate(ModelType):
|
|
cert = Certificate().cast("-----BEGIN CERTIFICATE-----\nMIIE...")
|
|
"""
|
|
|
|
- IncomingValue: TypeAlias = Union[cryptography.x509.Certificate, bytes, str, None]
|
|
+ IncomingValue = Union[cryptography.x509.Certificate, bytes, str, None]
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__(Text)
|
|
@@ -195,18 +195,19 @@ class Certificate(ModelType):
|
|
"""
|
|
|
|
try:
|
|
- match self.infer_encoding(value):
|
|
- case "decoded":
|
|
- return None
|
|
- case "der":
|
|
- cryptography.x509.load_der_x509_certificate(value) # type: ignore[reportArgumentType, arg-type]
|
|
- case "pem":
|
|
- cryptography.x509.load_pem_x509_certificate(value) # type: ignore[reportArgumentType, arg-type]
|
|
- case "base64":
|
|
- der_value = base64.b64decode(value, validate=True) # type: ignore[reportArgumentType, arg-type]
|
|
- cryptography.x509.load_der_x509_certificate(der_value)
|
|
- case _:
|
|
- raise Exception
|
|
+ encoding_inf = self.infer_encoding(value)
|
|
+ if encoding_inf == "decoded":
|
|
+ return None
|
|
+
|
|
+ if encoding_inf == "der":
|
|
+ cryptography.x509.load_der_x509_certificate(value) # type: ignore[reportArgumentType, arg-type]
|
|
+ elif encoding_inf == "pem":
|
|
+ cryptography.x509.load_pem_x509_certificate(value) # type: ignore[reportArgumentType, arg-type]
|
|
+ elif encoding_inf == "base64":
|
|
+ der_value = base64.b64decode(value, validate=True) # type: ignore[reportArgumentType, arg-type]
|
|
+ cryptography.x509.load_der_x509_certificate(der_value)
|
|
+ else:
|
|
+ raise Exception
|
|
except Exception:
|
|
return False
|
|
|
|
@@ -227,37 +228,38 @@ class Certificate(ModelType):
|
|
if not value:
|
|
return None
|
|
|
|
- match self.infer_encoding(value):
|
|
- case "decoded":
|
|
- return value # type: ignore[reportReturnType, return-value]
|
|
- case "der":
|
|
- try:
|
|
- return self._load_der_cert(value) # type: ignore[reportArgumentType, arg-type]
|
|
- except PyAsn1Error as err:
|
|
- raise ValueError(
|
|
- f"value cast to certificate appears DER encoded but cannot be deserialized as such: {value!r}"
|
|
- ) from err
|
|
- case "pem":
|
|
- try:
|
|
- return self._load_pem_cert(value) # type: ignore[reportArgumentType, arg-type]
|
|
- except PyAsn1Error as err:
|
|
- raise ValueError(
|
|
- f"value cast to certificate appears PEM encoded but cannot be deserialized as such: "
|
|
- f"'{str(value)}'"
|
|
- ) from err
|
|
- case "base64":
|
|
- try:
|
|
- return self._load_der_cert(base64.b64decode(value, validate=True)) # type: ignore[reportArgumentType, arg-type]
|
|
- except (binascii.Error, PyAsn1Error) as err:
|
|
- raise ValueError(
|
|
- f"value cast to certificate appears Base64 encoded but cannot be deserialized as such: "
|
|
- f"'{str(value)}'"
|
|
- ) from err
|
|
- case _:
|
|
- raise TypeError(
|
|
- f"value cast to certificate is of type '{value.__class__.__name__}' but should be one of 'str', "
|
|
- f"'bytes' or 'cryptography.x509.Certificate': '{str(value)}'"
|
|
- )
|
|
+ encoding_inf = self.infer_encoding(value)
|
|
+ if encoding_inf == "decoded":
|
|
+ return value # type: ignore[reportReturnType, return-value]
|
|
+
|
|
+ if encoding_inf == "der":
|
|
+ try:
|
|
+ return self._load_der_cert(value) # type: ignore[reportArgumentType, arg-type]
|
|
+ except PyAsn1Error as err:
|
|
+ raise ValueError(
|
|
+ f"value cast to certificate appears DER encoded but cannot be deserialized as such: {value!r}"
|
|
+ ) from err
|
|
+ elif encoding_inf == "pem":
|
|
+ try:
|
|
+ return self._load_pem_cert(value) # type: ignore[reportArgumentType, arg-type]
|
|
+ except PyAsn1Error as err:
|
|
+ raise ValueError(
|
|
+ f"value cast to certificate appears PEM encoded but cannot be deserialized as such: "
|
|
+ f"'{str(value)}'"
|
|
+ ) from err
|
|
+ elif encoding_inf == "base64":
|
|
+ try:
|
|
+ return self._load_der_cert(base64.b64decode(value, validate=True)) # type: ignore[reportArgumentType, arg-type]
|
|
+ except (binascii.Error, PyAsn1Error) as err:
|
|
+ raise ValueError(
|
|
+ f"value cast to certificate appears Base64 encoded but cannot be deserialized as such: "
|
|
+ f"'{str(value)}'"
|
|
+ ) from err
|
|
+ else:
|
|
+ raise TypeError(
|
|
+ f"value cast to certificate is of type '{value.__class__.__name__}' but should be one of 'str', "
|
|
+ f"'bytes' or 'cryptography.x509.Certificate': '{str(value)}'"
|
|
+ )
|
|
|
|
def generate_error_msg(self, _value: IncomingValue) -> str:
|
|
return "must be a valid X.509 certificate in PEM format or otherwise encoded using Base64"
|
|
diff --git a/keylime/models/base/types/dictionary.py b/keylime/models/base/types/dictionary.py
|
|
index 7d9e811..d9ffec3 100644
|
|
--- a/keylime/models/base/types/dictionary.py
|
|
+++ b/keylime/models/base/types/dictionary.py
|
|
@@ -1,5 +1,5 @@
|
|
import json
|
|
-from typing import Optional, TypeAlias, Union
|
|
+from typing import Optional, Union
|
|
|
|
from sqlalchemy.types import Text
|
|
|
|
@@ -50,7 +50,7 @@ class Dictionary(ModelType):
|
|
kv_pairs = Dictionary().cast('{"key": "value"}')
|
|
"""
|
|
|
|
- IncomingValue: TypeAlias = Union[dict, str, None]
|
|
+ IncomingValue = Union[dict, str, None]
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__(Text)
|
|
diff --git a/keylime/models/base/types/one_of.py b/keylime/models/base/types/one_of.py
|
|
index 479d417..faf097d 100644
|
|
--- a/keylime/models/base/types/one_of.py
|
|
+++ b/keylime/models/base/types/one_of.py
|
|
@@ -1,6 +1,6 @@
|
|
from collections import Counter
|
|
from inspect import isclass
|
|
-from typing import Any, Optional, TypeAlias, Union
|
|
+from typing import Any, Optional, Union
|
|
|
|
from sqlalchemy.engine.interfaces import Dialect
|
|
from sqlalchemy.types import Float, Integer, String, TypeEngine
|
|
@@ -65,8 +65,8 @@ class OneOf(ModelType):
|
|
incoming PEM value would not be cast to a certificate object and remain a string.
|
|
"""
|
|
|
|
- Declaration: TypeAlias = Union[str, int, float, ModelType, TypeEngine, type[ModelType], type[TypeEngine]]
|
|
- PermittedList: TypeAlias = list[Union[str, int, float, ModelType]]
|
|
+ Declaration = Union[str, int, float, ModelType, TypeEngine, type[ModelType], type[TypeEngine]]
|
|
+ PermittedList = list[Union[str, int, float, ModelType]]
|
|
|
|
def __init__(self, *args: Declaration) -> None:
|
|
# pylint: disable=super-init-not-called
|
|
diff --git a/keylime/models/registrar/registrar_agent.py b/keylime/models/registrar/registrar_agent.py
|
|
index 560c188..b232049 100644
|
|
--- a/keylime/models/registrar/registrar_agent.py
|
|
+++ b/keylime/models/registrar/registrar_agent.py
|
|
@@ -153,21 +153,22 @@ class RegistrarAgent(PersistableModel):
|
|
names = ", ".join(non_compliant_certs)
|
|
names = " and".join(names.rsplit(",", 1))
|
|
|
|
- match config.get("registrar", "malformed_cert_action"):
|
|
- case "ignore":
|
|
- return
|
|
- case "reject":
|
|
- logger.error(
|
|
- "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were rejected due to "
|
|
- "config ('malformed_cert_action = reject')",
|
|
- names,
|
|
- )
|
|
- case _:
|
|
- logger.warning(
|
|
- "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were re-encoded before "
|
|
- "parsing by python-cryptography",
|
|
- names,
|
|
- )
|
|
+ cfg = config.get("registrar", "malformed_cert_action")
|
|
+ if cfg == "ignore":
|
|
+ return
|
|
+
|
|
+ if cfg == "reject":
|
|
+ logger.error(
|
|
+ "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were rejected due to "
|
|
+ "config ('malformed_cert_action = reject')",
|
|
+ names,
|
|
+ )
|
|
+ else:
|
|
+ logger.warning(
|
|
+ "Certificate(s) %s may not conform to strict ASN.1 DER encoding rules and were re-encoded before "
|
|
+ "parsing by python-cryptography",
|
|
+ names,
|
|
+ )
|
|
|
|
def _bind_ak_to_iak(self, iak_attest, iak_sign):
|
|
# The ak-iak binding should only be verified when either aik_tpm or iak_tpm is changed
|
|
diff --git a/keylime/policy/create_runtime_policy.py b/keylime/policy/create_runtime_policy.py
|
|
index 6a412c4..8e1c687 100644
|
|
--- a/keylime/policy/create_runtime_policy.py
|
|
+++ b/keylime/policy/create_runtime_policy.py
|
|
@@ -972,7 +972,7 @@ def create_runtime_policy(args: argparse.Namespace) -> Optional[RuntimePolicyTyp
|
|
)
|
|
abort = True
|
|
else:
|
|
- if a not in algorithms.Hash:
|
|
+ if a not in set(algorithms.Hash):
|
|
if a == SHA256_OR_SM3:
|
|
algo = a
|
|
else:
|
|
diff --git a/keylime/registrar_client.py b/keylime/registrar_client.py
|
|
index 705ff12..97fbc2a 100644
|
|
--- a/keylime/registrar_client.py
|
|
+++ b/keylime/registrar_client.py
|
|
@@ -13,12 +13,6 @@ if sys.version_info >= (3, 8):
|
|
else:
|
|
from typing_extensions import TypedDict
|
|
|
|
-if sys.version_info >= (3, 11):
|
|
- from typing import NotRequired
|
|
-else:
|
|
- from typing_extensions import NotRequired
|
|
-
|
|
-
|
|
class RegistrarData(TypedDict):
|
|
ip: Optional[str]
|
|
port: Optional[str]
|
|
@@ -27,7 +21,7 @@ class RegistrarData(TypedDict):
|
|
aik_tpm: str
|
|
ek_tpm: str
|
|
ekcert: Optional[str]
|
|
- provider_keys: NotRequired[Dict[str, str]]
|
|
+ provider_keys: Dict[str, str]
|
|
|
|
|
|
logger = keylime_logging.init_logging("registrar_client")
|
|
diff --git a/keylime/web/base/action_handler.py b/keylime/web/base/action_handler.py
|
|
index b20de89..e7b5888 100644
|
|
--- a/keylime/web/base/action_handler.py
|
|
+++ b/keylime/web/base/action_handler.py
|
|
@@ -1,4 +1,5 @@
|
|
import re
|
|
+import sys
|
|
import time
|
|
import traceback
|
|
from inspect import iscoroutinefunction
|
|
@@ -48,7 +49,11 @@ class ActionHandler(RequestHandler):
|
|
|
|
# Take the list of strings returned by format_exception, where each string ends in a newline and may contain
|
|
# internal newlines, and split the concatenation of all the strings by newline
|
|
- message = "".join(traceback.format_exception(err))
|
|
+ if sys.version_info < (3, 10):
|
|
+ message = "".join(traceback.format_exception(err, None, None))
|
|
+ else:
|
|
+ message = "".join(traceback.format_exception(err))
|
|
+
|
|
lines = message.split("\n")
|
|
|
|
for line in lines:
|
|
diff --git a/keylime/web/base/controller.py b/keylime/web/base/controller.py
|
|
index f1ac3c5..153535e 100644
|
|
--- a/keylime/web/base/controller.py
|
|
+++ b/keylime/web/base/controller.py
|
|
@@ -2,7 +2,7 @@ import http.client
|
|
import json
|
|
import re
|
|
from types import MappingProxyType
|
|
-from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, TypeAlias, Union
|
|
+from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union
|
|
|
|
from tornado.escape import parse_qs_bytes
|
|
from tornado.httputil import parse_body_arguments
|
|
@@ -15,14 +15,16 @@ if TYPE_CHECKING:
|
|
from keylime.models.base.basic_model import BasicModel
|
|
from keylime.web.base.action_handler import ActionHandler
|
|
|
|
-PathParams: TypeAlias = Mapping[str, str]
|
|
-QueryParams: TypeAlias = Mapping[str, str | Sequence[str]]
|
|
-MultipartParams: TypeAlias = Mapping[str, Union[str, bytes, Sequence[str | bytes]]]
|
|
-FormParams: TypeAlias = Union[QueryParams, MultipartParams]
|
|
-JSONConvertible: TypeAlias = Union[str, int, float, bool, None, "JSONObjectConvertible", "JSONArrayConvertible"]
|
|
-JSONObjectConvertible: TypeAlias = Mapping[str, JSONConvertible]
|
|
-JSONArrayConvertible: TypeAlias = Sequence[JSONConvertible] # pyright: ignore[reportInvalidTypeForm]
|
|
-Params: TypeAlias = Mapping[str, Union[str, bytes, Sequence[str | bytes], JSONObjectConvertible, JSONArrayConvertible]]
|
|
+PathParams = Mapping[str, str]
|
|
+QueryParams = Mapping[str, Union[str, Sequence[str]]]
|
|
+MultipartParams = Mapping[str, Union[str, bytes, Union[Sequence[str], Sequence[bytes]]]]
|
|
+FormParams = Union[QueryParams, MultipartParams]
|
|
+JSONConvertible = Union[str, int, float, bool, None, "JSONObjectConvertible", "JSONArrayConvertible"]
|
|
+JSONObjectConvertible = Mapping[str, JSONConvertible]
|
|
+JSONArrayConvertible = Sequence[JSONConvertible] # pyright: ignore[reportInvalidTypeForm]
|
|
+Params = Mapping[
|
|
+ str, Union[str, bytes, Union[Sequence[str], Sequence[bytes]], JSONObjectConvertible, JSONArrayConvertible]
|
|
+]
|
|
|
|
|
|
class Controller:
|
|
@@ -77,7 +79,7 @@ class Controller:
|
|
VERSION_REGEX = re.compile("^\\/v(\\d+)(?:\\.(\\d+))*")
|
|
|
|
@staticmethod
|
|
- def decode_url_query(query: str | bytes) -> QueryParams:
|
|
+ def decode_url_query(query: Union[str, bytes]) -> QueryParams:
|
|
"""Parses a binary query string (whether from a URL or HTTP body) into a dict of Unicode strings. If multiple
|
|
instances of the same key are present in the string, their values are collected into a list.
|
|
|
|
@@ -135,8 +137,8 @@ class Controller:
|
|
|
|
@staticmethod
|
|
def prepare_http_body(
|
|
- body: Union[str, JSONObjectConvertible | JSONArrayConvertible, Any], content_type: Optional[str] = None
|
|
- ) -> tuple[Optional[bytes | Any], Optional[str]]:
|
|
+ body: Union[str, Union[JSONObjectConvertible, JSONArrayConvertible], Any], content_type: Optional[str] = None
|
|
+ ) -> tuple[Optional[Union[bytes, Any]], Optional[str]]:
|
|
"""Prepares an object to be included in the body of an HTTP request or response and infers the appropriate
|
|
media type unless provided. ``body`` will be serialised into JSON if it contains a ``dict`` or ``list`` which is
|
|
serialisable unless a ``content_type`` other than ``"application/json"`` is provided.
|
|
@@ -155,32 +157,34 @@ class Controller:
|
|
if content_type:
|
|
content_type = content_type.lower().strip()
|
|
|
|
- body_out: Optional[bytes | Any]
|
|
- content_type_out: Optional[str]
|
|
-
|
|
- match (body, content_type):
|
|
- case (None, _):
|
|
- body_out = None
|
|
- content_type_out = content_type
|
|
- case ("", _):
|
|
- body_out = b""
|
|
- content_type_out = "text/plain; charset=utf-8"
|
|
- case (_, "text/plain"):
|
|
+ body_out: Optional[bytes | Any] = None
|
|
+ content_type_out: Optional[str] = None
|
|
+
|
|
+ if body is None:
|
|
+ body_out = None
|
|
+ content_type_out = content_type
|
|
+ elif body == "":
|
|
+ body_out = b""
|
|
+ content_type_out = "text/plain; charset=utf-8"
|
|
+ else:
|
|
+ if content_type == "text/plain":
|
|
body_out = str(body).encode("utf-8")
|
|
content_type_out = "text/plain; charset=utf-8"
|
|
- case (_, "application/json") if isinstance(body, str):
|
|
- body_out = body.encode("utf-8")
|
|
- content_type_out = "application/json"
|
|
- case (_, "application/json"):
|
|
- body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8")
|
|
- content_type_out = "application/json"
|
|
- case (_, None) if isinstance(body, str):
|
|
- body_out = body.encode("utf-8")
|
|
- content_type_out = "text/plain; charset=utf-8"
|
|
- case (_, None) if isinstance(body, (dict, list)):
|
|
- body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8")
|
|
- content_type_out = "application/json"
|
|
- case (_, _):
|
|
+ elif content_type == "application/json":
|
|
+ if isinstance(body, str):
|
|
+ body_out = body.encode("utf-8")
|
|
+ content_type_out = "application/json"
|
|
+ else:
|
|
+ body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8")
|
|
+ content_type_out = "application/json"
|
|
+ elif content_type is None:
|
|
+ if isinstance(body, str):
|
|
+ body_out = body.encode("utf-8")
|
|
+ content_type_out = "text/plain; charset=utf-8"
|
|
+ elif isinstance(body, (dict, list)):
|
|
+ body_out = json.dumps(body, allow_nan=False, indent=4).encode("utf-8")
|
|
+ content_type_out = "application/json"
|
|
+ else:
|
|
body_out = body
|
|
content_type_out = content_type
|
|
|
|
@@ -248,7 +252,7 @@ class Controller:
|
|
self,
|
|
code: int = 200,
|
|
status: Optional[str] = None,
|
|
- data: Optional[JSONObjectConvertible | JSONArrayConvertible] = None,
|
|
+ data: Optional[Union[JSONObjectConvertible, JSONArrayConvertible]] = None,
|
|
) -> None:
|
|
"""Converts a Python data structure to JSON and wraps it in the following boilerplate JSON object which is
|
|
returned by all v2 endpoints:
|
|
diff --git a/tox.ini b/tox.ini
|
|
index 031ac54..ce3974c 100644
|
|
--- a/tox.ini
|
|
+++ b/tox.ini
|
|
@@ -51,3 +51,13 @@ commands = black --diff ./keylime ./test
|
|
deps =
|
|
isort
|
|
commands = isort --diff --check ./keylime ./test
|
|
+
|
|
+
|
|
+[testenv:pylint39]
|
|
+basepython = python3.9
|
|
+deps =
|
|
+ -r{toxinidir}/requirements.txt
|
|
+ -r{toxinidir}/test-requirements.txt
|
|
+ pylint
|
|
+commands = bash scripts/check_codestyle.sh
|
|
+allowlist_externals = bash
|
|
--
|
|
2.47.1
|
|
|