diff --git a/README.md b/README.md index ac33a59..21e68f3 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ check [config.default.yml](config.default.yml) for references ```bash $ pip install -r requirements.txt ``` -4. Create config file using [config.default.yml](config.default.yml) and start service with _albs_oval_errata_diff.py_ script +4. Create config file using [config.default.yml](config.default.yml) and start service with _albs_oval_errata_diff.py_ script ```bash $ python albs_oval_errata_diff.py config.yml 2022-12-29 16:20:11,139 INFO start Trying to load diff file from disk diff --git a/albs_oval_errata_diff/advisory.py b/albs_oval_errata_diff/advisory.py new file mode 100644 index 0000000..e6ec886 --- /dev/null +++ b/albs_oval_errata_diff/advisory.py @@ -0,0 +1,25 @@ +""" +advisory contains Advisory dataclass definition +""" +from dataclasses import dataclass +from typing import List + +from .package import Package + + +@dataclass +class Advisory: + """ + Represents Secutity/Bug/Enhancment advisory deffition extracted + from oval or errata + + Params + ------ + title: RHBA-2022:5749: .NET 6.0 bugfix update (Moderate) + advisory_type: RHBA + id: 2022-5749 + """ + title: str + advisory_type: str + id: str # pylint: disable=invalid-name + packages: List[Package] diff --git a/albs_oval_errata_diff/comparer.py b/albs_oval_errata_diff/comparer.py index 6c0c437..248570d 100644 --- a/albs_oval_errata_diff/comparer.py +++ b/albs_oval_errata_diff/comparer.py @@ -15,7 +15,7 @@ import requests from .config import Config from .package import Package -from .sa import SecurityAdvisory +from .advisory import Advisory def download_oval(url: str, download_dir: Path) -> str: @@ -46,9 +46,21 @@ def download_errata(url: str, release_version: int, download_dir: Path) -> str: return fpath -def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityAdvisory]: +def extract_id_and_type(string: str) -> Tuple[str, str]: """ - converting oval xml file to dict + Extracts advisory id and advisory type from OVAL title or errata id + Example: + oval: "RHSA-2022:5749: .NET 6.0 bugfix update (Moderate)" -> (2022:5749, SA) + errata: ALSA-2022:6165 -> (id=2022:6165, SA) + """ + regexp = r'((RH|AL)(SA|BA|EA))-(\d{4}:\d+)' + res = re.search(regexp, string) + return res.group(4), res.group(3) + + +def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, Advisory]: + """ + Converts OVAL XML file to Dict of Advisories """ def extract_package(title: str) -> Package: @@ -58,11 +70,6 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA version = res.group(2) return Package(name=name, version=version) - def extract_id(title: str) -> str: - regexp = r'[RH|AL]SA-(\d{4}:\d+)(.*)' - res = re.search(regexp, title) - return res.group(1) - tree = ET.parse(fpath) root = tree.getroot() namespase = { @@ -76,43 +83,57 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA 'n:metadata/n:advisory/n:issued', namespase).attrib['date'] issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d") - # we are only interesed in Security advisories after RHEL 8.3 - if ('RHSA' not in title and 'ALSA' not in title) or issued_dt < not_before: + # we are only interested in RHEL/OVAL SA/BA/EA + # released after RHEL 8.3 + if not re.match(r'((RH|AL)(SA|BA|EA))', title) or issued_dt < not_before: continue - sa_id = extract_id(title) + + # we are only interested in security based advisories + severity = definition.find( + 'n:metadata/n:advisory/n:severity', namespase) + if severity is None: + continue + + if severity.text.lower() not in ['low', 'moderate', 'important', 'critical']: + continue + + advisory_id, advisory_type = extract_id_and_type(title) packages = [extract_package(i.attrib['comment']) for i in definition.findall(".//n:criterion", namespase) if 'is earlier than' in i.attrib['comment']] - res[sa_id] = SecurityAdvisory( - title=title, id=sa_id, packages=packages) + res[advisory_id] = Advisory(title=title, id=advisory_id, + advisory_type=advisory_type, + packages=packages) return res -def parse_errata(fpath: str) -> Dict[str, SecurityAdvisory]: +def parse_errata(fpath: str) -> Dict[str, Advisory]: """ - parses alma errata file and converts it to dict of SA instances + Parses Alma Errata file and converts it to dict of Advisory instances """ with open(fpath, 'r', encoding='utf-8') as file_to_load: erratas = json.load(file_to_load) res = {} for errata in erratas['data']: title = errata['title'] - sa_id = errata['id'].split('-')[-1] + advisory_id, advisory_type = extract_id_and_type(errata['id']) packages = [] for package in errata['packages']: full_name = f"{package['name']}-{package['version']}" if full_name not in packages: packages.append(full_name) packages.sort() - res[sa_id] = SecurityAdvisory( - title=title, id=sa_id, packages=packages) + res[advisory_id] = Advisory(title=title, + id=advisory_id, + advisory_type=advisory_type, + packages=packages) return res -def compare(rhel_oval: Dict[str, SecurityAdvisory], - alma_oval: Dict[str, SecurityAdvisory], - alma_errata: Dict[str, SecurityAdvisory], - sa_exclude: List[str], +def compare(rhel_oval: Dict[str, Advisory], + alma_oval: Dict[str, Advisory], + alma_errata: Dict[str, Advisory], + advisory_exclude: List[str], packages_exclude: List[str]) -> Tuple[dict, list]: """ compares rhel oval with alma oval and alma errata @@ -120,49 +141,49 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory], diff = [] report = { # total amount of security advisories - 'total_sa_count': 0, - # amount of SA that match with rhel - 'good_sa_count': 0, + 'total_advisory_count': 0, + # amount of ALMA advisory that match with RHEL + 'good_advisory_count': 0, # total amount of differencies 'diff_count': 0, - # list of SA excluded from diff check - 'excluded_sa': [], + # list of advisories excluded from diff check + 'excluded_adv': [], # list of packages excluded from diff check 'excluded_pkg': [], - # amount of oval SA that dont exists in oval file - 'oval_missing_sa_count': 0, - # amount of oval SA that have missing packages - 'oval_missing_pkg_sa_count': 0, - # list of missing oval SA - 'oval_missing_sa': [], - # list of oval SA that have missing packages - 'oval_missing_pkg_sa': [], - # amount of SA that dont exists in errata file - 'errata_missing_sa_count': 0, - # amount of errata SA that have missing packages - 'errata_missing_pkg_sa_count': 0, - # list of SA that are missing in errata file - 'errata_missing_sa': [], - # list of errata SA with missing packages - 'errata_missing_pkg_sa': [], + # amount of oval advisories that dont exists in oval file + 'oval_missing_advisory_count': 0, + # amount of oval advisories that have missing packages + 'oval_missing_pkg_advisory_count': 0, + # list of missing oval advisories + 'oval_missing_advisory': [], + # list of oval advisories that have missing packages + 'oval_missing_pkg_advisory': [], + # amount of advisories that dont exists in errata file + 'errata_missing_advisory_count': 0, + # amount of errata advisories that have missing packages + 'errata_missing_pkg_advisory_count': 0, + # list of advisories that are missing in errata file + 'errata_missing_advisory': [], + # list of errata advisories with missing packages + 'errata_missing_pkg_advisory': [], # total amount of unique missing packages across all alma SA 'missing_packages_unique_count': 0, # list of unique packages that missing across all alma SA 'missing_packages_unique': [] } - for rhel_sa_id, rhel_sa in rhel_oval.items(): - report['total_sa_count'] += 1 - sa_name = f'ALSA-{rhel_sa_id}' + for rhel_advisory_id, rhel_advisory in rhel_oval.items(): + report['total_advisory_count'] += 1 + advisory_name = f'AL{rhel_advisory.advisory_type}-{rhel_advisory_id}' - # filtering out SA - if sa_name in sa_exclude: - report['excluded_sa'].append(sa_name) + # filtering out advisories + if advisory_name in advisory_exclude: + report['excluded_advisory'].append(advisory_name) continue # filtefing out packages packages_to_check: List[Package] = [] - for package in rhel_sa.packages: + for package in rhel_advisory.packages: if any(package.name == i for i in packages_exclude): if str(package) not in report['excluded_pkg']: report['excluded_pkg'].append(str(package)) @@ -171,24 +192,25 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory], # check oval try: - alma_oval_sa = alma_oval[rhel_sa_id] + alma_oval_advisory = alma_oval[rhel_advisory_id] except KeyError: report['diff_count'] += 1 - diff.append({'sa_name': sa_name, 'diff': 'SA is missing in oval'}) - report['oval_missing_sa'].append(sa_name) - report['oval_missing_sa_count'] += 1 + diff.append({'advisory_name': advisory_name, + 'diff': 'advisory is missing in oval'}) + report['oval_missing_advisory'].append(advisory_name) + report['oval_missing_advisory_count'] += 1 else: - # check if some packages are missing from oval SA - alma_oval_packages = alma_oval_sa.packages + # check if some packages are missing from OVAL advisories + alma_oval_packages = alma_oval_advisory.packages alma_oval_missing_packages = [str(r) for r in packages_to_check if str(r) not in [str(i) for i in alma_oval_packages]] if alma_oval_missing_packages: report['diff_count'] += 1 - diff_str = f"missing packages in oval SA: {','.join(alma_oval_missing_packages)}" - diff.append({'sa_name': sa_name, + diff_str = f"OVAL advisory has missing packages: {','.join(alma_oval_missing_packages)}" + diff.append({'advisory_name': advisory_name, 'diff': diff_str}) - report['oval_missing_pkg_sa'].append(sa_name) - report['oval_missing_pkg_sa_count'] += 1 + report['oval_missing_pkg_advisory'].append(advisory_name) + report['oval_missing_pkg_advisory_count'] += 1 for missing_package in alma_oval_missing_packages: if missing_package not in report['missing_packages_unique']: report['missing_packages_unique'].append( @@ -197,13 +219,13 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory], # check errata try: - alma_errata_sa = alma_errata[rhel_sa_id] + alma_errata_sa = alma_errata[rhel_advisory_id] except KeyError: - report['errata_missing_sa'].append(sa_name) - report['errata_missing_sa_count'] += 1 + report['errata_missing_advisory'].append(advisory_name) + report['errata_missing_advisory_count'] += 1 report['diff_count'] += 1 diff.append( - {'sa_name': sa_name, 'diff': 'SA is missing in errata'}) + {'advisory_name': advisory_name, 'diff': 'advisory is missing in errata'}) continue # check if some packages are missing from errata SA alma_errata_packages = alma_errata_sa.packages @@ -212,18 +234,18 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory], if str(r) not in [str(i) for i in alma_errata_packages]] if alma_errata_missing_packages: report['diff_count'] += 1 - diff_str = f"missing packages in errata SA: {','.join(alma_errata_missing_packages)}" - diff.append({'sa_name': sa_name, + diff_str = f"Errata advisory has missing packages: {','.join(alma_errata_missing_packages)}" + diff.append({'advisory_name': advisory_name, 'diff': diff_str}) - report['errata_missing_pkg_sa'].append(sa_name) - report['errata_missing_pkg_sa_count'] += 1 + report['errata_missing_pkg_advisory'].append(advisory_name) + report['errata_missing_pkg_advisory_count'] += 1 for missing_package in alma_errata_missing_packages: if missing_package not in report['missing_packages_unique']: report['missing_packages_unique'].append(missing_package) report['missing_packages_unique_count'] += 1 else: # if we here, all checks were passed - report['good_sa_count'] += 1 + report['good_advisory_count'] += 1 for item in report.values(): if isinstance(item, list): @@ -262,7 +284,7 @@ def comparer_run(config: Config) -> Dict[str, Any]: compare(rhel_oval_dict, alma_oval_dict, alma_errata_dict, - config.sa_exclude, + config.advisory_exclude, config.packages_exclude) result[release] = {'report': report_release, 'diff': diff_release, @@ -271,6 +293,6 @@ def comparer_run(config: Config) -> Dict[str, Any]: 'alma_errata_url': urls.alma_errata_url} result['report_generated'] = datetime.datetime.now().timestamp() * 1000 - result['sa_not_before'] = config.not_before.timestamp() * 1000 + result['advisory_not_before'] = config.not_before.timestamp() * 1000 return result diff --git a/albs_oval_errata_diff/config.py b/albs_oval_errata_diff/config.py index 4896f88..eb7fd71 100644 --- a/albs_oval_errata_diff/config.py +++ b/albs_oval_errata_diff/config.py @@ -16,7 +16,7 @@ DIFF_FILE = Path('/tmp/albs-oval-errata-diff.json') DOWNLOAD_DIR = Path('/tmp') LOG_FILE = Path('logs/albs-oval-errata-diff.log') PACKAGES_EXCLUDE = [] -SA_EXCLUDE = [] +ADVISORY_EXCLUDE = [] SERVER_PORT = 3001 SERVER_IP = IPv4Address('127.0.0.1') # not checking anything before RHEL-9.0 release @@ -50,9 +50,9 @@ class Config(BaseModel): default=PACKAGES_EXCLUDE) releases: Dict[int, ReleaseUrls] = Field( description='list of OS releases with Oval/Errata URLs to check') - sa_exclude: List[str] = Field( + advisory_exclude: List[str] = Field( description='list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking', - default=SA_EXCLUDE) + default=ADVISORY_EXCLUDE) server_port: int = Field( description='port that will be used by websever', default=SERVER_PORT) diff --git a/albs_oval_errata_diff/package.py b/albs_oval_errata_diff/package.py index 3da1272..59dfbf6 100644 --- a/albs_oval_errata_diff/package.py +++ b/albs_oval_errata_diff/package.py @@ -7,7 +7,7 @@ from dataclasses import dataclass @dataclass class Package: """ - Package represents RPM package exstracted from RHEL OVAL + Package represents RPM package extracted from RHEL/ALMA OVAL/Errata files """ name: str version: str diff --git a/albs_oval_errata_diff/sa.py b/albs_oval_errata_diff/sa.py deleted file mode 100644 index 40e07da..0000000 --- a/albs_oval_errata_diff/sa.py +++ /dev/null @@ -1,18 +0,0 @@ -""" -sa contains SecurityAdvisory dataclass definition -""" -from dataclasses import dataclass -from typing import List - -from .package import Package - - -@dataclass -class SecurityAdvisory: - """ - SecurityAdvisory represents Security advisory deffition extracted - from oval or errata - """ - title: str - id: str # pylint: disable=invalid-name - packages: List[Package] diff --git a/albs_oval_errata_diff/start.py b/albs_oval_errata_diff/start.py index 8b681c8..7cdc8a9 100644 --- a/albs_oval_errata_diff/start.py +++ b/albs_oval_errata_diff/start.py @@ -16,7 +16,7 @@ from .config import get_config, Config from .comparer import comparer_run -# This dict holds all current differentes +# This dict holds all current differences diffs = {} diffs_lock = threading.Lock() diff --git a/config.default.yml b/config.default.yml index e597784..e71029d 100644 --- a/config.default.yml +++ b/config.default.yml @@ -37,11 +37,11 @@ releases: alma_oval_url: https://repo.almalinux.org/security/oval/org.almalinux.alsa-9.xml.bz2 alma_errata_url: https://errata.almalinux.org/9/errata.full.json -# sa_exclude -# list of Security Advisory IDs (ALSA-2022:5219) to exclude from checking +# advisories_exclude +# list of advisory IDs (ALSA-2022:5219) to exclude from checking # requred: no # default: [] -sa_exclude: [] +advisories_exclude: [] # server_port # port that will be used by websever