# coding=utf-8
"""
The tool generates packages.json. That file is used by pungi
as the parameter `gather_prepopulate`.

The sample of using repodata files is taken from
https://github.com/rpm-software-management/createrepo_c/blob/master/examples/python/repodata_parsing.py
"""

import argparse
import binascii
import gzip
import json
import lzma
import os
import re
import tempfile
from collections import defaultdict
from dataclasses import dataclass
from typing import AnyStr, Dict, List, Any, Iterator

import createrepo_c as cr
import dnf.subject
import hawkey
import requests
import rpm
import yaml
from createrepo_c import Package, PackageIterator


def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes) -> bool:
    """
    Check whether the hex form of the leading bytes equals a signature
    :param first_two_bytes: the first two bytes of a file
    :param initial_bytes: expected signature as lowercase hex, e.g. b'1f8b'
    :return: True if the hexlified bytes are equal to the signature
    """
    return binascii.hexlify(first_two_bytes) == initial_bytes


def is_gzip_file(first_two_bytes):
    """
    Check whether the leading two bytes are the gzip signature (1f 8b)
    """
    return _is_compressed_file(
        first_two_bytes=first_two_bytes,
        initial_bytes=b'1f8b',
    )


def is_xz_file(first_two_bytes):
    """
    Check whether the leading two bytes are the start of
    the xz signature (fd 37)
    """
    return _is_compressed_file(
        first_two_bytes=first_two_bytes,
        initial_bytes=b'fd37',
    )


@dataclass
class RepoInfo:
    # path to a directory with repo directories. E.g. '/var/repos' contains
    # 'appstream', 'baseos', etc.
    # Or 'http://koji.cloudlinux.com/mirrors/rhel_mirror' if you are
    # using remote repo
    path: AnyStr
    # name of folder with a repodata folder. E.g. 'baseos', 'appstream', etc
    folder: AnyStr
    # name of repo. E.g. 'BaseOS', 'AppStream', etc
    name: AnyStr
    # architecture of repo. E.g. 'x86_64', 'i686', etc
    arch: AnyStr
    # Is a repo remote or local
    is_remote: bool
    # Is a reference repository (usually it's a RHEL repo)
    # Layout of packages from such repository will be taken as example
    # Only layout of specific package (which doesn't exist
    # in a reference repository) will be taken as example
    is_reference: bool = False
    # 'present' or 'absent' (see the CLI option `--repo-type`)
    repo_type: str = 'present'


