ALBS-1216: Implement support for new SHA512 external sources file format in almalinux-git-utils

- New `--sha512` argument for the sources uploader: use it when you need to upload a source in the new format (with a SHA512 checksum)
- A separate recognizer of the checksum type in a metadata file
- The sources downloader can automatically detect the checksum type
This commit is contained in:
Stepan Oksanichenko 2024-06-11 23:43:51 +03:00
parent 16e73461cb
commit 478c020211
Signed by: soksanichenko
GPG Key ID: AB9983172AB1E45B
2 changed files with 37 additions and 7 deletions

View File

@ -40,6 +40,8 @@ def init_arg_parser() -> argparse.ArgumentParser:
help='output metadata file path') help='output metadata file path')
arg_parser.add_argument('-a', '--append-metadata', action='store_true', arg_parser.add_argument('-a', '--append-metadata', action='store_true',
help='Append to an output metadata') help='Append to an output metadata')
arg_parser.add_argument('--sha512', action='store_true',
help='Use a new format of hashsum')
arg_parser.add_argument('-p', '--private', action='store_true', arg_parser.add_argument('-p', '--private', action='store_true',
help='set uploaded file mode to private. All ' help='set uploaded file mode to private. All '
'uploaded files are public by default') 'uploaded files are public by default')
@ -51,17 +53,21 @@ def init_arg_parser() -> argparse.ArgumentParser:
return arg_parser return arg_parser
def iter_files(files: List[str]) -> Iterator[Tuple[str, str, str]]: def iter_files(
files: List[str],
sha512: bool
) -> Iterator[Tuple[str, str, str]]:
""" """
Iterates over a list of files and calculates checksums for them. Iterates over a list of files and calculates checksums for them.
Args: Args:
files: List of files. files: List of files.
sha512: True if we use new format of hashsum.
Returns: Returns:
Iterator over files and their checksums. Iterator over files and their checksums.
""" """
checksum_type = 'sha1' checksum_type = 'sha512' if sha512 else 'sha1'
for rel_path in files: for rel_path in files:
file_path = normalize_path(rel_path) file_path = normalize_path(rel_path)
checksum = get_file_checksum(file_path, checksum_type) checksum = get_file_checksum(file_path, checksum_type)
@ -104,7 +110,9 @@ def upload_file(s3_client, bucket_name: str, file_path: str,
def get_file_iterator( def get_file_iterator(
files: List[str], metadata_path: Optional[str] files: List[str],
metadata_path: Optional[str],
sha512: bool,
) -> Iterator[Tuple[str, str, str]]: ) -> Iterator[Tuple[str, str, str]]:
""" """
Finds a suitable file iterator for given arguments. Finds a suitable file iterator for given arguments.
@ -112,12 +120,16 @@ def get_file_iterator(
Args: Args:
files: List of files. files: List of files.
metadata_path: Metadata file path. metadata_path: Metadata file path.
sha512: True if we use new format of hashsum.
Returns: Returns:
File iterator. File iterator.
""" """
if files: if files:
iterator = iter_files(files) iterator = iter_files(
files=files,
sha512=sha512,
)
else: else:
if not metadata_path: if not metadata_path:
metadata_path = find_metadata_file(os.getcwd()) metadata_path = find_metadata_file(os.getcwd())
@ -130,10 +142,18 @@ def main():
args = arg_parser.parse_args(sys.argv[1:]) args = arg_parser.parse_args(sys.argv[1:])
configure_logger(args.verbose) configure_logger(args.verbose)
s3_client = boto3.client('s3') s3_client = boto3.client('s3')
iterator = get_file_iterator(args.file, args.input_metadata) iterator = get_file_iterator(
args.file,
args.input_metadata,
args.sha512,
)
out_fd = None out_fd = None
if args.append_metadata:
file_mdoe = 'a'
else:
file_mdoe = 'w'
if args.output_metadata: if args.output_metadata:
out_fd = open(args.output_metadata, 'a' if args.append_metadata else 'w') out_fd = open(args.output_metadata, file_mdoe)
try: try:
for rel_path, checksum, checksum_type in iterator: for rel_path, checksum, checksum_type in iterator:
file_path = normalize_path(rel_path) file_path = normalize_path(rel_path)

View File

@ -90,6 +90,16 @@ def get_file_checksum(file_path: str, checksum_type: str = 'sha1',
return hasher.hexdigest() return hasher.hexdigest()
def extract_checksum_and_file_path(line: str) -> Tuple[str, str]:
    """
    Extracts a checksum and a file path from a metadata file record.

    Two record formats are recognized:
      - old format: ``<checksum> <file path>``
      - new format: ``SHA512 (<file path>) = <checksum>``

    Args:
        line: Metadata file record (a trailing newline is allowed).

    Returns:
        Tuple of a checksum and a file path.

    Raises:
        ValueError: If the record matches neither of the formats.
    """
    record = line.strip()
    # Only the leading tag selects the new format: a plain substring search
    # would misclassify an old-format record whose file name merely contains
    # "sha512" (e.g. "foo-sha512.tar.gz").
    if record.upper().startswith('SHA512'):
        # The checksum is hexadecimal and never contains '=', so the LAST '='
        # is the separator even when the file name itself contains '='.
        head, separator, checksum = record.rpartition('=')
        # The file path is everything between the first '(' and the last ')',
        # which keeps spaces and parentheses that belong to the file name.
        path_start = head.find('(')
        path_end = head.rfind(')')
        if not separator or path_start == -1 or path_end < path_start:
            raise ValueError(f'invalid SHA512 metadata record: {record!r}')
        file_path = head[path_start + 1:path_end]
        checksum = checksum.strip()
    else:
        checksum, file_path = record.split()
    return checksum, file_path
def iter_metadata(metadata_path: str) -> Iterator[Tuple[str, str, str]]: def iter_metadata(metadata_path: str) -> Iterator[Tuple[str, str, str]]:
""" """
Iterates over records in a CentOS git repository-compatible metadata file. Iterates over records in a CentOS git repository-compatible metadata file.
@ -102,7 +112,7 @@ def iter_metadata(metadata_path: str) -> Iterator[Tuple[str, str, str]]:
""" """
with open(metadata_path, 'r') as fd: with open(metadata_path, 'r') as fd:
for line in fd: for line in fd:
checksum, file_path = line.split() checksum, file_path = extract_checksum_and_file_path(line)
checksum_type = detect_checksum_type(checksum) checksum_type = detect_checksum_type(checksum)
yield file_path, checksum, checksum_type yield file_path, checksum, checksum_type