LNX-104: Create gather_prepopulate file generator for Pungi

- Added a tool that can generate a JSON file such as `centos-packages.json` using repodata from completed repos.

@BS-LINKED-5ffda6156f44affc6c5ea239  # pungi & dependencies
@BS-TARGET-CL8

Change-Id: Ib0466a1d8e06feb855e81fb7160fe170e2e82e04
This commit is contained in:
Stepan Oksanichenko 2021-01-15 10:53:01 +02:00 committed by oshyshatskyi
parent 903db91c0f
commit 94ad7603b8
8 changed files with 449 additions and 0 deletions

View File

@ -41,6 +41,7 @@ BuildRequires: python3-sphinx
Requires: python3-kobo-rpmlib >= 0.18.0
Requires: python3-kickstart
Requires: python3-requests
Requires: createrepo_c
Requires: koji >= 1.10.1-13
Requires: python3-koji-cli-plugins
@ -122,6 +123,7 @@ rm %{buildroot}%{_bindir}/%{name}-fedmsg-notification
%{_bindir}/%{name}-gather
%{_bindir}/%{name}-gather-rpms
%{_bindir}/%{name}-gather-modules
%{_bindir}/%{name}-generate-packages-json
%{_bindir}/comps_filter
%{_bindir}/%{name}-make-ostree
%{_mandir}/man1/pungi.1.gz

View File

@ -0,0 +1,339 @@
# coding=utf-8
"""
The tool allows generating package.json. This file is used by pungi
as the `gather_prepopulate` parameter.
Sample of using repodata files taken from
https://github.com/rpm-software-management/createrepo_c/blob/master/examples/python/repodata_parsing.py
"""
import argparse
import json
import os
import tempfile
from collections import defaultdict
from typing import AnyStr, Dict, List
import createrepo_c as cr
import dnf.subject
import hawkey
import requests
from dataclasses import dataclass
@dataclass
class RepoInfo:
    """Description of a single repository to take repodata from."""

    # Path to a directory with repo directories. E.g. '/var/repos'
    # contains 'appstream', 'baseos', etc.
    # Or 'http://koji.cloudlinux.com/mirrors/rhel_mirror' if you are
    # using a remote repo.
    path: str
    # Name of a folder with a repodata folder. E.g. 'baseos', 'appstream'
    folder: str
    # Name of repo. E.g. 'BaseOS', 'AppStream', etc
    name: str
    # Architecture of repo. E.g. 'x86_64', 'i686', etc
    arch: str
    # Is the repo remote (fetched over HTTP) or local
    is_remote: bool
