diff --git a/0004-Backport-Vary-Cookie-fix-from-upstream.patch b/0004-Backport-Vary-Cookie-fix-from-upstream.patch new file mode 100644 index 0000000..c985c72 --- /dev/null +++ b/0004-Backport-Vary-Cookie-fix-from-upstream.patch @@ -0,0 +1,182 @@ +From 4195dc236d4286d6d82b3a40dc595a1437afcdc9 Mon Sep 17 00:00:00 2001 +From: "Brian C. Lane" +Date: Tue, 9 May 2023 15:13:34 -0700 +Subject: [PATCH 4/5] Backport Vary: Cookie fix from upstream + +This fixes CVE-2023-30861 by backporting the patch and tests from this +upstream commit: +https://github.com/pallets/flask/commit/70f906c51ce49c485f1d355703e9cc3386b1cc2b + +Resolves: rhbz#2196683 +--- + flask/sessions.py | 6 +++ + tests/test_basic.py | 90 ++++++++++++++++++++++++++++++++++++++++++++- + tests/test_ext.py | 2 +- + 3 files changed, 95 insertions(+), 3 deletions(-) + +diff --git a/flask/sessions.py b/flask/sessions.py +index 525ff246..2f60e7eb 100644 +--- a/flask/sessions.py ++++ b/flask/sessions.py +@@ -338,6 +338,10 @@ class SecureCookieSessionInterface(SessionInterface): + domain = self.get_cookie_domain(app) + path = self.get_cookie_path(app) + ++ # Add a "Vary: Cookie" header if the session was accessed at all. ++ if session.accessed: ++ response.vary.add("Cookie") ++ + # Delete case. If there is no session we bail early. + # If the session was modified to be empty we remove the + # whole cookie. +@@ -345,6 +349,7 @@ class SecureCookieSessionInterface(SessionInterface): + if session.modified: + response.delete_cookie(app.session_cookie_name, + domain=domain, path=path) ++ response.vary.add("Cookie") + return + + # Modification case. There are upsides and downsides to +@@ -364,3 +369,4 @@ class SecureCookieSessionInterface(SessionInterface): + response.set_cookie(app.session_cookie_name, val, + expires=expires, httponly=httponly, + domain=domain, path=path, secure=secure) ++ response.vary.add("Cookie") +diff --git a/tests/test_basic.py b/tests/test_basic.py +index c5ec9f5c..85555300 100644 +--- a/tests/test_basic.py ++++ b/tests/test_basic.py +@@ -11,6 +11,7 @@ + + import pytest + ++import os + import re + import uuid + import time +@@ -196,7 +197,8 @@ def test_session(): + + @app.route('/get') + def get(): +- return flask.session['value'] ++ v = flask.session.get("value", "None") ++ return v + + c = app.test_client() + assert c.post('/set', data={'value': '42'}).data == b'value set' +@@ -333,7 +335,7 @@ def test_session_expiration(): + client = app.test_client() + rv = client.get('/') + assert 'set-cookie' in rv.headers +- match = re.search(r'\bexpires=([^;]+)(?i)', rv.headers['set-cookie']) ++ match = re.search(r'(?i)\bexpires=([^;]+)', rv.headers['set-cookie']) + expires = parse_date(match.group()) + expected = datetime.utcnow() + app.permanent_session_lifetime + assert expires.year == expected.year +@@ -445,6 +447,90 @@ def test_session_cookie_setting(): + run_test(expect_header=False) + + ++def test_session_vary_cookie(): ++ app = flask.Flask("flask_test", root_path=os.path.dirname(__file__)) ++ app.config.update( ++ TESTING=True, ++ SECRET_KEY="test key", ++ ) ++ client = app.test_client() ++ ++ @app.route("/set") ++ def set_session(): ++ flask.session["test"] = "test" ++ return "" ++ @app.route("/get") ++ def get(): ++ return flask.session.get("test") ++ @app.route("/getitem") ++ def getitem(): ++ return flask.session["test"] ++ @app.route("/setdefault") ++ def setdefault(): ++ return flask.session.setdefault("test", "default") ++ ++ @app.route("/clear") ++ def clear(): ++ flask.session.clear() ++ return "" ++ ++ @app.route("/vary-cookie-header-set") ++ def vary_cookie_header_set(): ++ response = flask.Response() ++ response.vary.add("Cookie") ++ flask.session["test"] = "test" ++ return response ++ @app.route("/vary-header-set") ++ def vary_header_set(): ++ response = flask.Response() ++ response.vary.update(("Accept-Encoding", "Accept-Language")) ++ flask.session["test"] = "test" ++ return response ++ @app.route("/no-vary-header") ++ def no_vary_header(): ++ return "" ++ def expect(path, header_value="Cookie"): ++ rv = client.get(path) ++ if header_value: ++ # The 'Vary' key should exist in the headers only once. ++ assert len(rv.headers.get_all("Vary")) == 1 ++ assert rv.headers["Vary"] == header_value ++ else: ++ assert "Vary" not in rv.headers ++ expect("/set") ++ expect("/get") ++ expect("/getitem") ++ expect("/setdefault") ++ expect("/clear") ++ expect("/vary-cookie-header-set") ++ expect("/vary-header-set", "Accept-Encoding, Accept-Language, Cookie") ++ expect("/no-vary-header", None) ++ ++ ++def test_session_refresh_vary(): ++ app = flask.Flask("flask_test", root_path=os.path.dirname(__file__)) ++ app.config.update( ++ TESTING=True, ++ SECRET_KEY="test key", ++ ) ++ client = app.test_client() ++ ++ @app.route("/login") ++ def login(): ++ flask.session["user_id"] = 1 ++ flask.session.permanent = True ++ return "" ++ ++ @app.route("/ignored") ++ def ignored(): ++ return "" ++ ++ rv = client.get("/login") ++ assert rv.headers["Vary"] == "Cookie" ++ rv = client.get("/ignored") ++ assert rv.headers["Vary"] == "Cookie" ++ ++ + def test_flashes(): + app = flask.Flask(__name__) + app.secret_key = 'testkey' +diff --git a/tests/test_ext.py b/tests/test_ext.py +index d336e404..c3e23cdf 100644 +--- a/tests/test_ext.py ++++ b/tests/test_ext.py +@@ -180,7 +180,7 @@ def test_no_error_swallowing(flaskext_broken): + with pytest.raises(ImportError) as excinfo: + import flask.ext.broken + +- assert excinfo.type is ImportError ++ assert excinfo.type in (ImportError, ModuleNotFoundError) + if PY2: + message = 'No module named missing_module' + else: +-- +2.40.1 + diff --git a/0005-Backport-support-for-the-accessed-attribute.patch b/0005-Backport-support-for-the-accessed-attribute.patch new file mode 100644 index 0000000..acf91bd --- /dev/null +++ b/0005-Backport-support-for-the-accessed-attribute.patch @@ -0,0 +1,107 @@ +From 640ff67e9d59d7acd786683bbab422d8aa17211c Mon Sep 17 00:00:00 2001 +From: "Brian C. Lane" +Date: Tue, 9 May 2023 15:36:11 -0700 +Subject: [PATCH 5/5] Backport support for the accessed attribute + +This is added to the SessionMixin and SecureCookieSession to support +fixing CVE-2023-30861. + +Related: rhbz#2196683 +--- + flask/sessions.py | 40 +++++++++++++++++++++++++++++++++++++++- + tests/test_basic.py | 8 ++++++++ + 2 files changed, 47 insertions(+), 1 deletion(-) + +diff --git a/flask/sessions.py b/flask/sessions.py +index 2f60e7eb..278c3583 100644 +--- a/flask/sessions.py ++++ b/flask/sessions.py +@@ -48,6 +48,11 @@ class SessionMixin(object): + #: The default mixin implementation just hardcodes ``True`` in. + modified = True + ++ #: Some implementations can detect when session data is read or ++ #: written and set this when that happens. The mixin default is hard ++ #: coded to ``True``. ++ accessed = True ++ + + def _tag(value): + if isinstance(value, tuple): +@@ -111,14 +116,47 @@ session_json_serializer = TaggedJSONSerializer() + + + class SecureCookieSession(CallbackDict, SessionMixin): +- """Base class for sessions based on signed cookies.""" ++ """Base class for sessions based on signed cookies. ++ ++ This session backend will set the :attr:`modified` and ++ :attr:`accessed` attributes. It cannot reliably track whether a ++ session is new (vs. empty), so :attr:`new` remains hard coded to ++ ``False``. ++ """ ++ ++ #: When data is changed, this is set to ``True``. Only the session ++ #: dictionary itself is tracked; if the session contains mutable ++ #: data (for example a nested dict) then this must be set to ++ #: ``True`` manually when modifying that data. The session cookie ++ #: will only be written to the response if this is ``True``. ++ modified = False ++ ++ #: When data is read or written, this is set to ``True``. Used by ++ # :class:`.SecureCookieSessionInterface` to add a ``Vary: Cookie`` ++ #: header, which allows caching proxies to cache different pages for ++ #: different users. ++ accessed = False ++ + + def __init__(self, initial=None): + def on_update(self): + self.modified = True ++ self.accessed = True + CallbackDict.__init__(self, initial, on_update) + self.modified = False + ++ def __getitem__(self, key): ++ self.accessed = True ++ return super().__getitem__(key) ++ ++ def get(self, key, default=None): ++ self.accessed = True ++ return super().get(key, default) ++ ++ def setdefault(self, key, default=None): ++ self.accessed = True ++ return super().setdefault(key, default) ++ + + class NullSession(SecureCookieSession): + """Class used to generate nicer error messages if sessions are not +diff --git a/tests/test_basic.py b/tests/test_basic.py +index 85555300..5ad8c3de 100644 +--- a/tests/test_basic.py ++++ b/tests/test_basic.py +@@ -192,12 +192,20 @@ def test_session(): + + @app.route('/set', methods=['POST']) + def set(): ++ assert not flask.session.accessed ++ assert not flask.session.modified + flask.session['value'] = flask.request.form['value'] ++ assert flask.session.accessed ++ assert flask.session.modified + return 'value set' + + @app.route('/get') + def get(): ++ assert not flask.session.accessed ++ assert not flask.session.modified + v = flask.session.get("value", "None") ++ assert flask.session.accessed ++ assert not flask.session.modified + return v + + c = app.test_client() +-- +2.40.1 + diff --git a/EMPTY b/EMPTY deleted file mode 100644 index 0519ecb..0000000 --- a/EMPTY +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/python-flask.spec b/python-flask.spec index d149dfa..113b78f 100644 --- a/python-flask.spec +++ b/python-flask.spec @@ -10,7 +10,7 @@ Name: python-%{modname} Version: 0.12.2 -Release: 4%{?dist} +Release: 5%{?dist} Epoch: 1 Summary: A micro-framework for Python based on Werkzeug, Jinja 2 and good intentions @@ -28,6 +28,11 @@ Patch0001: 0001-detect-UTF-encodings-when-loading-json.patch Patch0002: 0002-Fix-ValueError-for-some-invalid-Range-requests.patch Patch0003: 0003-be-smarter-about-adding-.cli-to-reloader-command.patch +# rhbz#2196683 +# Backport fix for CVE-2023-30861 +Patch0004: 0004-Backport-Vary-Cookie-fix-from-upstream.patch +Patch0005: 0005-Backport-support-for-the-accessed-attribute.patch + BuildArch: noarch %global _description \ @@ -71,7 +76,7 @@ Requires: python-itsdangerous %description -n python2-%{modname} %{_description} Python 2 version. -%endif # with python2 +%endif %package -n python%{python3_pkgversion}-%{modname} Summary: %{summary} @@ -108,7 +113,7 @@ rm -rf examples/minitwit/ %build %if %{with python2} %py2_build -%endif # with python2 +%endif %py3_build PYTHONPATH=`pwd` sphinx-build-3 -b html docs/ docs/_build/html/ rm -rf docs/_build/html/{.buildinfo,.doctrees} @@ -118,7 +123,7 @@ rm -rf docs/_build/html/{.buildinfo,.doctrees} %py2_install mv %{buildroot}%{_bindir}/%{modname}{,-%{python2_version}} ln -s %{modname}-%{python2_version} %{buildroot}%{_bindir}/%{modname}-2 -%endif # with python2 +%endif %py3_install mv %{buildroot}%{_bindir}/%{modname}{,-%{python3_version}} @@ -128,13 +133,13 @@ ln -s %{modname}-%{python3_version} %{buildroot}%{_bindir}/%{modname}-3 ln -sf %{modname}-2 %{buildroot}%{_bindir}/%{modname} %else ln -sf %{modname}-3 %{buildroot}%{_bindir}/%{modname} -%endif # with python2 +%endif %check export LC_ALL=C.UTF-8 %if %{with python2} PYTHONPATH=%{buildroot}%{python2_sitelib} py.test-%{python2_version} -v -%endif # with python2 +%endif PYTHONPATH=%{buildroot}%{python3_sitelib} py.test-%{python3_version} -v || : %if %{with python2} @@ -147,7 +152,7 @@ PYTHONPATH=%{buildroot}%{python3_sitelib} py.test-%{python3_version} -v || : %{python2_sitelib}/%{modname}/ %{_bindir}/%{modname} -%endif # with python2 +%endif %files -n python%{python3_pkgversion}-%{modname} %license LICENSE @@ -159,13 +164,18 @@ PYTHONPATH=%{buildroot}%{python3_sitelib} py.test-%{python3_version} -v || : %if %{without python2} %{_bindir}/%{modname} -%endif # without python2 +%endif %files doc %license LICENSE %doc docs/_build/html examples %changelog +* Wed May 10 2023 Brian C. Lane - 0.12.2-5 +- Copy missing tests over from distgit +- Backport fix for CVE-2023-30861 + Resolves: rhbz#2196683 + * Thu Nov 07 2019 Brian C. Lane - 0.12.2-4 - Add upstream changes from 0.12.4 Resolves: rhbz#1585318 diff --git a/tests/scripts/run_tests.sh b/tests/scripts/run_tests.sh new file mode 100755 index 0000000..488aac7 --- /dev/null +++ b/tests/scripts/run_tests.sh @@ -0,0 +1,4 @@ +#!/usr/bin/bash +set -eux +# NOTE: test_helpers only contains the TestJSON class from upstream +pytest-3 ./test_basic.py ./test_helpers.py diff --git a/tests/scripts/static/config.json b/tests/scripts/static/config.json new file mode 100644 index 0000000..4a9722e --- /dev/null +++ b/tests/scripts/static/config.json @@ -0,0 +1,4 @@ +{ + "TEST_KEY": "foo", + "SECRET_KEY": "devkey" +} diff --git a/tests/scripts/static/index.html b/tests/scripts/static/index.html new file mode 100644 index 0000000..de8b69b --- /dev/null +++ b/tests/scripts/static/index.html @@ -0,0 +1 @@ +

