From d68685bf72db747be540399d66b9312d549b3172 Mon Sep 17 00:00:00 2001 From: danfimov Date: Tue, 20 Jun 2023 14:43:27 +0000 Subject: [PATCH] ALBS-1122: Replace notarization interface from CAS to VCN (#7) Co-authored-by: Daniil Anfimov Reviewed-on: https://git.almalinux.org/almalinux/cas_wrapper/pulls/7 --- README.md | 2 +- cas_wrapper.py | 203 +++++++++++++++++++++++++++---------------------- setup.py | 4 +- 3 files changed, 117 insertions(+), 92 deletions(-) diff --git a/README.md b/README.md index b8c0491..20dad34 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # cas_wrapper -The python wrapper around binary `cas` from project Codenotary Community Attestation Service. +The python wrapper around binary `vcn` from project Codenotary Community Attestation Service. diff --git a/cas_wrapper.py b/cas_wrapper.py index 976995a..335c2b3 100644 --- a/cas_wrapper.py +++ b/cas_wrapper.py @@ -1,88 +1,119 @@ import json import logging import typing +from functools import wraps +from pathlib import Path -from plumbum import local, ProcessExecutionError +from plumbum import ProcessExecutionError, local + +DEFAULT_BINARY_NAME = "vcn" +DEFAULT_BINARY_PATH = "/usr/local/bin/" + + +def with_env_context(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if not isinstance(self, CasWrapper): + raise TypeError( + 'Cannot use "with_env_context" decorator outside of CasWrapper instance' + ) + with local.env( + VCN_LC_HOST=self._vcn_lc_host, + VCN_LC_API_KEY=self._vcn_lc_api_key, + VCN_LC_PORT=self._vcn_lc_port, + ): + return func(self, *args, **kwargs) + + return wrapper class CasWrapper: """ - The python wrapper around binary `cas` + The python wrapper around binary `vcn` from Codenotary Community Attestation Service """ - binary_name = 'cas' - @classmethod - def _is_binary_present(cls): - if cls.binary_name not in local: + def _is_binary_present( + cls, + binary_name: str, + binary_path: str, + ): + if not Path(binary_path, binary_name).exists(): raise FileNotFoundError( - 'Binary CAS is not found in PATH on the machine', + f"Binary VCN is not found in {binary_path} on the machine", ) def __init__( - self, - cas_api_key: str, - cas_signer_id: str, - logger: logging.Logger = None, + self, + vcn_lc_api_key: str, + vcn_lc_host: str = "eval-honeywell.codenotary.com", + vcn_lc_port: int = 443, + logger: logging.Logger = None, + binary_name: str = DEFAULT_BINARY_NAME, + binary_path: str = DEFAULT_BINARY_PATH, ): - self._is_binary_present() - self._cas_api_key = cas_api_key - self._cas_signer_id = cas_signer_id - self._cas = local['cas'] + self._is_binary_present(binary_name, binary_path) + self._vcn_lc_api_key = vcn_lc_api_key + self._vcn_lc_host = vcn_lc_host + self._vcn_lc_port = vcn_lc_port + self._binary_name = binary_name + self._binary_path = binary_path + self._full_binary_path = Path(self._binary_path, self._binary_name) + self._vcn = local[str(self._full_binary_path)] self._logger = logger if self._logger is None: self._logger = logging.getLogger() @classmethod - def get_version(cls): - cls._is_binary_present() - command = local['cas']['--version'] - version = command().split()[-1].split('v')[1] + def get_version( + cls, + binary_name: str = DEFAULT_BINARY_NAME, + binary_path: str = DEFAULT_BINARY_PATH, + ): + cls._is_binary_present(binary_name, binary_path) + full_path = Path(binary_path, binary_name) + command = local[str(full_path)]["--version"] + version = command().split()[-1].split("v")[1] return version + @with_env_context def ensure_login(self): - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id, - ): - self._cas['login']() + self._vcn["login"]() + @with_env_context def notarize( - self, - local_path: str, - metadata: typing.Dict = None, + self, + local_path: str, + metadata: typing.Dict = None, ) -> str: """ - Wrapper around `cas notarize` + Wrapper around `vcn notarize` :param local_path: path to a local Git repo :param metadata: additional metadata :return: hash of notarized commit :rtype: str """ - command = self._cas[ - 'notarize', + command = self._vcn[ + "notarize", local_path, - '-o', - 'json', + "-o", + "json", ] if metadata is not None: for key, value in metadata.items(): command = command[ - '-a', - f'{key}={value}', + "-a", + f"{key}={value}", ] - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id - ): - result_of_execution = command() - return json.loads(result_of_execution)['hash'] + result_of_execution = command() + result_of_execution, *_ = json.loads(result_of_execution) + return result_of_execution["hash"] def notarize_no_exc( - self, - local_path: str, - metadata: typing.Dict = None, + self, + local_path: str, + metadata: typing.Dict = None, ) -> typing.Tuple[bool, str]: """ Wrapper for avoiding raising exceptions during notarization. @@ -95,23 +126,23 @@ class CasWrapper: """ success = False try: - cas_hash = self.notarize(local_path, metadata=metadata) + vcn_hash = self.notarize(local_path, metadata=metadata) success = True except Exception: - self._logger.exception('Cannot notarize artifact: %s', - local_path) - cas_hash = '' - return success, cas_hash + self._logger.exception("Cannot notarize artifact: %s", local_path) + vcn_hash = "" + return success, vcn_hash + @with_env_context def authenticate( - self, - local_path: str, - return_json: bool = False, - use_hash: bool = False, - signer_id: str = None, - ): + self, + local_path: str, + return_json: bool = False, + use_hash: bool = False, + signer_id: str = "", + ) -> typing.Union[bool, dict]: """ - Wrapper around `cas authenticate` + Wrapper around `vcn authenticate` :param local_path: path to a local Git repo (should be started from `git://`) or to a single local file or hash @@ -121,62 +152,54 @@ class CasWrapper: or dict with result if return_json param is True :rtype: bool or dict """ - command_args = ['authenticate', local_path] + command_args = ["authenticate", local_path] if use_hash: - command_args = ['authenticate', '--hash', local_path] + command_args = ["authenticate", "--hash", local_path] if signer_id: - command_args.extend(('--signerID', signer_id)) - command_args.extend(('-o', 'json')) - command = self._cas[command_args] + command_args.extend(("--signerID", signer_id)) + command_args.extend(("-o", "json")) + command = self._vcn[command_args] try: - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id - ): - result_of_execution = command() + result_of_execution = command() except ProcessExecutionError: - with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id - ): - # in case if commit is untrusted - result_of_execution = command(retcode=1) + # in case if commit is untrusted + result_of_execution = command(retcode=1) json_result = json.loads(result_of_execution) if return_json: return json_result - return not bool(json_result['status']) + return not bool(json_result["status"]) def authenticate_source( self, local_path: str, - signer_id: str = None, + signer_id: str = "", ) -> typing.Tuple[bool, typing.Optional[str]]: """ Authenticates source by git path. Returns authenticate result and source commit hash. """ is_authenticated = False - commit_cas_hash = None + commit_vcn_hash = None self.ensure_login() try: result_json = self.authenticate( local_path, return_json=True, - signer_id=signer_id + signer_id=signer_id, ) - is_authenticated = result_json['verified'] - commit_cas_hash = result_json['hash'] + is_authenticated = result_json["verified"] + commit_vcn_hash = result_json["hash"] # we can fall with ProcessExecutionError, # because source can be not notarized except ProcessExecutionError: - self._logger.exception('Cannot authenticate: %s', local_path) - return is_authenticated, commit_cas_hash + self._logger.exception("Cannot authenticate: %s", local_path) + return is_authenticated, commit_vcn_hash def authenticate_artifact( self, local_path: str, use_hash: bool = False, - signer_id: str = None, + signer_id: str = "", ) -> bool: """ Authenticates artifact by artifact path or hash if `use_hash` is True. @@ -189,12 +212,12 @@ class CasWrapper: local_path, use_hash=use_hash, return_json=True, - signer_id=signer_id - )['verified'] + signer_id=signer_id, + )["verified"] # we can fall with ProcessExecutionError, # because artifact can be not notarized except ProcessExecutionError: - self._logger.exception('Cannot authenticate: %s', local_path) + self._logger.exception("Cannot authenticate: %s", local_path) return is_authenticated def notarize_artifacts( @@ -204,8 +227,8 @@ class CasWrapper: ) -> typing.Tuple[bool, typing.Dict[str, str]]: """ Notarize artifacts by their paths. - Returns `True` if all artifacts was succesful notarizated - and dict with CAS hashes. + Returns `True` if all artifacts was successful notarizated + and dict with VCN hashes. """ all_artifacts_is_notarized = True notarized_artifacts = {} @@ -218,11 +241,13 @@ class CasWrapper: # resolved in CAS itself. for artifact_path in artifact_paths: try: - cas_artifact_hash = self.notarize(artifact_path, metadata) + vcn_artifact_hash = self.notarize(artifact_path, metadata) except Exception: - self._logger.exception('Cannot notarize artifact: %s', - artifact_path) + self._logger.exception( + "Cannot notarize artifact: %s", + artifact_path, + ) all_artifacts_is_notarized = False continue - notarized_artifacts[artifact_path] = cas_artifact_hash + notarized_artifacts[artifact_path] = vcn_artifact_hash return all_artifacts_is_notarized, notarized_artifacts diff --git a/setup.py b/setup.py index 13f550f..d1c9665 100644 --- a/setup.py +++ b/setup.py @@ -2,10 +2,10 @@ from setuptools import setup setup( name="cas_wrapper", - version="0.0.6", + version="0.0.7", author="Stepan Oksanichenko", author_email="soksanichenko@almalinux.org", - description="The python wrapper around binary cas from " + description="The python wrapper around binary vcn from " "project Codenotary Community Attestation Service.", url="https://git.almalinux.org/almalinux/cas_wrapper", project_urls={