from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import logging
import typing

from plumbum import local, ProcessExecutionError
from pydantic import BaseModel


class CasArtifact(BaseModel):
    """An artifact path together with its CAS hash, once it has one."""

    path: str
    # Explicit default: without it pydantic v2 treats the field as required,
    # while pydantic v1 already defaulted it to None.
    cas_hash: typing.Optional[str] = None


class CasWrapper:
    """
    The python wrapper around binary `cas`
    from Codenotary Community Attestation Service
    """

    binary_name = 'cas'

    def __init__(
        self,
        cas_api_key: str,
        cas_signer_id: str,
        logger: typing.Optional[logging.Logger] = None,
    ):
        """
        Locate the `cas` binary and remember the credentials.

        :param cas_api_key: value exported as `CAS_API_KEY` for every call
        :param cas_signer_id: value exported as `SIGNER_ID` for every call
        :param logger: logger to use; the root logger when omitted
        :raises FileNotFoundError: if the binary is not found in PATH
        """
        if self.binary_name not in local:
            raise FileNotFoundError(
                'Binary CAS is not found in PATH on the machine',
            )
        self._cas_api_key = cas_api_key
        self._cas_signer_id = cas_signer_id
        # Look the binary up by the same name that was just checked, so that
        # overriding `binary_name` in a subclass stays consistent.
        self._cas = local[self.binary_name]
        self._logger = logging.getLogger() if logger is None else logger

    def _with_credentials(self, command):
        """
        Return `command` bound to the CAS credentials environment.

        The variables are attached to the command itself instead of being
        pushed onto plumbum's process-wide `local.env`, because
        `notarize_artifacts` runs commands from several threads at once.
        """
        return command.with_env(
            CAS_API_KEY=self._cas_api_key,
            SIGNER_ID=self._cas_signer_id,
        )

    def __enter__(self):
        """Run `cas login` with the stored credentials and return self."""
        self._with_credentials(self._cas['login'])()
        return self

    def __exit__(self, exc_type, value, traceback):
        """Do nothing on exit; exceptions are not suppressed."""
        pass

    def notarize(
        self,
        local_path: str,
        metadata: typing.Optional[typing.Dict] = None,
    ) -> str:
        """
        Wrapper around `cas notarize`
        :param local_path: path to the asset to notarize (a local Git repo,
                           or an artifact path as passed by
                           `notarize_artifacts`)
        :param metadata: additional metadata, each item is passed
                         as `-a key=value`
        :return: value of the `hash` key in the JSON output of the command
        :rtype: str
        :raises ProcessExecutionError: if the command exits with non-zero code
        """
        arguments = ['notarize', local_path, '-o', 'json']
        if metadata is not None:
            for key, value in metadata.items():
                arguments.extend(('-a', f'{key}={value}'))
        result_of_execution = self._with_credentials(self._cas[arguments])()
        return json.loads(result_of_execution)['hash']

    def authenticate(
        self,
        local_path: str,
        return_json: bool = False,
    ):
        """
        Wrapper around `cas authenticate`
        :param local_path: path to a local Git repo
                           (should be started from `git://`)
                           or to a single local file
        :param return_json: return the parsed JSON output instead of a bool
        :return: true if a commit is trusted, vice versa - false
                 or dict with result if return_json param is True
        :rtype: bool or dict
        :raises ProcessExecutionError: if the command exits with a code
                                       other than 0 or 1
        """
        command = self._with_credentials(
            self._cas['authenticate', local_path, '-o', 'json'],
        )
        # Exit code 1 is what the command returns for an untrusted asset, so
        # both 0 and 1 are accepted in a single run instead of re-running the
        # command after the first failure.
        result_of_execution = command(retcode=(0, 1))
        json_result = json.loads(result_of_execution)
        if return_json:
            return json_result
        # A zero status means trusted; a missing status is treated as
        # untrusted, the same default `authenticate_source` uses.
        return not bool(json_result.get('status', 1))

    def authenticate_source(
        self,
        local_path: str,
    ) -> typing.Tuple[bool, typing.Optional[str]]:
        """
        Log in and authenticate sources located at `local_path`.

        :param local_path: path passed to `authenticate`
        :return: pair of (is authenticated, value of `hash` from the output);
                 (False, None) if the authenticate command failed
        """
        is_authenticated = False
        commit_cas_hash = None
        with self as cas:
            try:
                result_json = cas.authenticate(local_path, return_json=True)
                # it should return 0 for authenticated and trusted commits
                is_authenticated = not bool(result_json.get('status', 1))
                commit_cas_hash = result_json.get('hash')
            # the command can fail with ProcessExecutionError,
            # because the source may not be notarized
            except ProcessExecutionError:
                self._logger.exception('Cannot authenticate %s:', local_path)
        return is_authenticated, commit_cas_hash

    def authenticate_artifact(
        self,
        local_path: str,
    ) -> bool:
        """
        Log in and authenticate a single artifact.

        :param local_path: path passed to `authenticate`
        :return: result of `authenticate`, or False if the command failed
        """
        is_authenticated = False
        with self as cas:
            try:
                is_authenticated = cas.authenticate(local_path)
            # the command can fail with ProcessExecutionError,
            # because the artifact may not be notarized
            except ProcessExecutionError:
                self._logger.exception('Cannot authenticate %s:', local_path)
        return is_authenticated

    def notarize_artifacts(
        self,
        artifacts: typing.List[CasArtifact],
        metadata: typing.Dict[str, typing.Any],
    ) -> bool:
        """
        Log in and notarize every artifact that has no CAS hash yet.

        Artifacts are notarized in a pool of 4 threads; on success the
        returned hash is stored into `artifact.cas_hash`.

        :param artifacts: artifacts to process, updated in place
        :param metadata: metadata passed to each `notarize` call
        :return: True if no notarization raised an exception, False otherwise
        """
        all_artifacts_is_notarized = True
        with self as cas, ThreadPoolExecutor(max_workers=4) as executor:
            futures = {
                executor.submit(cas.notarize, artifact.path, metadata): artifact
                for artifact in artifacts
                if not artifact.cas_hash
            }
            for future in as_completed(futures):
                artifact = futures[future]
                try:
                    cas_artifact_hash = future.result()
                except Exception:
                    # Name the failed artifact so the log entry is actionable.
                    self._logger.exception(
                        'Cannot notarize artifact %s:',
                        artifact.path,
                    )
                    all_artifacts_is_notarized = False
                    continue
                artifact.cas_hash = cas_artifact_hash
        return all_artifacts_is_notarized