class PackagesGenerator:
    """
    Walks the repodata of the given repos and builds a nested dict
    (repo name -> repo arch -> source package name -> list of binary
    packages) which pungi consumes as `gather_prepopulate`.
    """

    def __init__(
            self,
            repos: List['RepoInfo'],
            excluded_packages: List[str],
    ):
        """
        :param repos: list of repos which should be processed
        :param excluded_packages: names of packages which are globally
               excluded from the generated json
        """
        self.repos = repos
        self.excluded_packages = excluded_packages

    @staticmethod
    def _warning_callback(warning_type, message):
        """
        Warning callback for createrepo_c parsing functions
        """
        print(f'Warning message: "{message}"; warning type: "{warning_type}"')
        # returning True tells createrepo_c to continue parsing
        return True

    @staticmethod
    def _get_remote_file_content(file_url: str) -> str:
        """
        Get content from a remote file and write it to a temp file
        :param file_url: url of a remote file
        :return: path to a temp file
        """
        file_request = requests.get(
            url=file_url,
        )
        file_request.raise_for_status()
        # delete=False: the caller is responsible for removing the
        # temp file once it has been parsed
        with tempfile.NamedTemporaryFile(delete=False) as file_stream:
            file_stream.write(file_request.content)
        return file_stream.name

    @staticmethod
    def _parse_repomd(repomd_file_path: str) -> 'cr.Repomd':
        """
        Parse file repomd.xml and create object Repomd
        :param repomd_file_path: path to local repomd.xml
        """
        return cr.Repomd(repomd_file_path)

    def _parse_primary_file(
            self,
            primary_file_path: str,
            packages: Dict[str, 'cr.Package'],
    ) -> None:
        """
        Parse primary.xml.gz, take from it info about packages and put
        it to dict packages (keyed by pkgId, i.e. the package checksum)
        :param primary_file_path: path to local primary.xml.gz
        :param packages: dictionary which will contain info about
               packages from repository
        """
        cr.xml_parse_primary(
            path=primary_file_path,
            pkgcb=lambda pkg: packages.update({
                pkg.pkgId: pkg,
            }),
            do_files=False,
            warningcb=self._warning_callback,
        )

    def _parse_filelists_file(
            self,
            filelists_file_path: str,
            packages: Dict[str, 'cr.Package'],
    ) -> None:
        """
        Parse filelists.xml.gz, take from it info about packages and put
        it to dict packages
        :param filelists_file_path: path to local filelists.xml.gz
        :param packages: dictionary which will contain info about
               packages from repository
        """
        cr.xml_parse_filelists(
            path=filelists_file_path,
            # only enrich packages already discovered by the primary
            # parser; unknown pkg ids are skipped (None)
            newpkgcb=lambda pkg_id, name, arch: packages.get(
                pkg_id,
                None,
            ),
            warningcb=self._warning_callback,
        )

    def _parse_other_file(
            self,
            other_file_path: str,
            packages: Dict[str, 'cr.Package'],
    ) -> None:
        """
        Parse other.xml.gz, take from it info about packages and put it
        to dict packages
        :param other_file_path: path to local other.xml.gz
        :param packages: dictionary which will contain info about
               packages from repository
        """
        cr.xml_parse_other(
            path=other_file_path,
            newpkgcb=lambda pkg_id, name, arch: packages.get(
                pkg_id,
                None,
            ),
            warningcb=self._warning_callback,
        )

    def _get_repomd_records(
            self,
            repo_info: 'RepoInfo',
    ) -> List['cr.RepomdRecord']:
        """
        Get, parse file repomd.xml and extract from it repomd records
        :param repo_info: structure which contains info about a current repo
        :return: list with repomd records
        """
        repomd_file_path = os.path.join(
            repo_info.path,
            repo_info.folder,
            'repodata',
            'repomd.xml',
        )
        if repo_info.is_remote:
            # download to a temp file and always drop it after parsing
            repomd_file_path = self._get_remote_file_content(repomd_file_path)
            try:
                repomd_object = self._parse_repomd(repomd_file_path)
            finally:
                os.remove(repomd_file_path)
        else:
            repomd_object = self._parse_repomd(repomd_file_path)
        return repomd_object.records

    def _parse_repomd_records(
            self,
            repo_info: 'RepoInfo',
            repomd_records: List['cr.RepomdRecord'],
            packages: Dict[str, 'cr.Package'],
    ) -> None:
        """
        Parse repomd records and extract from repodata file info about packages
        :param repo_info: structure which contains info about a current repo
        :param repomd_records: list with repomd records
        :param packages: dictionary which will contain info about
               packages from repository
        """
        for repomd_record in repomd_records:
            if repomd_record.type not in (
                    'primary',
                    'filelists',
                    'other',
            ):
                continue
            repomd_record_file_path = os.path.join(
                repo_info.path,
                repo_info.folder,
                repomd_record.location_href,
            )
            if repo_info.is_remote:
                repomd_record_file_path = self._get_remote_file_content(
                    repomd_record_file_path,
                )
            try:
                # dispatch to _parse_primary_file / _parse_filelists_file
                # / _parse_other_file by record type
                parse_file_method = getattr(
                    self,
                    f'_parse_{repomd_record.type}_file'
                )
                parse_file_method(
                    repomd_record_file_path,
                    packages,
                )
            finally:
                # remove the downloaded temp file even if parsing failed
                if repo_info.is_remote:
                    os.remove(repomd_record_file_path)

    def generate_packages_json(
            self
    ) -> Dict[str, Dict[str, Dict[str, List[str]]]]:
        """
        Generate packages.json
        :return: mapping repo name -> repo arch -> source package name
                 -> list of binary packages formatted as 'name.arch'
        """
        packages_json = defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    list,
                )
            )
        )
        for repo_info in self.repos:
            packages = {}
            repomd_records = self._get_repomd_records(
                repo_info=repo_info,
            )
            self._parse_repomd_records(
                repo_info=repo_info,
                repomd_records=repomd_records,
                packages=packages,
            )
            for package in packages.values():
                package_name = package.name
                package_arch = package.arch
                # skip modular packages
                if 'module' in package.release:
                    continue
                if package_name in self.excluded_packages:
                    continue
                src_package_names = dnf.subject.Subject(
                    package.rpm_sourcerpm,
                ).get_nevra_possibilities(
                    forms=hawkey.FORM_NEVRA,
                )
                # We should stop the utility if we can't get an exact
                # srpm name (no candidates or an ambiguous set of them)
                if len(src_package_names) != 1:
                    raise ValueError(
                        'We can\'t get exact name of srpm '
                        f'by its NEVRA "{package.rpm_sourcerpm}"'
                    )
                src_package_name = src_package_names[0].name
                pkgs_list = packages_json[repo_info.name][
                    repo_info.arch][src_package_name]
                added_pkg = f'{package_name}.{package_arch}'
                if added_pkg not in pkgs_list:
                    pkgs_list.append(added_pkg)
        return packages_json
