ALBS-334: Make the ability of Pungi to give module_defaults from remote sources #4
@ -2,7 +2,7 @@
|
||||
|
||||
Name: pungi
|
||||
Version: 4.2.15
|
||||
Release: 1%{?dist}.cloudlinux
|
||||
Release: 2%{?dist}.cloudlinux
|
||||
Summary: Distribution compose tool
|
||||
|
||||
License: GPLv2
|
||||
|
@ -161,7 +161,7 @@ class CreateExtraRepo(PackagesGenerator):
|
||||
if os.path.exists(self.default_modules_yaml_path):
|
||||
os.remove(self.default_modules_yaml_path)
|
||||
|
||||
def _get_remote_file_content(
|
||||
def get_remote_file_content(
|
||||
self,
|
||||
file_url: AnyStr,
|
||||
) -> AnyStr:
|
||||
|
@ -16,6 +16,7 @@ import tempfile
|
||||
from collections import defaultdict
|
||||
from typing import AnyStr, Dict, List, Optional
|
||||
|
||||
import binascii
|
||||
import createrepo_c as cr
|
||||
import dnf.subject
|
||||
import hawkey
|
||||
@ -25,7 +26,24 @@ import yaml
|
||||
from createrepo_c import Package
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .gather_modules import is_gzip_file, is_xz_file
|
||||
|
||||
def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes):
|
||||
return binascii.hexlify(first_two_bytes) == initial_bytes
|
||||
|
||||
|
||||
def is_gzip_file(first_two_bytes):
|
||||
return _is_compressed_file(
|
||||
first_two_bytes=first_two_bytes,
|
||||
initial_bytes=b'1f8b',
|
||||
)
|
||||
|
||||
|
||||
def is_xz_file(first_two_bytes):
|
||||
return _is_compressed_file(
|
||||
first_two_bytes=first_two_bytes,
|
||||
initial_bytes=b'fd37',
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RepoInfo:
|
||||
@ -69,7 +87,7 @@ class PackagesGenerator:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _get_remote_file_content(file_url: AnyStr) -> AnyStr:
|
||||
def get_remote_file_content(file_url: AnyStr) -> AnyStr:
|
||||
"""
|
||||
Get content from a remote file and write it to a temp file
|
||||
:param file_url: url of a remote file
|
||||
@ -194,7 +212,7 @@ class PackagesGenerator:
|
||||
'repomd.xml',
|
||||
)
|
||||
if repo_info.is_remote:
|
||||
repomd_file_path = self._get_remote_file_content(repomd_file_path)
|
||||
repomd_file_path = self.get_remote_file_content(repomd_file_path)
|
||||
else:
|
||||
repomd_file_path = repomd_file_path
|
||||
repomd_object = self._parse_repomd(repomd_file_path)
|
||||
@ -232,9 +250,8 @@ class PackagesGenerator:
|
||||
repomd_record.location_href,
|
||||
)
|
||||
if repo_info.is_remote:
|
||||
repomd_record_file_path = self._get_remote_file_content(
|
||||
repomd_record_file_path,
|
||||
)
|
||||
repomd_record_file_path = self.get_remote_file_content(
|
||||
repomd_record_file_path)
|
||||
if repomd_record.type == 'modules':
|
||||
modules_data = self._parse_modules_file(
|
||||
repomd_record_file_path,
|
||||
|
@ -1,73 +1,133 @@
|
||||
import binascii
|
||||
import gzip
|
||||
import lzma
|
||||
import os
|
||||
from argparse import ArgumentParser, FileType
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import List, AnyStr
|
||||
from typing import List, AnyStr, Iterable, Union, Optional
|
||||
import logging
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import yaml
|
||||
import createrepo_c as cr
|
||||
from typing.io import BinaryIO
|
||||
|
||||
from .create_packages_json import PackagesGenerator, is_gzip_file, is_xz_file
|
||||
|
||||
EMPTY_FILE = '.empty'
|
||||
|
||||
|
||||
def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes):
|
||||
return binascii.hexlify(first_two_bytes) == initial_bytes
|
||||
def read_modules_yaml(modules_yaml_path: Union[str, Path]) -> BytesIO:
|
||||
with open(modules_yaml_path, 'rb') as fp:
|
||||
return BytesIO(fp.read())
|
||||
|
||||
|
||||
def is_gzip_file(first_two_bytes):
|
||||
return _is_compressed_file(
|
||||
first_two_bytes=first_two_bytes,
|
||||
initial_bytes=b'1f8b',
|
||||
)
|
||||
|
||||
|
||||
def is_xz_file(first_two_bytes):
|
||||
return _is_compressed_file(
|
||||
first_two_bytes=first_two_bytes,
|
||||
initial_bytes=b'fd37',
|
||||
)
|
||||
|
||||
|
||||
def grep_list_of_modules_yaml_gz(repo_path: AnyStr) -> List[BytesIO]:
|
||||
def grep_list_of_modules_yaml(repos_path: AnyStr) -> Iterable[BytesIO]:
|
||||
"""
|
||||
Find all of valid *modules.yaml.gz in repos
|
||||
:param repo_path: path to a directory which contains repodirs
|
||||
:return: list of content from *modules.yaml.gz
|
||||
:param repos_path: path to a directory which contains repo dirs
|
||||
:return: iterable object of content from *modules.yaml.*
|
||||
"""
|
||||
|
||||
result = []
|
||||
for path in Path(repo_path).rglob('repomd.xml'):
|
||||
repo_dir_path = Path(path.parent).parent
|
||||
repomd_obj = cr.Repomd(str(path))
|
||||
for record in repomd_obj.records:
|
||||
if record.type != 'modules':
|
||||
continue
|
||||
with open(os.path.join(
|
||||
repo_dir_path,
|
||||
return (
|
||||
read_modules_yaml_from_specific_repo(repo_path=path.parent)
|
||||
for path in Path(repos_path).rglob('repodata')
|
||||
)
|
||||
|
||||
|
||||
def _is_remote(path: str):
|
||||
return any(str(path).startswith(protocol)
|
||||
for protocol in ('http', 'https'))
|
||||
|
||||
|
||||
def read_modules_yaml_from_specific_repo(
|
||||
repo_path: Union[str, Path]
|
||||
) -> Optional[BytesIO]:
|
||||
"""
|
||||
Read modules_yaml from a specific repo (remote or local)
|
||||
:param repo_path: path/url to a specific repo
|
||||
(final dir should contain dir `repodata`)
|
||||
:return: iterable object of content from *modules.yaml.*
|
||||
"""
|
||||
|
||||
if _is_remote(repo_path):
|
||||
repomd_url = urljoin(
|
||||
repo_path + '/',
|
||||
'repodata/repomd.xml',
|
||||
)
|
||||
repomd_file_path = PackagesGenerator.get_remote_file_content(
|
||||
file_url=repomd_url
|
||||
)
|
||||
else:
|
||||
repomd_file_path = os.path.join(
|
||||
repo_path,
|
||||
'repodata/repomd.xml',
|
||||
)
|
||||
repomd_obj = cr.Repomd(str(repomd_file_path))
|
||||
for record in repomd_obj.records:
|
||||
if record.type != 'modules':
|
||||
continue
|
||||
else:
|
||||
if _is_remote(repo_path):
|
||||
modules_yaml_url = urljoin(
|
||||
repo_path + '/',
|
||||
record.location_href,
|
||||
), 'rb') as fp:
|
||||
result.append(
|
||||
BytesIO(fp.read())
|
||||
)
|
||||
return result
|
||||
modules_yaml_path = PackagesGenerator.get_remote_file_content(
|
||||
file_url=modules_yaml_url
|
||||
)
|
||||
else:
|
||||
modules_yaml_path = os.path.join(
|
||||
repo_path,
|
||||
record.location_href,
|
||||
)
|
||||
return read_modules_yaml(modules_yaml_path=modules_yaml_path)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def collect_modules(modules_paths: List[BinaryIO], target_dir: str):
|
||||
def _should_grep_defaults(
|
||||
document_type: str,
|
||||
grep_only_modules_data: bool = False,
|
||||
grep_only_modules_defaults_data: bool = False,
|
||||
) -> bool:
|
||||
xor_flag = grep_only_modules_data == grep_only_modules_defaults_data
|
||||
if document_type == 'modulemd' and (xor_flag or grep_only_modules_data):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _should_grep_modules(
|
||||
document_type: str,
|
||||
grep_only_modules_data: bool = False,
|
||||
grep_only_modules_defaults_data: bool = False,
|
||||
) -> bool:
|
||||
xor_flag = grep_only_modules_data == grep_only_modules_defaults_data
|
||||
if document_type == 'modulemd-defaults' and \
|
||||
(xor_flag or grep_only_modules_defaults_data):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def collect_modules(
|
||||
modules_paths: List[BinaryIO],
|
||||
target_dir: str,
|
||||
grep_only_modules_data: bool = False,
|
||||
grep_only_modules_defaults_data: bool = False,
|
||||
):
|
||||
"""
|
||||
Read given modules.yaml.gz files and export modules
|
||||
and modulemd files from it.
|
||||
Returns:
|
||||
object:
|
||||
"""
|
||||
xor_flag = grep_only_modules_defaults_data is grep_only_modules_data
|
||||
modules_path = os.path.join(target_dir, 'modules')
|
||||
module_defaults_path = os.path.join(target_dir, 'module_defaults')
|
||||
os.makedirs(modules_path, exist_ok=True)
|
||||
os.makedirs(module_defaults_path, exist_ok=True)
|
||||
if grep_only_modules_data or xor_flag:
|
||||
os.makedirs(modules_path, exist_ok=True)
|
||||
if grep_only_modules_defaults_data or xor_flag:
|
||||
os.makedirs(module_defaults_path, exist_ok=True)
|
||||
# Defaults modules can be empty, but pungi detects
|
||||
# empty folder while copying and raises the exception in this case
|
||||
Path(os.path.join(module_defaults_path, EMPTY_FILE)).touch()
|
||||
@ -80,11 +140,20 @@ def collect_modules(modules_paths: List[BinaryIO], target_dir: str):
|
||||
data = lzma.decompress(data)
|
||||
documents = yaml.load_all(data, Loader=yaml.BaseLoader)
|
||||
for doc in documents:
|
||||
if doc['document'] == 'modulemd-defaults':
|
||||
path = None
|
||||
if _should_grep_modules(
|
||||
doc['document'],
|
||||
grep_only_modules_data,
|
||||
grep_only_modules_defaults_data,
|
||||
):
|
||||
name = f"{doc['data']['module']}.yaml"
|
||||
path = os.path.join(module_defaults_path, name)
|
||||
logging.info('Found %s module defaults', name)
|
||||
else:
|
||||
elif _should_grep_defaults(
|
||||
doc['document'],
|
||||
grep_only_modules_data,
|
||||
grep_only_modules_defaults_data,
|
||||
):
|
||||
# pungi.phases.pkgset.sources.source_koji.get_koji_modules
|
||||
stream = doc['data']['stream'].replace('-', '_')
|
||||
doc_data = doc['data']
|
||||
@ -106,13 +175,24 @@ def collect_modules(modules_paths: List[BinaryIO], target_dir: str):
|
||||
'RPM %s does not have explicit list of artifacts',
|
||||
name
|
||||
)
|
||||
|
||||
with open(path, 'w') as f:
|
||||
yaml.dump(doc, f, default_flow_style=False)
|
||||
if path is not None:
|
||||
with open(path, 'w') as f:
|
||||
yaml.dump(doc, f, default_flow_style=False)
|
||||
|
||||
|
||||
def cli_main():
|
||||
parser = ArgumentParser()
|
||||
content_type_group = parser.add_mutually_exclusive_group(required=False)
|
||||
content_type_group.add_argument(
|
||||
'--get-only-modules-data',
|
||||
action='store_true',
|
||||
help='Parse and get only modules data',
|
||||
)
|
||||
content_type_group.add_argument(
|
||||
'--get-only-modules-defaults-data',
|
||||
action='store_true',
|
||||
help='Parse and get only modules_defaults data',
|
||||
)
|
||||
path_group = parser.add_mutually_exclusive_group(required=True)
|
||||
path_group.add_argument(
|
||||
'-p', '--path',
|
||||
@ -127,16 +207,33 @@ def cli_main():
|
||||
default=None,
|
||||
help='Path to a directory which contains repodirs. E.g. /var/repos'
|
||||
)
|
||||
path_group.add_argument(
|
||||
'-rd', '--repodata-paths',
|
||||
required=False,
|
||||
type=str,
|
||||
nargs='+',
|
||||
default=[],
|
||||
help='Paths/urls to the directories with directory `repodata`',
|
||||
)
|
||||
parser.add_argument('-t', '--target', required=True)
|
||||
|
||||
namespace = parser.parse_args()
|
||||
if namespace.repo_path is None:
|
||||
if namespace.repodata_paths:
|
||||
modules = []
|
||||
for repodata_path in namespace.repodata_paths:
|
||||
modules.append(read_modules_yaml_from_specific_repo(
|
||||
repodata_path,
|
||||
))
|
||||
elif namespace.path is not None:
|
||||
modules = namespace.path
|
||||
else:
|
||||
modules = grep_list_of_modules_yaml_gz(namespace.repo_path)
|
||||
modules = grep_list_of_modules_yaml(namespace.repo_path)
|
||||
modules = list(filter(lambda i: i is not None, modules))
|
||||
collect_modules(
|
||||
modules,
|
||||
namespace.target,
|
||||
namespace.get_only_modules_data,
|
||||
namespace.get_only_modules_defaults_data,
|
||||
)
|
||||
|
||||
|
||||
|
@ -33,7 +33,7 @@ test_repo_info_2 = RepoInfo(
|
||||
|
||||
|
||||
class TestPackagesJson(TestCase):
|
||||
def test_01__get_remote_file_content(self):
|
||||
def test_01_get_remote_file_content(self):
|
||||
"""
|
||||
Test the getting of content from a remote file
|
||||
"""
|
||||
@ -47,9 +47,8 @@ class TestPackagesJson(TestCase):
|
||||
'pungi.scripts.create_packages_json.tempfile.NamedTemporaryFile',
|
||||
) as mock_tempfile:
|
||||
mock_tempfile.return_value.__enter__.return_value.name = 'tmpfile'
|
||||
file_name = PackagesGenerator._get_remote_file_content(
|
||||
file_url='fakeurl'
|
||||
)
|
||||
file_name = PackagesGenerator.get_remote_file_content(
|
||||
file_url='fakeurl')
|
||||
mock_requests_get.assert_called_once_with(url='fakeurl')
|
||||
mock_tempfile.assert_called_once_with(delete=False)
|
||||
mock_tempfile.return_value.__enter__().\
|
||||
|
Loading…
Reference in New Issue
Block a user