albs-oval-errata-diff/albs_oval_errata_diff/comparer.py

325 lines
13 KiB
Python

"""
module comparer.py implemets difference checking logic
"""
import bz2
import datetime
from pathlib import Path
import re
from typing import Tuple, List, Dict, Any
import logging
import json
import xml.etree.ElementTree as ET
import requests
from .advisory import Advisory
from .albs import ALBS
from .config import Config
from .package import Package
def download_oval(url: str, download_dir: Path) -> str:
"""
download_oval downloads, decompreses oval file
and returns filepath of saved file
"""
response = requests.get(url, stream=True, timeout=30)
decompressor = bz2.BZ2Decompressor()
fname = url.split('/')[-1].replace('.bz2', '')
fpath = download_dir / fname
with open(fpath, 'wb') as flw:
for chunk in response.iter_content(chunk_size=128):
flw.write(decompressor.decompress(chunk))
return fpath
def download_errata(url: str, release_version: int, download_dir: Path) -> str:
"""
downloads errata_full.json file end returns file path
"""
response = requests.get(url, stream=True, timeout=30)
fname = f'alma-{release_version}.json'
fpath = download_dir / fname
with open(fpath, 'wb') as errata_file:
for chunk in response.iter_content(chunk_size=128):
errata_file.write(chunk)
return fpath
def extract_id_and_type(string: str) -> Tuple[str, str]:
"""
Extracts advisory id and advisory type from OVAL title or errata id
Example:
oval: "RHSA-2022:5749: .NET 6.0 bugfix update (Moderate)" -> (2022:5749, SA)
errata: ALSA-2022:6165 -> (id=2022:6165, SA)
"""
regexp = r'((RH|AL)(SA|BA|EA))-(\d{4}:\d+)'
res = re.search(regexp, string)
return res.group(4), res.group(3)
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, Advisory]:
"""
Converts OVAL XML file to Dict of Advisories
"""
def extract_package(title: str) -> Package:
regexp = r'(.*) is earlier than \d+:(.+?(?=-))'
res = re.search(regexp, title)
name = res.group(1)
version = res.group(2)
return Package(name=name, version=version)
tree = ET.parse(fpath)
root = tree.getroot()
namespase = {
'n': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
}
res = {}
for definition in root.findall('n:definitions/', namespase):
title = definition.find('n:metadata/n:title', namespase).text
issued = definition.find(
'n:metadata/n:advisory/n:issued', namespase).attrib['date']
issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
# we are only interested in RHEL/OVAL SA/BA/EA
# released after RHEL 8.3
if not re.match(r'((RH|AL)(SA|BA|EA))', title) or issued_dt < not_before:
continue
# we are only interested in security based advisories
severity = definition.find(
'n:metadata/n:advisory/n:severity', namespase)
if severity is None:
continue
if severity.text.lower() not in ['low', 'moderate', 'important', 'critical']:
continue
advisory_id, advisory_type = extract_id_and_type(title)
packages = [extract_package(i.attrib['comment']) for
i in definition.findall(".//n:criterion", namespase)
if 'is earlier than' in i.attrib['comment']]
res[advisory_id] = Advisory(title=title, id=advisory_id,
advisory_type=advisory_type,
packages=packages)
return res
def parse_errata(fpath: str) -> Dict[str, Advisory]:
"""
Parses Alma Errata file and converts it to dict of Advisory instances
"""
with open(fpath, 'r', encoding='utf-8') as file_to_load:
erratas = json.load(file_to_load)
res = {}
for errata in erratas['data']:
title = errata['title']
advisory_id, advisory_type = extract_id_and_type(errata['id'])
packages = []
for package in errata['packages']:
full_name = f"{package['name']}-{package['version']}"
if full_name not in packages:
packages.append(full_name)
packages.sort()
res[advisory_id] = Advisory(title=title,
id=advisory_id,
advisory_type=advisory_type,
packages=packages)
return res
def compare(rhel_oval: Dict[str, Advisory],
alma_oval: Dict[str, Advisory],
alma_errata: Dict[str, Advisory],
advisory_exclude: List[str],
packages_exclude: List[str],
albs: ALBS,
release: str) -> Tuple[dict, list]:
"""
compares rhel oval with alma oval and alma errata
"""
diff = []
report = {
# total amount of security advisories
'total_advisory_count': 0,
# amount of ALMA advisory that match with RHEL
'good_advisory_count': 0,
# total amount of differencies
'diff_count': 0,
# list of advisories excluded from diff check
'excluded_adv': [],
# list of packages excluded from diff check
'excluded_pkg': [],
# amount of oval advisories that dont exists in oval file
'oval_missing_advisory_count': 0,
# amount of oval advisories that have missing packages
'oval_missing_pkg_advisory_count': 0,
# list of missing oval advisories
'oval_missing_advisory': [],
# list of oval advisories that have missing packages
'oval_missing_pkg_advisory': [],
# amount of advisories that dont exists in errata file
'errata_missing_advisory_count': 0,
# amount of errata advisories that have missing packages
'errata_missing_pkg_advisory_count': 0,
# list of advisories that are missing in errata file
'errata_missing_advisory': [],
# list of errata advisories with missing packages
'errata_missing_pkg_advisory': [],
# total amount of unique missing packages across all alma SA
'missing_packages_unique_count': 0,
# list of unique packages that missing across all alma SA
'missing_packages_unique': [],
# contains errata release status from buildsystem
# this list populated for missing advisories only
'miss_adv_albs_errata_release_status': [],
}
for rhel_advisory_id, rhel_advisory in rhel_oval.items():
report['total_advisory_count'] += 1
advisory_name = f'AL{rhel_advisory.advisory_type}-{rhel_advisory_id}'
# filtering out advisories
if advisory_name in advisory_exclude:
report['excluded_advisory'].append(advisory_name)
continue
# filtefing out packages
packages_to_check: List[Package] = []
for package in rhel_advisory.packages:
if any(package.name == i for i in packages_exclude):
if str(package) not in report['excluded_pkg']:
report['excluded_pkg'].append(str(package))
else:
packages_to_check.append(package)
# check oval
try:
alma_oval_advisory = alma_oval[rhel_advisory_id]
except KeyError:
report['diff_count'] += 1
diff.append({'advisory_name': advisory_name,
'diff': 'Advisory is missing in OVAL'})
report['oval_missing_advisory'].append(advisory_name)
report['oval_missing_advisory_count'] += 1
else:
# check if some packages are missing from OVAL advisories
alma_oval_packages = alma_oval_advisory.packages
alma_oval_missing_packages = [str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_oval_packages]]
if alma_oval_missing_packages:
report['diff_count'] += 1
diff_str = f"OVAL advisory has missing packages: {','.join(alma_oval_missing_packages)}"
diff.append({'advisory_name': advisory_name,
'diff': diff_str})
report['oval_missing_pkg_advisory'].append(advisory_name)
report['oval_missing_pkg_advisory_count'] += 1
for missing_package in alma_oval_missing_packages:
if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append(
missing_package)
report['missing_packages_unique_count'] += 1
# check errata
try:
alma_errata_sa = alma_errata[rhel_advisory_id]
except KeyError:
report['errata_missing_advisory'].append(advisory_name)
report['errata_missing_advisory_count'] += 1
report['diff_count'] += 1
diff.append(
{'advisory_name': advisory_name, 'diff': 'Advisory is missing in Errata'})
continue
# check if some packages are missing from errata SA
alma_errata_packages = alma_errata_sa.packages
alma_errata_missing_packages = \
[str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_errata_packages]]
if alma_errata_missing_packages:
report['diff_count'] += 1
mp_string = ','.join(alma_errata_missing_packages)
diff_str = f"Errata advisory has missing packages: {mp_string}"
diff.append({'advisory_name': advisory_name,
'diff': diff_str})
report['errata_missing_pkg_advisory'].append(advisory_name)
report['errata_missing_pkg_advisory_count'] += 1
for missing_package in alma_errata_missing_packages:
if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append(missing_package)
report['missing_packages_unique_count'] += 1
else:
# if we here, all checks were passed
report['good_advisory_count'] += 1
# albs errata flow
logging.info('Getting errata release status for missing advisories')
missing_advisories = report['errata_missing_advisory'] + \
report['oval_missing_advisory']
missing_advisories = list(dict.fromkeys(missing_advisories))
for adv in missing_advisories:
try:
release_status = albs.get_errata_status(
adv, f'AlmaLinux-{release}')
except Exception as err: # pylint: disable=broad-except
logging.error("cant get release status for %s: %s", adv, err)
continue
if release_status is None:
release_status = 'not-found-in-errata-flow'
report['miss_adv_albs_errata_release_status'].append(
{"advisory": adv, "release_status": release_status})
return report, diff
# starting point
def comparer_run(config: Config) -> Dict[str, Any]:
"""
comperer_run is the starting point of comparer component
"""
result = {}
for release, urls in config.releases.items():
logging.info('Processing release %i', release)
logging.info('Downloading rhel oval')
rhel_file = download_oval(urls.rhel_oval_url, config.download_dir)
logging.info('Parsing rhel oval')
rhel_oval_dict = parse_oval(rhel_file, config.not_before)
logging.info('Downloading alma oval')
alma_oval_file = download_oval(
urls.alma_oval_url, download_dir=config.download_dir)
logging.info('Parsing alma oval')
alma_oval_dict = parse_oval(alma_oval_file, config.not_before)
logging.info('Downloading alma errata')
alma_errata_file = download_errata(urls.alma_errata_url,
release, config.download_dir)
logging.info('Parsing alma errata')
alma_errata_dict = parse_errata(alma_errata_file)
logging.info('Comparing rhel and alma')
albs = ALBS(config.albs_url,
config.albs_jwt_token,
config.albs_timeout)
report_release, diff_release =\
compare(rhel_oval_dict,
alma_oval_dict,
alma_errata_dict,
config.advisory_exclude,
config.packages_exclude,
albs, release)
result[release] = {'report': report_release,
'diff': diff_release,
'rhel_oval_url': urls.rhel_oval_url,
'alma_oval_url': urls.alma_oval_url,
'alma_errata_url': urls.alma_errata_url}
result['report_generated'] = datetime.datetime.now().timestamp() * 1000
result['advisory_not_before'] = config.not_before.timestamp() * 1000
return result