Backport fix for CVE-2023-25577

Also bring over tests from dist-git, and add test for formparser to test
the backported changes.

Resolves: rhbz#2188442
This commit is contained in:
Brian C. Lane 2023-04-18 16:15:31 -07:00
parent d59799fe03
commit f9244201cb
29 changed files with 1182 additions and 6 deletions

View File

@ -0,0 +1,143 @@
From 722f58f8221c013e2dd6cf1fde59fd686619f483 Mon Sep 17 00:00:00 2001
From: "Brian C. Lane" <bcl@redhat.com>
Date: Tue, 18 Apr 2023 15:57:50 -0700
Subject: [PATCH] Backport limits for multiple form parts
This fixes CVE-2023-25577, and is backported from the fix for 2.2.3
here:
https://github.com/pallets/werkzeug/commit/517cac5a804e8c4dc4ed038bb20dacd038e7a9f1
It adds a max_form_parts limit that defaults to 1000. This will prevent
memory exhaustion when it receives too many form parts.
Resolves: rhbz#2170325
---
tests/test_formparser.py | 9 +++++++++
werkzeug/formparser.py | 15 ++++++++++++---
werkzeug/wrappers.py | 12 +++++++++++-
3 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/tests/test_formparser.py b/tests/test_formparser.py
index c140476b..23f44800 100644
--- a/tests/test_formparser.py
+++ b/tests/test_formparser.py
@@ -101,6 +101,15 @@ class TestFormParser(object):
req.max_form_memory_size = 400
strict_eq(req.form['foo'], u'Hello World')
+ # Test fix for CVE-2023-25577 that limits on form-data raise an error
+ req = Request.from_values(input_stream=BytesIO(data),
+ content_length=len(data),
+ content_type='multipart/form-data; boundary=foo',
+ method='POST')
+ req.max_form_parts = 1
+ pytest.raises(RequestEntityTooLarge, lambda: req.form['foo'])
+
+
def test_missing_multipart_boundary(self):
data = (b'--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\n'
b'Hello World\r\n'
diff --git a/werkzeug/formparser.py b/werkzeug/formparser.py
index a0118054..7a21e123 100644
--- a/werkzeug/formparser.py
+++ b/werkzeug/formparser.py
@@ -137,12 +137,14 @@ class FormDataParser(object):
:param cls: an optional dict class to use. If this is not specified
or `None` the default :class:`MultiDict` is used.
:param silent: If set to False parsing errors will not be caught.
+ :param max_form_parts: The maximum number of parts to be parsed. If this is
+ exceeded, a :exc:`~exceptions.RequestEntityTooLarge` exception is raised.
"""
def __init__(self, stream_factory=None, charset='utf-8',
errors='replace', max_form_memory_size=None,
max_content_length=None, cls=None,
- silent=True):
+ silent=True, max_form_parts=None):
if stream_factory is None:
stream_factory = default_stream_factory
self.stream_factory = stream_factory
@@ -154,6 +156,7 @@ class FormDataParser(object):
cls = MultiDict
self.cls = cls
self.silent = silent
+ self.max_form_parts = max_form_parts
def get_parse_func(self, mimetype, options):
return self.parse_functions.get(mimetype)
@@ -203,7 +206,7 @@ class FormDataParser(object):
def _parse_multipart(self, stream, mimetype, content_length, options):
parser = MultiPartParser(self.stream_factory, self.charset, self.errors,
max_form_memory_size=self.max_form_memory_size,
- cls=self.cls)
+ cls=self.cls, max_form_parts=self.max_form_parts)
boundary = options.get('boundary')
if boundary is None:
raise ValueError('Missing boundary')
@@ -285,12 +288,14 @@ _end = 'end'
class MultiPartParser(object):
def __init__(self, stream_factory=None, charset='utf-8', errors='replace',
- max_form_memory_size=None, cls=None, buffer_size=64 * 1024):
+ max_form_memory_size=None, cls=None, buffer_size=64 * 1024,
+ max_form_parts=None):
self.charset = charset
self.errors = errors
self.max_form_memory_size = max_form_memory_size
self.stream_factory = default_stream_factory if stream_factory is None else stream_factory
self.cls = MultiDict if cls is None else cls
+ self.max_form_parts = max_form_parts
# make sure the buffer size is divisible by four so that we can base64
# decode chunk by chunk
@@ -472,6 +477,7 @@ class MultiPartParser(object):
``('form', (name, val))`` parts.
"""
in_memory = 0
+ parts_decoded = 0
for ellt, ell in self.parse_lines(file, boundary, content_length):
if ellt == _begin_file:
@@ -500,6 +506,9 @@ class MultiPartParser(object):
self.in_memory_threshold_reached(in_memory)
elif ellt == _end:
+ parts_decoded += 1
+ if self.max_form_parts is not None and parts_decoded > self.max_form_parts:
+ raise exceptions.RequestEntityTooLarge()
if is_file:
container.seek(0)
yield ('file',
diff --git a/werkzeug/wrappers.py b/werkzeug/wrappers.py
index ea35228e..eca7eec7 100644
--- a/werkzeug/wrappers.py
+++ b/werkzeug/wrappers.py
@@ -169,6 +169,15 @@ class BaseRequest(object):
#: .. versionadded:: 0.5
max_form_memory_size = None
+ #: The maximum number of multipart parts to parse, passed to
+ #: :attr:`form_data_parser_class`. Parsing form data with more than this
+ #: many parts will raise :exc:`~.RequestEntityTooLarge`.
+ #:
+ #: Backported to 0.12.2 from 2.2.3
+ #:
+ #: .. versionadded:: 2.2.3
+ max_form_parts = 1000
+
#: the class to use for `args` and `form`. The default is an
#: :class:`~werkzeug.datastructures.ImmutableMultiDict` which supports
#: multiple values per key. alternatively it makes sense to use an
@@ -345,7 +354,8 @@ class BaseRequest(object):
self.encoding_errors,
self.max_form_memory_size,
self.max_content_length,
- self.parameter_storage_class)
+ self.parameter_storage_class,
+ max_form_parts=self.max_form_parts)
def _load_form_data(self):
"""Method used internally to retrieve submitted data. After calling
--
2.40.0

View File

@ -9,7 +9,7 @@
Name: python-werkzeug
Version: 0.12.2
Release: 4%{?dist}
Release: 7%{?dist}
Summary: The Swiss Army knife of Python web development
Group: Development/Libraries
@ -20,6 +20,8 @@ Source0: https://files.pythonhosted.org/packages/source/W/Werkzeug/%{srcn
# See https://github.com/mitsuhiko/werkzeug/issues/761
Source1: werkzeug-sphinx-theme.tar.gz
Patch0001: 0001-Backport-limits-for-multiple-form-parts.patch
BuildArch: noarch
%global _description\
@ -53,7 +55,7 @@ BuildRequires: python2-setuptools
%{?python_provide:%python_provide python2-werkzeug}
%description -n python2-werkzeug %_description
%endif # with python2
%endif
%package -n python3-werkzeug
@ -79,7 +81,7 @@ Documentation and examples for python-werkzeug.
%prep
%setup -q -n %{srcname}-%{version}
%autosetup -p1 -n %{srcname}-%{version}
%{__sed} -i 's/\r//' LICENSE
%{__sed} -i '1d' tests/multipart/test_collect.py
tar -xf %{SOURCE1}
@ -94,7 +96,7 @@ find %{py3dir} -name '*.py' | xargs sed -i '1s|^#!python|#!%{__python3}|'
%py2_build
find examples/ -name '*.py' -executable | xargs chmod -x
find examples/ -name '*.png' -executable | xargs chmod -x
%endif # with python2
%endif
pushd %{py3dir}
%py3_build
@ -113,7 +115,7 @@ popd
%install
%if %{with python2}
%py2_install
%endif # with python2
%endif
pushd %{py3dir}
%py3_install
@ -126,7 +128,7 @@ popd
%license LICENSE
%doc AUTHORS PKG-INFO CHANGES
%{python2_sitelib}/*
%endif # with python2
%endif
%files -n python3-werkzeug
%license LICENSE
@ -138,6 +140,12 @@ popd
%changelog
* Mon Apr 24 2023 Brian C. Lane <bcl@redhat.com> - 0.12.2-7
- Add tests from dist-git
- Add new test for formdata
- Backport fix for CVE-2023-25577
Resolves: rhbz#2188442
* Fri Jun 22 2018 Charalampos Stratakis <cstratak@redhat.com> - 0.12.2-4
- Use python3-sphinx for the docs

Binary file not shown.

After

Width:  |  Height:  |  Size: 523 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 703 B

Binary file not shown.

View File

@ -0,0 +1 @@
example text

Binary file not shown.

After

Width:  |  Height:  |  Size: 781 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 733 B

View File

@ -0,0 +1,3 @@
--long text
--with boundary
--lookalikes--

Binary file not shown.

After

Width:  |  Height:  |  Size: 523 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 703 B

Binary file not shown.

View File

@ -0,0 +1 @@
ie6 sucks :-/

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 582 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 733 B

Binary file not shown.

View File

@ -0,0 +1 @@
blafasel öäü

View File

@ -0,0 +1,56 @@
"""
Hacky helper application to collect form data.
"""
from werkzeug.serving import run_simple
from werkzeug.wrappers import Request, Response
def copy_stream(request):
from os import mkdir
from time import time
folder = 'request-%d' % time()
mkdir(folder)
environ = request.environ
f = open(folder + '/request.txt', 'wb+')
f.write(environ['wsgi.input'].read(int(environ['CONTENT_LENGTH'])))
f.flush()
f.seek(0)
environ['wsgi.input'] = f
request.stat_folder = folder
def stats(request):
copy_stream(request)
f1 = request.files['file1']
f2 = request.files['file2']
text = request.form['text']
f1.save(request.stat_folder + '/file1.bin')
f2.save(request.stat_folder + '/file2.bin')
with open(request.stat_folder + '/text.txt', 'w') as f:
f.write(text.encode('utf-8'))
return Response('Done.')
def upload_file(request):
return Response('''
<h1>Upload File</h1>
<form action="" method="post" enctype="multipart/form-data">
<input type="file" name="file1"><br>
<input type="file" name="file2"><br>
<textarea name="text"></textarea><br>
<input type="submit" value="Send">
</form>
''', mimetype='text/html')
def application(environ, start_responseonse):
request = Request(environ)
if request.method == 'POST':
response = stats(request)
else:
response = upload_file(request)
return response(environ, start_responseonse)
if __name__ == '__main__':
run_simple('localhost', 5000, application, use_debugger=True)

Binary file not shown.

After

Width:  |  Height:  |  Size: 1002 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 952 B

Binary file not shown.

View File

@ -0,0 +1 @@
this is another text with ümläüts

View File

@ -0,0 +1 @@
FOUND

4
tests/scripts/run_tests.sh Executable file
View File

@ -0,0 +1,4 @@
#!/usr/bin/bash
set -eux
pytest-3 ./test_wsgi.py ./test_formparser.py

View File

@ -0,0 +1,468 @@
# -*- coding: utf-8 -*-
"""
tests.formparser
~~~~~~~~~~~~~~~~
Tests the form parsing facilities.
:copyright: (c) 2014 by Armin Ronacher.
:license: BSD, see LICENSE for more details.
"""
from __future__ import with_statement
import pytest
from os.path import join, dirname
#from tests import strict_eq
from werkzeug import formparser
from werkzeug.test import create_environ, Client
from werkzeug.wrappers import Request, Response
from werkzeug.exceptions import RequestEntityTooLarge
from werkzeug.datastructures import MultiDict
from werkzeug.formparser import parse_form_data, FormDataParser
from werkzeug._compat import BytesIO
def strict_eq(x, y):
'''Equality test bypassing the implicit string conversion in Python 2'''
__tracebackhide__ = True
assert x == y
assert issubclass(type(x), type(y)) or issubclass(type(y), type(x))
if isinstance(x, dict) and isinstance(y, dict):
x = sorted(x.items())
y = sorted(y.items())
elif isinstance(x, set) and isinstance(y, set):
x = sorted(x)
y = sorted(y)
assert repr(x) == repr(y)
@Request.application
def form_data_consumer(request):
result_object = request.args['object']
if result_object == 'text':
return Response(repr(request.form['text']))
f = request.files[result_object]
return Response(b'\n'.join((
repr(f.filename).encode('ascii'),
repr(f.name).encode('ascii'),
repr(f.content_type).encode('ascii'),
f.stream.read()
)))
def get_contents(filename):
with open(filename, 'rb') as f:
return f.read()
class TestFormParser(object):
def test_limiting(self):
data = b'foo=Hello+World&bar=baz'
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='application/x-www-form-urlencoded',
method='POST')
req.max_content_length = 400
strict_eq(req.form['foo'], u'Hello World')
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='application/x-www-form-urlencoded',
method='POST')
req.max_form_memory_size = 7
pytest.raises(RequestEntityTooLarge, lambda: req.form['foo'])
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='application/x-www-form-urlencoded',
method='POST')
req.max_form_memory_size = 400
strict_eq(req.form['foo'], u'Hello World')
data = (b'--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\n'
b'Hello World\r\n'
b'--foo\r\nContent-Disposition: form-field; name=bar\r\n\r\n'
b'bar=baz\r\n--foo--')
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
req.max_content_length = 4
pytest.raises(RequestEntityTooLarge, lambda: req.form['foo'])
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
req.max_content_length = 400
strict_eq(req.form['foo'], u'Hello World')
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
req.max_form_memory_size = 7
pytest.raises(RequestEntityTooLarge, lambda: req.form['foo'])
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
req.max_form_memory_size = 400
strict_eq(req.form['foo'], u'Hello World')
# Test fix for CVE-2023-25577 that limits on form-data raise an error
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
req.max_form_parts = 1
pytest.raises(RequestEntityTooLarge, lambda: req.form['foo'])
def test_missing_multipart_boundary(self):
data = (b'--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\n'
b'Hello World\r\n'
b'--foo\r\nContent-Disposition: form-field; name=bar\r\n\r\n'
b'bar=baz\r\n--foo--')
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data',
method='POST')
assert req.form == {}
def test_parse_form_data_put_without_content(self):
# A PUT without a Content-Type header returns empty data
# Both rfc1945 and rfc2616 (1.0 and 1.1) say "Any HTTP/[1.0/1.1] message
# containing an entity-body SHOULD include a Content-Type header field
# defining the media type of that body." In the case where either
# headers are omitted, parse_form_data should still work.
env = create_environ('/foo', 'http://example.org/', method='PUT')
del env['CONTENT_TYPE']
del env['CONTENT_LENGTH']
stream, form, files = formparser.parse_form_data(env)
strict_eq(stream.read(), b'')
strict_eq(len(form), 0)
strict_eq(len(files), 0)
def test_parse_form_data_get_without_content(self):
env = create_environ('/foo', 'http://example.org/', method='GET')
del env['CONTENT_TYPE']
del env['CONTENT_LENGTH']
stream, form, files = formparser.parse_form_data(env)
strict_eq(stream.read(), b'')
strict_eq(len(form), 0)
strict_eq(len(files), 0)
def test_large_file(self):
data = b'x' * (1024 * 600)
req = Request.from_values(data={'foo': (BytesIO(data), 'test.txt')},
method='POST')
# make sure we have a real file here, because we expect to be
# on the disk. > 1024 * 500
assert hasattr(req.files['foo'].stream, u'fileno')
# close file to prevent fds from leaking
req.files['foo'].close()
def test_streaming_parse(self):
data = b'x' * (1024 * 600)
class StreamMPP(formparser.MultiPartParser):
def parse(self, file, boundary, content_length):
i = iter(self.parse_lines(file, boundary, content_length,
cap_at_buffer=False))
one = next(i)
two = next(i)
return self.cls(()), {'one': one, 'two': two}
class StreamFDP(formparser.FormDataParser):
def _sf_parse_multipart(self, stream, mimetype,
content_length, options):
form, files = StreamMPP(
self.stream_factory, self.charset, self.errors,
max_form_memory_size=self.max_form_memory_size,
cls=self.cls).parse(stream, options.get('boundary').encode('ascii'),
content_length)
return stream, form, files
parse_functions = {}
parse_functions.update(formparser.FormDataParser.parse_functions)
parse_functions['multipart/form-data'] = _sf_parse_multipart
class StreamReq(Request):
form_data_parser_class = StreamFDP
req = StreamReq.from_values(data={'foo': (BytesIO(data), 'test.txt')},
method='POST')
strict_eq('begin_file', req.files['one'][0])
strict_eq(('foo', 'test.txt'), req.files['one'][1][1:])
strict_eq('cont', req.files['two'][0])
strict_eq(data, req.files['two'][1])
def test_parse_bad_content_type(self):
parser = FormDataParser()
assert parser.parse('', 'bad-mime-type', 0) == \
('', MultiDict([]), MultiDict([]))
def test_parse_from_environ(self):
parser = FormDataParser()
stream, _, _ = parser.parse_from_environ({'wsgi.input': ''})
assert stream is not None
class TestMultiPart(object):
def test_basic(self):
resources = join(dirname(__file__), 'multipart')
client = Client(form_data_consumer, Response)
repository = [
('firefox3-2png1txt', '---------------------------186454651713519341951581030105', [
(u'anchor.png', 'file1', 'image/png', 'file1.png'),
(u'application_edit.png', 'file2', 'image/png', 'file2.png')
], u'example text'),
('firefox3-2pnglongtext', '---------------------------14904044739787191031754711748', [
(u'accept.png', 'file1', 'image/png', 'file1.png'),
(u'add.png', 'file2', 'image/png', 'file2.png')
], u'--long text\r\n--with boundary\r\n--lookalikes--'),
('opera8-2png1txt', '----------zEO9jQKmLc2Cq88c23Dx19', [
(u'arrow_branch.png', 'file1', 'image/png', 'file1.png'),
(u'award_star_bronze_1.png', 'file2', 'image/png', 'file2.png')
], u'blafasel öäü'),
('webkit3-2png1txt', '----WebKitFormBoundaryjdSFhcARk8fyGNy6', [
(u'gtk-apply.png', 'file1', 'image/png', 'file1.png'),
(u'gtk-no.png', 'file2', 'image/png', 'file2.png')
], u'this is another text with ümläüts'),
('ie6-2png1txt', '---------------------------7d91b03a20128', [
(u'file1.png', 'file1', 'image/x-png', 'file1.png'),
(u'file2.png', 'file2', 'image/x-png', 'file2.png')
], u'ie6 sucks :-/')
]
for name, boundary, files, text in repository:
folder = join(resources, name)
data = get_contents(join(folder, 'request.txt'))
for filename, field, content_type, fsname in files:
response = client.post(
'/?object=' + field,
data=data,
content_type='multipart/form-data; boundary="%s"' % boundary,
content_length=len(data))
lines = response.get_data().split(b'\n', 3)
strict_eq(lines[0], repr(filename).encode('ascii'))
strict_eq(lines[1], repr(field).encode('ascii'))
strict_eq(lines[2], repr(content_type).encode('ascii'))
strict_eq(lines[3], get_contents(join(folder, fsname)))
response = client.post(
'/?object=text',
data=data,
content_type='multipart/form-data; boundary="%s"' % boundary,
content_length=len(data))
strict_eq(response.get_data(), repr(text).encode('utf-8'))
def test_ie7_unc_path(self):
client = Client(form_data_consumer, Response)
data_file = join(dirname(__file__), 'multipart', 'ie7_full_path_request.txt')
data = get_contents(data_file)
boundary = '---------------------------7da36d1b4a0164'
response = client.post(
'/?object=cb_file_upload_multiple',
data=data,
content_type='multipart/form-data; boundary="%s"' % boundary,
content_length=len(data))
lines = response.get_data().split(b'\n', 3)
strict_eq(lines[0],
repr(u'Sellersburg Town Council Meeting 02-22-2010doc.doc').encode('ascii'))
def test_end_of_file(self):
# This test looks innocent but it was actually timeing out in
# the Werkzeug 0.5 release version (#394)
data = (
b'--foo\r\n'
b'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'file contents and no end'
)
data = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
assert not data.files
assert not data.form
def test_broken(self):
data = (
'--foo\r\n'
'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n'
'Content-Transfer-Encoding: base64\r\n'
'Content-Type: text/plain\r\n\r\n'
'broken base 64'
'--foo--'
)
_, form, files = formparser.parse_form_data(create_environ(
data=data, method='POST', content_type='multipart/form-data; boundary=foo'
))
assert not files
assert not form
pytest.raises(ValueError, formparser.parse_form_data,
create_environ(data=data, method='POST',
content_type='multipart/form-data; boundary=foo'),
silent=False)
def test_file_no_content_type(self):
data = (
b'--foo\r\n'
b'Content-Disposition: form-data; name="test"; filename="test.txt"\r\n\r\n'
b'file contents\r\n--foo--'
)
data = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
assert data.files['test'].filename == 'test.txt'
strict_eq(data.files['test'].read(), b'file contents')
def test_extra_newline(self):
# this test looks innocent but it was actually timeing out in
# the Werkzeug 0.5 release version (#394)
data = (
b'\r\n\r\n--foo\r\n'
b'Content-Disposition: form-data; name="foo"\r\n\r\n'
b'a string\r\n'
b'--foo--'
)
data = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
assert not data.files
strict_eq(data.form['foo'], u'a string')
def test_headers(self):
data = (b'--foo\r\n'
b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
b'X-Custom-Header: blah\r\n'
b'Content-Type: text/plain; charset=utf-8\r\n\r\n'
b'file contents, just the contents\r\n'
b'--foo--')
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
foo = req.files['foo']
strict_eq(foo.mimetype, 'text/plain')
strict_eq(foo.mimetype_params, {'charset': 'utf-8'})
strict_eq(foo.headers['content-type'], foo.content_type)
strict_eq(foo.content_type, 'text/plain; charset=utf-8')
strict_eq(foo.headers['x-custom-header'], 'blah')
def test_nonstandard_line_endings(self):
for nl in b'\n', b'\r', b'\r\n':
data = nl.join((
b'--foo',
b'Content-Disposition: form-data; name=foo',
b'',
b'this is just bar',
b'--foo',
b'Content-Disposition: form-data; name=bar',
b'',
b'blafasel',
b'--foo--'
))
req = Request.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; '
'boundary=foo', method='POST')
strict_eq(req.form['foo'], u'this is just bar')
strict_eq(req.form['bar'], u'blafasel')
def test_failures(self):
def parse_multipart(stream, boundary, content_length):
parser = formparser.MultiPartParser(content_length)
return parser.parse(stream, boundary, content_length)
pytest.raises(ValueError, parse_multipart, BytesIO(), b'broken ', 0)
data = b'--foo\r\n\r\nHello World\r\n--foo--'
pytest.raises(ValueError, parse_multipart, BytesIO(data), b'foo', len(data))
data = b'--foo\r\nContent-Disposition: form-field; name=foo\r\n' \
b'Content-Transfer-Encoding: base64\r\n\r\nHello World\r\n--foo--'
pytest.raises(ValueError, parse_multipart, BytesIO(data), b'foo', len(data))
data = b'--foo\r\nContent-Disposition: form-field; name=foo\r\n\r\nHello World\r\n'
pytest.raises(ValueError, parse_multipart, BytesIO(data), b'foo', len(data))
x = formparser.parse_multipart_headers(['foo: bar\r\n', ' x test\r\n'])
strict_eq(x['foo'], 'bar\n x test')
pytest.raises(ValueError, formparser.parse_multipart_headers,
['foo: bar\r\n', ' x test'])
def test_bad_newline_bad_newline_assumption(self):
class ISORequest(Request):
charset = 'latin1'
contents = b'U2vlbmUgbORu'
data = b'--foo\r\nContent-Disposition: form-data; name="test"\r\n' \
b'Content-Transfer-Encoding: base64\r\n\r\n' + \
contents + b'\r\n--foo--'
req = ISORequest.from_values(input_stream=BytesIO(data),
content_length=len(data),
content_type='multipart/form-data; boundary=foo',
method='POST')
strict_eq(req.form['test'], u'Sk\xe5ne l\xe4n')
def test_empty_multipart(self):
environ = {}
data = b'--boundary--'
environ['REQUEST_METHOD'] = 'POST'
environ['CONTENT_TYPE'] = 'multipart/form-data; boundary=boundary'
environ['CONTENT_LENGTH'] = str(len(data))
environ['wsgi.input'] = BytesIO(data)
stream, form, files = parse_form_data(environ, silent=False)
rv = stream.read()
assert rv == b''
assert form == MultiDict()
assert files == MultiDict()
class TestMultiPartParser(object):
def test_constructor_not_pass_stream_factory_and_cls(self):
parser = formparser.MultiPartParser()
assert parser.stream_factory is formparser.default_stream_factory
assert parser.cls is MultiDict
def test_constructor_pass_stream_factory_and_cls(self):
def stream_factory():
pass
parser = formparser.MultiPartParser(stream_factory=stream_factory, cls=dict)
assert parser.stream_factory is stream_factory
assert parser.cls is dict
class TestInternalFunctions(object):
def test_line_parser(self):
assert formparser._line_parse('foo') == ('foo', False)
assert formparser._line_parse('foo\r\n') == ('foo', True)
assert formparser._line_parse('foo\r') == ('foo', True)
assert formparser._line_parse('foo\n') == ('foo', True)
def test_find_terminator(self):
lineiter = iter(b'\n\n\nfoo\nbar\nbaz'.splitlines(True))
find_terminator = formparser.MultiPartParser()._find_terminator
line = find_terminator(lineiter)
assert line == b'foo'
assert list(lineiter) == [b'bar\n', b'baz']
assert find_terminator([]) == b''
assert find_terminator([b'']) == b''

474
tests/scripts/test_wsgi.py Normal file
View File

@ -0,0 +1,474 @@
# -*- coding: utf-8 -*-
"""
tests.wsgi
~~~~~~~~~~
Tests the WSGI utilities.
:copyright: (c) 2014 by Armin Ronacher.
:license: BSD, see LICENSE for more details.
"""
import os
import pytest
from os import path
from contextlib import closing
#from tests import strict_eq
from werkzeug.wrappers import BaseResponse
from werkzeug.exceptions import BadRequest, ClientDisconnected
from werkzeug.test import Client, create_environ, run_wsgi_app
from werkzeug import wsgi
from werkzeug._compat import StringIO, BytesIO, NativeStringIO, to_native, \
to_bytes
from werkzeug.wsgi import _RangeWrapper, wrap_file
def strict_eq(x, y):
'''Equality test bypassing the implicit string conversion in Python 2'''
__tracebackhide__ = True
assert x == y
assert issubclass(type(x), type(y)) or issubclass(type(y), type(x))
if isinstance(x, dict) and isinstance(y, dict):
x = sorted(x.items())
y = sorted(y.items())
elif isinstance(x, set) and isinstance(y, set):
x = sorted(x)
y = sorted(y)
assert repr(x) == repr(y)
def test_shareddatamiddleware_get_file_loader():
app = wsgi.SharedDataMiddleware(None, {})
assert callable(app.get_file_loader('foo'))
def test_shared_data_middleware(tmpdir):
def null_application(environ, start_response):
start_response('404 NOT FOUND', [('Content-Type', 'text/plain')])
yield b'NOT FOUND'
test_dir = str(tmpdir)
with open(path.join(test_dir, to_native(u'äöü', 'utf-8')), 'w') as test_file:
test_file.write(u'FOUND')
app = wsgi.SharedDataMiddleware(null_application, {
'/': path.join(path.dirname(__file__), 'res'),
'/sources': path.join(path.dirname(__file__), 'res'),
'/pkg': ('werkzeug.debug', 'shared'),
'/foo': test_dir
})
for p in '/test.txt', '/sources/test.txt', '/foo/äöü':
app_iter, status, headers = run_wsgi_app(app, create_environ(p))
assert status == '200 OK'
with closing(app_iter) as app_iter:
data = b''.join(app_iter).strip()
assert data == b'FOUND'
app_iter, status, headers = run_wsgi_app(
app, create_environ('/pkg/debugger.js'))
with closing(app_iter) as app_iter:
contents = b''.join(app_iter)
assert b'$(function() {' in contents
app_iter, status, headers = run_wsgi_app(
app, create_environ('/missing'))
assert status == '404 NOT FOUND'
assert b''.join(app_iter).strip() == b'NOT FOUND'
def test_dispatchermiddleware():
def null_application(environ, start_response):
start_response('404 NOT FOUND', [('Content-Type', 'text/plain')])
yield b'NOT FOUND'
def dummy_application(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/plain')])
yield to_bytes(environ['SCRIPT_NAME'])
app = wsgi.DispatcherMiddleware(null_application, {
'/test1': dummy_application,
'/test2/very': dummy_application,
})
tests = {
'/test1': ('/test1', '/test1/asfd', '/test1/very'),
'/test2/very': ('/test2/very', '/test2/very/long/path/after/script/name')
}
for name, urls in tests.items():
for p in urls:
environ = create_environ(p)
app_iter, status, headers = run_wsgi_app(app, environ)
assert status == '200 OK'
assert b''.join(app_iter).strip() == to_bytes(name)
app_iter, status, headers = run_wsgi_app(
app, create_environ('/missing'))
assert status == '404 NOT FOUND'
assert b''.join(app_iter).strip() == b'NOT FOUND'
def test_get_host():
env = {'HTTP_X_FORWARDED_HOST': 'example.org',
'SERVER_NAME': 'bullshit', 'HOST_NAME': 'ignore me dammit'}
assert wsgi.get_host(env) == 'example.org'
assert wsgi.get_host(create_environ('/', 'http://example.org')) == \
'example.org'
def test_get_host_multiple_forwarded():
env = {'HTTP_X_FORWARDED_HOST': 'example.com, example.org',
'SERVER_NAME': 'bullshit', 'HOST_NAME': 'ignore me dammit'}
assert wsgi.get_host(env) == 'example.com'
assert wsgi.get_host(create_environ('/', 'http://example.com')) == \
'example.com'
def test_get_host_validation():
env = {'HTTP_X_FORWARDED_HOST': 'example.org',
'SERVER_NAME': 'bullshit', 'HOST_NAME': 'ignore me dammit'}
assert wsgi.get_host(env, trusted_hosts=['.example.org']) == 'example.org'
pytest.raises(BadRequest, wsgi.get_host, env,
trusted_hosts=['example.com'])
def test_responder():
def foo(environ, start_response):
return BaseResponse(b'Test')
client = Client(wsgi.responder(foo), BaseResponse)
response = client.get('/')
assert response.status_code == 200
assert response.data == b'Test'
def test_pop_path_info():
original_env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b///c'}
# regular path info popping
def assert_tuple(script_name, path_info):
assert env.get('SCRIPT_NAME') == script_name
assert env.get('PATH_INFO') == path_info
env = original_env.copy()
pop = lambda: wsgi.pop_path_info(env)
assert_tuple('/foo', '/a/b///c')
assert pop() == 'a'
assert_tuple('/foo/a', '/b///c')
assert pop() == 'b'
assert_tuple('/foo/a/b', '///c')
assert pop() == 'c'
assert_tuple('/foo/a/b///c', '')
assert pop() is None
def test_peek_path_info():
env = {
'SCRIPT_NAME': '/foo',
'PATH_INFO': '/aaa/b///c'
}
assert wsgi.peek_path_info(env) == 'aaa'
assert wsgi.peek_path_info(env) == 'aaa'
assert wsgi.peek_path_info(env, charset=None) == b'aaa'
assert wsgi.peek_path_info(env, charset=None) == b'aaa'
def test_path_info_and_script_name_fetching():
env = create_environ(u'/\N{SNOWMAN}', u'http://example.com/\N{COMET}/')
assert wsgi.get_path_info(env) == u'/\N{SNOWMAN}'
assert wsgi.get_path_info(env, charset=None) == u'/\N{SNOWMAN}'.encode('utf-8')
assert wsgi.get_script_name(env) == u'/\N{COMET}'
assert wsgi.get_script_name(env, charset=None) == u'/\N{COMET}'.encode('utf-8')
def test_query_string_fetching():
env = create_environ(u'/?\N{SNOWMAN}=\N{COMET}')
qs = wsgi.get_query_string(env)
strict_eq(qs, '%E2%98%83=%E2%98%84')
def test_limited_stream():
class RaisingLimitedStream(wsgi.LimitedStream):
def on_exhausted(self):
raise BadRequest('input stream exhausted')
io = BytesIO(b'123456')
stream = RaisingLimitedStream(io, 3)
strict_eq(stream.read(), b'123')
pytest.raises(BadRequest, stream.read)
io = BytesIO(b'123456')
stream = RaisingLimitedStream(io, 3)
strict_eq(stream.tell(), 0)
strict_eq(stream.read(1), b'1')
strict_eq(stream.tell(), 1)
strict_eq(stream.read(1), b'2')
strict_eq(stream.tell(), 2)
strict_eq(stream.read(1), b'3')
strict_eq(stream.tell(), 3)
pytest.raises(BadRequest, stream.read)
io = BytesIO(b'123456\nabcdefg')
stream = wsgi.LimitedStream(io, 9)
strict_eq(stream.readline(), b'123456\n')
strict_eq(stream.readline(), b'ab')
io = BytesIO(b'123456\nabcdefg')
stream = wsgi.LimitedStream(io, 9)
strict_eq(stream.readlines(), [b'123456\n', b'ab'])
io = BytesIO(b'123456\nabcdefg')
stream = wsgi.LimitedStream(io, 9)
strict_eq(stream.readlines(2), [b'12'])
strict_eq(stream.readlines(2), [b'34'])
strict_eq(stream.readlines(), [b'56\n', b'ab'])
io = BytesIO(b'123456\nabcdefg')
stream = wsgi.LimitedStream(io, 9)
strict_eq(stream.readline(100), b'123456\n')
io = BytesIO(b'123456\nabcdefg')
stream = wsgi.LimitedStream(io, 9)
strict_eq(stream.readlines(100), [b'123456\n', b'ab'])
io = BytesIO(b'123456')
stream = wsgi.LimitedStream(io, 3)
strict_eq(stream.read(1), b'1')
strict_eq(stream.read(1), b'2')
strict_eq(stream.read(), b'3')
strict_eq(stream.read(), b'')
io = BytesIO(b'123456')
stream = wsgi.LimitedStream(io, 3)
strict_eq(stream.read(-1), b'123')
io = BytesIO(b'123456')
stream = wsgi.LimitedStream(io, 0)
strict_eq(stream.read(-1), b'')
io = StringIO(u'123456')
stream = wsgi.LimitedStream(io, 0)
strict_eq(stream.read(-1), u'')
io = StringIO(u'123\n456\n')
stream = wsgi.LimitedStream(io, 8)
strict_eq(list(stream), [u'123\n', u'456\n'])
def test_limited_stream_disconnection():
io = BytesIO(b'A bit of content')
# disconnect detection on out of bytes
stream = wsgi.LimitedStream(io, 255)
with pytest.raises(ClientDisconnected):
stream.read()
# disconnect detection because file close
io = BytesIO(b'x' * 255)
io.close()
stream = wsgi.LimitedStream(io, 255)
with pytest.raises(ClientDisconnected):
stream.read()
def test_path_info_extraction():
x = wsgi.extract_path_info('http://example.com/app', '/app/hello')
assert x == u'/hello'
x = wsgi.extract_path_info('http://example.com/app',
'https://example.com/app/hello')
assert x == u'/hello'
x = wsgi.extract_path_info('http://example.com/app/',
'https://example.com/app/hello')
assert x == u'/hello'
x = wsgi.extract_path_info('http://example.com/app/',
'https://example.com/app')
assert x == u'/'
x = wsgi.extract_path_info(u'http://☃.net/', u'/fööbär')
assert x == u'/fööbär'
x = wsgi.extract_path_info(u'http://☃.net/x', u'http://☃.net/x/fööbär')
assert x == u'/fööbär'
env = create_environ(u'/fööbär', u'http://☃.net/x/')
x = wsgi.extract_path_info(env, u'http://☃.net/x/fööbär')
assert x == u'/fööbär'
x = wsgi.extract_path_info('http://example.com/app/',
'https://example.com/a/hello')
assert x is None
x = wsgi.extract_path_info('http://example.com/app/',
'https://example.com/app/hello',
collapse_http_schemes=False)
assert x is None
def test_get_host_fallback():
assert wsgi.get_host({
'SERVER_NAME': 'foobar.example.com',
'wsgi.url_scheme': 'http',
'SERVER_PORT': '80'
}) == 'foobar.example.com'
assert wsgi.get_host({
'SERVER_NAME': 'foobar.example.com',
'wsgi.url_scheme': 'http',
'SERVER_PORT': '81'
}) == 'foobar.example.com:81'
def test_get_current_url_unicode():
env = create_environ()
env['QUERY_STRING'] = 'foo=bar&baz=blah&meh=\xcf'
rv = wsgi.get_current_url(env)
strict_eq(rv,
u'http://localhost/?foo=bar&baz=blah&meh=\ufffd')
def test_multi_part_line_breaks():
data = 'abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK'
test_stream = NativeStringIO(data)
lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
buffer_size=16))
assert lines == ['abcdef\r\n', 'ghijkl\r\n', 'mnopqrstuvwxyz\r\n',
'ABCDEFGHIJK']
data = 'abc\r\nThis line is broken by the buffer length.' \
'\r\nFoo bar baz'
test_stream = NativeStringIO(data)
lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
buffer_size=24))
assert lines == ['abc\r\n', 'This line is broken by the buffer '
'length.\r\n', 'Foo bar baz']
def test_multi_part_line_breaks_bytes():
data = b'abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK'
test_stream = BytesIO(data)
lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
buffer_size=16))
assert lines == [b'abcdef\r\n', b'ghijkl\r\n', b'mnopqrstuvwxyz\r\n',
b'ABCDEFGHIJK']
data = b'abc\r\nThis line is broken by the buffer length.' \
b'\r\nFoo bar baz'
test_stream = BytesIO(data)
lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
buffer_size=24))
assert lines == [b'abc\r\n', b'This line is broken by the buffer '
b'length.\r\n', b'Foo bar baz']
def test_multi_part_line_breaks_problematic():
data = 'abc\rdef\r\nghi'
for x in range(1, 10):
test_stream = NativeStringIO(data)
lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
buffer_size=4))
assert lines == ['abc\r', 'def\r\n', 'ghi']
def test_iter_functions_support_iterators():
data = ['abcdef\r\nghi', 'jkl\r\nmnopqrstuvwxyz\r', '\nABCDEFGHIJK']
lines = list(wsgi.make_line_iter(data))
assert lines == ['abcdef\r\n', 'ghijkl\r\n', 'mnopqrstuvwxyz\r\n',
'ABCDEFGHIJK']
def test_make_chunk_iter():
data = [u'abcdefXghi', u'jklXmnopqrstuvwxyzX', u'ABCDEFGHIJK']
rv = list(wsgi.make_chunk_iter(data, 'X'))
assert rv == [u'abcdef', u'ghijkl', u'mnopqrstuvwxyz', u'ABCDEFGHIJK']
data = u'abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK'
test_stream = StringIO(data)
rv = list(wsgi.make_chunk_iter(test_stream, 'X', limit=len(data),
buffer_size=4))
assert rv == [u'abcdef', u'ghijkl', u'mnopqrstuvwxyz', u'ABCDEFGHIJK']
def test_make_chunk_iter_bytes():
data = [b'abcdefXghi', b'jklXmnopqrstuvwxyzX', b'ABCDEFGHIJK']
rv = list(wsgi.make_chunk_iter(data, 'X'))
assert rv == [b'abcdef', b'ghijkl', b'mnopqrstuvwxyz', b'ABCDEFGHIJK']
data = b'abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK'
test_stream = BytesIO(data)
rv = list(wsgi.make_chunk_iter(test_stream, 'X', limit=len(data),
buffer_size=4))
assert rv == [b'abcdef', b'ghijkl', b'mnopqrstuvwxyz', b'ABCDEFGHIJK']
data = b'abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK'
test_stream = BytesIO(data)
rv = list(wsgi.make_chunk_iter(test_stream, 'X', limit=len(data),
buffer_size=4, cap_at_buffer=True))
assert rv == [b'abcd', b'ef', b'ghij', b'kl', b'mnop', b'qrst', b'uvwx',
b'yz', b'ABCD', b'EFGH', b'IJK']
def test_lines_longer_buffer_size():
data = '1234567890\n1234567890\n'
for bufsize in range(1, 15):
lines = list(wsgi.make_line_iter(NativeStringIO(data), limit=len(data),
buffer_size=4))
assert lines == ['1234567890\n', '1234567890\n']
def test_lines_longer_buffer_size_cap():
data = '1234567890\n1234567890\n'
for bufsize in range(1, 15):
lines = list(wsgi.make_line_iter(NativeStringIO(data), limit=len(data),
buffer_size=4, cap_at_buffer=True))
assert lines == ['1234', '5678', '90\n', '1234', '5678', '90\n']
def test_range_wrapper():
response = BaseResponse(b'Hello World')
range_wrapper = _RangeWrapper(response.response, 6, 4)
assert next(range_wrapper) == b'Worl'
response = BaseResponse(b'Hello World')
range_wrapper = _RangeWrapper(response.response, 1, 0)
with pytest.raises(StopIteration):
next(range_wrapper)
response = BaseResponse(b'Hello World')
range_wrapper = _RangeWrapper(response.response, 6, 100)
assert next(range_wrapper) == b'World'
response = BaseResponse((x for x in (b'He', b'll', b'o ', b'Wo', b'rl', b'd')))
range_wrapper = _RangeWrapper(response.response, 6, 4)
assert not range_wrapper.seekable
assert next(range_wrapper) == b'Wo'
assert next(range_wrapper) == b'rl'
response = BaseResponse((x for x in (b'He', b'll', b'o W', b'o', b'rld')))
range_wrapper = _RangeWrapper(response.response, 6, 4)
assert next(range_wrapper) == b'W'
assert next(range_wrapper) == b'o'
assert next(range_wrapper) == b'rl'
with pytest.raises(StopIteration):
next(range_wrapper)
response = BaseResponse((x for x in (b'Hello', b' World')))
range_wrapper = _RangeWrapper(response.response, 1, 1)
assert next(range_wrapper) == b'e'
with pytest.raises(StopIteration):
next(range_wrapper)
resources = os.path.join(os.path.dirname(__file__), 'res')
env = create_environ()
with open(os.path.join(resources, 'test.txt'), 'rb') as f:
response = BaseResponse(wrap_file(env, f))
range_wrapper = _RangeWrapper(response.response, 1, 2)
assert range_wrapper.seekable
assert next(range_wrapper) == b'OU'
with pytest.raises(StopIteration):
next(range_wrapper)
with open(os.path.join(resources, 'test.txt'), 'rb') as f:
response = BaseResponse(wrap_file(env, f))
range_wrapper = _RangeWrapper(response.response, 2)
assert next(range_wrapper) == b'UND\n'
with pytest.raises(StopIteration):
next(range_wrapper)

15
tests/tests.yml Normal file
View File

@ -0,0 +1,15 @@
---
# Run Werkzeug wsgi and formparser tests
- hosts: localhost
roles:
- role: standard-test-basic
tags:
- classic
required_packages:
- python3-pytest
tests:
- simple:
dir: scripts
run: ./run_tests.sh