# Source code for coaster.sqlalchemy.functions

"""
Helper functions
----------------
"""

from sqlalchemy import DDL, TIMESTAMP, Column, ForeignKey, Table, event, func, inspect
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import ColumnProperty, relationship
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql import functions

__all__ = [
    'make_timestamp_columns',
    'failsafe_add',
    'add_primary_relationship',
    'auto_init_default',
]


# --- SQL functions -----------------------------------------------------------

# Provide sqlalchemy.func.utcnow()
# Adapted from the "UTC timestamp function" example in the SQLAlchemy docs:
# http://docs.sqlalchemy.org/en/rel_1_0/core/compiler.html#utc-timestamp-function
class utcnow(functions.GenericFunction):  # NOQA: N801
    """
    SQL function construct that renders as the backend's "current timestamp"
    expression. The actual SQL text is chosen per dialect by the ``@compiles``
    hooks registered for this class.

    The construct takes no arguments and its result is typed as ``TIMESTAMP``.
    """

    type = TIMESTAMP()  # NOQA: A003
    # The rendered SQL depends only on the dialect, never on per-instance state,
    # so it is safe to take part in SQLAlchemy 1.4+ compiled-statement caching.
    # Without this flag, 1.4+ disables caching for statements that use this
    # construct and emits a warning. Older versions ignore the attribute.
    inherit_cache = True


@compiles(utcnow)
def _utcnow_default(element, compiler, **kw):
    """
    Render :class:`utcnow` as ``CURRENT_TIMESTAMP``.

    This hook is registered without a dialect name, so it applies wherever no
    dialect-specific hook for :class:`utcnow` is registered.

    NOTE(review): ``CURRENT_TIMESTAMP`` presumably follows the database session's
    time zone on some backends, so it may only be UTC when the session is --
    confirm for the backends in use.
    """
    return 'CURRENT_TIMESTAMP'


@compiles(utcnow, 'mysql')
def _utcnow_mysql(element, compiler, **kw):  # pragma: no cover
    """Render :class:`utcnow` as ``UTC_TIMESTAMP()`` on the ``mysql`` dialect."""
    return 'UTC_TIMESTAMP()'


@compiles(utcnow, 'mssql')
def _utcnow_mssql(element, compiler, **kw):  # pragma: no cover
    """Render :class:`utcnow` as ``SYSUTCDATETIME()`` on the ``mssql`` dialect."""
    return 'SYSUTCDATETIME()'


# --- Helper functions --------------------------------------------------------


def make_timestamp_columns(timezone=False):
    """
    Build a new pair of non-nullable timestamp columns.

    Both columns default to ``func.utcnow()``; ``updated_at`` additionally uses
    it as its ``onupdate`` value.

    :param bool timezone: Passed through as ``TIMESTAMP(timezone=...)`` for both
        columns
    :return: Tuple of ``(created_at, updated_at)`` columns
    """
    created_at = Column(
        'created_at',
        TIMESTAMP(timezone=timezone),
        default=func.utcnow(),
        nullable=False,
    )
    updated_at = Column(
        'updated_at',
        TIMESTAMP(timezone=timezone),
        default=func.utcnow(),
        onupdate=func.utcnow(),
        nullable=False,
    )
    return created_at, updated_at
def failsafe_add(_session, _instance, **filters):
    """
    Add an instance inside a nested transaction (SQL SAVEPOINT), tolerating a
    conflict with a row that already exists in the database. Such conflicts can
    arise from race conditions between parallel requests in a multi-worker
    deployment.

    With filters:

    * If the savepoint commits, the given instance is returned.
    * If it fails with ``IntegrityError``, the savepoint is rolled back and the
      existing row is loaded with ``filter_by(**filters).one()`` and returned.
    * If that lookup finds nothing, the original ``IntegrityError`` is raised
      again, on the assumption that the failure came from missing or invalid
      data rather than a duplicate. ``MultipleResultsFound`` from the lookup is
      deliberately not trapped.

    Without filters, nothing is returned and ``IntegrityError`` is suppressed,
    because a validation failure cannot be told apart from a pre-existing
    conflicting row. Use this mode when failures are acceptable but the cost of
    verification is not.

    Usage: ``failsafe_add(db.session, instance, **filters)``. The outer
    transaction must still be committed as usual afterwards.

    :param _session: Database session
    :param _instance: Instance to commit
    :param filters: Filters that load the existing instance from the database if
        the commit fails (needed for the instance to be returned)
    :return: Instance that is in the database, or ``None`` when no filters were
        given
    """
    if _instance in _session:
        # Most likely added to the session by a save-update cascade. SQLAlchemy
        # flushes before opening a nested transaction, which would defeat the
        # savepoint, so detach the instance now and re-add it inside the
        # SAVEPOINT below.
        _session.expunge(_instance)

    nested = _session.begin_nested()
    try:
        _session.add(_instance)
        nested.commit()
    except IntegrityError as integrity_error:
        nested.rollback()
        if not filters:
            # Nothing to look up with: swallow the error and report nothing.
            return None
        try:
            return _session.query(_instance.__class__).filter_by(**filters).one()
        except NoResultFound:
            # Do not trap the other possibility, MultipleResultsFound.
            raise integrity_error

    if not filters:
        return None
    return _instance
