ALBS-334: Make the ability of Pungi to give module_defaults from remote sources #4
@ -16,6 +16,7 @@ import tempfile
|
|||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import AnyStr, Dict, List, Optional
|
from typing import AnyStr, Dict, List, Optional
|
||||||
|
|
||||||
|
import binascii
|
||||||
import createrepo_c as cr
|
import createrepo_c as cr
|
||||||
import dnf.subject
|
import dnf.subject
|
||||||
import hawkey
|
import hawkey
|
||||||
@ -25,8 +26,23 @@ import yaml
|
|||||||
from createrepo_c import Package
|
from createrepo_c import Package
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from .gather_modules import is_gzip_file, is_xz_file
|
|
||||||
|
|
||||||
|
def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes):
|
||||||
|
return binascii.hexlify(first_two_bytes) == initial_bytes
|
||||||
|
|
||||||
|
|
||||||
|
def is_gzip_file(first_two_bytes):
|
||||||
|
return _is_compressed_file(
|
||||||
|
first_two_bytes=first_two_bytes,
|
||||||
|
initial_bytes=b'1f8b',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def is_xz_file(first_two_bytes):
|
||||||
|
return _is_compressed_file(
|
||||||
|
first_two_bytes=first_two_bytes,
|
||||||
|
initial_bytes=b'fd37',
|
||||||
|
)
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RepoInfo:
|
class RepoInfo:
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
import binascii
|
|
||||||
import gzip
|
import gzip
|
||||||
import lzma
|
import lzma
|
||||||
import os
|
import os
|
||||||
@ -13,29 +12,11 @@ import yaml
|
|||||||
import createrepo_c as cr
|
import createrepo_c as cr
|
||||||
from typing.io import BinaryIO
|
from typing.io import BinaryIO
|
||||||
|
|
||||||
from .create_packages_json import PackagesGenerator
|
from .create_packages_json import PackagesGenerator, is_gzip_file, is_xz_file
|
||||||
|
|
||||||
EMPTY_FILE = '.empty'
|
EMPTY_FILE = '.empty'
|
||||||
|
|
||||||
|
|
||||||
def _is_compressed_file(first_two_bytes: bytes, initial_bytes: bytes):
|
|
||||||
return binascii.hexlify(first_two_bytes) == initial_bytes
|
|
||||||
|
|
||||||
|
|
||||||
def is_gzip_file(first_two_bytes):
|
|
||||||
return _is_compressed_file(
|
|
||||||
first_two_bytes=first_two_bytes,
|
|
||||||
initial_bytes=b'1f8b',
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def is_xz_file(first_two_bytes):
|
|
||||||
return _is_compressed_file(
|
|
||||||
first_two_bytes=first_two_bytes,
|
|
||||||
initial_bytes=b'fd37',
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def read_modules_yaml(modules_yaml_path: Union[str, Path]) -> BytesIO:
|
def read_modules_yaml(modules_yaml_path: Union[str, Path]) -> BytesIO:
|
||||||
with open(modules_yaml_path, 'rb') as fp:
|
with open(modules_yaml_path, 'rb') as fp:
|
||||||
return BytesIO(fp.read())
|
return BytesIO(fp.read())
|
||||||
|
Loading…
Reference in New Issue
Block a user