ALBS-987: Generate i686 and dev repositories with pungi automatically when building a new distro version

- Refactoring
- Some packages that were previously absent now appear in packages.json
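
For context, this refactoring replaces the flat per-variant repo list with nested variants, each carrying its own repos and excluded packages. Below is a minimal sketch of the config layout that the reworked process_config() reads (shown as the Python dict that read_config() would return from yaml.safe_load; all paths, names, and patterns are placeholders, not values from this commit):

```python
# Hypothetical config data; the keys mirror what process_config() accesses
# in this commit, while the concrete values are placeholders.
config_data = {
    'excluded_packages': ['^kernel-debug.*'],   # global exclude patterns (placeholder)
    'included_packages': ['^perl-.*'],          # module packages to keep anyway (placeholder)
    'variants': {
        'BaseOS': {
            'arch': 'x86_64',
            'excluded_packages': [],            # optional, defaults to []
            'repos': [
                {
                    'path': 'http://example.com/mirror',  # placeholder URL
                    'folder': 'baseos',
                    'remote': True,
                    'reference': True,
                    'repo_type': 'present',     # optional, defaults to 'present'
                },
            ],
        },
    },
}
```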
soksanichenko 2023-03-28 12:58:08 +03:00
parent 141d00e941
commit 596c5c0b7f
1 changed file with 169 additions and 150 deletions


@@ -25,19 +25,23 @@ from typing import (
Iterator,
Optional,
Tuple,
Union,
)
import binascii
from urllib.parse import urljoin
import createrepo_c as cr
import dnf.subject
import hawkey
import requests
import rpm
import yaml
from createrepo_c import Package, PackageIterator
from dataclasses import dataclass
from createrepo_c import (
Package,
PackageIterator,
Repomd,
RepomdRecord,
)
from dataclasses import dataclass, field
from kobo.rpmlib import parse_nvra
logging.basicConfig(level=logging.INFO)
@@ -66,23 +70,33 @@ class RepoInfo:
# 'appstream', 'baseos', etc.
# Or 'http://koji.cloudlinux.com/mirrors/rhel_mirror' if you are
# using remote repo
path: AnyStr
path: str
# name of folder with a repodata folder. E.g. 'baseos', 'appstream', etc
folder: AnyStr
# name of repo. E.g. 'BaseOS', 'AppStream', etc
name: AnyStr
# architecture of repo. E.g. 'x86_64', 'i686', etc
arch: AnyStr
folder: str
# Is a repo remote or local
is_remote: bool
# Is a reference repository (usually it's a RHEL repo)
# Layout of packages from such repository will be taken as example
# Only layout of specific package (which don't exist
# Only layout of specific package (which doesn't exist
# in a reference repository) will be taken as example
is_reference: bool = False
# The packages from 'present' repo will be added to a variant.
# The packages from 'absent' repo will be removed from a variant.
repo_type: str = 'present'
@dataclass
class VariantInfo:
# name of variant. E.g. 'BaseOS', 'AppStream', etc
name: AnyStr
# architecture of variant. E.g. 'x86_64', 'i686', etc
arch: AnyStr
# The packages which will not be added to a variant
excluded_packages: List[str] = field(default_factory=list)
# Repos of a variant
repos: List[RepoInfo] = field(default_factory=list)
class PackagesGenerator:
repo_arches = defaultdict(lambda: list(('noarch',)))
@@ -96,12 +110,12 @@ class PackagesGenerator:
def __init__(
self,
repos: List[RepoInfo],
variants: List[VariantInfo],
excluded_packages: List[AnyStr],
included_packages: List[AnyStr],
):
self.repos = repos
self.pkgs_iterators = dict()
self.variants = variants
self.pkgs = dict()
self.excluded_packages = excluded_packages
self.included_packages = included_packages
self.tmp_files = []
@@ -152,12 +166,12 @@ class PackagesGenerator:
return file_stream.name
@staticmethod
def _parse_repomd(repomd_file_path: AnyStr) -> cr.Repomd:
def _parse_repomd(repomd_file_path: AnyStr) -> Repomd:
"""
Parse file repomd.xml and create object Repomd
:param repomd_file_path: path to local repomd.xml
"""
return cr.Repomd(repomd_file_path)
return Repomd(repomd_file_path)
@classmethod
def _parse_modules_file(
@@ -185,7 +199,7 @@ class PackagesGenerator:
def _get_repomd_records(
self,
repo_info: RepoInfo,
) -> List[cr.RepomdRecord]:
) -> List[RepomdRecord]:
"""
Get, parse file repomd.xml and extract from it repomd records
:param repo_info: structure which contains info about a current repo
@@ -215,7 +229,7 @@ class PackagesGenerator:
def _download_repomd_records(
self,
repo_info: RepoInfo,
repomd_records: List[cr.RepomdRecord],
repomd_records: List[RepomdRecord],
repomd_records_dict: Dict[str, str],
):
"""
@@ -245,13 +259,12 @@ class PackagesGenerator:
def _parse_module_repomd_record(
self,
repo_info: RepoInfo,
repomd_records: List[cr.RepomdRecord],
repomd_records: List[RepomdRecord],
) -> List[Dict]:
"""
Download repomd records
:param repo_info: structure which contains info about a current repo
:param repomd_records: list with repomd records
:param repomd_records_dict: dict with paths to repodata files
"""
for repomd_record in repomd_records:
if repomd_record.type != 'modules':
@@ -283,132 +296,136 @@ class PackagesGenerator:
)
return rpm.labelCompare(version_tuple_1, version_tuple_2)
def get_packages_iterator(
self,
repo_info: RepoInfo,
) -> Union[PackageIterator, Iterator]:
full_repo_path = self._get_full_repo_path(repo_info)
if full_repo_path in self.pkgs:
return self.pkgs[full_repo_path]
else:
repomd_records = self._get_repomd_records(
repo_info=repo_info,
)
repomd_records_dict = {} # type: Dict[str, str]
self._download_repomd_records(
repo_info=repo_info,
repomd_records=repomd_records,
repomd_records_dict=repomd_records_dict,
)
pkgs_iterator = PackageIterator(
primary_path=repomd_records_dict['primary'],
filelists_path=repomd_records_dict['filelists'],
other_path=repomd_records_dict['other'],
warningcb=self._warning_callback,
)
pkgs_iterator, self.pkgs[full_repo_path] = tee(pkgs_iterator)
return pkgs_iterator
def get_package_arch(
self,
package: Package,
variant_arch: str,
) -> str:
if package.arch in self.repo_arches[variant_arch]:
return package.arch
else:
return variant_arch
def is_skipped_module_package(self, package: Package) -> bool:
# Even a module package will be added to packages.json if
# it is present in the list of included packages
return 'module' in package.release and not any(
re.search(included_package, package.name)
for included_package in self.included_packages
)
def is_excluded_package(
self,
package: Package,
variant_arch: str,
excluded_packages: List[str],
) -> bool:
return any(
re.search(
excluded_pkg,
self.get_package_key(package, variant_arch),
) for excluded_pkg in excluded_packages
)
@staticmethod
def get_source_rpm_name(package: Package) -> str:
source_rpm_nvra = parse_nvra(package.rpm_sourcerpm)
return source_rpm_nvra['name']
def get_package_key(self, package: Package, variant_arch: str) -> str:
return (
f'{package.name}.'
f'{self.get_package_arch(package, variant_arch)}'
)
def generate_packages_json(
self
) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]:
"""
Generate packages.json
"""
packages_json = defaultdict(
lambda: defaultdict(
lambda: defaultdict(
list,
)
)
)
all_packages = defaultdict(lambda: {
packages = defaultdict(lambda: defaultdict(lambda: {
'variants': list(),
'package_info': dict(),
})
for repo_info in sorted(
self.repos,
key=lambda i: i.repo_type,
reverse=True,
):
full_repo_path = self._get_full_repo_path(repo_info)
if full_repo_path in self.pkgs_iterators:
pkgs_iterator = tee(self.pkgs_iterators[full_repo_path])
else:
repomd_records = self._get_repomd_records(
repo_info=repo_info,
)
repomd_records_dict = {} # type: Dict[str, str]
self._download_repomd_records(
repo_info=repo_info,
repomd_records=repomd_records,
repomd_records_dict=repomd_records_dict,
)
pkgs_iterator = PackageIterator(
primary_path=repomd_records_dict['primary'],
filelists_path=repomd_records_dict['filelists'],
other_path=repomd_records_dict['other'],
warningcb=self._warning_callback,
)
self.pkgs_iterators[full_repo_path] = tee(pkgs_iterator)
for package in pkgs_iterator:
if package.arch not in self.repo_arches[repo_info.arch]:
package_arch = repo_info.arch
else:
package_arch = package.arch
package_key = f'{package.name}.{package_arch}'
package_variants = all_packages[package_key]['variants']
package_info = all_packages[package_key]['package_info']
if 'module' in package.release and not any(
re.search(included_package, package.name)
for included_package in self.included_packages
):
# Even a module package will be added to packages.json if
# it presents in the list of included packages
continue
if repo_info.repo_type == 'present' and not package_info:
package_variants.append((repo_info.name, repo_info.arch))
package_info['arch'] = package_arch
package_info['package'] = package
package_info['type'] = repo_info.is_reference
elif repo_info.repo_type == 'absent' and \
(repo_info.name, repo_info.arch) in package_variants:
package_variants.remove((repo_info.name, repo_info.arch))
# replace an older package if it's not reference or
# a newer package is from reference repo
elif (not package_info['type'] or
package_info['type'] ==
repo_info.is_reference) and \
self.compare_pkgs_version(
}))
for variant_info in self.variants:
for repo_info in variant_info.repos:
is_reference = repo_info.is_reference
for package in self.get_packages_iterator(repo_info=repo_info):
if self.is_skipped_module_package(package):
continue
if self.is_excluded_package(
package,
package_info['package']
) > 0 and repo_info.repo_type == 'present':
all_packages[package_key]['variants'] = [
(repo_info.name, repo_info.arch)
]
package_info['arch'] = package_arch
package_info['package'] = package
elif self.compare_pkgs_version(
variant_info.arch,
self.excluded_packages,
):
continue
if self.is_excluded_package(
package,
package_info['package']
) == 0 and repo_info.repo_type == 'present':
package_variants.append(
(repo_info.name, repo_info.arch)
variant_info.arch,
variant_info.excluded_packages,
):
continue
package_key = self.get_package_key(
package,
variant_info.arch,
)
for package_dict in all_packages.values():
for variant_name, variant_arch in package_dict['variants']:
package_info = package_dict['package_info']
package_arch = package_info['arch']
package = package_info['package']
package_name = f'{package.name}.{package_arch}'
if any(re.search(excluded_package, package_name)
for excluded_package in self.excluded_packages):
continue
src_package_name = dnf.subject.Subject(
package.rpm_sourcerpm,
).get_nevra_possibilities(
forms=hawkey.FORM_NEVRA,
)
if len(src_package_name) > 1:
# We should stop utility if we can't get exact name of srpm
raise ValueError(
'We can\'t get exact name of srpm '
f'by its NEVRA "{package.rpm_sourcerpm}"'
)
else:
src_package_name = src_package_name[0].name
# TODO: for x86_64 + i686 in one packages.json
# don't remove!
# if package.arch in self.addon_repos[variant_arch]:
# arches = self.addon_repos[variant_arch] + [variant_arch]
# else:
# arches = [variant_arch]
# for arch in arches:
# pkgs_list = packages_json[variant_name][
# arch][src_package_name]
# added_pkg = f'{package_name}.{package_arch}'
# if added_pkg not in pkgs_list:
# pkgs_list.append(added_pkg)
pkgs_list = packages_json[variant_name][
variant_arch][src_package_name]
if package_name not in pkgs_list:
pkgs_list.append(package_name)
return packages_json
source_rpm_name = self.get_source_rpm_name(package)
package_info = packages[source_rpm_name][package_key]
if 'is_reference' not in package_info:
package_info['variants'].append(variant_info.name)
package_info['is_reference'] = is_reference
package_info['package'] = package
elif not package_info['is_reference'] or \
package_info['is_reference'] == is_reference and \
self.compare_pkgs_version(
package_1=package,
package_2=package_info['package'],
) > 0:
package_info['variants'] = [variant_info.name]
package_info['is_reference'] = is_reference
package_info['package'] = package
elif self.compare_pkgs_version(
package_1=package,
package_2=package_info['package'],
) == 0 and repo_info.repo_type != 'absent':
package_info['variants'].append(variant_info.name)
result = defaultdict(lambda: defaultdict(
lambda: defaultdict(list),
))
for variant_info in self.variants:
for source_rpm_name, packages_info in packages.items():
for package_key, package_info in packages_info.items():
variant_pkgs = result[variant_info.name][variant_info.arch]
if variant_info.name not in package_info['variants']:
continue
variant_pkgs[source_rpm_name].append(package_key)
return result
def create_parser():
@@ -441,32 +458,34 @@ def read_config(config_path: Path) -> Optional[Dict]:
def process_config(config_data: Dict) -> Tuple[
List[RepoInfo],
List[VariantInfo],
List[str],
List[str],
]:
excluded_packages = config_data.get('excluded_packages', [])
included_packages = config_data.get('included_packages', [])
repos = [RepoInfo(
path=variant_repo['path'],
folder=variant_repo['folder'],
variants = [VariantInfo(
name=variant_name,
arch=variant_repo['arch'],
is_remote=variant_repo['remote'],
is_reference=variant_repo['reference'],
repo_type=variant_repo.get('repo_type', 'present'),
) for variant_name, variant_repos in config_data['variants'].items()
for variant_repo in variant_repos]
return repos, excluded_packages, included_packages
arch=variant_info['arch'],
excluded_packages=variant_info.get('excluded_packages', []),
repos=[RepoInfo(
path=variant_repo['path'],
folder=variant_repo['folder'],
is_remote=variant_repo['remote'],
is_reference=variant_repo['reference'],
repo_type=variant_repo.get('repo_type', 'present'),
) for variant_repo in variant_info['repos']]
) for variant_name, variant_info in config_data['variants'].items()]
return variants, excluded_packages, included_packages
def cli_main():
args = create_parser().parse_args()
repos, excluded_packages, included_packages = process_config(
variants, excluded_packages, included_packages = process_config(
config_data=read_config(args.config)
)
pg = PackagesGenerator(
repos=repos,
variants=variants,
excluded_packages=excluded_packages,
included_packages=included_packages,
)
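
For illustration, a hedged end-to-end sketch of the refactored API: it builds the VariantInfo/RepoInfo dataclasses and PackagesGenerator arguments exactly as they appear in the diff above, but the import path, repo paths, and values are assumptions, not part of the commit:

```python
# Assumed import: the script's module name is not shown in this diff.
# from generate_packages_json import RepoInfo, VariantInfo, PackagesGenerator

variant = VariantInfo(
    name='AppStream',
    arch='i686',
    excluded_packages=[],
    repos=[
        RepoInfo(
            path='/mnt/repos/appstream',  # placeholder local path
            folder='appstream',
            is_remote=False,
            is_reference=False,
            repo_type='present',
        ),
    ],
)

pg = PackagesGenerator(
    variants=[variant],
    excluded_packages=[],
    included_packages=[],
)

# Per the diff, the result maps variant name -> arch -> source RPM name -> ['name.arch', ...]
packages_json = pg.generate_packages_json()
```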