stm-conduit
Question about terminating gatherFrom in the event of an outer exception
Looking at the gatherFrom code (pasted below for convenience): I have a scatter function that starts a TCP server. Ideally, this TCP server would be restarted when gatherFrom is called again as a result of exception handling at the outer level. That turned out not to be possible, because the bound listening socket is never closed. I'm not sure of the best way to handle this; one possibility is rewriting the code to use withAsync. Even if that is possible, though, I'm not sure it would be desirable for all use cases.
gatherFrom :: (MonadIO m, MonadUnliftIO m)
           => Int                 -- ^ Size of the queue to create
           -> (TBQueue o -> m ()) -- ^ Action that generates output values
           -> ConduitT () o m ()
gatherFrom size scatter = do
    chan   <- liftIO $ newTBQueueIO (fromIntegral size)
    worker <- lift $ async (scatter chan)
    gather worker chan
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $ do
            xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
            (xs,) <$> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan
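For reference, the withAsync idea could look roughly like the sketch below. It is only an illustration, not stm-conduit's API: `withGatherFrom`, `drain`, and `demo` are hypothetical names, `drain` stands in for the whileM loop above, and the `conduit` and `unliftio` packages are assumed. Because withAsync needs a scope, the source is handed to a continuation instead of being returned directly, and an IORef flag stands in for closing the listening socket.

```haskell
import Control.Monad (forever)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Data.Conduit (ConduitT, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import Data.Foldable (traverse_)
import UnliftIO.Async (pollSTM, withAsync)
import UnliftIO.Exception (SomeException, finally, throwIO)
import UnliftIO.IORef (newIORef, readIORef, writeIORef)
import UnliftIO.STM
    ( STM, TBQueue, atomically, isEmptyTBQueue
    , newTBQueueIO, readTBQueue, writeTBQueue )

-- Hypothetical helper: read everything currently in the queue.
drain :: TBQueue a -> STM [a]
drain q = do
    isEmpty <- isEmptyTBQueue q
    if isEmpty then pure [] else (:) <$> readTBQueue q <*> drain q

-- Hypothetical continuation-passing variant of gatherFrom. The worker
-- is cancelled as soon as the continuation returns or throws.
withGatherFrom
    :: MonadUnliftIO m
    => Int                         -- size of the queue to create
    -> (TBQueue o -> m ())         -- action that generates output values
    -> (ConduitT () o m () -> m r) -- consumer of the resulting source
    -> m r
withGatherFrom size scatter inner = do
    chan <- newTBQueueIO (fromIntegral size)
    withAsync (scatter chan) $ \worker -> inner (gather worker chan)
  where
    gather worker chan = do
        (xs, mres) <- atomically $ (,) <$> drain chan <*> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)   -> throwIO (e :: SomeException)
            Just (Right ()) -> pure ()
            Nothing         -> gather worker chan

-- The worker would write forever; the consumer stops after three values.
-- The flag is set by the worker's own cleanup, as a socket close would be.
demo :: IO ([Int], Bool)
demo = do
    released <- newIORef False
    let scatter q = forever (atomically (writeTBQueue q 1))
                        `finally` writeIORef released True
    xs <- withGatherFrom 4 scatter $ \src -> runConduit (src .| CL.take 3)
    wasReleased <- readIORef released
    pure (xs, wasReleased)
```

The trade-off is the one raised above: the source is only valid inside the continuation, which may not suit every use case.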
I tried an alternative approach, but it has had interesting (detrimental) results...
gatherFromA :: (MonadIO m, MonadUnliftIO m)
            => Int                 -- ^ Size of the queue to create
            -> (TBQueue o -> m ()) -- ^ Action that generates output values
            -> m (ConduitT () o m (), Async ())
gatherFromA size scatter = do
    chan   <- newTBQueueIO (fromIntegral size)
    worker <- async (scatter chan)
    pure $ (gather worker chan, worker)
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $ do
            xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
            (xs,) <$> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan
When I cancel the returned Async () in the bracket where gatherFromA is used, it actually kills my entire program!
I spoke too soon. Even if I comment out the call to cancel, my program still crashes for some unknown reason, though this is new behavior.
UPDATE: it appears to be a memory leak somewhere; I haven't yet narrowed down whether it is related to the above change.
Some profiling output below. It does appear that the crash is related to the implementation of gatherFromA at first glance, though my current guess is that it is more to do with how the code is being used; will continue to investigate.


As it turns out, that was a memory leak unrelated to the implementation of gatherFromA, so I think this form of gatherFrom is still useful - it solved my original problem.
I plan to open a PR that implements gatherFrom in terms of gatherFromA.
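A minimal sketch of what that could look like, assuming the `conduit` and `unliftio` packages. This is not the actual PR: `drain` and `demo` are hypothetical names, and `drain` replaces the whileM loop only to avoid the extra dependency. gatherFrom simply runs gatherFromA in the base monad and drops the Async handle.

```haskell
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Class (lift)
import Data.Conduit (ConduitT, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import Data.Foldable (traverse_)
import UnliftIO.Async (Async, async, pollSTM)
import UnliftIO.Exception (SomeException, throwIO)
import UnliftIO.STM
    ( STM, TBQueue, atomically, isEmptyTBQueue
    , newTBQueueIO, readTBQueue, writeTBQueue )

-- Hypothetical helper: read everything currently in the queue.
drain :: TBQueue a -> STM [a]
drain q = do
    isEmpty <- isEmptyTBQueue q
    if isEmpty then pure [] else (:) <$> readTBQueue q <*> drain q

-- As in the issue: return the source together with the worker's Async,
-- so the caller can wait on the worker or cancel it.
gatherFromA
    :: MonadUnliftIO m
    => Int
    -> (TBQueue o -> m ())
    -> m (ConduitT () o m (), Async ())
gatherFromA size scatter = do
    chan   <- newTBQueueIO (fromIntegral size)
    worker <- async (scatter chan)
    pure (gather worker chan, worker)
  where
    gather worker chan = do
        (xs, mres) <- atomically $ (,) <$> drain chan <*> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)   -> throwIO (e :: SomeException)
            Just (Right ()) -> pure ()
            Nothing         -> gather worker chan

-- gatherFrom in terms of gatherFromA: the worker still starts when the
-- conduit starts running, and its handle is discarded.
gatherFrom
    :: MonadUnliftIO m
    => Int
    -> (TBQueue o -> m ())
    -> ConduitT () o m ()
gatherFrom size scatter = do
    (source, _worker) <- lift (gatherFromA size scatter)
    source

-- Usage: the worker writes ten values and then finishes.
demo :: IO [Int]
demo = runConduit (gatherFrom 4 scatter .| CL.consume)
  where
    scatter q = mapM_ (atomically . writeTBQueue q) [1 .. 10]
```

A caller that needs to stop the worker, as in my TCP server case, would call gatherFromA directly and cancel the returned Async in its own cleanup, for example in the release step of a bracket.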
Sorry for the late response. Do I understand correctly that gatherFromA differs from gatherFrom in that it also returns the worker's Async, so that we can wait on it or kill it?
And in that case gatherFrom can be implemented in terms of gatherFromA? If so, I don't see any problem with a PR that introduces it.