cas_wrapper/cas_wrapper.py
soksanichenko 646b984d89 ALBS-443: Integrate CodeNotary with git updater tool
- Env vars with credentials
2022-06-21 11:04:31 +03:00

96 lines
2.7 KiB
Python

import json
from typing import Dict
from plumbum import local, ProcessExecutionError
class CasWrapper:
"""
The python wrapper around binary `cas`
from Codenotary Community Attestation Service
"""
binary_name = 'cas'
def __init__(
self,
cas_api_key: str,
cas_signer_id: str,
):
if self.binary_name not in local:
raise FileNotFoundError(
'Binary CAS is not found in PATH on the machine',
)
self._cas_api_key = cas_api_key
self._cas_signer_id = cas_signer_id
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
self._cas = local['cas']
self._cas['login']()
def notarize(
self,
local_path: str,
metadata: Dict = None,
) -> str:
"""
Wrapper around `cas notarize`
:param local_path: path to a local Git repo
:param metadata: additional metadata
:return: hash of notarized commit
:rtype: str
"""
command = self._cas[
'notarize',
local_path,
'-o',
'json',
]
if metadata is not None:
for key, value in metadata.items():
command = command[
'-a',
f'{key}={value}',
]
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
return json.loads(result_of_execution)['hash']
def authenticate(
self,
local_path: str,
):
"""
Wrapper around `cas authenticate`
:param local_path: path to a local Git repo
(should be started from `git://`)
or to a single local file
:return: true if a commit is trusted, vice versa - false
:rtype: bool
"""
command = self._cas[
'authenticate',
local_path,
'-o',
'json',
]
try:
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
result_of_execution = command()
except ProcessExecutionError:
with local.env(
CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id
):
# in case if commit is untrusted
result_of_execution = command(retcode=1)
return not bool(json.loads(result_of_execution)['status'])