ALBS-987: Generate i686 and dev repositories with pungi on building new distr. version automatically

- [Generator of packages.json] Replace using CLI by config.yaml
- [Gather RPMs] os.path is replaced by Path
This commit is contained in:
soksanichenko 2023-03-22 15:56:58 +02:00
parent 60a347a4a2
commit 6d58bc2ed8
2 changed files with 68 additions and 111 deletions

View File

@ -9,12 +9,22 @@ https://github.com/rpm-software-management/createrepo_c/blob/master/examples/pyt
import argparse
import gzip
import json
import logging
import lzma
import os
import re
import tempfile
from collections import defaultdict
from typing import AnyStr, Dict, List, Any, Iterator
from pathlib import Path
from typing import (
AnyStr,
Dict,
List,
Any,
Iterator,
Optional,
Tuple,
)
import binascii
import createrepo_c as cr
@ -26,6 +36,8 @@ import yaml
from createrepo_c import Package, PackageIterator
from dataclasses import dataclass
logging.basicConfig(level=logging.INFO)
def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes):
return binascii.hexlify(first_two_bytes) == initial_bytes
@ -330,7 +342,7 @@ class PackagesGenerator:
for variant_name, variant_arch in package_dict['variants']:
package_arch = package_dict['arch']
package = package_dict['package']
package_name = package.name
package_name = f'{package.name}.{package_arch}'
if any(re.search(excluded_package, package_name)
for excluded_package in self.excluded_packages):
continue
@ -370,73 +382,15 @@ class PackagesGenerator:
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
'--repo-path',
action='append',
help='Path to a folder with repofolders. E.g. "/var/repos" or '
'"http://koji.cloudlinux.com/mirrors/rhel_mirror"',
required=True,
)
parser.add_argument(
'--repo-folder',
action='append',
help='A folder which contains folder repodata . E.g. "baseos-stream"',
required=True,
)
parser.add_argument(
'--repo-arch',
action='append',
help='What architecture packages a repository contains. E.g. "x86_64"',
required=True,
)
parser.add_argument(
'--repo-name',
action='append',
help='Name of a repository. E.g. "AppStream"',
required=True,
)
parser.add_argument(
'--is-remote',
action='append',
type=str,
help='A repository is remote or local',
choices=['yes', 'no'],
required=True,
)
parser.add_argument(
'--is-reference',
action='append',
type=str,
help='A repository is used as reference for packages layout',
choices=['yes', 'no'],
required=True,
)
parser.add_argument(
'--repo-type',
action='append',
type=str,
help='Packages from repository will be removed or added to variant',
choices=['present', 'absent'],
required=True,
)
parser.add_argument(
'--excluded-packages',
nargs='+',
type=str,
default=[],
help='A list of globally excluded packages from generated json.'
'All of list elements should be separated by space',
required=False,
)
parser.add_argument(
'--included-packages',
nargs='+',
type=str,
default=[],
help='A list of globally included packages from generated json.'
'All of list elements should be separated by space',
'-c',
'--config',
type=Path,
default=Path('config.yaml'),
required=False,
help='Path to a config',
)
parser.add_argument(
'-o',
'--json-output-path',
type=str,
help='Full path to output json file',
@ -446,32 +400,43 @@ def create_parser():
return parser
def read_config(config_path: Path) -> Optional[Dict]:
if not config_path.exists():
logging.error('A config by path "%s" does not exist', config_path)
exit(1)
with config_path.open('r') as config_fd:
return yaml.safe_load(config_fd)
def process_config(config_data: Dict) -> Tuple[
List[RepoInfo],
List[str],
List[str],
]:
excluded_packages = config_data.get('excluded_packages', [])
included_packages = config_data.get('included_packages', [])
repos = [RepoInfo(
path=variant_repo['path'],
folder=variant_repo['folder'],
name=variant_name,
arch=variant_repo['arch'],
is_remote=variant_repo['remote'],
is_reference=variant_repo['reference'],
repo_type=variant_repo.get('repo_type', 'present'),
) for variant_name, variant_repos in config_data['variants'].items()
for variant_repo in variant_repos]
return repos, excluded_packages, included_packages
def cli_main():
args = create_parser().parse_args()
repos = []
for repo_path, repo_folder, repo_name, \
repo_arch, is_remote, is_reference, repo_type in zip(
args.repo_path,
args.repo_folder,
args.repo_name,
args.repo_arch,
args.is_remote,
args.is_reference,
args.repo_type,
):
repos.append(RepoInfo(
path=repo_path,
folder=repo_folder,
name=repo_name,
arch=repo_arch,
is_remote=True if is_remote == 'yes' else False,
is_reference=True if is_reference == 'yes' else False,
repo_type=repo_type,
))
repos, excluded_packages, included_packages = process_config(
config_data=read_config(args.config)
)
pg = PackagesGenerator(
repos=repos,
excluded_packages=args.excluded_packages,
included_packages=args.included_packages,
excluded_packages=excluded_packages,
included_packages=included_packages,
)
result = pg.generate_packages_json()
with open(args.json_output_path, 'w') as packages_file:

View File

@ -2,38 +2,32 @@ from argparse import ArgumentParser
import os
from typing import List
from pathlib import Path
from attr import dataclass
from dataclasses import dataclass
from productmd.common import parse_nvra
@dataclass
class Package:
nvra: str
path: str
path: Path
def search_rpms(top_dir) -> List[Package]:
def search_rpms(top_dir: Path) -> List[Package]:
"""
Search for all *.rpm files recursively
in given top directory
Returns:
list: list of paths
"""
rpms = []
for root, dirs, files in os.walk(top_dir):
path = root.split(os.sep)
for file in files:
if not file.endswith('.rpm'):
continue
nvra, _ = os.path.splitext(file)
rpms.append(
Package(nvra=nvra, path=os.path.join('/', *path, file))
)
return rpms
return [Package(
nvra=path.stem,
path=path,
) for path in top_dir.rglob('*.rpm')]
def copy_rpms(packages: List[Package], target_top_dir: str):
def copy_rpms(packages: List[Package], target_top_dir: Path):
"""
Search synced repos for rpms and prepare
koji-like structure for pungi
@ -46,24 +40,22 @@ def copy_rpms(packages: List[Package], target_top_dir: str):
"""
for package in packages:
info = parse_nvra(package.nvra)
target_arch_dir = os.path.join(target_top_dir, info['arch'])
target_arch_dir = target_top_dir.joinpath(info['arch'])
target_file = target_arch_dir.joinpath(package.path.name)
os.makedirs(target_arch_dir, exist_ok=True)
target_file = os.path.join(target_arch_dir, os.path.basename(package.path))
if not os.path.exists(target_file):
if not target_file.exists():
try:
os.link(package.path, target_file)
except OSError:
# hardlink failed, try symlinking
os.symlink(package.path, target_file)
package.path.symlink_to(target_file)
def cli_main():
parser = ArgumentParser()
parser.add_argument('-p', '--path', required=True)
parser.add_argument('-t', '--target', required=True)
parser.add_argument('-p', '--path', required=True, type=Path)
parser.add_argument('-t', '--target', required=True, type=Path)
namespace = parser.parse_args()