Added basic functionality

Kirill Zhukov 2022-12-28 17:21:40 +01:00
parent 4961530c19
commit 798f7e4b65
8 changed files with 411 additions and 0 deletions

5
.gitignore vendored Normal file

@@ -0,0 +1,5 @@
venv
logs
results
*.pyc
__pycache__

4
albs-oval-errata-diff.py Normal file

@@ -0,0 +1,4 @@
from albs_oval_erratta_diff.start import start
start()

254
albs_oval_erratta_diff/comparer.py Normal file

@@ -0,0 +1,254 @@
import bz2
import datetime
import json
import logging
import re
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, Dict, List, Tuple

import requests

from .config import DOWNLOAD_DIR, NOT_BEFORE, RELEASES, SA_EXCLUDE, PACKAGES_EXCLUDE
from .sa import SecurityAdvisory
from .package import Package

def download_oval(url: str) -> Path:
    """
    download_oval downloads and decompresses an OVAL file
    and returns the filepath of the saved file
    """
    r = requests.get(url, stream=True, timeout=30)
    decompressor = bz2.BZ2Decompressor()
    fname = url.split('/')[-1].replace('.bz2', '')
    fpath = DOWNLOAD_DIR / fname
    with open(fpath, 'wb') as fd:
        for chunk in r.iter_content(chunk_size=128):
            fd.write(decompressor.decompress(chunk))
    return fpath

def download_errata(url: str, release_version: int) -> Path:
    """
    downloads the errata.full.json file and returns the file path
    """
    response = requests.get(url, stream=True, timeout=30)
    fname = f'alma-{release_version}.json'
    fpath = DOWNLOAD_DIR / fname
    with open(fpath, 'wb') as errata_file:
        for chunk in response.iter_content(chunk_size=128):
            errata_file.write(chunk)
    return fpath

def parse_oval(fpath: Path) -> Dict[str, SecurityAdvisory]:
    """
    converts an OVAL XML file to a dict of SecurityAdvisory instances
    """
    def extract_package(title: str) -> Package:
        r = r'(.*) is earlier than \d+:(.+?(?=-))'
        res = re.search(r, title)
        name = res.group(1)
        version = res.group(2)
        return Package(name=name, version=version)

    def extract_id(title: str) -> str:
        # matches both RHSA and ALSA identifiers
        r = r'(?:RH|AL)SA-(\d{4}:\d+)(.*)'
        res = re.search(r, title)
        return res.group(1)

    tree = ET.parse(fpath)
    root = tree.getroot()
    ns = {
        'n': 'http://oval.mitre.org/XMLSchema/oval-definitions-5',
    }
    res = {}
    for definition in root.findall('n:definitions/', ns):
        title = definition.find('n:metadata/n:title', ns).text
        issued = definition.find(
            'n:metadata/n:advisory/n:issued', ns).attrib['date']
        issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
        # we are only interested in security advisories issued after NOT_BEFORE
        if ('RHSA' not in title and 'ALSA' not in title) or issued_dt < NOT_BEFORE:
            continue
        sa_id = extract_id(title)
        packages = [extract_package(i.attrib['comment'])
                    for i in definition.findall(".//n:criterion", ns)
                    if 'is earlier than' in i.attrib['comment']]
        res[sa_id] = SecurityAdvisory(
            title=title, id=sa_id, packages=packages)
    return res

def parse_errata(fpath: Path) -> Dict[str, SecurityAdvisory]:
    """
    parses an AlmaLinux errata file and converts it
    to a dict of SecurityAdvisory instances
    """
    with open(fpath, 'r', encoding='utf-8') as file_to_load:
        erratas = json.load(file_to_load)
    res = {}
    for errata in erratas['data']:
        title = errata['title']
        sa_id = errata['id'].split('-')[-1]
        packages = []
        for package in errata['packages']:
            full_name = f"{package['name']}-{package['version']}"
            if full_name not in packages:
                packages.append(full_name)
        packages.sort()
        res[sa_id] = SecurityAdvisory(
            title=title, id=sa_id, packages=packages)
    return res

def compare(rhel_oval: Dict[str, SecurityAdvisory],
            alma_oval: Dict[str, SecurityAdvisory],
            alma_errata: Dict[str, SecurityAdvisory]) -> Tuple[dict, list]:
    """
    compares the RHEL OVAL with the AlmaLinux OVAL and the AlmaLinux errata
    """
    diff = []
    report = {
        # total amount of security advisories
        'total_sa_count': 0,
        # amount of SA that match with RHEL
        'good_sa_count': 0,
        # total amount of differences
        'diff_count': 0,
        # list of SA excluded from the diff check
        'excluded_sa': [],
        # list of packages excluded from the diff check
        'excluded_pkg': [],
        # amount of RHEL SA that don't exist in the AlmaLinux OVAL file
        'oval_missing_sa_count': 0,
        # amount of OVAL SA that have missing packages
        'oval_missing_pkg_sa_count': 0,
        # list of missing OVAL SA
        'oval_missing_sa': [],
        # list of OVAL SA that have missing packages
        'oval_missing_pkg_sa': [],
        # amount of SA that don't exist in the errata file
        'errata_missing_sa_count': 0,
        # amount of errata SA that have missing packages
        'errata_missing_pkg_sa_count': 0,
        # list of SA that are missing in the errata file
        'errata_missing_sa': [],
        # list of errata SA with missing packages
        'errata_missing_pkg_sa': [],
        # total amount of unique missing packages across all AlmaLinux SA
        'missing_packages_unique_count': 0,
        # list of unique packages that are missing across all AlmaLinux SA
        'missing_packages_unique': []
    }
    for rhel_sa_id, rhel_sa in rhel_oval.items():
        report['total_sa_count'] += 1
        sa_name = f'ALSA-{rhel_sa_id}'
        # filtering out excluded SA
        if sa_name in SA_EXCLUDE:
            report['excluded_sa'].append(sa_name)
            continue
        # filtering out excluded packages
        packages_to_check: List[Package] = []
        for p in rhel_sa.packages:
            if any(p.name == i for i in PACKAGES_EXCLUDE):
                if str(p) not in report['excluded_pkg']:
                    report['excluded_pkg'].append(str(p))
            else:
                packages_to_check.append(p)
        # check oval
        try:
            alma_oval_sa = alma_oval[rhel_sa_id]
        except KeyError:
            report['diff_count'] += 1
            diff.append({'sa_name': sa_name, 'diff': 'SA is missing in oval'})
            report['oval_missing_sa'].append(sa_name)
            report['oval_missing_sa_count'] += 1
        else:
            # check if some packages are missing from the OVAL SA
            alma_oval_packages = alma_oval_sa.packages
            alma_oval_missing_packages = [str(r) for r in packages_to_check
                                          if r not in alma_oval_packages]
            if alma_oval_missing_packages:
                report['diff_count'] += 1
                diff.append({'sa_name': sa_name,
                             'diff': f"missing packages in oval SA: {','.join(alma_oval_missing_packages)}"})
                report['oval_missing_pkg_sa'].append(sa_name)
                report['oval_missing_pkg_sa_count'] += 1
                for mp in alma_oval_missing_packages:
                    if mp not in report['missing_packages_unique']:
                        report['missing_packages_unique'].append(mp)
                        report['missing_packages_unique_count'] += 1
        # check errata
        try:
            alma_errata_sa = alma_errata[rhel_sa_id]
        except KeyError:
            report['errata_missing_sa'].append(sa_name)
            report['errata_missing_sa_count'] += 1
            report['diff_count'] += 1
            diff.append(
                {'sa_name': sa_name, 'diff': 'SA is missing in errata'})
            continue
        # check if some packages are missing from the errata SA;
        # errata packages are stored as "name-version" strings, so compare by str()
        alma_errata_packages = alma_errata_sa.packages
        alma_errata_missing_packages = [
            str(r) for r in packages_to_check if str(r) not in alma_errata_packages]
        if alma_errata_missing_packages:
            report['diff_count'] += 1
            diff.append({'sa_name': sa_name,
                         'diff': f"missing packages in errata SA: {','.join(alma_errata_missing_packages)}"})
            report['errata_missing_pkg_sa'].append(sa_name)
            report['errata_missing_pkg_sa_count'] += 1
            for mp in alma_errata_missing_packages:
                if mp not in report['missing_packages_unique']:
                    report['missing_packages_unique'].append(mp)
                    report['missing_packages_unique_count'] += 1
        else:
            # if we got here, all checks passed
            report['good_sa_count'] += 1
    for item in report.values():
        if isinstance(item, list):
            item.sort()
    return report, diff

# starting point
def comparer_run() -> Dict[str, Any]:
    result = {}
    for release, urls in RELEASES.items():
        logging.info('Processing release %i', release)
        logging.info('downloading rhel oval')
        rhel_file = download_oval(urls['rhel_oval_url'])
        logging.info('parsing rhel oval')
        rhel_oval_dict = parse_oval(rhel_file)
        logging.info('downloading alma oval')
        alma_oval_file = download_oval(urls['alma_oval_url'])
        logging.info('parsing alma oval')
        alma_oval_dict = parse_oval(alma_oval_file)
        logging.info('downloading alma errata')
        alma_errata_file = download_errata(urls['alma_errata_url'], release)
        logging.info('parsing alma errata')
        alma_errata_dict = parse_errata(alma_errata_file)
        logging.info('comparing rhel and alma')
        report_release, diff_release = compare(
            rhel_oval_dict, alma_oval_dict, alma_errata_dict)
        result[release] = {'report': report_release,
                           'diff': diff_release,
                           'rhel_oval_url': urls['rhel_oval_url'],
                           'alma_oval_url': urls['alma_oval_url'],
                           'alma_errata_url': urls['alma_errata_url']}
    result['report_generated'] = datetime.datetime.now().timestamp() * 1000
    result['sa_not_before'] = NOT_BEFORE.timestamp() * 1000
    return result
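
The two regular expressions in parse_oval carry most of the extraction logic: extract_id pulls the "YYYY:NNNN" part out of an advisory title, and extract_package splits a criterion comment into package name and version. A minimal standalone sketch of both, using made-up sample strings (the title and comment values below are illustrative, not taken from a real OVAL file):

import re

# hypothetical inputs shaped like the strings parse_oval sees
title = 'ALSA-2022:2031: zlib security update (Important)'
comment = 'zlib is earlier than 0:1.2.11-18.el9_0'

print(re.search(r'(?:RH|AL)SA-(\d{4}:\d+)(.*)', title).group(1))  # 2022:2031
m = re.search(r'(.*) is earlier than \d+:(.+?(?=-))', comment)
print(m.group(1), m.group(2))  # zlib 1.2.11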

21
albs_oval_erratta_diff/config.py Normal file

@@ -0,0 +1,21 @@
from pathlib import Path
import datetime

RELEASES = {
    8: {'rhel_oval_url': 'https://www.redhat.com/security/data/oval/v2/RHEL8/rhel-8.oval.xml.bz2',
        'alma_oval_url': 'https://repo.almalinux.org/security/oval/org.almalinux.alsa-8.xml.bz2',
        'alma_errata_url': 'https://errata.almalinux.org/8/errata.full.json'},
    9: {'rhel_oval_url': 'https://www.redhat.com/security/data/oval/v2/RHEL9/rhel-9.oval.xml.bz2',
        'alma_oval_url': 'https://repo.almalinux.org/security/oval/org.almalinux.alsa-9.xml.bz2',
        'alma_errata_url': 'https://errata.almalinux.org/9/errata.full.json'}
}

LOG_FILE = Path('logs/albs-oval-errata-diff.log')
DIFF_FILE = Path('results/diff.json')
DOWNLOAD_DIR = Path('/tmp')
# not checking anything issued before the RHEL 9.0 release date
NOT_BEFORE = datetime.datetime(2022, 5, 18)
UPDATE_INTERVAL_MINUTES = 30
SERVER_PORT = 3001
SERVER_IP = "127.0.0.1"
SA_EXCLUDE = []
PACKAGES_EXCLUDE = ["dotnet-sdk-3.1-source-built-artifacts"]
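
Since DOWNLOAD_DIR, LOG_FILE and DIFF_FILE are pathlib.Path objects, the download functions can build file paths with the / operator. A quick sketch of the path download_errata ends up writing to, assuming the defaults above:

from pathlib import Path

DOWNLOAD_DIR = Path('/tmp')
fname = 'alma-8.json'        # the name download_errata builds for release 8
print(DOWNLOAD_DIR / fname)  # /tmp/alma-8.json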

14
albs_oval_erratta_diff/package.py Normal file

@@ -0,0 +1,14 @@
from dataclasses import dataclass


@dataclass
class Package:
    """
    Package represents an RPM package extracted from the RHEL OVAL
    """
    name: str
    version: str

    def __str__(self):
        return f"{self.name}-{self.version}"
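
Because Package is a dataclass, instances compare field by field, which is what compare() relies on in the membership test "r not in alma_oval_packages". A self-contained sketch with hypothetical values:

from dataclasses import dataclass

@dataclass
class Package:
    name: str
    version: str

    def __str__(self):
        return f"{self.name}-{self.version}"

p1 = Package(name='zlib', version='1.2.11')  # hypothetical package
p2 = Package(name='zlib', version='1.2.11')
print(str(p1))   # zlib-1.2.11
print(p1 == p2)  # True: dataclass equality compares all fields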

15
albs_oval_erratta_diff/sa.py Normal file

@@ -0,0 +1,15 @@
from dataclasses import dataclass
from typing import List

from .package import Package


@dataclass
class SecurityAdvisory:
    """
    SecurityAdvisory represents a security advisory definition
    extracted from an OVAL or errata file
    """
    title: str
    id: str
    packages: List[Package]

98
albs_oval_erratta_diff/start.py Normal file

@@ -0,0 +1,98 @@
"""
service compares rhel oval with alma ovals and errata ovals
results available via API Call
"""
from aiohttp import web
import copy
import logging
import threading
from time import sleep
import json
from .config import LOG_FILE, DIFF_FILE, UPDATE_INTERVAL_MINUTES, SERVER_IP, SERVER_PORT
from .comparer import comparer_run
# This dict holds all current differentes
diffs = {}
diffs_lock = threading.Lock()

async def web_handler(request):
    data = {}
    try:
        # "with" guarantees the lock is released even if deepcopy raises
        with diffs_lock:
            data = copy.deepcopy(diffs)
    except Exception as e:
        logging.critical("Unhandled exception %s", e, exc_info=True)
    return web.json_response(data=data)


def webserver_run():
    app = web.Application()
    app.add_routes([web.get('/', web_handler)])
    web.run_app(app=app, host=SERVER_IP, port=SERVER_PORT)

def diff_checker():
    global diffs
    while True:
        logging.info("Start comparing")
        # generating a new diff
        try:
            result = comparer_run()
        except Exception as e:
            logging.critical("Unhandled exception %s", e, exc_info=True)
        else:
            logging.info("Finished comparing, updating diff dict")
            with diffs_lock:
                diffs = result
            # dumping results
            logging.info("Saving results to disk")
            try:
                with open(DIFF_FILE, 'w', encoding='utf-8') as flw:
                    json.dump(result, flw, indent=4)
            except Exception as e:
                logging.critical("Unhandled exception %s", e, exc_info=True)
            logging.info("Done")
        logging.info("Finished comparing, going to sleep for %d minutes",
                     UPDATE_INTERVAL_MINUTES)
        sleep(UPDATE_INTERVAL_MINUTES * 60)

def start():
    # without the global declaration, the loaded diff would only bind
    # a local name and never reach the module-level dict
    global diffs
    # making sure that the log and result directories exist
    for p in [LOG_FILE, DIFF_FILE]:
        if not p.parent.exists():
            p.parent.mkdir()
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(funcName)s %(message)s',
                        handlers=[logging.FileHandler(LOG_FILE, mode='a'),
                                  logging.StreamHandler()])
    logging.info("Trying to load diff file from disk")
    try:
        with open(DIFF_FILE, 'r', encoding='utf-8') as flr:
            loaded_data = json.load(flr)
        with diffs_lock:
            diffs = loaded_data
    except Exception as e:
        logging.warning("can't load data from disk %s", e)
    else:
        logging.info('diff file was loaded')
    logging.info("Starting diff_checker in background")
    thread = threading.Thread(target=diff_checker)
    thread.daemon = True
    thread.start()
    logging.info("Starting webserver")
    webserver_run()

if __name__ == "__main__":
    start()
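
Once start() is running, the diff data is served as JSON at the root route. A minimal client sketch, assuming the service runs locally with the default SERVER_IP and SERVER_PORT from config.py:

import requests

# assumes the service is up with the default config values
resp = requests.get('http://127.0.0.1:3001/', timeout=10)
data = resp.json()
# top-level keys: one entry per release plus report_generated and sa_not_before
print(list(data.keys()))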