stm-conduit

Question about terminating gatherFrom in the event of an outer exception

Open bbarker opened this issue 5 years ago • 5 comments

Looking at the gatherFrom code (pasted below for convenience): I have a scatter function that starts a TCP server. Ideally, this TCP server would be restarted when gatherFrom is called again by exception handling at the outer level. It soon became obvious that this isn't possible, because the bound listening socket is never closed. I'm not sure of the best way to handle this; one possibility is rewriting the code to use withAsync. However, even if that is possible, I'm not sure it would be desirable for all use cases.

gatherFrom :: (MonadIO m, MonadUnliftIO m)
           => Int                -- ^ Size of the queue to create
           -> (TBQueue o -> m ()) -- ^ Action that generates output values
           -> ConduitT () o m ()
gatherFrom size scatter = do
    chan   <- liftIO $ newTBQueueIO (fromIntegral size)
    worker <- lift $ async (scatter chan)
    gather worker chan
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $ do
            xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
            (xs,) <$> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan
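
To illustrate the withAsync idea mentioned above, here is a minimal sketch using only base. It is not stm-conduit code: fakeServer, withWorker and demo are hypothetical names, and the "socket" is just an MVar flag. It shows the general pattern of tying a worker's lifetime to a scope, so that an exception in the consumer stops the worker and lets it release what it holds.

```haskell
import Control.Concurrent (forkIOWithUnmask, killThread, threadDelay)
import Control.Concurrent.MVar
    (MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar)
import Control.Exception (IOException, bracket, bracket_, finally, throwIO, try)
import Control.Monad (forever)

-- Hypothetical stand-in for the TCP server: marks a "port" as bound while it
-- runs and releases it when the thread is interrupted.
fakeServer :: MVar Bool -> MVar () -> IO ()
fakeServer bound ready =
    bracket_
        (modifyMVar_ bound (const (pure True)))    -- "bind" the socket
        (modifyMVar_ bound (const (pure False)))   -- "close" the socket
        (putMVar ready () >> forever (threadDelay 1000000))

-- Run a worker thread only for the duration of `body`. On exit (normal or
-- exceptional) the worker is killed and we wait for its cleanup to finish.
withWorker :: IO () -> IO a -> IO a
withWorker worker body = do
    done <- newEmptyMVar
    bracket
        (forkIOWithUnmask (\unmask -> unmask worker `finally` putMVar done ()))
        (\tid -> killThread tid >> takeMVar done)
        (const body)

-- The consumer fails once the server is up; report whether the port leaked.
demo :: IO Bool
demo = do
    bound <- newMVar False
    ready <- newEmptyMVar
    r <- try (withWorker (fakeServer bound ready)
                         (takeMVar ready >> throwIO (userError "outer failure")))
    case r of
        Left e   -> putStrLn ("consumer failed: " ++ show (e :: IOException))
        Right () -> pure ()
    readMVar bound

main :: IO ()
main = demo >>= \stillBound -> putStrLn ("port still bound: " ++ show stillBound)
```

Because the release step waits for the worker's own cleanup, the flag is already cleared when the exception reaches the outer handler, so a retry could bind again. withAsync from the async package provides the same kind of scoping for an Async.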

bbarker, Sep 16 '20 21:09

I tried an alternative approach, but it has had interesting (detrimental) results...

gatherFromA :: (MonadIO m, MonadUnliftIO m)
            => Int                 -- ^ Size of the queue to create
            -> (TBQueue o -> m ()) -- ^ Action that generates output values
            -> m (ConduitT () o m (), Async ())
gatherFromA size scatter = do
    chan <- newTBQueueIO (fromIntegral size)
    worker <- async (scatter chan)
    pure $ (gather worker chan, worker)
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $ do
            xs <- whileM (not <$> isEmptyTBQueue chan) (readTBQueue chan)
            (xs,) <$> pollSTM worker
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan

When I cancel the returned Async () in bracket where gatherFromA is used, it actually kills my entire program!

bbarker, Sep 16 '20 23:09

I spoke too soon: even if I comment out the call to cancel, my program still crashes for some unknown reason, though this is new behavior.

UPDATE: it appears to be a memory leak somewhere; I haven't yet narrowed down whether it is related to the above change.

bbarker, Sep 16 '20 23:09

Some profiling output below. At first glance, the crash does appear to be related to the implementation of gatherFromA, though my current guess is that it has more to do with how the code is being used; I will continue to investigate.

[Profiling images: FarmDataServer exe_hc, FarmDataServer exe_hd]

bbarker, Sep 17 '20 14:09

As it turns out, that was a memory leak unrelated to the implementation of gatherFromA, so I think this form of gatherFrom is still useful - it solved my original problem.

I can make a PR that adds this and implements gatherFrom in terms of gatherFromA.
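
A rough sketch of what that could look like, under some assumptions that are not from the library: everything is specialised to IO for brevity (the versions above are polymorphic in m), flushTBQueue from stm stands in for the whileM loop, and producer and collect are hypothetical names used only for the demonstration.

```haskell
{-# LANGUAGE TupleSections #-}
import Control.Concurrent.Async (Async, async, cancel, pollSTM)
import Control.Concurrent.STM
    (TBQueue, atomically, flushTBQueue, newTBQueueIO, writeTBQueue)
import Control.Exception (SomeException, bracket, throwIO)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Data.Conduit (ConduitT, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import Data.Foldable (traverse_)

-- gatherFromA as posted earlier, specialised to IO (assumption, for brevity).
gatherFromA :: Int -> (TBQueue o -> IO ()) -> IO (ConduitT () o IO (), Async ())
gatherFromA size scatter = do
    chan   <- newTBQueueIO (fromIntegral size)
    worker <- async (scatter chan)
    pure (gather worker chan, worker)
  where
    gather worker chan = do
        (xs, mres) <- liftIO $ atomically $ do
            xs <- flushTBQueue chan           -- drain everything currently queued
            (xs,) <$> pollSTM worker          -- and check on the worker atomically
        traverse_ yield xs
        case mres of
            Just (Left e)  -> liftIO $ throwIO (e :: SomeException)
            Just (Right r) -> return r
            Nothing        -> gather worker chan

-- gatherFrom expressed via gatherFromA: start the worker, drop the handle.
gatherFrom :: Int -> (TBQueue o -> IO ()) -> ConduitT () o IO ()
gatherFrom size scatter = do
    (source, _worker) <- lift (gatherFromA size scatter)
    source

-- Hypothetical producer used only for the demonstration.
producer :: TBQueue Int -> IO ()
producer chan = mapM_ (atomically . writeTBQueue chan) [1 .. 5]

-- Callers that need cleanup keep the handle and cancel it on the way out.
collect :: IO [Int]
collect =
    bracket
        (gatherFromA 2 producer)
        (\(_, worker) -> cancel worker)
        (\(source, _) -> runConduit (source .| CL.consume))

main :: IO ()
main = do
    xs <- collect
    ys <- runConduit (gatherFrom 2 producer .| CL.consume)
    print (xs, ys)  -- prints ([1,2,3,4,5],[1,2,3,4,5])
```

One caveat with the bracket form: the acquire action runs with asynchronous exceptions masked and the forked worker inherits that state, so cancel only takes effect once the worker reaches an interruptible operation (blocking STM or blocking I/O, for example).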

bbarker, Sep 24 '20 20:09

Sorry for the late response. Do I understand correctly that gatherFromA differs from gatherFrom in that it also returns the worker's Async, so that we can wait on it or kill it?

And in that case, gatherFrom can be implemented in terms of gatherFromA? If so, I don't see any problem with a PR that introduces it.

qnikst, Oct 10 '20 20:10