# coding=utf-8
"""
The tool allows to generate package.json. This file is used by pungi
as parameter `gather_prepopulate`.

Sample of using repodata files taken from
https://github.com/rpm-software-management/createrepo_c/blob/master/examples/python/repodata_parsing.py
"""

import argparse
import binascii
import gzip
import json
import lzma
import os
import re
import tempfile
from collections import defaultdict
from dataclasses import dataclass
from typing import AnyStr, Dict, List, Optional, Any, Iterator

import createrepo_c as cr
import dnf.subject
import hawkey
import requests
import rpm
import yaml
from createrepo_c import Package, PackageIterator


def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes) -> bool:
    """Check a file's magic number against an expected hex signature."""
    return binascii.hexlify(first_two_bytes) == initial_bytes


def is_gzip_file(first_two_bytes) -> bool:
    """Return True if the leading bytes carry the gzip magic number (1f 8b)."""
    return _is_compressed_file(
        first_two_bytes=first_two_bytes,
        initial_bytes=b'1f8b',
    )


def is_xz_file(first_two_bytes) -> bool:
    """Return True if the leading bytes carry the xz magic number (fd 37)."""
    return _is_compressed_file(
        first_two_bytes=first_two_bytes,
        initial_bytes=b'fd37',
    )


@dataclass
class RepoInfo:
    """Description of a single repository to be scanned."""

    # path to a directory with repo directories. E.g. '/var/repos' contains
    # 'appstream', 'baseos', etc.
    # Or 'http://koji.cloudlinux.com/mirrors/rhel_mirror' if you are
    # using remote repo
    path: AnyStr
    # name of folder with a repodata folder. E.g. 'baseos', 'appstream', etc
    folder: AnyStr
    # name of repo. E.g. 'BaseOS', 'AppStream', etc
    name: AnyStr
    # architecture of repo. E.g. 'x86_64', 'i686', etc
    arch: AnyStr
    # Is a repo remote or local
    is_remote: bool
    # Is a reference repository (usually it's a RHEL repo)
    # Layout of packages from such repository will be taken as example
    # Only layout of specific package (which don't exist
    # in a reference repository) will be taken as example
    is_reference: bool = False
    # NOTE(review): declared but not read anywhere in this file — confirm
    # whether callers elsewhere rely on it before removing.
    strict_arch: bool = False