Hello World!

diff --git a/tests/scripts/test_basic.py b/tests/scripts/test_basic.py new file mode 100644 index 0000000..5ad8c3d --- /dev/null +++ b/tests/scripts/test_basic.py @@ -0,0 +1,1794 @@ +# -*- coding: utf-8 -*- +""" + tests.basic + ~~~~~~~~~~~~~~~~~~~~~ + + The basic functionality. + + :copyright: (c) 2015 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +import pytest + +import os +import re +import uuid +import time +import flask +import pickle +from datetime import datetime +from threading import Thread +from flask._compat import text_type +from werkzeug.exceptions import BadRequest, NotFound, Forbidden +from werkzeug.http import parse_date +from werkzeug.routing import BuildError +import werkzeug.serving + + +def test_options_work(): + app = flask.Flask(__name__) + + @app.route('/', methods=['GET', 'POST']) + def index(): + return 'Hello World' + rv = app.test_client().open('/', method='OPTIONS') + assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] + assert rv.data == b'' + + +def test_options_on_multiple_rules(): + app = flask.Flask(__name__) + + @app.route('/', methods=['GET', 'POST']) + def index(): + return 'Hello World' + + @app.route('/', methods=['PUT']) + def index_put(): + return 'Aha!' + rv = app.test_client().open('/', method='OPTIONS') + assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST', 'PUT'] + + +def test_options_handling_disabled(): + app = flask.Flask(__name__) + + def index(): + return 'Hello World!' + index.provide_automatic_options = False + app.route('/')(index) + rv = app.test_client().open('/', method='OPTIONS') + assert rv.status_code == 405 + + app = flask.Flask(__name__) + + def index2(): + return 'Hello World!' + index2.provide_automatic_options = True + app.route('/', methods=['OPTIONS'])(index2) + rv = app.test_client().open('/', method='OPTIONS') + assert sorted(rv.allow) == ['OPTIONS'] + + +def test_request_dispatching(): + app = flask.Flask(__name__) + + @app.route('/') + def index(): + return flask.request.method + + @app.route('/more', methods=['GET', 'POST']) + def more(): + return flask.request.method + + c = app.test_client() + assert c.get('/').data == b'GET' + rv = c.post('/') + assert rv.status_code == 405 + assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS'] + rv = c.head('/') + assert rv.status_code == 200 + assert not rv.data # head truncates + assert c.post('/more').data == b'POST' + assert c.get('/more').data == b'GET' + rv = c.delete('/more') + assert rv.status_code == 405 + assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] + + +def test_disallow_string_for_allowed_methods(): + app = flask.Flask(__name__) + with pytest.raises(TypeError): + @app.route('/', methods='GET POST') + def index(): + return "Hey" + + +def test_url_mapping(): + app = flask.Flask(__name__) + + random_uuid4 = "7eb41166-9ebf-4d26-b771-ea3f54f8b383" + + def index(): + return flask.request.method + + def more(): + return flask.request.method + + def options(): + return random_uuid4 + + + app.add_url_rule('/', 'index', index) + app.add_url_rule('/more', 'more', more, methods=['GET', 'POST']) + + # Issue 1288: Test that automatic options are not added when non-uppercase 'options' in methods + app.add_url_rule('/options', 'options', options, methods=['options']) + + c = app.test_client() + assert c.get('/').data == b'GET' + rv = c.post('/') + assert rv.status_code == 405 + assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS'] + rv = c.head('/') + assert rv.status_code == 200 + assert not rv.data # head truncates + assert c.post('/more').data == b'POST' + assert c.get('/more').data == b'GET' + rv = c.delete('/more') + assert rv.status_code == 405 + assert sorted(rv.allow) == ['GET', 'HEAD', 'OPTIONS', 'POST'] + rv = c.open('/options', method='OPTIONS') + assert rv.status_code == 200 + assert random_uuid4 in rv.data.decode("utf-8") + + +def test_werkzeug_routing(): + from werkzeug.routing import Submount, Rule + app = flask.Flask(__name__) + app.url_map.add(Submount('/foo', [ + Rule('/bar', endpoint='bar'), + Rule('/', endpoint='index') + ])) + + def bar(): + return 'bar' + + def index(): + return 'index' + app.view_functions['bar'] = bar + app.view_functions['index'] = index + + c = app.test_client() + assert c.get('/foo/').data == b'index' + assert c.get('/foo/bar').data == b'bar' + + +def test_endpoint_decorator(): + from werkzeug.routing import Submount, Rule + app = flask.Flask(__name__) + app.url_map.add(Submount('/foo', [ + Rule('/bar', endpoint='bar'), + Rule('/', endpoint='index') + ])) + + @app.endpoint('bar') + def bar(): + return 'bar' + + @app.endpoint('index') + def index(): + return 'index' + + c = app.test_client() + assert c.get('/foo/').data == b'index' + assert c.get('/foo/bar').data == b'bar' + + +def test_session(): + app = flask.Flask(__name__) + app.secret_key = 'testkey' + + @app.route('/set', methods=['POST']) + def set(): + assert not flask.session.accessed + assert not flask.session.modified + flask.session['value'] = flask.request.form['value'] + assert flask.session.accessed + assert flask.session.modified + return 'value set' + + @app.route('/get') + def get(): + assert not flask.session.accessed + assert not flask.session.modified + v = flask.session.get("value", "None") + assert flask.session.accessed + assert not flask.session.modified + return v + + c = app.test_client() + assert c.post('/set', data={'value': '42'}).data == b'value set' + assert c.get('/get').data == b'42' + + +def test_session_using_server_name(): + app = flask.Flask(__name__) + app.config.update( + SECRET_KEY='foo', + SERVER_NAME='example.com' + ) + + @app.route('/') + def index(): + flask.session['testing'] = 42 + return 'Hello World' + rv = app.test_client().get('/', 'http://example.com/') + assert 'domain=.example.com' in rv.headers['set-cookie'].lower() + assert 'httponly' in rv.headers['set-cookie'].lower() + + +def test_session_using_server_name_and_port(): + app = flask.Flask(__name__) + app.config.update( + SECRET_KEY='foo', + SERVER_NAME='example.com:8080' + ) + + @app.route('/') + def index(): + flask.session['testing'] = 42 + return 'Hello World' + rv = app.test_client().get('/', 'http://example.com:8080/') + assert 'domain=.example.com' in rv.headers['set-cookie'].lower() + assert 'httponly' in rv.headers['set-cookie'].lower() + + +def test_session_using_server_name_port_and_path(): + app = flask.Flask(__name__) + app.config.update( + SECRET_KEY='foo', + SERVER_NAME='example.com:8080', + APPLICATION_ROOT='/foo' + ) + + @app.route('/') + def index(): + flask.session['testing'] = 42 + return 'Hello World' + rv = app.test_client().get('/', 'http://example.com:8080/foo') + assert 'domain=example.com' in rv.headers['set-cookie'].lower() + assert 'path=/foo' in rv.headers['set-cookie'].lower() + assert 'httponly' in rv.headers['set-cookie'].lower() + + +def test_session_using_application_root(): + class PrefixPathMiddleware(object): + + def __init__(self, app, prefix): + self.app = app + self.prefix = prefix + + def __call__(self, environ, start_response): + environ['SCRIPT_NAME'] = self.prefix + return self.app(environ, start_response) + + app = flask.Flask(__name__) + app.wsgi_app = PrefixPathMiddleware(app.wsgi_app, '/bar') + app.config.update( + SECRET_KEY='foo', + APPLICATION_ROOT='/bar' + ) + + @app.route('/') + def index(): + flask.session['testing'] = 42 + return 'Hello World' + rv = app.test_client().get('/', 'http://example.com:8080/') + assert 'path=/bar' in rv.headers['set-cookie'].lower() + + +def test_session_using_session_settings(): + app = flask.Flask(__name__) + app.config.update( + SECRET_KEY='foo', + SERVER_NAME='www.example.com:8080', + APPLICATION_ROOT='/test', + SESSION_COOKIE_DOMAIN='.example.com', + SESSION_COOKIE_HTTPONLY=False, + SESSION_COOKIE_SECURE=True, + SESSION_COOKIE_PATH='/' + ) + + @app.route('/') + def index(): + flask.session['testing'] = 42 + return 'Hello World' + rv = app.test_client().get('/', 'http://www.example.com:8080/test/') + cookie = rv.headers['set-cookie'].lower() + assert 'domain=.example.com' in cookie + assert 'path=/' in cookie + assert 'secure' in cookie + assert 'httponly' not in cookie + + +def test_missing_session(): + app = flask.Flask(__name__) + + def expect_exception(f, *args, **kwargs): + e = pytest.raises(RuntimeError, f, *args, **kwargs) + assert e.value.args and 'session is unavailable' in e.value.args[0] + with app.test_request_context(): + assert flask.session.get('missing_key') is None + expect_exception(flask.session.__setitem__, 'foo', 42) + expect_exception(flask.session.pop, 'foo') + + +def test_session_expiration(): + permanent = True + app = flask.Flask(__name__) + app.secret_key = 'testkey' + + @app.route('/') + def index(): + flask.session['test'] = 42 + flask.session.permanent = permanent + return '' + + @app.route('/test') + def test(): + return text_type(flask.session.permanent) + + client = app.test_client() + rv = client.get('/') + assert 'set-cookie' in rv.headers + match = re.search(r'(?i)\bexpires=([^;]+)', rv.headers['set-cookie']) + expires = parse_date(match.group()) + expected = datetime.utcnow() + app.permanent_session_lifetime + assert expires.year == expected.year + assert expires.month == expected.month + assert expires.day == expected.day + + rv = client.get('/test') + assert rv.data == b'True' + + permanent = False + rv = app.test_client().get('/') + assert 'set-cookie' in rv.headers + match = re.search(r'\bexpires=([^;]+)', rv.headers['set-cookie']) + assert match is None + + +def test_session_stored_last(): + app = flask.Flask(__name__) + app.secret_key = 'development-key' + app.testing = True + + @app.after_request + def modify_session(response): + flask.session['foo'] = 42 + return response + + @app.route('/') + def dump_session_contents(): + return repr(flask.session.get('foo')) + + c = app.test_client() + assert c.get('/').data == b'None' + assert c.get('/').data == b'42' + + +def test_session_special_types(): + app = flask.Flask(__name__) + app.secret_key = 'development-key' + app.testing = True + now = datetime.utcnow().replace(microsecond=0) + the_uuid = uuid.uuid4() + + @app.after_request + def modify_session(response): + flask.session['m'] = flask.Markup('Hello!') + flask.session['u'] = the_uuid + flask.session['dt'] = now + flask.session['b'] = b'\xff' + flask.session['t'] = (1, 2, 3) + return response + + @app.route('/') + def dump_session_contents(): + return pickle.dumps(dict(flask.session)) + + c = app.test_client() + c.get('/') + rv = pickle.loads(c.get('/').data) + assert rv['m'] == flask.Markup('Hello!') + assert type(rv['m']) == flask.Markup + assert rv['dt'] == now + assert rv['u'] == the_uuid + assert rv['b'] == b'\xff' + assert type(rv['b']) == bytes + assert rv['t'] == (1, 2, 3) + + +def test_session_cookie_setting(): + app = flask.Flask(__name__) + app.testing = True + app.secret_key = 'dev key' + is_permanent = True + + @app.route('/bump') + def bump(): + rv = flask.session['foo'] = flask.session.get('foo', 0) + 1 + flask.session.permanent = is_permanent + return str(rv) + + @app.route('/read') + def read(): + return str(flask.session.get('foo', 0)) + + def run_test(expect_header): + with app.test_client() as c: + assert c.get('/bump').data == b'1' + assert c.get('/bump').data == b'2' + assert c.get('/bump').data == b'3' + + rv = c.get('/read') + set_cookie = rv.headers.get('set-cookie') + assert (set_cookie is not None) == expect_header + assert rv.data == b'3' + + is_permanent = True + app.config['SESSION_REFRESH_EACH_REQUEST'] = True + run_test(expect_header=True) + + is_permanent = True + app.config['SESSION_REFRESH_EACH_REQUEST'] = False + run_test(expect_header=False) + + is_permanent = False + app.config['SESSION_REFRESH_EACH_REQUEST'] = True + run_test(expect_header=False) + + is_permanent = False + app.config['SESSION_REFRESH_EACH_REQUEST'] = False + run_test(expect_header=False) + + +def test_session_vary_cookie(): + app = flask.Flask("flask_test", root_path=os.path.dirname(__file__)) + app.config.update( + TESTING=True, + SECRET_KEY="test key", + ) + client = app.test_client() + + @app.route("/set") + def set_session(): + flask.session["test"] = "test" + return "" + @app.route("/get") + def get(): + return flask.session.get("test") + @app.route("/getitem") + def getitem(): + return flask.session["test"] + @app.route("/setdefault") + def setdefault(): + return flask.session.setdefault("test", "default") + + @app.route("/clear") + def clear(): + flask.session.clear() + return "" + + @app.route("/vary-cookie-header-set") + def vary_cookie_header_set(): + response = flask.Response() + response.vary.add("Cookie") + flask.session["test"] = "test" + return response + @app.route("/vary-header-set") + def vary_header_set(): + response = flask.Response() + response.vary.update(("Accept-Encoding", "Accept-Language")) + flask.session["test"] = "test" + return response + @app.route("/no-vary-header") + def no_vary_header(): + return "" + def expect(path, header_value="Cookie"): + rv = client.get(path) + if header_value: + # The 'Vary' key should exist in the headers only once. + assert len(rv.headers.get_all("Vary")) == 1 + assert rv.headers["Vary"] == header_value + else: + assert "Vary" not in rv.headers + expect("/set") + expect("/get") + expect("/getitem") + expect("/setdefault") + expect("/clear") + expect("/vary-cookie-header-set") + expect("/vary-header-set", "Accept-Encoding, Accept-Language, Cookie") + expect("/no-vary-header", None) + + +def test_session_refresh_vary(): + app = flask.Flask("flask_test", root_path=os.path.dirname(__file__)) + app.config.update( + TESTING=True, + SECRET_KEY="test key", + ) + client = app.test_client() + + @app.route("/login") + def login(): + flask.session["user_id"] = 1 + flask.session.permanent = True + return "" + + @app.route("/ignored") + def ignored(): + return "" + + rv = client.get("/login") + assert rv.headers["Vary"] == "Cookie" + rv = client.get("/ignored") + assert rv.headers["Vary"] == "Cookie" + + +def test_flashes(): + app = flask.Flask(__name__) + app.secret_key = 'testkey' + + with app.test_request_context(): + assert not flask.session.modified + flask.flash('Zap') + flask.session.modified = False + flask.flash('Zip') + assert flask.session.modified + assert list(flask.get_flashed_messages()) == ['Zap', 'Zip'] + + +def test_extended_flashing(): + # Be sure app.testing=True below, else tests can fail silently. + # + # Specifically, if app.testing is not set to True, the AssertionErrors + # in the view functions will cause a 500 response to the test client + # instead of propagating exceptions. + + app = flask.Flask(__name__) + app.secret_key = 'testkey' + app.testing = True + + @app.route('/') + def index(): + flask.flash(u'Hello World') + flask.flash(u'Hello World', 'error') + flask.flash(flask.Markup(u'Testing'), 'warning') + return '' + + @app.route('/test/') + def test(): + messages = flask.get_flashed_messages() + assert list(messages) == [ + u'Hello World', + u'Hello World', + flask.Markup(u'Testing') + ] + return '' + + @app.route('/test_with_categories/') + def test_with_categories(): + messages = flask.get_flashed_messages(with_categories=True) + assert len(messages) == 3 + assert list(messages) == [ + ('message', u'Hello World'), + ('error', u'Hello World'), + ('warning', flask.Markup(u'Testing')) + ] + return '' + + @app.route('/test_filter/') + def test_filter(): + messages = flask.get_flashed_messages( + category_filter=['message'], with_categories=True) + assert list(messages) == [('message', u'Hello World')] + return '' + + @app.route('/test_filters/') + def test_filters(): + messages = flask.get_flashed_messages( + category_filter=['message', 'warning'], with_categories=True) + assert list(messages) == [ + ('message', u'Hello World'), + ('warning', flask.Markup(u'Testing')) + ] + return '' + + @app.route('/test_filters_without_returning_categories/') + def test_filters2(): + messages = flask.get_flashed_messages( + category_filter=['message', 'warning']) + assert len(messages) == 2 + assert messages[0] == u'Hello World' + assert messages[1] == flask.Markup(u'Testing') + return '' + + # Create new test client on each test to clean flashed messages. + + c = app.test_client() + c.get('/') + c.get('/test/') + + c = app.test_client() + c.get('/') + c.get('/test_with_categories/') + + c = app.test_client() + c.get('/') + c.get('/test_filter/') + + c = app.test_client() + c.get('/') + c.get('/test_filters/') + + c = app.test_client() + c.get('/') + c.get('/test_filters_without_returning_categories/') + + +def test_request_processing(): + app = flask.Flask(__name__) + evts = [] + + @app.before_request + def before_request(): + evts.append('before') + + @app.after_request + def after_request(response): + response.data += b'|after' + evts.append('after') + return response + + @app.route('/') + def index(): + assert 'before' in evts + assert 'after' not in evts + return 'request' + assert 'after' not in evts + rv = app.test_client().get('/').data + assert 'after' in evts + assert rv == b'request|after' + + +def test_request_preprocessing_early_return(): + app = flask.Flask(__name__) + evts = [] + + @app.before_request + def before_request1(): + evts.append(1) + + @app.before_request + def before_request2(): + evts.append(2) + return "hello" + + @app.before_request + def before_request3(): + evts.append(3) + return "bye" + + @app.route('/') + def index(): + evts.append('index') + return "damnit" + + rv = app.test_client().get('/').data.strip() + assert rv == b'hello' + assert evts == [1, 2] + + +def test_after_request_processing(): + app = flask.Flask(__name__) + app.testing = True + + @app.route('/') + def index(): + @flask.after_this_request + def foo(response): + response.headers['X-Foo'] = 'a header' + return response + return 'Test' + c = app.test_client() + resp = c.get('/') + assert resp.status_code == 200 + assert resp.headers['X-Foo'] == 'a header' + + +def test_teardown_request_handler(): + called = [] + app = flask.Flask(__name__) + + @app.teardown_request + def teardown_request(exc): + called.append(True) + return "Ignored" + + @app.route('/') + def root(): + return "Response" + rv = app.test_client().get('/') + assert rv.status_code == 200 + assert b'Response' in rv.data + assert len(called) == 1 + + +def test_teardown_request_handler_debug_mode(): + called = [] + app = flask.Flask(__name__) + app.testing = True + + @app.teardown_request + def teardown_request(exc): + called.append(True) + return "Ignored" + + @app.route('/') + def root(): + return "Response" + rv = app.test_client().get('/') + assert rv.status_code == 200 + assert b'Response' in rv.data + assert len(called) == 1 + + +def test_teardown_request_handler_error(): + called = [] + app = flask.Flask(__name__) + app.config['LOGGER_HANDLER_POLICY'] = 'never' + + @app.teardown_request + def teardown_request1(exc): + assert type(exc) == ZeroDivisionError + called.append(True) + # This raises a new error and blows away sys.exc_info(), so we can + # test that all teardown_requests get passed the same original + # exception. + try: + raise TypeError() + except: + pass + + @app.teardown_request + def teardown_request2(exc): + assert type(exc) == ZeroDivisionError + called.append(True) + # This raises a new error and blows away sys.exc_info(), so we can + # test that all teardown_requests get passed the same original + # exception. + try: + raise TypeError() + except: + pass + + @app.route('/') + def fails(): + 1 // 0 + rv = app.test_client().get('/') + assert rv.status_code == 500 + assert b'Internal Server Error' in rv.data + assert len(called) == 2 + + +def test_before_after_request_order(): + called = [] + app = flask.Flask(__name__) + + @app.before_request + def before1(): + called.append(1) + + @app.before_request + def before2(): + called.append(2) + + @app.after_request + def after1(response): + called.append(4) + return response + + @app.after_request + def after2(response): + called.append(3) + return response + + @app.teardown_request + def finish1(exc): + called.append(6) + + @app.teardown_request + def finish2(exc): + called.append(5) + + @app.route('/') + def index(): + return '42' + rv = app.test_client().get('/') + assert rv.data == b'42' + assert called == [1, 2, 3, 4, 5, 6] + + +def test_error_handling(): + app = flask.Flask(__name__) + app.config['LOGGER_HANDLER_POLICY'] = 'never' + + @app.errorhandler(404) + def not_found(e): + return 'not found', 404 + + @app.errorhandler(500) + def internal_server_error(e): + return 'internal server error', 500 + + @app.errorhandler(Forbidden) + def forbidden(e): + return 'forbidden', 403 + + @app.route('/') + def index(): + flask.abort(404) + + @app.route('/error') + def error(): + 1 // 0 + + @app.route('/forbidden') + def error2(): + flask.abort(403) + c = app.test_client() + rv = c.get('/') + assert rv.status_code == 404 + assert rv.data == b'not found' + rv = c.get('/error') + assert rv.status_code == 500 + assert b'internal server error' == rv.data + rv = c.get('/forbidden') + assert rv.status_code == 403 + assert b'forbidden' == rv.data + + +def test_error_handling_processing(): + app = flask.Flask(__name__) + app.config['LOGGER_HANDLER_POLICY'] = 'never' + + @app.errorhandler(500) + def internal_server_error(e): + return 'internal server error', 500 + + @app.route('/') + def broken_func(): + 1 // 0 + + @app.after_request + def after_request(resp): + resp.mimetype = 'text/x-special' + return resp + + with app.test_client() as c: + resp = c.get('/') + assert resp.mimetype == 'text/x-special' + assert resp.data == b'internal server error' + + +def test_baseexception_error_handling(): + app = flask.Flask(__name__) + app.config['LOGGER_HANDLER_POLICY'] = 'never' + + @app.route('/') + def broken_func(): + raise KeyboardInterrupt() + + with app.test_client() as c: + with pytest.raises(KeyboardInterrupt): + c.get('/') + + ctx = flask._request_ctx_stack.top + assert ctx.preserved + assert type(ctx._preserved_exc) is KeyboardInterrupt + + +def test_before_request_and_routing_errors(): + app = flask.Flask(__name__) + + @app.before_request + def attach_something(): + flask.g.something = 'value' + + @app.errorhandler(404) + def return_something(error): + return flask.g.something, 404 + rv = app.test_client().get('/') + assert rv.status_code == 404 + assert rv.data == b'value' + + +def test_user_error_handling(): + class MyException(Exception): + pass + + app = flask.Flask(__name__) + + @app.errorhandler(MyException) + def handle_my_exception(e): + assert isinstance(e, MyException) + return '42' + + @app.route('/') + def index(): + raise MyException() + + c = app.test_client() + assert c.get('/').data == b'42' + + +def test_http_error_subclass_handling(): + class ForbiddenSubclass(Forbidden): + pass + + app = flask.Flask(__name__) + + @app.errorhandler(ForbiddenSubclass) + def handle_forbidden_subclass(e): + assert isinstance(e, ForbiddenSubclass) + return 'banana' + + @app.errorhandler(403) + def handle_forbidden_subclass(e): + assert not isinstance(e, ForbiddenSubclass) + assert isinstance(e, Forbidden) + return 'apple' + + @app.route('/1') + def index1(): + raise ForbiddenSubclass() + + @app.route('/2') + def index2(): + flask.abort(403) + + @app.route('/3') + def index3(): + raise Forbidden() + + c = app.test_client() + assert c.get('/1').data == b'banana' + assert c.get('/2').data == b'apple' + assert c.get('/3').data == b'apple' + + +def test_trapping_of_bad_request_key_errors(): + app = flask.Flask(__name__) + app.testing = True + + @app.route('/fail') + def fail(): + flask.request.form['missing_key'] + c = app.test_client() + assert c.get('/fail').status_code == 400 + + app.config['TRAP_BAD_REQUEST_ERRORS'] = True + c = app.test_client() + with pytest.raises(KeyError) as e: + c.get("/fail") + assert e.errisinstance(BadRequest) + + +def test_trapping_of_all_http_exceptions(): + app = flask.Flask(__name__) + app.testing = True + app.config['TRAP_HTTP_EXCEPTIONS'] = True + + @app.route('/fail') + def fail(): + flask.abort(404) + + c = app.test_client() + with pytest.raises(NotFound): + c.get('/fail') + + +def test_enctype_debug_helper(): + from flask.debughelpers import DebugFilesKeyError + app = flask.Flask(__name__) + app.debug = True + + @app.route('/fail', methods=['POST']) + def index(): + return flask.request.files['foo'].filename + + # with statement is important because we leave an exception on the + # stack otherwise and we want to ensure that this is not the case + # to not negatively affect other tests. + with app.test_client() as c: + with pytest.raises(DebugFilesKeyError) as e: + c.post('/fail', data={'foo': 'index.txt'}) + assert 'no file contents were transmitted' in str(e.value) + assert 'This was submitted: "index.txt"' in str(e.value) + + +def test_response_creation(): + app = flask.Flask(__name__) + + @app.route('/unicode') + def from_unicode(): + return u'Hällo Wörld' + + @app.route('/string') + def from_string(): + return u'Hällo Wörld'.encode('utf-8') + + @app.route('/args') + def from_tuple(): + return 'Meh', 400, { + 'X-Foo': 'Testing', + 'Content-Type': 'text/plain; charset=utf-8' + } + + @app.route('/two_args') + def from_two_args_tuple(): + return 'Hello', { + 'X-Foo': 'Test', + 'Content-Type': 'text/plain; charset=utf-8' + } + + @app.route('/args_status') + def from_status_tuple(): + return 'Hi, status!', 400 + + @app.route('/args_header') + def from_response_instance_status_tuple(): + return flask.Response('Hello world', 404), { + "X-Foo": "Bar", + "X-Bar": "Foo" + } + + c = app.test_client() + assert c.get('/unicode').data == u'Hällo Wörld'.encode('utf-8') + assert c.get('/string').data == u'Hällo Wörld'.encode('utf-8') + rv = c.get('/args') + assert rv.data == b'Meh' + assert rv.headers['X-Foo'] == 'Testing' + assert rv.status_code == 400 + assert rv.mimetype == 'text/plain' + rv2 = c.get('/two_args') + assert rv2.data == b'Hello' + assert rv2.headers['X-Foo'] == 'Test' + assert rv2.status_code == 200 + assert rv2.mimetype == 'text/plain' + rv3 = c.get('/args_status') + assert rv3.data == b'Hi, status!' + assert rv3.status_code == 400 + assert rv3.mimetype == 'text/html' + rv4 = c.get('/args_header') + assert rv4.data == b'Hello world' + assert rv4.headers['X-Foo'] == 'Bar' + assert rv4.headers['X-Bar'] == 'Foo' + assert rv4.status_code == 404 + + +def test_make_response(): + app = flask.Flask(__name__) + with app.test_request_context(): + rv = flask.make_response() + assert rv.status_code == 200 + assert rv.data == b'' + assert rv.mimetype == 'text/html' + + rv = flask.make_response('Awesome') + assert rv.status_code == 200 + assert rv.data == b'Awesome' + assert rv.mimetype == 'text/html' + + rv = flask.make_response('W00t', 404) + assert rv.status_code == 404 + assert rv.data == b'W00t' + assert rv.mimetype == 'text/html' + + +def test_make_response_with_response_instance(): + app = flask.Flask(__name__) + with app.test_request_context(): + rv = flask.make_response( + flask.jsonify({'msg': 'W00t'}), 400) + assert rv.status_code == 400 + assert rv.data == b'{\n "msg": "W00t"\n}\n' + assert rv.mimetype == 'application/json' + + rv = flask.make_response( + flask.Response(''), 400) + assert rv.status_code == 400 + assert rv.data == b'' + assert rv.mimetype == 'text/html' + + rv = flask.make_response( + flask.Response('', headers={'Content-Type': 'text/html'}), + 400, [('X-Foo', 'bar')]) + assert rv.status_code == 400 + assert rv.headers['Content-Type'] == 'text/html' + assert rv.headers['X-Foo'] == 'bar' + + +def test_jsonify_no_prettyprint(): + app = flask.Flask(__name__) + app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": False}) + with app.test_request_context(): + compressed_msg = b'{"msg":{"submsg":"W00t"},"msg2":"foobar"}\n' + uncompressed_msg = { + "msg": { + "submsg": "W00t" + }, + "msg2": "foobar" + } + + rv = flask.make_response( + flask.jsonify(uncompressed_msg), 200) + assert rv.data == compressed_msg + + +def test_jsonify_prettyprint(): + app = flask.Flask(__name__) + app.config.update({"JSONIFY_PRETTYPRINT_REGULAR": True}) + with app.test_request_context(): + compressed_msg = {"msg":{"submsg":"W00t"},"msg2":"foobar"} + pretty_response =\ + b'{\n "msg": {\n "submsg": "W00t"\n }, \n "msg2": "foobar"\n}\n' + + rv = flask.make_response( + flask.jsonify(compressed_msg), 200) + assert rv.data == pretty_response + + +def test_jsonify_mimetype(): + app = flask.Flask(__name__) + app.config.update({"JSONIFY_MIMETYPE": 'application/vnd.api+json'}) + with app.test_request_context(): + msg = { + "msg": {"submsg": "W00t"}, + } + rv = flask.make_response( + flask.jsonify(msg), 200) + assert rv.mimetype == 'application/vnd.api+json' + + +def test_jsonify_args_and_kwargs_check(): + app = flask.Flask(__name__) + with app.test_request_context(): + with pytest.raises(TypeError) as e: + flask.jsonify('fake args', kwargs='fake') + assert 'behavior undefined' in str(e.value) + + +def test_url_generation(): + app = flask.Flask(__name__) + + @app.route('/hello/', methods=['POST']) + def hello(): + pass + with app.test_request_context(): + assert flask.url_for('hello', name='test x') == '/hello/test%20x' + assert flask.url_for('hello', name='test x', _external=True) == \ + 'http://localhost/hello/test%20x' + + +def test_build_error_handler(): + app = flask.Flask(__name__) + + # Test base case, a URL which results in a BuildError. + with app.test_request_context(): + pytest.raises(BuildError, flask.url_for, 'spam') + + # Verify the error is re-raised if not the current exception. + try: + with app.test_request_context(): + flask.url_for('spam') + except BuildError as err: + error = err + try: + raise RuntimeError('Test case where BuildError is not current.') + except RuntimeError: + pytest.raises( + BuildError, app.handle_url_build_error, error, 'spam', {}) + + # Test a custom handler. + def handler(error, endpoint, values): + # Just a test. + return '/test_handler/' + app.url_build_error_handlers.append(handler) + with app.test_request_context(): + assert flask.url_for('spam') == '/test_handler/' + + +def test_build_error_handler_reraise(): + app = flask.Flask(__name__) + + # Test a custom handler which reraises the BuildError + def handler_raises_build_error(error, endpoint, values): + raise error + app.url_build_error_handlers.append(handler_raises_build_error) + + with app.test_request_context(): + pytest.raises(BuildError, flask.url_for, 'not.existing') + + +def test_custom_converters(): + from werkzeug.routing import BaseConverter + + class ListConverter(BaseConverter): + + def to_python(self, value): + return value.split(',') + + def to_url(self, value): + base_to_url = super(ListConverter, self).to_url + return ','.join(base_to_url(x) for x in value) + app = flask.Flask(__name__) + app.url_map.converters['list'] = ListConverter + + @app.route('/') + def index(args): + return '|'.join(args) + c = app.test_client() + assert c.get('/1,2,3').data == b'1|2|3' + + +def test_static_files(): + app = flask.Flask(__name__) + app.testing = True + rv = app.test_client().get('/static/index.html') + assert rv.status_code == 200 + assert rv.data.strip() == b'

