"""
States and transitions
----------------------
:class:`StateManager` wraps a SQLAlchemy column with a
:class:`~coaster.utils.classes.LabeledEnum` to facilitate state inspection, and
to control state change via transitions. Sample usage::
class MY_STATE(LabeledEnum):
DRAFT = (0, "Draft")
PENDING = (1, 'pending', "Pending")
PUBLISHED = (2, "Published")
UNPUBLISHED = {DRAFT, PENDING}
# Classes can have more than one state variable
class REVIEW_STATE(LabeledEnum):
UNSUBMITTED = (0, "Unsubmitted")
PENDING = (1, "Pending")
REVIEWED = (2, "Reviewed")
class MyPost(BaseMixin, db.Model):
__tablename__ = 'my_post'
# The underlying state value columns
# (more than one state variable can exist)
_state = db.Column('state', db.Integer,
StateManager.check_constraint('state', MY_STATE),
default=MY_STATE.DRAFT, nullable=False)
_reviewstate = db.Column('reviewstate', db.Integer,
StateManager.check_constraint('reviewstate', REVIEW_STATE),
default=REVIEW_STATE.UNSUBMITTED, nullable=False)
# The state managers controlling the columns
state = StateManager('_state', MY_STATE, doc="The post's state")
reviewstate = StateManager('_reviewstate', REVIEW_STATE,
doc="Reviewer's state")
# Datetime for the additional states and transitions
datetime = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
# Additional states:
# RECENT = PUBLISHED + in the last one hour
state.add_conditional_state('RECENT', state.PUBLISHED,
lambda post: post.datetime > datetime.utcnow() - timedelta(hours=1))
# REDRAFTABLE = DRAFT or PENDING or RECENT
state.add_state_group('REDRAFTABLE',
state.DRAFT, state.PENDING, state.RECENT)
# Transitions change FROM one state TO another, and can have
# an additional if_ condition (a callable) that must return True
@state.transition(state.DRAFT, state.PENDING, if_=reviewstate.UNSUBMITTED)
def submit(self):
pass
# Transitions can coordinate across state managers. All of them
# must be in a valid FROM state for the transition to be available.
# Transitions can also specify arbitrary metadata such as this `title`
# attribute (on any of the decorators). These are made available in a
# `data` dictionary, accessible here as `publish.data`
@state.transition(state.UNPUBLISHED, state.PUBLISHED, title="Publish")
@reviewstate.transition(reviewstate.UNSUBMITTED, reviewstate.PENDING)
def publish(self):
# A transition can do additional housekeeping
self.datetime = datetime.utcnow()
# A transition can use a conditional state. The condition is evaluated
# before the transition can proceed
@state.transition(state.RECENT, state.PENDING)
@reviewstate.transition(reviewstate.PENDING, reviewstate.UNSUBMITTED)
def undo(self):
pass
# Transitions can be defined FROM a group of states, but the TO
# state must always be an individual state
@state.transition(state.REDRAFTABLE, state.DRAFT)
def redraft(self):
pass
# Transitions can abort without changing state, with or without raising
# an exception to the caller
@state.transition(state.REDRAFTABLE, state.DRAFT)
def faulty_transition_examples(self):
# Cancel the transition, but don't raise an exception to the caller
raise AbortTransition()
# Cancel the transition and return a result to the caller
raise AbortTransition('failed')
# Need to return a data structure? That works as well
raise AbortTransition((False, 'faulty_failure'))
raise AbortTransition({'status': 'error', 'error': 'faulty_failure'})
# If any other exception is raised, it is passed up to the caller
raise ValueError("Faulty transition")
# The requires decorator specifies a transition that does not change
# state. It can be used to limit a method's availability
@state.requires(state.PUBLISHED)
def send_email_alert(self):
pass
Defining states and transitions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Adding a :class:`StateManager` to the class links the underlying column
(specified as a string) to the :class:`~coaster.utils.classes.LabeledEnum`
(specified as an object). The :class:`StateManager` is read-only and state can
only be mutated via transitions. The :class:`~coaster.utils.classes.LabeledEnum`
is not required after this point. All symbol names in it are available as
attributes on the state manager henceforth (as instances of
:class:`ManagedState`).
Conditional states can be defined with
:meth:`~StateManager.add_conditional_state` as a combination of an existing
state and a validator that receives the object (the instance of the class
the StateManager is present on). This can be used to evaluate for additional
conditions. For example, to distinguish between a static "published" state and
a dynamic "recently published" state.
:meth:`~StateManager.add_conditional_state` also takes an optional
``class_validator`` parameter that is used for queries against the class (see
below for query examples).
State groups can be defined with :meth:`~StateManager.add_state_group`. These
are similar to grouped values in a LabeledEnum, but can also contain
conditional states, and are stored as instances of :class:`ManagedStateGroup`.
Grouped values in a :class:`~coaster.utils.classes.LabeledEnum` are more
efficient for testing state against, so those should be preferred if the group
does not contain a conditional state.
Transitions connect one managed state or group to another state (but not
group). Transitions are defined as methods and decorated with
:meth:`~StateManager.transition`, which transforms them into instances of
:class:`StateTransition`, a callable class. If the transition raises an
exception, the state change is aborted. Transitions may also abort without
changing state using :exc:`AbortTransition`. Transitions have two additional
attributes, :attr:`~StateTransitionWrapper.is_available`, a boolean property
which indicates if the transition is currently available, and
:attr:`~StateTransition.data`, a dictionary that contains all additional
parameters passed to the :meth:`~StateManager.transition` decorator.
Transitions can be chained to coordinate a state change across state managers
if the class has more than one. All state managers must be in a valid ``from``
state for the transition to be available. A dictionary of currently available
transitions can be obtained from the state manager using the
:meth:`~StateManagerWrapper.transitions` method.
Queries
~~~~~~~
The current state of the object can be retrieved by calling the state
attribute or reading its ``value`` attribute::
post = MyPost(_state=MY_STATE.DRAFT)
post.state() == MY_STATE.DRAFT
post.state.value == MY_STATE.DRAFT
The label associated with the state value can be accessed from the ``label`` attribute::
post.state.label == "Draft" # This is the string label from MY_STATE.DRAFT
post.submit() # Change state from DRAFT to PENDING
post.state.label.name == 'pending' # Is the NameTitle tuple from MY_STATE.PENDING
post.state.label.title == "Pending" # The title part of NameTitle
States can be tested by direct reference using the names they were originally
defined with in the :class:`~coaster.utils.classes.LabeledEnum`::
post.state.DRAFT # True
post.state.is_draft # True (is_* attrs are lowercased aliases to states)
post.state.PENDING # False (since it's a draft)
post.state.UNPUBLISHED # True (grouped state values work as expected)
post.publish() # Change state from DRAFT to PUBLISHED
post.state.RECENT # True (calls the validator if the base state matches)
States can also be used for database queries when accessed from the class::
# Generates MyPost._state == MY_STATE.DRAFT
MyPost.query.filter(MyPost.state.DRAFT)
# Generates MyPost._state.in_(MY_STATE.UNPUBLISHED)
MyPost.query.filter(MyPost.state.UNPUBLISHED)
# Generates and_(MyPost._state == MY_STATE.PUBLISHED,
# MyPost.datetime > datetime.utcnow() - timedelta(hours=1))
MyPost.query.filter(MyPost.state.RECENT)
This works because :class:`StateManager`, :class:`ManagedState`
and :class:`ManagedStateGroup` behave in three different ways, depending on
context:
1. During class definition, the state manager returns the managed state. All
methods on the state manager recognise these managed states and handle them
appropriately.
2. After class definition, the state manager returns the result of calling the
managed state instance. If accessed via the class, the managed state returns
a SQLAlchemy filter condition.
3. After class definition, if accessed via an instance, the managed state
returns itself wrapped in :class:`ManagedStateWrapper` (which holds context
for the instance). This is an object that evaluates to ``True`` if the state
is active, ``False`` otherwise. It also provides pass-through access to
all attributes of the managed state.
States can be changed via transitions, defined as methods with the
:meth:`~StateManager.transition` decorator. They add more power and safeguards
over direct state value changes:
1. Original and final states can be specified, prohibiting arbitrary state
changes.
2. The transition method can do additional validation and housekeeping.
3. Combined with the :func:`~coaster.sqlalchemy.roles.with_roles` decorator
and :class:`~coaster.sqlalchemy.roles.RoleMixin`, transitions provide
access control for state changes.
4. Signals are raised before and after a successful transition, or in case
of failures, allowing for the attempts to be logged.
"""
from __future__ import annotations
from typing import Generic, Optional, Type, TypeVar, Union
import functools
from sqlalchemy import CheckConstraint, and_
from sqlalchemy import column as column_constructor
from sqlalchemy import or_
from werkzeug.exceptions import BadRequest
from ..signals import coaster_signals
from ..utils import NameTitle, is_collection
from .roles import RoleMixin
# Public names exported by a star-import of this module: the state manager, the
# managed state and transition classes with their wrappers, the exceptions, and
# the four transition signals
__all__ = [
    'StateManager',
    'ManagedState',
    'ManagedStateGroup',
    'StateTransition',
    'StateManagerWrapper',
    'ManagedStateWrapper',
    'StateTransitionWrapper',
    'StateTransitionError',
    'AbortTransition',
    'transition_error',
    'transition_before',
    'transition_after',
    'transition_exception',
]
# --- Internal types -------------------------------------------------------------------

