From 98ca413db7e7eda2e064507d1be340a0b256c938 Mon Sep 17 00:00:00 2001 From: anfimovdm Date: Fri, 1 Jul 2022 14:25:16 +0000 Subject: [PATCH] ALBS-444 (#2) Co-authored-by: Daniil Anfimov Co-authored-by: Vyacheslav Potoropin Reviewed-on: https://git.almalinux.org/almalinux/cas_wrapper/pulls/2 Co-authored-by: anfimovdm Co-committed-by: anfimovdm --- .gitignore | 4 ++ cas_wrapper.py | 117 +++++++++++++++++++++++++++++++++++++++++++------ setup.py | 2 +- 3 files changed, 108 insertions(+), 15 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e928de1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class \ No newline at end of file diff --git a/cas_wrapper.py b/cas_wrapper.py index 8533dc9..512a923 100644 --- a/cas_wrapper.py +++ b/cas_wrapper.py @@ -1,5 +1,7 @@ +from concurrent.futures import ThreadPoolExecutor, as_completed import json -from typing import Dict +import logging +import typing from plumbum import local, ProcessExecutionError @@ -16,6 +18,7 @@ class CasWrapper: self, cas_api_key: str, cas_signer_id: str, + logger: logging.Logger = None, ): if self.binary_name not in local: raise FileNotFoundError( @@ -23,17 +26,22 @@ class CasWrapper: ) self._cas_api_key = cas_api_key self._cas_signer_id = cas_signer_id + self._cas = local['cas'] + self._logger = logger + if self._logger is None: + self._logger = logging.getLogger() + + def ensure_login(self): with local.env( - CAS_API_KEY=self._cas_api_key, - SIGNER_ID=self._cas_signer_id + CAS_API_KEY=self._cas_api_key, + SIGNER_ID=self._cas_signer_id, ): - self._cas = local['cas'] self._cas['login']() def notarize( self, local_path: str, - metadata: Dict = None, + metadata: typing.Dict = None, ) -> str: """ Wrapper around `cas notarize` @@ -64,21 +72,25 @@ class CasWrapper: def authenticate( self, local_path: str, + return_json: bool = False, + use_hash: bool = False, ): """ Wrapper around `cas authenticate` :param local_path: path to a local Git repo (should be started from `git://`) - or to a single local file + or to a single local file or hash + :param return_json: flag for return json response + :param use_hash: flag for authenticate by hash :return: true if a commit is trusted, vice versa - false - :rtype: bool + or dict with result if return_json param is True + :rtype: bool or dict """ - command = self._cas[ - 'authenticate', - local_path, - '-o', - 'json', - ] + command_args = ['authenticate', local_path] + if use_hash: + command_args = ['authenticate', '--hash', local_path] + command_args.extend(('-o', 'json')) + command = self._cas[command_args] try: with local.env( CAS_API_KEY=self._cas_api_key, @@ -92,4 +104,81 @@ class CasWrapper: ): # in case if commit is untrusted result_of_execution = command(retcode=1) - return not bool(json.loads(result_of_execution)['status']) + json_result = json.loads(result_of_execution) + if return_json: + return json_result + return not bool(json_result['status']) + + def authenticate_source( + self, + local_path: str, + ) -> typing.Tuple[bool, typing.Optional[str]]: + """ + Authenticates source by git path. + Returns authenticate result and source commit hash. + """ + is_authenticated = False + commit_cas_hash = None + self.ensure_login() + try: + result_json = self.authenticate(local_path, return_json=True) + is_authenticated = result_json['verified'] + commit_cas_hash = result_json['hash'] + # we can fall with ProcessExecutionError, + # because source can be not notarized + except ProcessExecutionError: + self._logger.exception('Cannot authenticate: %s', local_path) + return is_authenticated, commit_cas_hash + + def authenticate_artifact( + self, + local_path: str, + use_hash: bool = False, + ) -> bool: + """ + Authenticates artifact by artifact path or hash if `use_hash` is True. + Returns authenticate result. + """ + is_authenticated = False + self.ensure_login() + try: + is_authenticated = self.authenticate( + local_path, + use_hash=use_hash, + return_json=True, + )['verified'] + # we can fall with ProcessExecutionError, + # because artifact can be not notarized + except ProcessExecutionError: + self._logger.exception('Cannot authenticate: %s', local_path) + return is_authenticated + + def notarize_artifacts( + self, + artifact_paths: typing.List[str], + metadata: typing.Dict[str, typing.Any], + ) -> typing.Tuple[bool, typing.Dict[str, str]]: + """ + Notarize artifacts by their paths. + Returns `True` if all artifacts was succesful notarizated + and dict with CAS hashes. + """ + all_artifacts_is_notarized = True + notarized_artifacts = {} + self.ensure_login() + with ThreadPoolExecutor(max_workers=4) as executor: + futures = { + executor.submit(self.notarize, artifact_path, metadata): artifact_path + for artifact_path in artifact_paths + } + for future in as_completed(futures): + artifact_path = futures[future] + try: + cas_artifact_hash = future.result() + except Exception: + self._logger.exception('Cannot notarize artifact: %s', + artifact_path) + all_artifacts_is_notarized = False + continue + notarized_artifacts[artifact_path] = cas_artifact_hash + return all_artifacts_is_notarized, notarized_artifacts diff --git a/setup.py b/setup.py index d7c772d..f19dd49 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name="cas_wrapper", - version="0.0.1", + version="0.0.2", author="Stepan Oksanichenko", author_email="soksanichenko@almalinux.org", description="The python wrapper around binary cas from "