class PackagesGenerator:
    """
    Collect packages from the given repositories and build
    the structure of packages.json:
    variant name -> variant arch -> source package name -> ['name.arch', ...]
    """

    # repo arch -> list of package arches which are kept as is in that repo.
    # NOTE: the mapping lives on the class and is shared by all instances
    repo_arches = defaultdict(lambda: list(('noarch',)))
    # repo arch -> additional package arches allowed in that repo
    addon_repos = {
        'x86_64': ['i686'],
        'ppc64le': [],
        'aarch64': [],
        's390x': [],
        'i686': [],
    }

    def __init__(
            self,
            repos: List[RepoInfo],
            excluded_packages: List[AnyStr],
            included_packages: List[AnyStr],
    ):
        """
        :param repos: repositories which should be processed
        :param excluded_packages: regex patterns; a package whose name
                                  matches any of them is not added to
                                  the result
        :param included_packages: regex patterns; a modular package whose
                                  name matches any of them is still added
                                  to the result
        """
        self.repos = repos
        self.excluded_packages = excluded_packages
        self.included_packages = included_packages
        self.tmp_files = []
        for arch, arch_list in self.addon_repos.items():
            known_arches = self.repo_arches[arch]
            # `repo_arches` is shared between instances, so add an arch
            # only once; otherwise each new instance would append
            # duplicates to the same lists
            for extra_arch in (*arch_list, arch):
                if extra_arch not in known_arches:
                    known_arches.append(extra_arch)

    def __del__(self):
        # `tmp_files` doesn't exist if `__init__` failed before setting it
        for tmp_file in getattr(self, 'tmp_files', ()):
            try:
                os.remove(tmp_file)
            except FileNotFoundError:
                # the file is already removed, nothing to clean up
                pass

    @staticmethod
    def _warning_callback(warning_type, message):
        """
        Warning callback for createrepo_c parsing functions
        """
        print(f'Warning message: "{message}"; warning type: "{warning_type}"')
        return True

    @staticmethod
    def get_remote_file_content(file_url: AnyStr) -> AnyStr:
        """
        Get content from a remote file and write it to a temp file
        :param file_url: url of a remote file
        :return: path to a temp file
        :raises requests.HTTPError: if the response status is an error one
        """
        # NOTE(review): no timeout is passed here, so a stalled server
        # may block the call indefinitely - consider adding one
        file_request = requests.get(
            url=file_url,
        )
        file_request.raise_for_status()
        # delete=False: the caller is responsible for removing the file
        with tempfile.NamedTemporaryFile(delete=False) as file_stream:
            file_stream.write(file_request.content)
            return file_stream.name

    @staticmethod
    def _parse_repomd(repomd_file_path: AnyStr) -> cr.Repomd:
        """
        Parse file repomd.xml and create object Repomd
        :param repomd_file_path: path to local repomd.xml
        """
        return cr.Repomd(repomd_file_path)

    @classmethod
    def _parse_modules_file(
            cls,
            modules_file_path: AnyStr,
    ) -> Iterator[Any]:
        """
        Parse modules.yaml.gz and returns parsed data
        :param modules_file_path: path to local modules.yaml.gz
                                  (gzip, xz and uncompressed files
                                  are supported)
        :return: iterator over YAML documents of the file
        """
        with open(modules_file_path, 'rb') as modules_file:
            data = modules_file.read()
            # detect a compression by the first two bytes of the file
            if is_gzip_file(data[:2]):
                data = gzip.decompress(data)
            elif is_xz_file(data[:2]):
                data = lzma.decompress(data)
            return yaml.load_all(
                data,
                Loader=yaml.BaseLoader,
            )

    def _get_repomd_records(
            self,
            repo_info: RepoInfo,
    ) -> List[cr.RepomdRecord]:
        """
        Get, parse file repomd.xml and extract from it repomd records
        :param repo_info: structure which contains info about a current repo
        :return: list with repomd records
        """
        repomd_file_path = os.path.join(
            repo_info.path,
            repo_info.folder,
            'repodata',
            'repomd.xml',
        )
        if not repo_info.is_remote:
            return self._parse_repomd(repomd_file_path).records
        downloaded_path = self.get_remote_file_content(repomd_file_path)
        try:
            return self._parse_repomd(downloaded_path).records
        finally:
            # remove the downloaded copy even if the parsing fails
            os.remove(downloaded_path)

    def _download_repomd_records(
            self,
            repo_info: RepoInfo,
            repomd_records: List[cr.RepomdRecord],
            repomd_records_dict: Dict[str, str],
    ):
        """
        Download repomd records
        :param repo_info: structure which contains info about a current repo
        :param repomd_records: list with repomd records
        :param repomd_records_dict: dict with paths to repodata files;
                                    it's filled in place
        """
        for repomd_record in repomd_records:
            if repomd_record.type not in (
                'primary',
                'filelists',
                'other',
            ):
                continue
            repomd_record_file_path = os.path.join(
                repo_info.path,
                repo_info.folder,
                repomd_record.location_href,
            )
            if repo_info.is_remote:
                repomd_record_file_path = self.get_remote_file_content(
                    repomd_record_file_path)
                # downloaded copies are removed in `__del__`
                self.tmp_files.append(repomd_record_file_path)
            repomd_records_dict[repomd_record.type] = repomd_record_file_path

    def _parse_module_repomd_record(
            self,
            repo_info: RepoInfo,
            repomd_records: List[cr.RepomdRecord],
    ) -> List[Dict]:
        """
        Find the first repomd record of type 'modules', download it
        if a repo is remote and parse it
        :param repo_info: structure which contains info about a current repo
        :param repomd_records: list with repomd records
        :return: list of parsed documents of the modules file or
                 an empty list if a repo doesn't have a modules record
        """
        for repomd_record in repomd_records:
            if repomd_record.type != 'modules':
                continue
            repomd_record_file_path = os.path.join(
                repo_info.path,
                repo_info.folder,
                repomd_record.location_href,
            )
            if repo_info.is_remote:
                repomd_record_file_path = self.get_remote_file_content(
                    repomd_record_file_path)
                # downloaded copies are removed in `__del__`
                self.tmp_files.append(repomd_record_file_path)
            return list(self._parse_modules_file(
                repomd_record_file_path,
            ))
        # keep the declared return type if there is no modules record
        return []

    @staticmethod
    def compare_pkgs_version(package_1: Package, package_2: Package) -> int:
        """
        Compare (epoch, version, release) of two packages
        by `rpm.labelCompare`
        :return: result of `rpm.labelCompare`; a positive value means
                 that package_1 is newer than package_2 and zero means
                 that the versions are equal
        """
        version_tuple_1 = (
            package_1.epoch,
            package_1.version,
            package_1.release,
        )
        version_tuple_2 = (
            package_2.epoch,
            package_2.version,
            package_2.release,
        )
        return rpm.labelCompare(version_tuple_1, version_tuple_2)

    def generate_packages_json(
            self
    ) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]:
        """
        Generate packages.json
        :return: nested dict
                 variant name -> variant arch -> source package name ->
                 list of 'name.arch' of binary packages
        :raises ValueError: if a name of a source package can't be
                            determined unambiguously
        """
        packages_json = defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    list,
                )
            )
        )
        # 'name.arch' -> the chosen package and variants which provide it
        all_packages = defaultdict(lambda: {'variants': list()})
        # 'present' > 'absent', so the reversed sort processes
        # present repos before absent ones
        for repo_info in sorted(
                self.repos,
                key=lambda i: i.repo_type,
                reverse=True,
        ):
            repomd_records = self._get_repomd_records(
                repo_info=repo_info,
            )
            repomd_records_dict = {}  # type: Dict[str, str]
            self._download_repomd_records(
                repo_info=repo_info,
                repomd_records=repomd_records,
                repomd_records_dict=repomd_records_dict,
            )
            packages_iterator = PackageIterator(
                primary_path=repomd_records_dict['primary'],
                filelists_path=repomd_records_dict['filelists'],
                other_path=repomd_records_dict['other'],
                warningcb=self._warning_callback,
            )
            allowed_arches = self.repo_arches[repo_info.arch]
            variant = (repo_info.name, repo_info.arch)
            for package in packages_iterator:
                # a package with an arch which is unknown for the repo
                # is registered under the arch of the repo itself
                if package.arch not in allowed_arches:
                    package_arch = repo_info.arch
                else:
                    package_arch = package.arch
                package_key = f'{package.name}.{package_arch}'
                if 'module' in package.release and not any(
                    re.search(included_package, package.name)
                    for included_package in self.included_packages
                ):
                    # Even a module package will be added to packages.json if
                    # it presents in the list of included packages
                    continue
                if package_key not in all_packages:
                    # NOTE(review): this branch is taken for an 'absent'
                    # repo too if it's the first one which contains
                    # the package - confirm that it's intended
                    known = all_packages[package_key]
                    known['variants'].append(variant)
                    known['arch'] = package_arch
                    known['package'] = package
                    known['type'] = repo_info.is_reference
                    continue
                known = all_packages[package_key]
                if repo_info.repo_type == 'absent' and \
                        variant in known['variants']:
                    known['variants'].remove(variant)
                # replace an older package if it's not reference or
                # a newer package is from reference repo
                elif (not known['type'] or
                      known['type'] == repo_info.is_reference) and \
                        self.compare_pkgs_version(
                            package,
                            known['package'],
                        ) > 0:
                    known['variants'] = [variant]
                    known['arch'] = package_arch
                    known['package'] = package
                elif self.compare_pkgs_version(
                        package,
                        known['package'],
                ) == 0:
                    known['variants'].append(variant)

        for package_dict in all_packages.values():
            variants = package_dict['variants']
            if not variants:
                # all variants are removed by 'absent' repos,
                # so there is nothing to add for this package
                continue
            package_arch = package_dict['arch']
            package = package_dict['package']
            package_name = package.name
            if any(re.search(excluded_package, package_name)
                   for excluded_package in self.excluded_packages):
                continue
            nevra_possibilities = dnf.subject.Subject(
                package.rpm_sourcerpm,
            ).get_nevra_possibilities(
                forms=hawkey.FORM_NEVRA,
            )
            if len(nevra_possibilities) != 1:
                # We should stop utility if we can't get exact name of srpm
                # (no candidates at all is as bad as several ones)
                raise ValueError(
                    'We can\'t get exact name of srpm '
                    f'by its NEVRA "{package.rpm_sourcerpm}"'
                )
            src_package_name = nevra_possibilities[0].name
            added_pkg = f'{package_name}.{package_arch}'
            for variant_name, variant_arch in variants:
                # TODO: for x86_64 + i686 in one packages.json
                #  don't remove!
                # if package.arch in self.addon_repos[variant_arch]:
                #     arches = self.addon_repos[variant_arch] + [variant_arch]
                # else:
                #     arches = [variant_arch]
                # for arch in arches:
                #     pkgs_list = packages_json[variant_name][
                #         arch][src_package_name]
                #     added_pkg = f'{package_name}.{package_arch}'
                #     if added_pkg not in pkgs_list:
                #         pkgs_list.append(added_pkg)
                pkgs_list = packages_json[variant_name][
                    variant_arch][src_package_name]
                if added_pkg not in pkgs_list:
                    pkgs_list.append(added_pkg)
        return packages_json


