Merge pull request 'Revert "ALBS-1122: Replace notarization interface from CAS to VCN (#7)"' (#8) from revert into master

Reviewed-on: #8
This commit is contained in:
danfimov 2023-06-22 11:40:26 +00:00
commit d5ae64321c
3 changed files with 92 additions and 117 deletions

View File

@ -1,3 +1,3 @@
# cas_wrapper # cas_wrapper
The python wrapper around binary `vcn` from project Codenotary Community Attestation Service. The python wrapper around binary `cas` from project Codenotary Community Attestation Service.

View File

@ -1,119 +1,88 @@
import json import json
import logging import logging
import typing import typing
from functools import wraps
from pathlib import Path
from plumbum import ProcessExecutionError, local from plumbum import local, ProcessExecutionError
DEFAULT_BINARY_NAME = "vcn"
DEFAULT_BINARY_PATH = "/usr/local/bin/"
def with_env_context(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if not isinstance(self, CasWrapper):
raise TypeError(
'Cannot use "with_env_context" decorator outside of CasWrapper instance'
)
with local.env(
VCN_LC_HOST=self._vcn_lc_host,
VCN_LC_API_KEY=self._vcn_lc_api_key,
VCN_LC_PORT=self._vcn_lc_port,
):
return func(self, *args, **kwargs)
return wrapper
class CasWrapper: class CasWrapper:
""" """
The python wrapper around binary `vcn` The python wrapper around binary `cas`
from Codenotary Community Attestation Service from Codenotary Community Attestation Service
""" """
binary_name = 'cas'
@classmethod @classmethod
def _is_binary_present( def _is_binary_present(cls):
cls, if cls.binary_name not in local:
binary_name: str,
binary_path: str,
):
if not Path(binary_path, binary_name).exists():
raise FileNotFoundError( raise FileNotFoundError(
f"Binary VCN is not found in {binary_path} on the machine", 'Binary CAS is not found in PATH on the machine',
) )
def __init__( def __init__(
self, self,
vcn_lc_api_key: str, cas_api_key: str,
vcn_lc_host: str = "eval-honeywell.codenotary.com", cas_signer_id: str,
vcn_lc_port: int = 443, logger: logging.Logger = None,
logger: logging.Logger = None,
binary_name: str = DEFAULT_BINARY_NAME,
binary_path: str = DEFAULT_BINARY_PATH,
): ):
self._is_binary_present(binary_name, binary_path) self._is_binary_present()
self._vcn_lc_api_key = vcn_lc_api_key self._cas_api_key = cas_api_key
self._vcn_lc_host = vcn_lc_host self._cas_signer_id = cas_signer_id
self._vcn_lc_port = vcn_lc_port self._cas = local['cas']
self._binary_name = binary_name
self._binary_path = binary_path
self._full_binary_path = Path(self._binary_path, self._binary_name)
self._vcn = local[str(self._full_binary_path)]
self._logger = logger self._logger = logger
if self._logger is None: if self._logger is None:
self._logger = logging.getLogger() self._logger = logging.getLogger()
@classmethod @classmethod
def get_version( def get_version(cls):
cls, cls._is_binary_present()
binary_name: str = DEFAULT_BINARY_NAME, command = local['cas']['--version']
binary_path: str = DEFAULT_BINARY_PATH, version = command().split()[-1].split('v')[1]
):
cls._is_binary_present(binary_name, binary_path)
full_path = Path(binary_path, binary_name)
command = local[str(full_path)]["--version"]
version = command().split()[-1].split("v")[1]
return version return version
@with_env_context
def ensure_login(self): def ensure_login(self):
self._vcn["login"]() with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id,
):
self._cas['login']()
@with_env_context
def notarize( def notarize(
self, self,
local_path: str, local_path: str,
metadata: typing.Dict = None, metadata: typing.Dict = None,
) -> str: ) -> str:
""" """
Wrapper around `vcn notarize` Wrapper around `cas notarize`
:param local_path: path to a local Git repo :param local_path: path to a local Git repo
:param metadata: additional metadata :param metadata: additional metadata
:return: hash of notarized commit :return: hash of notarized commit
:rtype: str :rtype: str
""" """
command = self._vcn[ command = self._cas[
"notarize", 'notarize',
local_path, local_path,
"-o", '-o',
"json", 'json',
] ]
if metadata is not None: if metadata is not None:
for key, value in metadata.items(): for key, value in metadata.items():
command = command[ command = command[
"-a", '-a',
f"{key}={value}", f'{key}={value}',
] ]
result_of_execution = command() with local.env(
result_of_execution, *_ = json.loads(result_of_execution) CAS_API_KEY=self._cas_api_key,
return result_of_execution["hash"] SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
return json.loads(result_of_execution)['hash']
def notarize_no_exc( def notarize_no_exc(
self, self,
local_path: str, local_path: str,
metadata: typing.Dict = None, metadata: typing.Dict = None,
) -> typing.Tuple[bool, str]: ) -> typing.Tuple[bool, str]:
""" """
Wrapper for avoiding raising exceptions during notarization. Wrapper for avoiding raising exceptions during notarization.
@ -126,23 +95,23 @@ class CasWrapper:
""" """
success = False success = False
try: try:
vcn_hash = self.notarize(local_path, metadata=metadata) cas_hash = self.notarize(local_path, metadata=metadata)
success = True success = True
except Exception: except Exception:
self._logger.exception("Cannot notarize artifact: %s", local_path) self._logger.exception('Cannot notarize artifact: %s',
vcn_hash = "" local_path)
return success, vcn_hash cas_hash = ''
return success, cas_hash
@with_env_context
def authenticate( def authenticate(
self, self,
local_path: str, local_path: str,
return_json: bool = False, return_json: bool = False,
use_hash: bool = False, use_hash: bool = False,
signer_id: str = "", signer_id: str = None,
) -> typing.Union[bool, dict]: ):
""" """
Wrapper around `vcn authenticate` Wrapper around `cas authenticate`
:param local_path: path to a local Git repo :param local_path: path to a local Git repo
(should be started from `git://`) (should be started from `git://`)
or to a single local file or hash or to a single local file or hash
@ -152,54 +121,62 @@ class CasWrapper:
or dict with result if return_json param is True or dict with result if return_json param is True
:rtype: bool or dict :rtype: bool or dict
""" """
command_args = ["authenticate", local_path] command_args = ['authenticate', local_path]
if use_hash: if use_hash:
command_args = ["authenticate", "--hash", local_path] command_args = ['authenticate', '--hash', local_path]
if signer_id: if signer_id:
command_args.extend(("--signerID", signer_id)) command_args.extend(('--signerID', signer_id))
command_args.extend(("-o", "json")) command_args.extend(('-o', 'json'))
command = self._vcn[command_args] command = self._cas[command_args]
try: try:
result_of_execution = command() with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
except ProcessExecutionError: except ProcessExecutionError:
# in case if commit is untrusted with local.env(
result_of_execution = command(retcode=1) CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
# in case if commit is untrusted
result_of_execution = command(retcode=1)
json_result = json.loads(result_of_execution) json_result = json.loads(result_of_execution)
if return_json: if return_json:
return json_result return json_result
return not bool(json_result["status"]) return not bool(json_result['status'])
def authenticate_source( def authenticate_source(
self, self,
local_path: str, local_path: str,
signer_id: str = "", signer_id: str = None,
) -> typing.Tuple[bool, typing.Optional[str]]: ) -> typing.Tuple[bool, typing.Optional[str]]:
""" """
Authenticates source by git path. Authenticates source by git path.
Returns authenticate result and source commit hash. Returns authenticate result and source commit hash.
""" """
is_authenticated = False is_authenticated = False
commit_vcn_hash = None commit_cas_hash = None
self.ensure_login() self.ensure_login()
try: try:
result_json = self.authenticate( result_json = self.authenticate(
local_path, local_path,
return_json=True, return_json=True,
signer_id=signer_id, signer_id=signer_id
) )
is_authenticated = result_json["verified"] is_authenticated = result_json['verified']
commit_vcn_hash = result_json["hash"] commit_cas_hash = result_json['hash']
# we can fall with ProcessExecutionError, # we can fall with ProcessExecutionError,
# because source can be not notarized # because source can be not notarized
except ProcessExecutionError: except ProcessExecutionError:
self._logger.exception("Cannot authenticate: %s", local_path) self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated, commit_vcn_hash return is_authenticated, commit_cas_hash
def authenticate_artifact( def authenticate_artifact(
self, self,
local_path: str, local_path: str,
use_hash: bool = False, use_hash: bool = False,
signer_id: str = "", signer_id: str = None,
) -> bool: ) -> bool:
""" """
Authenticates artifact by artifact path or hash if `use_hash` is True. Authenticates artifact by artifact path or hash if `use_hash` is True.
@ -212,12 +189,12 @@ class CasWrapper:
local_path, local_path,
use_hash=use_hash, use_hash=use_hash,
return_json=True, return_json=True,
signer_id=signer_id, signer_id=signer_id
)["verified"] )['verified']
# we can fall with ProcessExecutionError, # we can fall with ProcessExecutionError,
# because artifact can be not notarized # because artifact can be not notarized
except ProcessExecutionError: except ProcessExecutionError:
self._logger.exception("Cannot authenticate: %s", local_path) self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated return is_authenticated
def notarize_artifacts( def notarize_artifacts(
@ -227,8 +204,8 @@ class CasWrapper:
) -> typing.Tuple[bool, typing.Dict[str, str]]: ) -> typing.Tuple[bool, typing.Dict[str, str]]:
""" """
Notarize artifacts by their paths. Notarize artifacts by their paths.
Returns `True` if all artifacts was successful notarizated Returns `True` if all artifacts was succesful notarizated
and dict with VCN hashes. and dict with CAS hashes.
""" """
all_artifacts_is_notarized = True all_artifacts_is_notarized = True
notarized_artifacts = {} notarized_artifacts = {}
@ -241,13 +218,11 @@ class CasWrapper:
# resolved in CAS itself. # resolved in CAS itself.
for artifact_path in artifact_paths: for artifact_path in artifact_paths:
try: try:
vcn_artifact_hash = self.notarize(artifact_path, metadata) cas_artifact_hash = self.notarize(artifact_path, metadata)
except Exception: except Exception:
self._logger.exception( self._logger.exception('Cannot notarize artifact: %s',
"Cannot notarize artifact: %s", artifact_path)
artifact_path,
)
all_artifacts_is_notarized = False all_artifacts_is_notarized = False
continue continue
notarized_artifacts[artifact_path] = vcn_artifact_hash notarized_artifacts[artifact_path] = cas_artifact_hash
return all_artifacts_is_notarized, notarized_artifacts return all_artifacts_is_notarized, notarized_artifacts

View File

@ -2,10 +2,10 @@ from setuptools import setup
setup( setup(
name="cas_wrapper", name="cas_wrapper",
version="0.0.7", version="0.0.6",
author="Stepan Oksanichenko", author="Stepan Oksanichenko",
author_email="soksanichenko@almalinux.org", author_email="soksanichenko@almalinux.org",
description="The python wrapper around binary vcn from " description="The python wrapper around binary cas from "
"project Codenotary Community Attestation Service.", "project Codenotary Community Attestation Service.",
url="https://git.almalinux.org/almalinux/cas_wrapper", url="https://git.almalinux.org/almalinux/cas_wrapper",
project_urls={ project_urls={