Add a test for invalid characters in the API route

Currently the code is not UTF8 safe, so we need to return a clear error
when invalid characters are passed in.

This also adds tests for the routes to confirm that an error is
correctly returned.
This commit is contained in:
Brian C. Lane 2018-08-06 12:31:52 -07:00
parent 5daf2d416a
commit 74f5def3d4
3 changed files with 271 additions and 0 deletions

View File

@ -0,0 +1,21 @@
#
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import re
# These are the characters that we allow to be passed in via the
# API calls.
VALID_API_STRING = re.compile(r'^[a-zA-Z0-9_,.:*-]+$')

View File

@ -991,6 +991,7 @@ from pylorax.api.queue import uuid_tar, uuid_image, uuid_cancel, uuid_log
from pylorax.api.recipes import list_branch_files, read_recipe_commit, recipe_filename, list_commits
from pylorax.api.recipes import recipe_from_dict, recipe_from_toml, commit_recipe, delete_recipe, revert_recipe
from pylorax.api.recipes import tag_recipe_commit, recipe_diff
from pylorax.api.regexes import VALID_API_STRING
from pylorax.api.workspace import workspace_read, workspace_write, workspace_delete
from pylorax.api.yumbase import update_metadata
@ -1033,6 +1034,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_info(blueprint_names):
"""Return the contents of the blueprint, or a list of blueprints"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
out_fmt = request.args.get("format", "json")
blueprints = []
@ -1091,6 +1095,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_changes(blueprint_names):
"""Return the changes to a blueprint or list of blueprints"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
limit = int(request.args.get("limit", "20"))
@ -1127,6 +1134,9 @@ def v0_api(api):
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
if VALID_API_STRING.match(blueprint["name"]) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
with api.config["GITLOCK"].lock:
commit_recipe(api.config["GITLOCK"].repo, branch, blueprint)
@ -1145,6 +1155,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_delete(blueprint_name):
"""Delete a blueprint from git"""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1166,6 +1179,9 @@ def v0_api(api):
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
if VALID_API_STRING.match(blueprint["name"]) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
with api.config["GITLOCK"].lock:
workspace_write(api.config["GITLOCK"].repo, branch, blueprint)
except Exception as e:
@ -1180,6 +1196,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_delete_workspace(blueprint_name):
"""Delete a blueprint from the workspace"""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1198,6 +1217,9 @@ def v0_api(api):
("commit", "", "no commit ID given")])
def v0_blueprints_undo(blueprint_name, commit):
"""Undo changes to a blueprint by reverting to a previous commit."""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1218,6 +1240,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_tag(blueprint_name):
"""Tag a blueprint's latest blueprint commit as a 'revision'"""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1238,6 +1263,10 @@ def v0_api(api):
("to_commit", "", "no to commit ID given")])
def v0_blueprints_diff(blueprint_name, from_commit, to_commit):
"""Return the differences between two commits of a blueprint"""
for s in [blueprint_name, from_commit, to_commit]:
if VALID_API_STRING.match(s) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
if from_commit == "NEWEST":
@ -1277,6 +1306,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_freeze(blueprint_names):
"""Return the blueprint with the exact modules and packages selected by depsolve"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
out_fmt = request.args.get("format", "json")
blueprints = []
@ -1332,6 +1364,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_depsolve(blueprint_names):
"""Return the dependencies for a blueprint"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
blueprints = []
errors = []
@ -1408,6 +1443,9 @@ def v0_api(api):
@checkparams([("project_names", "", "no project names given")])
def v0_projects_info(project_names):
"""Return detailed information about the listed projects"""
if VALID_API_STRING.match(project_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
with api.config["YUMLOCK"].lock:
projects = projects_info(api.config["YUMLOCK"].yb, project_names.split(","))
@ -1423,6 +1461,9 @@ def v0_api(api):
@checkparams([("project_names", "", "no project names given")])
def v0_projects_depsolve(project_names):
"""Return detailed information about the listed projects"""
if VALID_API_STRING.match(project_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
with api.config["YUMLOCK"].lock:
deps = projects_depsolve(api.config["YUMLOCK"].yb, [(n, "*") for n in project_names.split(",")], [])
@ -1447,6 +1488,9 @@ def v0_api(api):
@checkparams([("source_names", "", "no source names given")])
def v0_projects_source_info(source_names):
"""Return detailed info about the list of sources"""
if VALID_API_STRING.match(source_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
out_fmt = request.args.get("format", "json")
# Return info on all of the sources
@ -1543,6 +1587,9 @@ def v0_api(api):
@checkparams([("source_name", "", "no source name given")])
def v0_projects_source_delete(source_name):
"""Delete the named source and return a status response"""
if VALID_API_STRING.match(source_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
system_sources = get_repo_sources("/etc/yum.repos.d/*.repo")
if source_name in system_sources:
return jsonify(status=False, errors=["%s is a system source, it cannot be deleted." % source_name]), 400
@ -1574,6 +1621,9 @@ def v0_api(api):
@crossdomain(origin="*")
def v0_modules_list(module_names=None):
"""List available modules, filtering by module_names"""
if module_names and VALID_API_STRING.match(module_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
limit = int(request.args.get("limit", "20"))
offset = int(request.args.get("offset", "0"))
@ -1599,6 +1649,8 @@ def v0_api(api):
@checkparams([("module_names", "", "no module names given")])
def v0_modules_info(module_names):
"""Return detailed information about the listed modules"""
if VALID_API_STRING.match(module_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
with api.config["YUMLOCK"].lock:
modules = modules_info(api.config["YUMLOCK"].yb, module_names.split(","))
@ -1651,6 +1703,9 @@ def v0_api(api):
else:
compose_type = compose["compose_type"]
if VALID_API_STRING.match(blueprint_name) is None:
errors.append("Invalid characters in API path")
if errors:
return jsonify(status=False, errors=errors), 400
@ -1696,6 +1751,9 @@ def v0_api(api):
@checkparams([("uuids", "", "no UUIDs given")])
def v0_compose_status(uuids):
"""Return the status of the listed uuids"""
if VALID_API_STRING.match(uuids) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
results = []
for uuid in [n.strip().lower() for n in uuids.split(",")]:
details = uuid_status(api.config["COMPOSER_CFG"], uuid)
@ -1710,6 +1768,9 @@ def v0_api(api):
@checkparams([("uuid", "", "no UUID given")])
def v0_compose_cancel(uuid):
"""Cancel a running compose and delete its results directory"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1730,6 +1791,9 @@ def v0_api(api):
@checkparams([("uuids", "", "no UUIDs given")])
def v0_compose_delete(uuids):
"""Delete the compose results for the listed uuids"""
if VALID_API_STRING.match(uuids) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
results = []
errors = []
for uuid in [n.strip().lower() for n in uuids.split(",")]:
@ -1753,6 +1817,9 @@ def v0_api(api):
@checkparams([("uuid", "", "no UUID given")])
def v0_compose_info(uuid):
"""Return detailed info about a compose"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
info = uuid_info(api.config["COMPOSER_CFG"], uuid)
except Exception as e:
@ -1766,6 +1833,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_metadata(uuid):
"""Return a tar of the metadata for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1783,6 +1853,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_results(uuid):
"""Return a tar of the metadata and the results for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1800,6 +1873,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_logs(uuid):
"""Return a tar of the metadata for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1817,6 +1893,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_image(uuid):
"""Return the output image for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1840,6 +1919,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_log_tail(uuid):
"""Return the end of the main anaconda.log, defaults to 1Mbytes"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
size = int(request.args.get("size", "1024"))
except ValueError as e:

View File

@ -1,3 +1,4 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2017 Red Hat, Inc.
#
@ -32,6 +33,9 @@ from pylorax.api.server import server, GitLock, YumLock
from pylorax.api.yumbase import get_base_object
from pylorax.sysutils import joinpaths
# Used for testing UTF-8 input support
UTF8_TEST_STRING = "I 𝒊ll 𝟉ο𝘁 𝛠𝔰 𝘁𝒉𝝸𝚜".decode("utf-8")
def get_system_repo():
"""Get an enabled system repo from /etc/yum.repos.d/*repo
@ -1011,3 +1015,167 @@ class ServerTestCase(unittest.TestCase):
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["finished"], [], "Failed to delete the failed build: %s" % data)
def assertInputError(self, resp):
"""Check all the conditions for a successful input check error result"""
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(resp.status_code, 400)
self.assertEqual(data["status"], False)
self.assertEqual(data["errors"], ["Invalid characters in API path"])
def test_blueprints_info_input(self):
"""Test the blueprints/info input character checking"""
# /api/v0/blueprints/info/<blueprint_names>
resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_changes_input(self):
"""Test the blueprints/changes input character checking"""
# /api/v0/blueprints/changes/<blueprint_names>
resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_new_input(self):
"""Test the blueprints/new input character checking"""
# /api/v0/blueprints/new
test_blueprint = {"description": "An example GlusterFS server with samba",
"name":UTF8_TEST_STRING,
"version": "0.2.0",
"modules":[{"name":"glusterfs", "version":"3.*.*"},
{"name":"glusterfs-cli", "version":"3.*.*"}],
"packages":[{"name":"samba", "version":"4.*.*"},
{"name":"tmux", "version":"1.8"}],
"groups": []}
resp = self.server.post("/api/v0/blueprints/new",
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_delete_input(self):
"""Test the blueprints/delete input character checking"""
resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_workspace_input(self):
"""Test the blueprints/workspace input character checking"""
test_blueprint = {"description": "An example GlusterFS server with samba, ws version",
"name":UTF8_TEST_STRING,
"version": "0.3.0",
"modules":[{"name":"glusterfs", "version":"3.*.*"},
{"name":"glusterfs-cli", "version":"3.*.*"}],
"packages":[{"name":"samba", "version":"4.*.*"},
{"name":"tmux", "version":"1.8"}],
"groups": []}
resp = self.server.post("/api/v0/blueprints/workspace",
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_workspace_delete_input(self):
"""Test the DELETE blueprints/workspace input character checking"""
resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_undo_input(self):
"""Test the blueprints/undo/... input character checking"""
resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef")
self.assertInputError(resp)
def test_blueprints_tag_input(self):
"""Test the blueprints/tag input character checking"""
resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_diff_input(self):
"""Test the blueprints/diff input character checking"""
# /api/v0/blueprints/diff/<blueprint_name>/<from_commit>/<to_commit>
resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE")
self.assertInputError(resp)
def test_blueprints_freeze_input(self):
"""Test the blueprints/freeze input character checking"""
resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_depsolve_input(self):
"""Test the blueprints/depsolve input character checking"""
resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_info_input(self):
"""Test the projects/info input character checking"""
resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_depsolve_input(self):
"""Test the projects/depsolve input character checking"""
resp = self.server.get("/api/v0/projects/depsolve/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_source_info_input(self):
"""Test the projects/source/info input character checking"""
resp = self.server.get("/api/v0/projects/source/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_source_delete_input(self):
"""Test the projects/source/delete input character checking"""
resp = self.server.delete("/api/v0/projects/source/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_modules_list_input(self):
"""Test the modules/list input character checking"""
resp = self.server.get("/api/v0/modules/list/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_modules_info_input(self):
"""Test the modules/info input character checking"""
resp = self.server.get("/api/v0/modules/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_status_input(self):
"""Test the compose/status input character checking"""
resp = self.server.get("/api/v0/compose/status/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_cancel_input(self):
"""Test the compose/cancel input character checking"""
resp = self.server.delete("/api/v0/compose/cancel/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_delete_input(self):
"""Test the compose/delete input character checking"""
resp = self.server.delete("/api/v0/compose/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_info_input(self):
"""Test the compose/info input character checking"""
resp = self.server.get("/api/v0/compose/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_metadata_input(self):
"""Test the compose/metadata input character checking"""
resp = self.server.get("/api/v0/compose/metadata/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_results_input(self):
"""Test the compose/results input character checking"""
resp = self.server.get("/api/v0/compose/results/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_logs_input(self):
"""Test the compose/logs input character checking"""
resp = self.server.get("/api/v0/compose/logs/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_image_input(self):
"""Test the compose/image input character checking"""
resp = self.server.get("/api/v0/compose/image/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_log_input(self):
"""Test the compose/log input character checking"""
resp = self.server.get("/api/v0/compose/log/" + UTF8_TEST_STRING)
self.assertInputError(resp)