From 8786c9764c36052261f6ebf7a2b2c61fb8eae6d5 Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Mon, 6 Aug 2018 12:31:52 -0700 Subject: [PATCH] Add a test for invalid characters in the API route Currently the code is not UTF8 safe, so we need to return a clear error when invalid characters are passed in. This also adds tests for the routes to confirm that an error is correctly returned. (cherry picked from commit 74f5def3d4cad01ef879579edc8784be93faec2a) --- src/pylorax/api/regexes.py | 21 +++++ src/pylorax/api/v0.py | 82 +++++++++++++++++ tests/pylorax/test_server.py | 168 +++++++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+) create mode 100644 src/pylorax/api/regexes.py diff --git a/src/pylorax/api/regexes.py b/src/pylorax/api/regexes.py new file mode 100644 index 00000000..7be3415e --- /dev/null +++ b/src/pylorax/api/regexes.py @@ -0,0 +1,21 @@ +# +# Copyright (C) 2018 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +import re + +# These are the characters that we allow to be passed in via the +# API calls. +VALID_API_STRING = re.compile(r'^[a-zA-Z0-9_,.:*-]+$') diff --git a/src/pylorax/api/v0.py b/src/pylorax/api/v0.py index 72d3dbc7..3317a552 100644 --- a/src/pylorax/api/v0.py +++ b/src/pylorax/api/v0.py @@ -991,6 +991,7 @@ from pylorax.api.queue import uuid_tar, uuid_image, uuid_cancel, uuid_log from pylorax.api.recipes import list_branch_files, read_recipe_commit, recipe_filename, list_commits from pylorax.api.recipes import recipe_from_dict, recipe_from_toml, commit_recipe, delete_recipe, revert_recipe from pylorax.api.recipes import tag_recipe_commit, recipe_diff +from pylorax.api.regexes import VALID_API_STRING from pylorax.api.workspace import workspace_read, workspace_write, workspace_delete # The API functions don't actually get called by any code here @@ -1032,6 +1033,9 @@ def v0_api(api): @checkparams([("blueprint_names", "", "no blueprint names given")]) def v0_blueprints_info(blueprint_names): """Return the contents of the blueprint, or a list of blueprints""" + if VALID_API_STRING.match(blueprint_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") out_fmt = request.args.get("format", "json") blueprints = [] @@ -1090,6 +1094,9 @@ def v0_api(api): @checkparams([("blueprint_names", "", "no blueprint names given")]) def v0_blueprints_changes(blueprint_names): """Return the changes to a blueprint or list of blueprints""" + if VALID_API_STRING.match(blueprint_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") try: limit = int(request.args.get("limit", "20")) @@ -1126,6 +1133,9 @@ def v0_api(api): else: blueprint = recipe_from_dict(request.get_json(cache=False)) + if VALID_API_STRING.match(blueprint["name"]) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + with api.config["GITLOCK"].lock: commit_recipe(api.config["GITLOCK"].repo, branch, blueprint) @@ -1144,6 +1154,9 @@ def v0_api(api): @checkparams([("blueprint_name", "", "no blueprint name given")]) def v0_blueprints_delete(blueprint_name): """Delete a blueprint from git""" + if VALID_API_STRING.match(blueprint_name) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") try: with api.config["GITLOCK"].lock: @@ -1165,6 +1178,9 @@ def v0_api(api): else: blueprint = recipe_from_dict(request.get_json(cache=False)) + if VALID_API_STRING.match(blueprint["name"]) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + with api.config["GITLOCK"].lock: workspace_write(api.config["GITLOCK"].repo, branch, blueprint) except Exception as e: @@ -1179,6 +1195,9 @@ def v0_api(api): @checkparams([("blueprint_name", "", "no blueprint name given")]) def v0_blueprints_delete_workspace(blueprint_name): """Delete a blueprint from the workspace""" + if VALID_API_STRING.match(blueprint_name) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") try: with api.config["GITLOCK"].lock: @@ -1197,6 +1216,9 @@ def v0_api(api): ("commit", "", "no commit ID given")]) def v0_blueprints_undo(blueprint_name, commit): """Undo changes to a blueprint by reverting to a previous commit.""" + if VALID_API_STRING.match(blueprint_name) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") try: with api.config["GITLOCK"].lock: @@ -1217,6 +1239,9 @@ def v0_api(api): @checkparams([("blueprint_name", "", "no blueprint name given")]) def v0_blueprints_tag(blueprint_name): """Tag a blueprint's latest blueprint commit as a 'revision'""" + if VALID_API_STRING.match(blueprint_name) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") try: with api.config["GITLOCK"].lock: @@ -1237,6 +1262,10 @@ def v0_api(api): ("to_commit", "", "no to commit ID given")]) def v0_blueprints_diff(blueprint_name, from_commit, to_commit): """Return the differences between two commits of a blueprint""" + for s in [blueprint_name, from_commit, to_commit]: + if VALID_API_STRING.match(s) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") try: if from_commit == "NEWEST": @@ -1276,6 +1305,9 @@ def v0_api(api): @checkparams([("blueprint_names", "", "no blueprint names given")]) def v0_blueprints_freeze(blueprint_names): """Return the blueprint with the exact modules and packages selected by depsolve""" + if VALID_API_STRING.match(blueprint_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") out_fmt = request.args.get("format", "json") blueprints = [] @@ -1331,6 +1363,9 @@ def v0_api(api): @checkparams([("blueprint_names", "", "no blueprint names given")]) def v0_blueprints_depsolve(blueprint_names): """Return the dependencies for a blueprint""" + if VALID_API_STRING.match(blueprint_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + branch = request.args.get("branch", "master") blueprints = [] errors = [] @@ -1408,6 +1443,9 @@ def v0_api(api): @checkparams([("project_names", "", "no project names given")]) def v0_projects_info(project_names): """Return detailed information about the listed projects""" + if VALID_API_STRING.match(project_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + try: with api.config["DNFLOCK"].lock: projects = projects_info(api.config["DNFLOCK"].dbo, project_names.split(",")) @@ -1423,6 +1461,9 @@ def v0_api(api): @checkparams([("project_names", "", "no project names given")]) def v0_projects_depsolve(project_names): """Return detailed information about the listed projects""" + if VALID_API_STRING.match(project_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + try: with api.config["DNFLOCK"].lock: deps = projects_depsolve(api.config["DNFLOCK"].dbo, [(n, "*") for n in project_names.split(",")], []) @@ -1447,6 +1488,9 @@ def v0_api(api): @checkparams([("source_names", "", "no source names given")]) def v0_projects_source_info(source_names): """Return detailed info about the list of sources""" + if VALID_API_STRING.match(source_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + out_fmt = request.args.get("format", "json") # Return info on all of the sources @@ -1540,6 +1584,9 @@ def v0_api(api): @checkparams([("source_name", "", "no source name given")]) def v0_projects_source_delete(source_name): """Delete the named source and return a status response""" + if VALID_API_STRING.match(source_name) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + system_sources = get_repo_sources("/etc/yum.repos.d/*.repo") if source_name in system_sources: return jsonify(status=False, errors=["%s is a system source, it cannot be deleted." % source_name]), 400 @@ -1567,6 +1614,9 @@ def v0_api(api): @crossdomain(origin="*") def v0_modules_list(module_names=None): """List available modules, filtering by module_names""" + if module_names and VALID_API_STRING.match(module_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + try: limit = int(request.args.get("limit", "20")) offset = int(request.args.get("offset", "0")) @@ -1592,6 +1642,8 @@ def v0_api(api): @checkparams([("module_names", "", "no module names given")]) def v0_modules_info(module_names): """Return detailed information about the listed modules""" + if VALID_API_STRING.match(module_names) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 try: with api.config["DNFLOCK"].lock: modules = modules_info(api.config["DNFLOCK"].dbo, module_names.split(",")) @@ -1644,6 +1696,9 @@ def v0_api(api): else: compose_type = compose["compose_type"] + if VALID_API_STRING.match(blueprint_name) is None: + errors.append("Invalid characters in API path") + if errors: return jsonify(status=False, errors=errors), 400 @@ -1689,6 +1744,9 @@ def v0_api(api): @checkparams([("uuids", "", "no UUIDs given")]) def v0_compose_status(uuids): """Return the status of the listed uuids""" + if VALID_API_STRING.match(uuids) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + results = [] for uuid in [n.strip().lower() for n in uuids.split(",")]: details = uuid_status(api.config["COMPOSER_CFG"], uuid) @@ -1703,6 +1761,9 @@ def v0_api(api): @checkparams([("uuid", "", "no UUID given")]) def v0_compose_cancel(uuid): """Cancel a running compose and delete its results directory""" + if VALID_API_STRING.match(uuid) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + status = uuid_status(api.config["COMPOSER_CFG"], uuid) if status is None: return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400 @@ -1723,6 +1784,9 @@ def v0_api(api): @checkparams([("uuids", "", "no UUIDs given")]) def v0_compose_delete(uuids): """Delete the compose results for the listed uuids""" + if VALID_API_STRING.match(uuids) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + results = [] errors = [] for uuid in [n.strip().lower() for n in uuids.split(",")]: @@ -1746,6 +1810,9 @@ def v0_api(api): @checkparams([("uuid", "", "no UUID given")]) def v0_compose_info(uuid): """Return detailed info about a compose""" + if VALID_API_STRING.match(uuid) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + try: info = uuid_info(api.config["COMPOSER_CFG"], uuid) except Exception as e: @@ -1759,6 +1826,9 @@ def v0_api(api): @checkparams([("uuid","", "no UUID given")]) def v0_compose_metadata(uuid): """Return a tar of the metadata for the build""" + if VALID_API_STRING.match(uuid) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + status = uuid_status(api.config["COMPOSER_CFG"], uuid) if status is None: return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400 @@ -1776,6 +1846,9 @@ def v0_api(api): @checkparams([("uuid","", "no UUID given")]) def v0_compose_results(uuid): """Return a tar of the metadata and the results for the build""" + if VALID_API_STRING.match(uuid) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + status = uuid_status(api.config["COMPOSER_CFG"], uuid) if status is None: return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400 @@ -1793,6 +1866,9 @@ def v0_api(api): @checkparams([("uuid","", "no UUID given")]) def v0_compose_logs(uuid): """Return a tar of the metadata for the build""" + if VALID_API_STRING.match(uuid) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + status = uuid_status(api.config["COMPOSER_CFG"], uuid) if status is None: return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400 @@ -1810,6 +1886,9 @@ def v0_api(api): @checkparams([("uuid","", "no UUID given")]) def v0_compose_image(uuid): """Return the output image for the build""" + if VALID_API_STRING.match(uuid) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + status = uuid_status(api.config["COMPOSER_CFG"], uuid) if status is None: return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400 @@ -1833,6 +1912,9 @@ def v0_api(api): @checkparams([("uuid","", "no UUID given")]) def v0_compose_log_tail(uuid): """Return the end of the main anaconda.log, defaults to 1Mbytes""" + if VALID_API_STRING.match(uuid) is None: + return jsonify(status=False, errors=["Invalid characters in API path"]), 400 + try: size = int(request.args.get("size", "1024")) except ValueError as e: diff --git a/tests/pylorax/test_server.py b/tests/pylorax/test_server.py index c63e39e0..ec88a700 100644 --- a/tests/pylorax/test_server.py +++ b/tests/pylorax/test_server.py @@ -1,3 +1,4 @@ +# -*- coding: UTF-8 -*- # # Copyright (C) 2017 Red Hat, Inc. # @@ -32,6 +33,9 @@ from pylorax.api.server import server, GitLock, DNFLock from pylorax.api.dnfbase import get_base_object from pylorax.sysutils import joinpaths +# Used for testing UTF-8 input support +UTF8_TEST_STRING = "I ο½—π’Šll πŸ‰ΞΏπ˜ π› ο½π”°κœ± π˜π’‰πΈπšœ".decode("utf-8") + def get_system_repo(): """Get an enabled system repo from /etc/yum.repos.d/*repo @@ -1021,3 +1025,167 @@ class ServerTestCase(unittest.TestCase): data = json.loads(resp.data) self.assertNotEqual(data, None) self.assertEqual(data["finished"], [], "Failed to delete the failed build: %s" % data) + + def assertInputError(self, resp): + """Check all the conditions for a successful input check error result""" + data = json.loads(resp.data) + self.assertNotEqual(data, None) + self.assertEqual(resp.status_code, 400) + self.assertEqual(data["status"], False) + self.assertEqual(data["errors"], ["Invalid characters in API path"]) + + def test_blueprints_info_input(self): + """Test the blueprints/info input character checking""" + # /api/v0/blueprints/info/ + resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_blueprints_changes_input(self): + """Test the blueprints/changes input character checking""" + # /api/v0/blueprints/changes/ + resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_blueprints_new_input(self): + """Test the blueprints/new input character checking""" + # /api/v0/blueprints/new + test_blueprint = {"description": "An example GlusterFS server with samba", + "name":UTF8_TEST_STRING, + "version": "0.2.0", + "modules":[{"name":"glusterfs", "version":"3.*.*"}, + {"name":"glusterfs-cli", "version":"3.*.*"}], + "packages":[{"name":"samba", "version":"4.*.*"}, + {"name":"tmux", "version":"1.8"}], + "groups": []} + + resp = self.server.post("/api/v0/blueprints/new", + data=json.dumps(test_blueprint), + content_type="application/json") + self.assertInputError(resp) + + def test_blueprints_delete_input(self): + """Test the blueprints/delete input character checking""" + resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_blueprints_workspace_input(self): + """Test the blueprints/workspace input character checking""" + test_blueprint = {"description": "An example GlusterFS server with samba, ws version", + "name":UTF8_TEST_STRING, + "version": "0.3.0", + "modules":[{"name":"glusterfs", "version":"3.*.*"}, + {"name":"glusterfs-cli", "version":"3.*.*"}], + "packages":[{"name":"samba", "version":"4.*.*"}, + {"name":"tmux", "version":"1.8"}], + "groups": []} + + resp = self.server.post("/api/v0/blueprints/workspace", + data=json.dumps(test_blueprint), + content_type="application/json") + self.assertInputError(resp) + + def test_blueprints_workspace_delete_input(self): + """Test the DELETE blueprints/workspace input character checking""" + resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_blueprints_undo_input(self): + """Test the blueprints/undo/... input character checking""" + resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef") + self.assertInputError(resp) + + def test_blueprints_tag_input(self): + """Test the blueprints/tag input character checking""" + resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_blueprints_diff_input(self): + """Test the blueprints/diff input character checking""" + # /api/v0/blueprints/diff/// + resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE") + self.assertInputError(resp) + + def test_blueprints_freeze_input(self): + """Test the blueprints/freeze input character checking""" + resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_blueprints_depsolve_input(self): + """Test the blueprints/depsolve input character checking""" + resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_projects_info_input(self): + """Test the projects/info input character checking""" + resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_projects_depsolve_input(self): + """Test the projects/depsolve input character checking""" + resp = self.server.get("/api/v0/projects/depsolve/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_projects_source_info_input(self): + """Test the projects/source/info input character checking""" + resp = self.server.get("/api/v0/projects/source/info/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_projects_source_delete_input(self): + """Test the projects/source/delete input character checking""" + resp = self.server.delete("/api/v0/projects/source/delete/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_modules_list_input(self): + """Test the modules/list input character checking""" + resp = self.server.get("/api/v0/modules/list/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_modules_info_input(self): + """Test the modules/info input character checking""" + resp = self.server.get("/api/v0/modules/info/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_status_input(self): + """Test the compose/status input character checking""" + resp = self.server.get("/api/v0/compose/status/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_cancel_input(self): + """Test the compose/cancel input character checking""" + resp = self.server.delete("/api/v0/compose/cancel/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_delete_input(self): + """Test the compose/delete input character checking""" + resp = self.server.delete("/api/v0/compose/delete/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_info_input(self): + """Test the compose/info input character checking""" + resp = self.server.get("/api/v0/compose/info/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_metadata_input(self): + """Test the compose/metadata input character checking""" + resp = self.server.get("/api/v0/compose/metadata/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_results_input(self): + """Test the compose/results input character checking""" + resp = self.server.get("/api/v0/compose/results/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_logs_input(self): + """Test the compose/logs input character checking""" + resp = self.server.get("/api/v0/compose/logs/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_image_input(self): + """Test the compose/image input character checking""" + resp = self.server.get("/api/v0/compose/image/" + UTF8_TEST_STRING) + self.assertInputError(resp) + + def test_compose_log_input(self): + """Test the compose/log input character checking""" + resp = self.server.get("/api/v0/compose/log/" + UTF8_TEST_STRING) + self.assertInputError(resp)