cas_wrapper/cas_wrapper.py

184 lines
6.0 KiB
Python

from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import logging
import typing
from plumbum import local, ProcessExecutionError
class CasWrapper:
"""
The python wrapper around binary `cas`
from Codenotary Community Attestation Service
"""
binary_name = 'cas'
def __init__(
self,
cas_api_key: str,
cas_signer_id: str,
logger: logging.Logger = None,
):
if self.binary_name not in local:
raise FileNotFoundError(
'Binary CAS is not found in PATH on the machine',
)
self._cas_api_key = cas_api_key
self._cas_signer_id = cas_signer_id
self._cas = local['cas']
self._logger = logger
if self._logger is None:
self._logger = logging.getLogger()
def ensure_login(self):
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id,
):
self._cas['login']()
def notarize(
self,
local_path: str,
metadata: typing.Dict = None,
) -> str:
"""
Wrapper around `cas notarize`
:param local_path: path to a local Git repo
:param metadata: additional metadata
:return: hash of notarized commit
:rtype: str
"""
command = self._cas[
'notarize',
'--bom',
local_path,
'-o',
'json',
]
if metadata is not None:
for key, value in metadata.items():
command = command[
'-a',
f'{key}={value}',
]
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
return json.loads(result_of_execution)['hash']
def authenticate(
self,
local_path: str,
return_json: bool = False,
use_hash: bool = False,
):
"""
Wrapper around `cas authenticate`
:param local_path: path to a local Git repo
(should be started from `git://`)
or to a single local file or hash
:param return_json: flag for return json response
:param use_hash: flag for authenticate by hash
:return: true if a commit is trusted, vice versa - false
or dict with result if return_json param is True
:rtype: bool or dict
"""
command_args = ['authenticate', local_path]
if use_hash:
command_args = ['authenticate', '--bom', '--hash', local_path]
command_args.extend(('-o', 'json'))
command = self._cas[command_args]
try:
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
except ProcessExecutionError:
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
# in case if commit is untrusted
result_of_execution = command(retcode=1)
json_result = json.loads(result_of_execution)
if return_json:
return json_result
return not bool(json_result['status'])
def authenticate_source(
self,
local_path: str,
) -> typing.Tuple[bool, typing.Optional[str]]:
"""
Authenticates source by git path.
Returns authenticate result and source commit hash.
"""
is_authenticated = False
commit_cas_hash = None
try:
result_json = self.authenticate(local_path, return_json=True)
is_authenticated = result_json['verified']
commit_cas_hash = result_json['hash']
# we can fall with ProcessExecutionError,
# because source can be not notarized
except ProcessExecutionError:
self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated, commit_cas_hash
def authenticate_artifact(
self,
local_path: str,
use_hash: bool = False,
) -> bool:
"""
Authenticates artifact by artifact path or hash if `use_hash` is True.
Returns authenticate result.
"""
is_authenticated = False
try:
is_authenticated = self.authenticate(
local_path,
use_hash=use_hash,
return_json=True,
)['verified']
# we can fall with ProcessExecutionError,
# because artifact can be not notarized
except ProcessExecutionError:
self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated
def notarize_artifacts(
self,
artifact_paths: typing.List[str],
metadata: typing.Dict[str, typing.Any],
) -> typing.Tuple[bool, typing.Dict[str, str]]:
"""
Notarize artifacts by their paths.
Returns `True` if all artifacts was succesful notarizated
and dict with CAS hashes.
"""
all_artifacts_is_notarized = True
notarized_artifacts = {}
self.ensure_login()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(self.notarize, artifact_path, metadata): artifact_path
for artifact_path in artifact_paths
}
for future in as_completed(futures):
artifact_path = futures[future]
try:
cas_artifact_hash = future.result()
except Exception:
self._logger.exception('Cannot notarize artifact: %s',
artifact_path)
all_artifacts_is_notarized = False
continue
notarized_artifacts[artifact_path] = cas_artifact_hash
return all_artifacts_is_notarized, notarized_artifacts