# Type variable standing for the class that hosts a StateManager. It parameterises
# StateManagerWrapper and appears in the annotations of StateManager.__get__
T = TypeVar('T')

# --- Signals --------------------------------------------------------------------------
# All four signals are sent by StateTransitionWrapper.__call__ with the host object
# as the sender and the StateTransition as the ``transition`` keyword argument

#: Signal raised when a transition fails validation
#: (also receives ``statemanager``, the state manager that did not match)
transition_error = coaster_signals.signal(
    'transition-error', doc="Signal raised when a transition fails validation"
)

#: Signal raised before a transition (after validation)
transition_before = coaster_signals.signal(
    'transition-before', doc="Signal raised before a transition (after validation)"
)

#: Signal raised after a successful transition
transition_after = coaster_signals.signal(
    'transition-after', doc="Signal raised after a successful transition"
)

#: Signal raised when a transition raises an exception
#: (also receives ``exception``; this includes :exc:`AbortTransition`)
transition_exception = coaster_signals.signal(
    'transition-exception', doc="Signal raised when a transition raises an exception"
)
# --- Exceptions -----------------------------------------------------------------------
class StateTransitionError(BadRequest, TypeError):
    """
    Raised if a transition is attempted from a non-matching state.

    :meth:`StateTransition.add_transition` also raises it when a transition is
    declared with a duplicate decorator or with invalid ``from``/``to`` states.
    The exception is both a :exc:`~werkzeug.exceptions.BadRequest` and a
    :exc:`TypeError`, so it can be caught as either.
    """
