create_packages_json: drop packages superseded by Obsoletes #18
@ -1,6 +1,6 @@
|
||||
Name: pungi
|
||||
Version: 4.10.1
|
||||
Release: 1%{?dist}.alma.2
|
||||
Release: 1%{?dist}.alma.3
|
||||
Summary: Distribution compose tool
|
||||
|
||||
License: GPL-2.0-only
|
||||
@ -169,6 +169,11 @@ gzip _build/man/pungi.1
|
||||
%{_bindir}/%{name}-cache-cleanup
|
||||
|
||||
%changelog
|
||||
* Mon May 04 2026 Andrew Lukoshko <alukoshko@almalinux.org> - 4.10.1-1.alma.3
|
||||
- create_packages_json: drop packages superseded by Obsoletes from another
|
||||
package in the compose (cross-variant within the same arch); honors
|
||||
included_packages overrides; new optional --obsoletes-report flag
|
||||
|
||||
* Tue Sep 30 2025 Eduard Abdullin <eabdullin@almalinux.org> - 4.10.1-1.alma.2
|
||||
- Set iso_hfs_ppc64le_compatible to False by default
|
||||
|
||||
|
||||
@ -118,6 +118,10 @@ class PackagesGenerator:
|
||||
self.pkgs = dict()
|
||||
self.excluded_packages = excluded_packages
|
||||
self.included_packages = included_packages
|
||||
# {variant: {arch: [{'srpm', 'package_key', 'obsoleted_by'}, ...]}}
|
||||
self.obsoletes_report: Dict[str, Dict[str, List[Dict[str, Any]]]] = (
|
||||
defaultdict(lambda: defaultdict(list))
|
||||
)
|
||||
self.tmp_files = [] # type: list[Path]
|
||||
for arch, arch_list in self.addon_repos.items():
|
||||
self.repo_arches[arch].extend(arch_list)
|
||||
@ -372,6 +376,107 @@ class PackagesGenerator:
|
||||
f'{self.get_package_arch(package, variant_arch)}'
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _obsolete_matches(
|
||||
candidate: Package,
|
||||
obsolete_entry: Tuple,
|
||||
) -> bool:
|
||||
"""
|
||||
Decide whether `candidate` is matched by an Obsoletes entry tuple
|
||||
from createrepo_c: (name, flag, epoch, version, release, pre).
|
||||
Unversioned obsoletes (flag is None) match any version.
|
||||
"""
|
||||
o_name, o_flag, o_epoch, o_version, o_release = obsolete_entry[:5]
|
||||
if candidate.name != o_name:
|
||||
return False
|
||||
if not o_flag or not o_version:
|
||||
return True
|
||||
cand_evr = (
|
||||
candidate.epoch or '0',
|
||||
candidate.version or '',
|
||||
candidate.release or '',
|
||||
)
|
||||
obs_evr = (o_epoch or '0', o_version or '', o_release or '')
|
||||
cmp_result = rpm.labelCompare(cand_evr, obs_evr)
|
||||
return {
|
||||
'EQ': cmp_result == 0,
|
||||
'LT': cmp_result < 0,
|
||||
'LE': cmp_result <= 0,
|
||||
'GT': cmp_result > 0,
|
||||
'GE': cmp_result >= 0,
|
||||
}.get(o_flag, False)
|
||||
|
||||
def _is_included_package(
|
||||
self,
|
||||
package: Package,
|
||||
variant_arch: str,
|
||||
) -> bool:
|
||||
package_key = self.get_package_key(package, variant_arch)
|
||||
return any(
|
||||
re.search(f'^{ip}$', package_key)
|
||||
or ip in (package.name, package_key)
|
||||
for ip in self.included_packages
|
||||
)
|
||||
|
||||
def _compute_obsoleted_drops(
|
||||
self,
|
||||
packages: Dict,
|
||||
) -> Dict[Tuple[str, str, str], List[str]]:
|
||||
"""
|
||||
Find packages superseded by another package's Obsoletes: declaration
|
||||
and return them as drops. Obsoletes are evaluated globally per
|
||||
variant arch (across all variants of that arch in the compose),
|
||||
because dnf at install time enforces Obsoletes across all enabled
|
||||
repos regardless of which variant they came from.
|
||||
|
||||
Self-obsoletes (same Name) are ignored — those describe old versions
|
||||
of the same package, not a different package being superseded.
|
||||
|
||||
Returns {(variant, srpm, package_key): [obsoleter_nvr, ...]}.
|
||||
"""
|
||||
# arch -> name -> list of (variant, srpm, package_key, Package)
|
||||
by_arch_name: Dict[str, Dict[str, List[Tuple[str, str, str, Package]]]] = (
|
||||
defaultdict(lambda: defaultdict(list))
|
||||
)
|
||||
for variant_info in self.variants:
|
||||
for srpm, pkgs_info in packages.items():
|
||||
for pkey, pinfo in pkgs_info.items():
|
||||
if variant_info.name not in pinfo['variants']:
|
||||
continue
|
||||
by_arch_name[variant_info.arch][pinfo['package'].name].append(
|
||||
(variant_info.name, srpm, pkey, pinfo['package'])
|
||||
)
|
||||
|
||||
drops: Dict[Tuple[str, str, str], List[str]] = defaultdict(list)
|
||||
# An obsoleter package may itself be present in several variants; we
|
||||
# only need to evaluate its Obsoletes once per (arch, name, EVR).
|
||||
seen_obsoleters: set = set()
|
||||
for arch, name_idx in by_arch_name.items():
|
||||
for entries in name_idx.values():
|
||||
for _, _, _, pkg in entries:
|
||||
obsoleter_id = (
|
||||
arch, pkg.name, pkg.epoch, pkg.version, pkg.release,
|
||||
)
|
||||
if obsoleter_id in seen_obsoleters:
|
||||
continue
|
||||
seen_obsoleters.add(obsoleter_id)
|
||||
obsoletes = getattr(pkg, 'obsoletes', None) or ()
|
||||
for obs_entry in obsoletes:
|
||||
obs_name = obs_entry[0]
|
||||
if obs_name == pkg.name:
|
||||
continue
|
||||
for cand_variant, cand_srpm, cand_key, cand_pkg in (
|
||||
name_idx.get(obs_name, ())
|
||||
):
|
||||
if not self._obsolete_matches(cand_pkg, obs_entry):
|
||||
continue
|
||||
if self._is_included_package(cand_pkg, arch):
|
||||
continue
|
||||
drops[(cand_variant, cand_srpm, cand_key)].append(
|
||||
f'{pkg.name}-{pkg.version}-{pkg.release}.{pkg.arch}'
|
||||
)
|
||||
return drops
|
||||
|
||||
def generate_packages_json(
|
||||
self
|
||||
) -> Dict[AnyStr, Dict[AnyStr, Dict[AnyStr, List[AnyStr]]]]:
|
||||
@ -426,6 +531,7 @@ class PackagesGenerator:
|
||||
package_2=package_info['package'],
|
||||
) == 0 and repo_info.repo_type != 'absent':
|
||||
package_info['variants'].append(variant_info.name)
|
||||
drops = self._compute_obsoleted_drops(packages)
|
||||
result = defaultdict(lambda: defaultdict(
|
||||
lambda: defaultdict(list),
|
||||
))
|
||||
@ -435,6 +541,20 @@ class PackagesGenerator:
|
||||
variant_pkgs = result[variant_info.name][variant_info.arch]
|
||||
if variant_info.name not in package_info['variants']:
|
||||
continue
|
||||
drop_key = (
|
||||
variant_info.name,
|
||||
source_rpm_name,
|
||||
package_key,
|
||||
)
|
||||
if drop_key in drops:
|
||||
self.obsoletes_report[
|
||||
variant_info.name
|
||||
][variant_info.arch].append({
|
||||
'srpm': source_rpm_name,
|
||||
'package_key': package_key,
|
||||
'obsoleted_by': sorted(set(drops[drop_key])),
|
||||
})
|
||||
continue
|
||||
variant_pkgs[source_rpm_name].append(package_key)
|
||||
return result
|
||||
|
||||
@ -456,6 +576,15 @@ def create_parser():
|
||||
help='Full path to output json file',
|
||||
required=True,
|
||||
)
|
||||
parser.add_argument(
|
||||
'--obsoletes-report',
|
||||
type=str,
|
||||
default=None,
|
||||
help=(
|
||||
'Optional path to write a JSON report of packages dropped due '
|
||||
'to Obsoletes: declarations from another package in the compose.'
|
||||
),
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
@ -508,6 +637,14 @@ def cli_main():
|
||||
indent=4,
|
||||
sort_keys=True,
|
||||
)
|
||||
if args.obsoletes_report:
|
||||
with open(args.obsoletes_report, 'w') as report_file:
|
||||
json.dump(
|
||||
pg.obsoletes_report,
|
||||
report_file,
|
||||
indent=4,
|
||||
sort_keys=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Loading…
Reference in New Issue
Block a user