Compare commits

...

10 Commits
v1.0.1 ... main

11 changed files with 270 additions and 109 deletions

3
.gitignore vendored
View File

@ -3,4 +3,5 @@ logs
results results
*.pyc *.pyc
__pycache__ __pycache__
.vscode .vscode
private*

View File

@ -30,7 +30,7 @@ check [config.default.yml](config.default.yml) for references
```bash ```bash
$ pip install -r requirements.txt $ pip install -r requirements.txt
``` ```
4. Create config file using [config.default.yml](config.default.yml) and start service with _albs_oval_errata_diff.py_ script 4. Create config file using [config.default.yml](config.default.yml) and start service with _albs_oval_errata_diff.py_ script
```bash ```bash
$ python albs_oval_errata_diff.py config.yml $ python albs_oval_errata_diff.py config.yml
2022-12-29 16:20:11,139 INFO start Trying to load diff file from disk 2022-12-29 16:20:11,139 INFO start Trying to load diff file from disk

View File

@ -0,0 +1,25 @@
"""
advisory contains Advisory dataclass definition
"""
from dataclasses import dataclass
from typing import List
from .package import Package
@dataclass
class Advisory:
"""
Represents Secutity/Bug/Enhancment advisory deffition extracted
from oval or errata
Params
------
title: RHBA-2022:5749: .NET 6.0 bugfix update (Moderate)
advisory_type: RHBA
id: 2022-5749
"""
title: str
advisory_type: str
id: str # pylint: disable=invalid-name
packages: List[Package]

View File

@ -0,0 +1,70 @@
"""
albs.py contains ALBS class
"""
from urllib.parse import urljoin
from typing import Union, Dict
import requests
class ALBS:
"""
ALBS class implemets buildsys.almalinux.org API interaction logic
"""
def __init__(self, url: str, token: str, timeout: int):
self.url = url
self.token = token
self.timeout = timeout
self._platforms = self._get_platforms()
def _get_platforms(self) -> Dict[str, int]:
'''
Getting list of all platforms and
return Dict: platform_name -> platform_id
'''
endpoint = '/api/v1/platforms/'
headers = {'accept': 'application/json',
'Authorization': f'Bearer {self.token}'}
response = requests.get(url=urljoin(self.url, endpoint),
headers=headers,
timeout=self.timeout)
response.raise_for_status()
res = {platform['name']: platform['id']
for platform in response.json()}
return res
def get_errata_status(self, errata_id: str, platform_name: str) -> Union[str, None]:
"""
Get release status for particular errata_id
Params
------
errata_id: str: errata id to get (ALSA-2023:0095)
Returns
-------
str: release status
If errata_id was not found Returns None
Raises
------
Any errors raised by requests libary
ValueError if platform_name not found in buildsys
"""
endpoint = '/api/v1/errata/query/'
# platformId
try:
platform_id = self._platforms[platform_name]
except KeyError as error:
raise ValueError(f'{platform_name} was not found') from error
params = {'id': errata_id, 'platformId': platform_id}
headers = {'accept': 'application/json',
'Authorization': f'Bearer {self.token}'}
response = requests.get(url=urljoin(self.url, endpoint),
params=params, headers=headers,
timeout=self.timeout)
response.raise_for_status()
response_json = response.json()
# errata_id was not found
if response_json['total_records'] == 0:
return
return response_json['records'][0]['release_status']

View File

