import gzip
import logging
import lzma
import os
from argparse import ArgumentParser, FileType
from glob import iglob
from io import BytesIO
from pathlib import Path
# NOTE: BinaryIO is imported from `typing` directly; the `typing.io`
# pseudo-submodule used previously was removed in Python 3.13.
from typing import AnyStr, BinaryIO, Iterable, List, Optional, Union
from urllib.parse import urljoin

import createrepo_c as cr
import yaml

from .create_packages_json import PackagesGenerator, is_gzip_file, is_xz_file

# Name of the placeholder file created inside the module defaults directory
# so that the directory is never empty (see `collect_modules`).
EMPTY_FILE = '.empty'


def read_modules_yaml(modules_yaml_path: Union[str, Path]) -> BytesIO:
    """
    Read a modules.yaml file (possibly compressed) fully into memory

    :param modules_yaml_path: path to the file on the local filesystem
    :return: in-memory binary stream with the raw (undecoded) file content
    """
    with open(modules_yaml_path, 'rb') as fp:
        return BytesIO(fp.read())


def grep_list_of_modules_yaml(
        repos_path: AnyStr,
) -> Iterable[Optional[BytesIO]]:
    """
    Find all of valid *modules.yaml.* in repos

    Every directory named `repodata` found (recursively) under `repos_path`
    marks its parent directory as a repo to inspect.

    :param repos_path: path to a directory which contains repo dirs
    :return: lazy iterable with the content of *modules.yaml.* of each repo;
             an item is None for a repo which has no `modules` record
    """
    return (
        read_modules_yaml_from_specific_repo(repo_path=Path(path).parent)
        for path in iglob(
            str(Path(repos_path).joinpath('**/repodata')),
            recursive=True
        )
    )


def _is_remote(path: Union[str, Path]) -> bool:
    """
    Check whether a repo location is an HTTP(S) URL

    The full scheme separator is required so that a local directory whose
    name merely starts with `http` is not mistaken for a URL.

    :param path: path or URL to check
    :return: True if `path` starts with `http://` or `https://`
    """
    return str(path).startswith(('http://', 'https://'))


def read_modules_yaml_from_specific_repo(
        repo_path: Union[str, Path]
) -> Optional[BytesIO]:
    """
    Read modules_yaml from a specific repo (remote or local)

    The repo's `repodata/repomd.xml` is parsed and the first record of type
    `modules` is read.

    :param repo_path: path/url to a specific repo
                      (final dir should contain dir `repodata`)
    :return: content of *modules.yaml.* of the repo or None
             if repomd.xml has no record of type `modules`
    """
    is_remote = _is_remote(repo_path)
    base_url = None
    if is_remote:
        # Exactly one trailing slash, so urljoin appends to the repo dir
        # instead of replacing its last path component.
        base_url = str(repo_path).rstrip('/') + '/'
        # presumably returns a local path to the downloaded file - its
        # result is used as a filesystem path below; verify against
        # PackagesGenerator
        repomd_file_path = PackagesGenerator.get_remote_file_content(
            file_url=urljoin(base_url, 'repodata/repomd.xml'),
        )
    else:
        repomd_file_path = os.path.join(repo_path, 'repodata/repomd.xml')

    repomd_obj = cr.Repomd(str(repomd_file_path))
    for record in repomd_obj.records:
        if record.type != 'modules':
            continue
        if is_remote:
            modules_yaml_path = PackagesGenerator.get_remote_file_content(
                file_url=urljoin(base_url, record.location_href),
            )
        else:
            modules_yaml_path = os.path.join(
                repo_path,
                record.location_href,
            )
        return read_modules_yaml(modules_yaml_path=modules_yaml_path)
    return None


def _should_grep_defaults(
        document_type: str,
        grep_only_modules_data: bool = False,
        grep_only_modules_defaults_data: bool = False,
) -> bool:
    """
    Check whether a `modulemd` document should be exported

    NOTE: despite its name this helper matches `modulemd` (module data)
    documents, not `modulemd-defaults`; the name is kept for compatibility.

    :param document_type: value of the `document` key of a YAML document
    :param grep_only_modules_data: export only modules data
    :param grep_only_modules_defaults_data: export only modules defaults
    :return: True if the document is `modulemd` and either modules data is
             requested or both flags are equal (i.e. no filter is applied)
    """
    no_filter = grep_only_modules_data == grep_only_modules_defaults_data
    return document_type == 'modulemd' and (
        no_filter or grep_only_modules_data
    )


def _should_grep_modules(
        document_type: str,
        grep_only_modules_data: bool = False,
        grep_only_modules_defaults_data: bool = False,
) -> bool:
    """
    Check whether a `modulemd-defaults` document should be exported

    NOTE: despite its name this helper matches `modulemd-defaults`
    documents, not `modulemd`; the name is kept for compatibility.

    :param document_type: value of the `document` key of a YAML document
    :param grep_only_modules_data: export only modules data
    :param grep_only_modules_defaults_data: export only modules defaults
    :return: True if the document is `modulemd-defaults` and either modules
             defaults are requested or both flags are equal
             (i.e. no filter is applied)
    """
    no_filter = grep_only_modules_data == grep_only_modules_defaults_data
    return document_type == 'modulemd-defaults' and (
        no_filter or grep_only_modules_defaults_data
    )


def collect_modules(
        modules_paths: List[BinaryIO],
        target_dir: str,
        grep_only_modules_data: bool = False,
        grep_only_modules_defaults_data: bool = False,
):
    """
    Read given modules.yaml.* files and export modules
    and module defaults documents from them

    `modulemd-defaults` documents are written to
    `<target_dir>/module_defaults/<module>.yaml`, `modulemd` documents to
    `<target_dir>/modules/<arch>/<name>-<stream>-<version>.<context>`.
    When both flags are equal (both False or both True) both kinds
    of documents are exported.

    :param modules_paths: opened binary streams with the content of
                          modules.yaml files (plain, gzip or xz compressed)
    :param target_dir: directory where the output dirs are created
    :param grep_only_modules_data: export only modules data
    :param grep_only_modules_defaults_data: export only modules defaults
    :return: None
    """
    no_filter = grep_only_modules_defaults_data is grep_only_modules_data
    modules_path = os.path.join(target_dir, 'modules')
    module_defaults_path = os.path.join(target_dir, 'module_defaults')
    if grep_only_modules_data or no_filter:
        os.makedirs(modules_path, exist_ok=True)
    if grep_only_modules_defaults_data or no_filter:
        os.makedirs(module_defaults_path, exist_ok=True)
        # Defaults modules can be empty, but pungi detects
        # empty folder while copying and raises the exception in this case
        Path(os.path.join(module_defaults_path, EMPTY_FILE)).touch()

    for module_file in modules_paths:
        data = module_file.read()
        # The compression is detected by the first two bytes of the content
        if is_gzip_file(data[:2]):
            data = gzip.decompress(data)
        elif is_xz_file(data[:2]):
            data = lzma.decompress(data)
        # BaseLoader builds only plain str/list/dict objects, so all scalar
        # values (e.g. `version`) stay strings exactly as written in the file
        documents = yaml.load_all(data, Loader=yaml.BaseLoader)
        for doc in documents:
            if not isinstance(doc, dict):
                # E.g. an empty document produced by a stray `---` separator
                logging.warning(
                    'Skipping a YAML document which is not a mapping'
                )
                continue
            path = None
            if _should_grep_modules(
                doc['document'],
                grep_only_modules_data,
                grep_only_modules_defaults_data,
            ):
                name = f"{doc['data']['module']}.yaml"
                path = os.path.join(module_defaults_path, name)
                logging.info('Found %s module defaults', name)
            elif _should_grep_defaults(
                doc['document'],
                grep_only_modules_data,
                grep_only_modules_defaults_data,
            ):
                # The naming scheme presumably mirrors
                # pungi.phases.pkgset.sources.source_koji.get_koji_modules
                doc_data = doc['data']
                stream = doc_data['stream'].replace('-', '_')
                name = f"{doc_data['name']}-{stream}-" \
                       f"{doc_data['version']}.{doc_data['context']}"
                arch_dir = os.path.join(
                    modules_path,
                    doc_data['arch']
                )
                os.makedirs(arch_dir, exist_ok=True)
                path = os.path.join(
                    arch_dir,
                    name,
                )
                logging.info('Found module %s', name)

                if 'artifacts' not in doc_data:
                    logging.warning(
                        'RPM %s does not have explicit list of artifacts',
                        name
                    )
            if path is not None:
                with open(path, 'w') as f:
                    yaml.dump(doc, f, default_flow_style=False)


def cli_main():
    """
    Command line entry point

    Exactly one source of modules.yaml files must be given: explicit files
    (`-p`), a directory with repo dirs (`-rp`) or paths/urls of repos
    (`-rd`). The found documents are exported to the directory passed
    by `-t` (see `collect_modules`).
    """
    parser = ArgumentParser()
    content_type_group = parser.add_mutually_exclusive_group(required=False)
    content_type_group.add_argument(
        '--get-only-modules-data',
        action='store_true',
        help='Parse and get only modules data',
    )
    content_type_group.add_argument(
        '--get-only-modules-defaults-data',
        action='store_true',
        help='Parse and get only modules_defaults data',
    )
    path_group = parser.add_mutually_exclusive_group(required=True)
    path_group.add_argument(
        '-p', '--path',
        type=FileType('rb'), nargs='+',
        help='Path to modules.yaml.gz file. '
             'You may pass multiple files by passing -p path1 path2'
    )
    path_group.add_argument(
        '-rp', '--repo-path',
        required=False,
        type=str,
        default=None,
        help='Path to a directory which contains repodirs. E.g. /var/repos'
    )
    path_group.add_argument(
        '-rd', '--repodata-paths',
        required=False,
        type=str,
        nargs='+',
        default=[],
        help='Paths/urls to the directories with directory `repodata`',
    )
    parser.add_argument('-t', '--target', required=True)

    namespace = parser.parse_args()
    if namespace.repodata_paths:
        modules = [
            read_modules_yaml_from_specific_repo(repodata_path)
            for repodata_path in namespace.repodata_paths
        ]
    elif namespace.path is not None:
        modules = namespace.path
    else:
        modules = grep_list_of_modules_yaml(namespace.repo_path)
    # Repos without a `modules` record produce None - drop them
    modules = [module for module in modules if module is not None]
    try:
        collect_modules(
            modules,
            namespace.target,
            namespace.get_only_modules_data,
            namespace.get_only_modules_defaults_data,
        )
    finally:
        # Files opened by argparse's FileType are never closed automatically
        for module_file in modules:
            module_file.close()


if __name__ == '__main__':
    cli_main()