# -*- coding: utf-8 -*-
"""
View decorators
---------------
Decorators for view handlers.
All items in this module can be imported directly from :mod:`coaster.views`.
"""
from __future__ import absolute_import
import six
from functools import wraps
from flask import (
Response,
abort,
current_app,
g,
jsonify,
make_response,
redirect,
render_template,
request,
url_for,
)
from werkzeug.datastructures import Headers
from werkzeug.exceptions import BadRequest
from werkzeug.wrappers import Response as WerkzeugResponse
from ..auth import add_auth_attribute, current_auth
from ..utils import is_collection
from .misc import jsonp
__all__ = [
'RequestTypeError',
'RequestValueError',
'requestargs',
'requestform',
'requestquery',
'load_model',
'load_models',
'render_with',
'cors',
'requires_permission',
]
[docs]class RequestTypeError(BadRequest, TypeError):
"""
Exception that combines TypeError with BadRequest. Used by :func:`requestargs`.
"""
[docs]class RequestValueError(BadRequest, ValueError):
"""
Exception that combines ValueError with BadRequest. Used by :func:`requestargs`.
"""
[docs]def requestargs(*args, **config):
"""
Decorator that loads parameters from request.values if not specified in the
function's keyword arguments. Usage::
@requestargs('param1', ('param2', int), 'param3[]', ...)
def function(param1, param2=0, param3=None):
...
requestargs takes a list of parameters to pass to the wrapped function, with
an optional filter (useful to convert incoming string request data into integers
and other common types). If a required parameter is missing and your function does
not specify a default value, Python will raise TypeError. requestargs recasts this
as :exc:`RequestTypeError`, which returns HTTP 400 Bad Request.
If the parameter name ends in ``[]``, requestargs will attempt to read a list from
the incoming data. Filters are applied to each member of the list, not to the whole
list.
If the filter raises a ValueError, this is recast as a :exc:`RequestValueError`,
which also returns HTTP 400 Bad Request.
Tests::
>>> from flask import Flask
>>> app = Flask(__name__)
>>>
>>> @requestargs('p1', ('p2', int), ('p3[]', int))
... def f(p1, p2=None, p3=None):
... return p1, p2, p3
...
>>> f(p1=1)
(1, None, None)
>>> f(p1=1, p2=2)
(1, 2, None)
>>> f(p1='a', p2='b')
('a', 'b', None)
>>> with app.test_request_context('/?p2=2'):
... f(p1='1')
...
('1', 2, None)
>>> with app.test_request_context('/?p3=1&p3=2'):
... f(p1='1', p2='2')
...
('1', '2', [1, 2])
>>> with app.test_request_context('/?p2=100&p3=1&p3=2'):
... f(p1='1', p2=200)
...
('1', 200, [1, 2])
"""
if config and list(config.keys()) != ['source']:
raise TypeError("Unrecognised parameters: %s" % repr(config.keys()))
def inner(f):
namefilt = [
(name[:-2], filt, True) if name.endswith('[]') else (name, filt, False)
for name, filt in [
(a[0], a[1]) if isinstance(a, (list, tuple)) else (a, None)
for a in args
]
]
if config and config.get('source') == 'form':
def datasource():
return request.form if request else {}
elif config and config.get('source') == 'query':
def datasource():
return request.args if request else {}
else:
def datasource():
return request.values if request else {}
@wraps(f)
def decorated_function(*args, **kw):
values = datasource()
for name, filt, is_list in namefilt:
# Process name if
# (a) it's not in the function's parameters, and
# (b) is in the form/query
if name not in kw and name in values:
try:
if is_list:
kw[name] = values.getlist(name, type=filt)
else:
kw[name] = values.get(name, type=filt)
except ValueError as e:
raise RequestValueError(e)
try:
return f(*args, **kw)
except TypeError as e:
raise RequestTypeError(e)
return decorated_function
return inner
[docs]def requestquery(*args):
"""
Like :func:`requestargs`, but loads from request.args (the query string).
"""
return requestargs(*args, **{'source': 'query'})
[docs]def load_model(
model,
attributes=None,
parameter=None,
kwargs=False,
permission=None,
addlperms=None,
urlcheck=(),
):
"""
Decorator to load a model given a query parameter.
Typical usage::
@app.route('/<profile>')
@load_model(Profile, {'name': 'profile'}, 'profileob')
def profile_view(profileob):
# 'profileob' is now a Profile model instance.
# The load_model decorator replaced this:
# profileob = Profile.query.filter_by(name=profile).first_or_404()
return "Hello, %s" % profileob.name
Using the same name for request and parameter makes code easier to understand::
@app.route('/<profile>')
@load_model(Profile, {'name': 'profile'}, 'profile')
def profile_view(profile):
return "Hello, %s" % profile.name
``load_model`` aborts with a 404 if no instance is found.
:param model: The SQLAlchemy model to query. Must contain a ``query`` object
(which is the default with Flask-SQLAlchemy)
:param attributes: A dict of attributes (from the URL request) that will be
used to query for the object. For each key:value pair, the key is the name of
the column on the model and the value is the name of the request parameter that
contains the data
:param parameter: The name of the parameter to the decorated function via which
the result is passed. Usually the same as the attribute. If the parameter name
is prefixed with 'g.', the parameter is also made available as g.<parameter>
:param kwargs: If True, the original request parameters are passed to the decorated
function as a ``kwargs`` parameter
:param permission: If present, ``load_model`` calls the
:meth:`~coaster.sqlalchemy.PermissionMixin.permissions` method of the
retrieved object with ``current_auth.actor`` as a parameter. If
``permission`` is not present in the result, ``load_model`` aborts with
a 403. The permission may be a string or a list of strings, in which
case access is allowed if any of the listed permissions are available
:param addlperms: Iterable or callable that returns an iterable containing
additional permissions available to the user, apart from those granted by the
models. In an app that uses Lastuser for authentication, passing
``lastuser.permissions`` will pass through permissions granted via Lastuser
:param list urlcheck: If an attribute in this list has been used to load an object,
but the value of the attribute in the loaded object does not match the request
argument, issue a redirect to the corrected URL. This is useful for attributes
like ``url_id_name`` and ``url_name_uuid_b58`` where the ``name`` component may
change
"""
return load_models(
(model, attributes, parameter),
kwargs=kwargs,
permission=permission,
addlperms=addlperms,
urlcheck=urlcheck,
)
[docs]def load_models(*chain, **kwargs):
"""
Decorator to load a chain of models from the given parameters. This works just like
:func:`load_model` and accepts the same parameters, with some small differences.
:param chain: The chain is a list of tuples of (``model``, ``attributes``,
``parameter``). Lists and tuples can be used interchangeably. All retrieved
instances are passed as parameters to the decorated function
:param permission: Same as in :func:`load_model`, except
:meth:`~coaster.sqlalchemy.PermissionMixin.permissions` is called on every
instance in the chain and the retrieved permissions are passed as the second
parameter to the next instance in the chain. This allows later instances to
revoke permissions granted by earlier instances. As an example, if a URL
represents a hierarchy such as ``/<page>/<comment>``, the ``page`` can assign
``edit`` and ``delete`` permissions, while the ``comment`` can revoke ``edit``
and retain ``delete`` if the current user owns the page but not the comment
In the following example, load_models loads a Folder with a name matching the name
in the URL, then loads a Page with a matching name and with the just-loaded Folder
as parent. If the Page provides a 'view' permission to the current user, the
decorated function is called::
@app.route('/<folder_name>/<page_name>')
@load_models(
(Folder, {'name': 'folder_name'}, 'folder'),
(Page, {'name': 'page_name', 'parent': 'folder'}, 'page'),
permission='view')
def show_page(folder, page):
return render_template('page.html', folder=folder, page=page)
"""
def inner(f):
@wraps(f)
def decorated_function(*args, **kw):
permissions = None
permission_required = kwargs.get('permission')
url_check_attributes = kwargs.get('urlcheck', [])
if isinstance(permission_required, six.string_types):
permission_required = {permission_required}
elif permission_required is not None:
permission_required = set(permission_required)
result = {}
for models, attributes, parameter in chain:
if not isinstance(models, (list, tuple)):
models = (models,)
item = None
for model in models:
query = model.query
url_check = False
url_check_paramvalues = {}
for k, v in attributes.items():
if callable(v):
query = query.filter_by(**{k: v(result, kw)})
else:
if '.' in v:
first, attrs = v.split('.', 1)
val = result.get(first)
for attr in attrs.split('.'):
val = getattr(val, attr)
else:
val = result.get(v, kw.get(v))
query = query.filter_by(**{k: val})
if k in url_check_attributes:
url_check = True
url_check_paramvalues[k] = (v, val)
item = query.first()
if item is not None:
# We found it, so don't look in additional models
break
if item is None:
abort(404)
if hasattr(item, 'redirect_view_args'):
# This item is a redirect object. Redirect to destination
view_args = dict(request.view_args)
view_args.update(item.redirect_view_args())
location = url_for(request.endpoint, **view_args)
if request.query_string:
location = location + u'?' + request.query_string.decode()
return redirect(location, code=307)
if permission_required:
permissions = item.permissions(
current_auth.actor, inherited=permissions
)
addlperms = kwargs.get('addlperms') or []
if callable(addlperms):
addlperms = addlperms() or []
permissions.update(addlperms)
if g: # XXX: Deprecated
g.permissions = permissions
if request:
add_auth_attribute('permissions', permissions)
if (
url_check and request.method == 'GET'
): # Only do urlcheck redirects on GET requests
url_redirect = False
view_args = None
for k, v in url_check_paramvalues.items():
uparam, uvalue = v
if getattr(item, k) != uvalue:
url_redirect = True
if view_args is None:
view_args = dict(request.view_args)
view_args[uparam] = getattr(item, k)
if url_redirect:
location = url_for(request.endpoint, **view_args)
if request.query_string:
location = location + u'?' + request.query_string.decode()
return redirect(location, code=302)
if parameter.startswith('g.'):
parameter = parameter[2:]
setattr(g, parameter, item)
result[parameter] = item
if permission_required and not permission_required & permissions:
abort(403)
if kwargs.get('kwargs'):
return f(*args, kwargs=kw, **result)
else:
return f(*args, **result)
return decorated_function
return inner
def _best_mimetype_match(available_list, accept_mimetypes, default=None):
for use_mimetype, quality in accept_mimetypes:
for mimetype in available_list:
if use_mimetype.lower() == mimetype.lower():
return use_mimetype.lower()
return default
def dict_jsonify(param):
"""
Convert the parameter into a dictionary before calling jsonify, if it's not already
one
"""
if not isinstance(param, dict):
param = dict(param)
return jsonify(param)
def dict_jsonp(param):
"""
Convert the parameter into a dictionary before calling jsonp, if it's not already
one
"""
if not isinstance(param, dict):
param = dict(param)
return jsonp(param)
[docs]def render_with(template=None, json=False, jsonp=False): # skipcq: PYL-W0621
"""
Decorator to render the wrapped function with the given template (or dictionary
of mimetype keys to templates, where the template is a string name of a template
file or a callable that returns a Response). The function's return value must be
a dictionary and is passed to the template as parameters. Callable templates get
a single parameter with the function's return value. Usage::
@app.route('/myview')
@render_with('myview.html')
def myview():
return {'data': 'value'}
@app.route('/myview_with_json')
@render_with('myview.html', json=True)
def myview_no_json():
return {'data': 'value'}
@app.route('/otherview')
@render_with({
'text/html': 'otherview.html',
'text/xml': 'otherview.xml'})
def otherview():
return {'data': 'value'}
@app.route('/404view')
@render_with('myview.html')
def myview():
return {'error': '404 Not Found'}, 404
@app.route('/headerview')
@render_with('myview.html')
def myview():
return {'data': 'value'}, 200, {'X-Header': 'Header value'}
When a mimetype is specified and the template is not a callable, the response is
returned with the same mimetype. Callable templates must return Response objects
to ensure the correct mimetype is set.
If a dictionary of templates is provided and does not include a handler for ``*/*``,
render_with will attempt to use the handler for (in order) ``text/html``,
``text/plain`` and the various JSON types, falling back to rendering the value into
a unicode string.
If the method is called outside a request context, the wrapped method's original
return value is returned. This is meant to facilitate testing and should not be
used to call the method from within another view handler as the presence of a
request context will trigger template rendering.
Rendering may also be suspended by calling the view handler with ``_render=False``.
render_with provides JSON and JSONP handlers for the ``application/json``,
``text/json`` and ``text/x-json`` mimetypes if ``json`` or ``jsonp`` is True
(default is False).
:param template: Single template, or dictionary of MIME type to templates. If the
template is a callable, it is called with the output of the wrapped function
:param json: Helper to add a JSON handler (default is False)
:param jsonp: Helper to add a JSONP handler (if True, also provides JSON, default
is False)
"""
if jsonp:
templates = {
'application/json': dict_jsonp,
'application/javascript': dict_jsonp,
}
elif json:
templates = {'application/json': dict_jsonify}
else:
templates = {}
if isinstance(template, six.string_types):
templates['text/html'] = template
elif isinstance(template, dict):
templates.update(template)
elif template is None and (json or jsonp):
pass
else: # pragma: no cover
raise ValueError("Expected string or dict for template")
default_mimetype = '*/*'
if '*/*' not in templates:
templates['*/*'] = six.text_type
default_mimetype = 'text/plain'
for mimetype in ('text/html', 'text/plain', 'application/json'):
if mimetype in templates:
templates['*/*'] = templates[mimetype]
default_mimetype = (
mimetype # Remember which mimetype's handler is serving for */*
)
break
template_mimetypes = list(templates.keys())
template_mimetypes.remove(
'*/*'
) # */* messes up matching, so supply it only as last resort
def inner(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Check if we need to bypass rendering
render = kwargs.pop('_render', True)
# Get the result
result = f(*args, **kwargs)
# Is the result a Response object? Don't attempt rendering
if isinstance(
result, (Response, WerkzeugResponse, current_app.response_class)
):
return result
# Did the result include status code and headers?
if isinstance(result, tuple):
resultset = result
result = resultset[0]
if len(resultset) > 1:
status_code = resultset[1]
else:
status_code = None
if len(resultset) > 2:
headers = Headers(resultset[2])
else:
headers = Headers()
else:
status_code = None
headers = Headers()
if len(templates) > 1: # If we have more than one template handler
if 'Vary' in headers:
vary_values = [item.strip() for item in headers['Vary'].split(',')]
if 'Accept' not in vary_values:
vary_values.append('Accept')
headers['Vary'] = ', '.join(vary_values)
else:
headers['Vary'] = 'Accept'
# Find a matching mimetype between Accept headers and available templates
use_mimetype = None
if render and request:
# We do not use request.accept_mimetypes.best_match because it turns out
# to be buggy: it returns the least match instead of the best match.
# Previously:
# use_mimetype = request.accept_mimetypes.best_match(template_mimetypes,
# '*/*')
use_mimetype = _best_mimetype_match(
template_mimetypes, request.accept_mimetypes, '*/*'
)
# Now render the result with the template for the mimetype
if use_mimetype is not None:
if callable(templates[use_mimetype]):
rendered = templates[use_mimetype](result)
if isinstance(rendered, Response):
if status_code is not None:
rendered.status_code = status_code
if headers is not None:
rendered.headers.extend(headers)
else:
rendered = current_app.response_class(
rendered,
status=status_code,
headers=headers,
mimetype=default_mimetype
if use_mimetype == '*/*'
else use_mimetype,
)
else: # Not a callable mimetype. Render as a jinja2 template
rendered = current_app.response_class(
render_template(templates[use_mimetype], **result),
status=status_code or 200,
headers=headers,
mimetype=default_mimetype
if use_mimetype == '*/*'
else use_mimetype,
)
return rendered
else:
return result
return decorated_function
return inner
[docs]def cors(
origins,
methods=('HEAD', 'OPTIONS', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'),
headers=(
'Accept',
'Accept-Language',
'Content-Language',
'Content-Type',
'X-Requested-With',
),
max_age=None,
):
"""
Adds CORS headers to the decorated view function.
:param origins: Allowed origins (see below)
:param methods: A list of allowed HTTP methods
:param headers: A list of allowed HTTP headers
:param max_age: Duration in seconds for which the CORS response may be cached
The :obj:`origins` parameter may be one of:
1. A callable that receives the origin as a parameter.
2. A list of origins.
3. ``*``, indicating that this resource is accessible by any origin.
Example use::
from flask import Flask, Response
from coaster.views import cors
app = Flask(__name__)
@app.route('/any')
@cors('*')
def any_origin():
return Response()
@app.route('/static', methods=['GET', 'POST'])
@cors(
['https://hasgeek.com'],
methods=['GET'],
headers=['Content-Type', 'X-Requested-With'],
max_age=3600)
def static_list():
return Response()
def check_origin(origin):
# check if origin should be allowed
return True
@app.route('/callable')
@cors(check_origin)
def callable_function():
return Response()
"""
def inner(f):
@wraps(f)
def wrapper(*args, **kwargs):
origin = request.headers.get('Origin')
if request.method not in methods:
abort(405)
if origins == '*':
pass
elif is_collection(origins) and origin in origins:
pass
elif callable(origins) and origins(origin):
pass
else:
abort(403)
if request.method == 'OPTIONS':
# pre-flight request
resp = Response()
else:
result = f(*args, **kwargs)
resp = (
make_response(result)
if not isinstance(
result, (Response, WerkzeugResponse, current_app.response_class)
)
else result
)
resp.headers['Access-Control-Allow-Origin'] = origin if origin else ''
resp.headers['Access-Control-Allow-Methods'] = ', '.join(methods)
resp.headers['Access-Control-Allow-Headers'] = ', '.join(headers)
if max_age:
resp.headers['Access-Control-Max-Age'] = str(max_age)
# Add 'Origin' to the Vary header since response will vary by origin
if 'Vary' in resp.headers:
vary_values = [item.strip() for item in resp.headers['Vary'].split(',')]
if 'Origin' not in vary_values:
vary_values.append('Origin')
resp.headers['Vary'] = ', '.join(vary_values)
else:
resp.headers['Vary'] = 'Origin'
return resp
return wrapper
return inner
[docs]def requires_permission(permission):
"""
View decorator that requires a certain permission to be present in
``current_auth.permissions`` before the view is allowed to proceed.
Aborts with ``403 Forbidden`` if the permission is not present.
The decorated view will have an ``is_available`` method that can be called
to perform the same test.
:param permission: Permission that is required. If a collection type is
provided, any one permission must be available
"""
def inner(f):
def is_available_here():
if not current_auth.permissions:
return False
elif is_collection(permission):
return bool(current_auth.permissions & permission)
else:
return permission in current_auth.permissions
def is_available(context=None):
result = is_available_here()
if result and hasattr(f, 'is_available'):
# We passed, but we're wrapping another test, so ask there as well
return f.is_available(context)
return result
@wraps(f)
def wrapper(*args, **kwargs):
add_auth_attribute('login_required', True)
if not is_available_here():
abort(403)
return f(*args, **kwargs)
wrapper.requires_permission = permission
wrapper.is_available = is_available
return wrapper
return inner