Update 'almalinux/gitutils/blob_upload.py'

This commit is contained in:
Stepan Oksanichenko 2023-05-11 12:34:05 +00:00
parent 804cca076e
commit 843e9670e0
1 changed files with 158 additions and 156 deletions

View File

@ -1,156 +1,158 @@
"""Uploads sources and BLOBs to the AlmaLinux sources cache""" """Uploads sources and BLOBs to the AlmaLinux sources cache"""
import argparse import argparse
import logging import logging
import os import os
import sys import sys
from typing import Iterator, List, Optional, Tuple from typing import Iterator, List, Optional, Tuple
import boto3 import boto3
import botocore.exceptions import botocore.exceptions
from almalinux.gitutils.errors import ChecksumError from almalinux.gitutils.errors import ChecksumError
from almalinux.gitutils.common import ( from almalinux.gitutils.common import (
configure_logger, find_metadata_file, get_file_checksum, iter_metadata, configure_logger, find_metadata_file, get_file_checksum, iter_metadata,
normalize_path normalize_path
) )
def init_arg_parser() -> argparse.ArgumentParser: def init_arg_parser() -> argparse.ArgumentParser:
""" """
Initializes a command line arguments parser. Initializes a command line arguments parser.
Returns: Returns:
Command line arguments parser. Command line arguments parser.
""" """
arg_parser = argparse.ArgumentParser( arg_parser = argparse.ArgumentParser(
prog="alma_blob_upload", prog="alma_blob_upload",
description="Uploads sources and BLOBs to the AlmaLinux sources cache" description="Uploads sources and BLOBs to the AlmaLinux sources cache"
) )
group = arg_parser.add_mutually_exclusive_group() group = arg_parser.add_mutually_exclusive_group()
group.add_argument('-f', '--file', nargs='+', help='file(s) to upload') group.add_argument('-f', '--file', nargs='+', help='file(s) to upload')
group.add_argument('-i', '--input-metadata', metavar='INPUT_FILE', group.add_argument('-i', '--input-metadata', metavar='INPUT_FILE',
help='input metadata file list to upload. Will be ' help='input metadata file list to upload. Will be '
'detected automatically if omitted and no files ' 'detected automatically if omitted and no files '
'provided') 'provided')
arg_parser.add_argument('-b', '--bucket', default="sources.almalinux.org", arg_parser.add_argument('-b', '--bucket', default="sources.almalinux.org",
help='Amazon S3 bucket name. Default is ' help='Amazon S3 bucket name. Default is '
'sources.almalinux.org') 'sources.almalinux.org')
arg_parser.add_argument('-o', '--output-metadata', metavar='OUTPUT_FILE', arg_parser.add_argument('-o', '--output-metadata', metavar='OUTPUT_FILE',
help='output metadata file path') help='output metadata file path')
arg_parser.add_argument('-p', '--private', action='store_true', arg_parser.add_argument('-a', '--append-metadata', action='store_true',
help='set uploaded file mode to private. All ' help='Append to an output metadata')
'uploaded files are public by default') arg_parser.add_argument('-p', '--private', action='store_true',
arg_parser.add_argument('--domain-name', default='sources.almalinux.org', help='set uploaded file mode to private. All '
help='AlmaLinux sources server domain name. ' 'uploaded files are public by default')
'Default is sources.almalinux.org') arg_parser.add_argument('--domain-name', default='sources.almalinux.org',
arg_parser.add_argument('-v', '--verbose', action='store_true', help='AlmaLinux sources server domain name. '
help='enable additional debug output') 'Default is sources.almalinux.org')
return arg_parser arg_parser.add_argument('-v', '--verbose', action='store_true',
help='enable additional debug output')
return arg_parser
def iter_files(files: List[str]) -> Iterator[Tuple[str, str, str]]:
"""
Iterates over a list of files and calculates checksums for them. def iter_files(files: List[str]) -> Iterator[Tuple[str, str, str]]:
"""
Args: Iterates over a list of files and calculates checksums for them.
files: List of files.
Args:
Returns: files: List of files.
Iterator over files and their checksums.
""" Returns:
checksum_type = 'sha1' Iterator over files and their checksums.
for rel_path in files: """
file_path = normalize_path(rel_path) checksum_type = 'sha1'
checksum = get_file_checksum(file_path, checksum_type) for rel_path in files:
yield rel_path, checksum, checksum_type file_path = normalize_path(rel_path)
checksum = get_file_checksum(file_path, checksum_type)
yield rel_path, checksum, checksum_type
def is_file_exist(s3_client, bucket_name: str, checksum: str) -> bool:
"""
Checks is a file with a given checksum is already uploaded. def is_file_exist(s3_client, bucket_name: str, checksum: str) -> bool:
Args: """
s3_client: Amazon S3 client. Checks is a file with a given checksum is already uploaded.
bucket_name: S3 bucket name. Args:
checksum: File checksum. s3_client: Amazon S3 client.
bucket_name: S3 bucket name.
Returns: checksum: File checksum.
True if a file is already uploaded, False otherwise.
""" Returns:
try: True if a file is already uploaded, False otherwise.
s3_client.head_object(Bucket=bucket_name, Key=checksum) """
return True try:
except botocore.exceptions.ClientError: s3_client.head_object(Bucket=bucket_name, Key=checksum)
return False return True
except botocore.exceptions.ClientError:
return False
def upload_file(s3_client, bucket_name: str, file_path: str,
checksum: str, private: bool):
""" def upload_file(s3_client, bucket_name: str, file_path: str,
Uploads a file to an Amazon S3 bucket. checksum: str, private: bool):
"""
Args: Uploads a file to an Amazon S3 bucket.
s3_client: Amazon S3 client.
bucket_name: S3 bucket name. Args:
file_path: File path. s3_client: Amazon S3 client.
checksum: File checksum. bucket_name: S3 bucket name.
private: False if file should be public, True otherwise. file_path: File path.
""" checksum: File checksum.
acl = 'bucket-owner-full-control' if private else 'public-read' private: False if file should be public, True otherwise.
s3_client.upload_file(file_path, bucket_name, checksum, """
ExtraArgs={'ACL': acl}) acl = 'bucket-owner-full-control' if private else 'public-read'
s3_client.upload_file(file_path, bucket_name, checksum,
ExtraArgs={'ACL': acl})
def get_file_iterator(
files: List[str], metadata_path: Optional[str]
) -> Iterator[Tuple[str, str, str]]: def get_file_iterator(
""" files: List[str], metadata_path: Optional[str]
Finds a suitable file iterator for given arguments. ) -> Iterator[Tuple[str, str, str]]:
"""
Args: Finds a suitable file iterator for given arguments.
files: List of files.
metadata_path: Metadata file path. Args:
files: List of files.
Returns: metadata_path: Metadata file path.
File iterator.
""" Returns:
if files: File iterator.
iterator = iter_files(files) """
else: if files:
if not metadata_path: iterator = iter_files(files)
metadata_path = find_metadata_file(os.getcwd()) else:
iterator = iter_metadata(metadata_path) if not metadata_path:
return iterator metadata_path = find_metadata_file(os.getcwd())
iterator = iter_metadata(metadata_path)
return iterator
def main():
arg_parser = init_arg_parser()
args = arg_parser.parse_args(sys.argv[1:]) def main():
configure_logger(args.verbose) arg_parser = init_arg_parser()
s3_client = boto3.client('s3') args = arg_parser.parse_args(sys.argv[1:])
iterator = get_file_iterator(args.file, args.input_metadata) configure_logger(args.verbose)
out_fd = None s3_client = boto3.client('s3')
if args.output_metadata: iterator = get_file_iterator(args.file, args.input_metadata)
out_fd = open(args.output_metadata, 'w') out_fd = None
try: if args.output_metadata:
for rel_path, checksum, checksum_type in iterator: out_fd = open(args.output_metadata, 'w+' if args.append_metadata else 'w')
file_path = normalize_path(rel_path) try:
if not args.file: for rel_path, checksum, checksum_type in iterator:
real_checksum = get_file_checksum(file_path, checksum_type) file_path = normalize_path(rel_path)
if real_checksum != checksum: if not args.file:
raise ChecksumError( real_checksum = get_file_checksum(file_path, checksum_type)
f"{rel_path} {checksum_type} checksum {real_checksum} " if real_checksum != checksum:
f"doesn't match expected {checksum}" raise ChecksumError(
) f"{rel_path} {checksum_type} checksum {real_checksum} "
file_url = f'https://{args.domain_name}/{checksum}' f"doesn't match expected {checksum}"
if not is_file_exist(s3_client, args.bucket, checksum): )
upload_file(s3_client, args.bucket, file_path, checksum, file_url = f'https://{args.domain_name}/{checksum}'
args.private) if not is_file_exist(s3_client, args.bucket, checksum):
logging.info(f'{rel_path} successfully uploaded: {file_url}') upload_file(s3_client, args.bucket, file_path, checksum,
else: args.private)
logging.info(f'{rel_path} is already uploaded: {file_url}') logging.info(f'{rel_path} successfully uploaded: {file_url}')
if out_fd: else:
out_fd.write(f'{checksum} {rel_path}\n') logging.info(f'{rel_path} is already uploaded: {file_url}')
finally: if out_fd:
if out_fd: out_fd.write(f'{checksum} {rel_path}\n')
out_fd.close() finally:
if out_fd:
out_fd.close()