From b5a2b87568d726b2b6154f1cacd0c21eefd80a71 Mon Sep 17 00:00:00 2001 From: Daniil Anfimov Date: Thu, 22 Jun 2023 13:37:34 +0200 Subject: [PATCH] Revert "ALBS-1122: Replace notarization interface from CAS to VCN (#7)" This reverts commit d68685bf72db747be540399d66b9312d549b3172. --- README.md | 2 +- cas_wrapper.py | 203 ++++++++++++++++++++++--------------------------- setup.py | 4 +- 3 files changed, 92 insertions(+), 117 deletions(-) diff --git a/README.md b/README.md index 20dad34..b8c0491 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # cas_wrapper -The python wrapper around binary `vcn` from project Codenotary Community Attestation Service. +The python wrapper around binary `cas` from project Codenotary Community Attestation Service. diff --git a/cas_wrapper.py b/cas_wrapper.py index 335c2b3..976995a 100644 --- a/cas_wrapper.py +++ b/cas_wrapper.py @@ -1,119 +1,88 @@ import json import logging import typing -from functools import wraps -from pathlib import Path -from plumbum import ProcessExecutionError, local - -DEFAULT_BINARY_NAME = "vcn" -DEFAULT_BINARY_PATH = "/usr/local/bin/" - - -def with_env_context(func): - @wraps(func) - def wrapper(self, *args, **kwargs): - if not isinstance(self, CasWrapper): - raise TypeError( - 'Cannot use "with_env_context" decorator outside of CasWrapper instance' - ) - with local.env( - VCN_LC_HOST=self._vcn_lc_host, - VCN_LC_API_KEY=self._vcn_lc_api_key, - VCN_LC_PORT=self._vcn_lc_port, - ): - return func(self, *args, **kwargs) - - return wrapper +from plumbum import local, ProcessExecutionError class CasWrapper: """ - The python wrapper around binary `vcn` + The python wrapper around binary `cas` from Codenotary Community Attestation Service """ + binary_name = 'cas' + @classmethod - def _is_binary_present( - cls, - binary_name: str, - binary_path: str, - ): - if not Path(binary_path, binary_name).exists(): + def _is_binary_present(cls): + if cls.binary_name not in local: raise FileNotFoundError( - f"Binary VCN is not found in {binary_path} on the machine", + 'Binary CAS is not found in PATH on the machine', ) def __init__( - self, - vcn_lc_api_key: str, - vcn_lc_host: str = "eval-honeywell.codenotary.com", - vcn_lc_port: int = 443, - logger: logging.Logger = None, - binary_name: str = DEFAULT_BINARY_NAME, - binary_path: str = DEFAULT_BINARY_PATH, + self, + cas_api_key: str, + cas_signer_id: str, + logger: logging.Logger = None, ): - self._is_binary_present(binary_name, binary_path) - self._vcn_lc_api_key = vcn_lc_api_key - self._vcn_lc_host = vcn_lc_host - self._vcn_lc_port = vcn_lc_port - self._binary_name = binary_name - self._binary_path = binary_path - self._full_binary_path = Path(self._binary_path, self._binary_name) - self._vcn = local[str(self._full_binary_path)] + self._is_binary_present() + self._cas_api_key = cas_api_key + self._cas_signer_id = cas_signer_id + self._cas = local['cas'] self._logger = logger if self._logger is None: self._logger = logging.getLogger() @classmethod - def get_version( - cls, - binary_name: str = DEFAULT_BINARY_NAME, - binary_path: str = DEFAULT_BINARY_PATH, - ): - cls._is_binary_present(binary_name, binary_path) - full_path = Path(binary_path, binary_name) - command = local[str(full_path)]["--version"] - version = command().split()[-1].split("v")[1] + def get_version(cls): + cls._is_binary_present() + command = local['cas']['--version'] + version = command().split()[-1].split('v')[1] return version - @with_env_context def ensure_login(self): - self._vcn["login"]() + with local.env( + CAS_API_KEY=self._cas_api_key, + SIGNER_ID=self._cas_signer_id, + ): + self._cas['login']() - @with_env_context def notarize( - self, - local_path: str, - metadata: typing.Dict = None, + self, + local_path: str, + metadata: typing.Dict = None, ) -> str: """ - Wrapper around `vcn notarize` + Wrapper around `cas notarize` :param local_path: path to a local Git repo :param metadata: additional metadata :return: hash of notarized commit :rtype: str """ - command = self._vcn[ - "notarize", + command = self._cas[ + 'notarize', local_path, - "-o", - "json", + '-o', + 'json', ] if metadata is not None: for key, value in metadata.items(): command = command[ - "-a", - f"{key}={value}", + '-a', + f'{key}={value}', ] - result_of_execution = command() - result_of_execution, *_ = json.loads(result_of_execution) - return result_of_execution["hash"] + with local.env( + CAS_API_KEY=self._cas_api_key, + SIGNER_ID=self._cas_signer_id + ): + result_of_execution = command() + return json.loads(result_of_execution)['hash'] def notarize_no_exc( - self, - local_path: str, - metadata: typing.Dict = None, + self, + local_path: str, + metadata: typing.Dict = None, ) -> typing.Tuple[bool, str]: """ Wrapper for avoiding raising exceptions during notarization. @@ -126,23 +95,23 @@ class CasWrapper: """ success = False try: - vcn_hash = self.notarize(local_path, metadata=metadata) + cas_hash = self.notarize(local_path, metadata=metadata) success = True except Exception: - self._logger.exception("Cannot notarize artifact: %s", local_path) - vcn_hash = "" - return success, vcn_hash + self._logger.exception('Cannot notarize artifact: %s', + local_path) + cas_hash = '' + return success, cas_hash - @with_env_context def authenticate( - self, - local_path: str, - return_json: bool = False, - use_hash: bool = False, - signer_id: str = "", - ) -> typing.Union[bool, dict]: + self, + local_path: str, + return_json: bool = False, + use_hash: bool = False, + signer_id: str = None, + ): """ - Wrapper around `vcn authenticate` + Wrapper around `cas authenticate` :param local_path: path to a local Git repo (should be started from `git://`) or to a single local file or hash @@ -152,54 +121,62 @@ class CasWrapper: or dict with result if return_json param is True :rtype: bool or dict """ - command_args = ["authenticate", local_path] + command_args = ['authenticate', local_path] if use_hash: - command_args = ["authenticate", "--hash", local_path] + command_args = ['authenticate', '--hash', local_path] if signer_id: - command_args.extend(("--signerID", signer_id)) - command_args.extend(("-o", "json")) - command = self._vcn[command_args] + command_args.extend(('--signerID', signer_id)) + command_args.extend(('-o', 'json')) + command = self._cas[command_args] try: - result_of_execution = command() + with local.env( + CAS_API_KEY=self._cas_api_key, + SIGNER_ID=self._cas_signer_id + ): + result_of_execution = command() except ProcessExecutionError: - # in case if commit is untrusted - result_of_execution = command(retcode=1) + with local.env( + CAS_API_KEY=self._cas_api_key, + SIGNER_ID=self._cas_signer_id + ): + # in case if commit is untrusted + result_of_execution = command(retcode=1) json_result = json.loads(result_of_execution) if return_json: return json_result - return not bool(json_result["status"]) + return not bool(json_result['status']) def authenticate_source( self, local_path: str, - signer_id: str = "", + signer_id: str = None, ) -> typing.Tuple[bool, typing.Optional[str]]: """ Authenticates source by git path. Returns authenticate result and source commit hash. """ is_authenticated = False - commit_vcn_hash = None + commit_cas_hash = None self.ensure_login() try: result_json = self.authenticate( local_path, return_json=True, - signer_id=signer_id, + signer_id=signer_id ) - is_authenticated = result_json["verified"] - commit_vcn_hash = result_json["hash"] + is_authenticated = result_json['verified'] + commit_cas_hash = result_json['hash'] # we can fall with ProcessExecutionError, # because source can be not notarized except ProcessExecutionError: - self._logger.exception("Cannot authenticate: %s", local_path) - return is_authenticated, commit_vcn_hash + self._logger.exception('Cannot authenticate: %s', local_path) + return is_authenticated, commit_cas_hash def authenticate_artifact( self, local_path: str, use_hash: bool = False, - signer_id: str = "", + signer_id: str = None, ) -> bool: """ Authenticates artifact by artifact path or hash if `use_hash` is True. @@ -212,12 +189,12 @@ class CasWrapper: local_path, use_hash=use_hash, return_json=True, - signer_id=signer_id, - )["verified"] + signer_id=signer_id + )['verified'] # we can fall with ProcessExecutionError, # because artifact can be not notarized except ProcessExecutionError: - self._logger.exception("Cannot authenticate: %s", local_path) + self._logger.exception('Cannot authenticate: %s', local_path) return is_authenticated def notarize_artifacts( @@ -227,8 +204,8 @@ class CasWrapper: ) -> typing.Tuple[bool, typing.Dict[str, str]]: """ Notarize artifacts by their paths. - Returns `True` if all artifacts was successful notarizated - and dict with VCN hashes. + Returns `True` if all artifacts was succesful notarizated + and dict with CAS hashes. """ all_artifacts_is_notarized = True notarized_artifacts = {} @@ -241,13 +218,11 @@ class CasWrapper: # resolved in CAS itself. for artifact_path in artifact_paths: try: - vcn_artifact_hash = self.notarize(artifact_path, metadata) + cas_artifact_hash = self.notarize(artifact_path, metadata) except Exception: - self._logger.exception( - "Cannot notarize artifact: %s", - artifact_path, - ) + self._logger.exception('Cannot notarize artifact: %s', + artifact_path) all_artifacts_is_notarized = False continue - notarized_artifacts[artifact_path] = vcn_artifact_hash + notarized_artifacts[artifact_path] = cas_artifact_hash return all_artifacts_is_notarized, notarized_artifacts diff --git a/setup.py b/setup.py index d1c9665..13f550f 100644 --- a/setup.py +++ b/setup.py @@ -2,10 +2,10 @@ from setuptools import setup setup( name="cas_wrapper", - version="0.0.7", + version="0.0.6", author="Stepan Oksanichenko", author_email="soksanichenko@almalinux.org", - description="The python wrapper around binary vcn from " + description="The python wrapper around binary cas from " "project Codenotary Community Attestation Service.", url="https://git.almalinux.org/almalinux/cas_wrapper", project_urls={