Compare commits

..

16 Commits

Author SHA1 Message Date
danfimov
d5ae64321c Merge pull request 'Revert "ALBS-1122: Replace notarization interface from CAS to VCN (#7)"' (#8) from revert into master
Reviewed-on: #8
2023-06-22 11:40:26 +00:00
b5a2b87568 Revert "ALBS-1122: Replace notarization interface from CAS to VCN (#7)"
This reverts commit d68685bf72.
2023-06-22 13:37:34 +02:00
danfimov
d68685bf72 ALBS-1122: Replace notarization interface from CAS to VCN (#7)
Co-authored-by: Daniil Anfimov <anfimovdan@gmail.com>
Reviewed-on: #7
2023-06-20 14:43:27 +00:00
Javier Hernández
d9c09a5d58 Merge pull request #6 from jhernandez/ALBS-662
ALBS-662: Added the ability to authenticate other CAS users' artifacts

Reviewed-on: #6
2022-11-10 10:15:42 +00:00
a78784fee4 New version 0.0.6 2022-11-10 11:10:06 +01:00
4ced19c78c ALBS-662: Added the ability to authenticate other CAS users' artifacts 2022-11-08 14:04:09 +01:00
Korulag
211d4521c0 Merge pull request 'Added notarize_no_exc method to return success state instead of raising the exception' (#5) from albs-637 into master
Reviewed-on: #5
2022-09-14 20:02:22 +00:00
Vasily Kleschov
de5d91217f Fixed review comments 2022-09-14 19:28:45 +03:00
Vasily Kleschov
7093f13a10 Added notarize_no_exc method to return success state instead of raising the exception 2022-09-14 18:58:37 +03:00
Stepan Oksanichenko
49c4c97645 Merge pull request 'ALBS-639: Create a CLI tool to generate SBOM' (#4) from ALBS-639 into master
Reviewed-on: #4
2022-09-13 12:26:18 +00:00
soksanichenko
d2f8cdda42 ALBS-639: Create a CLI tool to generate SBOM
- It's fixed splitting of result of the command `cas --version`
2022-09-13 14:57:26 +03:00
soksanichenko
e5fd2bf3b1 ALBS-639: Create a CLI tool to generate SBOM
- Class method `get_version` is implemented
2022-09-13 14:39:54 +03:00
d6eca7c2e3 New version 0.0.3 2022-08-18 11:38:55 +02:00
Javier Hernández
9bb706e2d0 Merge pull request 'ALBS-576: Notarize artifacts sequentially' (#3)
Reviewed-on: #3
2022-08-18 09:23:07 +00:00
f93f95fca5 ALBS-576: Notarize artifacts sequentially
See: https://github.com/codenotary/cas/issues/275
2022-08-18 10:27:46 +02:00
anfimovdm
98ca413db7 ALBS-444 (#2)
Co-authored-by: Daniil Anfimov <anfimovdan@gmail.com>
Co-authored-by: Vyacheslav Potoropin <vpotoropin@almalinux.org>
Reviewed-on: #2
Co-authored-by: anfimovdm <anfimovdm@noreply.git.almalinux.org>
Co-committed-by: anfimovdm <anfimovdm@noreply.git.almalinux.org>
2022-07-01 14:25:16 +00:00
3 changed files with 156 additions and 19 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

View File

@ -1,5 +1,6 @@
import json import json
from typing import Dict import logging
import typing
from plumbum import local, ProcessExecutionError from plumbum import local, ProcessExecutionError
@ -12,28 +13,45 @@ class CasWrapper:
binary_name = 'cas' binary_name = 'cas'
@classmethod
def _is_binary_present(cls):
if cls.binary_name not in local:
raise FileNotFoundError(
'Binary CAS is not found in PATH on the machine',
)
def __init__( def __init__(
self, self,
cas_api_key: str, cas_api_key: str,
cas_signer_id: str, cas_signer_id: str,
logger: logging.Logger = None,
): ):
if self.binary_name not in local: self._is_binary_present()
raise FileNotFoundError(
'Binary CAS is not found in PATH on the machine',
)
self._cas_api_key = cas_api_key self._cas_api_key = cas_api_key
self._cas_signer_id = cas_signer_id self._cas_signer_id = cas_signer_id
self._cas = local['cas']
self._logger = logger
if self._logger is None:
self._logger = logging.getLogger()
@classmethod
def get_version(cls):
cls._is_binary_present()
command = local['cas']['--version']
version = command().split()[-1].split('v')[1]
return version
def ensure_login(self):
with local.env( with local.env(
CAS_API_KEY=self._cas_api_key, CAS_API_KEY=self._cas_api_key,
SIGNER_ID=self._cas_signer_id SIGNER_ID=self._cas_signer_id,
): ):
self._cas = local['cas']
self._cas['login']() self._cas['login']()
def notarize( def notarize(
self, self,
local_path: str, local_path: str,
metadata: Dict = None, metadata: typing.Dict = None,
) -> str: ) -> str:
""" """
Wrapper around `cas notarize` Wrapper around `cas notarize`
@ -61,24 +79,55 @@ class CasWrapper:
result_of_execution = command() result_of_execution = command()
return json.loads(result_of_execution)['hash'] return json.loads(result_of_execution)['hash']
def notarize_no_exc(
self,
local_path: str,
metadata: typing.Dict = None,
) -> typing.Tuple[bool, str]:
"""
Wrapper for avoiding raising exceptions during notarization.
Return `success` flag instead for library user to react respectively.
:param local_path: path to a local Git repo
:param metadata: additional metadata
:return: boolean flag for operation success and the hash
of the notarized artifact.
:rtype: tuple
"""
success = False
try:
cas_hash = self.notarize(local_path, metadata=metadata)
success = True
except Exception:
self._logger.exception('Cannot notarize artifact: %s',
local_path)
cas_hash = ''
return success, cas_hash
def authenticate( def authenticate(
self, self,
local_path: str, local_path: str,
return_json: bool = False,
use_hash: bool = False,
signer_id: str = None,
): ):
""" """
Wrapper around `cas authenticate` Wrapper around `cas authenticate`
:param local_path: path to a local Git repo :param local_path: path to a local Git repo
(should be started from `git://`) (should be started from `git://`)
or to a single local file or to a single local file or hash
:param return_json: flag for return json response
:param use_hash: flag for authenticate by hash
:return: true if a commit is trusted, vice versa - false :return: true if a commit is trusted, vice versa - false
:rtype: bool or dict with result if return_json param is True
:rtype: bool or dict
""" """
command = self._cas[ command_args = ['authenticate', local_path]
'authenticate', if use_hash:
local_path, command_args = ['authenticate', '--hash', local_path]
'-o', if signer_id:
'json', command_args.extend(('--signerID', signer_id))
] command_args.extend(('-o', 'json'))
command = self._cas[command_args]
try: try:
with local.env( with local.env(
CAS_API_KEY=self._cas_api_key, CAS_API_KEY=self._cas_api_key,
@ -92,4 +141,88 @@ class CasWrapper:
): ):
# in case if commit is untrusted # in case if commit is untrusted
result_of_execution = command(retcode=1) result_of_execution = command(retcode=1)
return not bool(json.loads(result_of_execution)['status']) json_result = json.loads(result_of_execution)
if return_json:
return json_result
return not bool(json_result['status'])
def authenticate_source(
self,
local_path: str,
signer_id: str = None,
) -> typing.Tuple[bool, typing.Optional[str]]:
"""
Authenticates source by git path.
Returns authenticate result and source commit hash.
"""
is_authenticated = False
commit_cas_hash = None
self.ensure_login()
try:
result_json = self.authenticate(
local_path,
return_json=True,
signer_id=signer_id
)
is_authenticated = result_json['verified']
commit_cas_hash = result_json['hash']
# we can fall with ProcessExecutionError,
# because source can be not notarized
except ProcessExecutionError:
self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated, commit_cas_hash
def authenticate_artifact(
self,
local_path: str,
use_hash: bool = False,
signer_id: str = None,
) -> bool:
"""
Authenticates artifact by artifact path or hash if `use_hash` is True.
Returns authenticate result.
"""
is_authenticated = False
self.ensure_login()
try:
is_authenticated = self.authenticate(
local_path,
use_hash=use_hash,
return_json=True,
signer_id=signer_id
)['verified']
# we can fall with ProcessExecutionError,
# because artifact can be not notarized
except ProcessExecutionError:
self._logger.exception('Cannot authenticate: %s', local_path)
return is_authenticated
def notarize_artifacts(
self,
artifact_paths: typing.List[str],
metadata: typing.Dict[str, typing.Any],
) -> typing.Tuple[bool, typing.Dict[str, str]]:
"""
Notarize artifacts by their paths.
Returns `True` if all artifacts was succesful notarizated
and dict with CAS hashes.
"""
all_artifacts_is_notarized = True
notarized_artifacts = {}
self.ensure_login()
# ALBS-576: We stopped doing this process in parallel due to the
# problems experienced and described in this CAS issue:
# https://github.com/codenotary/cas/issues/275
# Hence, we decided to go sequential here until the problem is
# resolved in CAS itself.
for artifact_path in artifact_paths:
try:
cas_artifact_hash = self.notarize(artifact_path, metadata)
except Exception:
self._logger.exception('Cannot notarize artifact: %s',
artifact_path)
all_artifacts_is_notarized = False
continue
notarized_artifacts[artifact_path] = cas_artifact_hash
return all_artifacts_is_notarized, notarized_artifacts

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup( setup(
name="cas_wrapper", name="cas_wrapper",
version="0.0.1", version="0.0.6",
author="Stepan Oksanichenko", author="Stepan Oksanichenko",
author_email="soksanichenko@almalinux.org", author_email="soksanichenko@almalinux.org",
description="The python wrapper around binary cas from " description="The python wrapper around binary cas from "