- added alma_get_sources tool
- added tests for common functions
- added setup.py and uploaded the project to PyPI
parent 896ea1f1ba
commit 804cca076e
@@ -0,0 +1 @@
name = 'gitutils'
@@ -1,193 +1,156 @@
#!/usr/bin/env python3

"""
Uploads sources and BLOBs to the AlmaLinux sources cache.
"""

import argparse
import hashlib
import os.path
import sys
from typing import Iterator, List, TextIO, Tuple

import boto3  # type: ignore
from botocore.exceptions import ClientError  # type: ignore


def init_arg_parser() -> argparse.ArgumentParser:
    """
    Initializes a command line arguments parser.

    Returns:
        Command line arguments parser.
    """
    arg_parser = argparse.ArgumentParser(
        prog="alma_blob_upload",
        description="Uploads sources and BLOBs to the AlmaLinux sources cache"
    )
    group = arg_parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-f', '--file', nargs='+', help='file(s) to upload')
    group.add_argument('-i', '--input-metadata', metavar='INPUT_FILE',
                       help='input metadata file list to upload')
    arg_parser.add_argument('-b', '--bucket', default="sources.almalinux.org",
                            help='Amazon S3 bucket name. Default is '
                                 'sources.almalinux.org')
    arg_parser.add_argument('-o', '--output-metadata', metavar='OUTPUT_FILE',
                            help='output metadata file path')
    arg_parser.add_argument('-p', '--private', action='store_true',
                            help='set uploaded file mode to private. All '
                                 'uploaded files are public by default')
    arg_parser.add_argument('--domain-name', default='sources.almalinux.org',
                            help='AlmaLinux sources server domain name. '
                                 'Default is sources.almalinux.org')
    arg_parser.add_argument('-v', '--verbose', action='store_true',
                            help='enable additional debug output')
    return arg_parser


def get_file_checksum(file_path: str, checksum_type: str = 'sha1',
                      buff_size: int = 1048576) -> str:
    """
    Calculates a file checksum.

    Args:
        file_path: File path.
        checksum_type: Checksum type.
        buff_size: Number of bytes to read at once.

    Returns:
        File checksum.
    """
    hasher = hashlib.new(checksum_type)
    with open(file_path, 'rb') as fd:
        buff = fd.read(buff_size)
        while len(buff):
            hasher.update(buff)
            buff = fd.read(buff_size)
    return hasher.hexdigest()


def normalize_path(path: str) -> str:
    """
    Returns an absolute path with all variables expanded.

    Args:
        path: Path to normalize.

    Returns:
        Normalized path.
    """
    return os.path.abspath(os.path.expanduser(os.path.expandvars(path)))


def iter_metadata(metadata_path: str) -> Iterator[Tuple[str, str]]:
    """
    Iterates over records in a CentOS git repository-compatible metadata file.

    Args:
        metadata_path: Metadata file path.

    Returns:
        Iterator over files and their checksums.
    """
    with open(metadata_path, 'r') as fd:
        for line in fd:
            checksum, file_path = line.split()
            file_path = normalize_path(file_path)
            assert checksum == get_file_checksum(file_path)
            yield file_path, checksum


def iter_files(files: List[str]) -> Iterator[Tuple[str, str]]:
    """
    Iterates over a list of files and calculates checksums for them.

    Args:
        files: List of files.

    Returns:
        Iterator over files and their checksums.
    """
    for file_path in files:
        file_path = normalize_path(file_path)
        checksum = get_file_checksum(file_path)
        yield file_path, checksum


def is_file_exist(s3_client, bucket_name: str, checksum: str) -> bool:
    """
    Checks if a file with a given checksum is already uploaded.

    Args:
        s3_client: Amazon S3 client.
        bucket_name: S3 bucket name.
        checksum: File checksum.

    Returns:
        True if a file is already uploaded, False otherwise.
    """
    try:
        s3_client.head_object(Bucket=bucket_name, Key=checksum)
        return True
    except ClientError:
        return False


def upload_file(s3_client, bucket_name: str, file_path: str,
                checksum: str, private: bool):
    """
    Uploads a file to an Amazon S3 bucket.

    Args:
        s3_client: Amazon S3 client.
        bucket_name: S3 bucket name.
        file_path: File path.
        checksum: File checksum.
        private: False if file should be public, True otherwise.
    """
    acl = 'bucket-owner-full-control' if private else 'public-read'
    s3_client.upload_file(file_path, bucket_name, checksum,
                          ExtraArgs={'ACL': acl})


def add_metadata_record(metadata_fd: TextIO, file_path: str, checksum: str):
    """
    Adds a source file record to a metadata file.

    Args:
        metadata_fd: Metadata file descriptor.
        file_path: Source file path.
        checksum: Source file checksum.
    """
    rel_path = os.path.relpath(file_path)
    metadata_fd.write(f'{checksum} {rel_path}\n')


def main(sys_args):
    arg_parser = init_arg_parser()
    args = arg_parser.parse_args(sys_args)
    s3_client = boto3.client('s3')
    if args.input_metadata:
        iterator = iter_metadata(args.input_metadata)
    else:
        iterator = iter_files(args.file)
    out_fd = None
    if args.output_metadata:
        out_fd = open(args.output_metadata, 'w')
    try:
        for file_path, checksum in iterator:
            file_url = f'https://{args.domain_name}/{checksum}'
            if not is_file_exist(s3_client, args.bucket, checksum):
                upload_file(s3_client, args.bucket, file_path, checksum,
                            args.private)
                print(f'{file_path} uploaded: {file_url}')
            else:
                print(f'{file_path} exists: {file_url}')
            if out_fd:
                add_metadata_record(out_fd, file_path, checksum)
    finally:
        if out_fd:
            out_fd.close()


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
"""Uploads sources and BLOBs to the AlmaLinux sources cache"""

import argparse
import logging
import os
import sys
from typing import Iterator, List, Optional, Tuple

import boto3
import botocore.exceptions

from almalinux.gitutils.errors import ChecksumError
from almalinux.gitutils.common import (
    configure_logger, find_metadata_file, get_file_checksum, iter_metadata,
    normalize_path
)


def init_arg_parser() -> argparse.ArgumentParser:
    """
    Initializes a command line arguments parser.

    Returns:
        Command line arguments parser.
    """
    arg_parser = argparse.ArgumentParser(
        prog="alma_blob_upload",
        description="Uploads sources and BLOBs to the AlmaLinux sources cache"
    )
    group = arg_parser.add_mutually_exclusive_group()
    group.add_argument('-f', '--file', nargs='+', help='file(s) to upload')
    group.add_argument('-i', '--input-metadata', metavar='INPUT_FILE',
                       help='input metadata file list to upload. Will be '
                            'detected automatically if omitted and no files '
                            'provided')
    arg_parser.add_argument('-b', '--bucket', default="sources.almalinux.org",
                            help='Amazon S3 bucket name. Default is '
                                 'sources.almalinux.org')
    arg_parser.add_argument('-o', '--output-metadata', metavar='OUTPUT_FILE',
                            help='output metadata file path')
    arg_parser.add_argument('-p', '--private', action='store_true',
                            help='set uploaded file mode to private. All '
                                 'uploaded files are public by default')
    arg_parser.add_argument('--domain-name', default='sources.almalinux.org',
                            help='AlmaLinux sources server domain name. '
                                 'Default is sources.almalinux.org')
    arg_parser.add_argument('-v', '--verbose', action='store_true',
                            help='enable additional debug output')
    return arg_parser


def iter_files(files: List[str]) -> Iterator[Tuple[str, str, str]]:
    """
    Iterates over a list of files and calculates checksums for them.

    Args:
        files: List of files.

    Returns:
        Iterator over files and their checksums.
    """
    checksum_type = 'sha1'
    for rel_path in files:
        file_path = normalize_path(rel_path)
        checksum = get_file_checksum(file_path, checksum_type)
        yield rel_path, checksum, checksum_type


def is_file_exist(s3_client, bucket_name: str, checksum: str) -> bool:
    """
    Checks if a file with a given checksum is already uploaded.

    Args:
        s3_client: Amazon S3 client.
        bucket_name: S3 bucket name.
        checksum: File checksum.

    Returns:
        True if a file is already uploaded, False otherwise.
    """
    try:
        s3_client.head_object(Bucket=bucket_name, Key=checksum)
        return True
    except botocore.exceptions.ClientError:
        return False


def upload_file(s3_client, bucket_name: str, file_path: str,
                checksum: str, private: bool):
    """
    Uploads a file to an Amazon S3 bucket.

    Args:
        s3_client: Amazon S3 client.
        bucket_name: S3 bucket name.
        file_path: File path.
        checksum: File checksum.
        private: False if file should be public, True otherwise.
    """
    acl = 'bucket-owner-full-control' if private else 'public-read'
    s3_client.upload_file(file_path, bucket_name, checksum,
                          ExtraArgs={'ACL': acl})


def get_file_iterator(
        files: List[str], metadata_path: Optional[str]
) -> Iterator[Tuple[str, str, str]]:
    """
    Finds a suitable file iterator for given arguments.

    Args:
        files: List of files.
        metadata_path: Metadata file path.

    Returns:
        File iterator.
    """
    if files:
        iterator = iter_files(files)
    else:
        if not metadata_path:
            metadata_path = find_metadata_file(os.getcwd())
        iterator = iter_metadata(metadata_path)
    return iterator


def main():
    arg_parser = init_arg_parser()
    args = arg_parser.parse_args(sys.argv[1:])
    configure_logger(args.verbose)
    s3_client = boto3.client('s3')
    iterator = get_file_iterator(args.file, args.input_metadata)
    out_fd = None
    if args.output_metadata:
        out_fd = open(args.output_metadata, 'w')
    try:
        for rel_path, checksum, checksum_type in iterator:
            file_path = normalize_path(rel_path)
            if not args.file:
                real_checksum = get_file_checksum(file_path, checksum_type)
                if real_checksum != checksum:
                    raise ChecksumError(
                        f"{rel_path} {checksum_type} checksum {real_checksum} "
                        f"doesn't match expected {checksum}"
                    )
            file_url = f'https://{args.domain_name}/{checksum}'
            if not is_file_exist(s3_client, args.bucket, checksum):
                upload_file(s3_client, args.bucket, file_path, checksum,
                            args.private)
                logging.info(f'{rel_path} successfully uploaded: {file_url}')
            else:
                logging.info(f'{rel_path} is already uploaded: {file_url}')
            if out_fd:
                out_fd.write(f'{checksum} {rel_path}\n')
    finally:
        if out_fd:
            out_fd.close()
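
Aside: the key convention above (the S3 object key is the file checksum) means a blob's public URL can be rebuilt from its digest alone. A minimal illustrative sketch; the blob_url helper is hypothetical, not part of this module:

def blob_url(checksum: str, domain_name: str = 'sources.almalinux.org') -> str:
    # The cache stores each object under its checksum as the key, so the
    # URL is fully determined by the digest and the server domain name.
    return f'https://{domain_name}/{checksum}'

print(blob_url('850747ae43a5c81f1dd0d906dfa9e149eb19748a'))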
@@ -0,0 +1,120 @@
"""AlmaLinux Git server utilities common functions"""

import hashlib
import logging
import os
import re
from typing import Iterator, Tuple

__all__ = ['configure_logger', 'detect_checksum_type', 'find_metadata_file',
           'get_file_checksum', 'iter_metadata', 'normalize_path']


def configure_logger(verbose: bool) -> logging.Logger:
    """
    Configures a console logger.

    Args:
        verbose: Show DEBUG messages if True, show INFO and higher otherwise.

    Returns:
        Configured logger.
    """
    level = logging.DEBUG if verbose else logging.INFO
    handler = logging.StreamHandler()
    handler.setLevel(level)
    log_format = "%(levelname)-8s: %(message)s"
    formatter = logging.Formatter(log_format, '%y-%m-%d %H:%M:%S')
    handler.setFormatter(formatter)
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(level)
    return logger


def detect_checksum_type(checksum: str) -> str:
    """
    Detects a checksum type by its length.

    Args:
        checksum: Checksum.

    Returns:
        Checksum type.
    """
    hash_types = {32: 'md5', 40: 'sha1', 64: 'sha256', 128: 'sha512'}
    hash_type = hash_types.get(len(checksum))
    if not hash_type:
        raise ValueError(f'unknown checksum type {checksum}')
    return hash_type

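A quick sketch of how the length-based detection above behaves, assuming the almalinux-git-utils package is importable; the digest values are taken from the tests further down:

from almalinux.gitutils.common import detect_checksum_type

# 32 hex chars -> md5, 40 -> sha1 (see the hash_types mapping above)
print(detect_checksum_type('35d14f5ab4ee239b070f3b645fb82837'))          # md5
print(detect_checksum_type('1014c8812720619a5a6bcd189e5d7f5d16276d86'))  # sha1
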
def find_metadata_file(path: str) -> str:
    """
    Finds a sources metadata file in the specified directory.

    Args:
        path: Directory to search in.

    Returns:
        Sources metadata file path.
    """
    files = [f for f in os.listdir(path) if re.match(r'^\.\S*?\.metadata$', f)]
    if not files:
        raise Exception('metadata file is not found')
    elif len(files) > 1:
        raise Exception('multiple metadata files found. Please specify one to '
                        'use')
    return os.path.join(path, files[0])


def get_file_checksum(file_path: str, checksum_type: str = 'sha1',
                      buff_size: int = 1048576) -> str:
    """
    Calculates a file checksum.

    Args:
        file_path: File path.
        checksum_type: Checksum type.
        buff_size: Number of bytes to read at once.

    Returns:
        File checksum.
    """
    hasher = hashlib.new(checksum_type)
    with open(file_path, 'rb') as fd:
        buff = fd.read(buff_size)
        while len(buff):
            hasher.update(buff)
            buff = fd.read(buff_size)
    return hasher.hexdigest()


def iter_metadata(metadata_path: str) -> Iterator[Tuple[str, str, str]]:
    """
    Iterates over records in a CentOS git repository-compatible metadata file.

    Args:
        metadata_path: Metadata file path.

    Returns:
        Iterator over files, their checksums and checksum types.
    """
    with open(metadata_path, 'r') as fd:
        for line in fd:
            checksum, file_path = line.split()
            checksum_type = detect_checksum_type(checksum)
            yield file_path, checksum, checksum_type

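For context, each metadata record is a '<checksum> <relative path>' pair, one per line, and iter_metadata infers the checksum type per record. A minimal usage sketch, assuming a .project.metadata file in the current directory:

from almalinux.gitutils.common import iter_metadata

# Example record: 850747ae43a5c81f1dd0d906dfa9e149eb19748a SOURCES/mc-4.8.19.tar.xz
for file_path, checksum, checksum_type in iter_metadata('.project.metadata'):
    print(file_path, checksum, checksum_type)
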
def normalize_path(path: str) -> str:
    """
    Returns an absolute path with all variables expanded.

    Args:
        path: Path to normalize.

    Returns:
        Normalized path.
    """
    return os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
@@ -0,0 +1,15 @@
"""AlmaLinux Git server utilities error classes"""


class ChecksumError(Exception):

    """File checksum mismatch exception"""

    pass


class NetworkError(Exception):

    """Network error exception"""

    pass
@@ -0,0 +1,120 @@
"""Downloads sources and blobs from AlmaLinux or CentOS sources cache"""

import argparse
import logging
import os
import shutil
import sys

import requests

from almalinux.gitutils.common import (
    configure_logger, find_metadata_file, get_file_checksum, iter_metadata,
    normalize_path
)
from almalinux.gitutils.errors import ChecksumError, NetworkError


def init_arg_parser() -> argparse.ArgumentParser:
    """
    Initializes a command line arguments parser.

    Returns:
        Command line arguments parser.
    """
    arg_parser = argparse.ArgumentParser(prog='alma_get_sources',
                                         description=__doc__)
    arg_parser.add_argument('-i', '--input-metadata', metavar='INPUT_FILE',
                            help='input metadata file list to download')
    arg_parser.add_argument('--domain-name', default='sources.almalinux.org',
                            help='AlmaLinux sources server domain name. '
                                 'Default is sources.almalinux.org')
    arg_parser.add_argument('-v', '--verbose', action='store_true',
                            help='enable additional debug output')
    return arg_parser


def create_sources_dir(base_dir: str, rel_path: str):
    """
    Creates a sources directory if it doesn't exist.

    Args:
        base_dir: Project's base directory.
        rel_path: Project's source file relative path.
    """
    dir_name, file_name = os.path.split(rel_path)
    dir_path = os.path.join(base_dir, dir_name)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)


def download_alma_blob(file_path: str, checksum: str, domain_name: str):
    """
    Downloads a BLOB from the AlmaLinux Git sources cache.

    Args:
        file_path: Destination file path.
        checksum: File checksum.
        domain_name: AlmaLinux Git sources cache domain name.
    """
    url = f'https://{domain_name}/{checksum}'
    with requests.get(url, stream=True) as rqst:
        try:
            rqst.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise NetworkError(str(e))
        with open(file_path, 'wb') as fd:
            shutil.copyfileobj(rqst.raw, fd)


def download_metadata_blobs(metadata_path: str, base_dir: str,
                            domain_name: str):
    """
    Downloads BLOBs listed in a metadata file from AlmaLinux Git sources cache.

    Args:
        metadata_path: Metadata file path.
        base_dir: Package sources base directory.
        domain_name: AlmaLinux Git sources cache domain name.
    """
    for rel_path, checksum, checksum_type in iter_metadata(metadata_path):
        file_path = os.path.join(base_dir, rel_path)
        if os.path.exists(file_path):
            real_checksum = get_file_checksum(file_path, checksum_type)
            if real_checksum != checksum:
                raise ChecksumError(
                    f"{rel_path} already exists but its {checksum_type} "
                    f"checksum {real_checksum} doesn't match expected "
                    f"{checksum}"
                )
            logging.info(f'{rel_path} already exists and its checksum is '
                         f'correct')
            continue
        create_sources_dir(base_dir, rel_path)
        download_alma_blob(file_path, checksum, domain_name)
        real_checksum = get_file_checksum(file_path, checksum_type)
        if real_checksum != checksum:
            raise ChecksumError(
                f"{rel_path} has been downloaded but its {checksum_type} "
                f"checksum {real_checksum} doesn't match expected {checksum}"
            )
        logging.info(f'{rel_path} has been successfully downloaded')


def main():
    arg_parser = init_arg_parser()
    args = arg_parser.parse_args(sys.argv[1:])
    configure_logger(args.verbose)
    base_dir = os.getcwd()
    if args.input_metadata:
        metadata_path = normalize_path(args.input_metadata)
    else:
        metadata_path = find_metadata_file(base_dir)
    try:
        download_metadata_blobs(metadata_path, base_dir, args.domain_name)
    except ChecksumError as e:
        logging.error(e)
        return os.EX_DATAERR
    except NetworkError as e:
        logging.error(e)
        return os.EX_IOERR
@@ -0,0 +1,6 @@
[build-system]
requires = [
    "setuptools>=42",
    "wheel"
]
build-backend = "setuptools.build_meta"
@@ -0,0 +1,36 @@
from setuptools import find_namespace_packages, setup

with open("README.md", "r", encoding="utf-8") as fd:
    long_description = fd.read()

setup(
    name="almalinux-git-utils",
    version="0.0.1",
    author="Eugene Zamriy",
    author_email="ezamriy@almalinux.org",
    description="Utilities for working with the AlmaLinux OS Git server",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://git.almalinux.org/almalinux/almalinux-git-utils",
    project_urls={
        "Bug Tracker": "https://git.almalinux.org/almalinux/almalinux-git-utils/issues",
    },
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
        "Operating System :: OS Independent",
    ],
    packages=find_namespace_packages(include=['almalinux.*']),
    entry_points={
        'console_scripts': [
            'alma_blob_upload=almalinux.gitutils.blob_upload:main',
            'alma_get_sources=almalinux.gitutils.get_sources:main'
        ]
    },
    install_requires=[
        'boto3>=1.15.15',
        'requests>=2.20.0'
    ],
    python_requires=">=3.6",
    zip_safe=False
)
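
The console_scripts entries above are what expose the alma_blob_upload and alma_get_sources commands after installation; each command simply invokes the corresponding main(). A sketch of the equivalent direct calls (assumes the package is installed; both functions read their arguments from sys.argv):

from almalinux.gitutils import blob_upload, get_sources

blob_upload.main()   # what the alma_blob_upload command runs
get_sources.main()   # what the alma_get_sources command runs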
@@ -0,0 +1,94 @@
import os

import pytest

from almalinux.gitutils.common import *


@pytest.mark.parametrize(
    'checksum,expected',
    [('35d14f5ab4ee239b070f3b645fb82837', 'md5'),
     ('1014c8812720619a5a6bcd189e5d7f5d16276d86', 'sha1'),
     ('86d8a9a32cdaff2c6003c67a12549466319e0ae51b7665fd01fd9354a3b1cf55',
      'sha256'),
     ('9906e61ef0b693bf978e2a88b737c79dd2c815cfc1a09443f04b79b994b4646ff72f18'
      '6e42461b3a5768667119f39fa006ce71530791a5b35c2278e9252ec3ea', 'sha512')]
)
def test_detect_checksum_type(checksum, expected):
    """detect_checksum_type returns type for supported checksums"""
    assert detect_checksum_type(checksum) == expected


def test_detect_checksum_type_error():
    """detect_checksum_type raises ValueError if checksum type is unknown"""
    with pytest.raises(ValueError):
        detect_checksum_type('somethingwrong')


@pytest.mark.parametrize(
    'checksum_type,checksum',
    [(None, '06364afe79d801433188262478a76d19777ef351'),
     ('sha1', '06364afe79d801433188262478a76d19777ef351'),
     ('sha256', 'b37758528c0338d529b3fb16fd39f28da58241abc856e16bf0bc8b99c60cd632')]
)
def test_get_file_checksum(tmpdir, checksum_type, checksum):
    """get_file_checksum supports different checksum types"""
    file_path = os.path.join(tmpdir, 'test_file.txt')
    with open(file_path, 'w') as fd:
        fd.write('TESTDATA\n')
    args = [file_path]
    if checksum_type:
        args.append(checksum_type)
    assert get_file_checksum(*args) == checksum


def test_find_metadata_file_single(tmpdir):
    """find_metadata_file returns a single metadata file"""
    file_path = os.path.join(tmpdir, '.project.metadata')
    open(file_path, 'a').close()
    assert find_metadata_file(tmpdir) == file_path


def test_find_metadata_file_missing(tmpdir):
    """find_metadata_file raises Exception when metadata file is not found"""
    with pytest.raises(Exception):
        find_metadata_file(tmpdir)


def test_find_metadata_file_multiple(tmpdir):
    """
    find_metadata_file raises Exception when there are multiple metadata files
    """
    for i in range(2):
        open(os.path.join(tmpdir, f'.project{i}.metadata'), 'a').close()
    with pytest.raises(Exception):
        find_metadata_file(tmpdir)


def test_iter_metadata(tmpdir):
    """iter_metadata returns checksums from metadata file"""
    data = [
        ('SOURCES/mc-4.8.19.tar.xz',
         '850747ae43a5c81f1dd0d906dfa9e149eb19748a', 'sha1'),
        ('SOURCES/binary-blob',
         'b37758528c0338d529b3fb16fd39f28da58241abc856e16bf0bc8b99c60cd632',
         'sha256')
    ]
    metadata_path = os.path.join(tmpdir, '.project.metadata')
    with open(metadata_path, 'w') as fd:
        for rec in data:
            fd.write(f'{rec[1]} {rec[0]}\n')
    metadata = []
    for file_path, checksum, checksum_type in iter_metadata(metadata_path):
        metadata.append((file_path, checksum, checksum_type))
    assert metadata == data


def test_normalize_path(monkeypatch):
    """
    normalize_path expands variables and converts relative paths to absolute
    """
    cwd = os.getcwd()
    expected = os.path.join(cwd, 'basedir', 'subdir')
    monkeypatch.setenv('BASE_DIR', 'basedir')
    assert normalize_path('${BASE_DIR}/subdir') == expected