diff --git a/src/pylorax/api/v0.py b/src/pylorax/api/v0.py index 3317a552..f9b5501e 100644 --- a/src/pylorax/api/v0.py +++ b/src/pylorax/api/v0.py @@ -1017,6 +1017,9 @@ def v0_api(api): def v0_blueprints_list(): """List the available blueprints on a branch.""" branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: limit = int(request.args.get("limit", "20")) offset = int(request.args.get("offset", "0")) @@ -1037,7 +1040,13 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + out_fmt = request.args.get("format", "json") + if VALID_API_STRING.match(out_fmt) is None: + return jsonify(status=False, errors=["Invalid characters in format argument"]), 400 + blueprints = [] changes = [] errors = [] @@ -1098,6 +1107,9 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: limit = int(request.args.get("limit", "20")) offset = int(request.args.get("offset", "0")) @@ -1127,6 +1139,9 @@ def v0_api(api): def v0_blueprints_new(): """Commit a new blueprint""" branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: if request.headers['Content-Type'] == "text/x-toml": blueprint = recipe_from_toml(request.data) @@ -1158,6 +1173,9 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: with api.config["GITLOCK"].lock: delete_recipe(api.config["GITLOCK"].repo, branch, blueprint_name) @@ -1172,6 +1190,9 @@ def v0_api(api): def v0_blueprints_workspace(): """Write a blueprint to the workspace""" branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: if request.headers['Content-Type'] == "text/x-toml": blueprint = recipe_from_toml(request.data) @@ -1199,6 +1220,9 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: with api.config["GITLOCK"].lock: workspace_delete(api.config["GITLOCK"].repo, branch, blueprint_name) @@ -1220,6 +1244,9 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: with api.config["GITLOCK"].lock: revert_recipe(api.config["GITLOCK"].repo, branch, blueprint_name, commit) @@ -1243,6 +1270,9 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: with api.config["GITLOCK"].lock: tag_recipe_commit(api.config["GITLOCK"].repo, branch, blueprint_name) @@ -1267,6 +1297,9 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + try: if from_commit == "NEWEST": with api.config["GITLOCK"].lock: @@ -1309,7 +1342,13 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + out_fmt = request.args.get("format", "json") + if VALID_API_STRING.match(out_fmt) is None: + return jsonify(status=False, errors=["Invalid characters in format argument"]), 400 + blueprints = [] errors = [] for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]: @@ -1367,6 +1406,9 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 branch = request.args.get("branch", "master") + if VALID_API_STRING.match(branch) is None: + return jsonify(status=False, errors=["Invalid characters in branch argument"]), 400 + blueprints = [] errors = [] for blueprint_name in [n.strip() for n in sorted(blueprint_names.split(","), key=lambda n: n.lower())]: @@ -1492,6 +1534,8 @@ def v0_api(api): return jsonify(status=False, errors=["Invalid characters in API path"]), 400 out_fmt = request.args.get("format", "json") + if VALID_API_STRING.match(out_fmt) is None: + return jsonify(status=False, errors=["Invalid characters in format argument"]), 400 # Return info on all of the sources if source_names == "*": diff --git a/tests/pylorax/test_server.py b/tests/pylorax/test_server.py index ba4c191a..0bd32959 100644 --- a/tests/pylorax/test_server.py +++ b/tests/pylorax/test_server.py @@ -1032,7 +1032,12 @@ class ServerTestCase(unittest.TestCase): self.assertNotEqual(data, None) self.assertEqual(resp.status_code, 400) self.assertEqual(data["status"], False) - self.assertEqual(data["errors"], ["Invalid characters in API path"]) + self.assertTrue(len(data["errors"]) > 0) + self.assertTrue("Invalid characters in" in data["errors"][0]) + + def test_blueprints_list_branch(self): + resp = self.server.get("/api/v0/blueprints/list?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) def test_blueprints_info_input(self): """Test the blueprints/info input character checking""" @@ -1040,12 +1045,21 @@ class ServerTestCase(unittest.TestCase): resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING) self.assertInputError(resp) + resp = self.server.get("/api/v0/blueprints/info/http-server?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + + resp = self.server.get("/api/v0/blueprints/info/http-server?format=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_changes_input(self): """Test the blueprints/changes input character checking""" # /api/v0/blueprints/changes/ resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING) self.assertInputError(resp) + resp = self.server.get("/api/v0/blueprints/changes/http-server?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_new_input(self): """Test the blueprints/new input character checking""" # /api/v0/blueprints/new @@ -1063,11 +1077,20 @@ class ServerTestCase(unittest.TestCase): content_type="application/json") self.assertInputError(resp) + test_blueprint["name"] = "glusterfs" + resp = self.server.post("/api/v0/blueprints/new?branch=" + UTF8_TEST_STRING, + data=json.dumps(test_blueprint), + content_type="application/json") + self.assertInputError(resp) + def test_blueprints_delete_input(self): """Test the blueprints/delete input character checking""" resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING) self.assertInputError(resp) + resp = self.server.delete("/api/v0/blueprints/delete/http-server?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_workspace_input(self): """Test the blueprints/workspace input character checking""" test_blueprint = {"description": "An example GlusterFS server with samba, ws version", @@ -1084,37 +1107,64 @@ class ServerTestCase(unittest.TestCase): content_type="application/json") self.assertInputError(resp) + test_blueprint["name"] = "glusterfs" + resp = self.server.post("/api/v0/blueprints/workspace?branch=" + UTF8_TEST_STRING, + data=json.dumps(test_blueprint), + content_type="application/json") + self.assertInputError(resp) + def test_blueprints_workspace_delete_input(self): """Test the DELETE blueprints/workspace input character checking""" resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING) self.assertInputError(resp) + resp = self.server.delete("/api/v0/blueprints/workspace/http-server?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_undo_input(self): """Test the blueprints/undo/... input character checking""" resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef") self.assertInputError(resp) + resp = self.server.post("/api/v0/blueprints/undo/http-server/deadbeef?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_tag_input(self): """Test the blueprints/tag input character checking""" resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING) self.assertInputError(resp) + resp = self.server.post("/api/v0/blueprints/tag/http-server?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_diff_input(self): """Test the blueprints/diff input character checking""" # /api/v0/blueprints/diff/// resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE") self.assertInputError(resp) + resp = self.server.get("/api/v0/blueprints/diff/http-server/NEWEST/WORKSPACE?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_freeze_input(self): """Test the blueprints/freeze input character checking""" resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING) self.assertInputError(resp) + resp = self.server.get("/api/v0/blueprints/freeze/http-server?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + + resp = self.server.get("/api/v0/blueprints/freeze/http-server?format=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_blueprints_depsolve_input(self): """Test the blueprints/depsolve input character checking""" resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING) self.assertInputError(resp) + resp = self.server.get("/api/v0/blueprints/depsolve/http-server?branch=" + UTF8_TEST_STRING) + self.assertInputError(resp) + def test_projects_info_input(self): """Test the projects/info input character checking""" resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING)