from __future__ import absolute_import, division, print_function, with_statement
from tornado import gen
from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring
from tornado.httputil import format_timestamp
from tornado.iostream import IOStream
from tornado.log import app_log, gen_log
from tornado.simple_httpclient import SimpleAsyncHTTPClient
from tornado.template import DictLoader
from tornado.testing import AsyncHTTPTestCase, ExpectLog
from tornado.test.util import unittest
from tornado.util import u, bytes_type, ObjectDict, unicode_type
from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature_v1, create_signed_value, decode_signed_value, ErrorHandler, UIModule, MissingArgumentError
import binascii
import datetime
import email.utils
import logging
import os
import re
import socket
import sys
try:
import urllib.parse as urllib_parse # py3
except ImportError:
import urllib as urllib_parse # py2
wsgi_safe_tests = []
relpath = lambda *a: os.path.join(os.path.dirname(__file__), *a)
def wsgi_safe(cls):
wsgi_safe_tests.append(cls)
return cls
class WebTestCase(AsyncHTTPTestCase):
"""Base class for web tests that also supports WSGI mode.
Override get_handlers and get_app_kwargs instead of get_app.
Append to wsgi_safe to have it run in wsgi_test as well.
"""
def get_app(self):
self.app = Application(self.get_handlers(), **self.get_app_kwargs())
return self.app
def get_handlers(self):
raise NotImplementedError()
def get_app_kwargs(self):
return {}
class SimpleHandlerTestCase(WebTestCase):
"""Simplified base class for tests that work with a single handler class.
To use, define a nested class named ``Handler``.
"""
def get_handlers(self):
return [('/', self.Handler)]
class HelloHandler(RequestHandler):
def get(self):
self.write('hello')
class CookieTestRequestHandler(RequestHandler):
# stub out enough methods to make the secure_cookie functions work
def __init__(self):
# don't call super.__init__
self._cookies = {}
self.application = ObjectDict(settings=dict(cookie_secret='0123456789'))
def get_cookie(self, name):
return self._cookies.get(name)
def set_cookie(self, name, value, expires_days=None):
self._cookies[name] = value
# See SignedValueTest below for more.
class SecureCookieV1Test(unittest.TestCase):
def test_round_trip(self):
handler = CookieTestRequestHandler()
handler.set_secure_cookie('foo', b'bar', version=1)
self.assertEqual(handler.get_secure_cookie('foo', min_version=1),
b'bar')
def test_cookie_tampering_future_timestamp(self):
handler = CookieTestRequestHandler()
# this string base64-encodes to '12345678'
handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
version=1)
cookie = handler._cookies['foo']
match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
self.assertTrue(match)
timestamp = match.group(1)
sig = match.group(2)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '12345678', timestamp),
sig)
# shifting digits from payload to timestamp doesn't alter signature
# (this is not desirable behavior, just confirming that that's how it
# works)
self.assertEqual(
_create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '1234', b'5678' + timestamp),
sig)
# tamper with the cookie
handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
to_basestring(timestamp), to_basestring(sig)))
# it gets rejected
with ExpectLog(gen_log, "Cookie timestamp in future"):
self.assertTrue(
handler.get_secure_cookie('foo', min_version=1) is None)
def test_arbitrary_bytes(self):
# Secure cookies accept arbitrary data (which is base64 encoded).
# Note that normal cookies accept only a subset of ascii.
handler = CookieTestRequestHandler()
handler.set_secure_cookie('foo', b'\xe9', version=1)
self.assertEqual(handler.get_secure_cookie('foo', min_version=1), b'\xe9')
class CookieTest(WebTestCase):
def get_handlers(self):
class SetCookieHandler(RequestHandler):
def get(self):
# Try setting cookies with different argument types
# to ensure that everything gets encoded correctly
self.set_cookie("str", "asdf")
self.set_cookie("unicode", u("qwer"))
self.set_cookie("bytes", b"zxcv")
class GetCookieHandler(RequestHandler):
def get(self):
self.write(self.get_cookie("foo", "default"))
class SetCookieDomainHandler(RequestHandler):
def get(self):
# unicode domain and path arguments shouldn't break things
# either (see bug #285)
self.set_cookie("unicode_args", "blah", domain=u("foo.com"),
path=u("/foo"))
class SetCookieSpecialCharHandler(RequestHandler):
def get(self):
self.set_cookie("equals", "a=b")
self.set_cookie("semicolon", "a;b")
self.set_cookie("quote", 'a"b')
class SetCookieOverwriteHandler(RequestHandler):
def get(self):
self.set_cookie("a", "b", domain="example.com")
self.set_cookie("c", "d", domain="example.com")
# A second call with the same name clobbers the first.
# Attributes from the first call are not carried over.
self.set_cookie("a", "e")
return [("/set", SetCookieHandler),
("/get", GetCookieHandler),
("/set_domain", SetCookieDomainHandler),
("/special_char", SetCookieSpecialCharHandler),
("/set_overwrite", SetCookieOverwriteHandler),
]
def test_set_cookie(self):
response = self.fetch("/set")
self.assertEqual(sorted(response.headers.get_list("Set-Cookie")),
["bytes=zxcv; Path=/",
"str=asdf; Path=/",
"unicode=qwer; Path=/",
])
def test_get_cookie(self):
response = self.fetch("/get", headers={"Cookie": "foo=bar"})
self.assertEqual(response.body, b"bar")
response = self.fetch("/get", headers={"Cookie": 'foo="bar"'})
self.assertEqual(response.body, b"bar")
response = self.fetch("/get", headers={"Cookie": "/=exception;"})
self.assertEqual(response.body, b"default")
def test_set_cookie_domain(self):
response = self.fetch("/set_domain")
self.assertEqual(response.headers.get_list("Set-Cookie"),
["unicode_args=blah; Domain=foo.com; Path=/foo"])
def test_cookie_special_char(self):
response = self.fetch("/special_char")
headers = sorted(response.headers.get_list("Set-Cookie"))
self.assertEqual(len(headers), 3)
self.assertEqual(headers[0], 'equals="a=b"; Path=/')
self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
# python 2.7 octal-escapes the semicolon; older versions leave it alone
self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/',
'semicolon="a\\073b"; Path=/'),
headers[2])
data = [('foo=a=b', 'a=b'),
('foo="a=b"', 'a=b'),
('foo="a;b"', 'a;b'),
# ('foo=a\\073b', 'a;b'), # even encoded, ";" is a delimiter
('foo="a\\073b"', 'a;b'),
('foo="a\\"b"', 'a"b'),
]
for header, expected in data:
logging.debug("trying %r", header)
response = self.fetch("/get", headers={"Cookie": header})
self.assertEqual(response.body, utf8(expected))
def test_set_cookie_overwrite(self):
response = self.fetch("/set_overwrite")
headers = response.headers.get_list("Set-Cookie")
self.assertEqual(sorted(headers),
["a=e; Path=/", "c=d; Domain=example.com; Path=/"])
class AuthRedirectRequestHandler(RequestHandler):
def initialize(self, login_url):
self.login_url = login_url
def get_login_url(self):
return self.login_url
@authenticated
def get(self):
# we'll never actually get here because the test doesn't follow redirects
self.send_error(500)
class AuthRedirectTest(WebTestCase):
def get_handlers(self):
return [('/relative', AuthRedirectRequestHandler,
dict(login_url='/login')),
('/absolute', AuthRedirectRequestHandler,
dict(login_url='http://example.com/login'))]
def test_relative_auth_redirect(self):
self.http_client.fetch(self.get_url('/relative'), self.stop,
follow_redirects=False)
response = self.wait()
self.assertEqual(response.code, 302)
self.assertEqual(response.headers['Location'], '/login?next=%2Frelative')
def test_absolute_auth_redirect(self):
self.http_client.fetch(self.get_url('/absolute'), self.stop,
follow_redirects=False)
response = self.wait()
self.assertEqual(response.code, 302)
self.assertTrue(re.match(
'http://example.com/login\?next=http%3A%2F%2Flocalhost%3A[0-9]+%2Fabsolute',
response.headers['Location']), response.headers['Location'])
class ConnectionCloseHandler(RequestHandler):
def initialize(self, test):
self.test = test
@asynchronous
def get(self):
self.test.on_handler_waiting()
def on_connection_close(self):
self.test.on_connection_close()
class ConnectionCloseTest(WebTestCase):
def get_handlers(self):
return [('/', ConnectionCloseHandler, dict(test=self))]
def test_connection_close(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.connect(("localhost", self.get_http_port()))
self.stream = IOStream(s, io_loop=self.io_loop)
self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
self.wait()
def on_handler_waiting(self):
logging.debug('handler waiting')
self.stream.close()
def on_connection_close(self):
logging.debug('connection closed')
self.stop()
class EchoHandler(RequestHandler):
def get(self, *path_args):
# Type checks: web.py interfaces convert argument values to
# unicode strings (by default, but see also decode_argument).
# In httpserver.py (i.e. self.request.arguments), they're left
# as bytes. Keys are always native strings.
for key in self.request.arguments:
if type(key) != str:
raise Exception("incorrect type for key: %r" % type(key))
for value in self.request.arguments[key]:
if type(value) != bytes_type:
raise Exception("incorrect type for value: %r" %
type(value))
for value in self.get_arguments(key):
if type(value) != unicode_type:
raise Exception("incorrect type for value: %r" %
type(value))
for arg in path_args:
if type(arg) != unicode_type:
raise Exception("incorrect type for path arg: %r" % type(arg))
self.write(dict(path=self.request.path,
path_args=path_args,
args=recursive_unicode(self.request.arguments)))
class RequestEncodingTest(WebTestCase):
def get_handlers(self):
return [("/group/(.*)", EchoHandler),
("/slashes/([^/]*)/([^/]*)", EchoHandler),
]
def fetch_json(self, path):
return json_decode(self.fetch(path).body)
def test_group_question_mark(self):
# Ensure that url-encoded question marks are handled properly
self.assertEqual(self.fetch_json('/group/%3F'),
dict(path='/group/%3F', path_args=['?'], args={}))
self.assertEqual(self.fetch_json('/group/%3F?%3F=%3F'),
dict(path='/group/%3F', path_args=['?'], args={'?': ['?']}))
def test_group_encoding(self):
# Path components and query arguments should be decoded the same way
self.assertEqual(self.fetch_json('/group/%C3%A9?arg=%C3%A9'),
{u("path"): u("/group/%C3%A9"),
u("path_args"): [u("\u00e9")],
u("args"): {u("arg"): [u("\u00e9")]}})
def test_slashes(self):
# Slashes may be escaped to appear as a single "directory" in the path,
# but they are then unescaped when passed to the get() method.
self.assertEqual(self.fetch_json('/slashes/foo/bar'),
dict(path="/slashes/foo/bar",
path_args=["foo", "bar"],
args={}))
self.assertEqual(self.fetch_json('/slashes/a%2Fb/c%2Fd'),
dict(path="/slashes/a%2Fb/c%2Fd",
path_args=["a/b", "c/d"],
args={}))
class TypeCheckHandler(RequestHandler):
def prepare(self):
self.errors = {}
self.check_type('status', self.get_status(), int)
# get_argument is an exception from the general rule of using
# type str for non-body data mainly for historical reasons.
self.check_type('argument', self.get_argument('foo'), unicode_type)
self.check_type('cookie_key', list(self.cookies.keys())[0], str)
self.check_type('cookie_value', list(self.cookies.values())[0].value, str)
# Secure cookies return bytes because they can contain arbitrary
# data, but regular cookies are native strings.
if list(self.cookies.keys()) != ['asdf']:
raise Exception("unexpected values for cookie keys: %r" %
self.cookies.keys())
self.check_type('get_secure_cookie', self.get_secure_cookie('asdf'), bytes_type)
self.check_type('get_cookie', self.get_cookie('asdf'), str)
self.check_type('xsrf_token', self.xsrf_token, bytes_type)
self.check_type('xsrf_form_html', self.xsrf_form_html(), str)
self.check_type('reverse_url', self.reverse_url('typecheck', 'foo'), str)
self.check_type('request_summary', self._request_summary(), str)
def get(self, path_component):
# path_component uses type unicode instead of str for consistency
# with get_argument()
self.check_type('path_component', path_component, unicode_type)
self.write(self.errors)
def post(self, path_component):
self.check_type('path_component', path_component, unicode_type)
self.write(self.errors)
def check_type(self, name, obj, expected_type):
actual_type = type(obj)
if expected_type != actual_type:
self.errors[name] = "expected %s, got %s" % (expected_type,
actual_type)
class DecodeArgHandler(RequestHandler):
def decode_argument(self, value, name=None):
if type(value) != bytes_type:
raise Exception("unexpected type for value: %r" % type(value))
# use self.request.arguments directly to avoid recursion
if 'encoding' in self.request.arguments:
return value.decode(to_unicode(self.request.arguments['encoding'][0]))
else:
return value
def get(self, arg):
def describe(s):
if type(s) == bytes_type:
return ["bytes", native_str(binascii.b2a_hex(s))]
elif type(s) == unicode_type:
return ["unicode", s]
raise Exception("unknown type")
self.write({'path': describe(arg),
'query': describe(self.get_argument("foo")),
})
class LinkifyHandler(RequestHandler):
def get(self):
self.render("linkify.html", message="http://example.com")
class UIModuleResourceHandler(RequestHandler):
def get(self):
self.render("page.html", entries=[1, 2])
class OptionalPathHandler(RequestHandler):
def get(self, path):
self.write({"path": path})
class FlowControlHandler(RequestHandler):
# These writes are too small to demonstrate real flow control,
# but at least it shows that the callbacks get run.
@asynchronous
def get(self):
self.write("1")
self.flush(callback=self.step2)
def step2(self):
self.write("2")
self.flush(callback=self.step3)
def step3(self):
self.write("3")
self.finish()
class MultiHeaderHandler(RequestHandler):
def get(self):
self.set_header("x-overwrite", "1")
self.set_header("X-Overwrite", 2)
self.add_header("x-multi", 3)
self.add_header("X-Multi", "4")
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument('permanent', None) is not None:
self.redirect('/', permanent=int(self.get_argument('permanent')))
elif self.get_argument('status', None) is not None:
self.redirect('/', status=int(self.get_argument('status')))
else:
raise Exception("didn't get permanent or status arguments")
class EmptyFlushCallbackHandler(RequestHandler):
@gen.engine
@asynchronous
def get(self):
# Ensure that the flush callback is run whether or not there
# was any output.
yield gen.Task(self.flush) # "empty" flush, but writes headers
yield gen.Task(self.flush) # empty flush
self.write("o")
yield gen.Task(self.flush) # flushes the "o"
yield gen.Task(self.flush) # empty flush
self.finish("k")
class HeaderInjectionHandler(RequestHandler):
def get(self):
try:
self.set_header("X-Foo", "foo\r\nX-Bar: baz")
raise Exception("Didn't get expected exception")
except ValueError as e:
if "Unsafe header value" in str(e):
self.finish(b"ok")
else:
raise
class GetArgumentHandler(RequestHandler):
def prepare(self):
if self.get_argument('source', None) == 'query':
method = self.get_query_argument
elif self.get_argument('source', None) == 'body':
method = self.get_body_argument
else:
method = self.get_argument
self.finish(method("foo", "default"))
class GetArgumentsHandler(RequestHandler):
def prepare(self):
self.finish(dict(default=self.get_arguments("foo"),
query=self.get_query_arguments("foo"),
body=self.get_body_arguments("foo")))
# This test is shared with wsgi_test.py
@wsgi_safe
class WSGISafeWebTest(WebTestCase):
COOKIE_SECRET = "WebTest.COOKIE_SECRET"
def get_app_kwargs(self):
loader = DictLoader({
"linkify.html": "{% module linkify(message) %}",
"page.html": """\
{% for e in entries %}
{% module Template("entry.html", entry=e) %}
{% end %}
""",
"entry.html": """\
{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="", html_body='') }}