Add input string checks to the branch and format arguments

Make sure no UTF8 characters are allowed and return an error if they
are.

Also includes tests to make sure the correct error is returned.

(cherry picked from commit 86d79cd8a6)
This commit is contained in:
Brian C. Lane 2018-08-06 14:21:24 -07:00
parent f5cdb94c9c
commit 2b0f9c4bd7
2 changed files with 95 additions and 1 deletions

View File

@ -1017,6 +1017,9 @@ def v0_api(api):
def v0_blueprints_list():
"""List the available blueprints on a branch."""
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
limit = int(request.args.get("limit", "20"))
offset = int(request.args.get("offset", "0"))
@ -1037,7 +1040,13 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
out_fmt = request.args.get("format", "json")
if VALID_API_STRING.match(out_fmt) is None:
return jsonify(status=False, errors=["Invalid characters in format argument"]), 400
blueprints = []
changes = []
errors = []
@ -1098,6 +1107,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
limit = int(request.args.get("limit", "20"))
offset = int(request.args.get("offset", "0"))
@ -1127,6 +1139,9 @@ def v0_api(api):
def v0_blueprints_new():
"""Commit a new blueprint"""
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
if request.headers['Content-Type'] == "text/x-toml":
blueprint = recipe_from_toml(request.data)
@ -1158,6 +1173,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
with api.config["GITLOCK"].lock:
delete_recipe(api.config["GITLOCK"].repo, branch, blueprint_name)
@ -1172,6 +1190,9 @@ def v0_api(api):
def v0_blueprints_workspace():
"""Write a blueprint to the workspace"""
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
if request.headers['Content-Type'] == "text/x-toml":
blueprint = recipe_from_toml(request.data)
@ -1199,6 +1220,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
with api.config["GITLOCK"].lock:
workspace_delete(api.config["GITLOCK"].repo, branch, blueprint_name)
@ -1220,6 +1244,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
with api.config["GITLOCK"].lock:
revert_recipe(api.config["GITLOCK"].repo, branch, blueprint_name, commit)
@ -1243,6 +1270,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
with api.config["GITLOCK"].lock:
tag_recipe_commit(api.config["GITLOCK"].repo, branch, blueprint_name)
@ -1267,6 +1297,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
try:
if from_commit == "NEWEST":
with api.config["GITLOCK"].lock:
@ -1309,7 +1342,13 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
out_fmt = request.args.get("format", "json")
if VALID_API_STRING.match(out_fmt) is None:
return jsonify(status=False, errors=["Invalid characters in format argument"]), 400
blueprints = []
errors = []
for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]:
@ -1367,6 +1406,9 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
if VALID_API_STRING.match(branch) is None:
return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400
blueprints = []
errors = []
for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]:
@ -1492,6 +1534,8 @@ def v0_api(api):
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
out_fmt = request.args.get("format", "json")
if VALID_API_STRING.match(out_fmt) is None:
return jsonify(status=False, errors=["Invalid characters in format argument"]), 400
# Return info on all of the sources
if source_names == "*":

View File

@ -1032,7 +1032,12 @@ class ServerTestCase(unittest.TestCase):
self.assertNotEqual(data, None)
self.assertEqual(resp.status_code, 400)
self.assertEqual(data["status"], False)
self.assertEqual(data["errors"], ["Invalid characters in API path"])
self.assertTrue(len(data["errors"]) > 0)
self.assertTrue("Invalid characters in" in data["errors"][0])
def test_blueprints_list_branch(self):
resp = self.server.get("/api/v0/blueprints/list?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_info_input(self):
"""Test the blueprints/info input character checking"""
@ -1040,12 +1045,21 @@ class ServerTestCase(unittest.TestCase):
resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/info/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/info/http-server?format=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_changes_input(self):
"""Test the blueprints/changes input character checking"""
# /api/v0/blueprints/changes/<blueprint_names>
resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/changes/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_new_input(self):
"""Test the blueprints/new input character checking"""
# /api/v0/blueprints/new
@ -1063,11 +1077,20 @@ class ServerTestCase(unittest.TestCase):
content_type="application/json")
self.assertInputError(resp)
test_blueprint["name"] = "glusterfs"
resp = self.server.post("/api/v0/blueprints/new?branch=" + UTF8_TEST_STRING,
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_delete_input(self):
"""Test the blueprints/delete input character checking"""
resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.delete("/api/v0/blueprints/delete/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_workspace_input(self):
"""Test the blueprints/workspace input character checking"""
test_blueprint = {"description": "An example GlusterFS server with samba, ws version",
@ -1084,37 +1107,64 @@ class ServerTestCase(unittest.TestCase):
content_type="application/json")
self.assertInputError(resp)
test_blueprint["name"] = "glusterfs"
resp = self.server.post("/api/v0/blueprints/workspace?branch=" + UTF8_TEST_STRING,
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_workspace_delete_input(self):
"""Test the DELETE blueprints/workspace input character checking"""
resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.delete("/api/v0/blueprints/workspace/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_undo_input(self):
"""Test the blueprints/undo/... input character checking"""
resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef")
self.assertInputError(resp)
resp = self.server.post("/api/v0/blueprints/undo/http-server/deadbeef?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_tag_input(self):
"""Test the blueprints/tag input character checking"""
resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.post("/api/v0/blueprints/tag/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_diff_input(self):
"""Test the blueprints/diff input character checking"""
# /api/v0/blueprints/diff/<blueprint_name>/<from_commit>/<to_commit>
resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE")
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/diff/http-server/NEWEST/WORKSPACE?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_freeze_input(self):
"""Test the blueprints/freeze input character checking"""
resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/freeze/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/freeze/http-server?format=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_depsolve_input(self):
"""Test the blueprints/depsolve input character checking"""
resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING)
self.assertInputError(resp)
resp = self.server.get("/api/v0/blueprints/depsolve/http-server?branch=" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_info_input(self):
"""Test the projects/info input character checking"""
resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING)