# coding=utf-8 """ The tool allow to generate package.json. This file is used by pungi # as parameter `gather_prepopulate` Sample of using repodata files taken from https://github.com/rpm-software-management/createrepo_c/blob/master/examples/python/repodata_parsing.py """ import argparse import gzip import json import logging import lzma import os import re import tempfile from collections import defaultdict from itertools import tee from pathlib import Path from typing import ( AnyStr, Dict, List, Any, Iterator, Optional, Tuple, Union, ) import binascii from urllib.parse import urljoin import requests import rpm import yaml from createrepo_c import ( Package, PackageIterator, Repomd, RepomdRecord, ) from dataclasses import dataclass, field from kobo.rpmlib import parse_nvra logging.basicConfig(level=logging.INFO) def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes): return binascii.hexlify(first_two_bytes) == initial_bytes def is_gzip_file(first_two_bytes): return _is_compressed_file( first_two_bytes=first_two_bytes, initial_bytes=b'1f8b', ) def is_xz_file(first_two_bytes): return _is_compressed_file( first_two_bytes=first_two_bytes, initial_bytes=b'fd37', ) @dataclass class RepoInfo: # path to a directory with repo directories. E.g. '/var/repos' contains # 'appstream', 'baseos', etc. # Or 'http://koji.cloudlinux.com/mirrors/rhel_mirror' if you are # using remote repo path: str # name of folder with a repodata folder. E.g. 'baseos', 'appstream', etc folder: str # Is a repo remote or local is_remote: bool # Is a reference repository (usually it's a RHEL repo) # Layout of packages from such repository will be taken as example # Only layout of specific package (which doesn't exist # in a reference repository) will be taken as example is_reference: bool = False # The packages from 'present' repo will be added to a variant. # The packages from 'absent' repo will be removed from a variant. repo_type: str = 'present' @dataclass class VariantInfo: # name of variant. E.g. 'BaseOS', 'AppStream', etc name: AnyStr # architecture of variant. E.g. 'x86_64', 'i686', etc arch: AnyStr # The packages which will be not added to a variant excluded_packages: List[str] = field(default_factory=list) # Repos of a variant repos: List[RepoInfo] = field(default_factory=list) class PackagesGenerator: repo_arches = defaultdict(lambda: list(('noarch',))) addon_repos = { 'x86_64': ['i686'], 'ppc64le': [], 'aarch64': [], 's390x': [], 'i686': [], } def __init__( self, variants: List[VariantInfo], excluded_packages: List[AnyStr], included_packages: List[AnyStr], ): self.variants = variants self.pkgs = dict() self.excluded_packages = excluded_packages self.included_packages = included_packages self.tmp_files = [] for arch, arch_list in self.addon_repos.items(): self.repo_arches[arch].extend(arch_list) self.repo_arches[arch].append(arch) def __del__(self): for tmp_file in self.tmp_files: if os.path.exists(tmp_file): os.remove(tmp_file) @staticmethod def _get_full_repo_path(repo_info: RepoInfo): result = os.path.join( repo_info.path, repo_info.folder ) if repo_info.is_remote: result = urljoin( repo_info.path + '/', repo_info.folder, ) return result @staticmethod def _warning_callback(warning_type, message): """ Warning callback for createrepo_c parsing functions """ print(f'Warning message: "{message}"; warning type: "{warning_type}"') return True @staticmethod def get_remote_file_content(file_url: AnyStr) -> AnyStr: """ Get content from a remote file and write it to a temp file :param file_url: url of a remote file :return: path to a temp file """ file_request = requests.get( url=file_url, ) file_request.raise_for_status() with tempfile.NamedTemporaryFile(delete=False) as file_stream: file_stream.write(file_request.content) return file_stream.name @staticmethod def _parse_repomd(repomd_file_path: AnyStr) -> Repomd: """ Parse file repomd.xml and create object Repomd :param repomd_file_path: path to local repomd.xml """ return Repomd(repomd_file_path) @classmethod def _parse_modules_file( cls, modules_file_path: AnyStr, ) -> Iterator[Any]: """ Parse modules.yaml.gz and returns parsed data :param modules_file_path: path to local modules.yaml.gz :return: List of dict for each module in a repo """ with open(modules_file_path, 'rb') as modules_file: data = modules_file.read() if is_gzip_file(data[:2]): data = gzip.decompress(data) elif is_xz_file(data[:2]): data = lzma.decompress(data) return yaml.load_all( data, Loader=yaml.BaseLoader, ) def _get_repomd_records( self, repo_info: RepoInfo, ) -> List[RepomdRecord]: """ Get, parse file repomd.xml and extract from it repomd records :param repo_info: structure which contains info about a current repo :return: list with repomd records """ repomd_file_path = os.path.join( repo_info.path, repo_info.folder, 'repodata', 'repomd.xml', ) if repo_info.is_remote: repomd_file_path = urljoin( urljoin( repo_info.path + '/', repo_info.folder ) + '/', 'repodata/repomd.xml' ) repomd_file_path = self.get_remote_file_content(repomd_file_path) repomd_object = self._parse_repomd(repomd_file_path) if repo_info.is_remote: os.remove(repomd_file_path) return repomd_object.records def _download_repomd_records( self, repo_info: RepoInfo, repomd_records: List[RepomdRecord], repomd_records_dict: Dict[str, str], ): """ Download repomd records :param repo_info: structure which contains info about a current repo :param repomd_records: list with repomd records :param repomd_records_dict: dict with paths to repodata files """ for repomd_record in repomd_records: if repomd_record.type not in ( 'primary', 'filelists', 'other', ): continue repomd_record_file_path = os.path.join( repo_info.path, repo_info.folder, repomd_record.location_href, ) if repo_info.is_remote: repomd_record_file_path = self.get_remote_file_content( repomd_record_file_path) self.tmp_files.append(repomd_record_file_path) repomd_records_dict[repomd_record.type] = repomd_record_file_path def _parse_module_repomd_record( self, repo_info: RepoInfo, repomd_records: List[RepomdRecord], ) -> List[Dict]: """ Download repomd records :param repo_info: structure which contains info about a current repo :param repomd_records: list with repomd records """ for repomd_record in repomd_records: if repomd_record.type != 'modules': continue repomd_record_file_path = os.path.join( repo_info.path, repo_info.folder, repomd_record.location_href, ) if repo_info.is_remote: repomd_record_file_path = self.get_remote_file_content( repomd_record_file_path) self.tmp_files.append(repomd_record_file_path) return list(self._parse_modules_file( repomd_record_file_path, )) @staticmethod def compare_pkgs_version(package_1: Package, package_2: Package) -> int: version_tuple_1 = ( package_1.epoch, package_1.version, package_1.release, ) version_tuple_2 = ( package_2.epoch, package_2.version, package_2.release, ) return rpm.labelCompare(version_tuple_1, version_tuple_2) def get_packages_iterator( self, repo_info: RepoInfo, ) -> Union[PackageIterator, Iterator]: full_repo_path = self._get_full_repo_path(repo_info) pkgs_iterator = self.pkgs.get(full_repo_path) if pkgs_iterator is None: repomd_records = self._get_repomd_records( repo_info=repo_info, ) repomd_records_dict = {} # type: Dict[str, str] self._download_repomd_records( repo_info=repo_info, repomd_records=repomd_records, repomd_records_dict=repomd_records_dict, ) pkgs_iterator = PackageIterator( primary_path=repomd_records_dict['primary'], filelists_path=repomd_records_dict['filelists'], other_path=repomd_records_dict['other'], warningcb=self._warning_callback, ) pkgs_iterator, self.pkgs[full_repo_path] = tee(pkgs_iterator) return pkgs_iterator def get_package_arch( self, package: Package, variant_arch: str, ) -> str: result = variant_arch if package.arch in self.repo_arches[variant_arch]: result = package.arch return result def is_skipped_module_package(self, package: Package) -> bool: # Even a module package will be added to packages.json if # it presents in the list of included packages return 'module' in package.release and not any( re.search(included_package, package.name) for included_package in self.included_packages ) def is_excluded_package( self, package: Package, variant_arch: str, excluded_packages: List[str], ) -> bool: return any( re.search( excluded_pkg, self.get_package_key(package, variant_arch), ) for excluded_pkg in excluded_packages ) @staticmethod def get_source_rpm_name(package: Package) -> str: source_rpm_nvra = parse_nvra(package.rpm_sourcerpm) return source_rpm_nvra['name'] def get_package_key(self, package: Package, variant_arch: str) -> str: return ( f'{package.name}.' f'{self.get_package_arch(package, variant_arch)}' ) def generate_packages_json( self ) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]: """ Generate packages.json """ packages = defaultdict(lambda: defaultdict(lambda: { 'variants': list(), })) for variant_info in self.variants: for repo_info in variant_info.repos: is_reference = repo_info.is_reference for package in self.get_packages_iterator(repo_info=repo_info): if self.is_skipped_module_package(package): continue if self.is_excluded_package( package, variant_info.arch, self.excluded_packages, ): continue if self.is_excluded_package( package, variant_info.arch, variant_info.excluded_packages, ): continue package_key = self.get_package_key( package, variant_info.arch, ) source_rpm_name = self.get_source_rpm_name(package) package_info = packages[source_rpm_name][package_key] if 'is_reference' not in package_info: package_info['variants'].append(variant_info.name) package_info['is_reference'] = is_reference package_info['package'] = package elif not package_info['is_reference'] or \ package_info['is_reference'] == is_reference and \ self.compare_pkgs_version( package_1=package, package_2=package_info['package'], ) > 0: package_info['variants'] = [variant_info.name] package_info['is_reference'] = is_reference package_info['package'] = package elif self.compare_pkgs_version( package_1=package, package_2=package_info['package'], ) == 0 and repo_info.repo_type != 'absent': package_info['variants'].append(variant_info.name) result = defaultdict(lambda: defaultdict( lambda: defaultdict(list), )) for variant_info in self.variants: for source_rpm_name, packages_info in packages.items(): for package_key, package_info in packages_info.items(): variant_pkgs = result[variant_info.name][variant_info.arch] if variant_info.name not in package_info['variants']: continue variant_pkgs[source_rpm_name].append(package_key) return result def create_parser(): parser = argparse.ArgumentParser() parser.add_argument( '-c', '--config', type=Path, default=Path('config.yaml'), required=False, help='Path to a config', ) parser.add_argument( '-o', '--json-output-path', type=str, help='Full path to output json file', required=True, ) return parser def read_config(config_path: Path) -> Optional[Dict]: if not config_path.exists(): logging.error('A config by path "%s" does not exist', config_path) exit(1) with config_path.open('r') as config_fd: return yaml.safe_load(config_fd) def process_config(config_data: Dict) -> Tuple[ List[VariantInfo], List[str], List[str], ]: excluded_packages = config_data.get('excluded_packages', []) included_packages = config_data.get('included_packages', []) variants = [VariantInfo( name=variant_name, arch=variant_info['arch'], excluded_packages=variant_info.get('excluded_packages', []), repos=[RepoInfo( path=variant_repo['path'], folder=variant_repo['folder'], is_remote=variant_repo['remote'], is_reference=variant_repo['reference'], repo_type=variant_repo.get('repo_type', 'present'), ) for variant_repo in variant_info['repos']] ) for variant_name, variant_info in config_data['variants'].items()] return variants, excluded_packages, included_packages def cli_main(): args = create_parser().parse_args() variants, excluded_packages, included_packages = process_config( config_data=read_config(args.config) ) pg = PackagesGenerator( variants=variants, excluded_packages=excluded_packages, included_packages=included_packages, ) result = pg.generate_packages_json() with open(args.json_output_path, 'w') as packages_file: json.dump( result, packages_file, indent=4, sort_keys=True, ) if __name__ == '__main__': cli_main()