transitions
Improve Parallel State Support
Right now, states must be direct descendants of the same state to be entered in parallel. It would be a significant improvement if this restriction could be lifted. For instance, transitions could pass lists of states (or state names) as dest to be entered in parallel.
m = HierarchicalMachine(states=['A', 'B', 'C'], transitions=[['go', 'A', ['B', 'C']]], initial='A')
assert m.state == 'A'
m.go()
assert m.state == ['B', 'C']
Nested state machines cannot be parallel:
m1=HierarchicalMachine(states=['a', 'b'], initial='a')
>>> m2=HierarchicalMachine(states=['d', 'e'], initial='b')
>>> m2=HierarchicalMachine(states=['d', 'e'], initial='d')
>>> m3=HierarchicalMachine(states={'name': 'c', 'children': [m1, m2], 'parallel': [m1, m2]}, initial='c')
Traceback (most recent call last):
File "<input>", line 1, in <module>
File "/home/thedrow/.cache/pypoetry/virtualenvs/jumpstarter-q-gDbjwh-py3.9/lib/python3.9/site-packages/transitions/extensions/nesting.py", line 339, in __init__
_super(HierarchicalMachine, self).__init__(*args, **kwargs)
File "/home/thedrow/.cache/pypoetry/virtualenvs/jumpstarter-q-gDbjwh-py3.9/lib/python3.9/site-packages/transitions/core.py", line 565, in __init__
self.add_states(states)
File "/home/thedrow/.cache/pypoetry/virtualenvs/jumpstarter-q-gDbjwh-py3.9/lib/python3.9/site-packages/transitions/extensions/nesting.py", line 488, in add_states
new_state.initial = [s if isinstance(s, string_types) else s['name'] for s in state_parallel]
File "/home/thedrow/.cache/pypoetry/virtualenvs/jumpstarter-q-gDbjwh-py3.9/lib/python3.9/site-packages/transitions/extensions/nesting.py", line 488, in <listcomp>
new_state.initial = [s if isinstance(s, string_types) else s['name'] for s in state_parallel]
TypeError: 'HierarchicalMachine' object is not subscriptable
You cannot pass more than one HSM to the children/states keyword since the states of the HSM are referenced in the named state. The reason is that there might be naming collisions of states defined in m1 and m2.
m1=HierarchicalMachine(states=['a', 'b'], initial='a')
m2=HierarchicalMachine(states=['a', 'b'], initial='b')
m3=HierarchicalMachine(states=[{'name': 'c', 'children': [m1, m2]}], initial='c') # we now have two definitions for 'a' and 'b' and two potential initial states
We should raise an error for it.
So your suggestion is:
- Two machines should be supported and initial states should be concatenated
- If there is a state naming collision, an error should be raised.
Does this summarize your feature request correctly?
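The collision rule summarized above can be sketched without touching the library. The helpers below are hypothetical (neither find_state_collisions nor merge_children is part of transitions) and they work on plain lists of state names rather than machine objects:

```python
from collections import Counter


def find_state_collisions(*state_name_lists):
    """Return the sorted state names that occur in more than one child machine."""
    # Count each name once per machine so duplicates inside one list do not count.
    counts = Counter(name for names in state_name_lists for name in set(names))
    return sorted(name for name, seen in counts.items() if seen > 1)


def merge_children(*state_name_lists):
    """Concatenate child state names, refusing to merge on a naming collision."""
    collisions = find_state_collisions(*state_name_lists)
    if collisions:
        raise ValueError("State name collision: {}".format(", ".join(collisions)))
    return [name for names in state_name_lists for name in names]


print(merge_children(["a", "b"], ["c", "d"]))  # ['a', 'b', 'c', 'd']
```

Calling merge_children(["a", "b"], ["a", "b"]) raises a ValueError that names both colliding states.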
We should:
- Check if children contains more than one state machine in a list. If that's the case, we should raise an error for now.
- Implement parallel states support by nesting state machines.
The parent state machine would trigger transitions in the child state machines and construct the entire state by concatenating machine.state.
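A rough sketch of the "construct the entire state by concatenating" idea, using toy stand-in classes rather than the real HierarchicalMachine. ToyMachine and ToyParallelParent are hypothetical names, and prefixing each child state with the child's name is an assumption made here to keep identical state names apart:

```python
class ToyMachine:
    """Stand-in for a state machine; it only tracks a current state name."""

    def __init__(self, name, initial):
        self.name = name
        self.state = initial


class ToyParallelParent:
    """Hypothetical parent whose state is built from its children's states."""

    def __init__(self, name, children, separator="_"):
        self.name = name
        self.children = children
        self.separator = separator

    @property
    def state(self):
        # Prefix every child state with the parent and child names so that
        # identical state names in different children stay distinguishable.
        return [self.separator.join([self.name, child.name, child.state])
                for child in self.children]


m1 = ToyMachine("m1", initial="a")
m2 = ToyMachine("m2", initial="b")
parent = ToyParallelParent("c", [m1, m2])
print(parent.state)  # ['c_m1_a', 'c_m2_b']
m1.state = "b"
print(parent.state)  # ['c_m1_b', 'c_m2_b']
```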
Update:
It is possible to add two state machines as children and it behaves roughly as intended:
from transitions.extensions.nesting import HierarchicalMachine
m1 = HierarchicalMachine(states=['a', 'b'], initial='a')
m2 = HierarchicalMachine(states=['c', 'd'], initial='c')
m3 = HierarchicalMachine(states=[{'name': 'c', 'children': [m1, m2]}], initial='c')
print(m3.states['c'].initial) # >>> a
print(m3.state) # >>> c_a
The above mentioned case with duplicated naming causes:
ValueError: State a cannot be added since it already exists.
But m3 does not have three states. This just copies the state machines, right?
I think I have a design in mind but it will require refactoring the internals of the core state machine and the removal of most of the implementation of graph variants since they'll be built in.
I propose refactoring the way we store the state machine's internal state. As of now, we use dictionaries and lists to store the states, events and triggers. Since state machines can always be represented in diagrams, I propose we store the states as nodes in a graph and use edges to represent transitions. The triggers can be added as metadata to the edges. This metadata will also include the edge type. You might have a situation where you can only transition to state A if you're also transitioning to parallel states B and C, and it will take less time to check whether a transition to A is possible if not all other requirements are met. When that happens, you check which other transitions are currently in progress and compare them to A's requirements, which are marked by, you guessed it, edges. You can also model an active transition in the graph by using a special ephemeral edge, which will help you detect more easily what other transitions are taking place right now.
This can be implemented using networkx. In fact, graphviz's API is based on networkx as far as I can tell so you and I could probably implement something without learning a lot of new things. NetworkX can export graphs to dot files and we can use graphviz to render them so the implementation of graph machines will become much simpler.
I also propose we abstract the graph library so that our users could use retworkx or so that we can work around bugs in their implementation (which I think is unlikely with networkx but other libraries are newer). This will allow the library further extensibility.
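To make the proposal concrete, here is a minimal sketch of a backend-agnostic graph store in which states are nodes and each transition is a connection between two nodes carrying its trigger as metadata. DictGraphBackend and destinations are hypothetical names; a networkx- or retworkx-based backend would only need to offer the same three methods:

```python
from collections import defaultdict


class DictGraphBackend:
    """Minimal directed multigraph: states are nodes, transitions connect them."""

    def __init__(self):
        self._nodes = set()
        self._edges = defaultdict(list)  # source -> [(dest, metadata), ...]

    def add_node(self, name):
        self._nodes.add(name)

    def add_edge(self, source, dest, **metadata):
        if source not in self._nodes or dest not in self._nodes:
            raise ValueError("Both states must be added before the transition")
        self._edges[source].append((dest, metadata))

    def out_edges(self, source):
        return list(self._edges[source])


def destinations(graph, source, trigger):
    """Return every destination reachable from `source` via `trigger`."""
    return [dest for dest, meta in graph.out_edges(source)
            if meta.get("trigger") == trigger]


graph = DictGraphBackend()
for state in ("A", "B", "C"):
    graph.add_node(state)
# Two connections sharing one trigger model a transition into parallel states.
graph.add_edge("A", "B", trigger="go")
graph.add_edge("A", "C", trigger="go")
print(destinations(graph, "A", "go"))  # ['B', 'C']
```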
Being able to export graphs as networkx graphs would be an awesome feature. EDIT: never mind, one can easily get a networkx graph from a GraphMachine:
nx.nx_agraph.from_agraph(graph_machine.get_graph())
see for instance: https://networkx.org/documentation/stable/reference/drawing.html#module-networkx.drawing.nx_pylab
@aleneum Would you be able to read this paper and resolve the issue? PHFSM.pdf
I will have a look, thanks!
If two or more modules are activated in the same node they have to be executed in parallel. For example, the modules Z1 and Z2 have to be activated in parallel from the module Z0. If two or more modules are called in parallel from the module Za, the module Za is allowed to continue its execution if and only if all called parallel modules have been terminated.
@aleneum This is exactly what I need and it seems like a very useful property to have.
I found something in the SCXML definition which hopefully represents what you had in mind:
When the state machine enters the <final> child of a <state> element, the SCXML Processor MUST generate the event done.state.id after completion of the <onentry> elements, where id is the id of the parent state. Immediately thereafter, if the parent <state> is a child of a <parallel> element, and all of the <parallel>'s other children are also in final states, the Processor MUST generate the event done.state.id where id is the id of the <parallel> element.
In other words: If all children of a parallel state are in a final state, a NestedState could execute on_final callbacks.
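The propagation rule can be sketched as a small recursive check. The dict layout used here (a 'final' flag on leaves and an 'active' list of currently active children) is an assumption made for illustration; it is not how transitions stores states:

```python
def is_final(node):
    """Return True if `node` is in a final state.

    A leaf is final when it is tagged final. A node with active children
    is final when every one of its active children is final.
    """
    active = node.get("active", [])
    if not active:
        return node.get("final", False)
    return all(is_final(child) for child in active)


x = {"name": "X", "final": True}
y = {"name": "Y", "active": [{"name": "yI"}]}
z = {"name": "Z", "active": [{"name": "zI"}]}
b = {"name": "B", "active": [x, y, z]}  # B runs X, Y and Z in parallel
print(is_final(b))  # False

# Y and Z move on to substates that are tagged final.
y["active"] = [{"name": "yII", "final": True}]
z["active"] = [{"name": "zII", "final": True}]
print(is_final(b))  # True
```

A real implementation would call the on_final callbacks at the moment this check flips to True for a state.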
I added on_final in the branch dev-on-final. I wonder whether I should make this part of 0.9.0 or keep this for 0.9.1.
from transitions.extensions import HierarchicalMachine
from transitions.extensions.states import add_state_features, Tags
@add_state_features(Tags)
class FinalHSM(HierarchicalMachine):
    def final_event_raised(self, event_data):
        # one way to get the currently finalized state is via the scoped attribute of the machine passed
        # with 'event_data'. However, this is done here to keep the example short. In most cases dedicated
        # final callbacks will probably result in cleaner and more comprehensible code.
        print("{} is final!".format(event_data.machine.scoped.name or "Machine"))
# We initialize this parallel HSM in state A:
#        / X
#       /   / yI
# A -> B - Y - yII [final]
#        \ Z - zI
#            \ zII [final]
states = ['A', {'name': 'B', 'parallel': [{'name': 'X', 'tags': ['final'], 'on_final': 'final_event_raised'},
{'name': 'Y', 'transitions': [['final_Y', 'yI', 'yII']],
'initial': 'yI',
'on_final': 'final_event_raised',
'states':
['yI', {'name': 'yII', 'tags': ['final']}]
},
{'name': 'Z', 'transitions': [['final_Z', 'zI', 'zII']],
'initial': 'zI',
'on_final': 'final_event_raised',
'states':
['zI', {'name': 'zII', 'tags': ['final']}]
},
],
"on_final": 'final_event_raised'}]
machine = FinalHSM(states=states, on_final='final_event_raised', initial='A', send_event=True)
# X will emit a final event right away
machine.to_B()
# >>> X is final!
print(machine.state)
# >>> ['B_X', 'B_Y_yI', 'B_Z_zI']
# Y's substate is final now and will trigger 'on_final' on Y
machine.final_Y()
# >>> Y is final!
print(machine.state)
# >>> ['B_X', 'B_Y_yII', 'B_Z_zI']
# Z's substate becomes final which also makes all children of B final and thus machine itself
machine.final_Z()
# >>> Z is final!
# >>> B is final!
# >>> Machine is final!
Even though this is not an SCXML feature I added on_final callbacks to simple machines as well:
from transitions import Machine
from transitions.extensions.states import Tags as State
states = [State(name='idling'),
          State(name='rescuing_kitten'),
          State(name='offender_gone', tags='final'),
          State(name='offender_caught', tags='final')]
transitions = [["called", "idling", "rescuing_kitten"],  # we will come when called
               {"trigger": "intervene",
                "source": "rescuing_kitten",
                "dest": "offender_caught",  # we will catch the offender
                "unless": "offender_is_faster"},  # unless they are faster
               ["intervene", "rescuing_kitten", "offender_gone"]]
class FinalSuperhero(object):
    def __init__(self, speed):
        self.machine = Machine(self, states=states, transitions=transitions, initial="idling", on_final="claim_success")
        self.speed = speed
    def offender_is_faster(self):
        # the condition must return its result, otherwise it always evaluates to None
        return self.speed < 15
    def claim_success(self):
        print("The kitten is safe.")
hero = FinalSuperhero(speed=10)  # we are not in shape today
hero.called()
assert hero.is_rescuing_kitten()
hero.intervene()
# >>> 'The kitten is safe.'
assert hero.machine.get_state(hero.state).is_final  # it's over
assert hero.is_offender_gone()  # maybe next time
I added on_final in the branch dev-on-final. I wonder whether I should make this part of 0.9.0 or keep this for 0.9.1.
Generally speaking, if we are following SemVer, new features appear in minor releases.
This is a simplified version of my state machine. The pipe denotes a potentially parallel state.
Initializing
↓
Initialized
↓
Starting | (Optionally) Restarting → Dependencies Started | (Optionally) Restarting → Resources Acquired | (Optionally) Restarting → Tasks Started | (Optionally) Restarting
↓
Healthy | (Optionally) Restarted
↗
Started | (Optionally) Restarted → Degraded | (Optionally) Restarted
↘
Unhealthy | (Optionally) Restarted
↓
Stopping | (Optionally) Restarting → Tasks Stopped | (Optionally) Restarting → Resources Released | (Optionally) Restarting → Dependencies Stopped | (Optionally) Restarting
↓
Stopped
A restart() causes a transition from started to the stopping state but also causes the actor to enter the restarting state. Note that if the actor were stopped normally, we would not be in the restarting state.
I don't want to declare a restarting state for every level of my state hierarchy.
Instead, I want the following hierarchy:
- initializing
- initialized
- starting
  - dependencies started
  - resources acquired
  - tasks started
- started
  - healthy
  - degraded
  - unhealthy
- stopping
- stopped
- restarting
- restarted
I'd like to be able to be both in state starting→acquiring resources and restarting or in started→healthy and restarted without being in ['starting→acquiring resources', 'starting→restarting'] or ['started→healthy', 'started→restarted'] since I want the restarting/restarted states to always use the same transition and state callbacks. I could customize machine.state to remove all separators before those states for display purposes but then I'd have to copy the callbacks around when they change.
Essentially I'd like parallel states to work with an arbitrary level of nesting in the hierarchy if that's possible.
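For reference, the display-only workaround mentioned above could look roughly like the sketch below. display_state is a hypothetical helper, and the '_' separator is assumed to match the default separator of nested state names. It only changes how state names are shown, so it does not address the duplicated callbacks:

```python
def display_state(state, shared=("restarting", "restarted"), separator="_"):
    """Show shared states without their parent prefix; leave other names as-is."""
    names = [state] if isinstance(state, str) else list(state)
    shown = []
    for name in names:
        leaf = name.split(separator)[-1]
        # Only states listed in `shared` lose their parent prefix.
        shown.append(leaf if leaf in shared else name)
    return shown


print(display_state(["started_healthy", "started_restarted"]))
# ['started_healthy', 'restarted']
```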
I'm trying to reverse engineer how I arrived at the feature you're describing but I honestly don't remember my reasoning. It's possible I misunderstood the paper.
Hello @thedrow,
I haven't read the paper from first to last sentence but it appeared to me that they are describing a structure meant for microcontrollers or other single/few chip architectures. Basically each 'module' is a callback which can execute other modules. The calling module is blocked/suspended (probably to save memory) and will be resumed when the called module(s) is(/are) done. This kind of 'blocking' architecture can be done with (a couple of) asynchronous machines that allow processing multiple callbacks at the same time but still block until the processes are done.
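The blocking behaviour described here can be sketched with plain asyncio, independent of any state machine library. The module names Za, Z1 and Z2 follow the paper's example; the coroutines and delays are made up for illustration:

```python
import asyncio


async def module(name, delay, log):
    """A called module: does some work and records when it terminates."""
    await asyncio.sleep(delay)
    log.append(name + " done")


async def calling_module(log):
    """Za starts Z1 and Z2 in parallel and resumes only after both terminate."""
    log.append("Za suspended")
    await asyncio.gather(module("Z1", 0.02, log), module("Z2", 0.01, log))
    log.append("Za resumed")


log = []
asyncio.run(calling_module(log))
print(log[0], "|", log[-1])  # Za suspended | Za resumed
```

asyncio.gather returns only once every awaited coroutine has finished, which matches the "if and only if all called parallel modules have been terminated" condition.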
Another approach to 'continue when substates/submachines are done' could be the aforementioned 'on_final' approach. A branch of the config could look like this:
starting
  starting dependencies -> dependencies started [final]
  acquiring resources -> resources acquired [final]
  starting tasks -> tasks started [final]
Edit: But if I understand your first passage correctly this is what you want to avoid. You could reuse a NestedState object for all 'starting' tasks to reduce overhead though.
Essentially I'd like parallel states to work with an arbitrary level of nesting in the hierarchy if that's possible.
This is somewhat possible already, even though it might not be straightforward:
from transitions.extensions import HierarchicalMachine
states = [
{'name': 'starting', 'parallel': ['acquiring-resources', 'dependencies-started']}
]
m = HierarchicalMachine(states=states, transitions=[['acquired', 'starting_acquiring-resources', 'starting_dependencies-started']])
m.to_starting()
print(m.state) # ['starting_acquiring-resources', 'starting_dependencies-started']
m.acquired()
print(m.state) # starting_dependencies-started
m.to_initial()
m.to("starting_acquiring-resources")
print(m.state) # starting_acquiring-resources
Another idea could be to add a new wildcard for final states (e.g. '.') instead of/in addition to working with [final] tags. So if a substate transitions to '.', it will 'vanish'. Its parent's on_final will be called when it is either the last substate while doing this or all other remaining siblings are tagged 'final'.
# just an idea
# ... transitions=[['acquired', 'starting_acquiring-resources', '.']])
print(m.state) # ['starting_acquiring-resources', 'starting_dependencies-started']
m.acquired()
print(m.state) # starting_dependencies-started
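The bookkeeping behind the '.' idea can be sketched on plain lists. leave_parallel_substate is a hypothetical helper and not part of transitions; it returns the remaining active states and whether the parent would count as final under the rule described above:

```python
def leave_parallel_substate(active, vanished, final_names=()):
    """Return (remaining, parent_is_final) after `vanished` leaves the active set."""
    remaining = [name for name in active if name != vanished]
    # The parent is final if nothing is left or every remaining sibling is final.
    parent_is_final = all(name in final_names for name in remaining)
    return remaining, parent_is_final


active = ["starting_acquiring-resources", "starting_dependencies-started"]
remaining, parent_final = leave_parallel_substate(active, "starting_acquiring-resources")
print(remaining, parent_final)  # ['starting_dependencies-started'] False
```

Passing final_names=("starting_dependencies-started",) makes the same call report the parent as final.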
Would something like this suffice? This should be possible with 2 state machines:
Edit: Tried to get rid of some edges. What I'd basically attempt to do is to run parallel processes (StartingState, StartedState) in a separate machine that can be run and stopped on demand.
stateDiagram-v2
direction LR
state Machine {
direction LR
StartedState --> Restarting : Restart
StartedState --> Stopping : Stop
Starting --> StartingState : OnEnter
Restarting --> StartingState : OnEnter
Started --> StartedState : OnEnter
Restarted --> StartedState : OnEnter
state Status {
direction LR
[*] --> Initializing
Initializing --> Initialized
Initialized --> Starting
Starting --> Started : StartingState.OnFinal
Started --> Stopping : Stop
Stopping --> Stopped
Started --> Restarting : Restart
Restarting --> Restarted : StartingState.OnFinal
Restarted --> Stopping : Stop
Stopped --> Restarting : Restart
}
--
state StartingState {
[*] --> Starting_Deps
Starting_Deps --> Deps_Started
Deps_Started --> [*] : OnEnter
--
[*] --> Acquring_Ressources
Acquring_Ressources --> Ressources_Acquired
Ressources_Acquired --> [*] : OnEnter
}
--
state StartedState {
direction LR
Healthy --> Degraded
Degraded --> Unhealthy
Unhealthy --> Healthy
Degraded --> Healthy
}
}
Code can be streamlined here and there but this should mimic the diagram above:
from transitions.extensions import HierarchicalMachine
from transitions.core import listify
from transitions.extensions.states import add_state_features, Tags
@add_state_features(Tags)
class StartingOperations(HierarchicalMachine):
    pass
states = ["initializing", "initialized", "starting", "started", "restarting", "restarted", "stopping", "stopped"]
transitions = [["starting_done", "starting", "started"], ["starting_done", "restarting", "restarted"],
["restart", "*", "restarting"], ["stop", "*", "stopping"]]
starting_states = [{'name': 'startingOp', 'parallel': [
{"name": "resources", "states": ["acquiring", {"name": "acquired", "tags": "final"}], "initial": "acquiring",
"transitions": [["all_done", "acquiring", "acquired"]]},
{"name": "dependencies", "states": ["starting", {"name": "started", "tags": "final"}], "initial": "starting",
"transitions": [["all_done", "starting", "started"]]},
{"name": "tasks", "states": ["starting", {"name": "started", "tags": "final"}], "initial": "starting",
"transitions": [["all_done", "starting", "started"]]}]
}]
started_states = ["healthy", "degraded", "unhealthy"]
class Model:
    def __init__(self):
        self.state_machine = HierarchicalMachine(self, states=states, transitions=transitions,
                                                 initial="initializing", model_attribute="mainstate")
        self.substate_machine = None
    @property
    def state(self):
        return self.mainstate \
            if self.substate_machine is None \
            else listify(self.substate) + listify(self.mainstate)
    def on_enter_starting(self):
        self.substate_machine = StartingOperations(self, states=starting_states,
                                                   transitions=[["go", "initial", "startingOp"]],
                                                   on_final="starting_done",
                                                   model_attribute="substate")
        self.go()
    def on_exit_starting(self):
        # TODO: tear down substate machine
        self.substate_machine = None
    def on_exit_restarting(self):
        self.on_exit_starting()
    def on_enter_restarting(self):
        self.on_enter_starting()
    def on_enter_started(self):
        self.substate_machine = HierarchicalMachine(self, states=started_states,
                                                    initial="healthy", model_attribute="substate")
    def on_enter_restarted(self):
        self.on_enter_started()
model = Model()
print(model.state) # >>> initializing
model.to_starting()
print(model.state) # >>> ['startingOp_resources_acquiring', 'startingOp_dependencies_starting', 'startingOp_tasks_starting', 'starting']
model.all_done()
print(model.state) # >>> ['healthy', 'started']
model.restart()
print(model.state) # >>> ['startingOp_resources_acquiring', 'startingOp_dependencies_starting', 'startingOp_tasks_starting', 'restarting']
model.all_done()
print(model.state) # >>> ['healthy', 'restarted']
Using before, after, or a list of on_final callbacks may save some lines of code.
I already reached a similar solution without the on_final callback by using a condition. What I'm expecting here is for this to be possible with only one state machine and have transitions deal with the implementation details. If multiple machines are required to handle my state, this is an implementation detail of transitions since the implementation can be generalized.
This is what my state machine looks like right now:
What I'm expecting here is for this to be possible with only one state machine and have transitions deal with the implementation details.
It can be done with one state machine:
from transitions.extensions import HierarchicalMachine
from transitions.extensions.states import add_state_features, Tags
@add_state_features(Tags)
class TaggedHSM(HierarchicalMachine):
    pass
starting_state = {'name': 'startingOp', 'parallel': [
{"name": "resources", "states": ["acquiring", {"name": "acquired", "tags": "final"}], "initial": "acquiring",
"transitions": [["all_done", "acquiring", "acquired"]]},
{"name": "dependencies", "states": ["starting", {"name": "started", "tags": "final"}], "initial": "starting",
"transitions": [["all_done", "starting", "started"]]},
{"name": "tasks", "states": ["starting", {"name": "started", "tags": "final"}], "initial": "starting",
"transitions": [["all_done", "starting", "started"]]}],
"on_final": "done"
}
started_state = {"name": "startedOp", "states": ["healthy", "degraded", "unhealthy"], "initial": "healthy",
"transitions": [["degrade", "healthy", "degraded"], ["degrade", "degraded", "unhealthy"]]}
states = ["initializing", "initialized",
{"name": "starting", "parallel": ["state", starting_state]},
{"name": "started", "parallel": ["state", started_state]},
{"name": "restarting", "parallel": ["state", starting_state]},
{"name": "restarted", "parallel": ["state", started_state]}, "stopping", "stopped"]
transitions = [["restart", "*", "restarting"], ["stop", "*", "stopping"], ["done", "starting", "started"],
["done", "restarting", "restarted"]]
m = TaggedHSM(states=states, transitions=transitions, initial="initializing")
m.to_starting()
print(m.state)
# >>> ['starting_state', ['starting_startingOp_resources_acquiring', 'starting_startingOp_dependencies_starting', 'starting_startingOp_tasks_starting']]
m.all_done()
print(m.state)
# >>> ['started_state', 'started_startedOp_healthy']
m.degrade()
print(m.state)
# >>> ['started_state', 'started_startedOp_degraded']
m.degrade()
print(m.state)
# >>> ['started_state', 'started_startedOp_unhealthy']
m.restart()
print(m.state)
# >>> ['restarting_state', ['restarting_startingOp_resources_acquiring', 'restarting_startingOp_dependencies_starting', 'restarting_startingOp_tasks_starting']]
If the starting and started states were initialized in advance, they could be reused for easier configuration and a slightly smaller memory footprint.
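The state values printed in this example are nested lists. If a flat list is easier to consume, a small helper along these lines could be used. flatten_state is a hypothetical name and works on the returned value only, without touching the machine:

```python
def flatten_state(state):
    """Return a flat list of state names from a string or arbitrarily nested lists."""
    if isinstance(state, str):
        return [state]
    flat = []
    for item in state:
        flat.extend(flatten_state(item))
    return flat


nested = ["starting_state", ["starting_startingOp_resources_acquiring",
                             "starting_startingOp_dependencies_starting"]]
print(flatten_state(nested))
# ['starting_state', 'starting_startingOp_resources_acquiring', 'starting_startingOp_dependencies_starting']
```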
I tried to implement NestedTransition(source="A", dest=["B_1", "B_2"]) but eventually gave up. I don't remember why anymore, but it wasn't a trivial enhancement. Maybe I'll give it another try, but I cannot promise fast results or results at all.
I added an experimental multi-dest feature for testing. It passes all tests but could still break other things like GraphSupport:
from transitions.extensions import HierarchicalMachine
import logging
starting_state = {'name': 'startingOp', 'parallel': [
{"name": "resources", "states": ["acquiring", "acquired"], "initial": "acquiring",
"transitions": [["all_done", "acquiring", "acquired"]]},
{"name": "dependencies", "states": ["starting", "started"], "initial": "starting",
"transitions": [["all_done", "starting", "started"]]},
{"name": "tasks", "states": ["starting", "started"], "initial": "starting",
"transitions": [["all_done", "starting", "started"]]}],
"on_final": "done"
}
started_state = {"name": "startedOp", "states": ["healthy", "degraded", "unhealthy"], "initial": "healthy",
"transitions": [["degrade", "healthy", "degraded"], ["degrade", "degraded", "unhealthy"]]}
states = ["initializing", "initialized", "starting", "started", "restarting", "restarted",
"stopping", "stopped", starting_state, started_state]
transitions = [
["start", "*", ["starting", "startingOp"]],
["restart", "*", ["restarting", "startingOp"]],
["ready", "starting", ["started", "startedOp"]],
["ready", "restarting", ["restarted", "startedOp"]],
["stop", ["starting", "restarting"], "stopping"]
# wildcard in 'stop' would enter and exit stopping multiple times when more than one state is active
]
logging.basicConfig(level=logging.DEBUG)
m = HierarchicalMachine(states=states, transitions=transitions, initial="initializing")
print(m.state)
# >>> initializing
m.start()
print(m.state)
# >>> ['starting', ['startingOp_resources_acquiring', 'startingOp_dependencies_starting', 'startingOp_tasks_starting']]
m.all_done()
print(m.state)
# >>> ['starting', ['startingOp_resources_acquired', 'startingOp_dependencies_started', 'startingOp_tasks_started']]
m.ready()
print(m.state)
# >>> ['started', 'startedOp_healthy']
m.degrade()
print(m.state)
# >>> ['started', 'startedOp_degraded']
m.restart()
print(m.state)
# >>> ['restarting', ['startingOp_resources_acquiring', 'startingOp_dependencies_starting', 'startingOp_tasks_starting']]
m.ready()
print(m.state)
# >>> ['restarted', 'startedOp_healthy']
m.stop()
print(m.state)
# >>> stopping
I was wondering what the status of this PR is.
I've got two parallel state machines which I want to transition to from some state in a different machine. When I try to trigger the transition into them, I get
components/base_model.py:99: in step
self.next()
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:807: in trigger_event
return self._process(partial(self._trigger_event, event_data, trigger))
venv/lib/python3.9/site-packages/transitions/core.py:1211: in _process
return trigger()
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:812: in _trigger_event
res = self._trigger_event_nested(event_data, trigger, None)
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:1157: in _trigger_event_nested
tmp = event_data.event.trigger_nested(event_data)
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:140: in trigger_nested
self._process(event_data)
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:154: in _process
event_data.result = trans.execute(event_data)
venv/lib/python3.9/site-packages/transitions/core.py:277: in execute
self._change_state(event_data)
venv/lib/python3.9/site-packages/transitions/extensions/diagrams.py:44: in _change_state
super(TransitionGraphSupport, self)._change_state(
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:276: in _change_state
state_tree, exit_partials, enter_partials = self._resolve_transition(event_data)
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:264: in _resolve_transition
new_states, enter_partials = self._enter_nested(root, dst_name_path, scope + root, event_data)
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:292: in _enter_nested
new_states[state_name], new_enter = self._enter_nested([], dest, prefix_path + [state_name], event_data)
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:302: in _enter_nested
initial_states = [event_data.machine.scoped.states[n] for n in initial_names]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.0 = <list_iterator object at 0x10555ef10>
> initial_states = [event_data.machine.scoped.states[n] for n in initial_names]
E KeyError: ''
venv/lib/python3.9/site-packages/transitions/extensions/nesting.py:302: KeyError
And I can't figure out why the key would be empty here. I'm also not clear on how to define the transition "next" from my single state machine's state to the children of the two parallel state machines, since this would be multiple destinations — that seems not to be allowed according to this issue.
Hello @translunar,
could you provide an MWE for me to work with? The multi-destination support as well as on_final has not been merged so far. I will start working on it at the beginning of April though.
Sure. Here's an MWE.
from transitions.extensions import HierarchicalGraphMachine as BaseMachine
class Toggle(BaseMachine):
    def __init__(self):
        states = ["a", "b"]
        transitions = [
            {"trigger": "next", "source": "a", "dest": "b"},
            {"trigger": "next", "source": "b", "dest": "a"},
        ]
        super().__init__(states=states, transitions=transitions, initial="a")
class ParallelA(BaseMachine):
    def __init__(self, toggle):
        states = [
            {"name": "simple",},
            {"name": "complex", "children": [toggle,],}
        ]
        transitions = [
            {"trigger": "next", "source": "simple", "dest": "complex_a",},
        ]
        super().__init__(states=states, transitions=transitions, initial="simple")
class ParallelB(BaseMachine):
    def __init__(self, toggle):
        states = [
            {"name": "startpid", "on_enter": self.start_pid,},
            {"name": "complexx", "children": [toggle,],},
        ]
        transitions = [
            {"trigger": "next", "source": "startpid", "dest": "complexx_b",},
        ]
        super().__init__(states=states, transitions=transitions, initial="startpid")
    def start_pid(self):
        print("starting PID controller")
class Outer(BaseMachine):
    def __init__(self, para, parb):
        states = [
            {"name": "locked",},
            {"name": "warming",},
            {"name": "auto", "parallel": [para, parb,],},
            {"name": "cooling",}
        ]
        transitions = [
            {"trigger": "unlock", "source": "locked", "dest": "warming",},
            {"trigger": "next", "source": "warming", "dest": "auto",},
            {"trigger": "shutdown", "source": "auto", "dest": "cooling",},
            {"trigger": "lock", "source": "cooling", "dest": "locked",},
        ]
        super().__init__(states=states, transitions=transitions, initial="locked")
if __name__ == "__main__":
    tog = Toggle()
    para = ParallelA(tog)
    parb = ParallelB(tog)
    outer = Outer(para, parb)
    outer.unlock()
    outer.next()