@ -1,5 +1,5 @@
""" """
package comparer.py implemets difference checking logic module comparer.py implemets difference checking logic
""" """
import bz2 import bz2
@ -13,9 +13,10 @@ import xml.etree.ElementTree as ET
import requests import requests
from .advisory import Advisory
from .albs import ALBS
from .config import Config from .config import Config
from .package import Package from .package import Package
from .sa import SecurityAdvisory
def download_oval(url: str, download_dir: Path) -> str: def download_oval(url: str, download_dir: Path) -> str:
@ -46,9 +47,21 @@ def download_errata(url: str, release_version: int, download_dir: Path) -> str:
return fpath return fpath
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityAdvisory]: def extract_id_and_type(string: str) -> Tuple[str, str]:
""" """
converting oval xml file to dict Extracts advisory id and advisory type from OVAL title or errata id
Example:
oval: "RHSA-2022:5749: .NET 6.0 bugfix update (Moderate)" -> (2022:5749, SA)
errata: ALSA-2022:6165 -> (id=2022:6165, SA)
"""
regexp = r'((RH|AL)(SA|BA|EA))-(\d{4}:\d+)'
res = re.search(regexp, string)
return res.group(4), res.group(3)
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, Advisory]:
"""
Converts OVAL XML file to Dict of Advisories
""" """
def extract_package(title: str) -> Package: def extract_package(title: str) -> Package:
@ -58,11 +71,6 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA
version = res.group(2) version = res.group(2)
return Package(name=name, version=version) return Package(name=name, version=version)
def extract_id(title: str) -> str:
regexp = r'[RH|AL]SA-(\d{4}:\d+)(.*)'
res = re.search(regexp, title)
return res.group(1)
tree = ET.parse(fpath) tree = ET.parse(fpath)
root = tree.getroot() root = tree.getroot()
namespase = { namespase = {
@ -76,93 +84,113 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA
'n:metadata/n:advisory/n:issued', namespase).attrib['date'] 'n:metadata/n:advisory/n:issued', namespase).attrib['date']
issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d") issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
# we are only interesed in Security advisories after RHEL 8.3 # we are only interested in RHEL/OVAL SA/BA/EA
if ('RHSA' not in title and 'ALSA' not in title) or issued_dt < not_before: # released after RHEL 8.3
if not re.match(r'((RH|AL)(SA|BA|EA))', title) or issued_dt < not_before:
continue continue
sa_id = extract_id(title)
# we are only interested in security based advisories
severity = definition.find(
'n:metadata/n:advisory/n:severity', namespase)
if severity is None:
continue
if severity.text.lower() not in ['low', 'moderate', 'important', 'critical']:
continue
advisory_id, advisory_type = extract_id_and_type(title)
packages = [extract_package(i.attrib['comment']) for packages = [extract_package(i.attrib['comment']) for
i in definition.findall(".//n:criterion", namespase) i in definition.findall(".//n:criterion", namespase)
if 'is earlier than' in i.attrib['comment']] if 'is earlier than' in i.attrib['comment']]
res[sa_id] = SecurityAdvisory( res[advisory_id] = Advisory(title=title, id=advisory_id,
title=title, id=sa_id, packages=packages) advisory_type=advisory_type,
packages=packages)
return res return res
def parse_errata(fpath: str) -> Dict[str, SecurityAdvisory]: def parse_errata(fpath: str) -> Dict[str, Advisory]:
""" """
parses alma errata file and converts it to dict of SA instances Parses Alma Errata file and converts it to dict of Advisory instances
""" """
with open(fpath, 'r', encoding='utf-8') as file_to_load: with open(fpath, 'r', encoding='utf-8') as file_to_load:
erratas = json.load(file_to_load) erratas = json.load(file_to_load)
res = {} res = {}
for errata in erratas['data']: for errata in erratas['data']:
title = errata['title'] title = errata['title']
sa_id = errata['id'].split('-')[-1] advisory_id, advisory_type = extract_id_and_type(errata['id'])
packages = [] packages = []
for package in errata['packages']: for package in errata['packages']:
full_name = f"{package['name']}-{package['version']}" full_name = f"{package['name']}-{package['version']}"
if full_name not in packages: if full_name not in packages:
packages.append(full_name) packages.append(full_name)
packages.sort() packages.sort()
res[sa_id] = SecurityAdvisory( res[advisory_id] = Advisory(title=title,
title=title, id=sa_id, packages=packages) id=advisory_id,
advisory_type=advisory_type,
packages=packages)
return res return res
def compare(rhel_oval: Dict[str, SecurityAdvisory], def compare(rhel_oval: Dict[str, Advisory],
alma_oval: Dict[str, SecurityAdvisory], alma_oval: Dict[str, Advisory],
alma_errata: Dict[str, SecurityAdvisory], alma_errata: Dict[str, Advisory],
sa_exclude: List[str], advisory_exclude: List[str],
packages_exclude: List[str]) -> Tuple[dict, list]: packages_exclude: List[str],
albs: ALBS,
release: str) -> Tuple[dict, list]:
""" """
compares rhel oval with alma oval and alma errata compares rhel oval with alma oval and alma errata
""" """
diff = [] diff = []
report = { report = {
# total amount of security advisories # total amount of security advisories
'total_sa_count': 0, 'total_advisory_count': 0,
# amount of SA that match with rhel # amount of ALMA advisory that match with RHEL
'good_sa_count': 0, 'good_advisory_count': 0,
# total amount of differencies # total amount of differencies
'diff_count': 0, 'diff_count': 0,
# list of SA excluded from diff check # list of advisories excluded from diff check
'excluded_sa': [], 'excluded_adv': [],
# list of packages excluded from diff check # list of packages excluded from diff check
'excluded_pkg': [], 'excluded_pkg': [],
# amount of oval SA that dont exists in oval file # amount of oval advisories that dont exists in oval file
'oval_missing_sa_count': 0, 'oval_missing_advisory_count': 0,
# amount of oval SA that have missing packages # amount of oval advisories that have missing packages
'oval_missing_pkg_sa_count': 0, 'oval_missing_pkg_advisory_count': 0,
# list of missing oval SA # list of missing oval advisories
'oval_missing_sa': [], 'oval_missing_advisory': [],
# list of oval SA that have missing packages # list of oval advisories that have missing packages
'oval_missing_pkg_sa': [], 'oval_missing_pkg_advisory': [],
# amount of SA that dont exists in errata file # amount of advisories that dont exists in errata file
'errata_missing_sa_count': 0, 'errata_missing_advisory_count': 0,
# amount of errata SA that have missing packages # amount of errata advisories that have missing packages
'errata_missing_pkg_sa_count': 0, 'errata_missing_pkg_advisory_count': 0,
# list of SA that are missing in errata file # list of advisories that are missing in errata file
'errata_missing_sa': [], 'errata_missing_advisory': [],
# list of errata SA with missing packages # list of errata advisories with missing packages
'errata_missing_pkg_sa': [], 'errata_missing_pkg_advisory': [],
# total amount of unique missing packages across all alma SA # total amount of unique missing packages across all alma SA
'missing_packages_unique_count': 0, 'missing_packages_unique_count': 0,
# list of unique packages that missing across all alma SA # list of unique packages that missing across all alma SA
'missing_packages_unique': [] 'missing_packages_unique': [],
# contains errata release status from buildsystem
# this list populated for missing advisories only
'miss_adv_albs_errata_release_status': [],
} }
for rhel_sa_id, rhel_sa in rhel_oval.items(): for rhel_advisory_id, rhel_advisory in rhel_oval.items():
report['total_sa_count'] += 1 report['total_advisory_count'] += 1
sa_name = f'ALSA-{rhel_sa_id}' advisory_name = f'AL{rhel_advisory.advisory_type}-{rhel_advisory_id}'
# filtering out SA # filtering out advisories
if sa_name in sa_exclude: if advisory_name in advisory_exclude:
report['excluded_sa'].append(sa_name) report['excluded_advisory'].append(advisory_name)
continue continue
# filtefing out packages # filtefing out packages
packages_to_check: List[Package] = [] packages_to_check: List[Package] = []
for package in rhel_sa.packages: for package in rhel_advisory.packages:
if any(package.name == i for i in packages_exclude): if any(package.name == i for i in packages_exclude):
if str(package) not in report['excluded_pkg']: if str(package) not in report['excluded_pkg']:
report['excluded_pkg'].append(str(package)) report['excluded_pkg'].append(str(package))
@ -171,24 +199,25 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
# check oval # check oval
try: try:
alma_oval_sa = alma_oval[rhel_sa_id] alma_oval_advisory = alma_oval[rhel_advisory_id]
except KeyError: except KeyError:
report['diff_count'] += 1 report['diff_count'] += 1
diff.append({'sa_name': sa_name, 'diff': 'SA is missing in oval'}) diff.append({'advisory_name': advisory_name,
report['oval_missing_sa'].append(sa_name) 'diff': 'Advisory is missing in OVAL'})
report['oval_missing_sa_count'] += 1 report['oval_missing_advisory'].append(advisory_name)
report['oval_missing_advisory_count'] += 1
else: else:
# check if some packages are missing from oval SA # check if some packages are missing from OVAL advisories
alma_oval_packages = alma_oval_sa.packages alma_oval_packages = alma_oval_advisory.packages
alma_oval_missing_packages = [str(r) for r in packages_to_check alma_oval_missing_packages = [str(r) for r in packages_to_check
if str(r) not in [str(i) for i in alma_oval_packages]] if str(r) not in [str(i) for i in alma_oval_packages]]
if alma_oval_missing_packages: if alma_oval_missing_packages:
report['diff_count'] += 1 report['diff_count'] += 1
diff_str = f"missing packages in oval SA: {','.join(alma_oval_missing_packages)}" diff_str = f"OVAL advisory has missing packages: {','.join(alma_oval_missing_packages)}"
diff.append({'sa_name': sa_name, diff.append({'advisory_name': advisory_name,
'diff': diff_str}) 'diff': diff_str})
report['oval_missing_pkg_sa'].append(sa_name) report['oval_missing_pkg_advisory'].append(advisory_name)
report['oval_missing_pkg_sa_count'] += 1 report['oval_missing_pkg_advisory_count'] += 1
for missing_package in alma_oval_missing_packages: for missing_package in alma_oval_missing_packages:
if missing_package not in report['missing_packages_unique']: if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append( report['missing_packages_unique'].append(
@ -197,13 +226,13 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
# check errata # check errata
try: try:
alma_errata_sa = alma_errata[rhel_sa_id] alma_errata_sa = alma_errata[rhel_advisory_id]
except KeyError: except KeyError:
report['errata_missing_sa'].append(sa_name) report['errata_missing_advisory'].append(advisory_name)
report['errata_missing_sa_count'] += 1 report['errata_missing_advisory_count'] += 1
report['diff_count'] += 1 report['diff_count'] += 1
diff.append( diff.append(
{'sa_name': sa_name, 'diff': 'SA is missing in errata'}) {'advisory_name': advisory_name, 'diff': 'Advisory is missing in Errata'})
continue continue
# check if some packages are missing from errata SA # check if some packages are missing from errata SA
alma_errata_packages = alma_errata_sa.packages alma_errata_packages = alma_errata_sa.packages
@ -212,22 +241,36 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
if str(r) not in [str(i) for i in alma_errata_packages]] if str(r) not in [str(i) for i in alma_errata_packages]]
if alma_errata_missing_packages: if alma_errata_missing_packages:
report['diff_count'] += 1 report['diff_count'] += 1
diff_str = f"missing packages in errata SA: {','.join(alma_errata_missing_packages)}" mp_string = ','.join(alma_errata_missing_packages)
diff.append({'sa_name': sa_name, diff_str = f"Errata advisory has missing packages: {mp_string}"
diff.append({'advisory_name': advisory_name,
'diff': diff_str}) 'diff': diff_str})
report['errata_missing_pkg_sa'].append(sa_name) report['errata_missing_pkg_advisory'].append(advisory_name)
report['errata_missing_pkg_sa_count'] += 1 report['errata_missing_pkg_advisory_count'] += 1
for missing_package in alma_errata_missing_packages: for missing_package in alma_errata_missing_packages:
if missing_package not in report['missing_packages_unique']: if missing_package not in report['missing_packages_unique']:
report['missing_packages_unique'].append(missing_package) report['missing_packages_unique'].append(missing_package)
report['missing_packages_unique_count'] += 1 report['missing_packages_unique_count'] += 1
else: else:
# if we here, all checks were passed # if we here, all checks were passed
report['good_sa_count'] += 1 report['good_advisory_count'] += 1
for item in report.values(): # albs errata flow
if isinstance(item, list): logging.info('Getting errata release status for missing advisories')
item.sort() missing_advisories = report['errata_missing_advisory'] + \
report['oval_missing_advisory']
missing_advisories = list(dict.fromkeys(missing_advisories))
for adv in missing_advisories:
try:
release_status = albs.get_errata_status(
adv, f'AlmaLinux-{release}')
except Exception as err: # pylint: disable=broad-except
logging.error("cant get release status for %s: %s", adv, err)
continue
if release_status is None:
release_status = 'not-found-in-errata-flow'
report['miss_adv_albs_errata_release_status'].append(
{"advisory": adv, "release_status": release_status})
return report, diff return report, diff
@ -258,12 +301,17 @@ def comparer_run(config: Config) -> Dict[str, Any]:
alma_errata_dict = parse_errata(alma_errata_file) alma_errata_dict = parse_errata(alma_errata_file)
logging.info('Comparing rhel and alma') logging.info('Comparing rhel and alma')
report_release, diff_release = \ albs = ALBS(config.albs_url,
config.albs_jwt_token,
config.albs_timeout)
report_release, diff_release =\
compare(rhel_oval_dict, compare(rhel_oval_dict,
alma_oval_dict, alma_oval_dict,
alma_errata_dict, alma_errata_dict,
config.sa_exclude, config.advisory_exclude,
config.packages_exclude) config.packages_exclude,
albs, release)
result[release] = {'report': report_release, result[release] = {'report': report_release,
'diff': diff_release, 'diff': diff_release,
'rhel_oval_url': urls.rhel_oval_url, 'rhel_oval_url': urls.rhel_oval_url,
@ -271,6 +319,6 @@ def comparer_run(config: Config) -> Dict[str, Any]:
'alma_errata_url': urls.alma_errata_url} 'alma_errata_url': urls.alma_errata_url}
result['report_generated'] = datetime.datetime.now().timestamp() * 1000 result['report_generated'] = datetime.datetime.now().timestamp() * 1000
result['sa_not_before'] = config.not_before.timestamp() * 1000 result['advisory_not_before'] = config.not_before.timestamp() * 1000
return result return result

