277 lines
10 KiB
277 lines
10 KiB
package comparer.py implemets difference checking logic
import bz2
import datetime
from pathlib import Path
import re
from typing import Tuple, List, Dict, Any
import logging
import json
import xml.etree.ElementTree as ET
import requests
from .config import Config
from .package import Package
from .sa import SecurityAdvisory
def download_oval(url: str, download_dir: Path) -> str:
download_oval downloads, decompreses oval file
and returns filepath of saved file
response = requests.get(url, stream=True, timeout=30)
decompressor = bz2.BZ2Decompressor()
fname = url.split('/')[-1].replace('.bz2', '')
fpath = download_dir / fname
with open(fpath, 'wb') as flw:
for chunk in response.iter_content(chunk_size=128):
return fpath
def download_errata(url: str, release_version: int, download_dir: Path) -> str:
downloads errata_full.json file end returns file path
response = requests.get(url, stream=True, timeout=30)
fname = f'alma-{release_version}.json'
fpath = download_dir / fname
with open(fpath, 'wb') as errata_file:
for chunk in response.iter_content(chunk_size=128):
return fpath
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityAdvisory]:
converting oval xml file to dict
def extract_package(title: str) -> Package:
regexp = r'(.*) is earlier than \d+:(.+?(?=-))'
res = re.search(regexp, title)
name = res.group(1)
version = res.group(2)
return Package(name=name, version=version)
def extract_id(title: str) -> str:
regexp = r'[RH|AL]SA-(\d{4}:\d+)(.*)'
res = re.search(regexp, title)
return res.group(1)
tree = ET.parse(fpath)
root = tree.getroot()
namespase = {
'n': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
res = {}
for definition in root.findall('n:definitions/', namespase):
title = definition.find('n:metadata/n:title', namespase).text
issued = definition.find(
'n:metadata/n:advisory/n:issued', namespase).attrib['date']
issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
# we are only interesed in Security advisories after RHEL 8.3
if ('RHSA' not in title and 'ALSA' not in title) or issued_dt < not_before:
sa_id = extract_id(title)
packages = [extract_package(i.attrib['comment']) for
i in definition.findall(".//n:criterion", namespase)
if 'is earlier than' in i.attrib['comment']]
res[sa_id] = SecurityAdvisory(
title=title, id=sa_id, packages=packages)
return res
def parse_errata(fpath: str) -> Dict[str, SecurityAdvisory]:
parses alma errata file and converts it to dict of SA instances
with open(fpath, 'r', encoding='utf-8') as file_to_load:
erratas = json.load(file_to_load)
res = {}
for errata in erratas['data']:
title = errata['title']
sa_id = errata['id'].split('-')[-1]
packages = []
for package in errata['packages']:
full_name = f"{package['name']}-{package['version']}"
if full_name not in packages:
res[sa_id] = SecurityAdvisory(
title=title, id=sa_id, packages=packages)
return res
def compare(rhel_oval: Dict[str, SecurityAdvisory],
alma_oval: Dict[str, SecurityAdvisory],
alma_errata: Dict[str, SecurityAdvisory],
sa_exclude: List[str],
packages_exclude: List[str]) -> Tuple[dict, list]:
compares rhel oval with alma oval and alma errata
diff = []
report = {
# total amount of security advisories
'total_sa_count': 0,
# amount of SA that match with rhel
'good_sa_count': 0,
# total amount of differencies
'diff_count': 0,
# list of SA excluded from diff check
'excluded_sa': [],
# list of packages excluded from diff check
'excluded_pkg': [],
# amount of oval SA that dont exists in oval file
'oval_missing_sa_count': 0,
# amount of oval SA that have missing packages
'oval_missing_pkg_sa_count': 0,
# list of missing oval SA
'oval_missing_sa': [],
# list of oval SA that have missing packages
'oval_missing_pkg_sa': [],
# amount of SA that dont exists in errata file
'errata_missing_sa_count': 0,
# amount of errata SA that have missing packages
'errata_missing_pkg_sa_count': 0,
# list of SA that are missing in errata file
'errata_missing_sa': [],
# list of errata SA with missing packages
'errata_missing_pkg_sa': [],
# total amount of unique missing packages across all alma SA
'missing_packages_unique_count': 0,
# list of unique packages that missing across all alma SA
'missing_packages_unique': []
for rhel_sa_id, rhel_sa in rhel_oval.items():
report['total_sa_count'] += 1
sa_name = f'ALSA-{rhel_sa_id}'
# filtering out SA
if sa_name in sa_exclude:
# filtefing out packages
packages_to_check: List[Package] = []
for package in rhel_sa.packages:
if any(package.name == i for i in packages_exclude):
if str(package) not in report['excluded_pkg']:
# check oval
alma_oval_sa = alma_oval[rhel_sa_id]
except KeyError:
report['diff_count'] += 1
diff.append({'sa_name': sa_name, 'diff': 'SA is missing in oval'})
report['oval_missing_sa_count'] += 1
# check if some packages are missing from oval SA
alma_oval_packages = alma_oval_sa.packages
alma_oval_missing_packages = [str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_oval_packages]]
if alma_oval_missing_packages:
report['diff_count'] += 1
diff_str = f"missing packages in oval SA: {','.join(alma_oval_missing_packages)}"
diff.append({'sa_name': sa_name,
'diff': diff_str})
report['oval_missing_pkg_sa_count'] += 1
for missing_package in alma_oval_missing_packages:
if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique_count'] += 1
# check errata
alma_errata_sa = alma_errata[rhel_sa_id]
except KeyError:
report['errata_missing_sa_count'] += 1
report['diff_count'] += 1
{'sa_name': sa_name, 'diff': 'SA is missing in errata'})
# check if some packages are missing from errata SA
alma_errata_packages = alma_errata_sa.packages
alma_errata_missing_packages = \
[str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_errata_packages]]
if alma_errata_missing_packages:
report['diff_count'] += 1
diff_str = f"missing packages in errata SA: {','.join(alma_errata_missing_packages)}"
diff.append({'sa_name': sa_name,
'diff': diff_str})
report['errata_missing_pkg_sa_count'] += 1
for missing_package in alma_errata_missing_packages:
if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique_count'] += 1
# if we here, all checks were passed
report['good_sa_count'] += 1
for item in report.values():
if isinstance(item, list):
return report, diff
# starting point
def comparer_run(config: Config) -> Dict[str, Any]:
comperer_run is the starting point of comparer component
result = {}
for release, urls in config.releases.items():
logging.info('Processing release %i', release)
logging.info('Downloading rhel oval')
rhel_file = download_oval(urls.rhel_oval_url, config.download_dir)
logging.info('Parsing rhel oval')
rhel_oval_dict = parse_oval(rhel_file, config.not_before)
logging.info('Downloading alma oval')
alma_oval_file = download_oval(
urls.alma_oval_url, download_dir=config.download_dir)
logging.info('Parsing alma oval')
alma_oval_dict = parse_oval(alma_oval_file, config.not_before)
logging.info('Downloading alma errata')
alma_errata_file = download_errata(urls.alma_errata_url,
release, config.download_dir)
logging.info('Parsing alma errata')
alma_errata_dict = parse_errata(alma_errata_file)
logging.info('Comparing rhel and alma')
report_release, diff_release = \
result[release] = {'report': report_release,
'diff': diff_release,
'rhel_oval_url': urls.rhel_oval_url,
'alma_oval_url': urls.alma_oval_url,
'alma_errata_url': urls.alma_errata_url}
result['report_generated'] = datetime.datetime.now().timestamp() * 1000
result['sa_not_before'] = config.not_before.timestamp() * 1000
return result