albs-oval-errata-diff/albs_oval_errata_diff/comparer.py

255 lines
9.4 KiB
Python
Raw Normal View History

2022-12-28 16:21:40 +00:00
import bz2
import datetime
import re
import requests
from typing import Tuple, List, Dict, Any
import xml.etree.ElementTree as ET
import logging
import json
from .config import DOWNLOAD_DIR, NOT_BEFORE, RELEASES, SA_EXCLUDE, PACKAGES_EXCLUDE
from .sa import SecurityAdvisory
from .package import Package
def download_oval(url: str) -> str:
"""
download_oval downloads, decompreses oval file
and returns filepath of saved file
"""
r = requests.get(url, stream=True, timeout=30)
decompressor = bz2.BZ2Decompressor()
fname = url.split('/')[-1].replace('.bz2', '')
fpath = DOWNLOAD_DIR / fname
with open(fpath, 'wb') as fd:
for chunk in r.iter_content(chunk_size=128):
fd.write(decompressor.decompress(chunk))
return fpath
def download_errata(url: str, release_version: int) -> str:
"""
downloads errata_full.json file end returns file path
"""
response = requests.get(url, stream=True, timeout=30)
fname = f'alma-{release_version}.json'
fpath = DOWNLOAD_DIR / fname
with open(fpath, 'wb') as errata_file:
for chunk in response.iter_content(chunk_size=128):
errata_file.write(chunk)
return fpath
def parse_oval(fpath: str) -> Dict[str, SecurityAdvisory]:
"""
converting oval xml file to dict
"""
def extract_package(title: str) -> Package:
r = r'(.*) is earlier than \d+:(.+?(?=-))'
res = re.search(r, title)
name = res.group(1)
version = res.group(2)
return Package(name=name, version=version)
def extract_id(title: str) -> str:
r = r'[RH|AL]SA-(\d{4}:\d+)(.*)'
res = re.search(r, title)
return res.group(1)
tree = ET.parse(fpath)
root = tree.getroot()
ns = {
'n': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
}
res = {}
for definition in root.findall('n:definitions/', ns):
title = definition.find('n:metadata/n:title', ns).text
issued = definition.find(
'n:metadata/n:advisory/n:issued', ns).attrib['date']
issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
# we are only interesed in Security advisories after RHEL 8.3
if ('RHSA' not in title and 'ALSA' not in title) or issued_dt < NOT_BEFORE:
continue
sa_id = extract_id(title)
packages = [extract_package(i.attrib['comment']) for i in definition.findall(".//n:criterion", ns)
if 'is earlier than' in i.attrib['comment']]
res[sa_id] = SecurityAdvisory(
title=title, id=sa_id, packages=packages)
return res
def parse_errata(fpath: str) -> Dict[str, SecurityAdvisory]:
"""
parses alma errata file and converts it to dict of SA instances
"""
with open(fpath, 'r', encoding='utf-8') as file_to_load:
erratas = json.load(file_to_load)
res = {}
for errata in erratas['data']:
title = errata['title']
sa_id = errata['id'].split('-')[-1]
packages = []
for package in errata['packages']:
full_name = f"{package['name']}-{package['version']}"
if full_name not in packages:
packages.append(full_name)
packages.sort()
res[sa_id] = SecurityAdvisory(
title=title, id=sa_id, packages=packages)
return res
def compare(rhel_oval: Dict[str, SecurityAdvisory],
alma_oval: Dict[str, SecurityAdvisory],
alma_errata: Dict[str, SecurityAdvisory]) -> Tuple[dict, list]:
"""
compares rhel oval with alma oval and alma errata
"""
diff = []
report = {
# total amount of security advisories
'total_sa_count': 0,
# amount of SA that match with rhel
'good_sa_count': 0,
# total amount of differencies
'diff_count': 0,
# list of SA excluded from diff check
'excluded_sa': [],
# list of packages excluded from diff check
'excluded_pkg': [],
# amount of oval SA that dont exists in oval file
'oval_missing_sa_count': 0,
# amount of oval SA that have missing packages
'oval_missing_pkg_sa_count': 0,
# list of missing oval SA
'oval_missing_sa': [],
# list of oval SA that have missing packages
'oval_missing_pkg_sa': [],
# amount of SA that dont exists in errata file
'errata_missing_sa_count': 0,
# amount of errata SA that have missing packages
'errata_missing_pkg_sa_count': 0,
# list of SA that are missing in errata file
'errata_missing_sa': [],
# list of errata SA with missing packages
'errata_missing_pkg_sa': [],
# total amount of unique missing packages across all alma SA
'missing_packages_unique_count': 0,
# list of unique packages that missing across all alma SA
'missing_packages_unique': []
}
for rhel_sa_id, rhel_sa in rhel_oval.items():
report['total_sa_count'] += 1
sa_name = f'ALSA-{rhel_sa_id}'
# filtering out SA
if sa_name in SA_EXCLUDE:
report['excluded_sa'].append(sa_name)
continue
# filtefing out packages
packages_to_check: List[Package] = []
for p in rhel_sa.packages:
if any(p.name == i for i in PACKAGES_EXCLUDE):
if str(p) not in report['excluded_pkg']:
report['excluded_pkg'].append(str(p))
else:
packages_to_check.append(p)
# check oval
try:
alma_oval_sa = alma_oval[rhel_sa_id]
except KeyError:
report['diff_count'] += 1
diff.append({'sa_name': sa_name, 'diff': 'SA is missing in oval'})
report['oval_missing_sa'].append(sa_name)
report['oval_missing_sa_count'] += 1
else:
# check if some packages are missing from oval SA
alma_oval_packages = alma_oval_sa.packages
alma_oval_missing_packages = [str(r) for r in packages_to_check
if r not in alma_oval_packages]
if alma_oval_missing_packages:
report['diff_count'] += 1
diff.append({'sa_name': sa_name,
'diff': f"missing packages in oval SA: {','.join(alma_oval_missing_packages)}"})
report['oval_missing_pkg_sa'].append(sa_name)
report['oval_missing_pkg_sa_count'] += 1
for mp in alma_oval_missing_packages:
if mp not in report['missing_packages_unique']:
report['missing_packages_unique'].append(mp)
report['missing_packages_unique_count'] += 1
# check errata
try:
alma_errata_sa = alma_errata[rhel_sa_id]
except KeyError:
report['errata_missing_sa'].append(sa_name)
report['errata_missing_sa_count'] += 1
report['diff_count'] += 1
diff.append(
{'sa_name': sa_name, 'diff': 'SA is missing in errata'})
continue
# check if some packages are missing from errata SA
alma_errata_packages = alma_errata_sa.packages
alma_errata_missing_packages = [
str(r) for r in packages_to_check if r not in alma_errata_packages]
if alma_errata_missing_packages:
report['diff_count'] += 1
diff.append({'sa_name': sa_name,
'diff': f"missing packages in errata SA: {','.join(alma_errata_missing_packages)}"})
report['errata_missing_pkg_sa'].append(sa_name)
report['errata_missing_pkg_sa_count'] += 1
for mp in alma_errata_missing_packages:
if mp not in report['missing_packages_unique']:
report['missing_packages_unique'].append(mp)
report['missing_packages_unique_count'] += 1
else:
# if we here, all checks were passed
report['good_sa_count'] += 1
for item in report.values():
if isinstance(item, list):
item.sort()
return report, diff
# starting point
def comparer_run() -> Dict[str, Any]:
result = {}
for release, urls in RELEASES.items():
logging.info('Processing release %i', release)
logging.info('downloading rhel oval')
rhel_file = download_oval(urls['rhel_oval_url'])
logging.info('parsing rhel oval')
rhel_oval_dict = parse_oval(rhel_file)
logging.info('downloading alma oval')
alma_oval_file = download_oval(urls['alma_oval_url'])
logging.info('parsing alma oval')
alma_oval_dict = parse_oval(alma_oval_file)
logging.info('downloading alma errata')
alma_errata_file = download_errata(urls['alma_errata_url'], release)
logging.info('parsing alma errata')
alma_errata_dict = parse_errata(alma_errata_file)
logging.info('comparing rhel and alma')
report_release, diff_release = compare(
rhel_oval_dict, alma_oval_dict, alma_errata_dict)
result[release] = {'report': report_release,
'diff': diff_release,
'rhel_oval_url': urls['rhel_oval_url'],
'alma_oval_url': urls['alma_oval_url'],
'alma_errata_url': urls['alma_errata_url']}
result['report_generated'] = datetime.datetime.now().timestamp() * 1000
result['sa_not_before'] = NOT_BEFORE.timestamp() * 1000
return result