# coding=utf-8
"""
The tool generates packages.json. That file is used by pungi
as the parameter `gather_prepopulate`.

Sample of using repodata files taken from
https://github.com/rpm-software-management/createrepo_c/blob/master/examples/python/repodata_parsing.py
"""

import argparse
import binascii
import gzip
import json
import lzma
import os
import re
import tempfile
from collections import defaultdict
from dataclasses import dataclass
from typing import AnyStr, Dict, List, Optional

import createrepo_c as cr
import dnf.subject
import hawkey
import requests
import rpm
import yaml
from createrepo_c import Package


def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes):
    """
    Check whether the hex form of the leading bytes equals a signature
    :param first_two_bytes: leading bytes of the inspected data
    :param initial_bytes: expected signature as lowercase hex, e.g. b'1f8b'
    :return: True if the hexlified leading bytes equal the signature
    """
    return binascii.hexlify(first_two_bytes) == initial_bytes


def is_gzip_file(first_two_bytes):
    """
    Check whether the leading two bytes are the gzip signature (1f 8b)
    :param first_two_bytes: leading two bytes of the inspected data
    """
    return _is_compressed_file(
        first_two_bytes=first_two_bytes,
        initial_bytes=b'1f8b',
    )


def is_xz_file(first_two_bytes):
    """
    Check whether the leading two bytes are the start of
    the xz signature (fd 37)
    :param first_two_bytes: leading two bytes of the inspected data
    """
    return _is_compressed_file(
        first_two_bytes=first_two_bytes,
        initial_bytes=b'fd37',
    )


@dataclass
class RepoInfo:
    # path to a directory with repo directories. E.g. '/var/repos' contains
    # 'appstream', 'baseos', etc.
    # Or 'http://koji.cloudlinux.com/mirrors/rhel_mirror' if you are
    # using a remote repo
    path: AnyStr
    # name of the folder which contains a repodata folder.
    # E.g. 'baseos', 'appstream', etc
    folder: AnyStr
    # name of the repo. E.g. 'BaseOS', 'AppStream', etc
    name: AnyStr
    # architecture of the repo. E.g. 'x86_64', 'i686', etc
    arch: AnyStr
    # Is the repo remote or local
    is_remote: bool
    # Is it a reference repository (usually it's a RHEL repo)
    # Layout of packages from such a repository will be taken as example
    # Only layout of specific packages (which don't exist
    # in a reference repository) will be taken as example
    is_reference: bool = False


