242 lines
7.8 KiB
Python
242 lines
7.8 KiB
Python
import gzip
|
|
import lzma
|
|
import os
|
|
from argparse import ArgumentParser, FileType
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import List, AnyStr, Iterable, Union, Optional
|
|
import logging
|
|
from urllib.parse import urljoin
|
|
|
|
import yaml
|
|
import createrepo_c as cr
|
|
from typing.io import BinaryIO
|
|
|
|
from .create_packages_json import PackagesGenerator, is_gzip_file, is_xz_file
|
|
|
|
# Name of the placeholder file that is touched inside the module defaults
# dir, so that the dir is never left completely empty
EMPTY_FILE = '.empty'
|
|
|
|
|
|
def read_modules_yaml(modules_yaml_path: Union[str, Path]) -> BytesIO:
    """
    Load a whole modules.yaml file into memory
    :param modules_yaml_path: path to a local file; it is opened in
                              binary mode and is not decompressed here
    :return: in-memory binary stream holding the raw file content
    """
    with open(modules_yaml_path, mode='rb') as yaml_file:
        raw_content = yaml_file.read()
    return BytesIO(raw_content)
|
|
|
|
|
|
def grep_list_of_modules_yaml(repos_path: AnyStr) -> Iterable[BytesIO]:
    """
    Lazily read the modules.yaml of every repo found under `repos_path`

    Every entry named `repodata` that is found recursively is treated as
    the metadata dir of a repo, and its parent is read as that repo.
    :param repos_path: path to a directory which contains repo dirs
    :return: generator of content from *modules.yaml.*; an item may be
             None when the per-repo reader returns None
    """
    repodata_entries = Path(repos_path).rglob('repodata')
    return (
        read_modules_yaml_from_specific_repo(repo_path=entry.parent)
        for entry in repodata_entries
    )
|
|
|
|
|
|
def _is_remote(path: str):
|
|
return any(str(path).startswith(protocol)
|
|
for protocol in ('http', 'https'))
|
|
|
|
|
|
def read_modules_yaml_from_specific_repo(
        repo_path: Union[str, Path]
) -> Optional[BytesIO]:
    """
    Load the modules metadata of a single repo (remote or local)

    `repodata/repomd.xml` of the repo is parsed with createrepo_c and the
    file referenced by the first record of type `modules` is read.
    :param repo_path: path/url to a specific repo
                      (final dir should contain dir `repodata`)
    :return: content of the file the `modules` record points to,
             or None if repomd.xml has no such record
    """
    remote = _is_remote(repo_path)
    repomd_relative = 'repodata/repomd.xml'

    # NOTE(review): the result of get_remote_file_content is used below as
    # a file location (it is passed to cr.Repomd and to open()), so it
    # presumably is a path to a downloaded copy - confirm in
    # create_packages_json
    if remote:
        repomd_location = PackagesGenerator.get_remote_file_content(
            file_url=urljoin(repo_path + '/', repomd_relative),
        )
    else:
        repomd_location = os.path.join(repo_path, repomd_relative)

    repomd = cr.Repomd(str(repomd_location))
    modules_record = next(
        (rec for rec in repomd.records if rec.type == 'modules'),
        None,
    )
    if modules_record is None:
        return None

    # location_href is joined onto the repo root in both cases
    if remote:
        modules_yaml_location = PackagesGenerator.get_remote_file_content(
            file_url=urljoin(repo_path + '/', modules_record.location_href),
        )
    else:
        modules_yaml_location = os.path.join(
            repo_path,
            modules_record.location_href,
        )
    return read_modules_yaml(modules_yaml_path=modules_yaml_location)