class AbortTransition(Exception):
    """
    Raised from within a transition to stop it without changing state.

    The value given to this exception becomes the return value of the
    transition call. :class:`StateTransitionWrapper` catches the exception, so
    it never propagates to the transition's caller.

    :param result: Value to return to the transition's caller
    """

    def __init__(self, result=None):
        # Always pass the result on (even when it is None) so that it is
        # available as ``args[0]`` to whoever catches the exception
        super(AbortTransition, self).__init__(result)
# --- Classes --------------------------------------------------------------------------
class ManagedState:
    """
    A single state (or a grouped value from the enum) under the control of a
    :class:`StateManager`. Do not use this class directly. Use
    :meth:`~StateManager.add_conditional_state` instead.

    :param name: Name of the state within the state manager
    :param statemanager: The :class:`StateManager` that owns this state
    :param value: State value, or a collection of values for a grouped state
    :param label: Optional label for the state
    :param validator: Optional callable that receives the instance and must
        pass for the state to be active (makes this a conditional state)
    :param class_validator: Optional callable that receives the class and
        returns an additional filter condition; ``validator`` is used if absent
    :param cache_for: Stored but currently unused (see TODO in :meth:`_eval`)
    """

    def __init__(
        self,
        name,
        statemanager,
        value,
        label=None,
        validator=None,
        class_validator=None,
        cache_for=None,
    ):
        self.name = name
        self.statemanager = statemanager
        self.value = value
        self.label = label
        self.validator = validator
        self.class_validator = class_validator
        self.cache_for = cache_for

    @property
    def is_conditional(self):
        """This is a conditional state (it has a validator)"""
        return self.validator is not None

    @property
    def is_scalar(self):
        """
        This is a scalar state: its value is a single value and not a collection.
        It may or may not have a condition
        """
        return not is_collection(self.value)

    @property
    def is_direct(self):
        """This is a direct state: a scalar state without a condition"""
        if self.validator is not None:
            return False
        return not is_collection(self.value)

    def __repr__(self):
        return '%s.%s' % (self.statemanager.name, self.name)

    def _eval(self, obj, cls=None):
        """
        Evaluate this state against an instance, or build a filter for the class.

        :param obj: Instance to test, or ``None`` when accessed via the class
        :param cls: The class, used when ``obj`` is ``None``
        :return: For an instance, the result of comparing the current value
            (combined with the validator's result, if there is a validator).
            For a class, a condition for use as ``cls.query.filter(result)``
        """
        # TODO: Respect cache as specified in `cache_for`
        grouped = is_collection(self.value)
        current = self.statemanager._value(obj, cls)

        if obj is not None:  # Instance access: plain Python comparison
            matches = current in self.value if grouped else current == self.value
            if self.validator is None:
                return matches
            return matches and self.validator(obj)

        # Class access: `current` is the class-level attribute, so these
        # comparisons produce a filter condition instead of a boolean
        condition = current.in_(self.value) if grouped else current == self.value
        extra = self.class_validator
        if extra is None:
            extra = self.validator
        if extra is None:
            return condition
        return and_(condition, extra(cls))

    def __call__(self, obj, cls=None):
        """
        Return a :class:`ManagedStateWrapper` bound to ``obj``, or the class-level
        filter condition if ``obj`` is ``None``.
        """
        if obj is None:
            return self._eval(obj, cls)
        return ManagedStateWrapper(self, obj, cls)