View File

@ -16,12 +16,14 @@ DIFF_FILE = Path('/tmp/albs-oval-errata-diff.json')
DOWNLOAD_DIR = Path('/tmp') DOWNLOAD_DIR = Path('/tmp')
LOG_FILE = Path('logs/albs-oval-errata-diff.log') LOG_FILE = Path('logs/albs-oval-errata-diff.log')
PACKAGES_EXCLUDE = [] PACKAGES_EXCLUDE = []
SA_EXCLUDE = [] ADVISORY_EXCLUDE = []
SERVER_PORT = 3001 SERVER_PORT = 3001
SERVER_IP = IPv4Address('127.0.0.1') SERVER_IP = IPv4Address('127.0.0.1')
# not checking anything before RHEL-9.0 release # not checking anything before RHEL-9.0 release
NOT_BEFORE = datetime(2022, 5, 18) NOT_BEFORE = datetime(2022, 5, 18)
UPDATE_INTERVAL_MINUTES = 30 UPDATE_INTERVAL_MINUTES = 30
ALBS_URL = 'https://build.almalinux.org'
ALBS_TIMEOUT = 30
class ReleaseUrls(BaseModel): class ReleaseUrls(BaseModel):
@ -50,9 +52,9 @@ class Config(BaseModel):
default=PACKAGES_EXCLUDE) default=PACKAGES_EXCLUDE)
releases: Dict[int, ReleaseUrls] = Field( releases: Dict[int, ReleaseUrls] = Field(
description='list of OS releases with Oval/Errata URLs to check') description='list of OS releases with Oval/Errata URLs to check')
sa_exclude: List[str] = Field( advisory_exclude: List[str] = Field(
description='list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking', description='list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking',
default=SA_EXCLUDE) default=ADVISORY_EXCLUDE)
server_port: int = Field( server_port: int = Field(
description='port that will be used by websever', description='port that will be used by websever',
default=SERVER_PORT) default=SERVER_PORT)
@ -65,6 +67,14 @@ class Config(BaseModel):
update_interval_minutes: int = Field( update_interval_minutes: int = Field(
description='how often service will be running difference checks (in minutes)', description='how often service will be running difference checks (in minutes)',
default=UPDATE_INTERVAL_MINUTES) default=UPDATE_INTERVAL_MINUTES)
albs_url: str = Field(
description='URL of Alma linux build system',
default=ALBS_URL)
albs_jwt_token: str = Field(
description='JWT token that will be used when querying ALBS API')
albs_timeout: int = Field(
description='max time (in seconds) that service will be wait for ALBS API to response',
default=ALBS_TIMEOUT)
@validator("releases", pre=True) @validator("releases", pre=True)
@classmethod @classmethod