class PackagesGenerator:
    """Collects packages from the configured repos and emits packages.json."""

    # per-arch list of package arches accepted for a repo of that arch;
    # 'noarch' is always accepted (defaultdict default)
    repo_arches = defaultdict(lambda: list(('noarch',)))
    # multilib companions: e.g. an x86_64 repo also carries i686 packages
    addon_repos = {
        'x86_64': ['i686'],
        'ppc64le': [],
        'aarch64': [],
        's390x': [],
        'i686': [],
    }

    def __init__(
            self,
            repos: List[RepoInfo],
            excluded_packages: List[AnyStr],
            included_packages: List[AnyStr],
    ):
        """
        :param repos: repositories to scan
        :param excluded_packages: regex patterns of package names to drop
            from the generated json
        :param included_packages: regex patterns of module packages that are
            added to the json despite being modular
        """
        self.repos = repos
        self.excluded_packages = excluded_packages
        self.included_packages = included_packages
        # temp files downloaded from remote repos; removed in __del__
        self.tmp_files = []
        # expand repo_arches: each arch accepts its addon arches and itself
        for arch, arch_list in self.addon_repos.items():
            self.repo_arches[arch].extend(arch_list)
            self.repo_arches[arch].append(arch)

    def __del__(self):
        # best-effort cleanup of downloaded repodata temp files
        for tmp_file in self.tmp_files:
            if os.path.exists(tmp_file):
                os.remove(tmp_file)

    @staticmethod
    def _warning_callback(warning_type, message):
        """
        Warning callback for createrepo_c parsing functions
        """
        print(f'Warning message: "{message}"; warning type: "{warning_type}"')
        return True

    @staticmethod
    def get_remote_file_content(file_url: AnyStr) -> AnyStr:
        """
        Get content from a remote file and write it to a temp file
        :param file_url: url of a remote file
        :return: path to a temp file
        """
        file_request = requests.get(
            url=file_url,
        )
        file_request.raise_for_status()
        # delete=False: the caller is responsible for removing the file
        with tempfile.NamedTemporaryFile(delete=False) as file_stream:
            file_stream.write(file_request.content)
            return file_stream.name

    @staticmethod
    def _parse_repomd(repomd_file_path: AnyStr) -> cr.Repomd:
        """
        Parse file repomd.xml and create object Repomd
        :param repomd_file_path: path to local repomd.xml
        """
        return cr.Repomd(repomd_file_path)

    @classmethod
    def _parse_modules_file(
            cls,
            modules_file_path: AnyStr,
    ) -> Iterator[Any]:
        """
        Parse modules.yaml.gz and returns parsed data
        :param modules_file_path: path to local modules.yaml.gz
        :return: List of dict for each modules in a repo
        """
        with open(modules_file_path, 'rb') as modules_file:
            data = modules_file.read()
            # modules metadata may be gzip- or xz-compressed; sniff the magic
            if is_gzip_file(data[:2]):
                data = gzip.decompress(data)
            elif is_xz_file(data[:2]):
                data = lzma.decompress(data)
            return yaml.load_all(
                data,
                Loader=yaml.BaseLoader,
            )

    def _get_repomd_records(
            self,
            repo_info: RepoInfo,
    ) -> List[cr.RepomdRecord]:
        """
        Get, parse file repomd.xml and extract from it repomd records
        :param repo_info: structure which contains info about a current repo
        :return: list with repomd records
        """
        repomd_file_path = os.path.join(
            repo_info.path,
            repo_info.folder,
            'repodata',
            'repomd.xml',
        )
        # a remote repomd.xml is fetched to a temp file first
        if repo_info.is_remote:
            repomd_file_path = self.get_remote_file_content(repomd_file_path)
        repomd_object = self._parse_repomd(repomd_file_path)
        if repo_info.is_remote:
            os.remove(repomd_file_path)
        return repomd_object.records

    def _download_repomd_records(
            self,
            repo_info: RepoInfo,
            repomd_records: List[cr.RepomdRecord],
            repomd_records_dict: Dict[str, str],
    ):
        """
        Download repomd records
        :param repo_info: structure which contains info about a current repo
        :param repomd_records: list with repomd records
        :param repomd_records_dict: dict with paths to repodata files
        """
        for repomd_record in repomd_records:
            if repomd_record.type not in (
                    'primary',
                    'filelists',
                    'other',
            ):
                continue
            repomd_record_file_path = os.path.join(
                repo_info.path,
                repo_info.folder,
                repomd_record.location_href,
            )
            if repo_info.is_remote:
                repomd_record_file_path = self.get_remote_file_content(
                    repomd_record_file_path)
                self.tmp_files.append(repomd_record_file_path)
            repomd_records_dict[repomd_record.type] = repomd_record_file_path

    def _parse_module_repomd_record(
            self,
            repo_info: RepoInfo,
            repomd_records: List[cr.RepomdRecord],
    ) -> List[Dict]:
        """
        Find the `modules` repomd record and parse its modules metadata
        :param repo_info: structure which contains info about a current repo
        :param repomd_records: list with repomd records
        :return: list of parsed module documents; empty list if the repo
                 has no modules metadata
        """
        for repomd_record in repomd_records:
            if repomd_record.type != 'modules':
                continue
            repomd_record_file_path = os.path.join(
                repo_info.path,
                repo_info.folder,
                repomd_record.location_href,
            )
            if repo_info.is_remote:
                repomd_record_file_path = self.get_remote_file_content(
                    repomd_record_file_path)
                self.tmp_files.append(repomd_record_file_path)
            return list(self._parse_modules_file(
                repomd_record_file_path,
            ))
        # no `modules` record present in this repo
        return []

    @staticmethod
    def compare_pkgs_version(package_1: Package, package_2: Package) -> int:
        """
        Compare two packages by EVR (epoch, version, release)
        :return: positive if package_1 is newer, 0 if equal, negative if older
        """
        version_tuple_1 = (
            package_1.epoch,
            package_1.version,
            package_1.release,
        )
        version_tuple_2 = (
            package_2.epoch,
            package_2.version,
            package_2.release,
        )
        return rpm.labelCompare(version_tuple_1, version_tuple_2)

    def generate_packages_json(
            self
    ) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]:
        """
        Generate packages.json
        :return: mapping variant -> arch -> source rpm name -> list of
                 'name.arch' binary packages
        """
        packages_json = defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    list,
                )
            )
        )
        all_packages = defaultdict(lambda: {'variants': list()})
        # Pass 1: walk every repo and keep, per 'name.arch' key, the best
        # candidate package together with the variants it appears in
        for repo_info in self.repos:
            repomd_records = self._get_repomd_records(
                repo_info=repo_info,
            )
            repomd_records_dict = {}  # type: Dict[str, str]
            self._download_repomd_records(
                repo_info=repo_info,
                repomd_records=repomd_records,
                repomd_records_dict=repomd_records_dict,
            )
            packages_iterator = PackageIterator(
                primary_path=repomd_records_dict['primary'],
                filelists_path=repomd_records_dict['filelists'],
                other_path=repomd_records_dict['other'],
                warningcb=self._warning_callback,
            )
            for package in packages_iterator:
                # packages of a foreign arch are filed under the repo's arch
                if package.arch not in self.repo_arches[repo_info.arch]:
                    package_arch = repo_info.arch
                else:
                    package_arch = package.arch
                package_key = f'{package.name}.{package_arch}'
                if 'module' in package.release and not any(
                        re.search(included_package, package.name)
                        for included_package in self.included_packages
                ):
                    # Even a module package will be added to packages.json if
                    # it presents in the list of included packages
                    continue
                if package_key not in all_packages:
                    all_packages[package_key]['variants'].append(
                        (repo_info.name, repo_info.arch)
                    )
                    all_packages[package_key]['arch'] = package_arch
                    all_packages[package_key]['package'] = package
                    all_packages[package_key]['type'] = repo_info.is_reference
                # replace an older package if it's not reference or
                # a newer package is from reference repo
                elif (not all_packages[package_key]['type'] or
                        all_packages[package_key]['type'] ==
                        repo_info.is_reference) and \
                        self.compare_pkgs_version(
                            package,
                            all_packages[package_key]['package'],
                        ) > 0:
                    all_packages[package_key]['variants'] = [
                        (repo_info.name, repo_info.arch)
                    ]
                    all_packages[package_key]['arch'] = package_arch
                    all_packages[package_key]['package'] = package
                # same version met in another variant: remember the variant
                elif self.compare_pkgs_version(
                        package,
                        all_packages[package_key]['package'],
                ) == 0:
                    all_packages[package_key]['variants'].append(
                        (repo_info.name, repo_info.arch)
                    )
        # Pass 2: resolve each kept package to its source rpm and file it
        # under every variant it belongs to
        for package_dict in all_packages.values():
            package_arch = package_dict['arch']
            package = package_dict['package']
            package_name = package.name
            # exclusion and srpm resolution are per-package, so they are
            # done once here instead of once per variant
            if any(re.search(excluded_package, package_name)
                   for excluded_package in self.excluded_packages):
                continue
            src_package_name = dnf.subject.Subject(
                package.rpm_sourcerpm,
            ).get_nevra_possibilities(
                forms=hawkey.FORM_NEVRA,
            )
            if len(src_package_name) != 1:
                # We should stop utility if we can't get exact name of srpm
                raise ValueError(
                    'We can\'t get exact name of srpm '
                    f'by its NEVRA "{package.rpm_sourcerpm}"'
                )
            src_package_name = src_package_name[0].name
            for variant_name, variant_arch in package_dict['variants']:
                # TODO: for x86_64 + i686 in one packages.json
                #       don't remove!
                # if package.arch in self.addon_repos[variant_arch]:
                #     arches = self.addon_repos[variant_arch] + [variant_arch]
                # else:
                #     arches = [variant_arch]
                # for arch in arches:
                #     pkgs_list = packages_json[variant_name][
                #         arch][src_package_name]
                #     added_pkg = f'{package_name}.{package_arch}'
                #     if added_pkg not in pkgs_list:
                #         pkgs_list.append(added_pkg)
                pkgs_list = packages_json[variant_name][
                    variant_arch][src_package_name]
                added_pkg = f'{package_name}.{package_arch}'
                if added_pkg not in pkgs_list:
                    pkgs_list.append(added_pkg)
        return packages_json


def create_parser():
    """Build the CLI argument parser (repo options are appendable lists)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--repo-path',
        action='append',
        help='Path to a folder with repofolders. E.g. "/var/repos" or '
             '"http://koji.cloudlinux.com/mirrors/rhel_mirror"',
        required=True,
    )
    parser.add_argument(
        '--repo-folder',
        action='append',
        help='A folder which contains folder repodata . E.g. "baseos-stream"',
        required=True,
    )
    parser.add_argument(
        '--repo-arch',
        action='append',
        help='What architecture packages a repository contains. E.g. "x86_64"',
        required=True,
    )
    parser.add_argument(
        '--repo-name',
        action='append',
        help='Name of a repository. E.g. "AppStream"',
        required=True,
    )
    parser.add_argument(
        '--is-remote',
        action='append',
        type=str,
        help='A repository is remote or local',
        choices=['yes', 'no'],
        required=True,
    )
    parser.add_argument(
        '--is-reference',
        action='append',
        type=str,
        help='A repository is used as reference for packages layout',
        choices=['yes', 'no'],
        required=True,
    )
    parser.add_argument(
        '--excluded-packages',
        nargs='+',
        type=str,
        default=[],
        help='A list of globally excluded packages from generated json.'
             'All of list elements should be separated by space',
        required=False,
    )
    parser.add_argument(
        '--included-packages',
        nargs='+',
        type=str,
        default=[],
        help='A list of globally included packages from generated json.'
             'All of list elements should be separated by space',
        required=False,
    )
    parser.add_argument(
        '--json-output-path',
        type=str,
        help='Full path to output json file',
        required=True,
    )
    return parser


def cli_main():
    """CLI entry point: zip the parallel repo options and emit the json."""
    args = create_parser().parse_args()
    repos = []
    for repo_path, repo_folder, repo_name, \
            repo_arch, is_remote, is_reference in zip(
                args.repo_path,
                args.repo_folder,
                args.repo_name,
                args.repo_arch,
                args.is_remote,
                args.is_reference,
            ):
        repos.append(RepoInfo(
            path=repo_path,
            folder=repo_folder,
            name=repo_name,
            arch=repo_arch,
            is_remote=is_remote == 'yes',
            is_reference=is_reference == 'yes',
        ))
    pg = PackagesGenerator(
        repos=repos,
        excluded_packages=args.excluded_packages,
        included_packages=args.included_packages,
    )
    result = pg.generate_packages_json()
    with open(args.json_output_path, 'w') as packages_file:
        json.dump(
            result,
            packages_file,
            indent=4,
            sort_keys=True,
        )


if __name__ == '__main__':
    cli_main()