lorax-composer: Add firewall support to blueprints

You can now open ports in the firewall, using port numbers or service
names:

    [customizations.firewall]
    ports = ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"]

Or enable/disable services registered with firewalld:

     [customizations.firewall.services]
     enabled = ["ftp", "ntp", "dhcp"]
     disabled = ["telnet"]

If the template contains firewall --disabled it cannot be overridden,
under the assumption that it is required for the image to boot in the
selected environment.
This commit is contained in:
Brian C. Lane 2019-04-15 11:55:51 -07:00
parent e5a8700bdf
commit 4d35668ab5
4 changed files with 210 additions and 3 deletions

View File

@ -292,9 +292,8 @@ By default the firewall blocks all access except for services that enable their
like ``sshd``. This command can be used to open other ports or services. Ports are configured using like ``sshd``. This command can be used to open other ports or services. Ports are configured using
the port:protocol format:: the port:protocol format::
[customizations.firewall.ports] [customizations.firewall]
enabled = ["80:tcp", "imap:tcp", "53:tcp", "53:udp"] ports = ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"]
disabled = ["23:tcp", "mysql:tcp"]
Numeric ports, or their names from ``/etc/services`` can be used in the ``ports`` enabled/disabled lists. Numeric ports, or their names from ``/etc/services`` can be used in the ``ports`` enabled/disabled lists.
@ -303,12 +302,15 @@ in a ``customizations.firewall.services`` section::
[customizations.firewall.services] [customizations.firewall.services]
enabled = ["ftp", "ntp", "dhcp"] enabled = ["ftp", "ntp", "dhcp"]
disabled = ["telnet"]
Note that these are different from the names in ``/etc/services``, and only ``enabled`` is supported. Note that these are different from the names in ``/etc/services``, and only ``enabled`` is supported.
Both are optional, if they are not used leave them out or set them to an empty list ``[]``. If you Both are optional, if they are not used leave them out or set them to an empty list ``[]``. If you
only want the default firewall setup this section can be omitted from the blueprint. only want the default firewall setup this section can be omitted from the blueprint.
NOTE: The ``Google`` and ``OpenStack`` templates explicitly disable the firewall for their environment.
This cannot be overridden by the blueprint.
[customizations.services] [customizations.services]
************************* *************************

View File

@ -281,6 +281,53 @@ def get_keyboard_layout(recipe):
return recipe["customizations"]["locale"]["keyboard"] return recipe["customizations"]["locale"]["keyboard"]
def firewall_cmd(line, settings):
""" Update the firewall line with the new ports and services
:param line: The firewall ... line
:type line: str
:param settings: A dict with the list of services and ports to enable and disable
:type settings: dict
Using pykickstart to process the line is the best way to make sure it
is parsed correctly, and re-assembled for inclusion into the final kickstart
"""
ks_version = makeVersion()
ks = KickstartParser(ks_version, errorsAreFatal=False, missingIncludeIsFatal=False)
ks.readKickstartFromString(line)
# Do not override firewall --disabled
if ks.handler.firewall.enabled != False and settings:
ks.handler.firewall.ports = settings["ports"]
ks.handler.firewall.services = settings["enabled"]
ks.handler.firewall.remove_services = settings["disabled"]
# Converting back to a string includes a comment, return just the keyboard line
return str(ks.handler.firewall).splitlines()[-1]
def get_firewall_settings(recipe):
"""Return the customizations.firewall settings
:param recipe: The recipe
:type recipe: Recipe object
:returns: A dict of settings
:rtype: dict
"""
settings = {"ports": [], "enabled": [], "disabled": []}
if "customizations" not in recipe or \
"firewall" not in recipe["customizations"]:
return settings
settings["ports"] = recipe["customizations"]["firewall"].get("ports", [])
if "services" in recipe["customizations"]["firewall"]:
settings["enabled"] = recipe["customizations"]["firewall"]["services"].get("enabled", [])
settings["disabled"] = recipe["customizations"]["firewall"]["services"].get("disabled", [])
return settings
def customize_ks_template(ks_template, recipe): def customize_ks_template(ks_template, recipe):
""" Customize the kickstart template and return it """ Customize the kickstart template and return it
@ -314,6 +361,9 @@ def customize_ks_template(ks_template, recipe):
"keyboard": [keyboard_cmd, "keyboard": [keyboard_cmd,
get_keyboard_layout(recipe), get_keyboard_layout(recipe),
'keyboard --xlayouts us --vckeymap us', True], 'keyboard --xlayouts us --vckeymap us', True],
"firewall": [firewall_cmd,
get_firewall_settings(recipe),
'firewall --enabled', True],
} }
found = {} found = {}