class ManagedStateGroup:
    """
    A named group of :class:`ManagedState` instances belonging to one
    :class:`StateManager`. Do not use this class directly. Use
    :meth:`~StateManager.add_state_group` instead.

    :param name: Name of the group within the state manager
    :param statemanager: The :class:`StateManager` that owns the group
    :param states: The :class:`ManagedState` instances to group
    :raises ValueError: If a member is not a managed state of this state manager,
        or if a conditional state's value is already covered by the group
    """

    def __init__(self, name, statemanager, states):
        self.name = name
        self.statemanager = statemanager
        self.states = []

        # Step 1: every member must be a ManagedState owned by this state manager
        for candidate in states:
            if (
                not isinstance(candidate, ManagedState)
                or candidate.statemanager != statemanager
            ):
                raise ValueError(
                    "Invalid state %s for state group %s"
                    % (repr(candidate), repr(self))
                )

        # Step 2: split members into regular states (which may still carry grouped
        # values) and conditional states
        plain = []
        conditional = []
        for candidate in states:
            if candidate.is_conditional:
                conditional.append(candidate)
            else:
                plain.append(candidate)

        # Step 3: regular states go in first; remember every value they cover
        covered = set()
        for member in plain:
            self.states.append(member)
            if is_collection(member.value):
                covered.update(member.value)
            else:
                covered.add(member.value)

        # Step 4: a conditional state whose value is already covered would never
        # have its condition tested, so treat that as an error
        for member in conditional:
            member_values = set(
                member.value if is_collection(member.value) else [member.value]
            )
            if covered & member_values:  # They overlap
                raise ValueError(
                    "The value for state %s is already in this state group"
                    % repr(member)
                )
            self.states.append(member)
            covered.update(member_values)

    def __repr__(self):
        return '%s.%s' % (self.statemanager.name, self.name)

    def _eval(self, obj, cls=None):
        """
        Evaluate the group: for an instance, whether any member state is active;
        for a class (``obj`` is ``None``), the members' conditions joined with OR.
        """
        if obj is None:
            return or_(*[member(obj, cls) for member in self.states])
        return any(member(obj, cls) for member in self.states)

    def __call__(self, obj, cls=None):
        """
        Return a :class:`ManagedStateWrapper` bound to ``obj``, or the class-level
        filter condition if ``obj`` is ``None``.
        """
        if obj is None:
            return self._eval(obj, cls)
        return ManagedStateWrapper(self, obj, cls)
class ManagedStateWrapper:
    """
    Wraps a :class:`ManagedState` or :class:`ManagedStateGroup` with
    an object or class, and otherwise provides transparent access to contents.

    This class is automatically constructed by :class:`StateManager`.

    :param mstate: The managed state or state group being wrapped
    :param obj: The instance the state is evaluated against
    :param cls: The class of the instance
    :raises TypeError: If ``mstate`` is not a managed state or state group
    """

    def __init__(self, mstate, obj, cls=None):
        if not isinstance(mstate, (ManagedState, ManagedStateGroup)):
            raise TypeError("Parameter is not a managed state: %s" % repr(mstate))
        self._mstate = mstate
        self._obj = obj
        self._cls = cls

    def __repr__(self):
        return '<ManagedStateWrapper %s>' % repr(self._mstate)

    def __call__(self):
        """Evaluate the wrapped state and return the raw result."""
        return self._mstate._eval(self._obj, self._cls)

    def __getattr__(self, attr):
        # __getattr__ is only consulted when normal lookup fails. If `_mstate`
        # itself is missing (the instance was created without __init__, as copy
        # and pickle do), delegating would recurse forever, so fail cleanly.
        if attr == '_mstate':
            raise AttributeError(attr)
        return getattr(self._mstate, attr)

    def __eq__(self, other):
        return (
            isinstance(other, ManagedStateWrapper)
            and self._mstate == other._mstate
            and self._obj == other._obj
            and self._cls == other._cls
        )

    def __ne__(self, other):
        return not self.__eq__(other)

    def __bool__(self):
        # The evaluation result is `valuematch and validator(obj)`, so it can be
        # whatever the validator returned (None, a list, a string…). Python
        # requires __bool__ to return a real bool and raises TypeError otherwise.
        return bool(self())

    __nonzero__ = __bool__
