Merge pull request 'ALBS-915: add support for Bug/Enhancement Advisories' (#2) from ALBS-915 into main

Reviewed-on: #2
This commit is contained in:
kzhukov 2023-01-16 13:38:05 +00:00
commit 3a40df50b4
9 changed files with 130 additions and 99 deletions

View File

@ -0,0 +1,25 @@
"""
advisory contains Advisory dataclass definition
"""
from dataclasses import dataclass
from typing import List
from .package import Package
@dataclass
class Advisory:
"""
Represents Secutity/Bug/Enhancment advisory deffition extracted
from oval or errata
Params
------
title: RHBA-2022:5749: .NET 6.0 bugfix update (Moderate)
advisory_type: RHBA
id: 2022-5749
"""
title: str
advisory_type: str
id: str # pylint: disable=invalid-name
packages: List[Package]

View File

@ -15,7 +15,7 @@ import requests
from .config import Config from .config import Config
from .package import Package from .package import Package
from .sa import SecurityAdvisory from .advisory import Advisory
def download_oval(url: str, download_dir: Path) -> str: def download_oval(url: str, download_dir: Path) -> str:
@ -46,9 +46,21 @@ def download_errata(url: str, release_version: int, download_dir: Path) -> str:
return fpath return fpath
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityAdvisory]: def extract_id_and_type(string: str) -> Tuple[str, str]:
""" """
converting oval xml file to dict Extracts advisory id and advisory type from OVAL title or errata id
Example:
oval: "RHSA-2022:5749: .NET 6.0 bugfix update (Moderate)" -> (2022:5749, SA)
errata: ALSA-2022:6165 -> (id=2022:6165, SA)
"""
regexp = r'((RH|AL)(SA|BA|EA))-(\d{4}:\d+)'
res = re.search(regexp, string)
return res.group(4), res.group(3)
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, Advisory]:
"""
Converts OVAL XML file to Dict of Advisories
""" """
def extract_package(title: str) -> Package: def extract_package(title: str) -> Package:
@ -58,11 +70,6 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA
version = res.group(2) version = res.group(2)
return Package(name=name, version=version) return Package(name=name, version=version)
def extract_id(title: str) -> str:
regexp = r'[RH|AL]SA-(\d{4}:\d+)(.*)'
res = re.search(regexp, title)
return res.group(1)
tree = ET.parse(fpath) tree = ET.parse(fpath)
root = tree.getroot() root = tree.getroot()
namespase = { namespase = {
@ -76,43 +83,57 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA
'n:metadata/n:advisory/n:issued', namespase).attrib['date'] 'n:metadata/n:advisory/n:issued', namespase).attrib['date']
issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d") issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
# we are only interesed in Security advisories after RHEL 8.3 # we are only interested in RHEL/OVAL SA/BA/EA
if ('RHSA' not in title and 'ALSA' not in title) or issued_dt < not_before: # released after RHEL 8.3
if not re.match(r'((RH|AL)(SA|BA|EA))', title) or issued_dt < not_before:
continue continue
sa_id = extract_id(title)
# we are only interested in security based advisories
severity = definition.find(
'n:metadata/n:advisory/n:severity', namespase)
if severity is None:
continue
if severity.text.lower() not in ['low', 'moderate', 'important', 'critical']:
continue
advisory_id, advisory_type = extract_id_and_type(title)
packages = [extract_package(i.attrib['comment']) for packages = [extract_package(i.attrib['comment']) for
i in definition.findall(".//n:criterion", namespase) i in definition.findall(".//n:criterion", namespase)
if 'is earlier than' in i.attrib['comment']] if 'is earlier than' in i.attrib['comment']]
res[sa_id] = SecurityAdvisory( res[advisory_id] = Advisory(title=title, id=advisory_id,
title=title, id=sa_id, packages=packages) advisory_type=advisory_type,
packages=packages)
return res return res
def parse_errata(fpath: str) -> Dict[str, SecurityAdvisory]: def parse_errata(fpath: str) -> Dict[str, Advisory]:
""" """
parses alma errata file and converts it to dict of SA instances Parses Alma Errata file and converts it to dict of Advisory instances
""" """
with open(fpath, 'r', encoding='utf-8') as file_to_load: with open(fpath, 'r', encoding='utf-8') as file_to_load:
erratas = json.load(file_to_load) erratas = json.load(file_to_load)
res = {} res = {}
for errata in erratas['data']: for errata in erratas['data']:
title = errata['title'] title = errata['title']
sa_id = errata['id'].split('-')[-1] advisory_id, advisory_type = extract_id_and_type(errata['id'])
packages = [] packages = []
for package in errata['packages']: for package in errata['packages']:
full_name = f"{package['name']}-{package['version']}" full_name = f"{package['name']}-{package['version']}"
if full_name not in packages: if full_name not in packages:
packages.append(full_name) packages.append(full_name)
packages.sort() packages.sort()
res[sa_id] = SecurityAdvisory( res[advisory_id] = Advisory(title=title,
title=title, id=sa_id, packages=packages) id=advisory_id,
advisory_type=advisory_type,
packages=packages)
return res return res
def compare(rhel_oval: Dict[str, SecurityAdvisory], def compare(rhel_oval: Dict[str, Advisory],
alma_oval: Dict[str, SecurityAdvisory], alma_oval: Dict[str, Advisory],
alma_errata: Dict[str, SecurityAdvisory], alma_errata: Dict[str, Advisory],
sa_exclude: List[str], advisory_exclude: List[str],
packages_exclude: List[str]) -> Tuple[dict, list]: packages_exclude: List[str]) -> Tuple[dict, list]:
""" """
compares rhel oval with alma oval and alma errata compares rhel oval with alma oval and alma errata
@ -120,49 +141,49 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
diff = [] diff = []
report = { report = {
# total amount of security advisories # total amount of security advisories
'total_sa_count': 0, 'total_advisory_count': 0,
# amount of SA that match with rhel # amount of ALMA advisory that match with RHEL
'good_sa_count': 0, 'good_advisory_count': 0,
# total amount of differencies # total amount of differencies
'diff_count': 0, 'diff_count': 0,
# list of SA excluded from diff check # list of advisories excluded from diff check
'excluded_sa': [], 'excluded_adv': [],
# list of packages excluded from diff check # list of packages excluded from diff check
'excluded_pkg': [], 'excluded_pkg': [],
# amount of oval SA that dont exists in oval file # amount of oval advisories that dont exists in oval file
'oval_missing_sa_count': 0, 'oval_missing_advisory_count': 0,
# amount of oval SA that have missing packages # amount of oval advisories that have missing packages
'oval_missing_pkg_sa_count': 0, 'oval_missing_pkg_advisory_count': 0,
# list of missing oval SA # list of missing oval advisories
'oval_missing_sa': [], 'oval_missing_advisory': [],
# list of oval SA that have missing packages # list of oval advisories that have missing packages
'oval_missing_pkg_sa': [], 'oval_missing_pkg_advisory': [],
# amount of SA that dont exists in errata file # amount of advisories that dont exists in errata file
'errata_missing_sa_count': 0, 'errata_missing_advisory_count': 0,
# amount of errata SA that have missing packages # amount of errata advisories that have missing packages
'errata_missing_pkg_sa_count': 0, 'errata_missing_pkg_advisory_count': 0,
# list of SA that are missing in errata file # list of advisories that are missing in errata file
'errata_missing_sa': [], 'errata_missing_advisory': [],
# list of errata SA with missing packages # list of errata advisories with missing packages
'errata_missing_pkg_sa': [], 'errata_missing_pkg_advisory': [],
# total amount of unique missing packages across all alma SA # total amount of unique missing packages across all alma SA
'missing_packages_unique_count': 0, 'missing_packages_unique_count': 0,
# list of unique packages that missing across all alma SA # list of unique packages that missing across all alma SA
'missing_packages_unique': [] 'missing_packages_unique': []
} }
for rhel_sa_id, rhel_sa in rhel_oval.items(): for rhel_advisory_id, rhel_advisory in rhel_oval.items():
report['total_sa_count'] += 1 report['total_advisory_count'] += 1
sa_name = f'ALSA-{rhel_sa_id}' advisory_name = f'AL{rhel_advisory.advisory_type}-{rhel_advisory_id}'
# filtering out SA # filtering out advisories
if sa_name in sa_exclude: if advisory_name in advisory_exclude:
report['excluded_sa'].append(sa_name) report['excluded_advisory'].append(advisory_name)
continue continue
# filtefing out packages # filtefing out packages
packages_to_check: List[Package] = [] packages_to_check: List[Package] = []
for package in rhel_sa.packages: for package in rhel_advisory.packages:
if any(package.name == i for i in packages_exclude): if any(package.name == i for i in packages_exclude):
if str(package) not in report['excluded_pkg']: if str(package) not in report['excluded_pkg']:
report['excluded_pkg'].append(str(package)) report['excluded_pkg'].append(str(package))
@ -171,24 +192,25 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
# check oval # check oval
try: try:
alma_oval_sa = alma_oval[rhel_sa_id] alma_oval_advisory = alma_oval[rhel_advisory_id]
except KeyError: except KeyError:
report['diff_count'] += 1 report['diff_count'] += 1
diff.append({'sa_name': sa_name, 'diff': 'SA is missing in oval'}) diff.append({'advisory_name': advisory_name,
report['oval_missing_sa'].append(sa_name) 'diff': 'Advisory is missing in OVAL'})
report['oval_missing_sa_count'] += 1 report['oval_missing_advisory'].append(advisory_name)
report['oval_missing_advisory_count'] += 1
else: else:
# check if some packages are missing from oval SA # check if some packages are missing from OVAL advisories
alma_oval_packages = alma_oval_sa.packages alma_oval_packages = alma_oval_advisory.packages
alma_oval_missing_packages = [str(r) for r in packages_to_check alma_oval_missing_packages = [str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_oval_packages]] if str(r) not in [str(i) for i in alma_oval_packages]]
if alma_oval_missing_packages: if alma_oval_missing_packages:
report['diff_count'] += 1 report['diff_count'] += 1
diff_str = f"missing packages in oval SA: {','.join(alma_oval_missing_packages)}" diff_str = f"OVAL advisory has missing packages: {','.join(alma_oval_missing_packages)}"
diff.append({'sa_name': sa_name, diff.append({'advisory_name': advisory_name,
'diff': diff_str}) 'diff': diff_str})
report['oval_missing_pkg_sa'].append(sa_name) report['oval_missing_pkg_advisory'].append(advisory_name)
report['oval_missing_pkg_sa_count'] += 1 report['oval_missing_pkg_advisory_count'] += 1
for missing_package in alma_oval_missing_packages: for missing_package in alma_oval_missing_packages:
if missing_package not in report['missing_packages_unique']: if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append( report['missing_packages_unique'].append(
@ -197,13 +219,13 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
# check errata # check errata
try: try:
alma_errata_sa = alma_errata[rhel_sa_id] alma_errata_sa = alma_errata[rhel_advisory_id]
except KeyError: except KeyError:
report['errata_missing_sa'].append(sa_name) report['errata_missing_advisory'].append(advisory_name)
report['errata_missing_sa_count'] += 1 report['errata_missing_advisory_count'] += 1
report['diff_count'] += 1 report['diff_count'] += 1
diff.append( diff.append(
{'sa_name': sa_name, 'diff': 'SA is missing in errata'}) {'advisory_name': advisory_name, 'diff': 'Advisory is missing in Errata'})
continue continue
# check if some packages are missing from errata SA # check if some packages are missing from errata SA
alma_errata_packages = alma_errata_sa.packages alma_errata_packages = alma_errata_sa.packages
@ -212,18 +234,18 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
if str(r) not in [str(i) for i in alma_errata_packages]] if str(r) not in [str(i) for i in alma_errata_packages]]
if alma_errata_missing_packages: if alma_errata_missing_packages:
report['diff_count'] += 1 report['diff_count'] += 1
diff_str = f"missing packages in errata SA: {','.join(alma_errata_missing_packages)}" diff_str = f"Errata advisory has missing packages: {','.join(alma_errata_missing_packages)}"
diff.append({'sa_name': sa_name, diff.append({'advisory_name': advisory_name,
'diff': diff_str}) 'diff': diff_str})
report['errata_missing_pkg_sa'].append(sa_name) report['errata_missing_pkg_advisory'].append(advisory_name)
report['errata_missing_pkg_sa_count'] += 1 report['errata_missing_pkg_advisory_count'] += 1
for missing_package in alma_errata_missing_packages: for missing_package in alma_errata_missing_packages:
if missing_package not in report['missing_packages_unique']: if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append(missing_package) report['missing_packages_unique'].append(missing_package)
report['missing_packages_unique_count'] += 1 report['missing_packages_unique_count'] += 1
else: else:
# if we here, all checks were passed # if we here, all checks were passed
report['good_sa_count'] += 1 report['good_advisory_count'] += 1
for item in report.values(): for item in report.values():
if isinstance(item, list): if isinstance(item, list):
@ -262,7 +284,7 @@ def comparer_run(config: Config) -> Dict[str, Any]:
compare(rhel_oval_dict, compare(rhel_oval_dict,
alma_oval_dict, alma_oval_dict,
alma_errata_dict, alma_errata_dict,
config.sa_exclude, config.advisory_exclude,
config.packages_exclude) config.packages_exclude)
result[release] = {'report': report_release, result[release] = {'report': report_release,
'diff': diff_release, 'diff': diff_release,
@ -271,6 +293,6 @@ def comparer_run(config: Config) -> Dict[str, Any]:
'alma_errata_url': urls.alma_errata_url} 'alma_errata_url': urls.alma_errata_url}
result['report_generated'] = datetime.datetime.now().timestamp() * 1000 result['report_generated'] = datetime.datetime.now().timestamp() * 1000
result['sa_not_before'] = config.not_before.timestamp() * 1000 result['advisory_not_before'] = config.not_before.timestamp() * 1000
return result return result

