streamly icon indicating copy to clipboard operation
streamly copied to clipboard

Check fromCallback cleanup behavior

Open harendra-kumar opened this issue 2 years ago • 1 comments

We add the current threadId in the channel. Need to check if there are any issues when the main stream stops.

Also, do we need another callback which can be called to indicate the end of stream?

We need to ensure that this code works as expected:

mirrorStream :: (MonadAsync m, MonadCatch m) => Stream m a -> m (Stream m a, Stream m a)
mirrorStream stream = do
  emiters <- newIORef []
  pure
    ( stream
        & Stream.finally
          ( do
              emiters' <- readIORef emiters
              forM_ emiters' \emiter -> emiter Nothing
          )
        & Stream.parMapM
          id
          ( \x -> do
              emiters' <- readIORef emiters
              forM_ emiters' \emiter -> emiter (Just x)
              pure x
          )
    , Stream.fromCallback (\emit -> modifyIORef' emiters (emit :)) & Stream.takeWhile isJust & Stream.catMaybes
    )

Notes:

Need to start draining the second stream before the first stream otherwise the callback won't be registered. We can use some synchronization mechanism e.g. MVar to start evaluating the main stream after all the consumers are registered.

Also, check if there is sufficient concurrency.

harendra-kumar avatar Mar 20 '23 08:03 harendra-kumar

There a few things to take care of in the sample code above:

  1. Make the callback caller block if a queue threshold at the consumer is reached, so that the queue does not grow arbitrarily.
  2. Use atomicModifyIORef' to ensure correctness when the second stream is evaluated from concurrent threads.
  3. The callback is registered when the stream starts getting evaluated, so it may miss events from the source stream before that. That may or may not be ok depending on the use case.
  4. In general, the consumer stream needs to remove the callback from the list of callbacks when it is done, though it depends on the specific use case. For that it will have to associate some id with the callback. The IORef can keep an id counter for that and we can store a (id, callback) tuple.
  5. Exceptions in the consumer streams cannot be propagated back to the source stream.

harendra-kumar avatar Mar 20 '23 12:03 harendra-kumar