class PackagesGenerator:
    """
    Collect packages from the repodata of several repositories and build
    the structure which is dumped as packages.json
    """

    def __init__(
            self,
            repos: List[RepoInfo],
            excluded_packages: List[AnyStr],
            included_packages: List[AnyStr],
    ):
        """
        :param repos: repositories which should be processed
        :param excluded_packages: regex patterns; a package whose name
                                  matches any of them is left out of
                                  the result
        :param included_packages: regex patterns; a module package whose
                                  name matches any of them is kept in
                                  the result
        """
        self.repos = repos
        self.excluded_packages = excluded_packages
        self.included_packages = included_packages

    @staticmethod
    def _warning_callback(warning_type, message):
        """
        Warning callback for createrepo_c parsing functions
        """
        print(f'Warning message: "{message}"; warning type: "{warning_type}"')
        return True

    @staticmethod
    def _get_repo_arches(repo_arch: AnyStr) -> List[AnyStr]:
        """
        Get the list of package arches which are kept as-is for a repo arch
        :param repo_arch: architecture of a repo
        :return: the repo arch itself, 'noarch' and, for 'x86_64',
                 also 'i686' and 'i386'
        """
        repo_arches = [
            repo_arch,
            'noarch',
        ]
        if repo_arch == 'x86_64':
            repo_arches.extend([
                'i686',
                'i386',
            ])
        return repo_arches

    @staticmethod
    def get_remote_file_content(file_url: AnyStr) -> AnyStr:
        """
        Get content from a remote file and write it to a temp file.
        The caller is responsible for removing the temp file.
        :param file_url: url of a remote file
        :return: path to a temp file
        """

        file_request = requests.get(
            url=file_url,
        )
        file_request.raise_for_status()
        # delete=False is required: the file must outlive this context
        # manager because only its path is handed back to the caller
        with tempfile.NamedTemporaryFile(delete=False) as file_stream:
            file_stream.write(file_request.content)
            return file_stream.name

    @staticmethod
    def _parse_repomd(repomd_file_path: AnyStr) -> cr.Repomd:
        """
        Parse file repomd.xml and create object Repomd
        :param repomd_file_path: path to local repomd.xml
        """
        return cr.Repomd(repomd_file_path)

    def _parse_primary_file(
            self,
            primary_file_path: AnyStr,
            packages: Dict[AnyStr, cr.Package],
    ) -> None:
        """
        Parse primary.xml.gz, take from it info about packages and put it to
        dict packages
        :param primary_file_path: path to local primary.xml.gz
        :param packages: dictionary which will contain info about packages
                         from a repository, keyed by pkgId
        """
        cr.xml_parse_primary(
            path=primary_file_path,
            pkgcb=lambda pkg: packages.update({
                pkg.pkgId: pkg,
            }),
            do_files=False,
            warningcb=self._warning_callback,
        )

    def _parse_filelists_file(
            self,
            filelists_file_path: AnyStr,
            packages: Dict[AnyStr, cr.Package],
    ) -> None:
        """
        Parse filelists.xml.gz, take from it info about packages and put it to
        dict packages
        :param filelists_file_path: path to local filelists.xml.gz
        :param packages: dictionary which contains info about packages
                         from a repository, keyed by pkgId
        """
        cr.xml_parse_filelists(
            path=filelists_file_path,
            # hand the parser the package already stored under this id
            # (or None if there is no such package)
            newpkgcb=lambda pkg_id, name, arch: packages.get(
                pkg_id,
                None,
            ),
            warningcb=self._warning_callback,
        )

    def _parse_other_file(
            self,
            other_file_path: AnyStr,
            packages: Dict[AnyStr, cr.Package],
    ) -> None:
        """
        Parse other.xml.gz, take from it info about packages and put it to
        dict packages
        :param other_file_path: path to local other.xml.gz
        :param packages: dictionary which contains info about packages
                         from a repository, keyed by pkgId
        """
        cr.xml_parse_other(
            path=other_file_path,
            # hand the parser the package already stored under this id
            # (or None if there is no such package)
            newpkgcb=lambda pkg_id, name, arch: packages.get(
                pkg_id,
                None,
            ),
            warningcb=self._warning_callback,
        )

    @classmethod
    def _parse_modules_file(
            cls,
            modules_file_path: AnyStr,
    ) -> List[Dict]:
        """
        Parse modules.yaml.gz and return the parsed data.
        The content is decompressed if it starts with the gzip or
        the xz signature, otherwise it is parsed as is.
        :param modules_file_path: path to local modules.yaml.gz
        :return: List of dicts, one for each YAML document in the file
        """

        with open(modules_file_path, 'rb') as modules_file:
            data = modules_file.read()
        if is_gzip_file(data[:2]):
            data = gzip.decompress(data)
        elif is_xz_file(data[:2]):
            data = lzma.decompress(data)
        # yaml.load_all is lazy; materialize it so the return value
        # matches the declared type
        return list(yaml.load_all(
            data,
            Loader=yaml.BaseLoader,
        ))

    def _get_repomd_records(
            self,
            repo_info: RepoInfo,
    ) -> List[cr.RepomdRecord]:
        """
        Get, parse file repomd.xml and extract from it repomd records
        :param repo_info: structure which contains info about a current repo
        :return: list with repomd records
        """
        repomd_file_path = os.path.join(
            repo_info.path,
            repo_info.folder,
            'repodata',
            'repomd.xml',
        )
        if repo_info.is_remote:
            # from here on the path points to a downloaded temp copy
            repomd_file_path = self.get_remote_file_content(repomd_file_path)
        try:
            repomd_object = self._parse_repomd(repomd_file_path)
        finally:
            # remove the temp copy even if parsing fails
            if repo_info.is_remote:
                os.remove(repomd_file_path)
        return repomd_object.records

    def _parse_repomd_records(
            self,
            repo_info: RepoInfo,
            repomd_records: List[cr.RepomdRecord],
            packages: Dict[AnyStr, cr.Package],
    ) -> Optional[List[Dict]]:
        """
        Parse repomd records and extract from repodata file info about packages
        :param repo_info: structure which contains info about a current repo
        :param repomd_records: list with repomd records
        :param packages: dictionary which will contain info about packages
                         from a repository
        :return: List of dicts for each module in a repo if it contains
                 modules info, otherwise an empty list
        """
        modules_data = []
        for repomd_record in repomd_records:
            if repomd_record.type not in (
                'primary',
                'filelists',
                'other',
                'modules',
            ):
                continue
            repomd_record_file_path = os.path.join(
                repo_info.path,
                repo_info.folder,
                repomd_record.location_href,
            )
            if repo_info.is_remote:
                # from here on the path points to a downloaded temp copy
                repomd_record_file_path = self.get_remote_file_content(
                    repomd_record_file_path,
                )
            try:
                if repomd_record.type == 'modules':
                    modules_data = self._parse_modules_file(
                        repomd_record_file_path,
                    )
                else:
                    # dispatch to _parse_primary_file,
                    # _parse_filelists_file or _parse_other_file
                    parse_file_method = getattr(
                        self,
                        f'_parse_{repomd_record.type}_file'
                    )
                    parse_file_method(
                        repomd_record_file_path,
                        packages,
                    )
            finally:
                # remove the temp copy even if parsing fails
                if repo_info.is_remote:
                    os.remove(repomd_record_file_path)
        return list(modules_data)

    @staticmethod
    def compare_pkgs_version(package_1: Package, package_2: Package) -> int:
        """
        Compare two packages by (epoch, version, release)
        :param package_1: the first package
        :param package_2: the second package
        :return: result of rpm.labelCompare for the two
                 (epoch, version, release) tuples
        """
        version_tuple_1 = (
            package_1.epoch,
            package_1.version,
            package_1.release,
        )
        version_tuple_2 = (
            package_2.epoch,
            package_2.version,
            package_2.release,
        )
        return rpm.labelCompare(version_tuple_1, version_tuple_2)

    def generate_packages_json(
            self
    ) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]:
        """
        Generate packages.json
        :return: nested mapping
                 variant -> repo arch -> source package name ->
                 list of '<package name>.<package arch>'
        :raises ValueError: if the source package name can't be determined
                            unambiguously from the source rpm of a package
        """
        packages_json = defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    list,
                )
            )
        )
        # '<name>.<arch>' -> {'variants', 'arch', 'package', 'type'}
        all_packages = {}
        for repo_info in self.repos:
            repo_arches = self._get_repo_arches(repo_info.arch)
            packages = {}  # type: Dict[AnyStr, cr.Package]
            repomd_records = self._get_repomd_records(
                repo_info=repo_info,
            )
            self._parse_repomd_records(
                repo_info=repo_info,
                repomd_records=repomd_records,
                packages=packages,
            )
            for package in packages.values():
                # a package with an arch which is foreign for the repo
                # is registered under the arch of the repo
                if package.arch not in repo_arches:
                    package_arch = repo_info.arch
                else:
                    package_arch = package.arch
                package_key = f'{package.name}.{package_arch}'
                if 'module' in package.release and not any(
                    re.search(included_package, package.name)
                    for included_package in self.included_packages
                ):
                    # Even a module package will be added to packages.json if
                    # it presents in the list of included packages
                    continue
                if package_key not in all_packages:
                    all_packages[package_key] = {
                        'variants': [repo_info.name],
                        'arch': repo_info.arch,
                        'package': package,
                        'type': repo_info.is_reference,
                    }
                    continue
                known_package = all_packages[package_key]
                version_diff = self.compare_pkgs_version(
                    package,
                    known_package['package'],
                )
                # replace an older package if it's not reference or
                # a newer package is from reference repo
                if (not known_package['type'] or
                        known_package['type'] == repo_info.is_reference) and \
                        version_diff > 0:
                    # NOTE(review): 'type' is not refreshed here, so it keeps
                    # the value of the repo which registered the key first;
                    # confirm that this is intended
                    known_package['variants'] = [repo_info.name]
                    known_package['arch'] = repo_info.arch
                    known_package['package'] = package
                elif version_diff == 0:
                    known_package['variants'].append(
                        repo_info.name
                    )

        for package_dict in all_packages.values():
            repo_arch = package_dict['arch']
            repo_arches = self._get_repo_arches(repo_arch)
            package = package_dict['package']
            package_name = package.name
            if package.arch not in repo_arches:
                package_arch = repo_arch
            else:
                package_arch = package.arch
            if any(re.search(excluded_package, package_name)
                   for excluded_package in self.excluded_packages):
                continue
            src_package_name = dnf.subject.Subject(
                package.rpm_sourcerpm,
            ).get_nevra_possibilities(
                forms=hawkey.FORM_NEVRA,
            )
            if len(src_package_name) != 1:
                # We should stop utility if we can't get exact name of srpm
                # (either several candidates or no candidate at all)
                raise ValueError(
                    'We can\'t get exact name of srpm '
                    f'by its NEVRA "{package.rpm_sourcerpm}"'
                )
            src_package_name = src_package_name[0].name
            added_pkg = f'{package_name}.{package_arch}'
            # everything above is the same for each variant of the package
            for variant in package_dict['variants']:
                pkgs_list = packages_json[variant][
                    repo_arch][src_package_name]
                if added_pkg not in pkgs_list:
                    pkgs_list.append(added_pkg)
        return packages_json