class StateTransition:
    """
    Helper for transitions from one state to another. Do not use this class
    directly. Use the :meth:`StateManager.transition` decorator instead, which
    creates instances of this class.

    To access the decorated function with ``help()``, use ``help(obj.func)``.

    :param func: The decorated transition method
    :param statemanager: State manager registering the first transition
    :param from_: Required state or state group, or ``None``
    :param to: Resulting state, or ``None`` to leave the state unchanged
    :param if_: Optional validator or collection of validators
    :param data: Optional dictionary of additional metadata
    """

    def __init__(self, func, statemanager, from_, to, if_=None, data=None):
        self.func = func
        functools.update_wrapper(self, func)
        self.name = func.__name__

        # One entry per state manager. Stacking @StateManager.transition
        # decorators extends this through add_transition
        self.transitions = {}
        # Metadata from all stacked decorators is merged into this dictionary
        self.data = {}
        self.add_transition(statemanager, from_, to, if_, data)

    def add_transition(self, statemanager, from_, to, if_=None, data=None):
        """
        Register the transition's conditions for one state manager.

        :raises StateTransitionError: If the state manager already has an entry, or
            if ``from_``/``to`` are not managed states of this state manager, or if
            ``to`` is not a direct state
        :raises TypeError: If ``data`` contains the reserved key ``name``
        """
        if statemanager in self.transitions:
            raise StateTransitionError("Duplicate transition decorator")

        # Validate the FROM state
        if from_ is not None and not isinstance(
            from_, (ManagedState, ManagedStateGroup)
        ):
            raise StateTransitionError(
                "From state is not a managed state: %s" % repr(from_)
            )
        if from_ and from_.statemanager != statemanager:
            raise StateTransitionError(
                "From state is not managed by this state manager: %s" % repr(from_)
            )

        # Validate the TO state (None means the state is left unchanged)
        if to is not None:
            if not isinstance(to, ManagedState):
                raise StateTransitionError(
                    "To state is not a managed state: %s" % repr(to)
                )
            elif to.statemanager != statemanager:
                raise StateTransitionError(
                    "To state is not managed by this state manager: %s" % repr(to)
                )
            elif not to.is_direct:
                raise StateTransitionError(
                    "To state must be a direct state: %s" % repr(to)
                )

        # Merge metadata; `name` is reserved for the transition's own name
        if data:
            if 'name' in data:
                raise TypeError("Invalid transition data parameter 'name'")
            self.data.update(data)
        self.data['name'] = self.name

        # Normalise the validators: nothing, a single callable, or as supplied
        if if_ is None:
            validators = []
        elif callable(if_):
            validators = [if_]
        else:
            validators = if_

        if from_ is None:
            sources = None
        else:
            # A group contributes each of its member states; a single state
            # contributes itself
            if isinstance(from_, ManagedStateGroup):
                members = from_.states
            else:  # ManagedState
                members = [from_]
            # Unroll grouped values from the original LabeledEnum so that the
            # current value can be looked up directly when the transition runs
            sources = {}  # Value: ManagedState
            for mstate in members:
                if is_collection(mstate.value):
                    for value in mstate.value:
                        sources[value] = mstate
                else:
                    sources[mstate.value] = mstate

        self.transitions[statemanager] = {
            'from': sources,  # Dict of scalar_value: ManagedState
            'to': to,  # ManagedState (is_direct) of new state
            'if': validators,  # Additional conditions that must ALL pass
        }

    def __set_name__(self, owner, name):  # pragma: no cover
        # Adopt the attribute name the transition is bound to on the class
        self.name = name
        self.data['name'] = name

    # Make the transition a non-data descriptor
    def __get__(self, obj, cls=None):
        if obj is not None:
            return StateTransitionWrapper(self, obj)
        return self
class StateTransitionWrapper:
    """
    Wraps :class:`StateTransition` with the context of the object it is
    accessed from. Automatically constructed by :class:`StateTransition`.

    :param statetransition: The :class:`StateTransition` being wrapped
    :param obj: The instance the transition was accessed from
    """

    def __init__(self, statetransition, obj):
        self.statetransition = statetransition
        self.obj = obj

    @property
    def data(self):
        """
        Dictionary containing all additional parameters to the
        :meth:`~StateManager.transition` decorator.
        """
        return self.statetransition.data

    def _state_invalid(self):
        """
        If the state is invalid for the transition, return details on what didn't match

        :return: Tuple of (state manager, current state, label for current state),
            or ``None`` if every state manager is in a valid state
        """
        for statemanager, conditions in self.statetransition.transitions.items():
            current_state = getattr(self.obj, statemanager.propname)
            allowed = conditions['from']
            if allowed is None:
                # No FROM state was specified, so any state is acceptable
                state_valid = True
            else:
                # The matching state is evaluated as well, since it may be a
                # conditional state
                mstate = allowed.get(current_state)
                state_valid = mstate and mstate(self.obj)
            if state_valid and conditions['if']:
                state_valid = all(v(self.obj) for v in conditions['if'])
            if not state_valid:
                return (
                    statemanager,
                    current_state,
                    statemanager.lenum.get(current_state),
                )
        return None

    @property
    def is_available(self):
        """
        Property that indicates whether this transition is currently available.
        """
        return not self._state_invalid()

    def __getattr__(self, name):
        # Anything not defined on the wrapper is looked up on the transition
        return getattr(self.statetransition, name)

    def __call__(self, *args, **kwargs):
        """
        Call the transition.

        :return: The transition method's return value, or the value given to
            :exc:`AbortTransition` if the method aborted
        :raises StateTransitionError: If any state manager is not in a valid state
        """
        transition = self.statetransition

        # Validate that each of the state managers is in the correct state
        mismatch = self._state_invalid()
        if mismatch:
            transition_error.send(
                self.obj, transition=transition, statemanager=mismatch[0]
            )
            label = mismatch[2]
            if isinstance(label, NameTitle):
                label = label.title
            raise StateTransitionError(
                "Invalid state for transition {transition}: {state} = {label}".format(
                    transition=transition.name,
                    state=repr(mismatch[0]),
                    label=label,
                )
            )

        # Send a transition-before signal
        transition_before.send(self.obj, transition=transition)

        # Call the transition method
        try:
            result = transition.func(self.obj, *args, **kwargs)
        except AbortTransition as e:
            # Aborted: report it, leave the state untouched and hand the
            # exception's payload back as the result
            transition_exception.send(self.obj, transition=transition, exception=e)
            return e.args[0]
        except Exception as e:  # NOQA: B902
            # Any other exception is reported and then passed up to the caller
            transition_exception.send(self.obj, transition=transition, exception=e)
            raise

        # Change the state for each of the state managers
        for statemanager, conditions in transition.transitions.items():
            target = conditions['to']
            if target is not None:  # Allow to=None for the @requires decorator
                statemanager._set(self.obj, target.value)  # Change state

        # Send a transition-after signal
        transition_after.send(self.obj, transition=transition)
        return result
