|
|
|
@ -15,7 +15,7 @@ import requests
|
|
|
|
|
|
|
|
|
|
from .config import Config
|
|
|
|
|
from .package import Package
|
|
|
|
|
from .sa import SecurityAdvisory
|
|
|
|
|
from .advisory import Advisory
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_oval(url: str, download_dir: Path) -> str:
|
|
|
|
@ -46,9 +46,21 @@ def download_errata(url: str, release_version: int, download_dir: Path) -> str:
|
|
|
|
|
return fpath
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityAdvisory]:
|
|
|
|
|
def extract_id_and_type(string: str) -> Tuple[str, str]:
|
|
|
|
|
"""
|
|
|
|
|
converting oval xml file to dict
|
|
|
|
|
Extracts advisory id and advisory type from OVAL title or errata id
|
|
|
|
|
Example:
|
|
|
|
|
oval: "RHSA-2022:5749: .NET 6.0 bugfix update (Moderate)" -> (2022:5749, SA)
|
|
|
|
|
errata: ALSA-2022:6165 -> (id=2022:6165, SA)
|
|
|
|
|
"""
|
|
|
|
|
regexp = r'((RH|AL)(SA|BA|EA))-(\d{4}:\d+)'
|
|
|
|
|
res = re.search(regexp, string)
|
|
|
|
|
return res.group(4), res.group(3)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, Advisory]:
|
|
|
|
|
"""
|
|
|
|
|
Converts OVAL XML file to Dict of Advisories
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def extract_package(title: str) -> Package:
|
|
|
|
@ -58,11 +70,6 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA
|
|
|
|
|
version = res.group(2)
|
|
|
|
|
return Package(name=name, version=version)
|
|
|
|
|
|
|
|
|
|
def extract_id(title: str) -> str:
|
|
|
|
|
regexp = r'[RH|AL]SA-(\d{4}:\d+)(.*)'
|
|
|
|
|
res = re.search(regexp, title)
|
|
|
|
|
return res.group(1)
|
|
|
|
|
|
|
|
|
|
tree = ET.parse(fpath)
|
|
|
|
|
root = tree.getroot()
|
|
|
|
|
namespase = {
|
|
|
|
@ -76,43 +83,57 @@ def parse_oval(fpath: str, not_before: datetime.datetime) -> Dict[str, SecurityA
|
|
|
|
|
'n:metadata/n:advisory/n:issued', namespase).attrib['date']
|
|
|
|
|
issued_dt = datetime.datetime.strptime(issued, "%Y-%m-%d")
|
|
|
|
|
|
|
|
|
|
# we are only interesed in Security advisories after RHEL 8.3
|
|
|
|
|
if ('RHSA' not in title and 'ALSA' not in title) or issued_dt < not_before:
|
|
|
|
|
# we are only interested in RHEL/OVAL SA/BA/EA
|
|
|
|
|
# released after RHEL 8.3
|
|
|
|
|
if not re.match(r'((RH|AL)(SA|BA|EA))', title) or issued_dt < not_before:
|
|
|
|
|
continue
|
|
|
|
|
sa_id = extract_id(title)
|
|
|
|
|
|
|
|
|
|
# we are only interested in security based advisories
|
|
|
|
|
severity = definition.find(
|
|
|
|
|
'n:metadata/n:advisory/n:severity', namespase)
|
|
|
|
|
if severity is None:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if severity.text.lower() not in ['low', 'moderate', 'important', 'critical']:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
advisory_id, advisory_type = extract_id_and_type(title)
|
|
|
|
|
packages = [extract_package(i.attrib['comment']) for
|
|
|
|
|
i in definition.findall(".//n:criterion", namespase)
|
|
|
|
|
if 'is earlier than' in i.attrib['comment']]
|
|
|
|
|
res[sa_id] = SecurityAdvisory(
|
|
|
|
|
title=title, id=sa_id, packages=packages)
|
|
|
|
|
res[advisory_id] = Advisory(title=title, id=advisory_id,
|
|
|
|
|
advisory_type=advisory_type,
|
|
|
|
|
packages=packages)
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_errata(fpath: str) -> Dict[str, SecurityAdvisory]:
|
|
|
|
|
def parse_errata(fpath: str) -> Dict[str, Advisory]:
|
|
|
|
|
"""
|
|
|
|
|
parses alma errata file and converts it to dict of SA instances
|
|
|
|
|
Parses Alma Errata file and converts it to dict of Advisory instances
|
|
|
|
|
"""
|
|
|
|
|
with open(fpath, 'r', encoding='utf-8') as file_to_load:
|
|
|
|
|
erratas = json.load(file_to_load)
|
|
|
|
|
res = {}
|
|
|
|
|
for errata in erratas['data']:
|
|
|
|
|
title = errata['title']
|
|
|
|
|
sa_id = errata['id'].split('-')[-1]
|
|
|
|
|
advisory_id, advisory_type = extract_id_and_type(errata['id'])
|
|
|
|
|
packages = []
|
|
|
|
|
for package in errata['packages']:
|
|
|
|
|
full_name = f"{package['name']}-{package['version']}"
|
|
|
|
|
if full_name not in packages:
|
|
|
|
|
packages.append(full_name)
|
|
|
|
|
packages.sort()
|
|
|
|
|
res[sa_id] = SecurityAdvisory(
|
|
|
|
|
title=title, id=sa_id, packages=packages)
|
|
|
|
|
res[advisory_id] = Advisory(title=title,
|
|
|
|
|
id=advisory_id,
|
|
|
|
|
advisory_type=advisory_type,
|
|
|
|
|
packages=packages)
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compare(rhel_oval: Dict[str, SecurityAdvisory],
|
|
|
|
|
alma_oval: Dict[str, SecurityAdvisory],
|
|
|
|
|
alma_errata: Dict[str, SecurityAdvisory],
|
|
|
|
|
sa_exclude: List[str],
|
|
|
|
|
def compare(rhel_oval: Dict[str, Advisory],
|
|
|
|
|
alma_oval: Dict[str, Advisory],
|
|
|
|
|
alma_errata: Dict[str, Advisory],
|
|
|
|
|
advisory_exclude: List[str],
|
|
|
|
|
packages_exclude: List[str]) -> Tuple[dict, list]:
|
|
|
|
|
"""
|
|
|
|
|
compares rhel oval with alma oval and alma errata
|
|
|
|
@ -120,49 +141,49 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
|
|
|
|
|
diff = []
|
|
|
|
|
report = {
|
|
|
|
|
# total amount of security advisories
|
|
|
|
|
'total_sa_count': 0,
|
|
|
|
|
# amount of SA that match with rhel
|
|
|
|
|
'good_sa_count': 0,
|
|
|
|
|
'total_advisory_count': 0,
|
|
|
|
|
# amount of ALMA advisory that match with RHEL
|
|
|
|
|
'good_advisory_count': 0,
|
|
|
|
|
# total amount of differencies
|
|
|
|
|
'diff_count': 0,
|
|
|
|
|
# list of SA excluded from diff check
|
|
|
|
|
'excluded_sa': [],
|
|
|
|
|
# list of advisories excluded from diff check
|
|
|
|
|
'excluded_adv': [],
|
|
|
|
|
# list of packages excluded from diff check
|
|
|
|
|
'excluded_pkg': [],
|
|
|
|
|
# amount of oval SA that dont exists in oval file
|
|
|
|
|
'oval_missing_sa_count': 0,
|
|
|
|
|
# amount of oval SA that have missing packages
|
|
|
|
|
'oval_missing_pkg_sa_count': 0,
|
|
|
|
|
# list of missing oval SA
|
|
|
|
|
'oval_missing_sa': [],
|
|
|
|
|
# list of oval SA that have missing packages
|
|
|
|
|
'oval_missing_pkg_sa': [],
|
|
|
|
|
# amount of SA that dont exists in errata file
|
|
|
|
|
'errata_missing_sa_count': 0,
|
|
|
|
|
# amount of errata SA that have missing packages
|
|
|
|
|
'errata_missing_pkg_sa_count': 0,
|
|
|
|
|
# list of SA that are missing in errata file
|
|
|
|
|
'errata_missing_sa': [],
|
|
|
|
|
# list of errata SA with missing packages
|
|
|
|
|
'errata_missing_pkg_sa': [],
|
|
|
|
|
# amount of oval advisories that dont exists in oval file
|
|
|
|
|
'oval_missing_advisory_count': 0,
|
|
|
|
|
# amount of oval advisories that have missing packages
|
|
|
|
|
'oval_missing_pkg_advisory_count': 0,
|
|
|
|
|
# list of missing oval advisories
|
|
|
|
|
'oval_missing_advisory': [],
|
|
|
|
|
# list of oval advisories that have missing packages
|
|
|
|
|
'oval_missing_pkg_advisory': [],
|
|
|
|
|
# amount of advisories that dont exists in errata file
|
|
|
|
|
'errata_missing_advisory_count': 0,
|
|
|
|
|
# amount of errata advisories that have missing packages
|
|
|
|
|
'errata_missing_pkg_advisory_count': 0,
|
|
|
|
|
# list of advisories that are missing in errata file
|
|
|
|
|
'errata_missing_advisory': [],
|
|
|
|
|
# list of errata advisories with missing packages
|
|
|
|
|
'errata_missing_pkg_advisory': [],
|
|
|
|
|
# total amount of unique missing packages across all alma SA
|
|
|
|
|
'missing_packages_unique_count': 0,
|
|
|
|
|
# list of unique packages that missing across all alma SA
|
|
|
|
|
'missing_packages_unique': []
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for rhel_sa_id, rhel_sa in rhel_oval.items():
|
|
|
|
|
report['total_sa_count'] += 1
|
|
|
|
|
sa_name = f'ALSA-{rhel_sa_id}'
|
|
|
|
|
for rhel_advisory_id, rhel_advisory in rhel_oval.items():
|
|
|
|
|
report['total_advisory_count'] += 1
|
|
|
|
|
advisory_name = f'AL{rhel_advisory.advisory_type}-{rhel_advisory_id}'
|
|
|
|
|
|
|
|
|
|
# filtering out SA
|
|
|
|
|
if sa_name in sa_exclude:
|
|
|
|
|
report['excluded_sa'].append(sa_name)
|
|
|
|
|
# filtering out advisories
|
|
|
|
|
if advisory_name in advisory_exclude:
|
|
|
|
|
report['excluded_advisory'].append(advisory_name)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
# filtefing out packages
|
|
|
|
|
packages_to_check: List[Package] = []
|
|
|
|
|
for package in rhel_sa.packages:
|
|
|
|
|
for package in rhel_advisory.packages:
|
|
|
|
|
if any(package.name == i for i in packages_exclude):
|
|
|
|
|
if str(package) not in report['excluded_pkg']:
|
|
|
|
|
report['excluded_pkg'].append(str(package))
|
|
|
|
@ -171,24 +192,25 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
|
|
|
|
|
|
|
|
|
|
# check oval
|
|
|
|
|
try:
|
|
|
|
|
alma_oval_sa = alma_oval[rhel_sa_id]
|
|
|
|
|
alma_oval_advisory = alma_oval[rhel_advisory_id]
|
|
|
|
|
except KeyError:
|
|
|
|
|
report['diff_count'] += 1
|
|
|
|
|
diff.append({'sa_name': sa_name, 'diff': 'SA is missing in oval'})
|
|
|
|
|
report['oval_missing_sa'].append(sa_name)
|
|
|
|
|
report['oval_missing_sa_count'] += 1
|
|
|
|
|
diff.append({'advisory_name': advisory_name,
|
|
|
|
|
'diff': 'Advisory is missing in OVAL'})
|
|
|
|
|
report['oval_missing_advisory'].append(advisory_name)
|
|
|
|
|
report['oval_missing_advisory_count'] += 1
|
|
|
|
|
else:
|
|
|
|
|
# check if some packages are missing from oval SA
|
|
|
|
|
alma_oval_packages = alma_oval_sa.packages
|
|
|
|
|
# check if some packages are missing from OVAL advisories
|
|
|
|
|
alma_oval_packages = alma_oval_advisory.packages
|
|
|
|
|
alma_oval_missing_packages = [str(r) for r in packages_to_check
|
|
|
|
|
if str(r) not in [str(i) for i in alma_oval_packages]]
|
|
|
|
|
if alma_oval_missing_packages:
|
|
|
|
|
report['diff_count'] += 1
|
|
|
|
|
diff_str = f"missing packages in oval SA: {','.join(alma_oval_missing_packages)}"
|
|
|
|
|
diff.append({'sa_name': sa_name,
|
|
|
|
|
diff_str = f"OVAL advisory has missing packages: {','.join(alma_oval_missing_packages)}"
|
|
|
|
|
diff.append({'advisory_name': advisory_name,
|
|
|
|
|
'diff': diff_str})
|
|
|
|
|
report['oval_missing_pkg_sa'].append(sa_name)
|
|
|
|
|
report['oval_missing_pkg_sa_count'] += 1
|
|
|
|
|
report['oval_missing_pkg_advisory'].append(advisory_name)
|
|
|
|
|
report['oval_missing_pkg_advisory_count'] += 1
|
|
|
|
|
for missing_package in alma_oval_missing_packages:
|
|
|
|
|
if missing_package not in report['missing_packages_unique']:
|
|
|
|
|
report['missing_packages_unique'].append(
|
|
|
|
@ -197,13 +219,13 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
|
|
|
|
|
|
|
|
|
|
# check errata
|
|
|
|
|
try:
|
|
|
|
|
alma_errata_sa = alma_errata[rhel_sa_id]
|
|
|
|
|
alma_errata_sa = alma_errata[rhel_advisory_id]
|
|
|
|
|
except KeyError:
|
|
|
|
|
report['errata_missing_sa'].append(sa_name)
|
|
|
|
|
report['errata_missing_sa_count'] += 1
|
|
|
|
|
report['errata_missing_advisory'].append(advisory_name)
|
|
|
|
|
report['errata_missing_advisory_count'] += 1
|
|
|
|
|
report['diff_count'] += 1
|
|
|
|
|
diff.append(
|
|
|
|
|
{'sa_name': sa_name, 'diff': 'SA is missing in errata'})
|
|
|
|
|
{'advisory_name': advisory_name, 'diff': 'Advisory is missing in Errata'})
|
|
|
|
|
continue
|
|
|
|
|
# check if some packages are missing from errata SA
|
|
|
|
|
alma_errata_packages = alma_errata_sa.packages
|
|
|
|
@ -212,18 +234,18 @@ def compare(rhel_oval: Dict[str, SecurityAdvisory],
|
|
|
|
|
if str(r) not in [str(i) for i in alma_errata_packages]]
|
|
|
|
|
if alma_errata_missing_packages:
|
|
|
|
|
report['diff_count'] += 1
|
|
|
|
|
diff_str = f"missing packages in errata SA: {','.join(alma_errata_missing_packages)}"
|
|
|
|
|
diff.append({'sa_name': sa_name,
|
|
|
|
|
diff_str = f"Errata advisory has missing packages: {','.join(alma_errata_missing_packages)}"
|
|
|
|
|
diff.append({'advisory_name': advisory_name,
|
|
|
|
|
'diff': diff_str})
|
|
|
|
|
report['errata_missing_pkg_sa'].append(sa_name)
|
|
|
|
|
report['errata_missing_pkg_sa_count'] += 1
|
|
|
|
|
report['errata_missing_pkg_advisory'].append(advisory_name)
|
|
|
|
|
report['errata_missing_pkg_advisory_count'] += 1
|
|
|
|
|
for missing_package in alma_errata_missing_packages:
|
|
|
|
|
if missing_package not in report['missing_packages_unique']:
|
|
|
|
|
report['missing_packages_unique'].append(missing_package)
|
|
|
|
|
report['missing_packages_unique_count'] += 1
|
|
|
|
|
else:
|
|
|
|
|
# if we here, all checks were passed
|
|
|
|
|
report['good_sa_count'] += 1
|
|
|
|
|
report['good_advisory_count'] += 1
|
|
|
|
|
|
|
|
|
|
for item in report.values():
|
|
|
|
|
if isinstance(item, list):
|
|
|
|
@ -262,7 +284,7 @@ def comparer_run(config: Config) -> Dict[str, Any]:
|
|
|
|
|
compare(rhel_oval_dict,
|
|
|
|
|
alma_oval_dict,
|
|
|
|
|
alma_errata_dict,
|
|
|
|
|
config.sa_exclude,
|
|
|
|
|
config.advisory_exclude,
|
|
|
|
|
config.packages_exclude)
|
|
|
|
|
result[release] = {'report': report_release,
|
|
|
|
|
'diff': diff_release,
|
|
|
|
@ -271,6 +293,6 @@ def comparer_run(config: Config) -> Dict[str, Any]:
|
|
|
|
|
'alma_errata_url': urls.alma_errata_url}
|
|
|
|
|
|
|
|
|
|
result['report_generated'] = datetime.datetime.now().timestamp() * 1000
|
|
|
|
|
result['sa_not_before'] = config.not_before.timestamp() * 1000
|
|
|
|
|
result['advisory_not_before'] = config.not_before.timestamp() * 1000
|
|
|
|
|
|
|
|
|
|
return result
|
|
|
|
|