View File

@ -16,7 +16,7 @@ DIFF_FILE = Path('/tmp/albs-oval-errata-diff.json')
DOWNLOAD_DIR = Path('/tmp') DOWNLOAD_DIR = Path('/tmp')
LOG_FILE = Path('logs/albs-oval-errata-diff.log') LOG_FILE = Path('logs/albs-oval-errata-diff.log')
PACKAGES_EXCLUDE = [] PACKAGES_EXCLUDE = []
SA_EXCLUDE = [] ADVISORY_EXCLUDE = []
SERVER_PORT = 3001 SERVER_PORT = 3001
SERVER_IP = IPv4Address('127.0.0.1') SERVER_IP = IPv4Address('127.0.0.1')
# not checking anything before RHEL-9.0 release # not checking anything before RHEL-9.0 release
@ -50,9 +50,9 @@ class Config(BaseModel):
default=PACKAGES_EXCLUDE) default=PACKAGES_EXCLUDE)
releases: Dict[int, ReleaseUrls] = Field( releases: Dict[int, ReleaseUrls] = Field(
description='list of OS releases with Oval/Errata URLs to check') description='list of OS releases with Oval/Errata URLs to check')
sa_exclude: List[str] = Field( advisory_exclude: List[str] = Field(
description='list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking', description='list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking',
default=SA_EXCLUDE) default=ADVISORY_EXCLUDE)
server_port: int = Field( server_port: int = Field(
description='port that will be used by websever', description='port that will be used by websever',
default=SERVER_PORT) default=SERVER_PORT)

