From 804cca076ea500ace069ac940571f63cf804ac7f Mon Sep 17 00:00:00 2001
From: Eugene Zamriy
Date: Wed, 25 Aug 2021 17:10:58 +0300
Subject: [PATCH] 0.0.1 version

- added alma_get_sources tool
- added tests for common functions
- added setup.py and uploaded the project to PyPI
---
 README.md                           |  36 +-
 almalinux/gitutils/__init__.py      |   1 +
 .../gitutils/blob_upload.py         | 349 ++++++++----------
 almalinux/gitutils/common.py        | 120 ++++++
 almalinux/gitutils/errors.py        |  15 +
 almalinux/gitutils/get_sources.py   | 120 ++++++
 pyproject.toml                      |   6 +
 setup.py                            |  36 ++
 tests/gitutils/test_common.py       |  94 +++++
 9 files changed, 570 insertions(+), 207 deletions(-)
 create mode 100644 almalinux/gitutils/__init__.py
 rename alma_blob_upload.py => almalinux/gitutils/blob_upload.py (54%)
 create mode 100644 almalinux/gitutils/common.py
 create mode 100644 almalinux/gitutils/errors.py
 create mode 100644 almalinux/gitutils/get_sources.py
 create mode 100644 pyproject.toml
 create mode 100644 setup.py
 create mode 100644 tests/gitutils/test_common.py

diff --git a/README.md b/README.md
index 2197cdd..cc526e7 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,24 @@
 
 Utilities for working with the AlmaLinux OS Git server.
 
+## alma_get_sources
+
+The `alma_get_sources` script downloads sources and BLOBs from the AlmaLinux
+sources cache.
+
+### Usage
+
+Run the `alma_get_sources` tool in a git project root directory:
+
+1. Clone an AlmaLinux RPM package git project from
+   [git.almalinux.org](https://git.almalinux.org).
+2. Switch to the required branch.
+3. Run the `alma_get_sources` tool:
+   ```shell
+   $ alma_get_sources
+   ```
+
+
 ## alma_blob_upload
 
 The `alma_blob_upload` script uploads sources and BLOBs to the AlmaLinux
@@ -10,16 +28,6 @@ sources cache.
 
 ### Prerequirements
 
-Install the `python3-boto3` package:
-
-```shell
-# RPM-based distributions. On EL8 derivatives the package is available from EPEL.
-$ sudo dnf install python3 python3-boto3
-
-# Debian-based distributions
-$ sudo apt install python3-boto3
-```
-
 Create an AWS credentials file ~/.aws/credentials with the following content:
 
 ```ini
@@ -43,20 +51,20 @@ For CentOS repositories workflow will be the following:
 3. Run the `alma_blob_upload` tool (don't forget to replace `PROJECT_NAME`
    with an actual project name):
 
    ```shell
-   $ alma_blob_upload.py -i .PROJECT_NAME.metadata
+   $ alma_blob_upload -i .PROJECT_NAME.metadata
    ```
 
 Alternatively, you can upload a list of files in the following way:
 
 ```shell
-$ alma_blob_upload.py -f SOURCES/FILE_1 SOURCES/FILE_N
+$ alma_blob_upload -f SOURCES/FILE_1 SOURCES/FILE_N
 ```
 
 The `alma_blob_upload` utility can also generate a CentOS-compatible metadata
 file:
 
-```
-$ alma_blob_upload.py -o .PROJECT_NAME.metadata -f SOURCES/FILE_1 SOURCES/FILE_N
+```shell
+$ alma_blob_upload -o .PROJECT_NAME.metadata -f SOURCES/FILE_1 SOURCES/FILE_N
 ```
 
diff --git a/almalinux/gitutils/__init__.py b/almalinux/gitutils/__init__.py
new file mode 100644
index 0000000..7e0eac1
--- /dev/null
+++ b/almalinux/gitutils/__init__.py
@@ -0,0 +1 @@
+name = 'gitutils'
diff --git a/alma_blob_upload.py b/almalinux/gitutils/blob_upload.py
similarity index 54%
rename from alma_blob_upload.py
rename to almalinux/gitutils/blob_upload.py
index cd6d42b..8f821fb 100644
--- a/alma_blob_upload.py
+++ b/almalinux/gitutils/blob_upload.py
@@ -1,193 +1,156 @@
-#!/usr/bin/env python3
-
-"""
-Uploads sources and BLOBs to the AlmaLinux sources cache.
-""" - -import argparse -import hashlib -import os.path -import sys -from typing import Iterator, List, TextIO, Tuple - -import boto3 # type: ignore -from botocore.exceptions import ClientError # type: ignore - - -def init_arg_parser() -> argparse.ArgumentParser: - """ - Initializes a command line arguments parser. - - Returns: - Command line arguments parser. - """ - arg_parser = argparse.ArgumentParser( - prog="alma_blob_upload", - description="Uploads sources and BLOBs to the AlmaLinux sources cache" - ) - group = arg_parser.add_mutually_exclusive_group(required=True) - group.add_argument('-f', '--file', nargs='+', help='file(s) to upload') - group.add_argument('-i', '--input-metadata', metavar='INPUT_FILE', - help='input metadata file list to upload') - arg_parser.add_argument('-b', '--bucket', default="sources.almalinux.org", - help='Amazon S3 bucket name. Default is ' - 'sources.almalinux.org') - arg_parser.add_argument('-o', '--output-metadata', metavar='OUTPUT_FILE', - help='output metadata file path') - arg_parser.add_argument('-p', '--private', action='store_true', - help='set uploaded file mode to private. All ' - 'uploaded files are public by default') - arg_parser.add_argument('--domain-name', default='sources.almalinux.org', - help='AlmaLinux sources server domain name. ' - 'Default is sources.almalinux.org') - arg_parser.add_argument('-v', '--verbose', action='store_true', - help='enable additional debug output') - return arg_parser - - -def get_file_checksum(file_path: str, checksum_type: str = 'sha1', - buff_size: int = 1048576) -> str: - """ - Calculates a file checksum. - - Args: - file_path: File path. - checksum_type: Checksum type. - buff_size: Number of bytes to read at once. - - Returns: - File checksum. - """ - hasher = hashlib.new(checksum_type) - with open(file_path, 'rb') as fd: - buff = fd.read(buff_size) - while len(buff): - hasher.update(buff) - buff = fd.read(buff_size) - return hasher.hexdigest() - - -def normalize_path(path: str) -> str: - """ - Returns an absolute path with all variables expanded. - - Args: - path: Path to normalize. - - Returns: - Normalized path. - """ - return os.path.abspath(os.path.expanduser(os.path.expandvars(path))) - - -def iter_metadata(metadata_path: str) -> Iterator[Tuple[str, str]]: - """ - Iterates over records in a CentOS git repository-compatible metadata file. - - Args: - metadata_path: Metadata file path. - - Returns: - Iterator over files and their checksums. - """ - with open(metadata_path, 'r') as fd: - for line in fd: - checksum, file_path = line.split() - file_path = normalize_path(file_path) - assert checksum == get_file_checksum(file_path) - yield file_path, checksum - - -def iter_files(files: List[str]) -> Iterator[Tuple[str, str]]: - """ - Iterates over a list of files and calculates checksums for them. - - Args: - files: List of files. - - Returns: - Iterator over files and their checksums. - """ - for file_path in files: - file_path = normalize_path(file_path) - checksum = get_file_checksum(file_path) - yield file_path, checksum - - -def is_file_exist(s3_client, bucket_name: str, checksum: str) -> bool: - """ - Checks is a file with a given checksum is already uploaded. - Args: - s3_client: Amazon S3 client. - bucket_name: S3 bucket name. - checksum: File checksum. - - Returns: - True if a file is already uploaded, False otherwise. 
- """ - try: - s3_client.head_object(Bucket=bucket_name, Key=checksum) - return True - except ClientError: - return False - - -def upload_file(s3_client, bucket_name: str, file_path: str, - checksum: str, private: bool): - """ - Uploads a file to an Amazon S3 bucket. - - Args: - s3_client: Amazon S3 client. - bucket_name: S3 bucket name. - file_path: File path. - checksum: File checksum. - private: False if file should be public, True otherwise. - """ - acl = 'bucket-owner-full-control' if private else 'public-read' - s3_client.upload_file(file_path, bucket_name, checksum, - ExtraArgs={'ACL': acl}) - - -def add_metadata_record(metadata_fd: TextIO, file_path: str, checksum: str): - """ - Adds a source file record to a metadata file. - - Args: - metadata_fd: Metadata file descriptor. - file_path: Source file path. - checksum: Source file checksum. - """ - rel_path = os.path.relpath(file_path) - metadata_fd.write(f'{checksum} {rel_path}\n') - - -def main(sys_args): - arg_parser = init_arg_parser() - args = arg_parser.parse_args(sys_args) - s3_client = boto3.client('s3') - if args.input_metadata: - iterator = iter_metadata(args.input_metadata) - else: - iterator = iter_files(args.file) - out_fd = None - if args.output_metadata: - out_fd = open(args.output_metadata, 'w') - try: - for file_path, checksum in iterator: - file_url = f'https://{args.domain_name}/{checksum}' - if not is_file_exist(s3_client, args.bucket, checksum): - upload_file(s3_client, args.bucket, file_path, checksum, - args.private) - print(f'{file_path} uploaded: {file_url}') - else: - print(f'{file_path} exists: {file_url}') - if out_fd: - add_metadata_record(out_fd, file_path, checksum) - finally: - if out_fd: - out_fd.close() - - -if __name__ == '__main__': - sys.exit(main(sys.argv[1:])) +"""Uploads sources and BLOBs to the AlmaLinux sources cache""" + +import argparse +import logging +import os +import sys +from typing import Iterator, List, Optional, Tuple + +import boto3 +import botocore.exceptions + +from almalinux.gitutils.errors import ChecksumError +from almalinux.gitutils.common import ( + configure_logger, find_metadata_file, get_file_checksum, iter_metadata, + normalize_path +) + + +def init_arg_parser() -> argparse.ArgumentParser: + """ + Initializes a command line arguments parser. + + Returns: + Command line arguments parser. + """ + arg_parser = argparse.ArgumentParser( + prog="alma_blob_upload", + description="Uploads sources and BLOBs to the AlmaLinux sources cache" + ) + group = arg_parser.add_mutually_exclusive_group() + group.add_argument('-f', '--file', nargs='+', help='file(s) to upload') + group.add_argument('-i', '--input-metadata', metavar='INPUT_FILE', + help='input metadata file list to upload. Will be ' + 'detected automatically if omitted and no files ' + 'provided') + arg_parser.add_argument('-b', '--bucket', default="sources.almalinux.org", + help='Amazon S3 bucket name. Default is ' + 'sources.almalinux.org') + arg_parser.add_argument('-o', '--output-metadata', metavar='OUTPUT_FILE', + help='output metadata file path') + arg_parser.add_argument('-p', '--private', action='store_true', + help='set uploaded file mode to private. All ' + 'uploaded files are public by default') + arg_parser.add_argument('--domain-name', default='sources.almalinux.org', + help='AlmaLinux sources server domain name. 
+                                 'Default is sources.almalinux.org')
+    arg_parser.add_argument('-v', '--verbose', action='store_true',
+                            help='enable additional debug output')
+    return arg_parser
+
+
+def iter_files(files: List[str]) -> Iterator[Tuple[str, str, str]]:
+    """
+    Iterates over a list of files and calculates checksums for them.
+
+    Args:
+        files: List of files.
+
+    Returns:
+        Iterator over files and their checksums.
+    """
+    checksum_type = 'sha1'
+    for rel_path in files:
+        file_path = normalize_path(rel_path)
+        checksum = get_file_checksum(file_path, checksum_type)
+        yield rel_path, checksum, checksum_type
+
+
+def is_file_exist(s3_client, bucket_name: str, checksum: str) -> bool:
+    """
+    Checks if a file with a given checksum is already uploaded.
+
+    Args:
+        s3_client: Amazon S3 client.
+        bucket_name: S3 bucket name.
+        checksum: File checksum.
+
+    Returns:
+        True if a file is already uploaded, False otherwise.
+    """
+    try:
+        s3_client.head_object(Bucket=bucket_name, Key=checksum)
+        return True
+    except botocore.exceptions.ClientError:
+        return False
+
+
+def upload_file(s3_client, bucket_name: str, file_path: str,
+                checksum: str, private: bool):
+    """
+    Uploads a file to an Amazon S3 bucket.
+
+    Args:
+        s3_client: Amazon S3 client.
+        bucket_name: S3 bucket name.
+        file_path: File path.
+        checksum: File checksum.
+        private: False if file should be public, True otherwise.
+    """
+    acl = 'bucket-owner-full-control' if private else 'public-read'
+    s3_client.upload_file(file_path, bucket_name, checksum,
+                          ExtraArgs={'ACL': acl})
+
+
+def get_file_iterator(
+        files: List[str], metadata_path: Optional[str]
+) -> Iterator[Tuple[str, str, str]]:
+    """
+    Finds a suitable file iterator for given arguments.
+
+    Args:
+        files: List of files.
+        metadata_path: Metadata file path.
+
+    Returns:
+        File iterator.
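+
+    Example:
+        A minimal usage sketch; the file name and digest below are
+        illustrative, not real values:
+
+        >>> next(get_file_iterator(['SOURCES/example.tar.gz'], None))  # doctest: +SKIP
+        ('SOURCES/example.tar.gz', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', 'sha1')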
+ """ + if files: + iterator = iter_files(files) + else: + if not metadata_path: + metadata_path = find_metadata_file(os.getcwd()) + iterator = iter_metadata(metadata_path) + return iterator + + +def main(): + arg_parser = init_arg_parser() + args = arg_parser.parse_args(sys.argv[1:]) + configure_logger(args.verbose) + s3_client = boto3.client('s3') + iterator = get_file_iterator(args.file, args.input_metadata) + out_fd = None + if args.output_metadata: + out_fd = open(args.output_metadata, 'w') + try: + for rel_path, checksum, checksum_type in iterator: + file_path = normalize_path(rel_path) + if not args.file: + real_checksum = get_file_checksum(file_path, checksum_type) + if real_checksum != checksum: + raise ChecksumError( + f"{rel_path} {checksum_type} checksum {real_checksum} " + f"doesn't match expected {checksum}" + ) + file_url = f'https://{args.domain_name}/{checksum}' + if not is_file_exist(s3_client, args.bucket, checksum): + upload_file(s3_client, args.bucket, file_path, checksum, + args.private) + logging.info(f'{rel_path} successfully uploaded: {file_url}') + else: + logging.info(f'{rel_path} is already uploaded: {file_url}') + if out_fd: + out_fd.write(f'{checksum} {rel_path}\n') + finally: + if out_fd: + out_fd.close() diff --git a/almalinux/gitutils/common.py b/almalinux/gitutils/common.py new file mode 100644 index 0000000..b35085f --- /dev/null +++ b/almalinux/gitutils/common.py @@ -0,0 +1,120 @@ +"""AlmaLinux Git server utilities common functions""" + +import hashlib +import logging +import os +import re +from typing import Iterator, Tuple + +__all__ = ['configure_logger', 'detect_checksum_type', 'find_metadata_file', + 'get_file_checksum', 'iter_metadata', 'normalize_path'] + + +def configure_logger(verbose: bool) -> logging.Logger: + """ + Configures a console logger. + + Args: + verbose: Show DEBUG messages if True, show INFO and higher otherwise. + + Returns: + Configured logger. + """ + level = logging.DEBUG if verbose else logging.INFO + handler = logging.StreamHandler() + handler.setLevel(level) + log_format = "%(levelname)-8s: %(message)s" + formatter = logging.Formatter(log_format, '%y-%m-%d %H:%M:%S') + handler.setFormatter(formatter) + logger = logging.getLogger() + logger.addHandler(handler) + logger.setLevel(level) + return logger + + +def detect_checksum_type(checksum: str) -> str: + """ + Detects checksum by its length. + + Args: + checksum: Checksum. + + Returns: + Checksum type. + """ + hash_types = {32: 'md5', 40: 'sha1', 64: 'sha256', 128: 'sha512'} + hash_type = hash_types.get(len(checksum)) + if not hash_type: + raise ValueError(f'unknown checksum type {checksum}') + return hash_type + + +def find_metadata_file(path: str) -> str: + """ + Finds a sources metadata file in the specified directory. + + Args: + path: Directory to search in. + + Returns: + Sources metadata file path. + """ + files = [f for f in os.listdir(path) if re.match(r'^\.\S*?\.metadata$', f)] + if not files: + raise Exception('metadata file is not found') + elif len(files) > 1: + raise Exception('multiple metadata files found. Please specify one to ' + 'use') + return os.path.join(path, files[0]) + + +def get_file_checksum(file_path: str, checksum_type: str = 'sha1', + buff_size: int = 1048576) -> str: + """ + Calculates a file checksum. + + Args: + file_path: File path. + checksum_type: Checksum type. + buff_size: Number of bytes to read at once. + + Returns: + File checksum. 
+ """ + hasher = hashlib.new(checksum_type) + with open(file_path, 'rb') as fd: + buff = fd.read(buff_size) + while len(buff): + hasher.update(buff) + buff = fd.read(buff_size) + return hasher.hexdigest() + + +def iter_metadata(metadata_path: str) -> Iterator[Tuple[str, str, str]]: + """ + Iterates over records in a CentOS git repository-compatible metadata file. + + Args: + metadata_path: Metadata file path. + + Returns: + Iterator over files and their checksums. + """ + with open(metadata_path, 'r') as fd: + for line in fd: + checksum, file_path = line.split() + checksum_type = detect_checksum_type(checksum) + yield file_path, checksum, checksum_type + + +def normalize_path(path: str) -> str: + """ + Returns an absolute path with all variables expanded. + + Args: + path: Path to normalize. + + Returns: + Normalized path. + """ + return os.path.abspath(os.path.expanduser(os.path.expandvars(path))) diff --git a/almalinux/gitutils/errors.py b/almalinux/gitutils/errors.py new file mode 100644 index 0000000..8d7e5ce --- /dev/null +++ b/almalinux/gitutils/errors.py @@ -0,0 +1,15 @@ +"""AlmaLinux Git server utilities error classes""" + + +class ChecksumError(Exception): + + """File checksum mismatch exception""" + + pass + + +class NetworkError(Exception): + + """Network error exception""" + + pass diff --git a/almalinux/gitutils/get_sources.py b/almalinux/gitutils/get_sources.py new file mode 100644 index 0000000..a12937c --- /dev/null +++ b/almalinux/gitutils/get_sources.py @@ -0,0 +1,120 @@ +"""Downloads sources and blobs from AlmaLinux or CentOS sources cache""" + +import argparse +import logging +import os +import shutil +import sys + +import requests + +from almalinux.gitutils.common import ( + configure_logger, find_metadata_file, get_file_checksum, iter_metadata, + normalize_path +) +from almalinux.gitutils.errors import ChecksumError, NetworkError + + +def init_arg_parser() -> argparse.ArgumentParser: + """ + Initializes a command line arguments parser. + + Returns: + Command line arguments parser. + """ + arg_parser = argparse.ArgumentParser(prog='alma_get_sources', + description=__doc__) + arg_parser.add_argument('-i', '--input-metadata', metavar='INPUT_FILE', + help='input metadata file list to download') + arg_parser.add_argument('--domain-name', default='sources.almalinux.org', + help='AlmaLinux sources server domain name. ' + 'Default is sources.almalinux.org') + arg_parser.add_argument('-v', '--verbose', action='store_true', + help='enable additional debug output') + return arg_parser + + +def create_sources_dir(base_dir: str, rel_path: str): + """ + Creates a sources directory if it doesn't exist. + + Args: + base_dir: Project's base directory. + rel_path: Project's source file relative path. + """ + dir_name, file_name = os.path.split(rel_path) + dir_path = os.path.join(base_dir, dir_name) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + + +def download_alma_blob(file_path: str, checksum: str, domain_name: str): + """ + Downloads a BLOB from the AlmaLinux Git sources cache. + + Args: + file_path: Destination file path. + checksum: File checksum. + domain_name: AlmaLinux Git source cache domain name. 
+ """ + url = f'https://{domain_name}/{checksum}' + with requests.get(url, stream=True) as rqst: + try: + rqst.raise_for_status() + except requests.exceptions.HTTPError as e: + raise NetworkError(str(e)) + with open(file_path, 'wb') as fd: + shutil.copyfileobj(rqst.raw, fd) + + +def download_metadata_blobs(metadata_path: str, base_dir: str, + domain_name: str): + """ + Downloads BLOBs listed in a metadata file from AlmaLinux Git sources cache. + + Args: + metadata_path: Metadata file path. + base_dir: Package sources base directory. + domain_name: AlmaLinux Git sources cache domain name. + """ + for rel_path, checksum, checksum_type in iter_metadata(metadata_path): + file_path = os.path.join(base_dir, rel_path) + if os.path.exists(file_path): + real_checksum = get_file_checksum(file_path, checksum_type) + if real_checksum != checksum: + raise ChecksumError( + f"{rel_path} already exists but its {checksum_type} " + f"checksum {real_checksum} doesn't match expected " + f"{checksum}" + ) + logging.info(f'{rel_path} already exists and its checksum is ' + f'correct') + continue + create_sources_dir(base_dir, rel_path) + download_alma_blob(file_path, checksum, domain_name) + real_checksum = get_file_checksum(file_path, checksum_type) + if real_checksum != checksum: + raise ChecksumError( + f"{rel_path} has been downloaded but its {checksum_type} " + f"checksum {real_checksum} doesn't match expected {checksum}" + ) + logging.info(f'{rel_path} has been successfully downloaded') + + +def main(): + arg_parser = init_arg_parser() + args = arg_parser.parse_args(sys.argv[1:]) + configure_logger(args.verbose) + base_dir = os.getcwd() + if args.input_metadata: + metadata_path = normalize_path(args.input_metadata) + else: + metadata_path = find_metadata_file(base_dir) + try: + download_metadata_blobs(metadata_path, base_dir, args.domain_name) + except ChecksumError as e: + logging.error(e) + return os.EX_DATAERR + except NetworkError as e: + logging.error(e) + return os.EX_IOERR diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..236c211 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel" +] +build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..ee60c1a --- /dev/null +++ b/setup.py @@ -0,0 +1,36 @@ +from setuptools import find_namespace_packages, setup + +with open("README.md", "r", encoding="utf-8") as fd: + long_description = fd.read() + +setup( + name="almalinux-git-utils", + version="0.0.1", + author="Eugene Zamriy", + author_email="ezamriy@almalinux.org", + description="Utilities for working with the AlmaLinux OS Git server", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://git.almalinux.org/almalinux/almalinux-git-utils", + project_urls={ + "Bug Tracker": "https://git.almalinux.org/almalinux/almalinux-git-utils/issues", + }, + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Operating System :: OS Independent", + ], + packages=find_namespace_packages(include=['almalinux.*']), + entry_points={ + 'console_scripts': [ + 'alma_blob_upload=almalinux.gitutils.blob_upload:main', + 'alma_get_sources=almalinux.gitutils.get_sources:main' + ] + }, + install_requires=[ + 'boto3>=1.15.15', + 'requests>=2.20.0' + ], + python_requires=">=3.6", + zip_safe=False +) diff --git a/tests/gitutils/test_common.py 
new file mode 100644
index 0000000..49d3d46
--- /dev/null
+++ b/tests/gitutils/test_common.py
@@ -0,0 +1,94 @@
+import os
+
+import pytest
+
+from almalinux.gitutils.common import *
+
+
+@pytest.mark.parametrize(
+    'checksum,expected',
+    [('35d14f5ab4ee239b070f3b645fb82837', 'md5'),
+     ('1014c8812720619a5a6bcd189e5d7f5d16276d86', 'sha1'),
+     ('86d8a9a32cdaff2c6003c67a12549466319e0ae51b7665fd01fd9354a3b1cf55',
+      'sha256'),
+     ('9906e61ef0b693bf978e2a88b737c79dd2c815cfc1a09443f04b79b994b4646ff72f18'
+      '6e42461b3a5768667119f39fa006ce71530791a5b35c2278e9252ec3ea', 'sha512')]
+)
+def test_detect_checksum_type(checksum, expected):
+    """detect_checksum_type returns type for supported checksums"""
+    assert detect_checksum_type(checksum) == expected
+
+
+def test_detect_checksum_type_error():
+    """detect_checksum_type raises ValueError if checksum type is unknown"""
+    with pytest.raises(ValueError):
+        detect_checksum_type('somethingwrong')
+
+
+@pytest.mark.parametrize(
+    'checksum_type,checksum',
+    [(None, '06364afe79d801433188262478a76d19777ef351'),
+     ('sha1', '06364afe79d801433188262478a76d19777ef351'),
+     ('sha256',
+      'b37758528c0338d529b3fb16fd39f28da58241abc856e16bf0bc8b99c60cd632')]
+)
+def test_get_file_checksum(tmpdir, checksum_type, checksum):
+    """get_file_checksum supports different checksum types"""
+    file_path = os.path.join(tmpdir, 'test_file.txt')
+    with open(file_path, 'w') as fd:
+        fd.write('TESTDATA\n')
+    args = [file_path]
+    if checksum_type:
+        args.append(checksum_type)
+    assert get_file_checksum(*args) == checksum
+
+
+def test_find_metadata_file_single(tmpdir):
+    """find_metadata_file returns a single metadata file"""
+    file_path = os.path.join(tmpdir, '.project.metadata')
+    open(file_path, 'a').close()
+    assert find_metadata_file(tmpdir) == file_path
+
+
+def test_find_metadata_file_missing(tmpdir):
+    """find_metadata_file raises Exception when metadata file is not found"""
+    with pytest.raises(Exception):
+        find_metadata_file(tmpdir)
+
+
+def test_find_metadata_file_multiple(tmpdir):
+    """
+    find_metadata_file raises Exception when there are multiple metadata files
+    """
+    for i in range(2):
+        open(os.path.join(tmpdir, f'.project{i}.metadata'), 'a').close()
+    with pytest.raises(Exception):
+        find_metadata_file(tmpdir)
+
+
+def test_iter_metadata(tmpdir):
+    """iter_metadata returns checksums from metadata file"""
+    data = [
+        ('SOURCES/mc-4.8.19.tar.xz',
+         '850747ae43a5c81f1dd0d906dfa9e149eb19748a', 'sha1'),
+        ('SOURCES/binary-blob',
+         'b37758528c0338d529b3fb16fd39f28da58241abc856e16bf0bc8b99c60cd632',
+         'sha256')
+    ]
+    metadata_path = os.path.join(tmpdir, '.project.metadata')
+    with open(metadata_path, 'w') as fd:
+        for rec in data:
+            fd.write(f'{rec[1]} {rec[0]}\n')
+    metadata = []
+    for file_path, checksum, checksum_type in iter_metadata(metadata_path):
+        metadata.append((file_path, checksum, checksum_type))
+    assert metadata == data
+
+
+def test_normalize_path(monkeypatch):
+    """
+    normalize_path expands variables and converts relative paths to absolute
+    """
+    cwd = os.getcwd()
+    expected = os.path.join(cwd, 'basedir', 'subdir')
+    monkeypatch.setenv('BASE_DIR', 'basedir')
+    assert normalize_path('${BASE_DIR}/subdir') == expected
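+
+
+def test_get_file_checksum_buff_size(tmpdir):
+    """get_file_checksum result does not depend on the read buffer size"""
+    # Minimal extra sketch: hashing one byte at a time must yield the same
+    # digest as the default 1 MiB buffer.
+    file_path = os.path.join(tmpdir, 'test_file.txt')
+    with open(file_path, 'w') as fd:
+        fd.write('TESTDATA\n')
+    assert get_file_checksum(file_path, 'sha1', buff_size=1) == \
+        get_file_checksum(file_path)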