From e8e303aa3ca9db564ea52258de15a81851c3b265 Mon Sep 17 00:00:00 2001 From: Matej Tyc Date: Wed, 12 Oct 2022 11:37:04 +0200 Subject: [PATCH 1/5] Add capability to preselect content from archives Users can specify content path and tailoring path in kickstarts, and the addon should be able to assure that those files are available, and that they have precedence over other files. --- org_fedora_oscap/content_discovery.py | 35 +++++++++++++++++++ tests/test_content_discovery.py | 48 +++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 tests/test_content_discovery.py diff --git a/org_fedora_oscap/content_discovery.py b/org_fedora_oscap/content_discovery.py index 5fc7343..f654449 100644 --- a/org_fedora_oscap/content_discovery.py +++ b/org_fedora_oscap/content_discovery.py @@ -11,6 +11,7 @@ from org_fedora_oscap import data_fetch, utils from org_fedora_oscap import common from org_fedora_oscap import content_handling +from org_fedora_oscap.content_handling import CONTENT_TYPES from org_fedora_oscap.common import _ @@ -167,6 +168,38 @@ def _verify_fingerprint(self, dest_filename, fingerprint=""): msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match") raise content_handling.ContentCheckError(msg) + def filter_discovered_content(self, labelled_files): + expected_path = self._addon_data.content_path + categories = (CONTENT_TYPES["DATASTREAM"], CONTENT_TYPES["XCCDF_CHECKLIST"]) + if expected_path: + labelled_files = self.reduce_files(labelled_files, expected_path, categories) + + expected_path = self._addon_data.tailoring_path + categories = (CONTENT_TYPES["TAILORING"], ) + if expected_path: + labelled_files = self.reduce_files(labelled_files, expected_path, categories) + + expected_path = self._addon_data.cpe_path + categories = (CONTENT_TYPES["CPE_DICT"], ) + if expected_path: + labelled_files = self.reduce_files(labelled_files, expected_path, categories) + + return labelled_files + + def reduce_files(self, labelled_files, expected_path, categories): + reduced_files = dict() + if expected_path not in labelled_files: + msg = ( + f"Expected a file {expected_path} to be part of the supplied content, " + f"but it was not the case, got only {list(labelled_files.keys())}" + ) + raise RuntimeError(msg) + for path, label in labelled_files.items(): + if label in categories and path != expected_path: + continue + reduced_files[path] = label + return reduced_files + def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename): threadMgr.wait(wait_for) actually_fetched_content = wait_for is not None @@ -182,6 +215,8 @@ def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_file structured_content.add_content_archive(dest_filename) labelled_files = content_handling.identify_files(fpaths) + labelled_files = self.filter_discovered_content(labelled_files) + for fname, label in labelled_files.items(): structured_content.add_file(fname, label) diff --git a/tests/test_content_discovery.py b/tests/test_content_discovery.py new file mode 100644 index 0000000..5463c9a --- /dev/null +++ b/tests/test_content_discovery.py @@ -0,0 +1,48 @@ +import pytest + +import org_fedora_oscap.content_discovery as tested_module + + +@pytest.fixture +def labelled_files(): + return { + "dir/datastream": "D", + "dir/datastream2": "D", + "dir/dir/datastream3": "D", + "dir/dir/datastream3": "D", + "dir/XCCDF": "X", + "XCCDF2": "X", + "cpe": "C", + "t1": "T", + "dir3/t2": "T", + } + + +def test_reduce(labelled_files): + bringer = tested_module.ContentBringer(None) + + d_count = 0 + x_count = 0 + for l in labelled_files.values(): + if l == "D": + d_count += 1 + elif l == "X": + x_count += 1 + + reduced = bringer.reduce_files(labelled_files, "dir/datastream", ["D"]) + assert len(reduced) == len(labelled_files) - d_count + 1 + assert "dir/datastream" in reduced + + reduced = bringer.reduce_files(labelled_files, "dir/datastream", ["D", "X"]) + assert len(reduced) == len(labelled_files) - d_count - x_count + 1 + assert "dir/datastream" in reduced + + reduced = bringer.reduce_files(labelled_files, "dir/XCCDF", ["D", "X"]) + assert len(reduced) == len(labelled_files) - d_count - x_count + 1 + assert "dir/XCCDF" in reduced + + with pytest.raises(RuntimeError, match="dir/datastream4"): + bringer.reduce_files(labelled_files, "dir/datastream4", ["D"]) + + reduced = bringer.reduce_files(labelled_files, "cpe", ["C"]) + assert reduced == labelled_files From 82c1950903fcce079cd71f021c1fde25f75f9521 Mon Sep 17 00:00:00 2001 From: Matej Tyc Date: Wed, 12 Oct 2022 11:40:11 +0200 Subject: [PATCH 2/5] Handle changes in content identification The code is able to handle changes in the way how oscap identifies content much more gracefully. --- org_fedora_oscap/content_discovery.py | 13 +++++++++---- org_fedora_oscap/content_handling.py | 5 +++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/org_fedora_oscap/content_discovery.py b/org_fedora_oscap/content_discovery.py index f654449..b20f3a6 100644 --- a/org_fedora_oscap/content_discovery.py +++ b/org_fedora_oscap/content_discovery.py @@ -2,6 +2,7 @@ import logging import pathlib import shutil +import os from glob import glob from pyanaconda.core import constants @@ -214,11 +215,15 @@ def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_file if content_type in ("archive", "rpm"): structured_content.add_content_archive(dest_filename) - labelled_files = content_handling.identify_files(fpaths) - labelled_files = self.filter_discovered_content(labelled_files) + labelled_filenames = content_handling.identify_files(fpaths) + labelled_relative_filenames = { + os.path.relpath(path, self.CONTENT_DOWNLOAD_LOCATION): label + for path, label in labelled_filenames.items()} + labelled_relative_filenames = self.filter_discovered_content(labelled_relative_filenames) - for fname, label in labelled_files.items(): - structured_content.add_file(fname, label) + for rel_fname, label in labelled_relative_filenames.items(): + fname = self.CONTENT_DOWNLOAD_LOCATION / rel_fname + structured_content.add_file(str(fname), label) if fingerprint and dest_filename: structured_content.record_verification(dest_filename) diff --git a/org_fedora_oscap/content_handling.py b/org_fedora_oscap/content_handling.py index 65d5a28..3e2ecae 100644 --- a/org_fedora_oscap/content_handling.py +++ b/org_fedora_oscap/content_handling.py @@ -122,6 +122,11 @@ def get_doc_type(file_path): if line.startswith("Document type:"): _prefix, _sep, type_info = line.partition(":") content_type = type_info.strip() + if content_type not in CONTENT_TYPES.values(): + log.info( + f"File {file_path} labelled by oscap as {content_type}, " + "which is an unexpected type.") + content_type = f"unknown - {content_type}" break except OSError: # 'oscap info' exitted with a non-zero exit code -> unknown doc From b6bf5a6c96f5dbbd78043455802ebc0033cf1a6a Mon Sep 17 00:00:00 2001 From: Matej Tyc Date: Wed, 12 Oct 2022 11:38:51 +0200 Subject: [PATCH 3/5] Remove unused code The function is not referenced anywhere in the project --- org_fedora_oscap/content_handling.py | 40 ---------------------------- 1 file changed, 40 deletions(-) diff --git a/org_fedora_oscap/content_handling.py b/org_fedora_oscap/content_handling.py index 3e2ecae..5096bab 100644 --- a/org_fedora_oscap/content_handling.py +++ b/org_fedora_oscap/content_handling.py @@ -141,43 +141,3 @@ def get_doc_type(file_path): log.info("OSCAP addon: Identified {file_path} as {content_type}" .format(file_path=file_path, content_type=content_type)) return content_type - - -def explore_content_files(fpaths): - """ - Function for finding content files in a list of file paths. SIMPLY PICKS - THE FIRST USABLE CONTENT FILE OF A PARTICULAR TYPE AND JUST PREFERS DATA - STREAMS OVER STANDALONE BENCHMARKS. - - :param fpaths: a list of file paths to search for content files in - :type fpaths: [str] - :return: ContentFiles instance containing the file names of the XCCDF file, - CPE dictionary and tailoring file or "" in place of those items - if not found - :rtype: ContentFiles - - """ - xccdf_file = "" - cpe_file = "" - tailoring_file = "" - found_ds = False - - for fpath in fpaths: - doc_type = get_doc_type(fpath) - if not doc_type: - continue - - # prefer DS over standalone XCCDF - if doc_type == "Source Data Stream" and (not xccdf_file or not found_ds): - xccdf_file = fpath - found_ds = True - elif doc_type == "XCCDF Checklist" and not xccdf_file: - xccdf_file = fpath - elif doc_type == "CPE Dictionary" and not cpe_file: - cpe_file = fpath - elif doc_type == "XCCDF Tailoring" and not tailoring_file: - tailoring_file = fpath - - # TODO: raise exception if no xccdf_file is found? - files = ContentFiles(xccdf_file, cpe_file, tailoring_file) - return files From a990568ccddb2864c8daeae91fdc1f6588b3c6f3 Mon Sep 17 00:00:00 2001 From: Matej Tyc Date: Thu, 13 Oct 2022 14:11:25 +0200 Subject: [PATCH 4/5] Dont use tailoring if it is not expected Take tailorings into account only if it is specified in the kickstart. Compulsive usage of tailoring may be unwanted. --- org_fedora_oscap/content_discovery.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/org_fedora_oscap/content_discovery.py b/org_fedora_oscap/content_discovery.py index b20f3a6..e9cf34a 100644 --- a/org_fedora_oscap/content_discovery.py +++ b/org_fedora_oscap/content_discovery.py @@ -169,16 +169,25 @@ def _verify_fingerprint(self, dest_filename, fingerprint=""): msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match") raise content_handling.ContentCheckError(msg) + def allow_one_expected_tailoring_or_no_tailoring(self, labelled_files): + expected_tailoring = self._addon_data.tailoring_path + tailoring_label = CONTENT_TYPES["TAILORING"] + if expected_tailoring: + labelled_files = self.reduce_files(labelled_files, expected_tailoring, [tailoring_label]) + else: + labelled_files = { + path: label for path, label in labelled_files.items() + if label != tailoring_label + } + return labelled_files + def filter_discovered_content(self, labelled_files): expected_path = self._addon_data.content_path categories = (CONTENT_TYPES["DATASTREAM"], CONTENT_TYPES["XCCDF_CHECKLIST"]) if expected_path: labelled_files = self.reduce_files(labelled_files, expected_path, categories) - expected_path = self._addon_data.tailoring_path - categories = (CONTENT_TYPES["TAILORING"], ) - if expected_path: - labelled_files = self.reduce_files(labelled_files, expected_path, categories) + labelled_files = self.allow_one_expected_tailoring_or_no_tailoring(labelled_files) expected_path = self._addon_data.cpe_path categories = (CONTENT_TYPES["CPE_DICT"], ) From c4cb296ca3838a0967c8258b9ed5221691884a36 Mon Sep 17 00:00:00 2001 From: Matej Tyc Date: Tue, 8 Nov 2022 10:46:59 +0100 Subject: [PATCH 5/5] Make the content RPM installation robust If a package manager fails to install the package, use the rpm command directly and skip deps. --- org_fedora_oscap/ks/oscap.py | 41 ++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/org_fedora_oscap/ks/oscap.py b/org_fedora_oscap/ks/oscap.py index e47d6ba..dac273d 100644 --- a/org_fedora_oscap/ks/oscap.py +++ b/org_fedora_oscap/ks/oscap.py @@ -23,6 +23,7 @@ import shutil import re import os +import io import time import logging import pathlib @@ -473,6 +474,33 @@ def setup(self, storage, ksdata, payload): if pkg not in ksdata.packages.packageList: ksdata.packages.packageList.append(pkg) + def _attempt_rpm_installation(self): + log.info("OSCAP addon: Installing the security content RPM to the installed system.") + stdout = io.StringIO() + ret = util.execWithRedirect( + "yum", ["-y", "--nogpg", "install", self.raw_postinst_content_path], + stdout=stdout, root=conf.target.system_root) + stdout.seek(0) + if ret != 0: + log.error( + "OSCAP addon: Error installing security content RPM using yum: {0}", + stdout.read()) + + stdout = io.StringIO() + ret = util.execWithRedirect( + "rpm", ["--install", "--nodeps", self.raw_postinst_content_path], + stdout=stdout, root=conf.target.system_root) + if ret != 0: + log.error( + "OSCAP addon: Error installing security content RPM using rpm: {0}", + stdout.read()) + msg = _(f"Failed to install content RPM to the target system.") + raise RuntimeError(msg) + + def _copy_rpm_to_target_and_install(self, target_content_dir): + shutil.copy2(self.raw_preinst_content_path, target_content_dir) + self._attempt_rpm_installation() + def execute(self, storage, ksdata, users, payload): """ The execute method that should make changes to the installed system. It @@ -507,15 +535,10 @@ def execute(self, storage, ksdata, users, payload): if self.content_type == "datastream": shutil.copy2(self.preinst_content_path, target_content_dir) elif self.content_type == "rpm": - # copy the RPM to the target system - shutil.copy2(self.raw_preinst_content_path, target_content_dir) - - # and install it with yum - ret = util.execInSysroot("yum", ["-y", "--nogpg", "install", - self.raw_postinst_content_path]) - if ret != 0: - msg = _(f"Failed to install content RPM to the target system.") - self._terminate(msg) + try: + self._copy_rpm_to_target_and_install(target_content_dir) + except Exception as exc: + self._terminate(str(exc)) return elif self.content_type == "scap-security-guide": # nothing needed