Add input string checks to the branch and format arguments

Make sure no UTF8 characters are allowed and return an error if they
are.

Also includes tests to make sure the correct error is returned.
This commit is contained in:
Brian C. Lane 2018-08-06 14:21:24 -07:00
parent 74f5def3d4
commit 86d79cd8a6
2 changed files with 96 additions and 2 deletions

View File

@ -1018,6 +1018,9 @@ def v0_api(api):
def v0_blueprints_list(): def v0_blueprints_list():
"""List the available blueprints on a branch.""" """List the available blueprints on a branch."""
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
limit = int(request.args.get("limit", "20")) limit = int(request.args.get("limit", "20"))
offset = int(request.args.get("offset", "0")) offset = int(request.args.get("offset", "0"))
@ -1038,7 +1041,13 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
out_fmt = request.args.get("format", "json") out_fmt = request.args.get("format", "json")
if VALID_API_STRING.match(out_fmt) is None:
return jsonify(status=False, errors=["Invalid characters in format argument"]), 400
blueprints = [] blueprints = []
changes = [] changes = []
errors = [] errors = []
@ -1099,6 +1108,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
limit = int(request.args.get("limit", "20")) limit = int(request.args.get("limit", "20"))
offset = int(request.args.get("offset", "0")) offset = int(request.args.get("offset", "0"))
@ -1128,6 +1140,9 @@ def v0_api(api):
def v0_blueprints_new(): def v0_blueprints_new():
"""Commit a new blueprint""" """Commit a new blueprint"""
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
if request.headers['Content-Type'] == "text/x-toml": if request.headers['Content-Type'] == "text/x-toml":
blueprint = recipe_from_toml(request.data) blueprint = recipe_from_toml(request.data)
@ -1159,6 +1174,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
with api.config["GITLOCK"].lock: with api.config["GITLOCK"].lock:
delete_recipe(api.config["GITLOCK"].repo, branch, blueprint_name) delete_recipe(api.config["GITLOCK"].repo, branch, blueprint_name)
@ -1173,6 +1191,9 @@ def v0_api(api):
def v0_blueprints_workspace(): def v0_blueprints_workspace():
"""Write a blueprint to the workspace""" """Write a blueprint to the workspace"""
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
if request.headers['Content-Type'] == "text/x-toml": if request.headers['Content-Type'] == "text/x-toml":
blueprint = recipe_from_toml(request.data) blueprint = recipe_from_toml(request.data)
@ -1200,6 +1221,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
with api.config["GITLOCK"].lock: with api.config["GITLOCK"].lock:
workspace_delete(api.config["GITLOCK"].repo, branch, blueprint_name) workspace_delete(api.config["GITLOCK"].repo, branch, blueprint_name)
@ -1221,6 +1245,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
with api.config["GITLOCK"].lock: with api.config["GITLOCK"].lock:
revert_recipe(api.config["GITLOCK"].repo, branch, blueprint_name, commit) revert_recipe(api.config["GITLOCK"].repo, branch, blueprint_name, commit)
@ -1244,6 +1271,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
with api.config["GITLOCK"].lock: with api.config["GITLOCK"].lock:
tag_recipe_commit(api.config["GITLOCK"].repo, branch, blueprint_name) tag_recipe_commit(api.config["GITLOCK"].repo, branch, blueprint_name)
@ -1268,6 +1298,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try: try:
if from_commit == "NEWEST": if from_commit == "NEWEST":
with api.config["GITLOCK"].lock: with api.config["GITLOCK"].lock:
@ -1310,7 +1343,13 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
out_fmt = request.args.get("format", "json") out_fmt = request.args.get("format", "json")
if VALID_API_STRING.match(out_fmt) is None:
return jsonify(status=False, errors=["Invalid characters in format argument"]), 400
blueprints = [] blueprints = []
errors = [] errors = []
for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]: for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]:
@ -1368,6 +1407,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master") branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
blueprints = [] blueprints = []
errors = [] errors = []
for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]: for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]:
@ -1492,6 +1534,8 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400 return jsonify(status=False, errors=["Invalid characters in API path"]), 400
out_fmt = request.args.get("format", "json") out_fmt = request.args.get("format", "json")
if VALID_API_STRING.match(out_fmt) is None:
return jsonify(status=False, errors=["Invalid characters in format argument"]), 400
# Return info on all of the sources # Return info on all of the sources
if source_names == "*": if source_names == "*":
@ -1660,7 +1704,7 @@ def v0_api(api):
if not modules: if not modules:
msg = "one of the requested modules does not exist: %s" % module_names msg = "one of the requested modules does not exist: %s" % module_names
log.error("(v0_modules_info) %s" % msg) log.error("(v0_modules_info) %s", msg)
return jsonify(status=False, errors=[msg]), 400 return jsonify(status=False, errors=[msg]), 400
return jsonify(modules=modules) return jsonify(modules=modules)