|
|
|
|
|
|
def _should_grep_defaults(
|
|
document_type: str,
|
|
grep_only_modules_data: bool = False,
|
|
grep_only_modules_defaults_data: bool = False,
|
|
) -> bool:
|
|
xor_flag = grep_only_modules_data == grep_only_modules_defaults_data
|
|
if document_type == 'modulemd' and (xor_flag or grep_only_modules_data):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _should_grep_modules(
|
|
document_type: str,
|
|
grep_only_modules_data: bool = False,
|
|
grep_only_modules_defaults_data: bool = False,
|
|
) -> bool:
|
|
xor_flag = grep_only_modules_data == grep_only_modules_defaults_data
|
|
if document_type == 'modulemd-defaults' and \
|
|
(xor_flag or grep_only_modules_defaults_data):
|
|
return True
|
|
return False
|
|
|
|
|
|
def collect_modules(
        modules_paths: List[BinaryIO],
        target_dir: str,
        grep_only_modules_data: bool = False,
        grep_only_modules_defaults_data: bool = False,
):
    """
    Split modules.yaml streams into separate files inside `target_dir`

    Every item of `modules_paths` is read completely, decompressed when
    `is_gzip_file` / `is_xz_file` recognise its first two bytes, and
    parsed as a multi-document YAML stream. `modulemd-defaults` documents
    go to `<target_dir>/module_defaults/<module>.yaml`, `modulemd`
    documents go to
    `<target_dir>/modules/<arch>/<name>-<stream>-<version>.<context>`.
    When both flags are equal, documents of both kinds are exported.
    :param modules_paths: opened binary file-like objects with the
                          content of modules.yaml
    :param target_dir: dir in which the dirs `modules` and
                       `module_defaults` are created
    :param grep_only_modules_data: export only `modulemd` documents
    :param grep_only_modules_defaults_data: export only
                                            `modulemd-defaults` documents
    :return: None
    """
    both_kinds = grep_only_modules_defaults_data is grep_only_modules_data
    modules_root = os.path.join(target_dir, 'modules')
    defaults_root = os.path.join(target_dir, 'module_defaults')
    if both_kinds or grep_only_modules_data:
        os.makedirs(modules_root, exist_ok=True)
    if both_kinds or grep_only_modules_defaults_data:
        os.makedirs(defaults_root, exist_ok=True)
        # Defaults modules can be empty, but pungi detects
        # empty folder while copying and raises the exception in this case
        Path(defaults_root, EMPTY_FILE).touch()

    for stream_obj in modules_paths:
        payload = stream_obj.read()
        magic = payload[:2]
        if is_gzip_file(magic):
            payload = gzip.decompress(payload)
        elif is_xz_file(magic):
            payload = lzma.decompress(payload)

        for document in yaml.load_all(payload, Loader=yaml.BaseLoader):
            doc_type = document['document']
            destination = None
            if _should_grep_modules(
                doc_type,
                grep_only_modules_data,
                grep_only_modules_defaults_data,
            ):
                # `modulemd-defaults` document
                file_name = f"{document['data']['module']}.yaml"
                destination = os.path.join(defaults_root, file_name)
                logging.info('Found %s module defaults', file_name)
            elif _should_grep_defaults(
                doc_type,
                grep_only_modules_data,
                grep_only_modules_defaults_data,
            ):
                # `modulemd` document; the naming follows
                # pungi.phases.pkgset.sources.source_koji.get_koji_modules
                module_data = document['data']
                normalized_stream = module_data['stream'].replace('-', '_')
                file_name = (
                    f"{module_data['name']}-{normalized_stream}-"
                    f"{module_data['version']}.{module_data['context']}"
                )
                arch_root = os.path.join(modules_root, module_data['arch'])
                os.makedirs(arch_root, exist_ok=True)
                destination = os.path.join(arch_root, file_name)
                logging.info('Found module %s', file_name)

                if 'artifacts' not in module_data:
                    logging.warning(
                        'RPM %s does not have explicit list of artifacts',
                        file_name
                    )

            # Documents of any other kind are skipped
            if destination is None:
                continue
            with open(destination, 'w') as out_file:
                yaml.dump(document, out_file, default_flow_style=False)
|
|
|
|
|
|
def cli_main():
    """
    Command line entry point

    Exactly one source of modules.yaml has to be given: opened files
    (`-p`), a dir with repo dirs (`-rp`) or paths/urls of repos (`-rd`).
    The found content is exported into the dir given by `-t`.
    """
    arg_parser = ArgumentParser()

    # What should be exported; without any of the flags both kinds are
    content_kind = arg_parser.add_mutually_exclusive_group(required=False)
    content_kind.add_argument(
        '--get-only-modules-data',
        action='store_true',
        help='Parse and get only modules data',
    )
    content_kind.add_argument(
        '--get-only-modules-defaults-data',
        action='store_true',
        help='Parse and get only modules_defaults data',
    )

    # Where modules.yaml should be taken from
    source_kind = arg_parser.add_mutually_exclusive_group(required=True)
    source_kind.add_argument(
        '-p', '--path',
        type=FileType('rb'), nargs='+',
        help='Path to modules.yaml.gz file. '
             'You may pass multiple files by passing -p path1 path2'
    )
    source_kind.add_argument(
        '-rp', '--repo-path',
        required=False,
        type=str,
        default=None,
        help='Path to a directory which contains repodirs. E.g. /var/repos'
    )
    source_kind.add_argument(
        '-rd', '--repodata-paths',
        required=False,
        type=str,
        nargs='+',
        default=[],
        help='Paths/urls to the directories with directory `repodata`',
    )
    arg_parser.add_argument('-t', '--target', required=True)

    args = arg_parser.parse_args()
    if args.repodata_paths:
        candidates = [
            read_modules_yaml_from_specific_repo(repodata_path)
            for repodata_path in args.repodata_paths
        ]
    elif args.path is not None:
        candidates = args.path
    else:
        candidates = grep_list_of_modules_yaml(args.repo_path)

    # Repos without modules metadata give None, drop such items
    found_modules = [item for item in candidates if item is not None]
    collect_modules(
        found_modules,
        args.target,
        args.get_only_modules_data,
        args.get_only_modules_defaults_data,
    )
|
|
|
|
|
|
# Start the command line interface only when the file is executed
# as a script
if __name__ == "__main__":
    cli_main()
|