ALBS-444 #2

Merged
anfimovdm merged 9 commits from ALBS-444 into master 2022-07-01 14:25:17 +00:00
3 changed files with 108 additions and 15 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

View File

@ -1,5 +1,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
import json import json
from typing import Dict import logging
import typing
from plumbum import local, ProcessExecutionError from plumbum import local, ProcessExecutionError
@ -16,6 +18,7 @@ class CasWrapper:
self, self,
cas_api_key: str, cas_api_key: str,
cas_signer_id: str, cas_signer_id: str,
logger: logging.Logger = None,
): ):
if self.binary_name not in local: if self.binary_name not in local:
raise FileNotFoundError( raise FileNotFoundError(
@ -23,17 +26,22 @@ class CasWrapper:
) )
self._cas_api_key = cas_api_key self._cas_api_key = cas_api_key
self._cas_signer_id = cas_signer_id self._cas_signer_id = cas_signer_id
self._cas = local['cas']
self._logger = logger
if self._logger is None:
self._logger = logging.getLogger()
def ensure_login(self):
with local.env( with local.env(
CAS_API_KEY=self._cas_api_key, CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id SIGNER_ID=self._cas_signer_id,
): ):
self._cas = local['cas']
self._cas['login']() self._cas['login']()
def notarize( def notarize(
self, self,
local_path: str, local_path: str,
metadata: Dict = None, metadata: typing.Dict = None,
) -> str: ) -> str:
""" """
Wrapper around `cas notarize` Wrapper around `cas notarize`
@ -64,21 +72,25 @@ class CasWrapper:
def authenticate( def authenticate(
self, self,
local_path: str, local_path: str,
return_json: bool = False,
use_hash: bool = False,
): ):
""" """
Wrapper around `cas authenticate` Wrapper around `cas authenticate`
:param local_path: path to a local Git repo :param local_path: path to a local Git repo
(should be started from `git://`) (should be started from `git://`)
or to a single local file or to a single local file or hash
:param return_json: flag for return json response
:param use_hash: flag for authenticate by hash
:return: true if a commit is trusted, vice versa - false :return: true if a commit is trusted, vice versa - false
:rtype: bool or dict with result if return_json param is True
:rtype: bool or dict
""" """
command = self._cas[ command_args = ['authenticate', local_path]
'authenticate', if use_hash:
local_path, command_args = ['authenticate', '--hash', local_path]
'-o', command_args.extend(('-o', 'json'))
'json', command = self._cas[command_args]
]
try: try:
with local.env( with local.env(
CAS_API_KEY=self._cas_api_key, CAS_API_KEY=self._cas_api_key,
@ -92,4 +104,81 @@ class CasWrapper:
): ):
# in case if commit is untrusted # in case if commit is untrusted
result_of_execution = command(retcode=1) result_of_execution = command(retcode=1)
return not bool(json.loads(result_of_execution)['status']) json_result = json.loads(result_of_execution)
if return_json:
return json_result
return not bool(json_result['status'])
def authenticate_source(
self,
local_path: str,
) -> typing.Tuple[bool, typing.Optional[str]]:
"""
Authenticates source by git path.
Returns authenticate result and source commit hash.
"""
is_authenticated = False
commit_cas_hash = None
self.ensure_login()
try:
result_json = self.authenticate(local_path, return_json=True)
anfimovdm marked this conversation as resolved
Review

You don't get hash because use_hash equals to Fasle by default

You don't get hash because use_hash equals to Fasle by default
Review

I can get hash here, because when I use return_json flag, self.authenticate returns full JSON response

I can get hash here, because when I use `return_json` flag, `self.authenticate` returns full JSON response
is_authenticated = result_json['verified']
commit_cas_hash = result_json['hash']
# we can fall with ProcessExecutionError,
# because source can be not notarized
except ProcessExecutionError:
self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated, commit_cas_hash
def authenticate_artifact(
self,
local_path: str,
use_hash: bool = False,
) -> bool:
"""
Authenticates artifact by artifact path or hash if `use_hash` is True.
Returns authenticate result.
"""
is_authenticated = False
self.ensure_login()
try:
is_authenticated = self.authenticate(
local_path,
use_hash=use_hash,
anfimovdm marked this conversation as resolved Outdated

Instead of with self I would prefer self.ensure_login() or something, because you didn't use exit

Instead of `with self` I would prefer `self.ensure_login()` or something, because you didn't use __exit__

Done

Done
return_json=True,
anfimovdm marked this conversation as resolved Outdated

I guess colon is not needed in the logging message or it's in wrong place of the message

I guess colon is not needed in the logging message or it's in wrong place of the message
)['verified']
# we can fall with ProcessExecutionError,
# because artifact can be not notarized
except ProcessExecutionError:
self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated
def notarize_artifacts(
self,
artifact_paths: typing.List[str],
metadata: typing.Dict[str, typing.Any],
) -> typing.Tuple[bool, typing.Dict[str, str]]:
"""
Notarize artifacts by their paths.
Returns `True` if all artifacts was succesful notarizated
and dict with CAS hashes.
"""
all_artifacts_is_notarized = True
notarized_artifacts = {}
self.ensure_login()
with ThreadPoolExecutor(max_workers=4) as executor:
anfimovdm marked this conversation as resolved Outdated

Add name of an artifact to the logging message

Add name of an artifact to the logging message
futures = {
executor.submit(self.notarize, artifact_path, metadata): artifact_path
for artifact_path in artifact_paths
}
for future in as_completed(futures):
artifact_path = futures[future]
try:
cas_artifact_hash = future.result()
anfimovdm marked this conversation as resolved Outdated

It's bad practice to mutate arguments like this, can you return all artifacts with the respected hashes instead?

It's bad practice to mutate arguments like this, can you return all artifacts with the respected hashes instead?

Done

Done
except Exception:
self._logger.exception('Cannot notarize artifact: %s',
artifact_path)
all_artifacts_is_notarized = False
continue
notarized_artifacts[artifact_path] = cas_artifact_hash
return all_artifacts_is_notarized, notarized_artifacts

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup( setup(
name="cas_wrapper", name="cas_wrapper",
version="0.0.1", version="0.0.2",
author="Stepan Oksanichenko", author="Stepan Oksanichenko",
author_email="soksanichenko@almalinux.org", author_email="soksanichenko@almalinux.org",
description="The python wrapper around binary cas from " description="The python wrapper around binary cas from "