create_packages_json: drop packages superseded by Obsoletes #18

Merged
alukoshko merged 1 commits from obsoletes-aware-packages-json into master 2026-05-04 13:59:03 +00:00
2 changed files with 143 additions and 1 deletions

View File

@ -1,6 +1,6 @@
Name: pungi
Version: 4.10.1
Release: 1%{?dist}.alma.2
Release: 1%{?dist}.alma.3
Summary: Distribution compose tool
License: GPL-2.0-only
@ -169,6 +169,11 @@ gzip _build/man/pungi.1
%{_bindir}/%{name}-cache-cleanup
%changelog
* Mon May 04 2026 Andrew Lukoshko <alukoshko@almalinux.org> - 4.10.1-1.alma.3
- create_packages_json: drop packages superseded by Obsoletes from another
package in the compose (cross-variant within the same arch); honors
included_packages overrides; new optional --obsoletes-report flag
* Tue Sep 30 2025 Eduard Abdullin <eabdullin@almalinux.org> - 4.10.1-1.alma.2
- Set iso_hfs_ppc64le_compatible to False by default

View File

@ -118,6 +118,10 @@ class PackagesGenerator:
self.pkgs = dict()
self.excluded_packages = excluded_packages
self.included_packages = included_packages
# {variant: {arch: [{'srpm', 'package_key', 'obsoleted_by'}, ...]}}
self.obsoletes_report: Dict[str, Dict[str, List[Dict[str, Any]]]] = (
defaultdict(lambda: defaultdict(list))
)
self.tmp_files = [] # type: list[Path]
for arch, arch_list in self.addon_repos.items():
self.repo_arches[arch].extend(arch_list)
@ -372,6 +376,107 @@ class PackagesGenerator:
f'{self.get_package_arch(package, variant_arch)}'
)
@staticmethod
def _obsolete_matches(
candidate: Package,
obsolete_entry: Tuple,
) -> bool:
"""
Decide whether `candidate` is matched by an Obsoletes entry tuple
from createrepo_c: (name, flag, epoch, version, release, pre).
Unversioned obsoletes (flag is None) match any version.
"""
o_name, o_flag, o_epoch, o_version, o_release = obsolete_entry[:5]
if candidate.name != o_name:
return False
if not o_flag or not o_version:
return True
cand_evr = (
candidate.epoch or '0',
candidate.version or '',
candidate.release or '',
)
obs_evr = (o_epoch or '0', o_version or '', o_release or '')
cmp_result = rpm.labelCompare(cand_evr, obs_evr)
return {
'EQ': cmp_result == 0,
'LT': cmp_result < 0,
'LE': cmp_result <= 0,
'GT': cmp_result > 0,
'GE': cmp_result >= 0,
}.get(o_flag, False)
def _is_included_package(
self,
package: Package,
variant_arch: str,
) -> bool:
package_key = self.get_package_key(package, variant_arch)
return any(
re.search(f'^{ip}$', package_key)
or ip in (package.name, package_key)
for ip in self.included_packages
)
def _compute_obsoleted_drops(
self,
packages: Dict,
) -> Dict[Tuple[str, str, str], List[str]]:
"""
Find packages superseded by another package's Obsoletes: declaration
and return them as drops. Obsoletes are evaluated globally per
variant arch (across all variants of that arch in the compose),
because dnf at install time enforces Obsoletes across all enabled
repos regardless of which variant they came from.
Self-obsoletes (same Name) are ignored those describe old versions
of the same package, not a different package being superseded.
Returns {(variant, srpm, package_key): [obsoleter_nvr, ...]}.
"""
# arch -> name -> list of (variant, srpm, package_key, Package)
by_arch_name: Dict[str, Dict[str, List[Tuple[str, str, str, Package]]]] = (
defaultdict(lambda: defaultdict(list))
)
for variant_info in self.variants:
for srpm, pkgs_info in packages.items():
for pkey, pinfo in pkgs_info.items():
if variant_info.name not in pinfo['variants']:
continue
by_arch_name[variant_info.arch][pinfo['package'].name].append(
(variant_info.name, srpm, pkey, pinfo['package'])
)
drops: Dict[Tuple[str, str, str], List[str]] = defaultdict(list)
# An obsoleter package may itself be present in several variants; we
# only need to evaluate its Obsoletes once per (arch, name, EVR).
seen_obsoleters: set = set()
for arch, name_idx in by_arch_name.items():
for entries in name_idx.values():
for _, _, _, pkg in entries:
obsoleter_id = (
arch, pkg.name, pkg.epoch, pkg.version, pkg.release,
)
if obsoleter_id in seen_obsoleters:
continue
seen_obsoleters.add(obsoleter_id)
obsoletes = getattr(pkg, 'obsoletes', None) or ()
for obs_entry in obsoletes:
obs_name = obs_entry[0]
if obs_name == pkg.name:
continue
for cand_variant, cand_srpm, cand_key, cand_pkg in (
name_idx.get(obs_name, ())
):
if not self._obsolete_matches(cand_pkg, obs_entry):
continue
if self._is_included_package(cand_pkg, arch):
continue
drops[(cand_variant, cand_srpm, cand_key)].append(
f'{pkg.name}-{pkg.version}-{pkg.release}.{pkg.arch}'
)
return drops
def generate_packages_json(
self
) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]:
@ -426,6 +531,7 @@ class PackagesGenerator:
package_2=package_info['package'],
) == 0 and repo_info.repo_type != 'absent':
package_info['variants'].append(variant_info.name)
drops = self._compute_obsoleted_drops(packages)
result = defaultdict(lambda: defaultdict(
lambda: defaultdict(list),
))
@ -435,6 +541,20 @@ class PackagesGenerator:
variant_pkgs = result[variant_info.name][variant_info.arch]
if variant_info.name not in package_info['variants']:
continue
drop_key = (
variant_info.name,
source_rpm_name,
package_key,
)
if drop_key in drops:
self.obsoletes_report[
variant_info.name
][variant_info.arch].append({
'srpm': source_rpm_name,
'package_key': package_key,
'obsoleted_by': sorted(set(drops[drop_key])),
})
continue
variant_pkgs[source_rpm_name].append(package_key)
return result
@ -456,6 +576,15 @@ def create_parser():
help='Full path to output json file',
required=True,
)
parser.add_argument(
'--obsoletes-report',
type=str,
default=None,
help=(
'Optional path to write a JSON report of packages dropped due '
'to Obsoletes: declarations from another package in the compose.'
),
)
return parser
@ -508,6 +637,14 @@ def cli_main():
indent=4,
sort_keys=True,
)
if args.obsoletes_report:
with open(args.obsoletes_report, 'w') as report_file:
json.dump(
pg.obsoletes_report,
report_file,
indent=4,
sort_keys=True,
)
if __name__ == '__main__':