ALBS-444 #2
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
117
cas_wrapper.py
117
cas_wrapper.py
@ -1,5 +1,7 @@
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import json
|
||||
from typing import Dict
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from plumbum import local, ProcessExecutionError
|
||||
|
||||
@ -16,6 +18,7 @@ class CasWrapper:
|
||||
self,
|
||||
cas_api_key: str,
|
||||
cas_signer_id: str,
|
||||
logger: logging.Logger = None,
|
||||
):
|
||||
if self.binary_name not in local:
|
||||
raise FileNotFoundError(
|
||||
@ -23,17 +26,22 @@ class CasWrapper:
|
||||
)
|
||||
self._cas_api_key = cas_api_key
|
||||
self._cas_signer_id = cas_signer_id
|
||||
self._cas = local['cas']
|
||||
self._logger = logger
|
||||
if self._logger is None:
|
||||
self._logger = logging.getLogger()
|
||||
|
||||
def ensure_login(self):
|
||||
with local.env(
|
||||
CAS_API_KEY=self._cas_api_key,
|
||||
SIGNER_ID=self._cas_signer_id
|
||||
CAS_API_KEY=self._cas_api_key,
|
||||
SIGNER_ID=self._cas_signer_id,
|
||||
):
|
||||
self._cas = local['cas']
|
||||
self._cas['login']()
|
||||
|
||||
def notarize(
|
||||
self,
|
||||
local_path: str,
|
||||
metadata: Dict = None,
|
||||
metadata: typing.Dict = None,
|
||||
) -> str:
|
||||
"""
|
||||
Wrapper around `cas notarize`
|
||||
@ -64,21 +72,25 @@ class CasWrapper:
|
||||
def authenticate(
|
||||
self,
|
||||
local_path: str,
|
||||
return_json: bool = False,
|
||||
use_hash: bool = False,
|
||||
):
|
||||
"""
|
||||
Wrapper around `cas authenticate`
|
||||
:param local_path: path to a local Git repo
|
||||
(should be started from `git://`)
|
||||
or to a single local file
|
||||
or to a single local file or hash
|
||||
:param return_json: flag for return json response
|
||||
:param use_hash: flag for authenticate by hash
|
||||
:return: true if a commit is trusted, vice versa - false
|
||||
:rtype: bool
|
||||
or dict with result if return_json param is True
|
||||
:rtype: bool or dict
|
||||
"""
|
||||
command = self._cas[
|
||||
'authenticate',
|
||||
local_path,
|
||||
'-o',
|
||||
'json',
|
||||
]
|
||||
command_args = ['authenticate', local_path]
|
||||
if use_hash:
|
||||
command_args = ['authenticate', '--hash', local_path]
|
||||
command_args.extend(('-o', 'json'))
|
||||
command = self._cas[command_args]
|
||||
try:
|
||||
with local.env(
|
||||
CAS_API_KEY=self._cas_api_key,
|
||||
@ -92,4 +104,81 @@ class CasWrapper:
|
||||
):
|
||||
# in case if commit is untrusted
|
||||
result_of_execution = command(retcode=1)
|
||||
return not bool(json.loads(result_of_execution)['status'])
|
||||
json_result = json.loads(result_of_execution)
|
||||
if return_json:
|
||||
return json_result
|
||||
return not bool(json_result['status'])
|
||||
|
||||
def authenticate_source(
|
||||
self,
|
||||
local_path: str,
|
||||
) -> typing.Tuple[bool, typing.Optional[str]]:
|
||||
"""
|
||||
Authenticates source by git path.
|
||||
Returns authenticate result and source commit hash.
|
||||
"""
|
||||
is_authenticated = False
|
||||
commit_cas_hash = None
|
||||
self.ensure_login()
|
||||
try:
|
||||
result_json = self.authenticate(local_path, return_json=True)
|
||||
anfimovdm marked this conversation as resolved
|
||||
is_authenticated = result_json['verified']
|
||||
commit_cas_hash = result_json['hash']
|
||||
# we can fall with ProcessExecutionError,
|
||||
# because source can be not notarized
|
||||
except ProcessExecutionError:
|
||||
self._logger.exception('Cannot authenticate: %s', local_path)
|
||||
return is_authenticated, commit_cas_hash
|
||||
|
||||
def authenticate_artifact(
|
||||
self,
|
||||
local_path: str,
|
||||
use_hash: bool = False,
|
||||
) -> bool:
|
||||
"""
|
||||
Authenticates artifact by artifact path or hash if `use_hash` is True.
|
||||
Returns authenticate result.
|
||||
"""
|
||||
is_authenticated = False
|
||||
self.ensure_login()
|
||||
try:
|
||||
is_authenticated = self.authenticate(
|
||||
local_path,
|
||||
use_hash=use_hash,
|
||||
anfimovdm marked this conversation as resolved
Outdated
AvoidMe
commented
Instead of Instead of `with self` I would prefer `self.ensure_login()` or something, because you didn't use __exit__
anfimovdm
commented
Done Done
|
||||
return_json=True,
|
||||
anfimovdm marked this conversation as resolved
Outdated
soksanichenko
commented
I guess colon is not needed in the logging message or it's in wrong place of the message I guess colon is not needed in the logging message or it's in wrong place of the message
|
||||
)['verified']
|
||||
# we can fall with ProcessExecutionError,
|
||||
# because artifact can be not notarized
|
||||
except ProcessExecutionError:
|
||||
self._logger.exception('Cannot authenticate: %s', local_path)
|
||||
return is_authenticated
|
||||
|
||||
def notarize_artifacts(
|
||||
self,
|
||||
artifact_paths: typing.List[str],
|
||||
metadata: typing.Dict[str, typing.Any],
|
||||
) -> typing.Tuple[bool, typing.Dict[str, str]]:
|
||||
"""
|
||||
Notarize artifacts by their paths.
|
||||
Returns `True` if all artifacts was succesful notarizated
|
||||
and dict with CAS hashes.
|
||||
"""
|
||||
all_artifacts_is_notarized = True
|
||||
notarized_artifacts = {}
|
||||
self.ensure_login()
|
||||
with ThreadPoolExecutor(max_workers=4) as executor:
|
||||
anfimovdm marked this conversation as resolved
Outdated
soksanichenko
commented
Add name of an artifact to the logging message Add name of an artifact to the logging message
|
||||
futures = {
|
||||
executor.submit(self.notarize, artifact_path, metadata): artifact_path
|
||||
for artifact_path in artifact_paths
|
||||
}
|
||||
for future in as_completed(futures):
|
||||
artifact_path = futures[future]
|
||||
try:
|
||||
cas_artifact_hash = future.result()
|
||||
anfimovdm marked this conversation as resolved
Outdated
AvoidMe
commented
It's bad practice to mutate arguments like this, can you return all artifacts with the respected hashes instead? It's bad practice to mutate arguments like this, can you return all artifacts with the respected hashes instead?
anfimovdm
commented
Done Done
|
||||
except Exception:
|
||||
self._logger.exception('Cannot notarize artifact: %s',
|
||||
artifact_path)
|
||||
all_artifacts_is_notarized = False
|
||||
continue
|
||||
notarized_artifacts[artifact_path] = cas_artifact_hash
|
||||
return all_artifacts_is_notarized, notarized_artifacts
|
||||
|
Loading…
Reference in New Issue
Block a user
You don't get hash because use_hash equals to Fasle by default
I can get hash here, because when I use
return_json
flag,self.authenticate
returns full JSON response