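import json
import logging
import typing
from functools import wraps
from pathlib import Path

from plumbum import ProcessExecutionError, local


def with_env_context(func):
    """Run a ``CasWrapper`` method with the CAS connection settings exported to the environment.

    Every ``vcn`` invocation made by the wrapped method inherits ``VCN_LC_HOST``,
    ``VCN_LC_API_KEY`` and ``VCN_LC_PORT`` from the instance, so the API key
    reaches the binary through the environment rather than on the command line.

    :param func: ``CasWrapper`` method to wrap
    :raises TypeError: when the decorated method is invoked on a non-``CasWrapper`` object
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not isinstance(self, CasWrapper):
            raise TypeError(
                'Cannot use "with_env_context" decorator outside of CasWrapper instance'
            )
        # Environment variables are strings; the port is stored as an int, so
        # convert it explicitly rather than relying on the env layer to do so.
        with local.env(
            VCN_LC_HOST=self._vcn_lc_host,
            VCN_LC_API_KEY=self._vcn_lc_api_key,
            VCN_LC_PORT=str(self._vcn_lc_port),
        ):
            return func(self, *args, **kwargs)

    return wrapper


class CasWrapper:
    """
    The python wrapper around binary `vcn` from Codenotary Community Attestation Service

    Commands are built as argument lists (no shell), and only the presence of
    the binary at the configured path is checked -- not its integrity.
    """

    def _is_binary_present(self):
        """Raise ``FileNotFoundError`` unless the configured ``vcn`` binary exists on disk."""
        if not self._full_binary_path.exists():
            raise FileNotFoundError(
                f"Binary VCN is not found in {self._binary_path} on the machine",
            )

    def __init__(
        self,
        vcn_lc_api_key: str,
        vcn_lc_host: str = "eval-honeywell.codenotary.com",
        vcn_lc_port: int = 443,
        logger: typing.Optional[logging.Logger] = None,
        binary_name: str = "vcn",
        binary_path: str = "/usr/local/bin/",
    ):
        """
        Store the CAS connection settings and locate the ``vcn`` binary.

        :param vcn_lc_api_key: CAS API key, exported as ``VCN_LC_API_KEY`` for each command
        :param vcn_lc_host: CAS host, exported as ``VCN_LC_HOST``
        :param vcn_lc_port: CAS port, exported as ``VCN_LC_PORT``
        :param logger: logger for failure reports; the root logger when ``None``
        :param binary_name: file name of the ``vcn`` binary
        :param binary_path: directory that holds the binary
        :raises FileNotFoundError: if ``binary_path/binary_name`` does not exist
        """
        self._vcn_lc_api_key = vcn_lc_api_key
        self._vcn_lc_host = vcn_lc_host
        self._vcn_lc_port = vcn_lc_port
        self._binary_name = binary_name
        self._binary_path = binary_path
        self._full_binary_path = Path(self._binary_path, self._binary_name)
        # Fail early with a clear error before a command object is built
        # around a path that does not exist.
        self._is_binary_present()
        self._vcn = local[str(self._full_binary_path)]
        self._logger = logger if logger is not None else logging.getLogger()

    def get_version(self) -> str:
        """Return the ``vcn`` version with the leading ``v`` stripped (e.g. ``1.0.0``).

        The last whitespace-separated token of ``vcn --version`` is taken as
        the version tag; the text after its first ``v`` (up to any later
        ``v``) is returned.

        :raises FileNotFoundError: if the binary disappeared since construction
        :raises IndexError: if the last token of the output contains no ``v``
        """
        self._is_binary_present()
        raw_output = self._vcn["--version"]()
        version_tag = raw_output.split()[-1]
        return version_tag.split("v")[1]

    @with_env_context
    def ensure_login(self):
        """Run ``vcn login`` with the instance credentials exported to the environment.

        :raises ProcessExecutionError: if ``vcn login`` exits with a non-zero code
        """
        self._vcn["login"]()

    @with_env_context
    def notarize(
        self,
        local_path: str,
        metadata: typing.Optional[typing.Dict] = None,
    ) -> str:
        """
        Wrapper around `vcn notarize`

        :param local_path: path to a local Git repo
        :param metadata: additional metadata; every key/value pair is passed
            as its own ``-a key=value`` attribute
        :return: hash of notarized commit
        :raises ProcessExecutionError: if ``vcn notarize`` exits with a non-zero code
        :rtype: str
        """
        command = self._vcn["notarize", local_path, "-o", "json"]
        for key, value in (metadata or {}).items():
            command = command["-a", f"{key}={value}"]
        # The JSON output is parsed as a list; the first entry describes the
        # notarized artifact.
        first_entry, *_ = json.loads(command())
        return first_entry["hash"]

    def notarize_no_exc(
        self,
        local_path: str,
        metadata: typing.Optional[typing.Dict] = None,
    ) -> typing.Tuple[bool, str]:
        """
        Wrapper for avoiding raising exceptions during notarization.
        Return `success` flag instead for library user to react respectively.

        :param local_path: path to a local Git repo
        :param metadata: additional metadata
        :return: boolean flag for operation success and the hash of the
            notarized artifact (empty string on failure).
        :rtype: tuple
        """
        try:
            vcn_hash = self.notarize(local_path, metadata=metadata)
        except Exception:
            # Best-effort by contract: log with traceback and report failure
            # instead of propagating.
            self._logger.exception("Cannot notarize artifact: %s", local_path)
            return False, ""
        return True, vcn_hash

    @with_env_context
    def authenticate(
        self,
        local_path: str,
        return_json: bool = False,
        use_hash: bool = False,
        signer_id: str = "",
    ) -> typing.Union[bool, dict]:
        """
        Wrapper around `vcn authenticate`

        :param local_path: path to a local Git repo (should be started from `git://`)
            or to a single local file or hash
        :param return_json: flag for return json response
        :param use_hash: flag for authenticate by hash
        :param signer_id: value for ``--signerID``; the flag is omitted when empty
        :return: true if a commit is trusted, vice versa - false or
            dict with result if return_json param is True
        :raises ProcessExecutionError: if ``vcn`` exits with a code other than 0 or 1
        :rtype: bool or dict
        """
        command_args = ["authenticate"]
        if use_hash:
            command_args.append("--hash")
        command_args.append(local_path)
        if signer_id:
            command_args.extend(("--signerID", signer_id))
        command_args.extend(("-o", "json"))
        command = self._vcn[command_args]
        try:
            result_of_execution = command()
        except ProcessExecutionError as exc:
            # An untrusted artifact makes vcn exit with 1 while still printing
            # the JSON verdict. Reuse that output instead of executing the
            # command a second time; any other exit code is a real failure.
            if exc.retcode != 1:
                raise
            result_of_execution = exc.stdout
        json_result = json.loads(result_of_execution)
        if return_json:
            return json_result
        # A status of 0 is trusted; every other status is reported as untrusted.
        return not bool(json_result["status"])

    def authenticate_source(
        self,
        local_path: str,
        signer_id: str = "",
    ) -> typing.Tuple[bool, typing.Optional[str]]:
        """
        Authenticates source by git path.
        Returns authenticate result and source commit hash.

        The result is fail-closed: any ``ProcessExecutionError`` (for example
        a source that was never notarized) is logged and reported as
        ``(False, None)``.
        """
        self.ensure_login()
        try:
            result_json = self.authenticate(
                local_path,
                return_json=True,
                signer_id=signer_id,
            )
        except ProcessExecutionError:
            # we can fall with ProcessExecutionError,
            # because source can be not notarized
            self._logger.exception("Cannot authenticate: %s", local_path)
            return False, None
        return result_json["verified"], result_json["hash"]

    def authenticate_artifact(
        self,
        local_path: str,
        use_hash: bool = False,
        signer_id: str = "",
    ) -> bool:
        """
        Authenticates artifact by artifact path or hash if `use_hash` is True.
        Returns authenticate result.

        Fail-closed: a ``ProcessExecutionError`` (e.g. an artifact that was
        never notarized) is logged and reported as ``False``.
        """
        self.ensure_login()
        try:
            verdict = self.authenticate(
                local_path,
                use_hash=use_hash,
                return_json=True,
                signer_id=signer_id,
            )
        except ProcessExecutionError:
            # we can fall with ProcessExecutionError,
            # because artifact can be not notarized
            self._logger.exception("Cannot authenticate: %s", local_path)
            return False
        return verdict["verified"]

    def notarize_artifacts(
        self,
        artifact_paths: typing.List[str],
        metadata: typing.Dict[str, typing.Any],
    ) -> typing.Tuple[bool, typing.Dict[str, str]]:
        """
        Notarize artifacts by their paths.
        Returns `True` if all artifacts was successful notarizated
        and dict with VCN hashes.

        A failure on one artifact is logged and does not stop the remaining
        ones; the returned dict holds only the artifacts that succeeded.
        """
        every_artifact_notarized = True
        hashes_by_path: typing.Dict[str, str] = {}
        self.ensure_login()
        # ALBS-576: We stopped doing this process in parallel due to the
        # problems experienced and described in this CAS issue:
        # https://github.com/codenotary/cas/issues/275
        # Hence, we decided to go sequential here until the problem is
        # resolved in CAS itself.
        for artifact_path in artifact_paths:
            try:
                hashes_by_path[artifact_path] = self.notarize(artifact_path, metadata)
            except Exception:
                self._logger.exception(
                    "Cannot notarize artifact: %s",
                    artifact_path,
                )
                every_artifact_notarized = False
        return every_artifact_notarized, hashes_by_path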