View File

@ -7,7 +7,7 @@ from dataclasses import dataclass
@dataclass @dataclass
class Package: class Package:
""" """
Package represents RPM package exstracted from RHEL OVAL Package represents RPM package extracted from RHEL/ALMA OVAL/Errata files
""" """
name: str name: str
version: str version: str

View File

@ -1,18 +0,0 @@
"""
sa contains SecurityAdvisory dataclass definition
"""
from dataclasses import dataclass
from typing import List
from .package import Package
@dataclass
class SecurityAdvisory:
"""
SecurityAdvisory represents Security advisory deffition extracted
from oval or errata
"""
title: str
id: str # pylint: disable=invalid-name
packages: List[Package]

View File

@ -16,7 +16,7 @@ from .config import get_config, Config
from .comparer import comparer_run from .comparer import comparer_run
# This dict holds all current differentes # This dict holds all current differences
diffs = {} diffs = {}
diffs_lock = threading.Lock() diffs_lock = threading.Lock()

View File

@ -37,11 +37,11 @@ releases:
alma_oval_url: https://repo.almalinux.org/security/oval/org.almalinux.alsa-9.xml.bz2 alma_oval_url: https://repo.almalinux.org/security/oval/org.almalinux.alsa-9.xml.bz2
alma_errata_url: https://errata.almalinux.org/9/errata.full.json alma_errata_url: https://errata.almalinux.org/9/errata.full.json
# sa_exclude # advisory_exclude
# list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking # list of advisory IDs (ALSA-2022:5219) to exclude from checking
# requred: no # requred: no
# default: [] # default: []
sa_exclude: [] advisory_exclude: []
# server_port # server_port
# port that will be used by websever # port that will be used by websever

View File

@ -2,3 +2,5 @@
First version of service First version of service
2023-01-04 v1.0.1 2023-01-04 v1.0.1
Fixed missing packages false positives Fixed missing packages false positives
2023-01-12 v1.0.2
Added support for Bug/Enhancement Advisories