diff --git a/src/pylorax/api/regexes.py b/src/pylorax/api/regexes.py
new file mode 100644
index 00000000..7be3415e
--- /dev/null
+++ b/src/pylorax/api/regexes.py
@@ -0,0 +1,21 @@
+#
+# Copyright (C) 2018 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+import re
+
+# These are the characters that we allow to be passed in via the
+# API calls.
+VALID_API_STRING = re.compile(r'^[a-zA-Z0-9_,.:*-]+$')
diff --git a/src/pylorax/api/v0.py b/src/pylorax/api/v0.py
index 72d3dbc7..3317a552 100644
--- a/src/pylorax/api/v0.py
+++ b/src/pylorax/api/v0.py
@@ -991,6 +991,7 @@ from pylorax.api.queue import uuid_tar, uuid_image, uuid_cancel, uuid_log
from pylorax.api.recipes import list_branch_files, read_recipe_commit, recipe_filename, list_commits
from pylorax.api.recipes import recipe_from_dict, recipe_from_toml, commit_recipe, delete_recipe, revert_recipe
from pylorax.api.recipes import tag_recipe_commit, recipe_diff
+from pylorax.api.regexes import VALID_API_STRING
from pylorax.api.workspace import workspace_read, workspace_write, workspace_delete
# The API functions don't actually get called by any code here
@@ -1032,6 +1033,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_info(blueprint_names):
"""Return the contents of the blueprint, or a list of blueprints"""
+ if VALID_API_STRING.match(blueprint_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
out_fmt = request.args.get("format", "json")
blueprints = []
@@ -1090,6 +1094,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_changes(blueprint_names):
"""Return the changes to a blueprint or list of blueprints"""
+ if VALID_API_STRING.match(blueprint_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
try:
limit = int(request.args.get("limit", "20"))
@@ -1126,6 +1133,9 @@ def v0_api(api):
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
+ if VALID_API_STRING.match(blueprint["name"]) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
with api.config["GITLOCK"].lock:
commit_recipe(api.config["GITLOCK"].repo, branch, blueprint)
@@ -1144,6 +1154,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_delete(blueprint_name):
"""Delete a blueprint from git"""
+ if VALID_API_STRING.match(blueprint_name) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@@ -1165,6 +1178,9 @@ def v0_api(api):
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
+ if VALID_API_STRING.match(blueprint["name"]) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
with api.config["GITLOCK"].lock:
workspace_write(api.config["GITLOCK"].repo, branch, blueprint)
except Exception as e:
@@ -1179,6 +1195,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_delete_workspace(blueprint_name):
"""Delete a blueprint from the workspace"""
+ if VALID_API_STRING.match(blueprint_name) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@@ -1197,6 +1216,9 @@ def v0_api(api):
("commit", "", "no commit ID given")])
def v0_blueprints_undo(blueprint_name, commit):
"""Undo changes to a blueprint by reverting to a previous commit."""
+ if VALID_API_STRING.match(blueprint_name) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@@ -1217,6 +1239,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_tag(blueprint_name):
"""Tag a blueprint's latest blueprint commit as a 'revision'"""
+ if VALID_API_STRING.match(blueprint_name) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@@ -1237,6 +1262,10 @@ def v0_api(api):
("to_commit", "", "no to commit ID given")])
def v0_blueprints_diff(blueprint_name, from_commit, to_commit):
"""Return the differences between two commits of a blueprint"""
+ for s in [blueprint_name, from_commit, to_commit]:
+ if VALID_API_STRING.match(s) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
try:
if from_commit == "NEWEST":
@@ -1276,6 +1305,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_freeze(blueprint_names):
"""Return the blueprint with the exact modules and packages selected by depsolve"""
+ if VALID_API_STRING.match(blueprint_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
out_fmt = request.args.get("format", "json")
blueprints = []
@@ -1331,6 +1363,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_depsolve(blueprint_names):
"""Return the dependencies for a blueprint"""
+ if VALID_API_STRING.match(blueprint_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
branch = request.args.get("branch", "master")
blueprints = []
errors = []
@@ -1408,6 +1443,9 @@ def v0_api(api):
@checkparams([("project_names", "", "no project names given")])
def v0_projects_info(project_names):
"""Return detailed information about the listed projects"""
+ if VALID_API_STRING.match(project_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
try:
with api.config["DNFLOCK"].lock:
projects = projects_info(api.config["DNFLOCK"].dbo, project_names.split(","))
@@ -1423,6 +1461,9 @@ def v0_api(api):
@checkparams([("project_names", "", "no project names given")])
def v0_projects_depsolve(project_names):
"""Return detailed information about the listed projects"""
+ if VALID_API_STRING.match(project_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
try:
with api.config["DNFLOCK"].lock:
deps = projects_depsolve(api.config["DNFLOCK"].dbo, [(n, "*") for n in project_names.split(",")], [])
@@ -1447,6 +1488,9 @@ def v0_api(api):
@checkparams([("source_names", "", "no source names given")])
def v0_projects_source_info(source_names):
"""Return detailed info about the list of sources"""
+ if VALID_API_STRING.match(source_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
out_fmt = request.args.get("format", "json")
# Return info on all of the sources
@@ -1540,6 +1584,9 @@ def v0_api(api):
@checkparams([("source_name", "", "no source name given")])
def v0_projects_source_delete(source_name):
"""Delete the named source and return a status response"""
+ if VALID_API_STRING.match(source_name) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
system_sources = get_repo_sources("/etc/yum.repos.d/*.repo")
if source_name in system_sources:
return jsonify(status=False, errors=["%s is a system source, it cannot be deleted." % source_name]), 400
@@ -1567,6 +1614,9 @@ def v0_api(api):
@crossdomain(origin="*")
def v0_modules_list(module_names=None):
"""List available modules, filtering by module_names"""
+ if module_names and VALID_API_STRING.match(module_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
try:
limit = int(request.args.get("limit", "20"))
offset = int(request.args.get("offset", "0"))
@@ -1592,6 +1642,8 @@ def v0_api(api):
@checkparams([("module_names", "", "no module names given")])
def v0_modules_info(module_names):
"""Return detailed information about the listed modules"""
+ if VALID_API_STRING.match(module_names) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
with api.config["DNFLOCK"].lock:
modules = modules_info(api.config["DNFLOCK"].dbo, module_names.split(","))
@@ -1644,6 +1696,9 @@ def v0_api(api):
else:
compose_type = compose["compose_type"]
+ if VALID_API_STRING.match(blueprint_name) is None:
+ errors.append("Invalid characters in API path")
+
if errors:
return jsonify(status=False, errors=errors), 400
@@ -1689,6 +1744,9 @@ def v0_api(api):
@checkparams([("uuids", "", "no UUIDs given")])
def v0_compose_status(uuids):
"""Return the status of the listed uuids"""
+ if VALID_API_STRING.match(uuids) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
results = []
for uuid in [n.strip().lower() for n in uuids.split(",")]:
details = uuid_status(api.config["COMPOSER_CFG"], uuid)
@@ -1703,6 +1761,9 @@ def v0_api(api):
@checkparams([("uuid", "", "no UUID given")])
def v0_compose_cancel(uuid):
"""Cancel a running compose and delete its results directory"""
+ if VALID_API_STRING.match(uuid) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@@ -1723,6 +1784,9 @@ def v0_api(api):
@checkparams([("uuids", "", "no UUIDs given")])
def v0_compose_delete(uuids):
"""Delete the compose results for the listed uuids"""
+ if VALID_API_STRING.match(uuids) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
results = []
errors = []
for uuid in [n.strip().lower() for n in uuids.split(",")]:
@@ -1746,6 +1810,9 @@ def v0_api(api):
@checkparams([("uuid", "", "no UUID given")])
def v0_compose_info(uuid):
"""Return detailed info about a compose"""
+ if VALID_API_STRING.match(uuid) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
try:
info = uuid_info(api.config["COMPOSER_CFG"], uuid)
except Exception as e:
@@ -1759,6 +1826,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_metadata(uuid):
"""Return a tar of the metadata for the build"""
+ if VALID_API_STRING.match(uuid) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@@ -1776,6 +1846,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_results(uuid):
"""Return a tar of the metadata and the results for the build"""
+ if VALID_API_STRING.match(uuid) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@@ -1793,6 +1866,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_logs(uuid):
"""Return a tar of the metadata for the build"""
+ if VALID_API_STRING.match(uuid) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@@ -1810,6 +1886,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_image(uuid):
"""Return the output image for the build"""
+ if VALID_API_STRING.match(uuid) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@@ -1833,6 +1912,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_log_tail(uuid):
"""Return the end of the main anaconda.log, defaults to 1Mbytes"""
+ if VALID_API_STRING.match(uuid) is None:
+ return jsonify(status=False, errors=["Invalid characters in API path"]), 400
+
try:
size = int(request.args.get("size", "1024"))
except ValueError as e:
diff --git a/tests/pylorax/test_server.py b/tests/pylorax/test_server.py
index ba6011b2..ba4c191a 100644
--- a/tests/pylorax/test_server.py
+++ b/tests/pylorax/test_server.py
@@ -1,3 +1,4 @@
+# -*- coding: UTF-8 -*-
#
# Copyright (C) 2017 Red Hat, Inc.
#
@@ -32,6 +33,9 @@ from pylorax.api.server import server, GitLock, DNFLock
from pylorax.api.dnfbase import get_base_object
from pylorax.sysutils import joinpaths
+# Used for testing UTF-8 input support
+UTF8_TEST_STRING = "I ο½πll πΞΏπ π ο½π°κ± πππΈπ".decode("utf-8")
+
def get_system_repo():
"""Get an enabled system repo from /etc/yum.repos.d/*repo
@@ -1021,3 +1025,167 @@ class ServerTestCase(unittest.TestCase):
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["finished"], [], "Failed to delete the failed build: %s" % data)
+
+ def assertInputError(self, resp):
+ """Check all the conditions for a successful input check error result"""
+ data = json.loads(resp.data)
+ self.assertNotEqual(data, None)
+ self.assertEqual(resp.status_code, 400)
+ self.assertEqual(data["status"], False)
+ self.assertEqual(data["errors"], ["Invalid characters in API path"])
+
+ def test_blueprints_info_input(self):
+ """Test the blueprints/info input character checking"""
+ # /api/v0/blueprints/info/
+ resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_blueprints_changes_input(self):
+ """Test the blueprints/changes input character checking"""
+ # /api/v0/blueprints/changes/
+ resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_blueprints_new_input(self):
+ """Test the blueprints/new input character checking"""
+ # /api/v0/blueprints/new
+ test_blueprint = {"description": "An example GlusterFS server with samba",
+ "name":UTF8_TEST_STRING,
+ "version": "0.2.0",
+ "modules":[{"name":"glusterfs", "version":"3.*.*"},
+ {"name":"glusterfs-cli", "version":"3.*.*"}],
+ "packages":[{"name":"samba", "version":"4.*.*"},
+ {"name":"tmux", "version":"1.8"}],
+ "groups": []}
+
+ resp = self.server.post("/api/v0/blueprints/new",
+ data=json.dumps(test_blueprint),
+ content_type="application/json")
+ self.assertInputError(resp)
+
+ def test_blueprints_delete_input(self):
+ """Test the blueprints/delete input character checking"""
+ resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_blueprints_workspace_input(self):
+ """Test the blueprints/workspace input character checking"""
+ test_blueprint = {"description": "An example GlusterFS server with samba, ws version",
+ "name":UTF8_TEST_STRING,
+ "version": "0.3.0",
+ "modules":[{"name":"glusterfs", "version":"3.*.*"},
+ {"name":"glusterfs-cli", "version":"3.*.*"}],
+ "packages":[{"name":"samba", "version":"4.*.*"},
+ {"name":"tmux", "version":"1.8"}],
+ "groups": []}
+
+ resp = self.server.post("/api/v0/blueprints/workspace",
+ data=json.dumps(test_blueprint),
+ content_type="application/json")
+ self.assertInputError(resp)
+
+ def test_blueprints_workspace_delete_input(self):
+ """Test the DELETE blueprints/workspace input character checking"""
+ resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_blueprints_undo_input(self):
+ """Test the blueprints/undo/... input character checking"""
+ resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef")
+ self.assertInputError(resp)
+
+ def test_blueprints_tag_input(self):
+ """Test the blueprints/tag input character checking"""
+ resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_blueprints_diff_input(self):
+ """Test the blueprints/diff input character checking"""
+ # /api/v0/blueprints/diff///
+ resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE")
+ self.assertInputError(resp)
+
+ def test_blueprints_freeze_input(self):
+ """Test the blueprints/freeze input character checking"""
+ resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_blueprints_depsolve_input(self):
+ """Test the blueprints/depsolve input character checking"""
+ resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_projects_info_input(self):
+ """Test the projects/info input character checking"""
+ resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_projects_depsolve_input(self):
+ """Test the projects/depsolve input character checking"""
+ resp = self.server.get("/api/v0/projects/depsolve/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_projects_source_info_input(self):
+ """Test the projects/source/info input character checking"""
+ resp = self.server.get("/api/v0/projects/source/info/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_projects_source_delete_input(self):
+ """Test the projects/source/delete input character checking"""
+ resp = self.server.delete("/api/v0/projects/source/delete/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_modules_list_input(self):
+ """Test the modules/list input character checking"""
+ resp = self.server.get("/api/v0/modules/list/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_modules_info_input(self):
+ """Test the modules/info input character checking"""
+ resp = self.server.get("/api/v0/modules/info/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_status_input(self):
+ """Test the compose/status input character checking"""
+ resp = self.server.get("/api/v0/compose/status/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_cancel_input(self):
+ """Test the compose/cancel input character checking"""
+ resp = self.server.delete("/api/v0/compose/cancel/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_delete_input(self):
+ """Test the compose/delete input character checking"""
+ resp = self.server.delete("/api/v0/compose/delete/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_info_input(self):
+ """Test the compose/info input character checking"""
+ resp = self.server.get("/api/v0/compose/info/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_metadata_input(self):
+ """Test the compose/metadata input character checking"""
+ resp = self.server.get("/api/v0/compose/metadata/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_results_input(self):
+ """Test the compose/results input character checking"""
+ resp = self.server.get("/api/v0/compose/results/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_logs_input(self):
+ """Test the compose/logs input character checking"""
+ resp = self.server.get("/api/v0/compose/logs/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_image_input(self):
+ """Test the compose/image input character checking"""
+ resp = self.server.get("/api/v0/compose/image/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)
+
+ def test_compose_log_input(self):
+ """Test the compose/log input character checking"""
+ resp = self.server.get("/api/v0/compose/log/" + UTF8_TEST_STRING)
+ self.assertInputError(resp)