def _schedule_firstboot_remediation(
        chroot, profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
    """Write the first-boot remediation configuration into the target system.

    A file of ``NAME='value'`` assignments is written to
    ``<chroot>/var/tmp/oscap-remediate-offline.conf.sh`` and a symlink
    ``<chroot>/system-update`` is created that points to that file using the
    path it has inside the installed system (``/var/tmp/...``).

    :param chroot: root directory of the target system
    :param profile: ID of the profile to remediate against
    :param ds_path: path to the content file
    :param results_path: path where the ARF results should be stored
    :param report_path: path where the HTML report should be stored
    :param ds_id: datastream ID; the assignment is omitted when empty
    :param xccdf_id: XCCDF (checklist) ID; the assignment is omitted when empty
    :param tailoring_path: tailoring file path; the assignment is omitted when empty
    :raises OSError: if the config file or the symlink cannot be created
    """
    settings = [
        ("OSCAP_REMEDIATE_DS", ds_path),
        ("OSCAP_REMEDIATE_PROFILE_ID", profile),
        ("OSCAP_REMEDIATE_ARF_RESULT", results_path),
        ("OSCAP_REMEDIATE_HTML_REPORT", report_path),
    ]
    # Optional values are emitted only when they are set.
    if ds_id:
        settings.append(("OSCAP_REMEDIATE_DATASTREAM_ID", ds_id))
    if xccdf_id:
        settings.append(("OSCAP_REMEDIATE_XCCDF_ID", xccdf_id))
    if tailoring_path:
        settings.append(("OSCAP_REMEDIATE_TAILORING", tailoring_path))

    # Every assignment goes through the same f-string, so no line can end up
    # with an un-interpolated "{placeholder}" (the optional lines used to be
    # plain strings that were missing the f prefix).
    # NOTE(review): values are wrapped in single quotes without escaping;
    # a value containing "'" would produce a malformed line.
    config = "".join(f"{name}='{value}'\n" for name, value in settings)

    relative_filename = "var/tmp/oscap-remediate-offline.conf.sh"
    # Path of the config file as seen from inside the installed system.
    local_config_filename = f"/{relative_filename}"
    # Path of the same file as seen from the installation environment.
    chroot_config_filename = os.path.join(chroot, relative_filename)
    with open(chroot_config_filename, "w") as f:
        f.write(config)
    # NOTE(review): presumably /system-update is what triggers the remediation
    # on the next boot -- confirm against the openscap-utils documentation.
    os.symlink(local_config_filename,
               os.path.join(chroot, "system-update"))


def schedule_firstboot_remediation(chroot, profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """Schedule a scan and remediation to be run on the first boot.

    Does nothing when no profile is given. Otherwise makes sure that the
    directory for the results exists under ``chroot`` and writes the
    remediation configuration there.

    :param chroot: root directory of the target system
    :param profile: ID of the profile to remediate against
    :param fpath: path to the content file
    :param ds_id: optional datastream ID
    :param xccdf_id: optional XCCDF (checklist) ID
    :param tailoring: optional path to a tailoring file
    :return: always an empty string
    """
    if not profile:
        return ""

    # make sure the directory for the results exists
    results_dir = os.path.dirname(RESULTS_PATH)
    results_dir = os.path.normpath(chroot + "/" + results_dir)
    utils.ensure_dir_exists(results_dir)

    log.info("OSCAP addon: Scheduling firstboot remediation")
    _schedule_firstboot_remediation(
        chroot, profile, fpath, RESULTS_PATH, REPORT_PATH, ds_id, xccdf_id, tailoring)

    return ""
def _parse_remediate(self, value):
    """Store the remediation mode given by the ``remediate`` key.

    :param value: one of "none", "post", "firstboot" or "both"
    :raises KickstartParseError: if the value is not one of the supported modes
    """
    supported_modes = ("none", "post", "firstboot", "both")
    # The value comes from the kickstart file, so it is validated explicitly:
    # an assert would be stripped when Python runs with -O.
    if value not in supported_modes:
        msg = "Unsupported value '%s' of the remediate key, expected one of: %s" % (
            value, ", ".join(supported_modes))
        raise KickstartParseError(msg)
    self.policy_data.remediate = value
org_fedora_oscap.constants import OSCAP from org_fedora_oscap.service.installation import PrepareValidContent, \ - EvaluateRulesTask, InstallContentTask, RemediateSystemTask + EvaluateRulesTask, InstallContentTask, RemediateSystemTask, ScheduleFirstbootRemediationTask from org_fedora_oscap.service.kickstart import OSCAPKickstartSpecification, KickstartParseError from org_fedora_oscap.service.oscap_interface import OSCAPInterface from org_fedora_oscap.structures import PolicyData @@ -208,22 +208,30 @@ def install_with_tasks(self): log.debug("OSCAP Addon: The installation is disabled. Skip the installation.") return [] - tasks = [ - InstallContentTask( - sysroot=conf.target.system_root, - policy_data=self.policy_data, - file_path=common.get_raw_preinst_content_path(self.policy_data), - content_path=common.get_preinst_content_path(self.policy_data), - tailoring_path=common.get_preinst_tailoring_path(self.policy_data), - target_directory=common.TARGET_CONTENT_DIR, - ), - RemediateSystemTask( - sysroot=conf.target.system_root, - policy_data=self.policy_data, - target_content_path=common.get_postinst_content_path(self.policy_data), - target_tailoring_path=common.get_postinst_tailoring_path(self.policy_data) - ) - ] + tasks = [] + tasks.append(InstallContentTask( + sysroot=conf.target.system_root, + policy_data=self.policy_data, + file_path=common.get_raw_preinst_content_path(self.policy_data), + content_path=common.get_preinst_content_path(self.policy_data), + tailoring_path=common.get_preinst_tailoring_path(self.policy_data), + target_directory=common.TARGET_CONTENT_DIR + )) + if self.policy_data.remediate in ("", "post", "both"): + tasks.append(RemediateSystemTask( + sysroot=conf.target.system_root, + policy_data=self.policy_data, + target_content_path=common.get_postinst_content_path(self.policy_data), + target_tailoring_path=common.get_postinst_tailoring_path(self.policy_data) + )) + + if self.policy_data.remediate in ("", "firstboot", "both"): + 
tasks.append(ScheduleFirstbootRemediationTask( + sysroot=conf.target.system_root, + policy_data=self.policy_data, + target_content_path=common.get_postinst_content_path(self.policy_data), + target_tailoring_path=common.get_postinst_tailoring_path(self.policy_data) + )) self._cancel_tasks_on_error(tasks) return tasks diff --git a/org_fedora_oscap/structures.py b/org_fedora_oscap/structures.py index 03c71b4..644836e 100644 --- a/org_fedora_oscap/structures.py +++ b/org_fedora_oscap/structures.py @@ -36,6 +36,7 @@ def __init__(self): self._tailoring_path = "" self._fingerprint = "" self._certificates = "" + self._remediate = "" def update_from(self, rhs): self._content_type = rhs._content_type @@ -48,6 +49,7 @@ def update_from(self, rhs): self._tailoring_path = rhs._tailoring_path self._fingerprint = rhs._fingerprint self._certificates = rhs._certificates + self._remediate = rhs._remediate @property def content_type(self) -> Str: @@ -190,3 +192,15 @@ def certificates(self) -> Str: @certificates.setter def certificates(self, value: Str): self._certificates = value + + @property + def remediate(self) -> Str: + """What remediations to perform + + :return: a remediation mode + """ + return self._remediate + + @remediate.setter + def remediate(self, value: Str): + self._remediate = value diff --git a/tests/test_interface.py b/tests/test_interface.py index 395071f..daa1de1 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -85,7 +85,8 @@ def test_policy_data(interface: OSCAPInterface, callback): "cpe-path": get_variant(Str, "/usr/share/oscap/cpe.xml"), "tailoring-path": get_variant(Str, "/usr/share/oscap/tailoring.xml"), "fingerprint": get_variant(Str, "240f2f18222faa98856c3b4fc50c4195"), - "certificates": get_variant(Str, "/usr/share/oscap/cacert.pem") + "certificates": get_variant(Str, "/usr/share/oscap/cacert.pem"), + "remediate": get_variant(Str, "both"), } interface.PolicyData = policy_structure @@ -178,11 +179,12 @@ def 
test_install_with_tasks(service: OSCAPService, interface: OSCAPInterface): service.policy_data = data object_paths = interface.InstallWithTasks() - assert len(object_paths) == 2 + assert len(object_paths) == 3 tasks = TaskContainer.from_object_path_list(object_paths) assert isinstance(tasks[0], installation.InstallContentTask) assert isinstance(tasks[1], installation.RemediateSystemTask) + assert isinstance(tasks[2], installation.ScheduleFirstbootRemediationTask) def test_cancel_tasks(service: OSCAPService):