mount
mount copied to clipboard
Mount and core.async
Hi,
Sorry to post here instead of Slack (blocked by proxy), and it might be a bit of a noob question, but I've got trouble finding documentation on Mount regarding the stopping process / running concurrent processes.
When I have a starting order : state1 -> state2 -> state3 And the stopping order : state3 -> state2 -> state1
Can I block in the state3 function, waiting to clean up some running concurrent process ?
(For more context :
I've got quite a few channels and go-loops running, with dependencies between them (1 go loop giving tempo -> 3 thread I/O workers -> 1 test go-loop downstream-consumer).
I have the following states in my namespace :
(defstate workers-nbr :start 3)
(defstate close-chan :start (a/chan))
(...)
(defstate heartbeat-chan :start (heartbeat-start) ; starts a go-loop alt!-ing between a timeout and close-chan, returns a chan
:stop #(a/close! close-chan))
(...)
(defstate work-chan :start (workers-start) ; starts 3 long running threads doing IO while heartbeat-chan isn't closed, returns a chan
:stop #(a/close! heartbeat-chan))
(...)
(defstate work-test :start (work-test-start) ; starts a go-loop and returns the go-loop chan
:stop #(a/close! work-chan))
And my "main" :
(m/start)
(.addShutdownHook (Runtime/getRuntime)
(Thread.
(fn []
(a/close! work-test)
(m/stop))))
(<!! work-test))
I get these kinds of exceptions when I exit :
Exception in thread "async-thread-macro-3" java.lang.IllegalArgumentException: No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: mount.core.NotStartedState
I guess the states get stopped while my go-loops are still running meaning work-chan is stopped right after #(a/close! work-chan) got fired, but before the go-loop had time to pick up the closed signal, hence the go-loop trying a <! on an already closed state.
I'm thinking of blocking inside the stop functions on the go-loop / threads like :
(defstate work-test :start (work-test-start)
:stop #(do (a/close! work-chan)
(<!! work-test)
Will it work properly ? Are mount and core.async meant to be combined in this way ? Or is it bad design ?
Cheers,
François
a couple observations:
configuration over public constants
(defstate workers-nbr :start 3)
this would usually come from configuration / program parameters rather than being defined as a state
stop functions
is there any reason your stop functions are anonymous functions:
:stop #(a/close! heartbeat-chan)
rather than just:
:stop (a/close! heartbeat-chan)
not stopping state on stop
most likely the root of your problem is that a single state is started and stopped in different states, whereas it should be started and stopped in/as the same state:
(defstate a :start (foo)
:stop b)
(defstate b :start (bar)
:stop a)
vs.
(defstate a :start (foo)
:stop a)
(defstate b :start (bar)
:stop b)
Hi, I'm having a very similar problem. In general, I think mount
is not currently a good tool for managing channel state. My initial instinct was to do something like this:
(mount/defstate requests-channel
:start (async/chan)
:stop (async/close! requests-channel))
But this doesn't actually work. The basic problem is that asynchronous goroutines expect to see a channel in the "closed" state in order to terminate themselves, but mount
replaces the channel object with a NotStartedState
instance and causes them to crash. Eg. if you have standard goroutine code like this:
(async/go-loop []
(when-let [request (async/<! requests/requests-channel)]
;; handle request
(recur)))
Your program will randomly crash because requests-channel
is gone.
Personally, I'm not really a fan of the whole (undocumented) NotStartedState
thing. If it were up to me, stopped states would just get assigned the value you return from :stop
, similar to how component works. That was actually how I thought it worked from reading the documentation.
Personally, I'm not really a fan of the whole (undocumented)
NotStartedState
thing
why NotStartedState
The initial idea was to follow this succession of values:
NotStartedState
=> (start it)
=> (stop it)
=> NotStartedState
Reasoning:
-
The state, once stopped returns to its initial value of
NotStartedState
which gives an immediate feedback in case it is used while it should not be: i.e. rather than chasing NullPointers, etc. -
There are states that do not require a
:stop
function, in which case, if they are stopped, should "no longer be started", and also should not be used:NotStartedState
clearly communicates this (in case they are stopped).
stop
value has "value"
I do not disagree it creates confusion in cases where other parts of an application rely on a stop value to control their flow. However I also think controlling flow based on mutation is a smell.
core.async
made certain design choices, and returning nil
from a closed channel is one of them. It works well in some cases, does not work well in others: for example when you depend on it to get out of a tight go-loop
.
I usually do something like (pseudo code):
(async/go-loop []
(when-let [request (alt!
request-channel ([msg] msg)
stop-channel ([_] nil))]
;; handle request
(recur)))
and then in a :stop
function(s) I send a message to a stop-channel
which:
- explicitly communicates a stop action
- go loop reads without an implicit knowledge that it will stop ones "something" returns
nil
.
I am not saying my way is better, it is just something I feel more comfortable with.
However 47%
of me agrees with you that it would be more expected (less surprising) if a state ended up bound to a "stop value" once a :stop
function is called. Another 4%
and I'll just do it, so I am more than willing to have a discussion about it.
Yeah, I see your point. NotStartedState
obviously does have some error-detection benefits. The interaction with channels is just unfortunate, since the "loop until it returns nil" idiom is so widespread and convenient, especially for things like correctly cleaning up child channels created via sub
or tap
. Wish there was a good compromise solution.
I don't think mount has limitations when it comes to core.async. Besides the solution above with a stop-channel
you can do (pseudo code):
(defn listen-to-events []
(let [req-chan (chan)]
(async/go-loop []
(when-let [request (<! req-chan)]
;; handle request
(recur)))
req-chan))
(defstate listener :start (listen-to-events)
:stop (async/close! listener))
It also conveys a notion of state a bit better since it is not just a channel that is stateful, but a listener as well.
Hi, sorry for my low participation after asking a question, changed a bit of focus at work and went on holidays.
You were obviously right for your first two remarks (external conf, using anonymous function instead of a simple function call block). Your third remark was a bit confusing to me since you display a cyclic dependency with a and b. My go-loop pipeline is acyclic, with a-fetching -> b-accumulating -> c-output. And I had in mind I would be able to manage dependant states/the dependency order with mount, starting automatically c, then b, then a, and stopping a, then b, then c.
I'll have to wrap my head around this a bit more, since it's still not clear to me at this point what could/should be handled by mount as state (the channels, the go-loops, should I have just one restartable state for the whole pipeline ?, etc.). I might still be doing pretty basic mistakes.
Anyway, I'm glad thurn stepped in and fueled the discussion a bit further.
Cheers,
it's still not clear to me at this point what could/should be handled by mount as state (the channels, the go-loops, should I have just one restartable state for the whole pipeline ?, etc.) I might still be doing pretty basic mistakes.
These are not basic mistakes, different developers would come up with different designs and will stand their ground to prove their way is the best :)
It would depend on the application design and usage of course, but when it has to do with core.async
I tend to wrap go-loop
s in mount states. Since a go-loop
should be started and should be stopped and it is stateful.
Sometimes it makes sense to deem the whole pipeline as a stateful component: https://stackoverflow.com/a/40435030/211277
Things to consider when deciding when create a mount state and what should be "in it":
- can I easily test it?
- would some state within "this state" be useful/needed by itself? then split into two states.
- can I avoid creating a mount state by simply using a locally scoped state (i.e.
(let [a 42] ..)
)? - does this state really have / need a lifecycle: i.e.
start
/stop
- etc. :)
This SO answer is awesome / many food for thoughts, thanx !
Little update : I made sure states are responsible for stopping only themselves, but still since I have running go-loops /thread-loops and dependencies (state B loop depending on state A), I keep having some kind of race condition.
I tried with a stop-channel, it seems like state B gets his stop called, which would close its loop in the next cycle (when alt!!-ing on the stop channel)... and then A gets stopped...
But during this last cycle, the B loop is still running and calls A -> IllegalArgumentException: No implementation of method: :my-method! of protocol: #'my-ns/StateAProtocol found for class: mount.core.NotStartedState.
So as I get it, there are only two options here :
- have a way to make the stop process block... with B finishing its whole loop cycle before closing A (not so easy to block on go-loops everywhere) ?
- have a stop value, so my-method! on the stopped state A would result in a no-op (could be easier) ?
Regards,
I don't think mount has limitations when it comes to core.async. Besides the solution above with a
stop-channel
you can do (pseudo code):(defn listen-to-events [] (let [req-chan (chan)] (async/go-loop [] (when-let [request (<! req-chan)] ;; handle request (recur))) req-chan)) (defstate listener :start (listen-to-events) :stop (async/close! listener))
It also conveys a notion of state a bit better since it is not just a channel that is stateful, but a listener as well.
i just wanna put it here for the ones to come - this is working:
(defstate listener
:start (listen-to-events)
:stop (async/close! ^clojure.core.async.impl.protocols/Channel listener))
notice the type hint above --> ^clojure.core.async.impl.protocols/Channel this is what actually making the difference
Why ?
before using the type hint we get this message:
Execution error (IllegalArgumentException) at clojure.core.async.impl.protocols/eval5371$fn$G (protocols.clj:21).
No implementation of method: :close! of protocol: #'clojure.core.async.impl.protocols/Channel found for class: mount.core.DerefableState
it means that it gets mount.core.DerefableState
as the type instead of a channel
so helping it by hinting it to ^Channel
helps!
Enjoy 😉