ALBS-444 (#2)
Co-authored-by: Daniil Anfimov <anfimovdan@gmail.com> Co-authored-by: Vyacheslav Potoropin <vpotoropin@almalinux.org> Reviewed-on: #2 Co-authored-by: anfimovdm <anfimovdm@noreply.git.almalinux.org> Co-committed-by: anfimovdm <anfimovdm@noreply.git.almalinux.org>
This commit is contained in:
parent
bd5686e9e4
commit
98ca413db7
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
115
cas_wrapper.py
115
cas_wrapper.py
@ -1,5 +1,7 @@
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import json
|
||||
from typing import Dict
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from plumbum import local, ProcessExecutionError
|
||||
|
||||
@ -16,6 +18,7 @@ class CasWrapper:
|
||||
self,
|
||||
cas_api_key: str,
|
||||
cas_signer_id: str,
|
||||
logger: logging.Logger = None,
|
||||
):
|
||||
if self.binary_name not in local:
|
||||
raise FileNotFoundError(
|
||||
@ -23,17 +26,22 @@ class CasWrapper:
|
||||
)
|
||||
self._cas_api_key = cas_api_key
|
||||
self._cas_signer_id = cas_signer_id
|
||||
self._cas = local['cas']
|
||||
self._logger = logger
|
||||
if self._logger is None:
|
||||
self._logger = logging.getLogger()
|
||||
|
||||
def ensure_login(self):
|
||||
with local.env(
|
||||
CAS_API_KEY=self._cas_api_key,
|
||||
SIGNER_ID=self._cas_signer_id
|
||||
SIGNER_ID=self._cas_signer_id,
|
||||
):
|
||||
self._cas = local['cas']
|
||||
self._cas['login']()
|
||||
|
||||
def notarize(
|
||||
self,
|
||||
local_path: str,
|
||||
metadata: Dict = None,
|
||||
metadata: typing.Dict = None,
|
||||
) -> str:
|
||||
"""
|
||||
Wrapper around `cas notarize`
|
||||
@ -64,21 +72,25 @@ class CasWrapper:
|
||||
def authenticate(
|
||||
self,
|
||||
local_path: str,
|
||||
return_json: bool = False,
|
||||
use_hash: bool = False,
|
||||
):
|
||||
"""
|
||||
Wrapper around `cas authenticate`
|
||||
:param local_path: path to a local Git repo
|
||||
(should be started from `git://`)
|
||||
or to a single local file
|
||||
or to a single local file or hash
|
||||
:param return_json: flag for return json response
|
||||
:param use_hash: flag for authenticate by hash
|
||||
:return: true if a commit is trusted, vice versa - false
|
||||
:rtype: bool
|
||||
or dict with result if return_json param is True
|
||||
:rtype: bool or dict
|
||||
"""
|
||||
command = self._cas[
|
||||
'authenticate',
|
||||
local_path,
|
||||
'-o',
|
||||
'json',
|
||||
]
|
||||
command_args = ['authenticate', local_path]
|
||||
if use_hash:
|
||||
command_args = ['authenticate', '--hash', local_path]
|
||||
command_args.extend(('-o', 'json'))
|
||||
command = self._cas[command_args]
|
||||
try:
|
||||
with local.env(
|
||||
CAS_API_KEY=self._cas_api_key,
|
||||
@ -92,4 +104,81 @@ class CasWrapper:
|
||||
):
|
||||
# in case if commit is untrusted
|
||||
result_of_execution = command(retcode=1)
|
||||
return not bool(json.loads(result_of_execution)['status'])
|
||||
json_result = json.loads(result_of_execution)
|
||||
if return_json:
|
||||
return json_result
|
||||
return not bool(json_result['status'])
|
||||
|
||||
def authenticate_source(
|
||||
self,
|
||||
local_path: str,
|
||||
) -> typing.Tuple[bool, typing.Optional[str]]:
|
||||
"""
|
||||
Authenticates source by git path.
|
||||
Returns authenticate result and source commit hash.
|
||||
"""
|
||||
is_authenticated = False
|
||||
commit_cas_hash = None
|
||||
self.ensure_login()
|
||||
try:
|
||||
result_json = self.authenticate(local_path, return_json=True)
|
||||
is_authenticated = result_json['verified']
|
||||
commit_cas_hash = result_json['hash']
|
||||
# we can fall with ProcessExecutionError,
|
||||
# because source can be not notarized
|
||||
except ProcessExecutionError:
|
||||
self._logger.exception('Cannot authenticate: %s', local_path)
|
||||
return is_authenticated, commit_cas_hash
|
||||
|
||||
def authenticate_artifact(
|
||||
self,
|
||||
local_path: str,
|
||||
use_hash: bool = False,
|
||||
) -> bool:
|
||||
"""
|
||||
Authenticates artifact by artifact path or hash if `use_hash` is True.
|
||||
Returns authenticate result.
|
||||
"""
|
||||
is_authenticated = False
|
||||
self.ensure_login()
|
||||
try:
|
||||
is_authenticated = self.authenticate(
|
||||
local_path,
|
||||
use_hash=use_hash,
|
||||
return_json=True,
|
||||
)['verified']
|
||||
# we can fall with ProcessExecutionError,
|
||||
# because artifact can be not notarized
|
||||
except ProcessExecutionError:
|
||||
self._logger.exception('Cannot authenticate: %s', local_path)
|
||||
return is_authenticated
|
||||
|
||||
def notarize_artifacts(
|
||||
self,
|
||||
artifact_paths: typing.List[str],
|
||||
metadata: typing.Dict[str, typing.Any],
|
||||
) -> typing.Tuple[bool, typing.Dict[str, str]]:
|
||||
"""
|
||||
Notarize artifacts by their paths.
|
||||
Returns `True` if all artifacts was succesful notarizated
|
||||
and dict with CAS hashes.
|
||||
"""
|
||||
all_artifacts_is_notarized = True
|
||||
notarized_artifacts = {}
|
||||
self.ensure_login()
|
||||
with ThreadPoolExecutor(max_workers=4) as executor:
|
||||
futures = {
|
||||
executor.submit(self.notarize, artifact_path, metadata): artifact_path
|
||||
for artifact_path in artifact_paths
|
||||
}
|
||||
for future in as_completed(futures):
|
||||
artifact_path = futures[future]
|
||||
try:
|
||||
cas_artifact_hash = future.result()
|
||||
except Exception:
|
||||
self._logger.exception('Cannot notarize artifact: %s',
|
||||
artifact_path)
|
||||
all_artifacts_is_notarized = False
|
||||
continue
|
||||
notarized_artifacts[artifact_path] = cas_artifact_hash
|
||||
return all_artifacts_is_notarized, notarized_artifacts
|
||||
|
Loading…
Reference in New Issue
Block a user