def create_parser():
    """
    Create the parser of CLI arguments. Options which describe
    a repository use `action='append'`: repeat them once per repository,
    the values are matched by their positions
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--repo-path',
        action='append',
        help='Path to a folder with repofolders. E.g. "/var/repos" or '
             '"http://koji.cloudlinux.com/mirrors/rhel_mirror"',
        required=True,
    )
    parser.add_argument(
        '--repo-folder',
        action='append',
        help='A folder which contains folder repodata . E.g. "baseos-stream"',
        required=True,
    )
    parser.add_argument(
        '--repo-arch',
        action='append',
        help='What architecture packages a repository contains. E.g. "x86_64"',
        required=True,
    )
    parser.add_argument(
        '--repo-name',
        action='append',
        help='Name of a repository. E.g. "AppStream"',
        required=True,
    )
    parser.add_argument(
        '--is-remote',
        action='append',
        type=str,
        help='A repository is remote or local',
        choices=['yes', 'no'],
        required=True,
    )
    parser.add_argument(
        '--is-reference',
        action='append',
        type=str,
        help='A repository is used as reference for packages layout',
        choices=['yes', 'no'],
        required=True,
    )
    parser.add_argument(
        '--repo-type',
        action='append',
        type=str,
        help='Packages from repository will be removed or added to variant',
        choices=['present', 'absent'],
        required=True,
    )
    parser.add_argument(
        '--excluded-packages',
        nargs='+',
        type=str,
        default=[],
        help='A list of globally excluded packages from generated json. '
             'All of list elements should be separated by space',
        required=False,
    )
    parser.add_argument(
        '--included-packages',
        nargs='+',
        type=str,
        default=[],
        help='A list of globally included packages from generated json. '
             'All of list elements should be separated by space',
        required=False,
    )
    parser.add_argument(
        '--json-output-path',
        type=str,
        help='Full path to output json file',
        required=True,
    )

    return parser


def cli_main():
    """
    Entry point of the CLI: parse arguments, generate packages.json
    and write it to the path from `--json-output-path`
    """
    parser = create_parser()
    args = parser.parse_args()
    per_repo_options = (
        args.repo_path,
        args.repo_folder,
        args.repo_name,
        args.repo_arch,
        args.is_remote,
        args.is_reference,
        args.repo_type,
    )
    # `zip` silently drops the tail of longer lists, so a forgotten option
    # would make some repositories disappear without any message
    if len({len(option_values) for option_values in per_repo_options}) != 1:
        parser.error(
            'options --repo-path, --repo-folder, --repo-name, --repo-arch, '
            '--is-remote, --is-reference and --repo-type '
            'should be passed the same number of times'
        )
    repos = []
    for repo_path, repo_folder, repo_name, \
            repo_arch, is_remote, is_reference, repo_type in zip(
                *per_repo_options
            ):
        repos.append(RepoInfo(
            path=repo_path,
            folder=repo_folder,
            name=repo_name,
            arch=repo_arch,
            is_remote=is_remote == 'yes',
            is_reference=is_reference == 'yes',
            repo_type=repo_type,
        ))
    pg = PackagesGenerator(
        repos=repos,
        excluded_packages=args.excluded_packages,
        included_packages=args.included_packages,
    )
    result = pg.generate_packages_json()
    with open(args.json_output_path, 'w') as packages_file:
        json.dump(
            result,
            packages_file,
            indent=4,
            sort_keys=True,
        )


if __name__ == '__main__':
    cli_main()