lifted: Make sure inputs cannot have path elements
This processes the inputs with os.path.basename to strip off any path elements and prevent potential path traversal attacks. Also adds a test.
This commit is contained in:
parent
7396c272b2
commit
acbf63013a
@ -37,6 +37,9 @@ def _get_profile_path(ucfg, provider_name, profile, exists=True):
|
|||||||
:raises: ValueError when passed invalid settings or an invalid profile name
|
:raises: ValueError when passed invalid settings or an invalid profile name
|
||||||
:raises: RuntimeError when the provider or profile couldn't be found
|
:raises: RuntimeError when the provider or profile couldn't be found
|
||||||
"""
|
"""
|
||||||
|
# Make sure no path elements are present
|
||||||
|
profile = os.path.basename(profile)
|
||||||
|
provider_name = os.path.basename(provider_name)
|
||||||
if not profile:
|
if not profile:
|
||||||
raise ValueError("Profile name cannot be empty!")
|
raise ValueError("Profile name cannot be empty!")
|
||||||
if not provider_name:
|
if not provider_name:
|
||||||
@ -50,7 +53,7 @@ def _get_profile_path(ucfg, provider_name, profile, exists=True):
|
|||||||
if exists and not os.path.isfile(path):
|
if exists and not os.path.isfile(path):
|
||||||
raise RuntimeError(f'Couldn\'t find profile "{profile}"!')
|
raise RuntimeError(f'Couldn\'t find profile "{profile}"!')
|
||||||
|
|
||||||
return path
|
return os.path.abspath(path)
|
||||||
|
|
||||||
def resolve_provider(ucfg, provider_name):
|
def resolve_provider(ucfg, provider_name):
|
||||||
"""Get information about the specified provider as defined in that
|
"""Get information about the specified provider as defined in that
|
||||||
@ -71,6 +74,8 @@ def resolve_provider(ucfg, provider_name):
|
|||||||
:returns: the provider
|
:returns: the provider
|
||||||
:rtype: dict
|
:rtype: dict
|
||||||
"""
|
"""
|
||||||
|
# Make sure no path elements are present
|
||||||
|
provider_name = os.path.basename(provider_name)
|
||||||
path = os.path.join(ucfg["providers_dir"], provider_name, "provider.toml")
|
path = os.path.join(ucfg["providers_dir"], provider_name, "provider.toml")
|
||||||
try:
|
try:
|
||||||
with open(path) as provider_file:
|
with open(path) as provider_file:
|
||||||
@ -91,6 +96,8 @@ def load_profiles(ucfg, provider_name):
|
|||||||
:returns: a dict of settings dicts, keyed by profile name
|
:returns: a dict of settings dicts, keyed by profile name
|
||||||
:rtype: dict
|
:rtype: dict
|
||||||
"""
|
"""
|
||||||
|
# Make sure no path elements are present
|
||||||
|
provider_name = os.path.basename(provider_name)
|
||||||
|
|
||||||
def load_path(path):
|
def load_path(path):
|
||||||
with open(path) as file:
|
with open(path) as file:
|
||||||
@ -114,6 +121,9 @@ def resolve_playbook_path(ucfg, provider_name):
|
|||||||
:returns: the path to the playbook
|
:returns: the path to the playbook
|
||||||
:rtype: str
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
|
# Make sure no path elements are present
|
||||||
|
provider_name = os.path.basename(provider_name)
|
||||||
|
|
||||||
path = os.path.join(ucfg["providers_dir"], provider_name, "playbook.yaml")
|
path = os.path.join(ucfg["providers_dir"], provider_name, "playbook.yaml")
|
||||||
if not os.path.isfile(path):
|
if not os.path.isfile(path):
|
||||||
raise RuntimeError(f'Couldn\'t find playbook for "{provider_name}"!')
|
raise RuntimeError(f'Couldn\'t find playbook for "{provider_name}"!')
|
||||||
|
@ -56,6 +56,9 @@ def _get_queue_path(ucfg):
|
|||||||
|
|
||||||
|
|
||||||
def _get_upload_path(ucfg, uuid, write=False):
|
def _get_upload_path(ucfg, uuid, write=False):
|
||||||
|
# Make sure no path elements are present
|
||||||
|
uuid = os.path.basename(uuid)
|
||||||
|
|
||||||
path = os.path.join(_get_queue_path(ucfg), f"{uuid}.toml")
|
path = os.path.join(_get_queue_path(ucfg), f"{uuid}.toml")
|
||||||
if write and not os.path.exists(path):
|
if write and not os.path.exists(path):
|
||||||
open(path, "a").close()
|
open(path, "a").close()
|
||||||
|
@ -22,6 +22,7 @@ import unittest
|
|||||||
import lifted.config
|
import lifted.config
|
||||||
from lifted.providers import list_providers, resolve_provider, resolve_playbook_path, save_settings
|
from lifted.providers import list_providers, resolve_provider, resolve_playbook_path, save_settings
|
||||||
from lifted.providers import load_profiles, validate_settings, load_settings, delete_profile
|
from lifted.providers import load_profiles, validate_settings, load_settings, delete_profile
|
||||||
|
from lifted.providers import _get_profile_path
|
||||||
import pylorax.api.config
|
import pylorax.api.config
|
||||||
from pylorax.sysutils import joinpaths
|
from pylorax.sysutils import joinpaths
|
||||||
|
|
||||||
@ -39,6 +40,14 @@ class ProvidersTestCase(unittest.TestCase):
|
|||||||
def tearDownClass(self):
|
def tearDownClass(self):
|
||||||
shutil.rmtree(self.root_dir)
|
shutil.rmtree(self.root_dir)
|
||||||
|
|
||||||
|
def test_get_profile_path(self):
|
||||||
|
"""Make sure that _get_profile_path strips path elements from the input"""
|
||||||
|
path = _get_profile_path(self.config["upload"], "azure", "staging-settings", exists=False)
|
||||||
|
self.assertEqual(path, os.path.abspath(joinpaths(self.config["upload"]["settings_dir"], "azure/staging-settings.toml")))
|
||||||
|
|
||||||
|
path = _get_profile_path(self.config["upload"], "../../../../foo/bar/azure", "/not/my/path/staging-settings", exists=False)
|
||||||
|
self.assertEqual(path, os.path.abspath(joinpaths(self.config["upload"]["settings_dir"], "azure/staging-settings.toml")))
|
||||||
|
|
||||||
def test_list_providers(self):
|
def test_list_providers(self):
|
||||||
p = list_providers(self.config["upload"])
|
p = list_providers(self.config["upload"])
|
||||||
self.assertEqual(p, ['azure', 'dummy', 'openstack', 'vsphere'])
|
self.assertEqual(p, ['azure', 'dummy', 'openstack', 'vsphere'])
|
||||||
|
Loading…
Reference in New Issue
Block a user