Add seperate validation for blueprint names

The VALID_API_STRING function allows for characters that should not be
allowed in blueprint names. VALID_BLUEPRINT_NAME allows us to
specifically check if a blueprint contains a valid name.
This commit is contained in:
Jacob Kozol 2019-07-05 16:21:27 +02:00 committed by Lars Karlitski
parent e11fc2037f
commit 4174186c14
3 changed files with 37 additions and 15 deletions

View File

@ -19,3 +19,6 @@ import re
# These are the characters that we allow to be passed in via the
# API calls.
VALID_API_STRING = re.compile(r'^[a-zA-Z0-9_,.:+*-]+$')
# These are the characters that we allow to be used in blueprint names.
VALID_BLUEPRINT_NAME = re.compile(r'^[a-zA-Z0-9._-]+$')

View File

@ -68,7 +68,7 @@ from pylorax.api.queue import uuid_tar, uuid_image, uuid_cancel, uuid_log
from pylorax.api.recipes import RecipeError, list_branch_files, read_recipe_commit, recipe_filename, list_commits
from pylorax.api.recipes import recipe_from_dict, recipe_from_toml, commit_recipe, delete_recipe, revert_recipe
from pylorax.api.recipes import tag_recipe_commit, recipe_diff, RecipeFileError
from pylorax.api.regexes import VALID_API_STRING
from pylorax.api.regexes import VALID_API_STRING, VALID_BLUEPRINT_NAME
import pylorax.api.toml as toml
from pylorax.api.workspace import workspace_read, workspace_write, workspace_delete
@ -211,7 +211,7 @@ def v0_blueprints_info(blueprint_names):
"blueprints": []
}
"""
if VALID_API_STRING.match(blueprint_names) is None:
if any(VALID_BLUEPRINT_NAME.match(blueprint_name) is None for blueprint_name in blueprint_names.split(',')):
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -344,7 +344,7 @@ def v0_blueprints_changes(blueprint_names):
]
}
"""
if VALID_API_STRING.match(blueprint_names) is None:
if any(VALID_BLUEPRINT_NAME.match(blueprint_name) is None for blueprint_name in blueprint_names.split(',')):
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -401,8 +401,8 @@ def v0_blueprints_new():
blueprint = recipe_from_toml(request.data)
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
if VALID_API_STRING.match(blueprint["name"]) is None:
if VALID_BLUEPRINT_NAME.match(blueprint["name"]) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
with api.config["GITLOCK"].lock:
@ -433,7 +433,7 @@ def v0_blueprints_delete(blueprint_name):
The response will be a status response with `status` set to true, or an
error response with it set to false and an error message included.
"""
if VALID_API_STRING.match(blueprint_name) is None:
if VALID_BLUEPRINT_NAME.match(blueprint_name) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -476,7 +476,7 @@ def v0_blueprints_workspace():
else:
blueprint = recipe_from_dict(request.get_json(cache=False))
if VALID_API_STRING.match(blueprint["name"]) is None:
if VALID_BLUEPRINT_NAME.match(blueprint["name"]) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
with api.config["GITLOCK"].lock:
@ -502,7 +502,7 @@ def v0_blueprints_delete_workspace(blueprint_name):
The response will be a status response with `status` set to true, or an
error response with it set to false and an error message included.
"""
if VALID_API_STRING.match(blueprint_name) is None:
if VALID_BLUEPRINT_NAME.match(blueprint_name) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -534,7 +534,7 @@ def v0_blueprints_undo(blueprint_name, commit):
The response will be a status response with `status` set to true, or an
error response with it set to false and an error message included.
"""
if VALID_API_STRING.match(blueprint_name) is None:
if VALID_BLUEPRINT_NAME.match(blueprint_name) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -570,7 +570,7 @@ def v0_blueprints_tag(blueprint_name):
The response will be a status response with `status` set to true, or an
error response with it set to false and an error message included.
"""
if VALID_API_STRING.match(blueprint_name) is None:
if VALID_BLUEPRINT_NAME.match(blueprint_name) is None:
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -734,7 +734,7 @@ def v0_blueprints_freeze(blueprint_names):
]
}
"""
if VALID_API_STRING.match(blueprint_names) is None:
if any(VALID_BLUEPRINT_NAME.match(blueprint_name) is None for blueprint_name in blueprint_names.split(',')):
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -875,7 +875,7 @@ def v0_blueprints_depsolve(blueprint_names):
]
}
"""
if VALID_API_STRING.match(blueprint_names) is None:
if any(VALID_BLUEPRINT_NAME.match(blueprint_name) is None for blueprint_name in blueprint_names.split(',')):
return jsonify(status=False, errors=[{"id": INVALID_CHARS, "msg": "Invalid characters in API path"}]), 400
branch = request.args.get("branch", "master")
@ -1520,7 +1520,7 @@ def v0_compose_start():
else:
compose_type = compose["compose_type"]
if VALID_API_STRING.match(blueprint_name) is None:
if VALID_BLUEPRINT_NAME.match(blueprint_name) is None:
errors.append({"id": INVALID_CHARS, "msg": "Invalid characters in API path"})
if not blueprint_exists(branch, blueprint_name):

View File

@ -3,6 +3,7 @@
import composertest
import requests
import subprocess
import json
class TestApi(composertest.ComposerTestCase):
@ -30,9 +31,9 @@ class TestApi(composertest.ComposerTestCase):
self.forwarder_proc.kill()
super().tearDown()
def request(self, method, path, check=True):
def request(self, method, path, json=None, check=True):
self.assertEqual(path[0], "/")
r = requests.request(method, f"http://localhost:{self.composer_port}{path}", timeout=30)
r = requests.request(method, f"http://localhost:{self.composer_port}{path}", json=json, timeout=30)
if check:
r.raise_for_status()
return r
@ -67,6 +68,24 @@ class TestApi(composertest.ComposerTestCase):
"errors": [{ "id": "HTTPError", "code": 405, "msg": "Method Not Allowed" }]
})
#
# API create blueprint with InvalidChars
#
invalid_blueprint = {
"name": "Name,With,Commas",
"description": "",
"version": "0.0.1",
"modules": [],
"groups": []
}
r = self.request("POST", "/api/v0/blueprints/new", json=invalid_blueprint, check=False)
self.assertEqual(r.status_code, 400)
self.assertEqual(r.json(), {
"status": False,
"errors": [{ "id": "InvalidChars", "code": 400, "msg": "Invalid characters in API path" }]
})
if __name__ == '__main__':
composertest.main()