View File

@ -7,7 +7,7 @@ from dataclasses import dataclass
@dataclass @dataclass
class Package: class Package:
""" """
Package represents RPM package exstracted from RHEL OVAL Package represents RPM package extracted from RHEL/ALMA OVAL/Errata files
""" """
name: str name: str
version: str version: str

View File

@ -1,18 +0,0 @@
"""
sa contains SecurityAdvisory dataclass definition
"""
from dataclasses import dataclass
from typing import List
from .package import Package
@dataclass
class SecurityAdvisory:
"""
SecurityAdvisory represents Security advisory deffition extracted
from oval or errata
"""
title: str
id: str # pylint: disable=invalid-name
packages: List[Package]

View File

@ -16,7 +16,7 @@ from .config import get_config, Config
from .comparer import comparer_run from .comparer import comparer_run
# This dict holds all current differentes # This dict holds all current differences
diffs = {} diffs = {}
diffs_lock = threading.Lock() diffs_lock = threading.Lock()

View File

@ -37,11 +37,11 @@ releases:
alma_oval_url: https://repo.almalinux.org/security/oval/org.almalinux.alsa-9.xml.bz2 alma_oval_url: https://repo.almalinux.org/security/oval/org.almalinux.alsa-9.xml.bz2
alma_errata_url: https://errata.almalinux.org/9/errata.full.json alma_errata_url: https://errata.almalinux.org/9/errata.full.json
# sa_exclude # advisory_exclude
# list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking # list of advisory IDs (ALSA-2022:5219) to exclude from checking
# requred: no # requred: no
# default: [] # default: []
sa_exclude: [] advisory_exclude: []
# server_port # server_port
# port that will be used by websever # port that will be used by websever
@ -65,4 +65,22 @@ not_before: 2022-5-18
# how often service will be running difference checks (in minutes) # how often service will be running difference checks (in minutes)
# required: no # required: no
# default: 30 # default: 30
update_interval_minutes: 30 update_interval_minutes: 30
# albs_url
# URL of Alma linux build system
# required: no
# default: https://build.almalinux.org
albs_url: https://build.almalinux.org
# albs_jwt_token
# JWT token that will be used when querying ALBS API
# required: yes
# default: N/A
albs_jwt_token:
# albs_timeout
# max time (in seconds) that service will be wait for ALBS API to response
# required: no
# default: 30
albs_timeout: 30

View File

@ -1,4 +1,11 @@
2022-12-30 v1.0.0 2022-12-30 v1.0.0
First version of service First version of service
2023-01-04 v1.0.1 2023-01-04 v1.0.1
Fixed missing packages false positives Fixed missing packages false positives
2023-01-12 v1.0.2
Added support for Bug/Enhancement Advisories
2023-01-20 v2.0.0
Added integration with AlmaLinux Build System (errata feed)