def add_primary_relationship(parent, childrel, child, parentrel, parentcol):
    """
    Let the parent of a one-to-many relationship mark one child as its primary.

    A secondary table is created to hold the reference, and a SQLAlchemy
    relationship named ``parent.childrel`` is attached to the parent model so
    the primary child can be read and assigned like any other attribute.
    Assigning a child whose ``parentrel`` attribute is not this parent raises
    ``ValueError``. Under PostgreSQL, a trigger is added to the secondary table
    as well to enforce the same affiliation in the database.

    The secondary table is named after the parent and child tables with
    ``_primary`` appended (``parent_child_primary``) and is registered in
    ``parent.metadata``. Its timestamp columns use ``parent.__with_timezone__``,
    so the parent model must define that attribute.

    Multi-column primary keys on either parent or child are unsupported at this
    time: the trigger only uses the first primary key column of each model.

    :param parent: The parent model (on which this relationship will be added)
    :param childrel: The name of the relationship to the child that will be
        added
    :param child: The child model
    :param str parentrel: Name of the existing relationship on the child model
        that refers back to the parent model
    :param str parentcol: Name of the existing table column on the child model
        that refers back to the parent model
    :return: Secondary table that was created
    """

    def _reference_column(table_name, column_name, **column_kwargs):
        # Non-nullable, cascade-deleting FK column named ``<table>_<column>``
        # whose type is inherited from the referenced column.
        return Column(
            '_'.join((table_name, column_name)),
            None,
            ForeignKey('.'.join((table_name, column_name)), ondelete='CASCADE'),
            nullable=False,
            **column_kwargs
        )

    parent_table_name = parent.__tablename__
    child_table_name = child.__tablename__
    primary_table_name = '_'.join((parent_table_name, child_table_name, 'primary'))
    function_name = '%s_validate' % primary_table_name
    trigger_name = '%s_trigger' % primary_table_name

    parent_id_columns = [col.name for col in inspect(parent).primary_key]
    child_id_columns = [col.name for col in inspect(child).primary_key]

    # Column order: parent reference(s) as the primary key, child reference(s),
    # then the created_at/updated_at pair.
    primary_table_columns = [
        _reference_column(parent_table_name, name, primary_key=True)
        for name in parent_id_columns
    ]
    primary_table_columns.extend(
        _reference_column(child_table_name, name) for name in child_id_columns
    )
    primary_table_columns.extend(
        make_timestamp_columns(timezone=parent.__with_timezone__)
    )

    primary_table = Table(primary_table_name, parent.metadata, *primary_table_columns)
    rel = relationship(child, uselist=False, secondary=primary_table)
    setattr(parent, childrel, rel)

    @event.listens_for(rel, 'set')
    def _validate_child(target, value, oldvalue, initiator):
        # Falsy values (such as None, to clear the primary) are always accepted.
        if not value:
            return
        if getattr(value, parentrel) != target:
            raise ValueError("The target is not affiliated with this parent")

    # XXX: To support multi-column primary keys, update this SQL function
    create_trigger_ddl = DDL(
        # One script, split across adjacent literals purely for readability.
        ' CREATE FUNCTION %(function)s() RETURNS TRIGGER AS $$ '
        'DECLARE target RECORD; '
        'BEGIN '
        'IF (NEW.%(rhs)s IS NOT NULL) THEN '
        'SELECT %(parentcol)s INTO target FROM %(child_table_name)s '
        'WHERE %(child_id_column)s = NEW.%(rhs)s; '
        'IF (target.%(parentcol)s != NEW.%(lhs)s) THEN '
        'RAISE foreign_key_violation USING '
        "MESSAGE = 'The target is not affiliated with this parent'; "
        'END IF; '
        'END IF; '
        'RETURN NEW; '
        'END; '
        '$$ LANGUAGE plpgsql; '
        'CREATE TRIGGER %(trigger)s BEFORE INSERT OR UPDATE '
        'ON %(table)s '
        'FOR EACH ROW EXECUTE PROCEDURE %(function)s(); ',
        context={
            'table': primary_table_name,
            'function': function_name,
            'trigger': trigger_name,
            'parentcol': parentcol,
            'child_table_name': child_table_name,
            'child_id_column': child_id_columns[0],
            'lhs': '%s_%s' % (parent_table_name, parent_id_columns[0]),
            'rhs': '%s_%s' % (child_table_name, child_id_columns[0]),
        },
    )
    event.listen(
        primary_table,
        'after_create',
        create_trigger_ddl.execute_if(dialect='postgresql'),
    )

    drop_trigger_ddl = DDL(
        ' DROP TRIGGER %(trigger)s ON %(table)s; '
        'DROP FUNCTION %(function)s(); ',
        context={
            'table': primary_table_name,
            'trigger': trigger_name,
            'function': function_name,
        },
    )
    event.listen(
        primary_table,
        'before_drop',
        drop_trigger_ddl.execute_if(dialect='postgresql'),
    )

    return primary_table
def auto_init_default(column):
    """
    Make a column's default value appear when the attribute is first accessed,
    instead of when the row is first committed to the database.

    An ``init_scalar`` listener is registered on ``column`` (propagating to
    subclasses). It evaluates the column's Python-side default, stores it in the
    instance's attribute dictionary, and returns it.

    :param column: A column, or a ``ColumnProperty`` whose first column supplies
        the default
    :raises NotImplementedError: (from the listener, on attribute access) if the
        default is neither a callable nor a scalar, i.e. a SQL-level default
    """
    source = column.columns[0] if isinstance(column, ColumnProperty) else column
    default = source.default

    @event.listens_for(column, 'init_scalar', retval=True, propagate=True)
    def init_scalar(target, value, dict_):
        # A subclass may override the column and not provide a default. In that
        # case hand back the incoming value without touching the instance dict.
        if not default:
            return value

        if default.is_callable:
            # Callable defaults take a context argument; none is available here.
            value = default.arg(None)
        elif default.is_scalar:
            value = default.arg
        else:
            raise NotImplementedError(
                "Can't invoke pre-default for a SQL-level column default"
            )
        dict_[column.key] = value
        return value