diff --git a/immudb_wrapper.py b/immudb_wrapper.py
new file mode 100644
index 0000000..37a8ad7
--- /dev/null
+++ b/immudb_wrapper.py
@@ -0,0 +1,404 @@
+import hashlib
+import json
+import logging
+import os
+import re
+from dataclasses import asdict
+from pathlib import Path
+from traceback import format_exc
+from typing import IO, Any, Dict, Optional, Union
+from urllib.parse import urlparse
+
+from git import Repo
+from grpc import RpcError
+from immudb import ImmudbClient
+from immudb.datatypes import SafeGetResponse, SetResponse
+from immudb.rootService import RootService
+
+# NOTE: rebinds ``Dict`` to the alias used by the annotations below.
+Dict = Dict[str, Any]
+
+
+class ImmudbWrapper(ImmudbClient):
+    """
+    ``ImmudbClient`` subclass with helpers for notarizing files and git
+    repositories and for authenticating them afterwards.
+    """
+
+    def __init__(
+        self,
+        username: str = 'immudb',
+        password: str = 'immudb',
+        database: str = 'defaultdb',
+        immudb_address: Optional[str] = 'localhost:3322',
+        root_service: Optional[RootService] = None,
+        public_key_file: Optional[str] = None,
+        timeout: Optional[int] = None,
+        max_grpc_message_length: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
+    ):
+        """
+        Connects to immudb, logs in and selects the database.
+
+        Args:
+            username (str): Immudb username to log in (default: "immudb").
+            password (str): Immudb password to log in (default: "immudb").
+            database (str): Immudb database to be used (default: "defaultdb").
+            immudb_address (str, optional): url in format ``host:port``
+                (e.g. ``localhost:3322``) of your immudb instance.
+                Defaults to ``localhost:3322`` when no value is set.
+            root_service (RootService, optional): object that implements
+                RootService, allowing requests to be verified. Optional.
+                By default in-memory RootService instance will be created
+            public_key_file (str, optional): path of the public key to use
+                for authenticating requests. Optional.
+            timeout (int, optional): global timeout for GRPC requests. Requests
+                will hang until the server responds if no timeout is set.
+            max_grpc_message_length (int, optional): maximum size of message
+                the server should send. The default (4Mb) is used if no
+                value is set.
+            logger (logging.Logger, optional): Logger to be used. The root
+                logger is used when no value is set.
+        """
+        self.username = username
+        self.password = password
+        self.database = database
+        # Always bind ``_logger``: previously it was left undefined whenever
+        # a logger was passed in by the caller.
+        self._logger = logger if logger is not None else logging.getLogger()
+        super().__init__(
+            immudUrl=immudb_address,
+            rs=root_service,
+            publicKeyFile=public_key_file,
+            timeout=timeout,
+            max_grpc_message_length=max_grpc_message_length,
+        )
+        self.login(
+            username=self.username,
+            password=self.password,
+        )
+        self.useDatabase(self.encode(self.database))
+
+    def encode(
+        self,
+        value: Union[str, bytes, dict],
+    ) -> bytes:
+        """
+        Converts a str, bytes or dict (serialized as JSON) value to bytes.
+
+        Raises ValueError for any other type.
+        """
+        if isinstance(value, str):
+            result = value.encode()
+        elif isinstance(value, bytes):
+            result = value
+        elif isinstance(value, dict):
+            result = json.dumps(value).encode()
+        else:
+            raise ValueError(
+                "Cannot encode value that isn't str, bytes or dict."
+            )
+        return result
+
+    def to_dict(
+        self,
+        response: SafeGetResponse,
+    ) -> Dict:
+        """
+        Converts a response dataclass to a dict, decoding its key to str and
+        parsing its value as JSON.
+        """
+        result = asdict(response)
+        result['key'] = result['key'].decode()
+        result['value'] = json.loads(result['value'].decode())
+        return result
+
+    def get_size_format(
+        self,
+        value: int,
+        factor: int = 1024,
+        suffix: str = "B",
+    ) -> str:
+        """
+        Scale bytes to its proper byte format
+        e.g:
+            1253656 => '1.20 MB'
+            1253656678 => '1.17 GB'
+        """
+        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
+            if value < factor:
+                return f"{value:.2f} {unit}{suffix}"
+            value /= factor
+        return f"{value:.2f} Y{suffix}"
+
+    def get_directory_size(self, path: Union[str, os.PathLike]) -> int:
+        """
+        Sums ``st_size`` of every entry found recursively under the path.
+        """
+        return sum(file.stat().st_size for file in Path(path).rglob('*'))
+
+    def get_file_size(self, file_path: Union[str, os.PathLike]) -> int:
+        """
+        Returns ``st_size`` of the file.
+        """
+        return Path(file_path).stat().st_size
+
+    def get_hasher(self, checksum_type: str = 'sha256'):
+        """
+        Returns a corresponding hashlib hashing function for the specified
+        checksum type.
+
+        Parameters
+        ----------
+        checksum_type : str
+            Checksum type (e.g. sha1, sha256).
+
+        Returns
+        -------
+        hashlib._Hash
+            Hashlib hashing function.
+        """
+        return hashlib.new(checksum_type)
+
+    def hash_file(
+        self,
+        file_path: Union[str, os.PathLike, IO],
+        hash_type: str = 'sha256',
+        buff_size: int = 1048576,
+        hasher=None,
+    ) -> str:
+        """
+        Returns checksum (hexadecimal digest) of the file.
+
+        Parameters
+        ----------
+        file_path : str, os.PathLike or file-like
+            File to hash. It could be either a path or a file descriptor.
+        hash_type : str
+            Hash type (e.g. sha1, sha256).
+        buff_size : int
+            Number of bytes to read at once.
+        hasher : hashlib._Hash
+            Any hash algorithm from hashlib.
+
+        Returns
+        -------
+        str
+            Checksum (hexadecimal digest) of the file.
+        """
+        if hasher is None:
+            hasher = self.get_hasher(hash_type)
+
+        def feed_hasher(_fd):
+            buff = _fd.read(buff_size)
+            while buff:
+                # Text-mode file objects yield str chunks.
+                if not isinstance(buff, bytes):
+                    buff = buff.encode('utf')
+                hasher.update(buff)
+                buff = _fd.read(buff_size)
+
+        # ``os.PathLike`` is accepted as well, so ``pathlib.Path`` arguments
+        # are opened instead of being mistaken for file objects.
+        if isinstance(file_path, (str, os.PathLike)):
+            with open(file_path, "rb") as fd:
+                feed_hasher(fd)
+        else:
+            file_path.seek(0)
+            feed_hasher(file_path)
+        return hasher.hexdigest()
+
+    def hash_content(
+        self,
+        content: Union[str, bytes],
+    ) -> str:
+        """
+        Returns checksum (hexadecimal digest) of the content, computed with
+        the default hasher.
+        """
+        hasher = self.get_hasher()
+        if isinstance(content, str):
+            content = content.encode()
+        hasher.update(content)
+        return hasher.hexdigest()
+
+    @staticmethod
+    def extract_git_metadata(
+        repo_path: Union[str, os.PathLike],
+    ) -> Dict:
+        """
+        Collects the name of the repository and the details of the commit
+        returned by ``Repo.commit()``.
+        """
+        with Repo(repo_path) as repo:
+            url = urlparse(repo.remote().url)
+            commit = repo.commit()
+            name = (
+                f'git@{url.netloc}'
+                f'{re.sub(r"^/", ":", url.path)}'
+                f'@{commit.hexsha[:7]}'
+            )
+            return {
+                'Name': name,
+                'git': {
+                    'Author': {
+                        'Email': commit.author.email,
+                        'Name': commit.author.name,
+                        'When': commit.authored_datetime.strftime(
+                            '%Y-%m-%dT%H:%M:%S%z',
+                        ),
+                    },
+                    'Commit': commit.hexsha,
+                    'Committer': {
+                        'Email': commit.committer.email,
+                        'Name': commit.committer.name,
+                        'When': commit.committed_datetime.strftime(
+                            '%Y-%m-%dT%H:%M:%S%z',
+                        ),
+                    },
+                    'Message': commit.message,
+                    'PGPSignature': commit.gpgsig,
+                    'Parents': [
+                        parent.hexsha for parent in commit.iter_parents()
+                    ],
+                    'Tree': commit.tree.hexsha,
+                },
+            }
+
+    @property
+    def default_metadata(self) -> Dict:
+        """
+        Metadata included in every notarization payload.
+        """
+        return {
+            'sbom_api_ver': '0.2',
+        }
+
+    def verified_get(
+        self,
+        key: Union[str, bytes],
+        revision: Optional[int] = None,
+    ) -> Dict:
+        """
+        Returns the ``verifiedGet`` response for the key as a dict, or
+        ``{'error': <traceback>}`` when the call raises ``RpcError``.
+        """
+        try:
+            return self.to_dict(
+                self.verifiedGet(
+                    key=self.encode(key),
+                    atRevision=revision,
+                ),
+            )
+        except RpcError:
+            return {'error': format_exc()}
+
+    def verified_set(
+        self,
+        key: Union[str, bytes],
+        value: Union[str, bytes, Dict],
+    ) -> Dict:
+        """
+        Stores the value under the key with ``verifiedSet`` and returns the
+        response as a dict, or ``{'error': <traceback>}`` on ``RpcError``.
+        """
+        try:
+            return asdict(
+                self.verifiedSet(
+                    key=self.encode(key),
+                    value=self.encode(value),
+                ),
+            )
+        except RpcError:
+            return {'error': format_exc()}
+
+    def notarize(
+        self,
+        key: str,
+        value: Union[str, bytes, Dict],
+    ) -> Dict:
+        """
+        Alias of ``verified_set``.
+        """
+        return self.verified_set(key, value)
+
+    def notarize_file(
+        self,
+        file: str,
+        user_metadata: Dict,
+    ) -> Dict:
+        """
+        Notarizes the file under its checksum; user metadata overrides the
+        default metadata on key collisions.
+        """
+        hash_file = self.hash_file(file)
+        payload = {
+            'Name': Path(file).name,
+            'Kind': 'file',
+            'Size': self.get_size_format(self.get_file_size(file)),
+            'Hash': hash_file,
+            'Metadata': {
+                **self.default_metadata,
+                **user_metadata,
+            },
+        }
+        return self.notarize(
+            key=hash_file,
+            value=payload,
+        )
+
+    def notarize_git_repo(
+        self,
+        repo_path: Union[str, os.PathLike],
+        user_metadata: Dict,
+    ) -> Dict:
+        """
+        Notarizes the repository under the checksum of its JSON-serialized
+        git metadata.
+        """
+        git_metadata = self.extract_git_metadata(repo_path)
+        metadata_hash = self.hash_content(json.dumps(git_metadata['git']))
+        payload = {
+            'Name': git_metadata['Name'],
+            'Kind': 'git',
+            'Size': self.get_size_format(self.get_directory_size(repo_path)),
+            'Hash': metadata_hash,
+            'Metadata': {
+                'git': git_metadata['git'],
+                **self.default_metadata,
+                **user_metadata,
+            },
+        }
+        return self.notarize(
+            key=metadata_hash,
+            value=payload,
+        )
+
+    def authenticate(
+        self,
+        key: Union[str, bytes],
+    ) -> Dict:
+        """
+        Calls ``verified_get`` for the key with the default revision.
+        """
+        return self.verified_get(key)
+
+    def authenticate_file(self, file: str) -> Dict:
+        """
+        Looks up the record stored under the checksum of the file.
+        """
+        return self.authenticate(self.hash_file(file))
+
+    def authenticate_git_repo(
+        self,
+        repo_path: Union[str, os.PathLike],
+    ) -> Dict:
+        """
+        Looks up the record stored under the checksum of the git metadata.
+        """
+        metadata_hash = self.hash_content(
+            json.dumps(
+                self.extract_git_metadata(repo_path)['git'],
+            ),
+        )
+        return self.authenticate(metadata_hash)
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..26029d9
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,26 @@
+from setuptools import setup
+
+setup(
+    name='immudb_wrapper',
+    version='0.1.0',
+    author='Daniil Anfimov',
+    author_email='anfimovdan@gmail.com',
+    description='The wrapper around binary `immudbclient` from Codenotary.',
+    url='https://git.almalinux.org/almalinux/immudb_wrapper',
+    project_urls={
+        'Bug Tracker': 'https://git.almalinux.org/almalinux/immudb_wrapper/issues',
+    },
+    classifiers=[
+        'Programming Language :: Python :: 3',
+        'License :: OSI Approved :: '
+        'GNU General Public License v3 or later (GPLv3+)',
+        'Operating System :: OS Independent',
+    ],
+    py_modules=['immudb_wrapper'],
+    scripts=['immudb_wrapper.py'],
+    install_requires=[
+        'GitPython>=3.1.20',
+        'immudb-py>=1.4.0'
+    ],
+    python_requires='>=3.6',
+)