From 786c9977ff694d9bf898e6fcea32491fc7e85bf7 Mon Sep 17 00:00:00 2001 From: Daniil Anfimov Date: Wed, 7 Jun 2023 16:09:15 +0200 Subject: [PATCH] ALBS-1122: Replace notarization interface from CAS to VCN --- README.md | 2 +- cas_wrapper.py | 125 ++++++++++++++++++++++++++----------------------- setup.py | 4 +- 3 files changed, 70 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index b8c0491..20dad34 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # cas_wrapper -The python wrapper around binary `cas` from project Codenotary Community Attestation Service. +The python wrapper around binary `vcn` from project Codenotary Community Attestation Service. diff --git a/cas_wrapper.py b/cas_wrapper.py index 976995a..1f66e6f 100644 --- a/cas_wrapper.py +++ b/cas_wrapper.py @@ -1,66 +1,85 @@ import json import logging import typing +from pathlib import Path +from functools import wraps from plumbum import local, ProcessExecutionError +def with_env_context(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if not isinstance(self, CasWrapper): + raise TypeError( + 'Cannot use "with_env_context" decorator outside of CasWrapper instance' + ) + with local.env( + VCN_LC_HOST=self._vcn_lc_host, + VCN_LC_API_KEY=self._vcn_lc_api_key, + VCN_LC_PORT=self._vcn_lc_port, + ): + return func(self, *args, **kwargs) + return wrapper + + class CasWrapper: """ - The python wrapper around binary `cas` + The python wrapper around binary `vcn` from Codenotary Community Attestation Service """ - binary_name = 'cas' - - @classmethod - def _is_binary_present(cls): - if cls.binary_name not in local: + def _is_binary_present(self): + if not self._full_binary_path.exists(): raise FileNotFoundError( - 'Binary CAS is not found in PATH on the machine', + f'Binary VCN is not found in {self._binary_path} on the machine', ) def __init__( self, - cas_api_key: str, - cas_signer_id: str, + vcn_lc_api_key: str, + vcn_lc_host: str = "eval-honeywell.codenotary.com", + vcn_lc_port: int = 443, logger: logging.Logger = None, + binary_name: str = "vcn", + binary_path: str = "/usr/local/bin/", ): - self._is_binary_present() - self._cas_api_key = cas_api_key - self._cas_signer_id = cas_signer_id - self._cas = local['cas'] + self._vcn_lc_api_key = vcn_lc_api_key + self._vcn_lc_host = vcn_lc_host + self._vcn_lc_port = vcn_lc_port + self._binary_name = binary_name + self._binary_path = binary_path + self._full_binary_path = Path(self._binary_path, self._binary_name) + self._vcn = local[self._full_binary_path] self._logger = logger if self._logger is None: self._logger = logging.getLogger() + self._is_binary_present() - @classmethod - def get_version(cls): - cls._is_binary_present() - command = local['cas']['--version'] + def get_version(self): + self._is_binary_present() + command = self._vcn['--version'] version = command().split()[-1].split('v')[1] return version + @with_env_context def ensure_login(self): - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id, - ): - self._cas['login']() + self._vcn['login']() + @with_env_context def notarize( self, local_path: str, metadata: typing.Dict = None, ) -> str: """ - Wrapper around `cas notarize` + Wrapper around `vcn notarize` :param local_path: path to a local Git repo :param metadata: additional metadata :return: hash of notarized commit :rtype: str """ - command = self._cas[ + command = self._vcn[ 'notarize', local_path, '-o', @@ -72,12 +91,9 @@ class CasWrapper: '-a', f'{key}={value}', ] - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id - ): - result_of_execution = command() - return json.loads(result_of_execution)['hash'] + result_of_execution = command() + result_of_execution, *_ = json.loads(result_of_execution) + return result_of_execution['hash'] def notarize_no_exc( self, @@ -95,23 +111,24 @@ class CasWrapper: """ success = False try: - cas_hash = self.notarize(local_path, metadata=metadata) + vcn_hash = self.notarize(local_path, metadata=metadata) success = True except Exception: self._logger.exception('Cannot notarize artifact: %s', local_path) - cas_hash = '' - return success, cas_hash + vcn_hash = '' + return success, vcn_hash + @with_env_context def authenticate( self, local_path: str, return_json: bool = False, use_hash: bool = False, - signer_id: str = None, - ): + signer_id: str = "", + ) -> typing.Union[bool, dict]: """ - Wrapper around `cas authenticate` + Wrapper around `vcn authenticate` :param local_path: path to a local Git repo (should be started from `git://`) or to a single local file or hash @@ -127,20 +144,12 @@ class CasWrapper: if signer_id: command_args.extend(('--signerID', signer_id)) command_args.extend(('-o', 'json')) - command = self._cas[command_args] + command = self._vcn[command_args] try: - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id - ): - result_of_execution = command() + result_of_execution = command() except ProcessExecutionError: - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id - ): - # in case if commit is untrusted - result_of_execution = command(retcode=1) + # in case if commit is untrusted + result_of_execution = command(retcode=1) json_result = json.loads(result_of_execution) if return_json: return json_result @@ -149,34 +158,34 @@ class CasWrapper: def authenticate_source( self, local_path: str, - signer_id: str = None, + signer_id: str = "", ) -> typing.Tuple[bool, typing.Optional[str]]: """ Authenticates source by git path. Returns authenticate result and source commit hash. """ is_authenticated = False - commit_cas_hash = None + commit_vcn_hash = None self.ensure_login() try: result_json = self.authenticate( local_path, return_json=True, - signer_id=signer_id + signer_id=signer_id, ) is_authenticated = result_json['verified'] - commit_cas_hash = result_json['hash'] + commit_vcn_hash = result_json['hash'] # we can fall with ProcessExecutionError, # because source can be not notarized except ProcessExecutionError: self._logger.exception('Cannot authenticate: %s', local_path) - return is_authenticated, commit_cas_hash + return is_authenticated, commit_vcn_hash def authenticate_artifact( self, local_path: str, use_hash: bool = False, - signer_id: str = None, + signer_id: str = "", ) -> bool: """ Authenticates artifact by artifact path or hash if `use_hash` is True. @@ -204,8 +213,8 @@ class CasWrapper: ) -> typing.Tuple[bool, typing.Dict[str, str]]: """ Notarize artifacts by their paths. - Returns `True` if all artifacts was succesful notarizated - and dict with CAS hashes. + Returns `True` if all artifacts was successful notarizated + and dict with VCN hashes. """ all_artifacts_is_notarized = True notarized_artifacts = {} @@ -218,11 +227,11 @@ class CasWrapper: # resolved in CAS itself. for artifact_path in artifact_paths: try: - cas_artifact_hash = self.notarize(artifact_path, metadata) + vcn_artifact_hash = self.notarize(artifact_path, metadata) except Exception: self._logger.exception('Cannot notarize artifact: %s', artifact_path) all_artifacts_is_notarized = False continue - notarized_artifacts[artifact_path] = cas_artifact_hash + notarized_artifacts[artifact_path] = vcn_artifact_hash return all_artifacts_is_notarized, notarized_artifacts diff --git a/setup.py b/setup.py index 13f550f..d1c9665 100644 --- a/setup.py +++ b/setup.py @@ -2,10 +2,10 @@ from setuptools import setup setup( name="cas_wrapper", - version="0.0.6", + version="0.0.7", author="Stepan Oksanichenko", author_email="soksanichenko@almalinux.org", - description="The python wrapper around binary cas from " + description="The python wrapper around binary vcn from " "project Codenotary Community Attestation Service.", url="https://git.almalinux.org/almalinux/cas_wrapper", project_urls={