Source code for symplehfsm
# -*- coding: utf-8 -*-
"""
This module provides a simple but powerful way to define testable, hierarchical finite state machines. You should
know how a state machine works. This implementation provides following features:
* guard action
* entry and exit actions
* transition action
* external and local transitions, see: https://en.wikipedia.org/wiki/UML_state_machine#Local_versus_external_transitions
* hierarchically nested states, see: https://en.wikipedia.org/wiki/UML_state_machine#Hierarchically_nested_states
* testable
* clear defined events interface
* clear defined actions interface
* execution speed optimization (transitions are converted to lookups in a dictionary, even for a hierarchical structure)
* memory efficient because state machine structure can be shared by all instances
References:
#. https://en.wikipedia.org/wiki/State_pattern.
#. https://en.wikipedia.org/wiki/Finite-state_machine.
#. https://en.wikipedia.org/wiki/UML_state_machine.
.. todo:: tutorial (explaining the abilities, entry/exit/actions/guard, different kinds of transitions)
.. todo:: set log level through event handling method?? so one could debug one instance of a statemachine at once.
.. todo:: python debug mode should log what it does but respect the debug level (?)
Versioning scheme based on: http://en.wikipedia.org/wiki/Versioning#Designating_development_stage
::
+-- api change, probably incompatible with older versions
| +-- enhancements but no api change
| |
major.minor[.build[.revision]]
|
+-|* 0 for alpha (status)
|* 1 for beta (status)
|* 2 for release candidate
|* 3 for (public) release
.. versionchanged:: 2.0.2.0
introduced __versionnumber__ because __version__ should be a string according to PEP8.
__versionnumber__ is a tuple containing int as used in the versioning scheme. This
way its comparable out of the box, e.g. __versionnumber__ >= (2, 2, 3)
"""
__versionnumber__ = (2, 0, 2, 0)
__version__ = ".".join(map(str, __versionnumber__))
__author__ = "dr0iddr0id {at} gmail [dot] com (C) 2010-2012"
import unittest
import logging
import collections
if __file__:
LOG_FILENAME = __file__ + '.log'
else:
LOG_FILENAME = __module__ + '.log'
logger = logging.getLogger("symplehfsm") # pylint: disable=C0103
# loglevel = logging.DEBUG # pylint: disable=C0103
loglevel = logging.INFO # pylint: disable=C0103
logger.setLevel(loglevel)
if __debug__:
_ch = logging.StreamHandler() # pylint: disable=C0103
_ch.setLevel(logging.DEBUG)
logger.addHandler(_ch)
handler = logging.FileHandler(LOG_FILENAME) # pylint: disable=C0103
handler.setLevel(loglevel)
logger.addHandler(handler)
# -----------------------------------------------------------------------------
[docs]class StateUnknownError(Exception):
"""
Exception raised if the state is not known in the structure.
"""
pass
# -----------------------------------------------------------------------------
[docs]class BaseState(object):
"""
BaseState from which all hirarchical states should inherit.
:Note: The state itself is 'stateless'.
:Parameters:
name : string
name of this state
parent : BaseState
Reference to the parent state, for the root state use None
(only one state has None as parent since there is only
one root)
"""
[docs] class InitialStateAlreadySetError(Exception):
"""Exception is raised if initial state is already set."""
pass
[docs] class InitialNotSetError(Exception):
"""Exception raised if the initial state is not set."""
pass
[docs] class InitialNotReplacedError(Exception):
"""Exception raised if the initial state is not replaced."""
pass
[docs] class ParentAlreadySetError(Exception):
"""Exception raised when a child has already a parent set"""
pass
[docs] class ReplacementStateIsNotChildError(Exception):
"""Exception raised if the replaced initial state is not a child."""
pass
[docs] class WrongParentError(Exception):
"""
Exception raised if the set parent is not the same state
containint it as child
"""
pass
def __init__(self, name=None, parent=None):
self.initial = None
self.children = []
self.optimized = collections.defaultdict(lambda: (None, [], None))
self.entry = None
self.exit = None
self.events = {} # {event:trans}
self.parent = None
if parent:
parent.add(self)
self.name = str(id(self))
if name:
self.name = name
[docs] def add(self, child, initial=False):
"""
Adds another state as child to this state.
:Parameters:
child : BaseState
the child state to add
initial : bool
defaults to False, if set, the child state is the
initial state.
:raises: ParentAlreadySetError if the childs parent is already set.
:raises: InitialStateAlreadySetError if another initial state has already been defined.
"""
if child.parent is not None:
raise self.ParentAlreadySetError(
"child state '{0}' has already a parent {1} when trying to add it to {2}".format(child, child.parent, self))
if initial:
if self.initial is not None:
raise self.InitialStateAlreadySetError(\
str.format("initial already set to {1} for state {0}", self, self.initial))
self.initial = child
child.parent = self
self.children.append(child)
return child
[docs] def remove(self, child, replace=None):
"""
Removes a child state. If the removed child state was the initial
state it has to be replaced.
:Parameters:
child : BaseState
child state to be removed.
replace : BaseState
the new initial state if the removed one was the initial state.
:raises: InitialNotReplacedError if the initial state is removed but no other inital state is defined.
:raises: ReplacementStateIsNotChildError if the initial replacement isn't a child of this state.
"""
if child in self.children:
if replace is None:
if self.initial == child:
raise self.InitialNotReplacedError("missing replacement since child {0} \
to be removed is initial state for {1}".format(self.initial, self))
else:
if not self.has_child(replace):
raise self.ReplacementStateIsNotChildError("replacement state {0} is not \
a child of this state {1}".format(replace, self))
self.initial = replace
child.parent = None
self.children.remove(child)
[docs] def has_child(self, child_state):
"""
Checks if a state has a certain state as a child.
:Parameters:
child_state : BaseState
child_state to check
:returns:
bool
"""
parent = child_state.parent
while parent:
if parent is self:
return True
parent = parent.parent
return False
[docs] def is_child(self, parent_state):
"""
Checks if this state is a child state of a parent state.
:Parameters:
parent_state : BaseState
the parent state to check if this is its child state.
:returns:
bool
"""
parent = self.parent
while parent:
if parent is parent_state:
return True
parent = parent.parent
return False
[docs] def check_consistency(self):
"""
Checks the consistency of the state hierarchy.
It checks mainly two things:
- if the initial state is set for each state having a child or
children, raises InitialNotSetError otherwise
- if each child of a state has the parent attribute set to that
state, raises WrongParentError otherwise
.. deprecated:: 1.0.3.0
Use :func:`Structure.check_consistency` instead.
:raises: InitialNotSetError if no initial state has been set when this state has children.
:raises: WrongParentError if a child has not the parent set where it is a child.
"""
if self.initial is None and len(self.children) > 0:
raise self.InitialNotSetError(\
"state {0}: initial has to be set if a state has at least one child".format(self))
for child in self.children:
if child.parent != self:
raise self.WrongParentError(\
"parent {0} of a child {1} is set to another state, should be {2}".format(child.parent, child, self))
child.check_consistency()
def __str__(self):
return str.format("<{0}[{1}]>", self.__class__.__name__, str(self.name))
__repr__ = __str__
# -----------------------------------------------------------------------------
[docs]class Transition(object):
"""
This class holds the data needed for a transition.
Represents the transition between (composite) states (just the arrow
in the state chart).
The transition itself is 'stateless'.
:Parameters:
target_state : State
The state this transition should change to.
action : methodcaller
This should be a methodcaller object or a function
behaving like a methodcaller. Such a function would
have following signature (return value is ignored)::
def f(actions)
A function behaving like a methodcaller looks like
this::
f = lambda actions: actions.any_method_of_actions()
:Note: only the function knows which function to call on the actions object.
guard : methodcaller
a methodaller of a function that behaves like a methodcaller
returning a boolean, its signature is::
guard(actions) -> bool
If True is returned, then the transition will be followed,
otherwise the transition will be blocked and event processing
stops (no parent states are considered).
"""
def __init__(self, target_state, action=None, guard=None, name=None):
self.guard = guard
self.target = target_state
self.action = action
self.name = str(id(self))
if name:
self.name = name
def __str__(self):
return "<{0}[1][guard: {2}, target: {3} action: {4}]>".format(
self.__class__.__name__, str(self.name), self.guard, self.target, self.action)
# -----------------------------------------------------------------------------
[docs]class Structure(object):
"""
This is the class holding the state machine structure, e.g. the number
of states and their relationship (hierarchy) and its transitions in between them.
Ths is also the code that is shared by many instances of the same statemachine.
:Parameters:
name : string
Optional name for this instance of this class.
"""
[docs] class RootAlreadySetOrParentMissingError(Exception):
"""
Exception raised when the parent is missing or the root has already
been set.
"""
pass
[docs] class ParentUnkownError(Exception):
"""Exception raised when the parent is unkown."""
pass
[docs] class EventAlreadyDefinedError(Exception):
"""Exception raised when the event is already defined for that state"""
pass
[docs] class StateIdentifierAlreadyUsed(Exception):
"""Exception raised when another state has the same state identifier."""
pass
def __init__(self, name=None):
self.states = {} # {id:State}
self.root = None
self.is_optimized = False
self.name = str(id(self))
if name:
self.name = name
def __str__(self):
return str.format("<{0}[{1}]>", self.__class__.__name__, str(self.name))
# # name, parent, initial, entry, exit
# sm_structure.add_state("s0", None, False, methodcaller("entry_s0"), context.methodcaller("exit_s0"))
[docs] def add_state(self, state_identifier, parent, initial, entry_action=None, exit_action=None):
"""
Add a new node representing a state to the structure.
:Parameters:
state_identifier : State identifier
A hashable identifier for that state (name, id, etc.). Has to be unique.
parent : State identifier
A hashable identifier of the state that is set as parent.
The only one state will have set its parent to None, its the root state.
initial : bool
Only one of the children of a state can have this set to true, its the
state that is used to descent to a leaf node of the structure.
entry_action : methodcaller
The methodcaller or a function behaving like a methodcaller. That calls
the entry function on the actions object for that state. Optional, defaults to: None
exit_action : methodcaller
The methodcaller or a function behaving like a methodcaller. That calls
the exit function on the actions object for that state. Optional, defaults to: None
:raises: ParentUnkownError if the parent state identifier is not already known.
:raises: RootAlreadySetOrParentMissingError if a second root node is added (maybe the parent is missing).
:raises: StateIdentifierAlreadyUsed if the chosen state identifier is already in use.
"""
if parent and not parent in self.states:
raise self.ParentUnkownError("parent of {0} is unkown".format(str(state_identifier)))
internal_state = BaseState(name=state_identifier)
internal_state.events = {} # {event:trans}
# internal_state.parent = parent
internal_state.entry = entry_action
internal_state.exit = exit_action
if parent:
self.states[parent].add(internal_state, initial)
else:
if self.root:
raise self.RootAlreadySetOrParentMissingError("root is already set to '{0}' \
or parent of '{1}' is missing".format(self.root, state_identifier))
self.root = state_identifier
if state_identifier in self.states:
raise self.StateIdentifierAlreadyUsed(str(state_identifier))
self.states[state_identifier] = internal_state
# # handler, event, target, action, guard
# sm_structure.add_trans("s1", "a", "s1", methodcaller("action_a"), methodcaller("guard_a"))
[docs] def add_trans(self, state, event, target, action=None, guard=None, name=None):
"""
Add a transition between two states for a certain event.
:Parameters:
state : State identifier
A hashable identifier for that state (name, id, etc.).
event : event identifiert
A hashable event identifier. The same identifiert has to be used
when calling handle_event on the state machine.
target : state identifier
The state this transition will lead too.
action : methodcaller
The transition action. Optional, default: None
guard : methodcaller
The guard method. Should return a boolean.
If the return value is True, then the transition is carried out. Otherwise the
event processing stops and nothing changes.
:raises: StateUnknownError if either the state- or the target-identifier is not known.
:raises: EventAlreadyDefinedError if this event is already defined for that state.
"""
if not state in self.states:
raise StateUnknownError("Unknown state: " + str(state))
if target is not None and not target in self.states:
raise StateUnknownError("target not set or unkown")
internal_state = self.states[state]
if event in internal_state.events:
raise self.EventAlreadyDefinedError("state '{0}' has event '{1}' already set".format(\
str(state), str(event)))
internal_state.events[event] = Transition(target, action, guard, str(name))
if __debug__:
logger.debug("added transition to event: {0} : {1}".format(event, str(internal_state.events[event])))
[docs] def do_optimize(self):
"""
Optimizes the event processing of the state machine. Call this method before you pass
the structure to the constructor to create a state machine instance.
.. note::
It is not recommended to alter the structure after a call to this method, althought now it will
just update the optimization.
.. versionchanged:: 2.0.2.0
Does not raise any exception anymore if called multiple times, it rebuilts internal structure used
for optimization.
"""
# collect all possible events
events = set()
[events.update(list(x.events.keys())) for x in list(self.states.values())]
if __debug__:
logger.info(str(self) + ': all events: ' + str(events))
# apply alle events to all leaf states to get optimization
leafs = [x for x in list(self.states.values()) if not x.children]
if __debug__:
logger.info(str(self) + ': all states: ' + str(leafs))
for leaf in leafs:
leaf.optimized.clear()
for event in events:
if __debug__:
logger.info("{0}: optimizing state '{1}' for event '{2}".format(self, str(leaf), str(event)))
guard, methodcalls, target_node = self._get_methodcallers(str(self) + " (optimizing)", event, leaf)
leaf.optimized[event] = (guard, methodcalls, target_node)
self.is_optimized = True
[docs] def check_consistency(self):
"""
Checks the consistency of the state hierarchy.
It checks mainly two things:
- if the initial state is set for each state having a child or
children, raises InitialNotSetError otherwise
- if each child of a state has the parent attribute set to that
state, raises WrongParentError otherwise
.. versionadded:: 2.0.2.0
"""
self.states[self.root].check_consistency()
def _get_methodcallers(self, state_machine, event, current_state):
"""
Computes what 'actions' a transition has to execute for a given state and event.
:Parameters:
state_machine : SympleDictHFSM
The state machine to use, its for log purposes only.
event : eventidentifier
The event identifier to know which event to execute.
current_state : state
The actual state instance to apply the event (the next state is defined by the
transitions for that event if defined).
:Returns:
tuple containing: (guard, methodcalls, next_state) where
guard is a methodcaller returning bool
methodcalls is a list of methodcallers of entry/exit/transition actions in the correct order
next_state is the the state the state machine has to be after executing that event
(note: its not a state identifier, its an instance of BaseState)
"""
methodcalls = []
guard = None
# find the event handling state in the hierarchy
if __debug__:
logger.debug(str.format("{0}: handling event '{1}' (current state: '{2}')", \
state_machine, event, current_state))
source_node = current_state
nodes = []
transition = None
while transition is None:
if event in source_node.events:
transition = source_node.events[event]
if transition is None:
nodes.append(source_node)
if source_node.parent:
source_node = source_node.parent
else:
break
if transition is None or transition is False:
if __debug__:
logger.debug(\
str.format("{0}: no event handling state nor transition found, no state change", state_machine))
# event not handled
return guard, methodcalls, current_state
# transition.guard is here because the exits of the nodes
# should only be run if the guard returns true
if __debug__:
logger.info(str.format("{0}: handling event '{1}' in state '{3}' (current state: '{2}')", \
state_machine, event, current_state, source_node))
if transition and not issubclass(transition.__class__, Transition):
raise TypeError("transition returned by a state is not a subclass of " + str(Transition))
if transition.guard:
guard = transition.guard
else:
if __debug__:
logger.debug(str.format("{0}: transition has no guard function", state_machine))
if __debug__:
logger.debug(str.format("{0}: executing transition", state_machine, transition))
# exits
for node in nodes:
if node.exit:
if __debug__:
logger.debug(str.format("{0}: calling exit on state: {1}", state_machine, node))
methodcalls.append(node.exit)
# transition
# go up the hirarchy as needed for the transition, find shared parent state
if __debug__:
logger.debug(str.format("{0}: finding parent states of transition...", state_machine))
target_node = source_node
if transition.target is not None:
target_node = self.states[transition.target]
if __debug__:
logger.debug(str.format("{0}: source '{1}', target '{2}'", state_machine, source_node, target_node))
if source_node != target_node:
if __debug__:
logger.debug(str.format("{0}: case source != target", state_machine))
while not target_node.is_child(source_node):
if source_node == target_node:
if __debug__:
logger.debug(str.format("{0}: found target == source", state_machine))
break # this break is needed
else:
if source_node.exit:
if __debug__:
logger.debug(str.format("{0}: calling exit on state: {1}", state_machine, source_node))
methodcalls.append(source_node.exit)
source_node = source_node.parent
else:
if source_node.exit:
if __debug__:
logger.debug(str.format("{0}: case source == target", state_machine))
logger.debug(str.format("{0}: calling exit on state: {1}", state_machine, source_node))
methodcalls.append(source_node.exit)
source_node = source_node.parent
if transition.action:
if __debug__:
logger.debug(str.format("{0}: calling action of transition: {1}", state_machine, transition))
methodcalls.append(transition.action)
if __debug__:
logger.debug("{0}: source '{1}', target '{2}', transition action done".format(\
state_machine, source_node, target_node))
# find child node and go down the hirarchy until target_node is found
if __debug__:
logger.debug(str.format("{0}: finding child states for transition...", state_machine))
if transition.target is not None:
if source_node != target_node:
while source_node != target_node:
for child in source_node.children:
if child == target_node:
if target_node.entry:
if __debug__:
logger.debug(str.format("{0}: calling entry on: {1}", state_machine, target_node))
methodcalls.append(target_node.entry)
source_node = target_node
break
elif target_node.is_child(child):
if child.entry:
if __debug__:
logger.debug(str.format("{0}: calling entry on: {1}", state_machine, target_node))
methodcalls.append(child.entry)
source_node = child
break
else:
if __debug__:
logger.debug(str.format("{0}: find child node: source_node == target_node", state_machine))
# initial entries
if __debug__:
logger.debug(str.format("{0}: following initial states...", state_machine))
if target_node.initial:
target_node = target_node.initial
while target_node.initial:
if target_node.entry:
if __debug__:
logger.debug(str.format("{0}: calling entry on state: {1}", state_machine, target_node))
methodcalls.append(target_node.entry)
target_node = target_node.initial
# last node should be entered too
if target_node.entry:
if __debug__:
logger.debug(str.format("{0}: calling entry on state: {1}", state_machine, target_node))
methodcalls.append(target_node.entry)
if __debug__:
logger.info(str.format("{0}: changed state from {1} to {2} using transition {3}", \
state_machine, current_state, target_node, transition))
return guard, methodcalls, target_node
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]class SympleHFSM(object):
"""
.. todo:: should transition.action be able to return something to the caller?
.. todo:: should it be possible to pass in arguments for the transition action through the event handler method?
Base state machine logic. It implements the state transition logic.
:Parameters:
structure : Structure
The state machine structure of states and transitions
actions : Actions
The object implementing the actions interface to be used by the state machine.
name : string
Optional, default: None. This name will be used for logging and printing.
.. versionadded:: 2.0.2.0
"""
[docs] class ReentrantEventException(Exception):
"""
Exception raised if an event is already processing.
"""
pass
[docs] class NotInitializedException(Exception):
"""
Exception raised if it is attemped to process an event before
init has been called.
"""
pass
[docs] class InitAlreadyCalledError(Exception):
"""
Exception raised if init is calle more than once.
.. versionadded:: 2.0.2.0
Raised when init is called multiple times.
"""
pass
def __init__(self, structure, actions, name=None):
self.actions = actions
self._structure = structure
self._current_state = None
self._currently_handling_event = False
self.name = str(id(self))
if name:
self.name = name
self.handle_event = self._handle_event_not_inititalized
def __str__(self):
return str.format("<{0}[{1}]>", self.__class__.__name__, str(self.name))
def _get_current_state(self):
"""
Returns identifier of the current state.
:returns: the current state identifier of the state machine or None (only if not initialized).
.. versionchanged:: 2.0.2.0
Returns a state identifier or None instead of the state instance.
"""
return self._current_state.name if self._current_state else self._current_state
current_state = property(_get_current_state, doc="""Current state identifier\
or None if the state machine is not initialized""")
[docs] def set_state(self, state_identifier):
"""
Set the state directly as the current state without calling
any entry or exit or any other events on any state. Don't use it unless you need to (like initializing).
Use with caution. Raises a 'ReentrantEventException' if it is currently processing an event. If the
state is not known, then a 'StateUnkownError' is raised.
:Note: No actions are called! e.g.: exit, entry, transition action are not called, use init() instead!
:Parameters:
state_identifier : state
State to which current state will point afterwards.
..versionchanged:: 2.0.2.0
Define the state to be set by its identifier instead of the state instance.
:raises: StateUnknownError if there is no state defined for given state_identifier.
:raises: ReentrantEventException if this method is called during a event is handled.
"""
if state_identifier not in list(self._structure.states.keys()):
raise StateUnknownError(str(state_identifier))
if self._currently_handling_event:
raise self.ReentrantEventException("multi threading or calling set_state from within an actions \
during event handling is not supported")
self._current_state = self._structure.states[state_identifier]
[docs] def init(self, use_optimization=True):
"""
Initialize the state machine. It descents along the 'initial' attribute of the states and sets the
current_state accordingly.
:Parameters:
use_optimization : boolean
Default: True. If set to False the event handling method will always compute the entire path
through the structure. Otherwise if set to True and the structure has been optmized, then the
cached transition information is used.
:Raises:
InitAlreadyCalledError if calle more than once.
"""
if self.handle_event != self._handle_event_not_inititalized:
raise self.InitAlreadyCalledError(str(self))
node = self._structure.states[self._structure.root]
node.check_consistency()
while True:
if __debug__:
logger.debug(str.format("{0}: INIT, calling entry on {1}", self, node))
if node.entry:
node.entry(self.actions)
if not node.initial:
break
node = node.initial
if __debug__:
logger.debug(str.format("{0}: INIT done, current state: {1}", self, node))
self._current_state = node
# set up the right event handling method bo be used
if use_optimization and self._structure.is_optimized:
self.handle_event = self._handle_event_optimized
if __debug__:
logger.info("{0}: INIT using optmized structure".format(self))
else:
self.handle_event = self._handle_event_normal
if __debug__:
logger.info("{0}: INIT not using optmized structure".format(self))
[docs] def exit(self):
"""
Exits the state machine. Starting from the current_state it calls exit along the parent attribute on each
state until the root state is exited.
"""
node = self._structure.states[self.current_state] if self.current_state else None
while node:
if __debug__:
logger.debug(str.format("{0}: EXIT, calling exit on {1}", self, node))
if node.exit:
node.exit(self.actions)
node = node.parent
self._current_state = None
self.handle_event = self._handle_event_not_inititalized
def _handle_event_not_inititalized(self, *args):
"""
The event handling method that gets called when the state machine is not initialized yet.
:raises: NotInitializedException if init() has not been called before.
"""
if __debug__:
logger.debug("{0}: raising NotInitializedException".format(self))
raise self.NotInitializedException("Call init befor any event processing!")
def _handle_event_optimized(self, event):
"""
The event handling method used when the structure is optimized.
:raises: ReentrantEventException if this method is called while an event is handled.
.. todo:: how to remove those checks? use queue to make those check unneeded??
"""
if self._currently_handling_event:
logger.info("{0}: raising ReentrantEventException".format(self))
raise self.ReentrantEventException("multi threading or calling a event function from within an actions \
during event handling is not supported")
self._currently_handling_event = True
if __debug__:
logger.debug(str.format("{0}: _currently_handling_event 'True' {1}", self, event))
guard, methodcalls, target_node = self._current_state.optimized[event]
if __debug__:
logger.debug(str.format("{0}: get_methodcallers returnded: {0} {1} {2}", \
guard, methodcalls, target_node, self))
if guard:
if guard(self.actions) is False:
if __debug__:
logger.info(str.format("{0}: guard of transition returned 'False', not changing state", self))
self._currently_handling_event = False
return
if __debug__:
logger.info(str.format("{0}: guard of transition returned 'True'", self))
for methodcaller in methodcalls:
if __debug__:
logger.info(str.format("{0}: calling methodcaller: {1}".format(self, methodcaller)))
methodcaller(self.actions)
# set the new current state
if target_node:
if __debug__:
logger.info(str.format("{0}: setting new state {1}".format(self, target_node)))
self._current_state = target_node
self._currently_handling_event = False
if __debug__:
logger.debug(str.format("{0}: _currently_handling_event 'False'", self))
def _handle_event_normal(self, event):
"""
The event handling method if the structure is not optimized, computes the way through the hierarchy.
Handles the event and does a state change if needed. Raises a 'ReentrantEventException' if it is currently
processing an event.
:Parameters:
event_func : operator.methodcaller
A methodcaller instance pointed to the function that should be called on the state.
For example if the method 'a' should be called on each state, then this should be
'event_func = operator.methodcaller('a', context)'
context : context
the context of the state machine, where certain methods and data is
accesible (like the actions interface).
:raises: ReentrantEventException if this method is called while an event is handled.
"""
if self._currently_handling_event:
if __debug__:
logger.info("{0}: raising ReentrantEventException".format(self))
raise self.ReentrantEventException("multi threading or calling a event function from within an actions\
during event handling is not supported")
self._currently_handling_event = True
if __debug__:
logger.debug(str.format("{0}: _currently_handling_event 'True' {1}", self, event))
guard, methodcalls, target_node = self._structure._get_methodcallers(self, event, self._current_state)
if __debug__:
logger.info(str.format("{3}; get_methodcallers returned: {0} {1} {2}", \
guard, methodcalls, target_node, self))
if guard:
if guard(self.actions) is False:
if __debug__:
logger.info(str.format("{0}: guard of transition returned 'False', not changing state", self))
self._currently_handling_event = False
logger.info(str.format("{0}: _currently_handling_event 'False' with guard", self))
return
if __debug__:
logger.info(str.format("{0}: guard of transition returned 'True'", self))
for methodcaller in methodcalls:
if __debug__:
logger.debug(str.format("{0}: calling methodcaller: {1}".format(self, methodcaller)))
methodcaller(self.actions)
# set the new current state
logger.info(str.format("{0}: setting new state {1}".format(self, target_node)))
self._current_state = target_node
if __debug__:
logger.debug(str.format("{0}: _currently_handling_event 'False'", self))
self._currently_handling_event = False
[docs] def handle_event(self, event):
"""
Handles the event and does a state change if needed. Raises a 'ReentrantEventException' if it is
currently processing an event.
:Parameters:
event_func : operator.methodcaller
A methodcaller instance pointed to the function that should be called on the state.
For example if the method 'a' should be called on each state, then this should be
'event_func = operator.methodcaller('a', context)'
context : context
the context of the state machine, where certain methods and data is
accesible (like the actions interface).
"""
# its here for the documentation, its set in the code directly, used as a function pointer
pass
# -----------------------------------------------------------------------------
[docs]class SympleDictHFSM(object):
"""
.. deprecated:: 1.0.3.0 (use :class:`SympleHFSM` instead!)
"""
pass
# just not to break existing code, will be removed in next version
# SympleDictHFSM = SympleHFSM
# -----------------------------------------------------------------------------
[docs]class BaseHFSMTests(unittest.TestCase):
"""
Base TestCase that already defines test code for testing state machines
build using an events and action interface
(see: http://accu.org/index.php/journals/1548)
"""
[docs] class RecordingActions(object):
"""
This is a class that records the names of the functions called on it.
Instead of writing a TestActions class, that records which action was
activated, this class can be used. Just use the method names of the
Action interface to compare the actually called method with the
expected method in the tests.
:Instancevariable:
captured_actions : list
List of captured method names that where called.
args : list
List of tuples '(args, kwargs)' in the order the action methods
where called.
For each action method call there is a tuple inserted.
If no arguments are passed then a empty tuple is
inserted, e.g. '( ( ,), ( ,) )'
"""
def __init__(self):
self.captured_actions = []
self._name = None # for internal use
self.args = []
def __getattr__(self, name):
self._name = name
return self._nop
def _nop(self, *args, **kwargs):
"""
This is the method that actually gets called instead of
the real actions method. It will record the call.
"""
self.args.append((args, kwargs))
self.captured_actions.append(self._name)
[docs] class TestVector(object):
"""
A TestVector is basically the data container needed to test one
transition.
:Parameters:
title : string
Description of this TestVector
starting_state : State
the state from which this transition starts
event_func : Func
the function handling the event
expected_state : State
the state that should be the current_state after
the transition
expected_actions : list
list of expected actions to be compared with the
captured actions
"""
def __init__(self,
title,
starting_state,
event_func,
expected_state,
expected_actions):
self.title = title
self.starting_state = starting_state
self.event_func = event_func
self.expected_state = expected_state
self.expected_actions = expected_actions
[docs] def prove_one_transition(self,
state_machine,
resulting_actions,
test_vector):
"""
Test one transition.
:Parameters:
state_machine : StateMachine
the instance of the state machine to use
resulting_actions : Actions
instance of the class implementing the Actions that
captures the actions
needs to have an attribute 'captured_actions' which is a list
of the captured actions
test_vector : TestVector
the TestVector to test
"""
state_machine.set_state(test_vector.starting_state)
# clear the results of changing to the starting state
resulting_actions.captured_actions = []
test_vector.event_func()
if len(test_vector.expected_actions) != \
len(resulting_actions.captured_actions):
self.fail("Not same number of expected and captured actions!\
\n expected: {0} \n \
captured: {1}".format( \
", ".join(test_vector.expected_actions), \
", ".join(resulting_actions.captured_actions)))
for idx, expected_action in enumerate(test_vector.expected_actions):
action = resulting_actions.captured_actions[idx]
if action != expected_action:
self.fail(str.format("captured action does not match with \
expected action! \n expected: {0} \n captured: {1}", \
", ".join(test_vector.expected_actions), \
", ".join(resulting_actions.captured_actions)))
msg = "state machine not in expected state after transition, current: \
{0} expected: {1}".format(\
state_machine.current_state, test_vector.expected_state)
self.assertTrue(test_vector.expected_state == \
state_machine.current_state, msg)
msg = "state machine ! in expected state after transition, current: \
{0} expected: {1}".format(\
state_machine.current_state, test_vector.expected_state)
self.assertTrue(test_vector.expected_state is \
state_machine.current_state, msg)
[docs] def prove_transition_sequence(self,
title,
starting_state,
event_funcs,
expected_state,
expected_actions,
state_machine,
resulting_actions):
"""
Test a sequence of transitions by passing in a sequence of event and checking the actions.
:Parameters:
title : string
Description of this test
starting_state : State
the state from which this transition starts
event_funcs : Func
list of event functions to call
expected_state : State
the state that should be the current_state after the transition
expected_actions : list
list of expected actions to be compared with the captured actions
state_machine : SympleHFSM
the statemachine to test, an instance of SympleHFSM (or inheritet class)
resulting_actions : Actions
the actions used for the statemachine and for this test, has to have an attribute 'captured_actions'
"""
state_machine.set_state(starting_state)
# clear the results of changing to the starting state
resulting_actions.captured_actions = []
for event_func in event_funcs:
event_func()
if len(expected_actions) != len(resulting_actions.captured_actions):
self.fail(str.format("Not same number of expected and captured actions! \n \
expected: {0} \n \
captured: {1}", \
", ".join(expected_actions), \
", ".join(resulting_actions.captured_actions)))
for idx, expected_action in enumerate(expected_actions):
action = resulting_actions.captured_actions[idx]
if action != expected_action:
self.fail(str.format("captured action does not match with expected action! \n \
expected: {0} \n \
captured: {1}", \
", ".join(expected_actions), \
", ".join(resulting_actions.captured_actions)))
msg = "state machine not in expected state after transition, current: {0} expected: {1}".format(\
state_machine.current_state, expected_state)
self.assertTrue(expected_state == state_machine.current_state, msg)
msg = "state machine ! in expected state after transition, current: {0} expected: {1}".format(\
state_machine.current_state, expected_state)
self.assertTrue(expected_state is state_machine.current_state, msg)
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------