ALBS-1122: Replace notarization interface from CAS to VCN #7
@ -1,3 +1,3 @@
|
|||||||
# cas_wrapper
|
# cas_wrapper
|
||||||
|
|
||||||
The python wrapper around binary `cas` from project Codenotary Community Attestation Service.
|
The python wrapper around binary `vcn` from project Codenotary Community Attestation Service.
|
||||||
|
203
cas_wrapper.py
203
cas_wrapper.py
@ -1,88 +1,119 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import typing
|
import typing
|
||||||
|
from functools import wraps
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from plumbum import local, ProcessExecutionError
|
from plumbum import ProcessExecutionError, local
|
||||||
|
|
||||||
|
DEFAULT_BINARY_NAME = "vcn"
|
||||||
|
DEFAULT_BINARY_PATH = "/usr/local/bin/"
|
||||||
|
|
||||||
|
|
||||||
|
def with_env_context(func):
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
if not isinstance(self, CasWrapper):
|
||||||
|
raise TypeError(
|
||||||
|
'Cannot use "with_env_context" decorator outside of CasWrapper instance'
|
||||||
|
)
|
||||||
|
with local.env(
|
||||||
|
VCN_LC_HOST=self._vcn_lc_host,
|
||||||
|
VCN_LC_API_KEY=self._vcn_lc_api_key,
|
||||||
|
VCN_LC_PORT=self._vcn_lc_port,
|
||||||
|
):
|
||||||
|
return func(self, *args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
danfimov marked this conversation as resolved
Outdated
|
|||||||
|
|
||||||
|
|
||||||
class CasWrapper:
|
class CasWrapper:
|
||||||
"""
|
"""
|
||||||
The python wrapper around binary `cas`
|
The python wrapper around binary `vcn`
|
||||||
from Codenotary Community Attestation Service
|
from Codenotary Community Attestation Service
|
||||||
"""
|
"""
|
||||||
|
|
||||||
binary_name = 'cas'
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _is_binary_present(cls):
|
def _is_binary_present(
|
||||||
if cls.binary_name not in local:
|
cls,
|
||||||
|
binary_name: str,
|
||||||
|
binary_path: str,
|
||||||
|
):
|
||||||
|
if not Path(binary_path, binary_name).exists():
|
||||||
raise FileNotFoundError(
|
raise FileNotFoundError(
|
||||||
'Binary CAS is not found in PATH on the machine',
|
f"Binary VCN is not found in {binary_path} on the machine",
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
cas_api_key: str,
|
vcn_lc_api_key: str,
|
||||||
cas_signer_id: str,
|
vcn_lc_host: str = "eval-honeywell.codenotary.com",
|
||||||
logger: logging.Logger = None,
|
vcn_lc_port: int = 443,
|
||||||
|
logger: logging.Logger = None,
|
||||||
|
binary_name: str = DEFAULT_BINARY_NAME,
|
||||||
|
binary_path: str = DEFAULT_BINARY_PATH,
|
||||||
):
|
):
|
||||||
self._is_binary_present()
|
self._is_binary_present(binary_name, binary_path)
|
||||||
self._cas_api_key = cas_api_key
|
self._vcn_lc_api_key = vcn_lc_api_key
|
||||||
self._cas_signer_id = cas_signer_id
|
self._vcn_lc_host = vcn_lc_host
|
||||||
self._cas = local['cas']
|
self._vcn_lc_port = vcn_lc_port
|
||||||
|
self._binary_name = binary_name
|
||||||
danfimov marked this conversation as resolved
jhernandez
commented
This breaks https://github.com/AlmaLinux/alma-sbom/blob/main/libsbom/cyclonedx.py#L30
danfimov
commented
yes, but we need to make changes in yes, but we need to make changes in `alma-sbom` repository anyway, because `CasWrapper` expect different values for initialization
also maybe there is a point, to left this method as `classmethod`
jhernandez
commented
Right, we need to update Right, we need to update `alma-sbom` too. And yes, I'd imagine that we might keep this as a `classmethod`.
|
|||||||
|
self._binary_path = binary_path
|
||||||
|
self._full_binary_path = Path(self._binary_path, self._binary_name)
|
||||||
|
self._vcn = local[str(self._full_binary_path)]
|
||||||
self._logger = logger
|
self._logger = logger
|
||||||
if self._logger is None:
|
if self._logger is None:
|
||||||
self._logger = logging.getLogger()
|
self._logger = logging.getLogger()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_version(cls):
|
def get_version(
|
||||||
cls._is_binary_present()
|
cls,
|
||||||
command = local['cas']['--version']
|
binary_name: str = DEFAULT_BINARY_NAME,
|
||||||
version = command().split()[-1].split('v')[1]
|
binary_path: str = DEFAULT_BINARY_PATH,
|
||||||
|
):
|
||||||
|
cls._is_binary_present(binary_name, binary_path)
|
||||||
|
full_path = Path(binary_path, binary_name)
|
||||||
|
command = local[str(full_path)]["--version"]
|
||||||
|
version = command().split()[-1].split("v")[1]
|
||||||
return version
|
return version
|
||||||
|
|
||||||
|
@with_env_context
|
||||||
def ensure_login(self):
|
def ensure_login(self):
|
||||||
with local.env(
|
self._vcn["login"]()
|
||||||
CAS_API_KEY=self._cas_api_key,
|
|
||||||
SIGNER_ID=self._cas_signer_id,
|
|
||||||
):
|
|
||||||
self._cas['login']()
|
|
||||||
|
|
||||||
|
@with_env_context
|
||||||
def notarize(
|
def notarize(
|
||||||
self,
|
self,
|
||||||
local_path: str,
|
local_path: str,
|
||||||
metadata: typing.Dict = None,
|
metadata: typing.Dict = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Wrapper around `cas notarize`
|
Wrapper around `vcn notarize`
|
||||||
:param local_path: path to a local Git repo
|
:param local_path: path to a local Git repo
|
||||||
:param metadata: additional metadata
|
:param metadata: additional metadata
|
||||||
:return: hash of notarized commit
|
:return: hash of notarized commit
|
||||||
:rtype: str
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
command = self._cas[
|
command = self._vcn[
|
||||||
'notarize',
|
"notarize",
|
||||||
local_path,
|
local_path,
|
||||||
'-o',
|
"-o",
|
||||||
'json',
|
"json",
|
||||||
]
|
]
|
||||||
if metadata is not None:
|
if metadata is not None:
|
||||||
for key, value in metadata.items():
|
for key, value in metadata.items():
|
||||||
command = command[
|
command = command[
|
||||||
'-a',
|
"-a",
|
||||||
f'{key}={value}',
|
f"{key}={value}",
|
||||||
]
|
]
|
||||||
with local.env(
|
result_of_execution = command()
|
||||||
CAS_API_KEY=self._cas_api_key,
|
result_of_execution, *_ = json.loads(result_of_execution)
|
||||||
SIGNER_ID=self._cas_signer_id
|
return result_of_execution["hash"]
|
||||||
):
|
|
||||||
result_of_execution = command()
|
|
||||||
return json.loads(result_of_execution)['hash']
|
|
||||||
|
|
||||||
def notarize_no_exc(
|
def notarize_no_exc(
|
||||||
self,
|
self,
|
||||||
local_path: str,
|
local_path: str,
|
||||||
metadata: typing.Dict = None,
|
metadata: typing.Dict = None,
|
||||||
) -> typing.Tuple[bool, str]:
|
) -> typing.Tuple[bool, str]:
|
||||||
"""
|
"""
|
||||||
Wrapper for avoiding raising exceptions during notarization.
|
Wrapper for avoiding raising exceptions during notarization.
|
||||||
@ -95,23 +126,23 @@ class CasWrapper:
|
|||||||
"""
|
"""
|
||||||
success = False
|
success = False
|
||||||
try:
|
try:
|
||||||
cas_hash = self.notarize(local_path, metadata=metadata)
|
vcn_hash = self.notarize(local_path, metadata=metadata)
|
||||||
success = True
|
success = True
|
||||||
except Exception:
|
except Exception:
|
||||||
self._logger.exception('Cannot notarize artifact: %s',
|
self._logger.exception("Cannot notarize artifact: %s", local_path)
|
||||||
local_path)
|
vcn_hash = ""
|
||||||
cas_hash = ''
|
return success, vcn_hash
|
||||||
return success, cas_hash
|
|
||||||
|
|
||||||
|
@with_env_context
|
||||||
def authenticate(
|
def authenticate(
|
||||||
self,
|
self,
|
||||||
local_path: str,
|
local_path: str,
|
||||||
return_json: bool = False,
|
return_json: bool = False,
|
||||||
use_hash: bool = False,
|
use_hash: bool = False,
|
||||||
signer_id: str = None,
|
signer_id: str = "",
|
||||||
):
|
) -> typing.Union[bool, dict]:
|
||||||
"""
|
"""
|
||||||
Wrapper around `cas authenticate`
|
Wrapper around `vcn authenticate`
|
||||||
:param local_path: path to a local Git repo
|
:param local_path: path to a local Git repo
|
||||||
(should be started from `git://`)
|
(should be started from `git://`)
|
||||||
or to a single local file or hash
|
or to a single local file or hash
|
||||||
@ -121,62 +152,54 @@ class CasWrapper:
|
|||||||
or dict with result if return_json param is True
|
or dict with result if return_json param is True
|
||||||
:rtype: bool or dict
|
:rtype: bool or dict
|
||||||
"""
|
"""
|
||||||
command_args = ['authenticate', local_path]
|
command_args = ["authenticate", local_path]
|
||||||
if use_hash:
|
if use_hash:
|
||||||
command_args = ['authenticate', '--hash', local_path]
|
command_args = ["authenticate", "--hash", local_path]
|
||||||
if signer_id:
|
if signer_id:
|
||||||
command_args.extend(('--signerID', signer_id))
|
command_args.extend(("--signerID", signer_id))
|
||||||
command_args.extend(('-o', 'json'))
|
command_args.extend(("-o", "json"))
|
||||||
command = self._cas[command_args]
|
command = self._vcn[command_args]
|
||||||
try:
|
try:
|
||||||
with local.env(
|
result_of_execution = command()
|
||||||
CAS_API_KEY=self._cas_api_key,
|
|
||||||
SIGNER_ID=self._cas_signer_id
|
|
||||||
):
|
|
||||||
result_of_execution = command()
|
|
||||||
except ProcessExecutionError:
|
except ProcessExecutionError:
|
||||||
with local.env(
|
# in case if commit is untrusted
|
||||||
CAS_API_KEY=self._cas_api_key,
|
result_of_execution = command(retcode=1)
|
||||||
SIGNER_ID=self._cas_signer_id
|
|
||||||
):
|
|
||||||
# in case if commit is untrusted
|
|
||||||
result_of_execution = command(retcode=1)
|
|
||||||
json_result = json.loads(result_of_execution)
|
json_result = json.loads(result_of_execution)
|
||||||
if return_json:
|
if return_json:
|
||||||
return json_result
|
return json_result
|
||||||
return not bool(json_result['status'])
|
return not bool(json_result["status"])
|
||||||
|
|
||||||
def authenticate_source(
|
def authenticate_source(
|
||||||
self,
|
self,
|
||||||
local_path: str,
|
local_path: str,
|
||||||
signer_id: str = None,
|
signer_id: str = "",
|
||||||
) -> typing.Tuple[bool, typing.Optional[str]]:
|
) -> typing.Tuple[bool, typing.Optional[str]]:
|
||||||
"""
|
"""
|
||||||
Authenticates source by git path.
|
Authenticates source by git path.
|
||||||
Returns authenticate result and source commit hash.
|
Returns authenticate result and source commit hash.
|
||||||
"""
|
"""
|
||||||
is_authenticated = False
|
is_authenticated = False
|
||||||
commit_cas_hash = None
|
commit_vcn_hash = None
|
||||||
self.ensure_login()
|
self.ensure_login()
|
||||||
try:
|
try:
|
||||||
result_json = self.authenticate(
|
result_json = self.authenticate(
|
||||||
local_path,
|
local_path,
|
||||||
return_json=True,
|
return_json=True,
|
||||||
signer_id=signer_id
|
signer_id=signer_id,
|
||||||
)
|
)
|
||||||
is_authenticated = result_json['verified']
|
is_authenticated = result_json["verified"]
|
||||||
commit_cas_hash = result_json['hash']
|
commit_vcn_hash = result_json["hash"]
|
||||||
# we can fall with ProcessExecutionError,
|
# we can fall with ProcessExecutionError,
|
||||||
# because source can be not notarized
|
# because source can be not notarized
|
||||||
except ProcessExecutionError:
|
except ProcessExecutionError:
|
||||||
self._logger.exception('Cannot authenticate: %s', local_path)
|
self._logger.exception("Cannot authenticate: %s", local_path)
|
||||||
return is_authenticated, commit_cas_hash
|
return is_authenticated, commit_vcn_hash
|
||||||
|
|
||||||
def authenticate_artifact(
|
def authenticate_artifact(
|
||||||
self,
|
self,
|
||||||
local_path: str,
|
local_path: str,
|
||||||
use_hash: bool = False,
|
use_hash: bool = False,
|
||||||
signer_id: str = None,
|
signer_id: str = "",
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""
|
"""
|
||||||
Authenticates artifact by artifact path or hash if `use_hash` is True.
|
Authenticates artifact by artifact path or hash if `use_hash` is True.
|
||||||
@ -189,12 +212,12 @@ class CasWrapper:
|
|||||||
local_path,
|
local_path,
|
||||||
use_hash=use_hash,
|
use_hash=use_hash,
|
||||||
return_json=True,
|
return_json=True,
|
||||||
signer_id=signer_id
|
signer_id=signer_id,
|
||||||
)['verified']
|
)["verified"]
|
||||||
# we can fall with ProcessExecutionError,
|
# we can fall with ProcessExecutionError,
|
||||||
# because artifact can be not notarized
|
# because artifact can be not notarized
|
||||||
except ProcessExecutionError:
|
except ProcessExecutionError:
|
||||||
self._logger.exception('Cannot authenticate: %s', local_path)
|
self._logger.exception("Cannot authenticate: %s", local_path)
|
||||||
return is_authenticated
|
return is_authenticated
|
||||||
|
|
||||||
def notarize_artifacts(
|
def notarize_artifacts(
|
||||||
@ -204,8 +227,8 @@ class CasWrapper:
|
|||||||
) -> typing.Tuple[bool, typing.Dict[str, str]]:
|
) -> typing.Tuple[bool, typing.Dict[str, str]]:
|
||||||
"""
|
"""
|
||||||
Notarize artifacts by their paths.
|
Notarize artifacts by their paths.
|
||||||
Returns `True` if all artifacts was succesful notarizated
|
Returns `True` if all artifacts was successful notarizated
|
||||||
and dict with CAS hashes.
|
and dict with VCN hashes.
|
||||||
"""
|
"""
|
||||||
all_artifacts_is_notarized = True
|
all_artifacts_is_notarized = True
|
||||||
notarized_artifacts = {}
|
notarized_artifacts = {}
|
||||||
@ -218,11 +241,13 @@ class CasWrapper:
|
|||||||
# resolved in CAS itself.
|
# resolved in CAS itself.
|
||||||
for artifact_path in artifact_paths:
|
for artifact_path in artifact_paths:
|
||||||
try:
|
try:
|
||||||
cas_artifact_hash = self.notarize(artifact_path, metadata)
|
vcn_artifact_hash = self.notarize(artifact_path, metadata)
|
||||||
except Exception:
|
except Exception:
|
||||||
self._logger.exception('Cannot notarize artifact: %s',
|
self._logger.exception(
|
||||||
artifact_path)
|
"Cannot notarize artifact: %s",
|
||||||
|
artifact_path,
|
||||||
|
)
|
||||||
all_artifacts_is_notarized = False
|
all_artifacts_is_notarized = False
|
||||||
continue
|
continue
|
||||||
notarized_artifacts[artifact_path] = cas_artifact_hash
|
notarized_artifacts[artifact_path] = vcn_artifact_hash
|
||||||
return all_artifacts_is_notarized, notarized_artifacts
|
return all_artifacts_is_notarized, notarized_artifacts
|
||||||
|
4
setup.py
4
setup.py
@ -2,10 +2,10 @@ from setuptools import setup
|
|||||||
|
|
||||||
setup(
|
setup(
|
||||||
name="cas_wrapper",
|
name="cas_wrapper",
|
||||||
version="0.0.6",
|
version="0.0.7",
|
||||||
author="Stepan Oksanichenko",
|
author="Stepan Oksanichenko",
|
||||||
author_email="soksanichenko@almalinux.org",
|
author_email="soksanichenko@almalinux.org",
|
||||||
description="The python wrapper around binary cas from "
|
description="The python wrapper around binary vcn from "
|
||||||
"project Codenotary Community Attestation Service.",
|
"project Codenotary Community Attestation Service.",
|
||||||
url="https://git.almalinux.org/almalinux/cas_wrapper",
|
url="https://git.almalinux.org/almalinux/cas_wrapper",
|
||||||
project_urls={
|
project_urls={
|
||||||
|
Loading…
Reference in New Issue
Block a user
Shall we rename it to VCNWrapper? This would also involve updating the project name, git repo name, etc. Not sure, what do you think? 🤔
I haven't renamed to
VCNWrapper
because of these things, probably we can to rename it in more abstract like style, i.e.CodeNotaryWrapper
or something like thisYeah, let's just keep it as it is now. We can do the move later.