ALBS-1122: Replace notarization interface from CAS to VCN

This commit is contained in:
Daniil Anfimov 2023-06-07 16:09:15 +02:00
parent d9c09a5d58
commit 786c9977ff
3 changed files with 70 additions and 61 deletions

View File

@ -1,3 +1,3 @@
# cas_wrapper
The python wrapper around binary `cas` from project Codenotary Community Attestation Service.
The python wrapper around binary `vcn` from project Codenotary Community Attestation Service.

View File

@ -1,66 +1,85 @@
import json
import logging
import typing
from pathlib import Path
from functools import wraps
from plumbum import local, ProcessExecutionError
def with_env_context(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if not isinstance(self, CasWrapper):
raise TypeError(
'Cannot use "with_env_context" decorator outside of CasWrapper instance'
)
with local.env(
VCN_LC_HOST=self._vcn_lc_host,
VCN_LC_API_KEY=self._vcn_lc_api_key,
VCN_LC_PORT=self._vcn_lc_port,
):
return func(self, *args, **kwargs)
return wrapper
class CasWrapper:
"""
The python wrapper around binary `cas`
The python wrapper around binary `vcn`
from Codenotary Community Attestation Service
"""
binary_name = 'cas'
@classmethod
def _is_binary_present(cls):
if cls.binary_name not in local:
def _is_binary_present(self):
if not self._full_binary_path.exists():
raise FileNotFoundError(
'Binary CAS is not found in PATH on the machine',
f'Binary VCN is not found in {self._binary_path} on the machine',
)
def __init__(
self,
cas_api_key: str,
cas_signer_id: str,
vcn_lc_api_key: str,
vcn_lc_host: str = "eval-honeywell.codenotary.com",
vcn_lc_port: int = 443,
logger: logging.Logger = None,
binary_name: str = "vcn",
binary_path: str = "/usr/local/bin/",
):
self._is_binary_present()
self._cas_api_key = cas_api_key
self._cas_signer_id = cas_signer_id
self._cas = local['cas']
self._vcn_lc_api_key = vcn_lc_api_key
self._vcn_lc_host = vcn_lc_host
self._vcn_lc_port = vcn_lc_port
self._binary_name = binary_name
self._binary_path = binary_path
self._full_binary_path = Path(self._binary_path, self._binary_name)
self._vcn = local[self._full_binary_path]
self._logger = logger
if self._logger is None:
self._logger = logging.getLogger()
self._is_binary_present()
@classmethod
def get_version(cls):
cls._is_binary_present()
command = local['cas']['--version']
def get_version(self):
self._is_binary_present()
command = self._vcn['--version']
version = command().split()[-1].split('v')[1]
return version
@with_env_context
def ensure_login(self):
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id,
):
self._cas['login']()
self._vcn['login']()
@with_env_context
def notarize(
self,
local_path: str,
metadata: typing.Dict = None,
) -> str:
"""
Wrapper around `cas notarize`
Wrapper around `vcn notarize`
:param local_path: path to a local Git repo
:param metadata: additional metadata
:return: hash of notarized commit
:rtype: str
"""
command = self._cas[
command = self._vcn[
'notarize',
local_path,
'-o',
@ -72,12 +91,9 @@ class CasWrapper:
'-a',
f'{key}={value}',
]
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
return json.loads(result_of_execution)['hash']
result_of_execution = command()
result_of_execution, *_ = json.loads(result_of_execution)
return result_of_execution['hash']
def notarize_no_exc(
self,
@ -95,23 +111,24 @@ class CasWrapper:
"""
success = False
try:
cas_hash = self.notarize(local_path, metadata=metadata)
vcn_hash = self.notarize(local_path, metadata=metadata)
success = True
except Exception:
self._logger.exception('Cannot notarize artifact: %s',
local_path)
cas_hash = ''
return success, cas_hash
vcn_hash = ''
return success, vcn_hash
@with_env_context
def authenticate(
self,
local_path: str,
return_json: bool = False,
use_hash: bool = False,
signer_id: str = None,
):
signer_id: str = "",
) -> typing.Union[bool, dict]:
"""
Wrapper around `cas authenticate`
Wrapper around `vcn authenticate`
:param local_path: path to a local Git repo
(should be started from `git://`)
or to a single local file or hash
@ -127,20 +144,12 @@ class CasWrapper:
if signer_id:
command_args.extend(('--signerID', signer_id))
command_args.extend(('-o', 'json'))
command = self._cas[command_args]
command = self._vcn[command_args]
try:
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
result_of_execution = command()
except ProcessExecutionError:
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
# in case if commit is untrusted
result_of_execution = command(retcode=1)
# in case if commit is untrusted
result_of_execution = command(retcode=1)
json_result = json.loads(result_of_execution)
if return_json:
return json_result
@ -149,34 +158,34 @@ class CasWrapper:
def authenticate_source(
self,
local_path: str,
signer_id: str = None,
signer_id: str = "",
) -> typing.Tuple[bool, typing.Optional[str]]:
"""
Authenticates source by git path.
Returns authenticate result and source commit hash.
"""
is_authenticated = False
commit_cas_hash = None
commit_vcn_hash = None
self.ensure_login()
try:
result_json = self.authenticate(
local_path,
return_json=True,
signer_id=signer_id
signer_id=signer_id,
)
is_authenticated = result_json['verified']
commit_cas_hash = result_json['hash']
commit_vcn_hash = result_json['hash']
# we can fall with ProcessExecutionError,
# because source can be not notarized
except ProcessExecutionError:
self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated, commit_cas_hash
return is_authenticated, commit_vcn_hash
def authenticate_artifact(
self,
local_path: str,
use_hash: bool = False,
signer_id: str = None,
signer_id: str = "",
) -> bool:
"""
Authenticates artifact by artifact path or hash if `use_hash` is True.
@ -204,8 +213,8 @@ class CasWrapper:
) -> typing.Tuple[bool, typing.Dict[str, str]]:
"""
Notarize artifacts by their paths.
Returns `True` if all artifacts was succesful notarizated
and dict with CAS hashes.
Returns `True` if all artifacts was successful notarizated
and dict with VCN hashes.
"""
all_artifacts_is_notarized = True
notarized_artifacts = {}
@ -218,11 +227,11 @@ class CasWrapper:
# resolved in CAS itself.
for artifact_path in artifact_paths:
try:
cas_artifact_hash = self.notarize(artifact_path, metadata)
vcn_artifact_hash = self.notarize(artifact_path, metadata)
except Exception:
self._logger.exception('Cannot notarize artifact: %s',
artifact_path)
all_artifacts_is_notarized = False
continue
notarized_artifacts[artifact_path] = cas_artifact_hash
notarized_artifacts[artifact_path] = vcn_artifact_hash
return all_artifacts_is_notarized, notarized_artifacts

View File

@ -2,10 +2,10 @@ from setuptools import setup
setup(
name="cas_wrapper",
version="0.0.6",
version="0.0.7",
author="Stepan Oksanichenko",
author_email="soksanichenko@almalinux.org",
description="The python wrapper around binary cas from "
description="The python wrapper around binary vcn from "
"project Codenotary Community Attestation Service.",
url="https://git.almalinux.org/almalinux/cas_wrapper",
project_urls={