cas_wrapper/cas_wrapper.py

96 lines
2.7 KiB
Python
Raw Normal View History

2022-06-20 10:20:00 +00:00
import json
from typing import Dict, Optional

from plumbum import local, ProcessExecutionError
class CasWrapper:
    """
    The python wrapper around binary `cas`
    from Codenotary Community Attestation Service
    """

    binary_name = 'cas'

    def __init__(
        self,
        cas_api_key: str,
        cas_signer_id: str,
    ):
        """
        Locate the `cas` binary and log in with the given credentials.

        :param cas_api_key: value exported as `CAS_API_KEY` for every call
        :param cas_signer_id: value exported as `SIGNER_ID` for every call
        :raises FileNotFoundError: if the binary is not found in PATH
        :raises ProcessExecutionError: if `cas login` exits with a
            non-zero code
        """
        if self.binary_name not in local:
            raise FileNotFoundError(
                'Binary CAS is not found in PATH on the machine',
            )
        self._cas_api_key = cas_api_key
        self._cas_signer_id = cas_signer_id
        # Resolve the same name that was checked above, so overriding
        # `binary_name` in a subclass affects both the check and the lookup.
        self._cas = local[self.binary_name]
        with self._credentials_env():
            self._cas['login']()

    def _credentials_env(self):
        """Return a context manager that exports the CAS credentials."""
        return local.env(
            CAS_API_KEY=self._cas_api_key,
            SIGNER_ID=self._cas_signer_id,
        )

    @staticmethod
    def _attribute_args(metadata: Optional[Dict]) -> list:
        """Turn a metadata mapping into repeated `-a key=value` arguments."""
        args = []
        for key, value in (metadata or {}).items():
            args.extend(('-a', f'{key}={value}'))
        return args

    @staticmethod
    def _is_trusted(raw_output: str) -> bool:
        """Interpret the JSON report: a zero (falsy) status means trusted."""
        return not bool(json.loads(raw_output)['status'])

    def notarize(
        self,
        local_path: str,
        metadata: Optional[Dict] = None,
    ) -> str:
        """
        Wrapper around `cas notarize`
        :param local_path: path to a local Git repo
        :param metadata: additional metadata, passed to the binary
            as `-a key=value` pairs
        :return: hash of notarized commit
        :rtype: str
        :raises ProcessExecutionError: if the binary exits with a
            non-zero code
        """
        command = self._cas[
            'notarize',
            local_path,
            '-o',
            'json',
        ]
        attribute_args = self._attribute_args(metadata)
        if attribute_args:
            command = command[tuple(attribute_args)]
        with self._credentials_env():
            result_of_execution = command()
        return json.loads(result_of_execution)['hash']

    def authenticate(
        self,
        local_path: str,
    ) -> bool:
        """
        Wrapper around `cas authenticate`
        :param local_path: path to a local Git repo
            (should be started from `git://`)
            or to a single local file
        :return: true if a commit is trusted, vice versa - false
        :rtype: bool
        :raises ProcessExecutionError: if the binary exits with a code
            other than 0 or 1
        """
        command = self._cas[
            'authenticate',
            local_path,
            '-o',
            'json',
        ]
        with self._credentials_env():
            # Exit code 1 is returned in case a commit is untrusted; accept
            # it together with 0 so the binary is executed only once
            # instead of being re-run after a failure.
            result_of_execution = command(retcode=(0, 1))
        return self._is_trusted(result_of_execution)