From acbf63013a7cbae3dbb8147aa3fdfe7ba9b96824 Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Fri, 6 Sep 2019 14:05:06 -0700 Subject: [PATCH] lifted: Make sure inputs cannot have path elements This processes the inputs with os.path.basename to strip off any path elements and prevent potential path traversal attacks. Also adds a test. --- src/lifted/providers.py | 12 +++++++++++- src/lifted/queue.py | 3 +++ tests/lifted/test_providers.py | 9 +++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/lifted/providers.py b/src/lifted/providers.py index 934b794a..dc97cb11 100644 --- a/src/lifted/providers.py +++ b/src/lifted/providers.py @@ -37,6 +37,9 @@ def _get_profile_path(ucfg, provider_name, profile, exists=True): :raises: ValueError when passed invalid settings or an invalid profile name :raises: RuntimeError when the provider or profile couldn't be found """ + # Make sure no path elements are present + profile = os.path.basename(profile) + provider_name = os.path.basename(provider_name) if not profile: raise ValueError("Profile name cannot be empty!") if not provider_name: @@ -50,7 +53,7 @@ def _get_profile_path(ucfg, provider_name, profile, exists=True): if exists and not os.path.isfile(path): raise RuntimeError(f'Couldn\'t find profile "{profile}"!') - return path + return os.path.abspath(path) def resolve_provider(ucfg, provider_name): """Get information about the specified provider as defined in that @@ -71,6 +74,8 @@ def resolve_provider(ucfg, provider_name): :returns: the provider :rtype: dict """ + # Make sure no path elements are present + provider_name = os.path.basename(provider_name) path = os.path.join(ucfg["providers_dir"], provider_name, "provider.toml") try: with open(path) as provider_file: @@ -91,6 +96,8 @@ def load_profiles(ucfg, provider_name): :returns: a dict of settings dicts, keyed by profile name :rtype: dict """ + # Make sure no path elements are present + provider_name = os.path.basename(provider_name) def load_path(path): with open(path) as file: @@ -114,6 +121,9 @@ def resolve_playbook_path(ucfg, provider_name): :returns: the path to the playbook :rtype: str """ + # Make sure no path elements are present + provider_name = os.path.basename(provider_name) + path = os.path.join(ucfg["providers_dir"], provider_name, "playbook.yaml") if not os.path.isfile(path): raise RuntimeError(f'Couldn\'t find playbook for "{provider_name}"!') diff --git a/src/lifted/queue.py b/src/lifted/queue.py index a025ef97..8f12537e 100644 --- a/src/lifted/queue.py +++ b/src/lifted/queue.py @@ -56,6 +56,9 @@ def _get_queue_path(ucfg): def _get_upload_path(ucfg, uuid, write=False): + # Make sure no path elements are present + uuid = os.path.basename(uuid) + path = os.path.join(_get_queue_path(ucfg), f"{uuid}.toml") if write and not os.path.exists(path): open(path, "a").close() diff --git a/tests/lifted/test_providers.py b/tests/lifted/test_providers.py index d644a20b..441e3021 100644 --- a/tests/lifted/test_providers.py +++ b/tests/lifted/test_providers.py @@ -22,6 +22,7 @@ import unittest import lifted.config from lifted.providers import list_providers, resolve_provider, resolve_playbook_path, save_settings from lifted.providers import load_profiles, validate_settings, load_settings, delete_profile +from lifted.providers import _get_profile_path import pylorax.api.config from pylorax.sysutils import joinpaths @@ -39,6 +40,14 @@ class ProvidersTestCase(unittest.TestCase): def tearDownClass(self): shutil.rmtree(self.root_dir) + def test_get_profile_path(self): + """Make sure that _get_profile_path strips path elements from the input""" + path = _get_profile_path(self.config["upload"], "azure", "staging-settings", exists=False) + self.assertEqual(path, os.path.abspath(joinpaths(self.config["upload"]["settings_dir"], "azure/staging-settings.toml"))) + + path = _get_profile_path(self.config["upload"], "../../../../foo/bar/azure", "/not/my/path/staging-settings", exists=False) + self.assertEqual(path, os.path.abspath(joinpaths(self.config["upload"]["settings_dir"], "azure/staging-settings.toml"))) + def test_list_providers(self): p = list_providers(self.config["upload"]) self.assertEqual(p, ['azure', 'dummy', 'openstack', 'vsphere'])