forked from almalinux/cas_wrapper
d68685bf72
Co-authored-by: Daniil Anfimov <anfimovdan@gmail.com> Reviewed-on: almalinux/cas_wrapper#7
254 lines
8.2 KiB
Python
254 lines
8.2 KiB
Python
import json
|
|
import logging
|
|
import typing
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
|
|
from plumbum import ProcessExecutionError, local
|
|
|
|
DEFAULT_BINARY_NAME = "vcn"
|
|
DEFAULT_BINARY_PATH = "/usr/local/bin/"
|
|
|
|
|
|
def with_env_context(func):
    """
    Decorator for `CasWrapper` methods that runs the wrapped call with
    the VCN connection settings exported through `local.env`.

    `VCN_LC_HOST`, `VCN_LC_API_KEY` and `VCN_LC_PORT` are taken from the
    instance attributes and are set only for the duration of the call.

    :param func: method of `CasWrapper` to wrap
    :return: wrapped method with the same signature
    :raises TypeError: if the first argument is not a `CasWrapper` instance
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not isinstance(self, CasWrapper):
            raise TypeError(
                'Cannot use "with_env_context" decorator outside of CasWrapper instance'
            )
        with local.env(
            VCN_LC_HOST=self._vcn_lc_host,
            VCN_LC_API_KEY=self._vcn_lc_api_key,
            # Environment values must be strings, while the port is
            # configured as an int; convert it explicitly.
            VCN_LC_PORT=str(self._vcn_lc_port),
        ):
            return func(self, *args, **kwargs)

    return wrapper
|
|
|
|
|
|
class CasWrapper:
    """
    The python wrapper around binary `vcn`
    from Codenotary Community Attestation Service
    """

    @classmethod
    def _is_binary_present(
        cls,
        binary_name: str,
        binary_path: str,
    ):
        """
        Check that the `vcn` binary exists on the machine.

        :param binary_name: file name of the binary
        :param binary_path: directory that should contain the binary
        :raises FileNotFoundError: if the binary does not exist
        """
        if not Path(binary_path, binary_name).exists():
            raise FileNotFoundError(
                f"Binary VCN is not found in {binary_path} on the machine",
            )

    def __init__(
        self,
        vcn_lc_api_key: str,
        vcn_lc_host: str = "eval-honeywell.codenotary.com",
        vcn_lc_port: int = 443,
        logger: typing.Optional[logging.Logger] = None,
        binary_name: str = DEFAULT_BINARY_NAME,
        binary_path: str = DEFAULT_BINARY_PATH,
    ):
        """
        :param vcn_lc_api_key: value exported as `VCN_LC_API_KEY`
        :param vcn_lc_host: value exported as `VCN_LC_HOST`
        :param vcn_lc_port: value exported as `VCN_LC_PORT`
        :param logger: logger to use; the root logger is used when omitted
        :param binary_name: file name of the `vcn` binary
        :param binary_path: directory that contains the binary
        :raises FileNotFoundError: if the binary does not exist
        """
        # Fail early, before any state is stored, if the binary is missing.
        self._is_binary_present(binary_name, binary_path)
        self._vcn_lc_api_key = vcn_lc_api_key
        self._vcn_lc_host = vcn_lc_host
        self._vcn_lc_port = vcn_lc_port
        self._binary_name = binary_name
        self._binary_path = binary_path
        self._full_binary_path = Path(self._binary_path, self._binary_name)
        self._vcn = local[str(self._full_binary_path)]
        self._logger = logger
        if self._logger is None:
            self._logger = logging.getLogger()

    @classmethod
    def get_version(
        cls,
        binary_name: str = DEFAULT_BINARY_NAME,
        binary_path: str = DEFAULT_BINARY_PATH,
    ):
        """
        Return the version reported by `vcn --version`.

        The version is the last whitespace-separated token of the output
        with a single leading "v" removed (if present).

        :param binary_name: file name of the binary
        :param binary_path: directory that contains the binary
        :return: version string without the leading "v"
        :raises FileNotFoundError: if the binary does not exist
        """
        cls._is_binary_present(binary_name, binary_path)
        full_path = Path(binary_path, binary_name)
        command = local[str(full_path)]["--version"]
        version = command().split()[-1]
        # Strip only the prefix: splitting on every "v" would truncate
        # versions that contain the letter elsewhere (e.g. "-dev").
        if version.startswith("v"):
            version = version[1:]
        return version

    @with_env_context
    def ensure_login(self):
        """
        Run `vcn login` with the configured connection settings.
        """
        self._vcn["login"]()

    @with_env_context
    def notarize(
        self,
        local_path: str,
        metadata: typing.Optional[typing.Dict] = None,
    ) -> str:
        """
        Wrapper around `vcn notarize`
        :param local_path: path to a local Git repo
        :param metadata: additional metadata, every item is passed
                         as `-a key=value`
        :return: hash of notarized commit
        :rtype: str
        """
        command_args = ["notarize", local_path, "-o", "json"]
        if metadata is not None:
            for key, value in metadata.items():
                command_args.extend(("-a", f"{key}={value}"))
        raw_output = self._vcn[command_args]()
        # The JSON output is a list; only its first entry is used.
        first_entry, *_ = json.loads(raw_output)
        return first_entry["hash"]

    def notarize_no_exc(
        self,
        local_path: str,
        metadata: typing.Optional[typing.Dict] = None,
    ) -> typing.Tuple[bool, str]:
        """
        Wrapper for avoiding raising exceptions during notarization.
        Return `success` flag instead for library user to react respectively.
        :param local_path: path to a local Git repo
        :param metadata: additional metadata
        :return: boolean flag for operation success and the hash
                 of the notarized artifact (empty string on failure).
        :rtype: tuple
        """
        success = False
        try:
            vcn_hash = self.notarize(local_path, metadata=metadata)
            success = True
        except Exception:
            # Deliberately broad: this method must never raise, the
            # failure is logged and reported through the flag.
            self._logger.exception("Cannot notarize artifact: %s", local_path)
            vcn_hash = ""
        return success, vcn_hash

    @with_env_context
    def authenticate(
        self,
        local_path: str,
        return_json: bool = False,
        use_hash: bool = False,
        signer_id: str = "",
    ) -> typing.Union[bool, dict]:
        """
        Wrapper around `vcn authenticate`
        :param local_path: path to a local Git repo
                           (should be started from `git://`)
                           or to a single local file or hash
        :param return_json: flag for return json response
        :param use_hash: flag for authenticate by hash
        :param signer_id: passed as `--signerID` when not empty
        :return: true if a commit is trusted, vice versa - false
                 or dict with result if return_json param is True
        :rtype: bool or dict
        :raises ProcessExecutionError: if `vcn` exits with a code
                                       other than 0 or 1
        """
        command_args = ["authenticate"]
        if use_hash:
            command_args.append("--hash")
        command_args.append(local_path)
        if signer_id:
            command_args.extend(("--signerID", signer_id))
        command_args.extend(("-o", "json"))
        command = self._vcn[command_args]
        # Exit code 1 is returned in case the commit is untrusted and
        # still comes with JSON output, so accept it together with 0 in
        # a single run instead of executing the command a second time.
        result_of_execution = command(retcode=(0, 1))
        json_result = json.loads(result_of_execution)
        if return_json:
            return json_result
        # A falsy `status` is reported as trusted.
        return not bool(json_result["status"])

    def authenticate_source(
        self,
        local_path: str,
        signer_id: str = "",
    ) -> typing.Tuple[bool, typing.Optional[str]]:
        """
        Authenticates source by git path.
        Returns authenticate result and source commit hash.
        :param local_path: path to a local Git repo
        :param signer_id: passed as `--signerID` when not empty
        :return: `verified` flag and `hash` from the response, or
                 `(False, None)` if the `vcn` call failed
        :rtype: tuple
        """
        is_authenticated = False
        commit_vcn_hash = None
        self.ensure_login()
        try:
            result_json = self.authenticate(
                local_path,
                return_json=True,
                signer_id=signer_id,
            )
            is_authenticated = result_json["verified"]
            commit_vcn_hash = result_json["hash"]
        # we can fall with ProcessExecutionError,
        # because source can be not notarized
        except ProcessExecutionError:
            self._logger.exception("Cannot authenticate: %s", local_path)
        return is_authenticated, commit_vcn_hash

    def authenticate_artifact(
        self,
        local_path: str,
        use_hash: bool = False,
        signer_id: str = "",
    ) -> bool:
        """
        Authenticates artifact by artifact path or hash if `use_hash` is True.
        Returns authenticate result.
        :param local_path: path to a local file, or a hash
        :param use_hash: treat `local_path` as a hash
        :param signer_id: passed as `--signerID` when not empty
        :return: `verified` flag from the response, or `False`
                 if the `vcn` call failed
        :rtype: bool
        """
        is_authenticated = False
        self.ensure_login()
        try:
            is_authenticated = self.authenticate(
                local_path,
                use_hash=use_hash,
                return_json=True,
                signer_id=signer_id,
            )["verified"]
        # we can fall with ProcessExecutionError,
        # because artifact can be not notarized
        except ProcessExecutionError:
            self._logger.exception("Cannot authenticate: %s", local_path)
        return is_authenticated

    def notarize_artifacts(
        self,
        artifact_paths: typing.List[str],
        metadata: typing.Dict[str, typing.Any],
    ) -> typing.Tuple[bool, typing.Dict[str, str]]:
        """
        Notarize artifacts by their paths.
        Returns `True` if all artifacts were successfully notarized
        and a dict that maps each notarized path to its VCN hash.
        Artifacts that failed to notarize are logged and left out
        of the dict.
        """
        all_artifacts_is_notarized = True
        notarized_artifacts = {}
        self.ensure_login()

        # ALBS-576: We stopped doing this process in parallel due to the
        # problems experienced and described in this CAS issue:
        # https://github.com/codenotary/cas/issues/275
        # Hence, we decided to go sequential here until the problem is
        # resolved in CAS itself.
        for artifact_path in artifact_paths:
            try:
                vcn_artifact_hash = self.notarize(artifact_path, metadata)
            except Exception:
                self._logger.exception(
                    "Cannot notarize artifact: %s",
                    artifact_path,
                )
                all_artifacts_is_notarized = False
                continue
            notarized_artifacts[artifact_path] = vcn_artifact_hash
        return all_artifacts_is_notarized, notarized_artifacts
|