Hello World!

' + with app.test_request_context(): + assert flask.url_for('static', filename='index.html') == \ + '/static/index.html' + rv.close() + + +def test_static_path_deprecated(recwarn): + app = flask.Flask(__name__, static_path='/foo') + recwarn.pop(DeprecationWarning) + + app.testing = True + rv = app.test_client().get('/foo/index.html') + assert rv.status_code == 200 + rv.close() + + with app.test_request_context(): + assert flask.url_for('static', filename='index.html') == '/foo/index.html' + + +def test_static_url_path(): + app = flask.Flask(__name__, static_url_path='/foo') + app.testing = True + rv = app.test_client().get('/foo/index.html') + assert rv.status_code == 200 + rv.close() + + with app.test_request_context(): + assert flask.url_for('static', filename='index.html') == '/foo/index.html' + + +def test_none_response(): + app = flask.Flask(__name__) + app.testing = True + + @app.route('/') + def test(): + return None + try: + app.test_client().get('/') + except ValueError as e: + assert str(e) == 'View function did not return a response' + pass + else: + assert "Expected ValueError" + + +def test_request_locals(): + assert repr(flask.g) == '' + assert not flask.g + + +def test_test_app_proper_environ(): + app = flask.Flask(__name__) + app.config.update( + SERVER_NAME='localhost.localdomain:5000' + ) + + @app.route('/') + def index(): + return 'Foo' + + @app.route('/', subdomain='foo') + def subdomain(): + return 'Foo SubDomain' + + rv = app.test_client().get('/') + assert rv.data == b'Foo' + + rv = app.test_client().get('/', 'http://localhost.localdomain:5000') + assert rv.data == b'Foo' + + rv = app.test_client().get('/', 'https://localhost.localdomain:5000') + assert rv.data == b'Foo' + + app.config.update(SERVER_NAME='localhost.localdomain') + rv = app.test_client().get('/', 'https://localhost.localdomain') + assert rv.data == b'Foo' + + try: + app.config.update(SERVER_NAME='localhost.localdomain:443') + rv = app.test_client().get('/', 'https://localhost.localdomain') + # Werkzeug 0.8 + assert rv.status_code == 404 + except ValueError as e: + # Werkzeug 0.7 + assert str(e) == ( + "the server name provided " + "('localhost.localdomain:443') does not match the " + "server name from the WSGI environment ('localhost.localdomain')" + ) + + try: + app.config.update(SERVER_NAME='localhost.localdomain') + rv = app.test_client().get('/', 'http://foo.localhost') + # Werkzeug 0.8 + assert rv.status_code == 404 + except ValueError as e: + # Werkzeug 0.7 + assert str(e) == ( + "the server name provided " + "('localhost.localdomain') does not match the " + "server name from the WSGI environment ('foo.localhost')" + ) + + rv = app.test_client().get('/', 'http://foo.localhost.localdomain') + assert rv.data == b'Foo SubDomain' + + +def test_exception_propagation(): + def apprunner(config_key): + app = flask.Flask(__name__) + app.config['LOGGER_HANDLER_POLICY'] = 'never' + + @app.route('/') + def index(): + 1 // 0 + c = app.test_client() + if config_key is not None: + app.config[config_key] = True + with pytest.raises(Exception): + c.get('/') + else: + assert c.get('/').status_code == 500 + + # we have to run this test in an isolated thread because if the + # debug flag is set to true and an exception happens the context is + # not torn down. This causes other tests that run after this fail + # when they expect no exception on the stack. + for config_key in 'TESTING', 'PROPAGATE_EXCEPTIONS', 'DEBUG', None: + t = Thread(target=apprunner, args=(config_key,)) + t.start() + t.join() + + +@pytest.mark.parametrize('debug', [True, False]) +@pytest.mark.parametrize('use_debugger', [True, False]) +@pytest.mark.parametrize('use_reloader', [True, False]) +@pytest.mark.parametrize('propagate_exceptions', [None, True, False]) +def test_werkzeug_passthrough_errors(monkeypatch, debug, use_debugger, + use_reloader, propagate_exceptions): + rv = {} + + # Mocks werkzeug.serving.run_simple method + def run_simple_mock(*args, **kwargs): + rv['passthrough_errors'] = kwargs.get('passthrough_errors') + + app = flask.Flask(__name__) + monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) + app.config['PROPAGATE_EXCEPTIONS'] = propagate_exceptions + app.run(debug=debug, use_debugger=use_debugger, use_reloader=use_reloader) + + +def test_max_content_length(): + app = flask.Flask(__name__) + app.config['MAX_CONTENT_LENGTH'] = 64 + + @app.before_request + def always_first(): + flask.request.form['myfile'] + assert False + + @app.route('/accept', methods=['POST']) + def accept_file(): + flask.request.form['myfile'] + assert False + + @app.errorhandler(413) + def catcher(error): + return '42' + + c = app.test_client() + rv = c.post('/accept', data={'myfile': 'foo' * 100}) + assert rv.data == b'42' + + +def test_url_processors(): + app = flask.Flask(__name__) + + @app.url_defaults + def add_language_code(endpoint, values): + if flask.g.lang_code is not None and \ + app.url_map.is_endpoint_expecting(endpoint, 'lang_code'): + values.setdefault('lang_code', flask.g.lang_code) + + @app.url_value_preprocessor + def pull_lang_code(endpoint, values): + flask.g.lang_code = values.pop('lang_code', None) + + @app.route('//') + def index(): + return flask.url_for('about') + + @app.route('//about') + def about(): + return flask.url_for('something_else') + + @app.route('/foo') + def something_else(): + return flask.url_for('about', lang_code='en') + + c = app.test_client() + + assert c.get('/de/').data == b'/de/about' + assert c.get('/de/about').data == b'/foo' + assert c.get('/foo').data == b'/en/about' + + +def test_inject_blueprint_url_defaults(): + app = flask.Flask(__name__) + bp = flask.Blueprint('foo.bar.baz', __name__, + template_folder='template') + + @bp.url_defaults + def bp_defaults(endpoint, values): + values['page'] = 'login' + + @bp.route('/') + def view(page): + pass + + app.register_blueprint(bp) + + values = dict() + app.inject_url_defaults('foo.bar.baz.view', values) + expected = dict(page='login') + assert values == expected + + with app.test_request_context('/somepage'): + url = flask.url_for('foo.bar.baz.view') + expected = '/login' + assert url == expected + + +def test_nonascii_pathinfo(): + app = flask.Flask(__name__) + app.testing = True + + @app.route(u'/киртест') + def index(): + return 'Hello World!' + + c = app.test_client() + rv = c.get(u'/киртест') + assert rv.data == b'Hello World!' + + +def test_debug_mode_complains_after_first_request(): + app = flask.Flask(__name__) + app.debug = True + + @app.route('/') + def index(): + return 'Awesome' + assert not app.got_first_request + assert app.test_client().get('/').data == b'Awesome' + with pytest.raises(AssertionError) as e: + @app.route('/foo') + def broken(): + return 'Meh' + assert 'A setup function was called' in str(e) + + app.debug = False + + @app.route('/foo') + def working(): + return 'Meh' + assert app.test_client().get('/foo').data == b'Meh' + assert app.got_first_request + + +def test_before_first_request_functions(): + got = [] + app = flask.Flask(__name__) + + @app.before_first_request + def foo(): + got.append(42) + c = app.test_client() + c.get('/') + assert got == [42] + c.get('/') + assert got == [42] + assert app.got_first_request + + +def test_before_first_request_functions_concurrent(): + got = [] + app = flask.Flask(__name__) + + @app.before_first_request + def foo(): + time.sleep(0.2) + got.append(42) + + c = app.test_client() + + def get_and_assert(): + c.get("/") + assert got == [42] + + t = Thread(target=get_and_assert) + t.start() + get_and_assert() + t.join() + assert app.got_first_request + + +def test_routing_redirect_debugging(): + app = flask.Flask(__name__) + app.debug = True + + @app.route('/foo/', methods=['GET', 'POST']) + def foo(): + return 'success' + with app.test_client() as c: + with pytest.raises(AssertionError) as e: + c.post('/foo', data={}) + assert 'http://localhost/foo/' in str(e) + assert ('Make sure to directly send ' + 'your POST-request to this URL') in str(e) + + rv = c.get('/foo', data={}, follow_redirects=True) + assert rv.data == b'success' + + app.debug = False + with app.test_client() as c: + rv = c.post('/foo', data={}, follow_redirects=True) + assert rv.data == b'success' + + +def test_route_decorator_custom_endpoint(): + app = flask.Flask(__name__) + app.debug = True + + @app.route('/foo/') + def foo(): + return flask.request.endpoint + + @app.route('/bar/', endpoint='bar') + def for_bar(): + return flask.request.endpoint + + @app.route('/bar/123', endpoint='123') + def for_bar_foo(): + return flask.request.endpoint + + with app.test_request_context(): + assert flask.url_for('foo') == '/foo/' + assert flask.url_for('bar') == '/bar/' + assert flask.url_for('123') == '/bar/123' + + c = app.test_client() + assert c.get('/foo/').data == b'foo' + assert c.get('/bar/').data == b'bar' + assert c.get('/bar/123').data == b'123' + + +def test_preserve_only_once(): + app = flask.Flask(__name__) + app.debug = True + + @app.route('/fail') + def fail_func(): + 1 // 0 + + c = app.test_client() + for x in range(3): + with pytest.raises(ZeroDivisionError): + c.get('/fail') + + assert flask._request_ctx_stack.top is not None + assert flask._app_ctx_stack.top is not None + # implicit appctx disappears too + flask._request_ctx_stack.top.pop() + assert flask._request_ctx_stack.top is None + assert flask._app_ctx_stack.top is None + + +def test_preserve_remembers_exception(): + app = flask.Flask(__name__) + app.debug = True + errors = [] + + @app.route('/fail') + def fail_func(): + 1 // 0 + + @app.route('/success') + def success_func(): + return 'Okay' + + @app.teardown_request + def teardown_handler(exc): + errors.append(exc) + + c = app.test_client() + + # After this failure we did not yet call the teardown handler + with pytest.raises(ZeroDivisionError): + c.get('/fail') + assert errors == [] + + # But this request triggers it, and it's an error + c.get('/success') + assert len(errors) == 2 + assert isinstance(errors[0], ZeroDivisionError) + + # At this point another request does nothing. + c.get('/success') + assert len(errors) == 3 + assert errors[1] is None + + +def test_get_method_on_g(): + app = flask.Flask(__name__) + app.testing = True + + with app.app_context(): + assert flask.g.get('x') is None + assert flask.g.get('x', 11) == 11 + flask.g.x = 42 + assert flask.g.get('x') == 42 + assert flask.g.x == 42 + + +def test_g_iteration_protocol(): + app = flask.Flask(__name__) + app.testing = True + + with app.app_context(): + flask.g.foo = 23 + flask.g.bar = 42 + assert 'foo' in flask.g + assert 'foos' not in flask.g + assert sorted(flask.g) == ['bar', 'foo'] + + +def test_subdomain_basic_support(): + app = flask.Flask(__name__) + app.config['SERVER_NAME'] = 'localhost' + + @app.route('/') + def normal_index(): + return 'normal index' + + @app.route('/', subdomain='test') + def test_index(): + return 'test index' + + c = app.test_client() + rv = c.get('/', 'http://localhost/') + assert rv.data == b'normal index' + + rv = c.get('/', 'http://test.localhost/') + assert rv.data == b'test index' + + +def test_subdomain_matching(): + app = flask.Flask(__name__) + app.config['SERVER_NAME'] = 'localhost' + + @app.route('/', subdomain='') + def index(user): + return 'index for %s' % user + + c = app.test_client() + rv = c.get('/', 'http://mitsuhiko.localhost/') + assert rv.data == b'index for mitsuhiko' + + +def test_subdomain_matching_with_ports(): + app = flask.Flask(__name__) + app.config['SERVER_NAME'] = 'localhost:3000' + + @app.route('/', subdomain='') + def index(user): + return 'index for %s' % user + + c = app.test_client() + rv = c.get('/', 'http://mitsuhiko.localhost:3000/') + assert rv.data == b'index for mitsuhiko' + + +def test_multi_route_rules(): + app = flask.Flask(__name__) + + @app.route('/') + @app.route('//') + def index(test='a'): + return test + + rv = app.test_client().open('/') + assert rv.data == b'a' + rv = app.test_client().open('/b/') + assert rv.data == b'b' + + +def test_multi_route_class_views(): + class View(object): + + def __init__(self, app): + app.add_url_rule('/', 'index', self.index) + app.add_url_rule('//', 'index', self.index) + + def index(self, test='a'): + return test + + app = flask.Flask(__name__) + _ = View(app) + rv = app.test_client().open('/') + assert rv.data == b'a' + rv = app.test_client().open('/b/') + assert rv.data == b'b' + + +def test_run_defaults(monkeypatch): + rv = {} + + # Mocks werkzeug.serving.run_simple method + def run_simple_mock(*args, **kwargs): + rv['result'] = 'running...' + + app = flask.Flask(__name__) + monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) + app.run() + assert rv['result'] == 'running...' + + +def test_run_server_port(monkeypatch): + rv = {} + + # Mocks werkzeug.serving.run_simple method + def run_simple_mock(hostname, port, application, *args, **kwargs): + rv['result'] = 'running on %s:%s ...' % (hostname, port) + + app = flask.Flask(__name__) + monkeypatch.setattr(werkzeug.serving, 'run_simple', run_simple_mock) + hostname, port = 'localhost', 8000 + app.run(hostname, port, debug=True) + assert rv['result'] == 'running on %s:%s ...' % (hostname, port) diff --git a/tests/scripts/test_helpers.py b/tests/scripts/test_helpers.py new file mode 100644 index 0000000..7f84fdd --- /dev/null +++ b/tests/scripts/test_helpers.py @@ -0,0 +1,358 @@ +# -*- coding: utf-8 -*- +""" + tests.helpers + ~~~~~~~~~~~~~~~~~~~~~~~ + + Various helpers. + + :copyright: (c) 2015 by Armin Ronacher. + :license: BSD, see LICENSE for more details. +""" + +import pytest + +import uuid +import datetime + +import flask +from werkzeug.exceptions import BadRequest +from werkzeug.http import http_date + +from flask import json +from flask._compat import StringIO, text_type + + +def has_encoding(name): + try: + import codecs + codecs.lookup(name) + return True + except LookupError: + return False + + +class TestJSON(object): + @pytest.mark.parametrize('value', ( + 1, 't', True, False, None, + [], [1, 2, 3], + {}, {'foo': u'🐍'}, + )) + @pytest.mark.parametrize('encoding', ( + 'utf-8', 'utf-8-sig', + 'utf-16-le', 'utf-16-be', 'utf-16', + 'utf-32-le', 'utf-32-be', 'utf-32', + )) + def test_detect_encoding(self, value, encoding): + data = json.dumps(value).encode(encoding) + assert json.detect_encoding(data) == encoding + assert json.loads(data) == value + + def test_ignore_cached_json(self): + app = flask.Flask(__name__) + with app.test_request_context('/', method='POST', data='malformed', + content_type='application/json'): + assert flask.request.get_json(silent=True, cache=True) is None + with pytest.raises(BadRequest): + flask.request.get_json(silent=False, cache=False) + + def test_post_empty_json_adds_exception_to_response_content_in_debug(self): + app = flask.Flask(__name__) + app.config['DEBUG'] = True + @app.route('/json', methods=['POST']) + def post_json(): + flask.request.get_json() + return None + c = app.test_client() + rv = c.post('/json', data=None, content_type='application/json') + assert rv.status_code == 400 + assert b'Failed to decode JSON object' in rv.data + + def test_post_empty_json_wont_add_exception_to_response_if_no_debug(self): + app = flask.Flask(__name__) + app.config['DEBUG'] = False + @app.route('/json', methods=['POST']) + def post_json(): + flask.request.get_json() + return None + c = app.test_client() + rv = c.post('/json', data=None, content_type='application/json') + assert rv.status_code == 400 + assert b'Failed to decode JSON object' not in rv.data + + def test_json_bad_requests(self): + app = flask.Flask(__name__) + @app.route('/json', methods=['POST']) + def return_json(): + return flask.jsonify(foo=text_type(flask.request.get_json())) + c = app.test_client() + rv = c.post('/json', data='malformed', content_type='application/json') + assert rv.status_code == 400 + + def test_json_custom_mimetypes(self): + app = flask.Flask(__name__) + @app.route('/json', methods=['POST']) + def return_json(): + return flask.request.get_json() + c = app.test_client() + rv = c.post('/json', data='"foo"', content_type='application/x+json') + assert rv.data == b'foo' + + def test_json_as_unicode(self): + app = flask.Flask(__name__) + + app.config['JSON_AS_ASCII'] = True + with app.app_context(): + rv = flask.json.dumps(u'\N{SNOWMAN}') + assert rv == '"\\u2603"' + + app.config['JSON_AS_ASCII'] = False + with app.app_context(): + rv = flask.json.dumps(u'\N{SNOWMAN}') + assert rv == u'"\u2603"' + + def test_json_dump_to_file(self): + app = flask.Flask(__name__) + test_data = {'name': 'Flask'} + out = StringIO() + + with app.app_context(): + flask.json.dump(test_data, out) + out.seek(0) + rv = flask.json.load(out) + assert rv == test_data + + @pytest.mark.parametrize('test_value', [0, -1, 1, 23, 3.14, 's', "longer string", True, False, None]) + def test_jsonify_basic_types(self, test_value): + """Test jsonify with basic types.""" + app = flask.Flask(__name__) + c = app.test_client() + + url = '/jsonify_basic_types' + app.add_url_rule(url, url, lambda x=test_value: flask.jsonify(x)) + rv = c.get(url) + assert rv.mimetype == 'application/json' + assert flask.json.loads(rv.data) == test_value + + def test_jsonify_dicts(self): + """Test jsonify with dicts and kwargs unpacking.""" + d = dict( + a=0, b=23, c=3.14, d='t', e='Hi', f=True, g=False, + h=['test list', 10, False], + i={'test':'dict'} + ) + app = flask.Flask(__name__) + @app.route('/kw') + def return_kwargs(): + return flask.jsonify(**d) + @app.route('/dict') + def return_dict(): + return flask.jsonify(d) + c = app.test_client() + for url in '/kw', '/dict': + rv = c.get(url) + assert rv.mimetype == 'application/json' + assert flask.json.loads(rv.data) == d + + def test_jsonify_arrays(self): + """Test jsonify of lists and args unpacking.""" + l = [ + 0, 42, 3.14, 't', 'hello', True, False, + ['test list', 2, False], + {'test':'dict'} + ] + app = flask.Flask(__name__) + @app.route('/args_unpack') + def return_args_unpack(): + return flask.jsonify(*l) + @app.route('/array') + def return_array(): + return flask.jsonify(l) + c = app.test_client() + for url in '/args_unpack', '/array': + rv = c.get(url) + assert rv.mimetype == 'application/json' + assert flask.json.loads(rv.data) == l + + def test_jsonify_date_types(self): + """Test jsonify with datetime.date and datetime.datetime types.""" + test_dates = ( + datetime.datetime(1973, 3, 11, 6, 30, 45), + datetime.date(1975, 1, 5) + ) + app = flask.Flask(__name__) + c = app.test_client() + + for i, d in enumerate(test_dates): + url = '/datetest{0}'.format(i) + app.add_url_rule(url, str(i), lambda val=d: flask.jsonify(x=val)) + rv = c.get(url) + assert rv.mimetype == 'application/json' + assert flask.json.loads(rv.data)['x'] == http_date(d.timetuple()) + + def test_jsonify_uuid_types(self): + """Test jsonify with uuid.UUID types""" + + test_uuid = uuid.UUID(bytes=b'\xDE\xAD\xBE\xEF' * 4) + app = flask.Flask(__name__) + url = '/uuid_test' + app.add_url_rule(url, url, lambda: flask.jsonify(x=test_uuid)) + + c = app.test_client() + rv = c.get(url) + + rv_x = flask.json.loads(rv.data)['x'] + assert rv_x == str(test_uuid) + rv_uuid = uuid.UUID(rv_x) + assert rv_uuid == test_uuid + + def test_json_attr(self): + app = flask.Flask(__name__) + @app.route('/add', methods=['POST']) + def add(): + json = flask.request.get_json() + return text_type(json['a'] + json['b']) + c = app.test_client() + rv = c.post('/add', data=flask.json.dumps({'a': 1, 'b': 2}), + content_type='application/json') + assert rv.data == b'3' + + def test_template_escaping(self): + app = flask.Flask(__name__) + render = flask.render_template_string + with app.test_request_context(): + rv = flask.json.htmlsafe_dumps('') + assert rv == u'"\\u003c/script\\u003e"' + assert type(rv) == text_type + rv = render('{{ ""|tojson }}') + assert rv == '"\\u003c/script\\u003e"' + rv = render('{{ "<\0/script>"|tojson }}') + assert rv == '"\\u003c\\u0000/script\\u003e"' + rv = render('{{ "