View File

@ -24,6 +24,7 @@ from pylorax import get_buildarch
from pylorax.api.compose import add_customizations, get_extra_pkgs, compose_types from pylorax.api.compose import add_customizations, get_extra_pkgs, compose_types
from pylorax.api.compose import timezone_cmd, get_timezone_settings from pylorax.api.compose import timezone_cmd, get_timezone_settings
from pylorax.api.compose import lang_cmd, get_languages, keyboard_cmd, get_keyboard_layout from pylorax.api.compose import lang_cmd, get_languages, keyboard_cmd, get_keyboard_layout
from pylorax.api.compose import firewall_cmd, get_firewall_settings
from pylorax.api.compose import get_kernel_append, bootloader_append, customize_ks_template from pylorax.api.compose import get_kernel_append, bootloader_append, customize_ks_template
from pylorax.api.config import configure, make_dnf_dirs from pylorax.api.config import configure, make_dnf_dirs
from pylorax.api.dnfbase import get_base_object from pylorax.api.dnfbase import get_base_object
@ -389,6 +390,65 @@ languages = ["en_CA.utf8", "en_HK.utf8"]
"de (dvorak)"), "de (dvorak)"),
"keyboard 'de (dvorak)'") "keyboard 'de (dvorak)'")
def test_get_firewall_settings(self):
"""Test get_firewall_settings function"""
blueprint_data = """name = "test-firewall"
description = "test recipe"
version = "0.0.1"
"""
firewall_ports = """
[customizations.firewall]
ports = ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"]
"""
firewall_services = """
[customizations.firewall.services]
enabled = ["ftp", "ntp", "dhcp"]
disabled = ["telnet"]
"""
blueprint2_data = blueprint_data + firewall_ports
blueprint3_data = blueprint_data + firewall_services
blueprint4_data = blueprint_data + firewall_ports + firewall_services
recipe = recipe_from_toml(blueprint_data)
self.assertEqual(get_firewall_settings(recipe), {'ports': [], 'enabled': [], 'disabled': []})
recipe = recipe_from_toml(blueprint2_data)
self.assertEqual(get_firewall_settings(recipe),
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": [], "disabled": []})
recipe = recipe_from_toml(blueprint3_data)
self.assertEqual(get_firewall_settings(recipe),
{"ports": [],
"enabled": ["ftp", "ntp", "dhcp"], "disabled": ["telnet"]})
recipe = recipe_from_toml(blueprint4_data)
self.assertEqual(get_firewall_settings(recipe),
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": ["ftp", "ntp", "dhcp"], "disabled": ["telnet"]})
def test_firewall_cmd(self):
"""Test firewall_cmd function"""
self.assertEqual(firewall_cmd("firewall --enabled", {}), "firewall --enabled")
self.assertEqual(firewall_cmd("firewall --enabled",
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": [], "disabled": []}),
"firewall --enabled --port=22:tcp,80:tcp,imap:tcp,53:tcp,53:udp")
self.assertEqual(firewall_cmd("firewall --enabled",
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": ["ftp", "ntp", "dhcp"], "disabled": []}),
"firewall --enabled --port=22:tcp,80:tcp,imap:tcp,53:tcp,53:udp --service=ftp,ntp,dhcp")
self.assertEqual(firewall_cmd("firewall --enabled",
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": ["ftp", "ntp", "dhcp"], "disabled": ["telnet"]}),
"firewall --enabled --port=22:tcp,80:tcp,imap:tcp,53:tcp,53:udp --service=ftp,ntp,dhcp --remove-service=telnet")
# Make sure that --disabled overrides setting ports and services
self.assertEqual(firewall_cmd("firewall --disabled",
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": ["ftp", "ntp", "dhcp"], "disabled": ["telnet"]}),
"firewall --disabled")
def _checkBootloader(self, result, append_str, line_limit=0): def _checkBootloader(self, result, append_str, line_limit=0):
"""Find the bootloader line and make sure append_str is in it""" """Find the bootloader line and make sure append_str is in it"""
# Optionally check to make sure the change is at the top of the template # Optionally check to make sure the change is at the top of the template
@ -454,6 +514,28 @@ languages = ["en_CA.utf8", "en_HK.utf8"]
line_num += 1 line_num += 1
return False return False
def _checkFirewall(self, result, settings, line_limit=0):
"""Find the firewall line and make sure it is as expected"""
# Optionally check to make sure the change is at the top of the template
line_num = 0
for line in result.splitlines():
if line.startswith("firewall"):
# First layout is used twice, so total count should be n+1
ports = all([bool(p in line) for p in settings["ports"]])
enabled = all([bool(e in line) for e in settings["enabled"]])
disabled = all([bool(d in line) for d in settings["disabled"]])
if ports and enabled and disabled:
if line_limit == 0 or line_num < line_limit:
return True
else:
print("FAILED: firewall not in the first %d lines of the output" % line_limit)
return False
else:
print("FAILED: %s not matching %s" % (settings, line))
line_num += 1
return False
def test_template_defaults(self): def test_template_defaults(self):
"""Test that customize_ks_template includes defaults correctly""" """Test that customize_ks_template includes defaults correctly"""
blueprint_data = """name = "test-kernel" blueprint_data = """name = "test-kernel"
@ -506,6 +588,13 @@ ntpservers = ["0.north-america.pool.ntp.org", "1.north-america.pool.ntp.org"]
[customizations.locale] [customizations.locale]
keyboard = "de (dvorak)" keyboard = "de (dvorak)"
languages = ["en_CA.utf8", "en_HK.utf8"] languages = ["en_CA.utf8", "en_HK.utf8"]
[customizations.firewall]
ports = ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"]
[customizations.firewall.services]
enabled = ["ftp", "ntp", "dhcp"]
disabled = ["telnet"]
""" """
tz_dict = {"timezone": "US/Samoa", "ntpservers": ["0.north-america.pool.ntp.org", "1.north-america.pool.ntp.org"]} tz_dict = {"timezone": "US/Samoa", "ntpservers": ["0.north-america.pool.ntp.org", "1.north-america.pool.ntp.org"]}
recipe = recipe_from_toml(blueprint_data) recipe = recipe_from_toml(blueprint_data)
@ -520,6 +609,10 @@ languages = ["en_CA.utf8", "en_HK.utf8"]
self.assertEqual(sum([1 for l in result.splitlines() if l.startswith("lang")]), 1) self.assertEqual(sum([1 for l in result.splitlines() if l.startswith("lang")]), 1)
self.assertTrue(self._checkKeyboard(result, "de (dvorak)", line_limit=4)) self.assertTrue(self._checkKeyboard(result, "de (dvorak)", line_limit=4))
self.assertEqual(sum([1 for l in result.splitlines() if l.startswith("keyboard")]), 1) self.assertEqual(sum([1 for l in result.splitlines() if l.startswith("keyboard")]), 1)
self.assertTrue(self._checkFirewall(result,
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": ["ftp", "ntp", "dhcp"], "disabled": ["telnet"]}, line_limit=6))
self.assertEqual(sum([1 for l in result.splitlines() if l.startswith("firewall")]), 1)
# Test against a kickstart with a bootloader line # Test against a kickstart with a bootloader line
result = customize_ks_template("firewall --enabled\nbootloader --location=mbr\n", recipe) result = customize_ks_template("firewall --enabled\nbootloader --location=mbr\n", recipe)
@ -560,6 +653,18 @@ languages = ["en_CA.utf8", "en_HK.utf8"]
if sum([1 for l in result.splitlines() if l.startswith("keyboard")]) != 1: if sum([1 for l in result.splitlines() if l.startswith("keyboard")]) != 1:
errors.append(("keyboard for compose_type %s failed: More than 1 entry" % compose_type, result)) errors.append(("keyboard for compose_type %s failed: More than 1 entry" % compose_type, result))
# google and openstack templates requires the firewall to be disabled
if compose_type == "google" or compose_type == "openstack":
if not self._checkFirewall(result, {'ports': [], 'enabled': [], 'disabled': []}):
errors.append(("firewall for compose_type %s failed" % compose_type, result))
else:
if not self._checkFirewall(result,
{"ports": ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"],
"enabled": ["ftp", "ntp", "dhcp"], "disabled": ["telnet"]}):
errors.append(("firewall for compose_type %s failed" % compose_type, result))
if sum([1 for l in result.splitlines() if l.startswith("firewall")]) != 1:
errors.append(("firewall for compose_type %s failed: More than 1 entry" % compose_type, result))
# Print the bad results # Print the bad results
for e, r in errors: for e, r in errors:
print("%s:\n%s\n\n" % (e, r)) print("%s:\n%s\n\n" % (e, r))

View File

@ -523,6 +523,56 @@ languages = ["en_CA.utf8", "en_HK.utf8"]
self.assertEqual(ks.handler.lang.lang, "en_CA.utf8") self.assertEqual(ks.handler.lang.lang, "en_CA.utf8")
self.assertEqual(ks.handler.lang.addsupport, ["en_HK.utf8"]) self.assertEqual(ks.handler.lang.addsupport, ["en_HK.utf8"])
def test_firewall_ports(self):
blueprint_data = """name = "test-firewall"
description = "test recipe"
version = "0.0.1"
"""
blueprint2_data = blueprint_data + """
[customizations.firewall]
ports = ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"]
"""
ks = self._blueprint_to_ks(blueprint_data)
self.assertEqual(ks.handler.firewall.ports, [])
self.assertEqual(ks.handler.firewall.services, [])
self.assertEqual(ks.handler.firewall.remove_services, [])
ks = self._blueprint_to_ks(blueprint2_data)
self.assertEqual(ks.handler.firewall.ports, ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"])
self.assertEqual(ks.handler.firewall.services, [])
self.assertEqual(ks.handler.firewall.remove_services, [])
def test_firewall_services(self):
blueprint_data = """name = "test-firewall"
description = "test recipe"
version = "0.0.1"
[customizations.firewall.services]
enabled = ["ftp", "ntp", "dhcp"]
disabled = ["telnet"]
"""
ks = self._blueprint_to_ks(blueprint_data)
self.assertEqual(ks.handler.firewall.ports, [])
self.assertEqual(ks.handler.firewall.services, ["ftp", "ntp", "dhcp"])
self.assertEqual(ks.handler.firewall.remove_services, ["telnet"])
def test_firewall(self):
blueprint_data = """name = "test-firewall"
description = "test recipe"
version = "0.0.1"
[customizations.firewall]
ports = ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"]
[customizations.firewall.services]
enabled = ["ftp", "ntp", "dhcp"]
disabled = ["telnet"]
"""
ks = self._blueprint_to_ks(blueprint_data)
self.assertEqual(ks.handler.firewall.ports, ["22:tcp", "80:tcp", "imap:tcp", "53:tcp", "53:udp"])
self.assertEqual(ks.handler.firewall.services, ["ftp", "ntp", "dhcp"])
self.assertEqual(ks.handler.firewall.remove_services, ["telnet"])
def test_user(self): def test_user(self):
blueprint_data = """name = "test-user" blueprint_data = """name = "test-user"
description = "test recipe" description = "test recipe"