Initial immudb_wrapper implementation

This commit is contained in:
Daniil Anfimov 2023-07-22 12:55:18 +02:00
parent c093e0f468
commit f1940f4725
2 changed files with 365 additions and 0 deletions

339
immudb_wrapper.py Normal file
View File

@ -0,0 +1,339 @@
import hashlib
import json
import logging
import os
import re
from dataclasses import asdict
from pathlib import Path
from traceback import format_exc
from typing import IO, Any, Dict, Optional, Union
from urllib.parse import urlparse
from git import Repo
from grpc import RpcError
from immudb import ImmudbClient
from immudb.datatypes import SafeGetResponse, SetResponse
from immudb.rootService import RootService
Dict = Dict[str, Any]
class ImmudbWrapper(ImmudbClient):
def __init__(
self,
username: str = 'immudb',
password: str = 'immudb',
database: str = 'defaultdb',
immudb_address: Optional[str] = 'localhost:3322',
root_service: Optional[RootService] = None,
public_key_file: Optional[str] = None,
timeout: Optional[int] = None,
max_grpc_message_length: Optional[int] = None,
logger: Optional[logging.Logger] = None,
):
"""
The wrapper around binary `immuclient` from Codenotary.
Args:
username (str): Immudb username to log in (default: "immudb").
password (str): Immudb password to log in (default: "immudb").
database (str): Immudb database to be used (default: "defaultdb").
immudb_address (str, optional): url in format ``host:port``
(e.g. ``localhost:3322``) of your immudb instance.
Defaults to ``localhost:3322`` when no value is set.
root_service (RootService, optional): object that implements
RootService, allowing requests to be verified. Optional.
By default in-memory RootService instance will be created
public_key_file (str, optional): path of the public key to use
for authenticating requests. Optional.
timeout (int, optional): global timeout for GRPC requests. Requests
will hang until the server responds if no timeout is set.
max_grpc_message_length (int, optional): maximum size of message
the server should send. The default (4Mb) is used if no
value is set.
logger (logging.Logger, optional): Logger to be used
"""
self.username = username
self.password = password
self.database = database
if not logger:
self._logger = logging.getLogger()
super().__init__(
immudUrl=immudb_address,
rs=root_service,
publicKeyFile=public_key_file,
timeout=timeout,
max_grpc_message_length=max_grpc_message_length,
)
self.login(
username=self.username,
password=self.password,
)
self.useDatabase(self.encode(self.database))
def encode(
self,
value: Union[str, bytes, dict],
) -> bytes:
if isinstance(value, str):
result = value.encode()
elif isinstance(value, bytes):
result = value
elif isinstance(value, dict):
result = json.dumps(value).encode()
else:
raise ValueError(
"Cannot encode value that isn't str, bytes or dict."
)
return result
def to_dict(
self,
response: SafeGetResponse,
) -> Dict:
result = asdict(response)
result['key'] = result['key'].decode()
result['value'] = json.loads(result['value'].decode())
return result
def get_size_format(
self,
value: int,
factor: int = 1024,
suffix: str = "B",
) -> str:
"""
Scale bytes to its proper byte format
e.g:
1253656 => '1.20 MB'
1253656678 => '1.17 GB'
"""
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if value < factor:
return f"{value:.2f} {unit}{suffix}"
value /= factor
return f"{value:.2f} Y{suffix}"
def get_directory_size(self, path: Union[str, os.PathLike]) -> int:
return sum(file.stat().st_size for file in Path(path).rglob('*'))
def get_file_size(self, file_path: Union[str, os.PathLike]) -> int:
return Path(file_path).stat().st_size
def get_hasher(self, checksum_type: str = 'sha256'):
"""
Returns a corresponding hashlib hashing function for the specified
checksum type.
Parameters
----------
checksum_type : str
Checksum type (e.g. sha1, sha256).
Returns
-------
hashlib._Hash
Hashlib hashing function.
"""
return hashlib.new(checksum_type)
def hash_file(
self,
file_path: Union[str, IO],
hash_type: str = 'sha256',
buff_size: int = 1048576,
hasher=None,
) -> str:
"""
Returns checksum (hexadecimal digest) of the file.
Parameters
----------
file_path : str or file-like
File to hash. It could be either a path or a file descriptor.
hash_type : str
Hash type (e.g. sha1, sha256).
buff_size : int
Number of bytes to read at once.
hasher : hashlib._Hash
Any hash algorithm from hashlib.
Returns
-------
str
Checksum (hexadecimal digest) of the file.
"""
if hasher is None:
hasher = self.get_hasher(hash_type)
def feed_hasher(_fd):
buff = _fd.read(buff_size)
while len(buff):
if not isinstance(buff, bytes):
buff = buff.encode('utf')
hasher.update(buff)
buff = _fd.read(buff_size)
if isinstance(file_path, str):
with open(file_path, "rb") as fd:
feed_hasher(fd)
else:
file_path.seek(0)
feed_hasher(file_path)
return hasher.hexdigest()
def hash_content(
self,
content: Union[str, bytes],
) -> str:
hasher = self.get_hasher()
if isinstance(content, str):
content = content.encode()
hasher.update(content)
return hasher.hexdigest()
@staticmethod
def extract_git_metadata(
repo_path: Union[str, os.PathLike],
) -> Dict:
with Repo(repo_path) as repo:
url = urlparse(repo.remote().url)
commit = repo.commit()
name = (
f'git@{url.netloc}'
f'{re.sub(r"^/", ":", url.path)}'
f'@{commit.hexsha[:7]}'
)
return {
'Name': name,
'git': {
'Author': {
'Email': commit.author.email,
'Name': commit.author.name,
'When': commit.authored_datetime.strftime(
'%Y-%m-%dT%H:%M:%S%z',
),
},
'Commit': commit.hexsha,
'Committer': {
'Email': commit.committer.email,
'Name': commit.committer.name,
'When': commit.committed_datetime.strftime(
'%Y-%m-%dT%H:%M:%S%z',
),
},
'Message': commit.message,
'PGPSignature': commit.gpgsig,
'Parents': [
parent.hexsha for parent in commit.iter_parents()
],
'Tree': commit.tree.hexsha,
},
}
@property
def default_metadata(self) -> Dict:
return {
'sbom_api_ver': '0.2',
}
def verified_get(
self,
key: Union[str, bytes],
revision: Optional[int] = None,
) -> Dict:
try:
return self.to_dict(
self.verifiedGet(
key=self.encode(key),
atRevision=revision,
),
)
except RpcError:
return {'error': format_exc()}
def verified_set(
self,
key: Union[str, bytes],
value: Union[str, bytes, Dict],
) -> Dict:
try:
return asdict(
self.verifiedSet(
key=self.encode(key),
value=self.encode(value),
),
)
except RpcError:
return {'error': format_exc()}
def notarize(
self,
key: str,
value: Union[str, bytes, Dict],
) -> Dict:
return self.verified_set(key, value)
def notarize_file(
self,
file: str,
user_metadata: Dict,
) -> Dict:
hash_file = self.hash_file(file)
payload = {
'Name': Path(file).name,
'Kind': 'file',
'Size': self.get_size_format(self.get_file_size(file)),
'Hash': hash_file,
'Metadata': {
**self.default_metadata,
**user_metadata,
},
}
return self.notarize(
key=hash_file,
value=payload,
)
def notarize_git_repo(
self,
repo_path: Union[str, os.PathLike],
user_metadata: Dict,
) -> Dict:
git_metadata = self.extract_git_metadata(repo_path)
metadata_hash = self.hash_content(json.dumps(git_metadata['git']))
payload = {
'Name': git_metadata['Name'],
'Kind': 'git',
'Size': self.get_size_format(self.get_directory_size(repo_path)),
'Hash': metadata_hash,
'Metadata': {
'git': git_metadata['git'],
**self.default_metadata,
**user_metadata,
},
}
return self.notarize(
key=metadata_hash,
value=payload,
)
def authenticate(
self,
key: Union[str, bytes],
) -> Dict:
return self.verified_get(key)
def authenticate_file(self, file: str) -> Dict:
return self.authenticate(self.hash_file(file))
def authenticate_git_repo(
self,
repo_path: Union[str, os.PathLike],
) -> Dict:
metadata_hash = self.hash_content(
json.dumps(
self.extract_git_metadata(repo_path)['git'],
),
)
return self.authenticate(metadata_hash)

26
setup.py Normal file
View File

@ -0,0 +1,26 @@
from setuptools import setup
setup(
name='immudb_wrapper',
version='0.1.0',
author='Daniil Anfimov',
author_email='anfimovdan@gmail.com',
description='The wrapper around binary `immudbclient` from Codenotary.',
url='https://git.almalinux.org/almalinux/immudb_wrapper',
project_urls={
'Bug Tracker': 'https://git.almalinux.org/almalinux/immudb_wrapper/issues',
},
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: '
'GNU General Public License v3 or later (GPLv3+)',
'Operating System :: OS Independent',
],
py_modules=['immudb_wrapper'],
scripts=['immudb_wrapper.py'],
install_requires=[
'GitPython>=3.1.20',
'immudb-py>=1.4.0'
],
python_requires='>=3.6',
)