def create_parser():
    """
    Build the argument parser for the packages.json generator.

    The repo-describing options use `action='append'`, so each of them
    may be passed several times — once per repository — and the i-th
    values of all of them describe the same repository.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--repo-path',
        action='append',
        help='Path to a folder with repofolders. E.g. "/var/repos" or '
             '"http://koji.cloudlinux.com/mirrors/rhel_mirror"',
        required=True,
    )
    parser.add_argument(
        '--repo-folder',
        action='append',
        help='A folder which contains a repodata folder. E.g. "baseos-stream"',
        required=True,
    )
    parser.add_argument(
        '--repo-arch',
        action='append',
        help='What architecture packages a repository contains. E.g. "x86_64"',
        required=True,
    )
    parser.add_argument(
        '--repo-name',
        action='append',
        help='Name of a repository. E.g. "AppStream"',
        required=True,
    )
    parser.add_argument(
        '--is-remote',
        action='append',
        type=str,
        help='A repository is remote or local',
        choices=['yes', 'no'],
        required=True,
    )
    parser.add_argument(
        '--excluded-packages',
        nargs='+',
        type=str,
        default=[],
        help='A list of globally excluded packages from generated json. '
             'All of list elements should be separated by space',
        required=False,
    )
    return parser
def cli_main():
    """
    CLI entry point: parse arguments, build the RepoInfo structures and
    dump the generated packages dict to `packages.json` in the current
    working directory.
    """
    args = create_parser().parse_args()
    repos = []
    # All repo-describing options are `action='append'` lists, so the
    # i-th value of each list describes the same repository.
    for repo_path, repo_folder, repo_name, repo_arch, is_remote in zip(
            args.repo_path,
            args.repo_folder,
            args.repo_name,
            args.repo_arch,
            args.is_remote,
    ):
        repos.append(RepoInfo(
            path=repo_path,
            folder=repo_folder,
            name=repo_name,
            arch=repo_arch,
            is_remote=is_remote == 'yes',
        ))
    pg = PackagesGenerator(
        repos=repos,
        excluded_packages=args.excluded_packages,
    )
    result = pg.generate_packages_json()
    with open('packages.json', 'w') as packages_file:
        json.dump(
            result,
            packages_file,
            indent=4,
            sort_keys=True,
        )


if __name__ == '__main__':
    cli_main()

View File

@ -49,6 +49,7 @@ setup(
"pungi-config-validate = pungi.scripts.config_validate:cli_main",
"pungi-gather-modules = pungi.scripts.gather_modules:cli_main",
"pungi-gather-rpms = pungi.scripts.gather_rpms:cli_main",
"pungi-generate-packages-json = pungi.scripts.create_packages_json:cli_main", # noqa: E501
]
},
scripts=["contrib/yum-dnf-compare/pungi-compare-depsolving"],

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<repomd xmlns="http://linux.duke.edu/metadata/repo" xmlns:rpm="http://linux.duke.edu/metadata/rpm">
<revision>1610968727</revision>
<data type="primary">
<checksum type="sha256">2826d3f5dd3b03cfb5d2c079123f7add3a7d068e8dfd210873eb27eb32586a8e</checksum>
<open-checksum type="sha256">78efcf6b74f4c56aaab183336eab44fcbcc9cb6c25045fe5980ab83a85e48db7</open-checksum>
<location href="repodata/primary.xml.gz"/>
<timestamp>1610968715</timestamp>
<size>3094</size>
<open-size>16878</open-size>
</data>
<data type="filelists">
<checksum type="sha256">e41805c927fc4ad1b9bde52509afb37e47acc153283b23da17560d4e250b3a3e</checksum>
<open-checksum type="sha256">5f659e8c05b7d056748bf809bec8aa9fa5f791c2b0546d6c49b02a7ebfb26ce2</open-checksum>
<location href="repodata/filelists.xml.gz"/>
<timestamp>1610968715</timestamp>
<size>3970</size>
<open-size>19897</open-size>
</data>
<data type="other">
<checksum type="sha256">db6d0d88abcaf06dc8ef09207fdbb9ba5e3ffb505a7dd2bf94fdbc953a6de11e</checksum>
<open-checksum type="sha256">3ae1b186b4c3037805e2cf28a78b2204c37b4dc04acbd8bef98a7b24ab5b52a8</open-checksum>
<location href="repodata/other.xml.gz"/>
<timestamp>1610968715</timestamp>
<size>2191</size>
<open-size>8337</open-size>
</data>
</repomd>

View File

@ -0,0 +1,79 @@
# coding=utf-8
import os
from collections import defaultdict
from unittest import TestCase, mock, main
from pungi.scripts.create_packages_json import PackagesGenerator, RepoInfo
# Directory holding the repodata fixtures used by these tests.
FOLDER_WITH_TEST_DATA = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'data/test_create_packages_json/',
)

# A local (non-remote) test repository backed by the fixture data above.
test_repo_info = RepoInfo(
    path=FOLDER_WITH_TEST_DATA,
    folder='test_repo',
    name='TestRepo',
    arch='x86_64',
    is_remote=False,
)
class TestPackagesJson(TestCase):

    def test_01__get_remote_file_content(self):
        """
        Test the getting of content from a remote file
        """
        fake_response = mock.Mock()
        fake_response.raise_for_status = lambda: True
        fake_response.content = b'TestContent'
        patched_get = mock.patch(
            'pungi.scripts.create_packages_json.requests.get',
            return_value=fake_response,
        )
        patched_tempfile = mock.patch(
            'pungi.scripts.create_packages_json.tempfile.NamedTemporaryFile',
        )
        with patched_get as mock_requests_get, \
                patched_tempfile as mock_tempfile:
            mock_tempfile.return_value.__enter__.return_value.name = 'tmpfile'
            file_name = PackagesGenerator._get_remote_file_content(
                file_url='fakeurl'
            )
            # the URL must be fetched exactly once and its payload
            # written to the (mocked) temp file
            mock_requests_get.assert_called_once_with(url='fakeurl')
            mock_tempfile.assert_called_once_with(delete=False)
            mock_tempfile.return_value.__enter__().\
                write.assert_called_once_with(b'TestContent')
            self.assertEqual(
                file_name,
                'tmpfile',
            )

    def test_02_generate_additional_packages(self):
        generator = PackagesGenerator(
            repos=[
                test_repo_info,
            ],
            excluded_packages=['zziplib-utils'],
        )
        # expected structure: repo name -> arch -> srpm name -> packages
        expected_packages = defaultdict(
            lambda: defaultdict(
                lambda: defaultdict(
                    list,
                )
            )
        )
        expected_packages['TestRepo']['x86_64']['zziplib'] = [
            'zziplib.i686',
            'zziplib.x86_64',
        ]
        result = generator.generate_packages_json()
        self.assertEqual(
            expected_packages,
            result,
        )


if __name__ == '__main__':
    main()