class StateManager:
    """
    Wraps a property with a :class:`~coaster.utils.classes.LabeledEnum` to
    facilitate state inspection and control state changes.

    This is the main export of this module.

    :param str propname: Name of the property that is to be wrapped
    :param LabeledEnum lenum: The :class:`~coaster.utils.classes.LabeledEnum` containing
        valid values
    :param str doc: Optional docstring
    """

    def __init__(self, propname, lenum, doc=None):
        self.owner = None  # Filled in by __set_name__
        self.propname = propname
        self.name = propname  # Incorrect, so we depend on __set_name__ to correct this
        self.lenum = lenum
        self.__doc__ = doc

        # name: ManagedState/ManagedStateGroup
        self.states = {}
        # value: ManagedState (no conditional states or groups)
        self.states_by_value = {}
        # value: list of ManagedState, including conditional states
        self.all_states_by_value = {}
        # Names of transitions linked to this state manager
        self.transitions = []

        # Mirror every state in the lenum as a ManagedState. Grouped states are NOT
        # converted into ManagedStateGroup instances, as ManagedState is more
        # efficient at testing whether a value is in a group: it uses the `in`
        # operator while ManagedStateGroup does `any(s() for s in states)`.
        for state_name, value in lenum.__names__.items():
            if isinstance(value, (list, set)):
                # Grouped states are represented as sets and can't have labels
                label = None
            else:
                label = lenum[value]
            self._add_state_internal(state_name, value, label=label)

    def __set_name__(self, owner, name):
        # Record the hosting class and the attribute name within it
        self.owner = owner
        self.name = name

    def __repr__(self):
        if self.owner is None:
            return '<StateManager %s>' % self.name
        return '%s.%s' % (self.owner.__name__, self.name)

    def __get__(
        self, obj: Optional[T], cls: Optional[Type[T]] = None
    ) -> StateManagerWrapper[T]:
        # Both class and instance access are served by a context-bound wrapper
        return StateManagerWrapper(self, obj, cls)

    def __set__(self, obj, value):
        raise AttributeError("States are read-only; use a transition")

    # Since __get__ never returns self, the following methods will only be available
    # within the owning class's namespace. It will not be possible to call them outside
    # the class to add conditional states or transitions. If a use case arises,
    # add wrapper methods to StateManagerWrapper.

    def _set(self, obj, value):
        """
        Internal method to set state, called by
        :meth:`StateTransitionWrapper.__call__`.

        :raises ValueError: If ``value`` is not in the enum
        """
        if value not in self.lenum:
            raise ValueError("Not a valid value: %s" % value)

        # Go through the underlying attribute's own __set__, looked up in the
        # namespace of the instance's class
        underlying = type(obj).__dict__[self.propname]
        underlying.__set__(obj, value)

    def _add_state_internal(
        self,
        name,
        value,
        label=None,
        validator=None,
        class_validator=None,
        cache_for=None,
    ):
        """
        Create a :class:`ManagedState` and register it in the lookup dictionaries
        and as attributes ``name`` and ``is_<name lowercased>``.

        :raises AttributeError: If ``name`` is already an attribute of the manager
        """
        # Also see `add_state_group` for similar code
        if hasattr(self, name):  # Don't clobber self with a state name
            raise AttributeError(
                "State name %s conflicts with existing attribute in the state manager"
                % name
            )
        managed = ManagedState(
            name=name,
            statemanager=self,
            value=value,
            label=label,
            validator=validator,
            class_validator=class_validator,
            cache_for=cache_for,
        )
        # XXX: Since managed.statemanager == self, the following assignments setup
        # looping references and could cause a memory leak if the statemanager is ever
        # deleted. We depend on it being permanent for the lifetime of the process in
        # typical use (or for advanced memory management that can detect loops).
        self.states[name] = managed
        if managed.is_direct:
            self.states_by_value[value] = managed
        if managed.is_scalar:
            # Newest first, so later (conditional) states precede the base state
            self.all_states_by_value.setdefault(value, []).insert(0, managed)
        # Make the ManagedState available as `statemanager.STATE` (assuming original was
        # uppercased), and also as `statemanager.is_state`
        for alias in (name, 'is_' + name.lower()):
            setattr(self, alias, managed)

    # Stub for mypy to recognise names added by _add_state_internal
    def __getattr__(self, name: str) -> Union[ManagedState, ManagedStateGroup]:
        raise AttributeError(name)

    def add_state_group(self, name, *states):
        """
        Add a group of managed states. Groups can be specified directly in the
        :class:`~coaster.utils.classes.LabeledEnum`. This method is only useful
        for grouping a conditional state with existing states. It cannot be
        used to form a group of groups.

        :param str name: Name of this group
        :param states: :class:`ManagedState` instances to be grouped together
        :raises AttributeError: If ``name`` is already an attribute of the manager
        """
        # See `_add_state_internal` for explanation of the following
        if hasattr(self, name):
            raise AttributeError(
                "State group name %s conflicts with existing "
                "attribute in the state manager" % name
            )
        group = ManagedStateGroup(name, self, states)
        self.states[name] = group
        for alias in (name, 'is_' + name.lower()):
            setattr(self, alias, group)

    def add_conditional_state(
        self, name, state, validator, class_validator=None, cache_for=None, label=None
    ):
        """
        Add a conditional state that combines an existing state with a validator
        that must also pass. The validator receives the object on which the property
        is present as a parameter.

        :param str name: Name of the new state
        :param ManagedState state: Existing state that this is based on
        :param validator: Function that will be called with the host object as a
            parameter
        :param class_validator: Function that will be called when the state is queried
            on the class instead of the instance. Falls back to ``validator`` if not
            specified. Receives the class as the parameter
        :param cache_for: Integer or function that indicates how long ``validator``'s
            result can be cached (not applicable to ``class_validator``). ``None``
            implies no cache, ``0`` implies indefinite cache (until invalidated by a
            transition) and any other integer is the number of seconds for which to
            cache the assertion
        :param label: Label for this state (string or 2-tuple)
        :raises TypeError: If ``state`` is not a :class:`ManagedState`
        :raises ValueError: If ``state`` belongs to another state manager

        TODO: `cache_for`'s implementation is currently pending a test case
        demonstrating how it will be used.
        """
        # We'll accept a ManagedState with grouped values, but not a ManagedStateGroup
        if not isinstance(state, ManagedState):
            raise TypeError("Not a managed state: %s" % repr(state))
        if state.statemanager != self:
            raise ValueError(
                "State %s is not associated with this state manager" % repr(state)
            )
        # A 2-tuple label is promoted to a NameTitle
        if isinstance(label, tuple) and len(label) == 2:
            label = NameTitle(*label)
        self._add_state_internal(
            name,
            state.value,
            label=label,
            validator=validator,
            class_validator=class_validator,
            cache_for=cache_for,
        )

    def transition(self, from_, to, if_=None, **data):
        """
        Decorates a method to transition from one state to another. The
        decorated method can accept any necessary parameters and perform
        additional processing, or raise an exception to abort the transition.
        If it returns without an error, the state value is updated
        automatically. Transitions may also abort without raising an exception
        using :exc:`AbortTransition`.

        :param from_: Required state to allow this transition (can be a state group)
        :param to: The state of the object after this transition (automatically set if
            no exception is raised)
        :param if_: Validator(s) that, given the object, must all return True for the
            transition to proceed
        :param data: Additional metadata, stored on the `StateTransition` object as a
            :attr:`data` attribute
        """

        def decorator(f):
            if not isinstance(f, StateTransition):
                # First decorator on this method: wrap it
                wrapped = StateTransition(f, self, from_, to, if_, data)
            else:
                # Already wrapped by another decorator: add our conditions to it
                f.add_transition(self, from_, to, if_, data)
                wrapped = f
            self.transitions.append(wrapped.name)
            return wrapped

        return decorator

    def requires(self, from_, if_=None, **data):
        """
        Decorates a method that may be called if the given state is currently active.
        Registers a transition internally, but does not change the state.

        :param from_: Required state to allow this call (can be a state group)
        :param if_: Validator(s) that, given the object, must all return True for the
            call to proceed
        :param data: Additional metadata, stored on the `StateTransition` object as a
            :attr:`data` attribute
        """
        return self.transition(from_, None, if_, **data)

    def _value(self, obj, cls=None):
        """The state value (called from the wrapper)"""
        source = cls if obj is None else obj
        return getattr(source, self.propname)

    @staticmethod
    def check_constraint(column, lenum, **kwargs):
        """
        Returns a SQL CHECK constraint (a ``CheckConstraint``) given a column name
        and a :class:`~coaster.utils.classes.LabeledEnum`.

        Alembic may not detect the CHECK constraint when autogenerating
        migrations, so you may need to do this manually using the Python
        console to extract the SQL string::

            from coaster.sqlalchemy import StateManager
            from your_app.models import YOUR_ENUM

            print(str(StateManager.check_constraint('your_column', YOUR_ENUM).sqltext))

        :param str column: Column name
        :param LabeledEnum lenum: :class:`~coaster.utils.classes.LabeledEnum` to
            retrieve valid values from
        :param kwargs: Additional options passed to CheckConstraint
        """
        # Render `column IN (...)` with the enum's keys inlined as literals
        membership = column_constructor(column).in_(lenum.keys())
        sqltext = str(membership.compile(compile_kwargs={'literal_binds': True}))
        return CheckConstraint(sqltext, **kwargs)
