albs-oval-errata-diff/albs_oval_errata_diff/comparer.py

325 lines
13 KiB
Python
Raw Normal View History

2022-12-29 14:29:18 +00:00
"""
module comparer.py implemets difference checking logic
2022-12-29 14:29:18 +00:00
"""
2022-12-28 16:21:40 +00:00
import bz2
import datetime
2022-12-29 14:29:18 +00:00
from pathlib import Path
2022-12-28 16:21:40 +00:00
import re
from typing import Tuple, List, Dict, Any
import logging
import json
2022-12-29 14:29:18 +00:00
import xml.etree.ElementTree as ET
2022-12-28 16:21:40 +00:00
2022-12-29 14:29:18 +00:00
import requests
from .advisory import Advisory
from .albs import ALBS
2022-12-29 14:29:18 +00:00
from .config import Config
2022-12-28 16:21:40 +00:00
from .package import Package
2022-12-29 14:29:18 +00:00
def download_oval(url: str, download_dir: Path) -> str:
2022-12-28 16:21:40 +00:00
"""
download_oval downloads, decompreses oval file
and returns filepath of saved file
"""
2022-12-29 14:29:18 +00:00
response = requests.get(url, stream=True, timeout=30)
2022-12-28 16:21:40 +00:00
decompressor = bz2.BZ2Decompressor()
fname = url.split('/')[-1].replace('.bz2', '')
2022-12-29 14:29:18 +00:00
fpath = download_dir / fname
with open(fpath, 'wb') as flw:
for chunk in response.iter_content(chunk_size=128):
flw.write(decompressor.decompress(chunk))
2022-12-28 16:21:40 +00:00
return fpath
2022-12-29 14:29:18 +00:00
def download_errata(url: str, release_version: int, download_dir: Path) -> str:
2022-12-28 16:21:40 +00:00
"""
downloads errata_full.json file end returns file path
"""
response = requests.get(url, stream=True, timeout=30)
fname = f'alma-{release_version}.json'
2022-12-29 14:29:18 +00:00
fpath = download_dir / fname
2022-12-28 16:21:40 +00:00
with open(fpath, 'wb') as errata_file:
for chunk in response.iter_content(chunk_size=128):
errata_file.write(chunk)
return fpath
def extract_id_and_type(string: str) -> Tuple[str, str]:
2022-12-28 16:21:40 +00:00
"""
Extracts advisory id and advisory type from OVAL title or errata id
Example:
oval: "RHSA-2022:5749: .NET 6.0 bugfix update (Moderate)" -> (2022:5749, SA)
errata: ALSA-2022:6165 -> (id=2022:6165, SA)
"""
regexp = r'((RH|AL)(SA|BA|EA))-(\d{4}:\d+)'
res = re.search(regexp, string)
return res.group(4), res.group(3)
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, Advisory]:
"""
Converts OVAL XML file to Dict of Advisories
2022-12-28 16:21:40 +00:00
"""
def extract_package(title: str) -> Package:
2022-12-29 14:29:18 +00:00
regexp = r'(.*) is earlier than \d+:(.+?(?=-))'
res = re.search(regexp, title)
2022-12-28 16:21:40 +00:00
name = res.group(1)
version = res.group(2)
return Package(name=name, version=version)
tree = ET.parse(fpath)
root = tree.getroot()
2022-12-29 14:29:18 +00:00
namespase = {
2022-12-28 16:21:40 +00:00
'n': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
}
res = {}
2022-12-29 14:29:18 +00:00
for definition in root.findall('n:definitions/', namespase):
title = definition.find('n:metadata/n:title', namespase).text
2022-12-28 16:21:40 +00:00
issued = definition.find(
2022-12-29 14:29:18 +00:00
'n:metadata/n:advisory/n:issued', namespase).attrib['date']
2022-12-28 16:21:40 +00:00
issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
# we are only interested in RHEL/OVAL SA/BA/EA
# released after RHEL 8.3
if not re.match(r'((RH|AL)(SA|BA|EA))', title) or issued_dt < not_before:
2022-12-28 16:21:40 +00:00
continue
# we are only interested in security based advisories
severity = definition.find(
'n:metadata/n:advisory/n:severity', namespase)
if severity is None:
continue
if severity.text.lower() not in ['low', 'moderate', 'important', 'critical']:
continue
advisory_id, advisory_type = extract_id_and_type(title)
2022-12-29 14:29:18 +00:00
packages = [extract_package(i.attrib['comment']) for
i in definition.findall(".//n:criterion", namespase)
2022-12-28 16:21:40 +00:00
if 'is earlier than' in i.attrib['comment']]
res[advisory_id] = Advisory(title=title, id=advisory_id,
advisory_type=advisory_type,
packages=packages)
2022-12-28 16:21:40 +00:00
return res
def parse_errata(fpath: str) -> Dict[str, Advisory]:
2022-12-28 16:21:40 +00:00
"""
Parses Alma Errata file and converts it to dict of Advisory instances
2022-12-28 16:21:40 +00:00
"""
with open(fpath, 'r', encoding='utf-8') as file_to_load:
erratas = json.load(file_to_load)
res = {}
for errata in erratas['data']:
title = errata['title']
advisory_id, advisory_type = extract_id_and_type(errata['id'])
2022-12-28 16:21:40 +00:00
packages = []
for package in errata['packages']:
full_name = f"{package['name']}-{package['version']}"
if full_name not in packages:
packages.append(full_name)
packages.sort()
res[advisory_id] = Advisory(title=title,
id=advisory_id,
advisory_type=advisory_type,
packages=packages)
2022-12-28 16:21:40 +00:00
return res
def compare(rhel_oval: Dict[str, Advisory],
alma_oval: Dict[str, Advisory],
alma_errata: Dict[str, Advisory],
advisory_exclude: List[str],
packages_exclude: List[str],
albs: ALBS,
release: str) -> Tuple[dict, list]:
2022-12-28 16:21:40 +00:00
"""
compares rhel oval with alma oval and alma errata
"""
diff = []
2022-12-28 16:21:40 +00:00
report = {
# total amount of security advisories
'total_advisory_count': 0,
# amount of ALMA advisory that match with RHEL
'good_advisory_count': 0,
2022-12-28 16:21:40 +00:00
# total amount of differencies
'diff_count': 0,
# list of advisories excluded from diff check
'excluded_adv': [],
2022-12-28 16:21:40 +00:00
# list of packages excluded from diff check
'excluded_pkg': [],
# amount of oval advisories that dont exists in oval file
'oval_missing_advisory_count': 0,
# amount of oval advisories that have missing packages
'oval_missing_pkg_advisory_count': 0,
# list of missing oval advisories
'oval_missing_advisory': [],
# list of oval advisories that have missing packages
'oval_missing_pkg_advisory': [],
# amount of advisories that dont exists in errata file
'errata_missing_advisory_count': 0,
# amount of errata advisories that have missing packages
'errata_missing_pkg_advisory_count': 0,
# list of advisories that are missing in errata file
'errata_missing_advisory': [],
# list of errata advisories with missing packages
'errata_missing_pkg_advisory': [],
2022-12-28 16:21:40 +00:00
# total amount of unique missing packages across all alma SA
'missing_packages_unique_count': 0,
# list of unique packages that missing across all alma SA
'missing_packages_unique': [],
# contains errata release status from buildsystem
# this list populated for missing advisories only
'miss_adv_albs_errata_release_status': [],
2022-12-28 16:21:40 +00:00
}
for rhel_advisory_id, rhel_advisory in rhel_oval.items():
report['total_advisory_count'] += 1
advisory_name = f'AL{rhel_advisory.advisory_type}-{rhel_advisory_id}'
2022-12-28 16:21:40 +00:00
# filtering out advisories
if advisory_name in advisory_exclude:
report['excluded_advisory'].append(advisory_name)
2022-12-28 16:21:40 +00:00
continue
# filtefing out packages
packages_to_check: List[Package] = []
for package in rhel_advisory.packages:
2022-12-29 14:29:18 +00:00
if any(package.name == i for i in packages_exclude):
if str(package) not in report['excluded_pkg']:
report['excluded_pkg'].append(str(package))
2022-12-28 16:21:40 +00:00
else:
2022-12-29 14:29:18 +00:00
packages_to_check.append(package)
2022-12-28 16:21:40 +00:00
# check oval
try:
alma_oval_advisory = alma_oval[rhel_advisory_id]
2022-12-28 16:21:40 +00:00
except KeyError:
report['diff_count'] += 1
diff.append({'advisory_name': advisory_name,
'diff': 'Advisory is missing in OVAL'})
report['oval_missing_advisory'].append(advisory_name)
report['oval_missing_advisory_count'] += 1
2022-12-28 16:21:40 +00:00
else:
# check if some packages are missing from OVAL advisories
alma_oval_packages = alma_oval_advisory.packages
2022-12-28 16:21:40 +00:00
alma_oval_missing_packages = [str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_oval_packages]]
2022-12-28 16:21:40 +00:00
if alma_oval_missing_packages:
report['diff_count'] += 1
diff_str = f"OVAL advisory has missing packages: {','.join(alma_oval_missing_packages)}"
diff.append({'advisory_name': advisory_name,
2022-12-29 14:29:18 +00:00
'diff': diff_str})
report['oval_missing_pkg_advisory'].append(advisory_name)
report['oval_missing_pkg_advisory_count'] += 1
2022-12-29 14:29:18 +00:00
for missing_package in alma_oval_missing_packages:
if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append(
missing_package)
2022-12-28 16:21:40 +00:00
report['missing_packages_unique_count'] += 1
# check errata
try:
alma_errata_sa = alma_errata[rhel_advisory_id]
2022-12-28 16:21:40 +00:00
except KeyError:
report['errata_missing_advisory'].append(advisory_name)
report['errata_missing_advisory_count'] += 1
2022-12-28 16:21:40 +00:00
report['diff_count'] += 1
diff.append(
{'advisory_name': advisory_name, 'diff': 'Advisory is missing in Errata'})
2022-12-28 16:21:40 +00:00
continue
# check if some packages are missing from errata SA
alma_errata_packages = alma_errata_sa.packages
alma_errata_missing_packages = \
[str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_errata_packages]]
2022-12-28 16:21:40 +00:00
if alma_errata_missing_packages:
report['diff_count'] += 1
mp_string = ','.join(alma_errata_missing_packages)
diff_str = f"Errata advisory has missing packages: {mp_string}"
diff.append({'advisory_name': advisory_name,
2022-12-29 14:29:18 +00:00
'diff': diff_str})
report['errata_missing_pkg_advisory'].append(advisory_name)
report['errata_missing_pkg_advisory_count'] += 1
2022-12-29 14:29:18 +00:00
for missing_package in alma_errata_missing_packages:
if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append(missing_package)
2022-12-28 16:21:40 +00:00
report['missing_packages_unique_count'] += 1
else:
# if we here, all checks were passed
report['good_advisory_count'] += 1
2022-12-28 16:21:40 +00:00
# albs errata flow
logging.info('Getting errata release status for missing advisories')
missing_advisories = report['errata_missing_advisory'] + \
report['oval_missing_advisory']
missing_advisories = list(dict.fromkeys(missing_advisories))
for adv in missing_advisories:
try:
release_status = albs.get_errata_status(
adv, f'AlmaLinux-{release}')
except Exception as err: # pylint: disable=broad-except
logging.error("cant get release status for %s: %s", adv, err)
continue
if release_status is None:
release_status = 'not-found-in-errata-flow'
report['miss_adv_albs_errata_release_status'].append(
{"advisory": adv, "release_status": release_status})
2022-12-28 16:21:40 +00:00
return report, diff
# starting point
2022-12-29 14:29:18 +00:00
def comparer_run(config: Config) -> Dict[str, Any]:
"""
comperer_run is the starting point of comparer component
"""
2022-12-28 16:21:40 +00:00
result = {}
2022-12-29 14:29:18 +00:00
for release, urls in config.releases.items():
2022-12-28 16:21:40 +00:00
logging.info('Processing release %i', release)
2022-12-29 16:08:51 +00:00
logging.info('Downloading rhel oval')
2022-12-29 14:29:18 +00:00
rhel_file = download_oval(urls.rhel_oval_url, config.download_dir)
2022-12-29 16:08:51 +00:00
logging.info('Parsing rhel oval')
2022-12-29 14:29:18 +00:00
rhel_oval_dict = parse_oval(rhel_file, config.not_before)
2022-12-28 16:21:40 +00:00
2022-12-29 16:08:51 +00:00
logging.info('Downloading alma oval')
2022-12-29 14:29:18 +00:00
alma_oval_file = download_oval(
urls.alma_oval_url, download_dir=config.download_dir)
2022-12-29 16:08:51 +00:00
logging.info('Parsing alma oval')
2022-12-29 14:29:18 +00:00
alma_oval_dict = parse_oval(alma_oval_file, config.not_before)
2022-12-28 16:21:40 +00:00
2022-12-29 16:08:51 +00:00
logging.info('Downloading alma errata')
2022-12-29 14:29:18 +00:00
alma_errata_file = download_errata(urls.alma_errata_url,
release, config.download_dir)
2022-12-29 16:08:51 +00:00
logging.info('Parsing alma errata')
2022-12-28 16:21:40 +00:00
alma_errata_dict = parse_errata(alma_errata_file)
2022-12-29 16:08:51 +00:00
logging.info('Comparing rhel and alma')
albs = ALBS(config.albs_url,
config.albs_jwt_token,
config.albs_timeout)
report_release, diff_release =\
2022-12-29 14:29:18 +00:00
compare(rhel_oval_dict,
alma_oval_dict,
alma_errata_dict,
config.advisory_exclude,
config.packages_exclude,
albs, release)
2022-12-28 16:21:40 +00:00
result[release] = {'report': report_release,
'diff': diff_release,
2022-12-29 14:29:18 +00:00
'rhel_oval_url': urls.rhel_oval_url,
'alma_oval_url': urls.alma_oval_url,
'alma_errata_url': urls.alma_errata_url}
2022-12-28 16:21:40 +00:00
result['report_generated'] = datetime.datetime.now().timestamp() * 1000
result['advisory_not_before'] = config.not_before.timestamp() * 1000
2022-12-28 16:21:40 +00:00
return result