# coding=utf-8 """ The tool allow to generate package.json. This file is used by pungi # as parameter `gather_prepopulate` Sample of using repodata files taken from https://github.com/rpm-software-management/createrepo_c/blob/master/examples/python/repodata_parsing.py """ import argparse import gzip import json import logging import lzma import os import re import tempfile from collections import defaultdict from itertools import tee from pathlib import Path from typing import ( AnyStr, Dict, List, Any, Iterator, Optional, Tuple, ) import binascii from urllib.parse import urljoin import createrepo_c as cr import dnf.subject import hawkey import requests import rpm import yaml from createrepo_c import Package, PackageIterator from dataclasses import dataclass logging.basicConfig(level=logging.INFO) def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes): return binascii.hexlify(first_two_bytes) == initial_bytes def is_gzip_file(first_two_bytes): return _is_compressed_file( first_two_bytes=first_two_bytes, initial_bytes=b'1f8b', ) def is_xz_file(first_two_bytes): return _is_compressed_file( first_two_bytes=first_two_bytes, initial_bytes=b'fd37', ) @dataclass class RepoInfo: # path to a directory with repo directories. E.g. '/var/repos' contains # 'appstream', 'baseos', etc. # Or 'http://koji.cloudlinux.com/mirrors/rhel_mirror' if you are # using remote repo path: AnyStr # name of folder with a repodata folder. E.g. 'baseos', 'appstream', etc folder: AnyStr # name of repo. E.g. 'BaseOS', 'AppStream', etc name: AnyStr # architecture of repo. E.g. 'x86_64', 'i686', etc arch: AnyStr # Is a repo remote or local is_remote: bool # Is a reference repository (usually it's a RHEL repo) # Layout of packages from such repository will be taken as example # Only layout of specific package (which don't exist # in a reference repository) will be taken as example is_reference: bool = False repo_type: str = 'present' class PackagesGenerator: repo_arches = defaultdict(lambda: list(('noarch',))) addon_repos = { 'x86_64': ['i686'], 'ppc64le': [], 'aarch64': [], 's390x': [], 'i686': [], } def __init__( self, repos: List[RepoInfo], excluded_packages: List[AnyStr], included_packages: List[AnyStr], ): self.repos = repos self.pkgs_iterators = dict() self.excluded_packages = excluded_packages self.included_packages = included_packages self.tmp_files = [] for arch, arch_list in self.addon_repos.items(): self.repo_arches[arch].extend(arch_list) self.repo_arches[arch].append(arch) def __del__(self): for tmp_file in self.tmp_files: if os.path.exists(tmp_file): os.remove(tmp_file) @staticmethod def _get_full_repo_path(repo_info: RepoInfo): if repo_info.is_remote: return urljoin( repo_info.path + '/', repo_info.folder, ) else: return os.path.join( repo_info.path, repo_info.folder ) @staticmethod def _warning_callback(warning_type, message): """ Warning callback for createrepo_c parsing functions """ print(f'Warning message: "{message}"; warning type: "{warning_type}"') return True @staticmethod def get_remote_file_content(file_url: AnyStr) -> AnyStr: """ Get content from a remote file and write it to a temp file :param file_url: url of a remote file :return: path to a temp file """ file_request = requests.get( url=file_url, ) file_request.raise_for_status() with tempfile.NamedTemporaryFile(delete=False) as file_stream: file_stream.write(file_request.content) return file_stream.name @staticmethod def _parse_repomd(repomd_file_path: AnyStr) -> cr.Repomd: """ Parse file repomd.xml and create object Repomd :param repomd_file_path: path to local repomd.xml """ return cr.Repomd(repomd_file_path) @classmethod def _parse_modules_file( cls, modules_file_path: AnyStr, ) -> Iterator[Any]: """ Parse modules.yaml.gz and returns parsed data :param modules_file_path: path to local modules.yaml.gz :return: List of dict for each module in a repo """ with open(modules_file_path, 'rb') as modules_file: data = modules_file.read() if is_gzip_file(data[:2]): data = gzip.decompress(data) elif is_xz_file(data[:2]): data = lzma.decompress(data) return yaml.load_all( data, Loader=yaml.BaseLoader, ) def _get_repomd_records( self, repo_info: RepoInfo, ) -> List[cr.RepomdRecord]: """ Get, parse file repomd.xml and extract from it repomd records :param repo_info: structure which contains info about a current repo :return: list with repomd records """ if repo_info.is_remote: repomd_file_path = urljoin( urljoin( repo_info.path + '/', repo_info.folder ) + '/', 'repodata/repomd.xml' ) repomd_file_path = self.get_remote_file_content(repomd_file_path) else: repomd_file_path = os.path.join( repo_info.path, repo_info.folder, 'repodata', 'repomd.xml', ) repomd_object = self._parse_repomd(repomd_file_path) if repo_info.is_remote: os.remove(repomd_file_path) return repomd_object.records def _download_repomd_records( self, repo_info: RepoInfo, repomd_records: List[cr.RepomdRecord], repomd_records_dict: Dict[str, str], ): """ Download repomd records :param repo_info: structure which contains info about a current repo :param repomd_records: list with repomd records :param repomd_records_dict: dict with paths to repodata files """ for repomd_record in repomd_records: if repomd_record.type not in ( 'primary', 'filelists', 'other', ): continue repomd_record_file_path = os.path.join( repo_info.path, repo_info.folder, repomd_record.location_href, ) if repo_info.is_remote: repomd_record_file_path = self.get_remote_file_content( repomd_record_file_path) self.tmp_files.append(repomd_record_file_path) repomd_records_dict[repomd_record.type] = repomd_record_file_path def _parse_module_repomd_record( self, repo_info: RepoInfo, repomd_records: List[cr.RepomdRecord], ) -> List[Dict]: """ Download repomd records :param repo_info: structure which contains info about a current repo :param repomd_records: list with repomd records :param repomd_records_dict: dict with paths to repodata files """ for repomd_record in repomd_records: if repomd_record.type != 'modules': continue repomd_record_file_path = os.path.join( repo_info.path, repo_info.folder, repomd_record.location_href, ) if repo_info.is_remote: repomd_record_file_path = self.get_remote_file_content( repomd_record_file_path) self.tmp_files.append(repomd_record_file_path) return list(self._parse_modules_file( repomd_record_file_path, )) @staticmethod def compare_pkgs_version(package_1: Package, package_2: Package) -> int: version_tuple_1 = ( package_1.epoch, package_1.version, package_1.release, ) version_tuple_2 = ( package_2.epoch, package_2.version, package_2.release, ) return rpm.labelCompare(version_tuple_1, version_tuple_2) def generate_packages_json( self ) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]: """ Generate packages.json """ packages_json = defaultdict( lambda: defaultdict( lambda: defaultdict( list, ) ) ) all_packages = defaultdict(lambda: { 'variants': list(), 'package_info': dict(), }) for repo_info in sorted( self.repos, key=lambda i: i.repo_type, reverse=True, ): full_repo_path = self._get_full_repo_path(repo_info) if full_repo_path in self.pkgs_iterators: pkgs_iterator = tee(self.pkgs_iterators[full_repo_path]) else: repomd_records = self._get_repomd_records( repo_info=repo_info, ) repomd_records_dict = {} # type: Dict[str, str] self._download_repomd_records( repo_info=repo_info, repomd_records=repomd_records, repomd_records_dict=repomd_records_dict, ) pkgs_iterator = PackageIterator( primary_path=repomd_records_dict['primary'], filelists_path=repomd_records_dict['filelists'], other_path=repomd_records_dict['other'], warningcb=self._warning_callback, ) self.pkgs_iterators[full_repo_path] = tee(pkgs_iterator) for package in pkgs_iterator: if package.arch not in self.repo_arches[repo_info.arch]: package_arch = repo_info.arch else: package_arch = package.arch package_key = f'{package.name}.{package_arch}' package_variants = all_packages[package_key]['variants'] package_info = all_packages[package_key]['package_info'] if 'module' in package.release and not any( re.search(included_package, package.name) for included_package in self.included_packages ): # Even a module package will be added to packages.json if # it presents in the list of included packages continue if repo_info.repo_type == 'present' and not package_info: package_variants.append((repo_info.name, repo_info.arch)) package_info['arch'] = package_arch package_info['package'] = package package_info['type'] = repo_info.is_reference elif repo_info.repo_type == 'absent' and \ (repo_info.name, repo_info.arch) in package_variants: package_variants.remove((repo_info.name, repo_info.arch)) # replace an older package if it's not reference or # a newer package is from reference repo elif (not package_info['type'] or package_info['type'] == repo_info.is_reference) and \ self.compare_pkgs_version( package, package_info['package'] ) > 0 and repo_info.repo_type == 'present': all_packages[package_key]['variants'] = [ (repo_info.name, repo_info.arch) ] package_info['arch'] = package_arch package_info['package'] = package elif self.compare_pkgs_version( package, package_info['package'] ) == 0 and repo_info.repo_type == 'present': package_variants.append( (repo_info.name, repo_info.arch) ) for package_dict in all_packages.values(): for variant_name, variant_arch in package_dict['variants']: package_info = package_dict['package_info'] package_arch = package_info['arch'] package = package_info['package'] package_name = f'{package.name}.{package_arch}' if any(re.search(excluded_package, package_name) for excluded_package in self.excluded_packages): continue src_package_name = dnf.subject.Subject( package.rpm_sourcerpm, ).get_nevra_possibilities( forms=hawkey.FORM_NEVRA, ) if len(src_package_name) > 1: # We should stop utility if we can't get exact name of srpm raise ValueError( 'We can\'t get exact name of srpm ' f'by its NEVRA "{package.rpm_sourcerpm}"' ) else: src_package_name = src_package_name[0].name # TODO: for x86_64 + i686 in one packages.json # don't remove! # if package.arch in self.addon_repos[variant_arch]: # arches = self.addon_repos[variant_arch] + [variant_arch] # else: # arches = [variant_arch] # for arch in arches: # pkgs_list = packages_json[variant_name][ # arch][src_package_name] # added_pkg = f'{package_name}.{package_arch}' # if added_pkg not in pkgs_list: # pkgs_list.append(added_pkg) pkgs_list = packages_json[variant_name][ variant_arch][src_package_name] if package_name not in pkgs_list: pkgs_list.append(package_name) return packages_json def create_parser(): parser = argparse.ArgumentParser() parser.add_argument( '-c', '--config', type=Path, default=Path('config.yaml'), required=False, help='Path to a config', ) parser.add_argument( '-o', '--json-output-path', type=str, help='Full path to output json file', required=True, ) return parser def read_config(config_path: Path) -> Optional[Dict]: if not config_path.exists(): logging.error('A config by path "%s" does not exist', config_path) exit(1) with config_path.open('r') as config_fd: return yaml.safe_load(config_fd) def process_config(config_data: Dict) -> Tuple[ List[RepoInfo], List[str], List[str], ]: excluded_packages = config_data.get('excluded_packages', []) included_packages = config_data.get('included_packages', []) repos = [RepoInfo( path=variant_repo['path'], folder=variant_repo['folder'], name=variant_name, arch=variant_repo['arch'], is_remote=variant_repo['remote'], is_reference=variant_repo['reference'], repo_type=variant_repo.get('repo_type', 'present'), ) for variant_name, variant_repos in config_data['variants'].items() for variant_repo in variant_repos] return repos, excluded_packages, included_packages def cli_main(): args = create_parser().parse_args() repos, excluded_packages, included_packages = process_config( config_data=read_config(args.config) ) pg = PackagesGenerator( repos=repos, excluded_packages=excluded_packages, included_packages=included_packages, ) result = pg.generate_packages_json() with open(args.json_output_path, 'w') as packages_file: json.dump( result, packages_file, indent=4, sort_keys=True, ) if __name__ == '__main__': cli_main()