class StateManagerWrapper(Generic[T]):
    """
    Wraps :class:`StateManager` with the context of the containing object.
    Automatically constructed when a :class:`StateManager` is accessed from
    either a class or an instance.

    :param statemanager: The :class:`StateManager` being wrapped
    :param obj: Instance the state manager was accessed from, or ``None`` if it
        was accessed from the class
    :param cls: The class the state manager was accessed from
    """

    def __init__(self, statemanager, obj: Optional[T], cls: Optional[Type[T]]):
        self.statemanager = statemanager  # StateManager
        # Instance we're being called on, None if called on the class instead
        self.obj = obj
        # The class of the instance we're being called on
        self.cls = cls

    def __repr__(self):
        return '<StateManagerWrapper(%s.%s)>' % (
            type(self.obj).__name__,
            self.statemanager.name,
        )

    @property
    def value(self):
        """The current state value."""
        return self.statemanager._value(self.obj, self.cls)

    @property
    def label(self):
        """Label for the current state's value (using :meth:`bestmatch`)."""
        return self.bestmatch().label

    def bestmatch(self):
        """
        Best matching current scalar state (direct or conditional), only
        applicable when accessed via an instance.

        :return: A :class:`ManagedStateWrapper` for the first matching state, or
            ``None`` if accessed via the class or if no state matches
        """
        if self.obj is not None:
            for mstate in self.statemanager.all_states_by_value[self.value]:
                msw = mstate(self.obj, self.cls)  # This returns a wrapper
                if msw:  # If the wrapper evaluates to True, it's our best match
                    return msw
        return None

    def current(self):
        """
        All states and state groups that are currently active.

        :return: Dictionary of name: :class:`ManagedStateWrapper`, or ``None`` if
            accessed via the class
        """
        if self.obj is None:
            return None
        active = {}
        for name, mstate in self.statemanager.states.items():
            wrapped = mstate(self.obj, self.cls)
            if wrapped:  # Evaluates the state against the instance
                active[name] = wrapped
        return active

    def transitions(self, current=True):
        """
        Returns available transitions for the current state, as a dictionary of
        name: :class:`StateTransitionWrapper`.

        :param bool current: Limit to transitions available in ``obj.``
            :meth:`~coaster.sqlalchemy.mixins.RoleMixin.current_access`
        """
        if current and isinstance(self.obj, RoleMixin):
            proxy = self.obj.current_access()
        else:
            proxy = {}
            current = False  # In case the host object is not a RoleMixin
        available = {}
        for name in self.statemanager.transitions:
            # Retrieve transitions from the host object to activate the descriptor
            transition = getattr(self.obj, name)
            if transition.is_available and (not current or name in proxy):
                available[name] = transition
        return available

    def transitions_for(self, roles=None, actor=None, anchors=()):
        """
        For use on :class:`~coaster.sqlalchemy.mixins.RoleMixin` classes:
        returns currently available transitions for the specified
        roles or actor as a dictionary of name: :class:`StateTransitionWrapper`.
        """
        proxy = self.obj.access_for(roles=roles, actor=actor, anchors=anchors)
        return {
            name: transition
            for name, transition in self.transitions(current=False).items()
            if name in proxy
        }

    def group(self, items, keep_empty=False):
        """
        Given an iterable of instances, groups them by state using :class:`ManagedState`
        instances as dictionary keys. Returns a dict that preserves the order of states
        from the source :class:`~coaster.utils.classes.LabeledEnum`.

        :param bool keep_empty: If ``True``, empty states are included in the result
        :raises TypeError: If an item is not an instance of the managed class
        """
        # Class of the item being managed. Fall back to the instance's type when
        # the wrapper was constructed without a class
        cls = self.cls if self.cls is not None else type(self.obj)
        # Seed with every direct state, so the result follows the order of states in
        # the source LabeledEnum. Unused states are discarded at the end.
        groups = {mstate: [] for mstate in self.statemanager.states_by_value.values()}
        # Now process the items by state
        for item in items:
            # Use isinstance instead of `type(item) != cls` to account for subclasses
            if not isinstance(item, cls):
                # Report the class that was actually tested against (`cls`), which
                # is never None, instead of `self.cls`, which may be
                raise TypeError(
                    "Item %s is not an instance of type %s" % (repr(item), repr(cls))
                )
            statevalue = self.statemanager._value(item)
            groups[self.statemanager.states_by_value[statevalue]].append(item)
        if not keep_empty:
            groups = {mstate: found for mstate, found in groups.items() if found}
        return groups

    def __getattr__(self, name):
        """
        Given the name of a state, returns:

        1. If called on an instance, a ManagedStateWrapper, which implements __bool__
        2. If called on a class, a query filter

        Raises :exc:`AttributeError` on anything else.
        """
        if hasattr(self.statemanager, name):
            mstate = getattr(self.statemanager, name)
            if isinstance(mstate, (ManagedState, ManagedStateGroup)):
                return mstate(self.obj, self.cls)
        raise AttributeError("Not a state: %s" % name)