def create_parser():
    """
    Create the parser of command line arguments.
    Options which describe a repo are repeatable: the values given for
    the first repo describe the first repo, and so on.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--repo-path',
        action='append',
        help='Path to a folder with repofolders. E.g. "/var/repos" or '
             '"http://koji.cloudlinux.com/mirrors/rhel_mirror"',
        required=True,
    )
    parser.add_argument(
        '--repo-folder',
        action='append',
        help='A folder which contains folder repodata . E.g. "baseos-stream"',
        required=True,
    )
    parser.add_argument(
        '--repo-arch',
        action='append',
        help='What architecture packages a repository contains. E.g. "x86_64"',
        required=True,
    )
    parser.add_argument(
        '--repo-name',
        action='append',
        help='Name of a repository. E.g. "AppStream"',
        required=True,
    )
    parser.add_argument(
        '--is-remote',
        action='append',
        type=str,
        help='A repository is remote or local',
        choices=['yes', 'no'],
        required=True,
    )
    parser.add_argument(
        '--is-reference',
        action='append',
        type=str,
        help='A repository is used as reference for packages layout',
        choices=['yes', 'no'],
        required=True,
    )
    parser.add_argument(
        '--excluded-packages',
        nargs='+',
        type=str,
        default=[],
        help='A list of globally excluded packages from generated json.'
             'All of list elements should be separated by space',
        required=False,
    )
    parser.add_argument(
        '--included-packages',
        nargs='+',
        type=str,
        default=[],
        help='A list of globally included packages from generated json.'
             'All of list elements should be separated by space',
        required=False,
    )
    parser.add_argument(
        '--json-output-path',
        type=str,
        help='Full path to output json file',
        required=True,
    )

    return parser


def cli_main():
    """
    Entry point: parse command line arguments, generate the packages
    structure and dump it as JSON to the requested path
    """
    args = create_parser().parse_args()
    repos = []
    # NOTE(review): zip stops at the shortest list, so repos are silently
    # dropped if the repeatable options are given a different number of
    # times; confirm whether that should be an error instead
    for repo_path, repo_folder, repo_name, \
        repo_arch, is_remote, is_reference in zip(
            args.repo_path,
            args.repo_folder,
            args.repo_name,
            args.repo_arch,
            args.is_remote,
            args.is_reference,
    ):
        repos.append(RepoInfo(
            path=repo_path,
            folder=repo_folder,
            name=repo_name,
            arch=repo_arch,
            is_remote=is_remote == 'yes',
            is_reference=is_reference == 'yes',
        ))
    pg = PackagesGenerator(
        repos=repos,
        excluded_packages=args.excluded_packages,
        included_packages=args.included_packages,
    )
    result = pg.generate_packages_json()
    with open(args.json_output_path, 'w') as packages_file:
        json.dump(
            result,
            packages_file,
            indent=4,
            sort_keys=True,
        )


if __name__ == '__main__':
    cli_main()