Add a test for invalid characters in the API route

Currently the code is not UTF8 safe, so we need to return a clear error
when invalid characters are passed in.

This also adds tests for the routes to confirm that an error is
correctly returned.

(cherry picked from commit 74f5def3d4)
This commit is contained in:
Brian C. Lane 2018-08-06 12:31:52 -07:00
parent 7999ae3ee7
commit f5cdb94c9c
3 changed files with 271 additions and 0 deletions

View File

@ -0,0 +1,21 @@
#
# Copyright (C) 2018 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import re
# These are the characters that we allow to be passed in via the
# API calls.
VALID_API_STRING = re.compile(r'^[a-zA-Z0-9_,.:*-]+$')

View File

@ -991,6 +991,7 @@ from pylorax.api.queue import uuid_tar, uuid_image, uuid_cancel, uuid_log
from pylorax.api.recipes import list_branch_files, read_recipe_commit, recipe_filename, list_commits
from pylorax.api.recipes import recipe_from_dict, recipe_from_toml, commit_recipe, delete_recipe, revert_recipe
from pylorax.api.recipes import tag_recipe_commit, recipe_diff
from pylorax.api.regexes import VALID_API_STRING
from pylorax.api.workspace import workspace_read, workspace_write, workspace_delete
# The API functions don't actually get called by any code here
@ -1032,6 +1033,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_info(blueprint_names):
"""Return the contents of the blueprint, or a list of blueprints"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
out_fmt = request.args.get("format", "json")
blueprints = []
@ -1090,6 +1094,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_changes(blueprint_names):
"""Return the changes to a blueprint or list of blueprints"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
limit = int(request.args.get("limit", "20"))
@ -1126,6 +1133,9 @@ def v0_api(api):
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
if VALID_API_STRING.match(blueprint["name"]) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
with api.config["GITLOCK"].lock:
commit_recipe(api.config["GITLOCK"].repo, branch, blueprint)
@ -1144,6 +1154,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_delete(blueprint_name):
"""Delete a blueprint from git"""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1165,6 +1178,9 @@ def v0_api(api):
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
if VALID_API_STRING.match(blueprint["name"]) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
with api.config["GITLOCK"].lock:
workspace_write(api.config["GITLOCK"].repo, branch, blueprint)
except Exception as e:
@ -1179,6 +1195,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_delete_workspace(blueprint_name):
"""Delete a blueprint from the workspace"""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1197,6 +1216,9 @@ def v0_api(api):
("commit", "", "no commit ID given")])
def v0_blueprints_undo(blueprint_name, commit):
"""Undo changes to a blueprint by reverting to a previous commit."""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1217,6 +1239,9 @@ def v0_api(api):
@checkparams([("blueprint_name", "", "no blueprint name given")])
def v0_blueprints_tag(blueprint_name):
"""Tag a blueprint's latest blueprint commit as a 'revision'"""
if VALID_API_STRING.match(blueprint_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
with api.config["GITLOCK"].lock:
@ -1237,6 +1262,10 @@ def v0_api(api):
("to_commit", "", "no to commit ID given")])
def v0_blueprints_diff(blueprint_name, from_commit, to_commit):
"""Return the differences between two commits of a blueprint"""
for s in [blueprint_name, from_commit, to_commit]:
if VALID_API_STRING.match(s) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
try:
if from_commit == "NEWEST":
@ -1276,6 +1305,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_freeze(blueprint_names):
"""Return the blueprint with the exact modules and packages selected by depsolve"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
out_fmt = request.args.get("format", "json")
blueprints = []
@ -1331,6 +1363,9 @@ def v0_api(api):
@checkparams([("blueprint_names", "", "no blueprint names given")])
def v0_blueprints_depsolve(blueprint_names):
"""Return the dependencies for a blueprint"""
if VALID_API_STRING.match(blueprint_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
branch = request.args.get("branch", "master")
blueprints = []
errors = []
@ -1408,6 +1443,9 @@ def v0_api(api):
@checkparams([("project_names", "", "no project names given")])
def v0_projects_info(project_names):
"""Return detailed information about the listed projects"""
if VALID_API_STRING.match(project_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
with api.config["DNFLOCK"].lock:
projects = projects_info(api.config["DNFLOCK"].dbo, project_names.split(","))
@ -1423,6 +1461,9 @@ def v0_api(api):
@checkparams([("project_names", "", "no project names given")])
def v0_projects_depsolve(project_names):
"""Return detailed information about the listed projects"""
if VALID_API_STRING.match(project_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
with api.config["DNFLOCK"].lock:
deps = projects_depsolve(api.config["DNFLOCK"].dbo, [(n, "*") for n in project_names.split(",")], [])
@ -1447,6 +1488,9 @@ def v0_api(api):
@checkparams([("source_names", "", "no source names given")])
def v0_projects_source_info(source_names):
"""Return detailed info about the list of sources"""
if VALID_API_STRING.match(source_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
out_fmt = request.args.get("format", "json")
# Return info on all of the sources
@ -1540,6 +1584,9 @@ def v0_api(api):
@checkparams([("source_name", "", "no source name given")])
def v0_projects_source_delete(source_name):
"""Delete the named source and return a status response"""
if VALID_API_STRING.match(source_name) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
system_sources = get_repo_sources("/etc/yum.repos.d/*.repo")
if source_name in system_sources:
return jsonify(status=False, errors=["%s is a system source, it cannot be deleted." % source_name]), 400
@ -1567,6 +1614,9 @@ def v0_api(api):
@crossdomain(origin="*")
def v0_modules_list(module_names=None):
"""List available modules, filtering by module_names"""
if module_names and VALID_API_STRING.match(module_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
limit = int(request.args.get("limit", "20"))
offset = int(request.args.get("offset", "0"))
@ -1592,6 +1642,8 @@ def v0_api(api):
@checkparams([("module_names", "", "no module names given")])
def v0_modules_info(module_names):
"""Return detailed information about the listed modules"""
if VALID_API_STRING.match(module_names) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
with api.config["DNFLOCK"].lock:
modules = modules_info(api.config["DNFLOCK"].dbo, module_names.split(","))
@ -1644,6 +1696,9 @@ def v0_api(api):
else:
compose_type = compose["compose_type"]
if VALID_API_STRING.match(blueprint_name) is None:
errors.append("Invalid characters in API path")
if errors:
return jsonify(status=False, errors=errors), 400
@ -1689,6 +1744,9 @@ def v0_api(api):
@checkparams([("uuids", "", "no UUIDs given")])
def v0_compose_status(uuids):
"""Return the status of the listed uuids"""
if VALID_API_STRING.match(uuids) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
results = []
for uuid in [n.strip().lower() for n in uuids.split(",")]:
details = uuid_status(api.config["COMPOSER_CFG"], uuid)
@ -1703,6 +1761,9 @@ def v0_api(api):
@checkparams([("uuid", "", "no UUID given")])
def v0_compose_cancel(uuid):
"""Cancel a running compose and delete its results directory"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1723,6 +1784,9 @@ def v0_api(api):
@checkparams([("uuids", "", "no UUIDs given")])
def v0_compose_delete(uuids):
"""Delete the compose results for the listed uuids"""
if VALID_API_STRING.match(uuids) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
results = []
errors = []
for uuid in [n.strip().lower() for n in uuids.split(",")]:
@ -1746,6 +1810,9 @@ def v0_api(api):
@checkparams([("uuid", "", "no UUID given")])
def v0_compose_info(uuid):
"""Return detailed info about a compose"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
info = uuid_info(api.config["COMPOSER_CFG"], uuid)
except Exception as e:
@ -1759,6 +1826,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_metadata(uuid):
"""Return a tar of the metadata for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1776,6 +1846,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_results(uuid):
"""Return a tar of the metadata and the results for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1793,6 +1866,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_logs(uuid):
"""Return a tar of the metadata for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1810,6 +1886,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_image(uuid):
"""Return the output image for the build"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
status = uuid_status(api.config["COMPOSER_CFG"], uuid)
if status is None:
return jsonify(status=False, errors=["%s is not a valid build uuid" % uuid]), 400
@ -1833,6 +1912,9 @@ def v0_api(api):
@checkparams([("uuid","", "no UUID given")])
def v0_compose_log_tail(uuid):
"""Return the end of the main anaconda.log, defaults to 1Mbytes"""
if VALID_API_STRING.match(uuid) is None:
return jsonify(status=False, errors=["Invalid characters in API path"]), 400
try:
size = int(request.args.get("size", "1024"))
except ValueError as e:

View File

@ -1,3 +1,4 @@
# -*- coding: UTF-8 -*-
#
# Copyright (C) 2017 Red Hat, Inc.
#
@ -32,6 +33,9 @@ from pylorax.api.server import server, GitLock, DNFLock
from pylorax.api.dnfbase import get_base_object
from pylorax.sysutils import joinpaths
# Used for testing UTF-8 input support
UTF8_TEST_STRING = "I 𝒊ll 𝟉ο𝘁 𝛠𝔰 𝘁𝒉𝝸𝚜".decode("utf-8")
def get_system_repo():
"""Get an enabled system repo from /etc/yum.repos.d/*repo
@ -1021,3 +1025,167 @@ class ServerTestCase(unittest.TestCase):
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(data["finished"], [], "Failed to delete the failed build: %s" % data)
def assertInputError(self, resp):
"""Check all the conditions for a successful input check error result"""
data = json.loads(resp.data)
self.assertNotEqual(data, None)
self.assertEqual(resp.status_code, 400)
self.assertEqual(data["status"], False)
self.assertEqual(data["errors"], ["Invalid characters in API path"])
def test_blueprints_info_input(self):
"""Test the blueprints/info input character checking"""
# /api/v0/blueprints/info/<blueprint_names>
resp = self.server.get("/api/v0/blueprints/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_changes_input(self):
"""Test the blueprints/changes input character checking"""
# /api/v0/blueprints/changes/<blueprint_names>
resp = self.server.get("/api/v0/blueprints/changes/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_new_input(self):
"""Test the blueprints/new input character checking"""
# /api/v0/blueprints/new
test_blueprint = {"description": "An example GlusterFS server with samba",
"name":UTF8_TEST_STRING,
"version": "0.2.0",
"modules":[{"name":"glusterfs", "version":"3.*.*"},
{"name":"glusterfs-cli", "version":"3.*.*"}],
"packages":[{"name":"samba", "version":"4.*.*"},
{"name":"tmux", "version":"1.8"}],
"groups": []}
resp = self.server.post("/api/v0/blueprints/new",
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_delete_input(self):
"""Test the blueprints/delete input character checking"""
resp = self.server.delete("/api/v0/blueprints/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_workspace_input(self):
"""Test the blueprints/workspace input character checking"""
test_blueprint = {"description": "An example GlusterFS server with samba, ws version",
"name":UTF8_TEST_STRING,
"version": "0.3.0",
"modules":[{"name":"glusterfs", "version":"3.*.*"},
{"name":"glusterfs-cli", "version":"3.*.*"}],
"packages":[{"name":"samba", "version":"4.*.*"},
{"name":"tmux", "version":"1.8"}],
"groups": []}
resp = self.server.post("/api/v0/blueprints/workspace",
data=json.dumps(test_blueprint),
content_type="application/json")
self.assertInputError(resp)
def test_blueprints_workspace_delete_input(self):
"""Test the DELETE blueprints/workspace input character checking"""
resp = self.server.delete("/api/v0/blueprints/workspace/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_undo_input(self):
"""Test the blueprints/undo/... input character checking"""
resp = self.server.post("/api/v0/blueprints/undo/" + UTF8_TEST_STRING + "/deadbeef")
self.assertInputError(resp)
def test_blueprints_tag_input(self):
"""Test the blueprints/tag input character checking"""
resp = self.server.post("/api/v0/blueprints/tag/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_diff_input(self):
"""Test the blueprints/diff input character checking"""
# /api/v0/blueprints/diff/<blueprint_name>/<from_commit>/<to_commit>
resp = self.server.get("/api/v0/blueprints/diff/" + UTF8_TEST_STRING + "/NEWEST/WORKSPACE")
self.assertInputError(resp)
def test_blueprints_freeze_input(self):
"""Test the blueprints/freeze input character checking"""
resp = self.server.get("/api/v0/blueprints/freeze/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_blueprints_depsolve_input(self):
"""Test the blueprints/depsolve input character checking"""
resp = self.server.get("/api/v0/blueprints/depsolve/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_info_input(self):
"""Test the projects/info input character checking"""
resp = self.server.get("/api/v0/projects/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_depsolve_input(self):
"""Test the projects/depsolve input character checking"""
resp = self.server.get("/api/v0/projects/depsolve/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_source_info_input(self):
"""Test the projects/source/info input character checking"""
resp = self.server.get("/api/v0/projects/source/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_projects_source_delete_input(self):
"""Test the projects/source/delete input character checking"""
resp = self.server.delete("/api/v0/projects/source/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_modules_list_input(self):
"""Test the modules/list input character checking"""
resp = self.server.get("/api/v0/modules/list/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_modules_info_input(self):
"""Test the modules/info input character checking"""
resp = self.server.get("/api/v0/modules/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_status_input(self):
"""Test the compose/status input character checking"""
resp = self.server.get("/api/v0/compose/status/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_cancel_input(self):
"""Test the compose/cancel input character checking"""
resp = self.server.delete("/api/v0/compose/cancel/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_delete_input(self):
"""Test the compose/delete input character checking"""
resp = self.server.delete("/api/v0/compose/delete/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_info_input(self):
"""Test the compose/info input character checking"""
resp = self.server.get("/api/v0/compose/info/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_metadata_input(self):
"""Test the compose/metadata input character checking"""
resp = self.server.get("/api/v0/compose/metadata/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_results_input(self):
"""Test the compose/results input character checking"""
resp = self.server.get("/api/v0/compose/results/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_logs_input(self):
"""Test the compose/logs input character checking"""
resp = self.server.get("/api/v0/compose/logs/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_image_input(self):
"""Test the compose/image input character checking"""
resp = self.server.get("/api/v0/compose/image/" + UTF8_TEST_STRING)
self.assertInputError(resp)
def test_compose_log_input(self):
"""Test the compose/log input character checking"""
resp = self.server.get("/api/v0/compose/log/" + UTF8_TEST_STRING)
self.assertInputError(resp)