# -*- coding: utf-8 -*-
"""Tornado request argument parsing module.

Example: ::

    import tornado.web
    from marshmallow import fields
    from webargs.tornadoparser import use_args

    class HelloHandler(tornado.web.RequestHandler):

        @use_args({'name': fields.Str(missing='World')})
        def get(self, args):
            response = {'message': 'Hello {}'.format(args['name'])}
            self.write(response)
"""
import tornado.web
from tornado.escape import _unicode
from marshmallow.compat import basestring
from webargs import core
class HTTPError(tornado.web.HTTPError):
    """`tornado.web.HTTPError` subclass that also carries validation errors.

    The extra ``messages`` keyword argument (defaulting to an empty dict) is
    stored on the instance as ``self.messages``; every other argument is
    forwarded untouched to `tornado.web.HTTPError`.
    """

    def __init__(self, *args, **kwargs):
        # Remove our own keyword first so the parent constructor never sees it.
        validation_messages = kwargs.pop('messages', {})
        super(HTTPError, self).__init__(*args, **kwargs)
        self.messages = validation_messages
def parse_json_body(req):
    """Return the decoded JSON body from the request.

    An empty dict is returned when the request has no JSON ``Content-Type``
    header, or when decoding the body raises `TypeError` or `ValueError`.
    """
    content_type = req.headers.get('Content-Type')
    # Guard clause: only attempt to decode bodies declared as JSON.
    if not (content_type and core.is_json(content_type)):
        return {}
    try:
        decoded = core.parse_json(req.body)
    except (TypeError, ValueError):
        # Best effort: an undecodable body is treated like an empty one.
        return {}
    return decoded
# From tornado.web.RequestHandler.decode_argument
def decode_argument(value, name=None):
    """Decode a request argument to unicode.

    :param value: Raw argument value taken from the request.
    :param name: Argument name, used only in the error message; ``"url"``
        is reported when it is not given.
    :raises HTTPError: With status 400 when ``value`` is not valid unicode.
    """
    try:
        decoded = _unicode(value)
    except UnicodeDecodeError:
        origin = name or "url"
        # Only the first 40 items of the offending value go into the message.
        message = "Invalid unicode in %s: %r" % (origin, value[:40])
        raise HTTPError(400, message)
    return decoded
def get_value(d, name, field):
    """Handle gets from 'multidicts' made of lists.

    It handles cases: ``{"key": [value]}`` and ``{"key": value}``

    :param d: Mapping to look ``name`` up in.
    :param name: Key of the argument to fetch.
    :param field: Field describing the argument; `core.is_multiple` decides
        whether a list or a single value is returned.
    :return: `core.missing` when ``name`` is absent; a list of values for a
        multiple field; otherwise a single value (the first item when the
        stored value is a non-empty list or tuple). String values are passed
        through `decode_argument`.
    """
    multiple = core.is_multiple(field)
    value = d.get(name, core.missing)
    if value is core.missing:
        return core.missing

    if multiple:
        # ``{"key": value}`` case: a bare string is one value, not a sequence
        # of characters, so wrap it before iterating.
        if isinstance(value, basestring):
            value = [value]
        return [decode_argument(v, name) if isinstance(v, basestring) else v
                for v in value]

    # Single-valued field: unwrap the first item of a non-empty list/tuple.
    ret = value[0] if value and isinstance(value, (list, tuple)) else value
    if isinstance(ret, basestring):
        return decode_argument(ret, name)
    return ret
class TornadoParser(core.Parser):
    """Tornado request argument parser."""

    def __init__(self, *args, **kwargs):
        super(TornadoParser, self).__init__(*args, **kwargs)
        # Not read by any method visible in this module (``parse_json`` uses
        # ``self._cache``); kept so existing attribute access keeps working.
        self.json = None

    def parse_json(self, req, name, field):
        """Pull a json value from the request.

        The decoded body is stored under the ``'json'`` key of
        ``self._cache`` and reused on later calls while that entry is not
        ``None``. NOTE(review): ``_cache`` is not defined in this module;
        presumably it is provided by `core.Parser` - confirm.
        """
        json_data = self._cache.get('json')
        if json_data is None:
            self._cache['json'] = json_data = parse_json_body(req)
            if json_data is None:
                return core.missing
        return core.get_value(json_data, name, field, allow_many_nested=True)

    def parse_querystring(self, req, name, field):
        """Pull a querystring value from the request."""
        return get_value(req.query_arguments, name, field)

    def parse_cookies(self, req, name, field):
        """Pull a value from the cookie data.

        Returns `core.missing` when the cookie is absent, matching how the
        other location handlers in this class report a value that was not
        found; otherwise the cookie's value (wrapped in a list for a
        multiple field).
        """
        cookie = req.cookies.get(name)
        if cookie is None:
            return core.missing
        return [cookie.value] if core.is_multiple(field) else cookie.value

    def parse_files(self, req, name, field):
        """Pull a file from the request."""
        return get_value(req.files, name, field)

    def handle_error(self, error):
        """Handles errors during parsing. Raises an `HTTPError` whose status
        is the error's ``status_code`` attribute, falling back to
        ``core.DEFAULT_VALIDATION_STATUS`` when the error has none. The
        error's ``messages`` are attached to the raised exception.
        """
        status_code = getattr(error, 'status_code', core.DEFAULT_VALIDATION_STATUS)
        # An explicit reason phrase is supplied only for 422; for any other
        # status ``reason`` is passed as None.
        reason = 'Unprocessable Entity' if status_code == 422 else None
        raise HTTPError(status_code, log_message=str(error.messages),
                        reason=reason, messages=error.messages)

    def get_request_from_view_args(self, view, args, kwargs):
        """Return the request from the first positional view argument.

        ``args[0]`` is expected to be the handler instance the decorated
        method was called on; its ``request`` attribute is returned.
        """
        return args[0].request
# Module-level parser instance backing the decorators exported below.
parser = TornadoParser()
# Bound methods of that instance, exposed as module-level names so they can
# be imported directly (e.g. ``from webargs.tornadoparser import use_args``).
use_args = parser.use_args
use_kwargs = parser.use_kwargs