streamly icon indicating copy to clipboard operation
streamly copied to clipboard

Pipes mailboxes-like feedback on events

Open rainbyte opened this issue 5 years ago • 1 comments
trafficstars

I was wondering if this kind of Producers/Consumers behavior could be implemented using streamly.

This piece of code is implemented using pipes-concurrency with mailboxes.

Details:

  • Handlers consume events from their respectives mailboxes.
  • Handlers produce events which could be sent to other mailboxes.
  • Sometimes there is feedback (eg. mailboxA -> handlerA -> mailboxB -> handlerB -> mailboxA).
  • Operator >-> connects producers, pipes (handlers), and consumers.
mailboxA <- spawn unbounded
mailboxB <- spawn unbounded
mailboxC <- spawn unbounded

_ <- forkIO $ runEffect $
    fromMailbox mailboxA >-> handlerA >-> toMailbox (mailboxB <> mailboxC)
_ <- forkIO $ runEffect $
    fromMailbox mailboxB >-> handlerB >-> toMailbox mailboxA
_ <- forkIO $ runEffect $
    fromMailbox mailboxC >-> handlerC

It is not clear to me how to implement something like this using streamly in a clean way.

I have been using MVars to emulate the feedback, but it is just an IO hack.

handlerB :: MonadAsync m => MVar () -> EventB -> SerialT m ()
handlerB signalVar ev =
    liftIO $ case ev of
        SomeEventB -> do
            putStrLn "SomeEventB happened, signal handlerA"
            putMVar signalVar ()
        _ -> putStrLn "Other EventB happened"

someSourceA :: MonadAsync m => MVar () -> SerialT m EventA
someSourceA = S.repeatM . (*> pure SomeEventA) . liftIO . takeMVar

I hope there is a better way to do this without MVars or other explicit IO.

Thanks in advance!

rainbyte avatar Jan 02 '20 04:01 rainbyte

As of now, we lack the facilities to create arbitrary graphs (with feedback cycles) of streams declaratively. Using a reference in IO, as you are doing, seems to be the only way to create such graphs as of now. You could perhaps use the internal data structure SVar as well instead of MVar. Internally, we have operations like toSVar and fromSVar but SVar and its operations are not standardized yet so they are not exposed officially, they remain internal.

You may also be interested in the concatMapLoopWith operation here. This can perform a transformation and provide feedback from output to input.

Creating arbitrary graphs of streams is in the plan but we do not yet know how it will look like, this is more of research topic as of now. concatMapLoopWith is part of experiments related to that.

harendra-kumar avatar Jan 02 '20 09:01 harendra-kumar