audit/0001-Add-ausysrulevalidate.patch
Sergio Correia 211fca783a
Validates the sample rules we ship
Resolves: rhbz#1985630
2021-08-16 11:09:09 -03:00

218 lines
7.3 KiB
Diff

From 4011007b445e8f8da9b0cc45eccd793b94f6b5ce Mon Sep 17 00:00:00 2001
From: Sergio Correia <scorreia@redhat.com>
Date: Thu, 29 Jul 2021 19:25:43 -0300
Subject: [PATCH] Add ausysrulevalidate
---
contrib/ausysrulevalidate | 198 ++++++++++++++++++++++++++++++++++++++
1 file changed, 198 insertions(+)
create mode 100755 contrib/ausysrulevalidate
diff --git a/contrib/ausysrulevalidate b/contrib/ausysrulevalidate
new file mode 100755
index 0000000..a251b2c
--- /dev/null
+++ b/contrib/ausysrulevalidate
@@ -0,0 +1,198 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# ausysrulevalidate - A program that lets you validate the syscalls
+# in audit rules.
+# Copyright (c) 2021 Red Hat Inc., Durham, North Carolina.
+# All Rights Reserved.
+#
+# This software may be freely redistributed and/or modified under the
+# terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 2, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; see the file COPYING. If not, write to the
+# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor
+# Boston, MA 02110-1335, USA.
+#
+# Authors:
+# Sergio Correia <scorreia@redhat.com>
+
+""" This program lets you validate syscalls in audit rules. """
+
+import argparse
+import os.path
+import sys
+
+import audit
+
+
class AuSyscallRuleValidate:
    """Validate the syscalls named in audit rules.

    Syscall names following a ``-S`` option are looked up with
    ``audit.audit_name_to_syscall`` for the architecture selected by the
    rule's ``arch=`` field.  Names that cannot be resolved are dropped
    from the rule, and a rule left without any ``-S`` option is dropped
    entirely.

    Attributes:
        syscalls_table: cache mapping ``"<arch>:<syscall>"`` to a bool
            telling whether the lookup succeeded.
        invalid_syscalls: dict whose keys (and values) are the
            ``"<arch>:<syscall>"`` lookups that failed.
        machines: machine value returned by
            ``audit.audit_determine_machine`` for ``"b32"`` and ``"b64"``.
    """

    def __init__(self):
        """Resolve the b32/b64 machine types; exit(1) if that fails."""
        self.syscalls_table = {}
        self.invalid_syscalls = {}
        self.machines = {
            "b32": audit.audit_determine_machine("b32"),
            "b64": audit.audit_determine_machine("b64"),
        }

        # NOTE(review): only -1 is treated as a failure here; confirm
        # whether audit_determine_machine() can report errors through
        # other negative values.
        if self.machines["b32"] == -1 or self.machines["b64"] == -1:
            sys.stderr.write("ERROR: Unable to determine machine type\n")
            sys.exit(1)

    def validate_syscall(self, arch, syscall):
        """Return True if *syscall* should be kept for *arch*.

        Args:
            arch: key into ``self.machines`` ("b32" or "b64"); any other
                value (including None) means the rule gave no usable arch.
            syscall: a single syscall token taken from a ``-S`` list.

        Returns:
            True for "all", for tokens starting with a digit, for tokens
            whose arch is not in ``self.machines`` (they cannot be
            checked, so they are left alone), and for names that
            ``audit.audit_name_to_syscall`` resolves.  False otherwise;
            in that case the lookup is also recorded in
            ``self.invalid_syscalls``.
        """
        if syscall == "all":
            return True

        # auditctl(8) documents -S as taking a syscall name *or number*;
        # a name lookup would wrongly reject the numeric form.
        if "0" <= syscall[:1] <= "9":
            return True

        # Without a known machine there is nothing to validate against.
        # Keep the syscall rather than failing with a KeyError.
        if arch not in self.machines:
            return True

        lookup = "{0}:{1}".format(arch, syscall)
        if lookup in self.syscalls_table:
            return self.syscalls_table[lookup]

        ret = audit.audit_name_to_syscall(syscall, self.machines[arch])
        valid = ret != -1
        self.syscalls_table[lookup] = valid
        if not valid:
            self.invalid_syscalls[lookup] = lookup

        return valid

    def process_syscalls(self, arch, syscalls):
        """Return the comma-separated *syscalls* with invalid ones removed.

        The result is an empty string when no syscall survives validation.
        """
        return ",".join(
            syscall
            for syscall in syscalls.split(",")
            if self.validate_syscall(arch, syscall)
        )

    def parse_line(self, line):
        """Validate the syscalls of one line of an audit rules file.

        Returns:
            * *line* itself, untouched, when it is a comment, has no
              ``-S`` option, or all of its syscalls were kept;
            * the rule re-joined with single spaces (no trailing
              newline) when some syscalls were removed;
            * None when syscalls were removed and no ``-S`` option is
              left, i.e. the rule must be dropped.
        """
        if line.lstrip().startswith("#"):
            return line

        tokens = line.split()
        # Look for "-S" as a token of its own: a mere substring match
        # would also hit keys or paths such as "-k id-Sens" and end up
        # discarding rules that name no syscall at all.
        if "-S" not in tokens:
            return line

        processed = []
        arch = None
        expect_syscalls = False
        modified = False

        for val in tokens:
            if expect_syscalls:
                # This token is the syscall list that follows a "-S".
                expect_syscalls = False
                kept = self.process_syscalls(arch, val)
                if kept:
                    processed.append(kept)
                    if kept != val:
                        modified = True
                else:
                    # Nothing valid remains: drop the "-S" added just before.
                    processed.pop()
                    modified = True
                continue

            processed.append(val)

            if val == "-S":
                expect_syscalls = True
            elif val.startswith("arch="):
                parts = val.split("=")
                if len(parts) == 2:
                    arch = parts[1]
                    if arch not in self.machines:
                        sys.stderr.write(
                            "ERROR: unexpected arch '{0}'\n".format(arch)
                        )

        if not modified:
            # Hand back the original text so that whitespace-only
            # differences are not reported as changes.
            return line

        if "-S" not in processed:
            # Removing rule altogether, as we have no valid syscalls remaining.
            return None
        return " ".join(processed)

    def process_rules(self, rules_file):
        """Read *rules_file* and validate the syscalls of every rule.

        Invalid syscalls are removed and, if a rule has no valid syscall
        left, the rule itself is removed.  The file is not modified.
        Writes an error to stderr and exits with status 1 when
        *rules_file* is not an existing regular file.

        Returns:
            A tuple ``(lines, changed, invalid_syscalls)``: the resulting
            lines without line terminators, whether they differ from the
            file content, and the list of ``"<arch>:<syscall>"`` lookups
            recorded as invalid so far.
        """
        if not os.path.isfile(rules_file):
            sys.stderr.write("ERROR: rules file '{0}' not found\n".format(rules_file))
            sys.exit(1)

        with open(rules_file) as rules:
            content = rules.readlines()

        processed = []
        changed = False
        for line in content:
            validated = self.parse_line(line)
            if validated is None:
                changed = True
                continue

            validated = validated.rstrip("\r\n")
            if validated != line.rstrip("\r\n"):
                changed = True
            processed.append(validated)

        return (processed, changed, list(self.invalid_syscalls))

    def update_rules(self, rules_file):
        """Validate *rules_file* and rewrite it in place if needed.

        Behaves like ``process_rules`` and returns the same tuple; in
        addition, when validation changed anything, the file is
        overwritten with the resulting lines, one per line.
        """
        new_rules, changed, invalid_syscalls = self.process_rules(rules_file)
        if changed:
            with open(rules_file, "w") as rules:
                rules.writelines("{0}\n".format(line) for line in new_rules)

        return (new_rules, changed, invalid_syscalls)
+
+
if __name__ == "__main__":
    # Command-line entry point: validate (and optionally rewrite) the
    # rules file given with -r/--rules-file.
    cli = argparse.ArgumentParser(description="ausysrulevalidate")
    cli.add_argument(
        "-u", "--update", action="store_true", help="Update rules file if required"
    )
    cli.add_argument(
        "-v", "--verbose", action="store_true", help="Show the resulting rules file"
    )
    mandatory = cli.add_argument_group("required named arguments")
    mandatory.add_argument(
        "-r", "--rules-file", required=True, help="Rules file name"
    )
    options = cli.parse_args()

    checker = AuSyscallRuleValidate()

    # With --update the file is rewritten; otherwise it is only inspected.
    run = checker.update_rules if options.update else checker.process_rules
    rules, was_changed, bad_syscalls = run(options.rules_file)

    if was_changed:
        # Past tense once the update has actually been applied.
        tense = "required" if options.update else "require"
        sys.stderr.write(
            "Rules in '{0}' {1} changes\n".format(options.rules_file, tense)
        )
        if bad_syscalls:
            sys.stderr.write(
                "Invalid syscalls: {0}\n".format(", ".join(bad_syscalls))
            )

    if options.verbose:
        print(*rules, sep="\n")
--
2.31.1