conduit
The MonadUnliftIO instance of ResourceT makes it incredibly tricky to use with threads
I was using MonadUnliftIO so I could easily parallelise functions by running them in multiple threads, but after a refactoring to use more parallelism I've started noticing errors from ResourceT about cleanups being triggered or accessed after freeing.
After diving through a bunch of sources, this seems to be fairly obviously caused by the fact that the MonadUnliftIO instance of ResourceT naively duplicates the IORef that tracks all the cleanups. The end result is that every thread independently ends up running the cleanup on these resources, hence triggering an InvalidAccess exception.
I can't simply use resourceForkWith or resourceForkIO because my code doesn't use (and isn't aware of) ResourceT, but I also have no way to stop users from passing me functions that use ResourceT somewhere internally, which will end up causing all sorts of failures.
I think maybe there should be an additional MonadUnliftIOThreadSafe (or whatever) class that is restricted to monads that are safe to run in separate threads, and that is not implemented for ResourceT. (I'm not sure whether this should be an issue for unliftio-core rather than resourcet...)
On further thinking, I don't think this is enough: there is no way for a user to know that the IO they produce via MonadUnliftIO will be run in the same thread as the original ResourceT, so I think the only safe thing to do is to have the MonadUnliftIO instance increment the reference count like the fork functions do...
I agree that the current situation is unsatisfying, but the case you're hitting is relatively rare, and the solutions all involve a significant overhead. Perhaps using atomic-primops for the counter instead of an IORef would make enough of a difference that it wouldn't matter.
The reason this is rare in my experience is that, by using functions like concurrently and race instead of forkIO, the resource management works correctly.
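To illustrate why the structured combinators behave well, here is a minimal sketch. It assumes the `concurrently` in question is the one from `UnliftIO.Async` (the unliftio package); the `demo` name and the `IORef` flag are purely illustrative. Because `concurrently` only returns once both child threads have finished, the children can never outlive the enclosing `runResourceT`, so they never observe a released resource.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (allocate, runResourceT)
import Data.IORef (newIORef, readIORef, writeIORef)
import UnliftIO.Async (concurrently)

-- Hypothetical demo: returns what each child thread saw, plus the final state.
demo :: IO (Bool, Bool, Bool)
demo = do
  released <- newIORef False
  (seenLeft, seenRight) <- runResourceT $ do
    -- Register a dummy resource whose cleanup flips the flag.
    _ <- allocate (pure ()) (\_ -> writeIORef released True)
    -- Both children run and finish inside the runResourceT scope.
    concurrently
      (liftIO (readIORef released))
      (liftIO (readIORef released))
  -- Cleanup runs when runResourceT exits, after both children are done.
  releasedAfter <- readIORef released
  pure (seenLeft, seenRight, releasedAfter)

main :: IO ()
main = demo >>= print
```

With a bare `forkIO` there is no such guarantee, since the parent can leave `runResourceT` while the child is still running.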
Maybe as a quicker/simpler stopgap there should be a bracketing function like:
withResources :: (MonadResource m, MonadUnliftIO m) => m (m a -> IO a)
Which gets the current resource map and returns a bracketing function which wraps another computation with increments/decrements of the resource map, so that library authors (i.e., me ;)) can simply do:
foo = do
  withRes <- withResources
  forkIO $ withRes actualComputation
To more easily fix this issue (I have, essentially, implemented this in my own code now, but it relies on stateAlloc and stateCleanup from Control.Monad.Trans.Resource.Internal, and I never feel comfortable being dependent on .Internal APIs).
Even that isn't sufficient, though it may help. There's a race condition: withRes may not increment the counter before the context in the parent thread decrements it. I put together some WIP code quickly using atomic-primops, and immediately ran into that issue when trying to get the tests to pass.
Ah, yeah...my own fix was paranoid enough to prevent that (I call stateAlloc before forking and then install a cleanup at thread exit), but I couldn't think of a convenient API to make that accessible...
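For readers following along, the approach described above (increment before forking, decrement at thread exit) might look roughly like the sketch below. This is not the poster's actual code: `forkWithResources` and `demo` are hypothetical names, and the sketch depends on `stateAlloc` and `stateCleanup` from the `.Internal` module, with all the stability caveats that implies. For simplicity it always releases with `ReleaseNormal`, even when the child dies with an exception.

```haskell
import Control.Concurrent (ThreadId, forkIOWithUnmask)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally, mask_)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO)
import Control.Monad.Trans.Resource
  (MonadResource, allocate, getInternalState, liftResourceT, runResourceT)
import Control.Monad.Trans.Resource.Internal (stateAlloc, stateCleanup)
import Data.Acquire (ReleaseType (ReleaseNormal))
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical helper: fork a thread that keeps the resource map alive.
forkWithResources :: (MonadResource m, MonadUnliftIO m) => m () -> m ThreadId
forkWithResources action = do
  istate <- liftResourceT getInternalState
  withRunInIO $ \run -> mask_ $ do
    -- Increment in the parent, before forking, so the parent cannot
    -- leave runResourceT and free everything before the child starts.
    stateAlloc istate
    forkIOWithUnmask $ \unmask ->
      -- Decrement when the child exits, however it exits.
      unmask (run action) `finally` stateCleanup ReleaseNormal istate

-- Hypothetical demo: the child outlives the parent's runResourceT scope.
demo :: IO (Bool, Bool)
demo = do
  released <- newIORef False
  go <- newEmptyMVar
  seen <- newEmptyMVar
  done <- newEmptyMVar
  runResourceT $ do
    _ <- allocate (pure ()) (\_ -> writeIORef released True >> putMVar done ())
    _ <- forkWithResources $ liftIO $ do
      takeMVar go                        -- wait until the parent scope has exited
      readIORef released >>= putMVar seen
    pure ()
  putMVar go ()
  aliveInChild <- not <$> takeMVar seen  -- resource was still live in the child
  takeMVar done                          -- cleanup ran once the child exited
  releasedAfter <- readIORef released
  pure (aliveInChild, releasedAfter)

main :: IO ()
main = demo >>= print
```

The caller still has to go through a dedicated fork function, which is the API awkwardness mentioned above: the increment cannot be hidden inside the forked computation itself without reintroducing the race.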