#!/usr/bin/python
# -*- coding: utf-8 -*-
u"""
TODO: modul doc string
"""
__author__ = u"dr0iddr0id {at} gmail [dot] com (C) 2010"
import operator
import symplehfsm
from symplehfsm import BaseState
from symplehfsm import Transition
from symplehfsm import BaseHFSMTests
from symplehfsm import Structure
from symplehfsm import SympleHFSM
import unittest
# -----------------------------------------------------------------------------
# TODO: loop action back to event -> queue?
# -----------------------------------------------------------------------------
# making a statemachine testable based on: http://accu.org/index.php/journals/1548
# -----------------------------------------------------------------------------
#
# Statechart used to test the SympleHFSM
# based on
# [Samek] Miro Samek, Practical Statecharts in C/C++, CMP Books 2002.
# There's a companion website with additional information: http://www.quantum-leaps.com
# taken from: http://accu.org/index.php/journals/252
#
# see also: http://en.wikipedia.org/wiki/UML_state_machine#Local_versus_external_transitions
# making a statemachine testable based on: http://accu.org/index.php/journals/1548
# -----------------------------------------------------------------------------
[docs]class BaseStateTests(unittest.TestCase):
[docs] def setUp(self):
self.state = BaseState()
[docs] def test_name_is_set(self):
name = "XXX"
state = BaseState(name=name)
self.assertTrue(name in str(state), "name not set")
[docs] def test_parent_set_through_constructor(self):
self.state.name = "parend"
child = BaseState("child", self.state)
self.assertTrue(child.parent == self.state, "wrong parent set to child")
self.assertTrue(child in self.state.children, "child not registered in parent")
[docs] def test_self_is_not_child(self):
self.assertTrue(self.state.is_child(self.state) == False, "the state itself should not be a child")
[docs] def test_direct_child(self):
child = BaseState()
self.state.add(child)
self.assertTrue(child.is_child(self.state), "child should be a child")
[docs] def test_multiple_children(self):
child1 = BaseState()
child2 = BaseState()
child3 = BaseState()
self.state.add(child1)
child1.add(child2)
child2.add(child3)
self.assertTrue(child1.is_child(self.state), "child1 should be a child")
self.assertTrue(child2.is_child(self.state), "child2 should be a child")
self.assertTrue(child3.is_child(self.state), "child3 should be a child")
self.assertTrue(child2.is_child(child1), "child2 should be a child")
self.assertTrue(child3.is_child(child1), "child3 should be a child")
self.assertTrue(child3.is_child(child2), "child3 should be a child")
[docs] def test_representation_is_string(self):
repr = str(self.state)
self.assertTrue(len(repr) > 0, "should not be empty")
self.assertTrue(isinstance(repr, type("")), "should be a string")
[docs] def test_parent_is_not_child(self):
child = BaseState()
self.assertTrue(self.state.is_child(child) == False, "parent should not be a child")
[docs] def test_self_hast_not_self_as_child(self):
self.assertTrue(self.state.has_child(self.state) == False, "the state itself should not be a child")
[docs] def test_has_direct_child(self):
child = BaseState()
self.state.add(child)
self.assertTrue(self.state.has_child(child), "child should be a child")
[docs] def test_has_multiple_children(self):
child1 = BaseState()
child2 = BaseState()
child3 = BaseState()
self.state.add(child1)
child1.add(child2)
child2.add(child3)
self.assertTrue(self.state.has_child(child1), "child1 should be a child")
self.assertTrue(self.state.has_child(child2), "child2 should be a child")
self.assertTrue(self.state.has_child(child3), "child3 should be a child")
self.assertTrue(child1.has_child(child2), "child2 should be a child")
self.assertTrue(child1.has_child(child3), "child3 should be a child")
self.assertTrue(child2.has_child(child3), "child3 should be a child")
[docs] def test_child_has_not_parent_as_child(self):
child = BaseState()
self.assertTrue(child.has_child(self.state) == False, "parent should not be a child")
[docs] def test_add_child(self):
child = BaseState()
self.state.add(child)
self.assertTrue(child.parent == self.state, "child has wrong parent")
self.assertTrue(self.state.children[0] == child, "parent state should have child")
[docs] def test_add_child_and_initial(self):
child = BaseState()
self.state.add(child, True)
self.assertTrue(child.parent == self.state, "child has wrong parent")
self.assertTrue(self.state.children[0] == child, "parent state should have child")
self.assertTrue(self.state.initial == child, "initial state not set")
[docs] def test_add_child_initial_only_once(self):
child = BaseState()
child2 = BaseState()
self.state.add(child, True)
try:
self.state.add(child2, True)
self.fail("should throw exception when trying to add another state as inital")
except BaseState.InitialStateAlreadySetError:
pass
[docs] def test_add_returns_child(self):
child = BaseState()
ret = self.state.add(child)
self.assertTrue(ret==child, "add should return added child")
[docs] def test_check_if_child_has_parent(self):
child = BaseState()
child.parent = 1
try:
self.state.add(child)
self.fail("should have thrown exception because child has a parent set already")
except BaseState.ParentAlreadySetError:
pass
[docs] def test_remove_child(self):
child = BaseState()
self.state.add(child)
self.state.remove(child)
self.assertTrue(child.parent is None, "parent of removed child still set")
try:
self.state.children.index(child)
self.fail("child should not be in children after remove")
except ValueError:
pass
except Exception, e:
self.fail(str(e))
[docs] def test_remove_child_set_as_initial(self):
child = BaseState()
child2 = BaseState()
self.state.add(child, True)
self.state.add(child2)
self.state.remove(child, child2)
self.assertTrue(child.parent is None, "parent of removed child still set")
self.assertTrue(self.state.initial == child2, "initial not reset to other child")
try:
self.state.children.index(child)
self.fail("child should not be in children after remove")
except ValueError:
pass
[docs] def test_remove_child_set_as_initial_exception(self):
child = BaseState()
self.state.add(child, True)
try:
self.state.remove(child)
self.fail("removing initial state without replacement should raise a Exception")
except BaseState.InitialNotReplacedError, e:
pass
[docs] def test_remove_child_set_as_initial_replacment_is_child(self):
child = BaseState()
child2 = BaseState()
self.state.add(child, True)
try:
self.state.remove(child, child2)
self.fail("should raise exception because replacement child is not a child of the state")
except BaseState.ReplacementStateIsNotChildError:
pass
[docs] def test_check_consistency_check_initial_not_set(self):
child = BaseState()
child2 = BaseState()
self.state.add(child)
self.state.add(child2)
try:
self.state.check_consistency()
self.fail("should throw initial missing exception")
except BaseState.InitialNotSetError:
pass
[docs] def test_check_consistency_check_initial(self):
child = BaseState()
child2 = BaseState()
self.state.add(child)
self.state.add(child2, True)
try:
self.state.check_consistency()
except Exception, e:
self.fail(str(e))
[docs] def test_check_consistency_check_initial_wrong_parent(self):
child = BaseState()
child2 = BaseState()
self.state.add(child)
self.state.add(child2, True)
child.parent = child2
try:
self.state.check_consistency()
self.fail("should raise WrongParentError")
except BaseState.WrongParentError, e:
pass
# -----------------------------------------------------------------------------
[docs]class SympleHFSMTransitionTests(unittest.TestCase):
[docs] def setUp(self):
# def __init__(self, target_state, action=None, guard=None, name=None):
self.target = "target"
self.action = "action"
self.guard = "guard"
self.name = "name"
self.transition = Transition(self.target, self.action, self.guard, self.name)
[docs] def test_attributes(self):
self.assertTrue(self.transition.target == self.target)
self.assertTrue(self.transition.action == self.action)
self.assertTrue(self.transition.guard == self.guard)
self.assertTrue(self.transition.name == self.name)
[docs] def test_representation_is_string(self):
repr = str(self.transition)
self.assertTrue(len(repr) > 0, "should not be empty")
self.assertTrue(isinstance(repr, type("")), "should be a string")
# -----------------------------------------------------------------------------
[docs]class StructureTests(unittest.TestCase):
"""
..todo: test _get_methodcalls() !!!!!
Tests the Structure class.
"""
[docs] def setUp(self):
self.name = "asdlfkjdflkjUIPOIU"
self.structure = Structure(self.name)
self.state1 = "ARBD"
self.state2 = 234234234
self.state3 = object()
[docs] def test_that_name_is_set(self):
self.assertTrue(self.structure.name == self.name, "name not set!")
[docs] def test_add_root(self):
# def add_state(self, state, parent, initial, entry_action=None, exit_action=None):
self.structure.add_state(self.state1, None, None)
self.assertTrue(self.structure.root == self.state1, "no or wrong root set")
self.assertTrue(len(self.structure.states) == 1, "only one state (root) should be present")
[docs] def test_adding_second_root_raises_exception(self):
self.structure.add_state(self.state1, None, None)
try:
self.structure.add_state(self.state2, None, None)
self.fail("should have raised a RootAlreadySetOrParentMissingError exception")
except Structure.RootAlreadySetOrParentMissingError, e:
pass
[docs] def test_adding_state_twice_raises_exception(self):
self.structure.add_state(self.state1, None, None)
self.structure.add_state(self.state3, self.state1, None)
try:
self.structure.add_state(self.state3, self.state1, None)
self.fail("should have raised StateIdentifierAlreadyUsed exception")
except Structure.StateIdentifierAlreadyUsed, e:
pass
[docs] def test_add_state(self):
self.structure.add_state(self.state1, None, None)
self.structure.add_state(self.state2, self.state1, None)
parent = self.structure.states[self.state1]
child = self.structure.states[self.state2]
self.assertTrue(parent.has_child(child), "child not added")
self.assertTrue(child.parent == parent, "wrong parent set to child")
[docs] def test_add_state_parent_unkown(self):
try:
self.structure.add_state(self.state1, "unkown parent identifier", self.state2)
self.fail("should have raised 'ParentUnkownError'")
except self.structure.ParentUnkownError:
pass
[docs] def test_add_transition_to_unkown_state(self):
try:
# def add_trans(self, state, event, target, action=None, guard=None, name=None):
self.structure.add_trans("aaaa", "event1", self.state2)
self.fail("should have raised StateUnknownError")
except symplehfsm.StateUnknownError, e:
pass
[docs] def test_add_transition_unkown_target(self):
self.structure.add_state(self.state1, None, None)
try:
self.structure.add_trans(self.state1, "event", "unkown_state_identifier")
self.fail("should have raised StateUnknownError")
except symplehfsm.StateUnknownError, e:
pass
[docs] def test_add_transition_target_not_set(self):
self.structure.add_state(self.state1, None, None)
try:
self.structure.add_trans(self.state1, "event", None)
except symplehfsm.StateUnknownError, e:
self.fail("should have raised StateUnknownError")
[docs] def test_representation_is_string(self):
repr = str(self.structure)
self.assertTrue(len(repr) > 0, "should not be empty")
self.assertTrue(isinstance(repr, type("")), "should be a string")
[docs] def test_adding_transition_same_event_twice_on_same_state_raises_error(self):
self.structure.add_state(self.state1, None, None)
event_id = 12341234
self.structure.add_trans(self.state1, event_id, None)
try:
self.structure.add_trans(self.state1, event_id, None)
self.fail("should have raised EventAlreadyDefinedError")
except Structure.EventAlreadyDefinedError, e:
pass
[docs] def test_adding_transitions(self):
event_id = 12341234
event_id1 = 123412334
event_id2 = 1234
self.structure.add_state(self.state1, None, None)
self.structure.add_state(self.state2, self.state1, None)
self.structure.add_state(self.state3, self.state2, None)
self.structure.add_trans(self.state1, event_id, None)
self.structure.add_trans(self.state1, event_id1, None)
self.structure.add_trans(self.state1, event_id2, None)
self.structure.add_trans(self.state2, event_id, None)
self.structure.add_trans(self.state3, event_id, None)
[docs] def test_check_consistency_check_initial_not_set(self):
self.structure.add_state(self.state1, None, None) # root
self.structure.add_state(self.state2, self.state1, None) # child 1
self.structure.add_state(self.state3, self.state1, None) # child 2
try:
self.structure.check_consistency()
self.fail("should throw initial missing exception")
except BaseState.InitialNotSetError:
pass
[docs] def test_check_consistency_check_initial(self):
self.structure.add_state(self.state1, None, None) # root
self.structure.add_state(self.state2, self.state1, None) # child 1
self.structure.add_state(self.state3, self.state1, True) # child 2
# child = BaseState()
# child2 = BaseState()
# self.state.add(child)
# self.state.add(child2, True)
try:
self.structure.check_consistency()
except Exception, e:
self.fail(str(e))
[docs] def test_check_consistency_check_initial_wrong_parent(self):
self.structure.add_state(self.state1, None, None) # root
self.structure.add_state(self.state2, self.state1, True) # child 1
self.structure.add_state(self.state3, self.state1, None) # child 2
self.structure.states[self.state3].parent = self.state2
# child = BaseState()
# child2 = BaseState()
# self.state.add(child)
# self.state.add(child2, True)
# child.parent = child2
try:
self.structure.check_consistency()
self.fail("should raise WrongParentError")
except BaseState.WrongParentError, e:
pass
# -----------------------------------------------------------------------------
[docs]class StructureOptimizationTests(unittest.TestCase):
[docs] def setUp(self):
self.structure = Structure("optimized structure")
self.structure.add_state("root", None, None)
self.structure.add_state("level1-1", "root", True)
self.structure.add_state("level1-2", "root", None)
self.structure.add_state("level1-3", "root", None)
self.structure.add_state("level2-1-1", "level1-1", True)
self.structure.add_state("level2-1-2", "level1-1", None)
self.structure.add_state("level2-1-3", "level1-1", False)
self.structure.add_state("level2-2-1", "level1-2", True)
self.structure.add_state("level2-2-2", "level1-2", False)
self.structure.add_state("level2-2-3", "level1-2", False)
self.structure.add_state("level2-3-1", "level1-2", False)
self.structure.add_state("level2-3-2", "level1-2", False)
self.structure.add_state("level2-3-3", "level1-2", False)
self.structure.add_state("level3-1-1-1", "level2-1-1", False)
self.structure.add_state("level3-1-1-2", "level2-1-1", None)
self.structure.add_state("level3-1-1-3", "level2-1-1", False)
self.structure.add_state("level3-1-2-1", "level2-1-2", True)
self.structure.add_state("level3-1-2-2", "level2-1-2", None)
self.structure.add_state("level3-1-2-3", "level2-1-2", False)
self.structure.add_state("level3-2-1-1", "level2-2-1", False)
self.structure.add_state("level3-2-1-2", "level2-2-1", None)
self.structure.add_state("level3-2-1-3", "level2-2-1", False)
self.structure.add_state("level3-3-2-1", "level2-3-2", False)
self.structure.add_state("level3-3-2-2", "level2-3-2", None)
self.structure.add_state("level3-3-2-3", "level2-3-2", False)
[docs] def test_do_optimization_twice(self):
self.structure.do_optimize()
self.assertTrue(self.structure.is_optimized, "structure should indicate that it is optimized")
self.structure._get_methodcallers = self._get_methodcallers_testable
self.structure.do_optimize()
def _get_methodcallers_testable(self, *args, **kwargs):
self.fail("should not call '_get_methodcallers' on second 'do_optimization'")
[docs] def test_do_optimization_states(self):
try:
self.structure.do_optimize()
except Exception, e:
self.fail(str(e))
[docs] def test_do_optimization_with_transitions(self):
# handling state, event, next state, action, guard
self.structure.add_trans("level1-1", "a", "level2-2-1")
self.structure.add_trans("level1-2", "b", "level2-3-1")
self.structure.add_trans("level2-1-1", "c", "level3-3-2-1")
try:
self.structure.do_optimize()
except Exception, e:
self.fail(str(e))
# -----------------------------------------------------------------------------
import operator
from operator import methodcaller
[docs]class SympleHFSMTests(unittest.TestCase):
[docs] def setUp(self):
self.state_root_name = "root"
self.sub_state1 = "sub1"
self.sub_state2 = "sub2"
self.root_entry_name = "root_entry"
self.init_state_entry_name = "init_state_entry"
self.root_exit_name = "root_exit"
self.state_exit_name = "state_exit"
self.structure = Structure("SympleHFSMTests.structure")
self.structure.add_state(self.state_root_name, None, None, methodcaller(self.root_entry_name), methodcaller(self.root_exit_name))
self.structure.add_state(self.sub_state1, self.state_root_name, False)
self.structure.add_state(self.sub_state2, self.state_root_name, True, methodcaller(self.init_state_entry_name), methodcaller(self.state_exit_name))
self.event_id_loop_event = "this is the loop event trigger"
# def add_trans(self, state, event, target, action=None, guard=None, name=None):
self.structure.add_trans(self.sub_state2, self.event_id_loop_event, self.sub_state1, self._action_calling_event)
# def add_trans(self, state, event, target, action=None, guard=None, name=None):
self.event_id_guard_true = "guard true"
self.structure.add_trans(self.sub_state2, self.event_id_guard_true, self.sub_state1, self._record_action, self._guard_true)
self.event_id_guard_false = "guard false"
self.structure.add_trans(self.sub_state2, self.event_id_guard_false, self.sub_state1, self._record_action, self._guard_false)
self.actions = BaseHFSMTests.RecordingActions()
self.machine = SympleHFSM(self.structure, self.actions, "SympleHFSMTests.machine")
[docs] def test_repr(self):
msg = "" + str(self.machine)
self.assertTrue(len(msg) > 0, "no name set")
[docs] def test_set_state_unkown_state(self):
try:
self.machine.set_state("222")
self.fail("should have raised StateUnknownError")
except symplehfsm.StateUnknownError, e:
pass
[docs] def test_set_state_currently_handling_event(self):
self.machine._currently_handling_event = True
try:
self.machine.set_state(self.state_root_name)
self.fail("should have raised ReentrantEventException")
except SympleHFSM.ReentrantEventException, e:
pass
[docs] def test_set_state_normal(self):
self.machine.set_state(self.state_root_name)
self.assertTrue(self.machine.current_state == self.state_root_name, "wrong state set after set_state")
[docs] def test_init_optimizedstructure(self):
self.structure.do_optimize()
self._do_test_init(False)
self.assertTrue(self.machine.handle_event == self.machine._handle_event_normal)
[docs] def test_init_unoptimizedstructure(self):
self._do_test_init(False)
self.assertTrue(self.machine.handle_event == self.machine._handle_event_normal)
[docs] def test_init_use_optimization_unoptimizedstructure(self):
self._do_test_init(True)
self.assertTrue(self.machine.handle_event == self.machine._handle_event_normal)
def _do_test_init(self, use_optimization):
self.assertTrue(self.machine.current_state is None, "state should not be set yet")
try:
self.machine.init(use_optimization)
except Exception, e:
self.fail("error: " + str(e))
self.assertTrue(self.machine.current_state is not None, "current state should be set to a state")
self.assertTrue(self.machine.current_state == self.sub_state2, "wrong initial state")
[docs] def test_init_calls_entries(self):
self.machine.init()
self.assertTrue([self.root_entry_name, self.init_state_entry_name] == self.actions.captured_actions,
"inits not called on states: " + str(self.actions.captured_actions))
[docs] def test_exit_calls_exits(self):
self.machine.init()
self.actions.captured_actions = []
self.machine.exit()
self.assertTrue([self.state_exit_name, self.root_exit_name] == self.actions.captured_actions,
"exits not called on states: " + str(self.actions.captured_actions))
[docs] def test_handling_event_not_initialized(self):
try:
self.machine.handle_event("")
self.fail("should have thrown exception")
except SympleHFSM.NotInitializedException, e:
pass
[docs] def test_handling_unkonw_event(self):
self.handle_unkown_event()
[docs] def handle_unkown_event(self):
self.machine.init(True)
current_state = self.machine.current_state
self.machine.handle_event("e")
self.assertTrue(self.machine.current_state == current_state, "current state should be same as before, expected: {0} actual: {1}".format(current_state, self.machine.current_state))
[docs] def test_handle_event_during_action(self):
self.machine.init(False)
self._handle_event_during_action()
def _action_calling_event(self, actions):
self.machine.handle_event("error")
def _handle_event_during_action(self):
try:
self.machine.handle_event(self.event_id_loop_event)
self.fail("should have raised a ReentrantEventException")
except SympleHFSM.ReentrantEventException, e:
pass
def _record_action(self, actions):
actions._record_action()
def _guard_true(self, actions):
self.actions._guard_true()
return True
def _guard_false(self, actions):
self.actions._guard_false()
return False
[docs] def test_guard_false(self):
self.machine.init()
self.machine.handle_event(self.event_id_guard_false)
self.assertTrue(self._guard_false.__name__ in self.actions.captured_actions, "guard false not called")
[docs] def test_guard_true(self):
self.machine.init()
self.machine.handle_event(self.event_id_guard_true)
self.assertTrue(self._guard_true.__name__ in self.actions.captured_actions, "guard true not called")
[docs] def test_calling_handle_event_without_init_raises_exception(self):
try:
self.machine.handle_event(self.event_id_guard_true)
self.faile("should have raised NotInitializedException")
except SympleHFSM.NotInitializedException, ex:
pass
[docs] def test_calling_init_multifple_times_raises_error(self):
self.machine.init()
try:
self.machine.init()
self.fail("should raise InitAlreadyCalledError")
except SympleHFSM.InitAlreadyCalledError, ex:
pass
[docs] def test_calling_exit_multiple_times_does_not_matter(self):
self.machine.init()
self.machine.exit()
self.machine.exit()
[docs] def test_calling_init_after_exit_should_initialize_normally(self):
self.machine.init()
self.machine.exit()
self.machine.init()
# -----------------------------------------------------------------------------
[docs]class SympleHFSMTestsOptimized(SympleHFSMTests):
[docs] def setUp(self):
SympleHFSMTests.setUp(self)
self.structure.do_optimize()
self.machine = SympleHFSM(self.structure, self.actions) #, "SympleHFSMTests.machine")
[docs] def test_handle_event_during_action(self):
self.structure.do_optimize()
self.machine.init(True)
self._handle_event_during_action()
[docs] def test_init_use_optimization_unoptimizedstructure(self):
# this test can only be performed in a not optimized structure
pass
[docs] def test_init_use_optimization_optimizedstructure(self):
self.structure.do_optimize()
self._do_test_init(True)
self.assertTrue(self.machine.handle_event == self.machine._handle_event_optimized)
# -----------------------------------------------------------------------------