View File

@ -1022,7 +1022,12 @@ class ServerTestCase(unittest.TestCase):
self.assertNotEqual(data, None) self.assertNotEqual(data, None)
self.assertEqual(resp.status_code, 400) self.assertEqual(resp.status_code, 400)
self.assertEqual(data["status"], False) self.assertEqual(data["status"], False)
self.assertEqual(data["errors"], ["Invalid characters in API path"]) self.assertTrue(len(data["errors"]) > 0)
self.assertTrue("Invalid characters in" in data["errors"][0])
def test_blueprints_list_branch(self):
resp = self.server.get("/api/v0/blueprints/list?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_info_input(self): def test_blueprints_info_input(self):
"""Test the blueprints/info input character checking""" """Test the blueprints/info input character checking"""
@ -1030,12 +1035,21 @@ class ServerTestCase(unittest.TestCase):
resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING) resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/info/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/info/http-server?format=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_changes_input(self): def test_blueprints_changes_input(self):
"""Test the blueprints/changes input character checking""" """Test the blueprints/changes input character checking"""
# /api/v0/blueprints/changes/<blueprint_names> # /api/v0/blueprints/changes/<blueprint_names>
resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING) resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/changes/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_new_input(self): def test_blueprints_new_input(self):
"""Test the blueprints/new input character checking""" """Test the blueprints/new input character checking"""
# /api/v0/blueprints/new # /api/v0/blueprints/new
@ -1053,11 +1067,20 @@ class ServerTestCase(unittest.TestCase):
content_type="application/json") content_type="application/json")
self.assertInputError(resp) self.assertInputError(resp)
test_blueprint["name"] = "glusterfs"
resp = self.server.post("/api/v0/blueprints/new?branch=" + UTF8_TEST_STRING,
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_delete_input(self): def test_blueprints_delete_input(self):
"""Test the blueprints/delete input character checking""" """Test the blueprints/delete input character checking"""
resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING) resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.delete("/api/v0/blueprints/delete/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_workspace_input(self): def test_blueprints_workspace_input(self):
"""Test the blueprints/workspace input character checking""" """Test the blueprints/workspace input character checking"""
test_blueprint = {"description": "An example GlusterFS server with samba, ws version", test_blueprint = {"description": "An example GlusterFS server with samba, ws version",
@ -1074,37 +1097,64 @@ class ServerTestCase(unittest.TestCase):
content_type="application/json") content_type="application/json")
self.assertInputError(resp) self.assertInputError(resp)
test_blueprint["name"] = "glusterfs"
resp = self.server.post("/api/v0/blueprints/workspace?branch=" + UTF8_TEST_STRING,
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_workspace_delete_input(self): def test_blueprints_workspace_delete_input(self):
"""Test the DELETE blueprints/workspace input character checking""" """Test the DELETE blueprints/workspace input character checking"""
resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING) resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.delete("/api/v0/blueprints/workspace/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_undo_input(self): def test_blueprints_undo_input(self):
"""Test the blueprints/undo/... input character checking""" """Test the blueprints/undo/... input character checking"""
resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef") resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef")
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.post("/api/v0/blueprints/undo/http-server/deadbeef?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_tag_input(self): def test_blueprints_tag_input(self):
"""Test the blueprints/tag input character checking""" """Test the blueprints/tag input character checking"""
resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING) resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.post("/api/v0/blueprints/tag/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_diff_input(self): def test_blueprints_diff_input(self):
"""Test the blueprints/diff input character checking""" """Test the blueprints/diff input character checking"""
# /api/v0/blueprints/diff/<blueprint_name>/<from_commit>/<to_commit> # /api/v0/blueprints/diff/<blueprint_name>/<from_commit>/<to_commit>
resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE") resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE")
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/diff/http-server/NEWEST/WORKSPACE?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_freeze_input(self): def test_blueprints_freeze_input(self):
"""Test the blueprints/freeze input character checking""" """Test the blueprints/freeze input character checking"""
resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING) resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/freeze/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/freeze/http-server?format=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_depsolve_input(self): def test_blueprints_depsolve_input(self):
"""Test the blueprints/depsolve input character checking""" """Test the blueprints/depsolve input character checking"""
resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING) resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING)
self.assertInputError(resp) self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/depsolve/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_info_input(self): def test_projects_info_input(self):
"""Test the projects/info input character checking""" """Test the projects/info input character checking"""
resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING) resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING)