174 lines
5.7 KiB
Python
174 lines
5.7 KiB
Python
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import json
|
|
import logging
|
|
import typing
|
|
|
|
from plumbum import local, ProcessExecutionError
|
|
|
|
|
|
class CasWrapper:
    """
    The python wrapper around binary `cas`
    from Codenotary Community Attestation Service

    Every invocation of the binary receives the API key and the signer ID
    through the `CAS_API_KEY` and `SIGNER_ID` environment variables.
    """

    binary_name = 'cas'

    def __init__(
        self,
        cas_api_key: str,
        cas_signer_id: str,
        logger: typing.Optional[logging.Logger] = None,
    ):
        """
        :param cas_api_key: value passed to `cas` as `CAS_API_KEY`
        :param cas_signer_id: value passed to `cas` as `SIGNER_ID`
        :param logger: logger for failures; the root logger is used
                       when omitted
        :raises FileNotFoundError: if the binary is not found in PATH
        """
        if self.binary_name not in local:
            raise FileNotFoundError(
                'Binary CAS is not found in PATH on the machine',
            )
        self._cas_api_key = cas_api_key
        self._cas_signer_id = cas_signer_id
        # Look up the same name that was checked above, so overriding
        # `binary_name` in a subclass stays consistent.
        self._cas = local[self.binary_name]
        self._logger = logger if logger is not None else logging.getLogger()

    def _with_credentials(self, command):
        """
        Bind the CAS credentials to a single command.

        The environment is attached to the command object instead of being
        set through `local.env`, because `local.env` is shared by all
        threads and `notarize_artifacts` runs commands concurrently.

        :param command: plumbum command to run with credentials
        :return: the command bound to `CAS_API_KEY` and `SIGNER_ID`
        """
        return command.with_env(
            CAS_API_KEY=self._cas_api_key,
            SIGNER_ID=self._cas_signer_id,
        )

    def ensure_login(self):
        """
        Run `cas login` with the configured credentials.
        """
        self._with_credentials(self._cas['login'])()

    def notarize(
        self,
        local_path: str,
        metadata: typing.Optional[typing.Dict] = None,
    ) -> str:
        """
        Wrapper around `cas notarize`
        :param local_path: path to the object to notarize
                           (a local Git repo or an artifact file)
        :param metadata: additional metadata, every pair is passed
                         as `-a key=value`
        :return: hash of the notarized object
        :rtype: str
        """
        command = self._cas[
            'notarize',
            local_path,
            '-o',
            'json',
        ]
        for key, value in (metadata or {}).items():
            command = command[
                '-a',
                f'{key}={value}',
            ]
        result_of_execution = self._with_credentials(command)()
        return json.loads(result_of_execution)['hash']

    def authenticate(
        self,
        local_path: str,
        return_json: bool = False,
        use_hash: bool = False,
    ):
        """
        Wrapper around `cas authenticate`
        :param local_path: path to a local Git repo
                           (should be started from `git://`)
                           or to a single local file or hash
        :param return_json: flag for return json response
        :param use_hash: flag for authenticate by hash
        :return: true if a commit is trusted, vice versa - false
                 or dict with result if return_json param is True
        :rtype: bool or dict
        :raises ProcessExecutionError: if `cas` exits with a code
                                       other than 0 or 1
        """
        command_args = ['authenticate']
        if use_hash:
            command_args.append('--hash')
        command_args.extend((local_path, '-o', 'json'))
        command = self._with_credentials(self._cas[command_args])
        # Exit code 1 is returned for an untrusted object and still carries
        # a JSON report, so both codes are accepted in one single run
        # instead of re-running the command after a failure.
        result_of_execution = command(retcode=(0, 1))
        json_result = json.loads(result_of_execution)
        if return_json:
            return json_result
        return not bool(json_result['status'])

    def authenticate_source(
        self,
        local_path: str,
    ) -> typing.Tuple[bool, typing.Optional[str]]:
        """
        Log in and authenticate sources located by `local_path`.

        A failure of the `cas` binary is logged and not raised.

        :param local_path: path passed to `authenticate`
        :return: pair of the authentication flag and the hash from
                 the response (`None` when it is absent or `cas` failed)
        """
        is_authenticated = False
        commit_cas_hash = None
        self.ensure_login()
        try:
            result_json = self.authenticate(local_path, return_json=True)
            # it should return 0 for authenticated and trusted commits
            is_authenticated = not bool(
                result_json.get('status', 1))
            commit_cas_hash = result_json.get('hash')
        # we can fall with ProcessExecutionError,
        # because source can be not notarized
        except ProcessExecutionError:
            self._logger.exception('Cannot authenticate %s:', local_path)
        return is_authenticated, commit_cas_hash

    def authenticate_artifact(
        self,
        local_path: str,
        use_hash: bool = False,
        return_json: bool = False,
    ) -> typing.Union[bool, typing.Dict]:
        """
        Log in and authenticate an artifact.

        A failure of the `cas` binary is logged and not raised.

        :param local_path: path to the artifact or its hash
        :param use_hash: flag for authenticate by hash
        :param return_json: flag for return json response
        :return: result of `authenticate` (bool, or dict if `return_json`
                 is True); False when `cas` failed
        """
        is_authenticated = False
        self.ensure_login()
        try:
            is_authenticated = self.authenticate(
                local_path,
                use_hash=use_hash,
                return_json=return_json,
            )
        # we can fall with ProcessExecutionError,
        # because source can be not notarized
        except ProcessExecutionError:
            self._logger.exception('Cannot authenticate %s:', local_path)
        return is_authenticated

    def notarize_artifacts(
        self,
        artifact_paths: typing.List[str],
        metadata: typing.Dict[str, typing.Any],
    ) -> typing.Tuple[bool, typing.Dict[str, str]]:
        """
        Log in and notarize several artifacts in a pool of 4 threads.

        A failed artifact is logged and skipped, it does not stop
        notarization of the remaining ones.

        :param artifact_paths: paths to the artifacts
        :param metadata: metadata attached to every artifact
        :return: pair of a flag (True if every artifact is notarized)
                 and a mapping of artifact path to its hash
                 for the notarized artifacts
        """
        all_artifacts_is_notarized = True
        notarized_artifacts = {}
        self.ensure_login()
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {
                executor.submit(
                    self.notarize,
                    artifact_path,
                    metadata,
                ): artifact_path
                for artifact_path in artifact_paths
            }
            for future in as_completed(futures):
                artifact_path = futures[future]
                try:
                    cas_artifact_hash = future.result()
                except Exception:
                    # Name the failed artifact, the traceback alone
                    # does not tell which path it was.
                    self._logger.exception(
                        'Cannot notarize artifact %s:',
                        artifact_path,
                    )
                    all_artifacts_is_notarized = False
                    continue
                notarized_artifacts[artifact_path] = cas_artifact_hash
        return all_artifacts_is_notarized, notarized_artifacts
|