streaming
Promptness
Following on from the general discussion in #18, I'd like to hear if you have any thoughts on adding prompt finalisation into the library. I'm currently working on https://github.com/ocharles/streaming-postgresql-simple, and using bracketStream when declaring a transaction for a cursor in order to correctly stream results. Transactions are very expensive, and you definitely do not want them open for longer than they need to be. Sadly, we don't have prompt finalisation, which means the following keeps the transaction open for as long as the whole computation:
```haskell
runResourceT $ do
  S.print . S.take 1 $
    streamWithOptionsAndParser_ defaultFoldOptions
      (Pg.fromRow :: RowParser (Pg.Only Int))
      c
      "VALUES (1), (2), (3), (4), (5)"
  liftIO (threadDelay 10000000)
```
As observed in #18, it's take that is problematic here: it stops consuming the stream early, so the final "prompt" release added by bracketStream is never run.
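To see the problem in isolation, here is a minimal sketch that needs only the streaming package. The names `rows` and `released` are hypothetical, and the trailing `liftIO` action only stands in for the release that bracketStream appends to the end of the stream. Because `take 1` stops demanding the stream after the first element, that action is never reached.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Five rows, then a trailing action standing in (hypothetically) for the
-- "prompt" release that a bracketing combinator puts at the end.
rows :: IORef Bool -> Stream (Of Int) IO ()
rows released = do
  S.each [1 .. 5]
  liftIO (writeIORef released True) -- only reached if the stream is drained

main :: IO ()
main = do
  released <- newIORef False
  S.print (S.take 1 (rows released)) -- prints 1
  readIORef released >>= print       -- prints False: the release never ran
```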
Any thoughts?
The pipes-safe approach to lifting mask seems to work:
```haskell
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Monad.Catch (MonadCatch, MonadMask (..))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (newIORef, readIORef, writeIORef)
import Streaming.Internal (Stream (..))

data Restore m = Unmasked | Masked (forall x . m x -> m x)

liftMask
  :: forall m f r . (MonadIO m, MonadCatch m, Functor f)
  => (forall s . ((forall x . m x -> m x) -> m s) -> m s)
  -> ((forall x . Stream f m x -> Stream f m x)
      -> Stream f m r)
  -> Stream f m r
liftMask maskVariant k = do
  ioref <- liftIO $ newIORef Unmasked
  let -- mask adjacent actions in base monad
      loop :: Stream f m r -> Stream f m r
      loop (Step f) = Step (fmap loop f)
      loop (Return r) = Return r
      loop (Effect m) = Effect $ maskVariant $ \unmaskVariant -> do
        -- stash base's unmask and merge action
        liftIO $ writeIORef ioref $ Masked unmaskVariant
        m >>= chunk >>= return . loop

      -- unmask adjacent actions in base monad
      unmask :: forall q. Stream f m q -> Stream f m q
      unmask (Step f) = Step (fmap unmask f)
      unmask (Return q) = Return q
      unmask (Effect m) = Effect $ do
        -- retrieve base's unmask and apply to merged action; a case is used
        -- instead of a failable pattern so that no MonadFail m is required
        restore <- liftIO $ readIORef ioref
        case restore of
          Masked unmaskVariant -> unmaskVariant (m >>= chunk >>= return . unmask)
          -- no masked region entered yet, so there is nothing to undo
          Unmasked -> m >>= chunk >>= return . unmask

      -- merge adjacent actions in base monad
      chunk :: forall s. Stream f m s -> m (Stream f m s)
      chunk (Effect m) = m >>= chunk
      chunk s = return s
  loop $ k unmask

-- Orphan instance. Note that exceptions >= 0.9 also requires a
-- generalBracket method in MonadMask, which is not defined here.
instance (MonadMask m, MonadIO m, Functor f) => MonadMask (Stream f m) where
  mask = liftMask mask
  uninterruptibleMask = liftMask uninterruptibleMask
```
which may be beneficial for at least masking the acquisition of resources.
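Masking in liftMask works on whole runs of adjacent base-monad actions. The `chunk` helper collapses consecutive `Effect` layers so that a single `maskVariant` call covers all of them. Below is a small standalone sketch of that helper alone. The `twoEffects` stream is a hypothetical test input, not part of the proposal.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Streaming (Of (..))
import Streaming.Internal (Stream (..))

-- Same definition as the chunk helper in liftMask: run every Effect layer at
-- the head of the stream inside one base-monad action.
chunk :: Monad m => Stream f m s -> m (Stream f m s)
chunk (Effect m) = m >>= chunk
chunk s = return s

-- Hypothetical input: two adjacent Effect layers in front of a single Step.
twoEffects :: IORef Int -> Stream (Of Int) IO ()
twoEffects counter =
  Effect (bump >> return (Effect (bump >> return (Step (42 :> Return ())))))
  where
    bump = modifyIORef' counter (+ 1)

main :: IO ()
main = do
  counter <- newIORef 0
  next <- chunk (twoEffects counter)
  readIORef counter >>= print -- 2: both effects ran in one merged action
  case next of
    Step (x :> _) -> print x  -- 42
    _ -> putStrLn "unexpected: not a Step"
```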
Also, re-reading http://www.haskellforall.com/2013/01/pipes-safe-10-resource-management-and.html suggests that it's impossible to get prompt finalisation in the take example. I guess that means the best I can do is:
```haskell
do
  -- the ResourceT scope now covers only the streaming part
  runResourceT $
    S.print . S.take 1 $
      streamWithOptionsAndParser_ defaultFoldOptions
        (Pg.fromRow :: RowParser (Pg.Only Int))
        c
        "VALUES (1), (2), (3), (4), (5)"
  liftIO (threadDelay 10000000)
```
That is, run my Stream with runResourceT as soon as possible.
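Here is a minimal sketch of that workaround, assuming the resourcet and streaming packages. `query` is a hypothetical stand-in for streamWithOptionsAndParser_ that writes to a log instead of talking to PostgreSQL. `take` skips the in-stream release, but the finalizer registered by `allocate` still runs when the tightly scoped `runResourceT` exits, before the slow work starts.

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (ResourceT, allocate, release, runResourceT)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical cursor query: "begin" on acquisition, "commit" on release.
-- The explicit release is only reached if the stream is drained.
query :: IORef [String] -> Stream (Of Int) (ResourceT IO) ()
query logRef = do
  (key, _) <- lift (allocate (say "begin") (\_ -> say "commit"))
  S.each [1 .. 5]
  lift (release key)
  where
    say msg = modifyIORef' logRef (++ [msg])

main :: IO ()
main = do
  logRef <- newIORef []
  -- keep the ResourceT scope as tight as possible around the stream...
  runResourceT (S.print (S.take 1 (query logRef))) -- prints 1
  -- ...so the leftover finalizer has already run before the slow work
  modifyIORef' logRef (++ ["slow work"])
  readIORef logRef >>= print -- ["begin","commit","slow work"]
```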
Right: when I was writing this I was only thinking of very simple uses, partly to show how simple "dumb", prelude-style uses could be. ResourceT was meant as a way of avoiding opening a closure; given my purpose, I would probably have done better to just use Managed, though that makes the pleasing "writeFile x $ readFile y" impossible. I will have to study this, not having thought about it in a while. Is your suspicion that Pipes.Safe is a better path in any case? In a way I would prefer to use it for reasons of piety. I was also thinking of moving things like writeFile into a Pipes.Prelude.IO with mostly the same API (using Text instead of String for the simple line-by-line functions, etc.) and putting a MonadResource or MonadSafe instance in an orphan package. This is because people have complained more than a few times that, since Streaming.Prelude has a lot of really simple uses, the pile of dependencies that comes with ResourceT is too large. It is possible for the main business to have very few dependencies. Again, all of this will involve hitting the books; I haven't been thinking about ResourceT/SafeT.
My current feeling is that I can get what I want if we had the MonadMask instance on Stream, provided it actually makes sense. With that, we can have:
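```haskell
import Control.Monad.Catch (MonadMask, mask)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, register, unprotect)
import Streaming.Internal (Stream (..))

-- mask here is the one from the MonadMask (Stream f m) instance above.
bracket
  :: (Functor f, MonadIO m, MonadMask m, MonadResource m)
  => m a -> (a -> IO ()) -> (a -> Stream f m c) -> Stream f m c
bracket before after action = mask $ \restore -> do
  h <- lift before
  r <- restore (action h) `onException` after h
  liftIO (after h)
  return r

-- Register the cleanup with ResourceT. If the stream reaches Return, the
-- cleanup is unprotected (dropped without running). Otherwise it stays
-- registered and runs when the enclosing ResourceT scope exits.
onException :: (Functor f, MonadResource m) => Stream f m a -> IO () -> Stream f m a
m1 `onException` io = do
  key <- lift (register io)
  clean key m1
  where
    clean key = loop
      where
        loop str =
          case str of
            Return r -> Effect (unprotect key >> return (Return r))
            Effect m -> Effect (fmap loop m)
            Step f -> Step (fmap loop f)
```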
onException is basically your bracketStream: it runs the stream as normal. If the stream reaches its end, the cleanup is unregistered without being run. If the stream is terminated early or crashes, the cleanup stays registered and runs when the enclosing ResourceT block exits. With that and a MonadMask instance on Stream, we can implement bracket.
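The onException half of this can be exercised on its own, without the MonadMask instance. The following is a minimal sketch assuming the resourcet and streaming packages; `src` and the log messages are illustrative only. A drained stream unprotects the cleanup. A stream cut short by `take` leaves the cleanup registered, so it runs when `runResourceT` exits rather than promptly.

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, register, runResourceT, unprotect)
import Data.IORef (modifyIORef', newIORef, readIORef)
import Streaming.Internal (Stream (..))
import qualified Streaming.Prelude as S

-- onException as proposed in this thread, written with a local loop.
onException :: (Functor f, MonadResource m) => Stream f m a -> IO () -> Stream f m a
onException str io = do
  key <- lift (register io)
  let loop s = case s of
        Return r -> Effect (unprotect key >> return (Return r))
        Effect m -> Effect (fmap loop m)
        Step f -> Step (fmap loop f)
  loop str

main :: IO ()
main = do
  logRef <- newIORef []
  let say msg = modifyIORef' logRef (++ [msg])
      src = S.each [1 .. 3 :: Int] `onException` say "cleanup"
  runResourceT (S.effects src)            -- drained: cleanup dropped, never run
  say "--"
  runResourceT (S.effects (S.take 1 src)) -- cut short: cleanup runs at scope exit
  readIORef logRef >>= print              -- ["--","cleanup"]
```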
It would appear that you basically need to export bracket, bracket_, etc., just as pipes-safe does, because onException has different semantics here.
To summon one more voice - @Gabriel439, am I correct in thinking that pipes-safe only does prompt finalisation if upstream terminates? That is, in the case of take (or something else downstream) terminating early, your only hope is to surround the offending composition with runSafeP?
@ocharles: pipes-safe does not do prompt finalization at all. The only way the computation promptly finalizes is if it terminates successfully without upstream or downstream terminating prematurely.
You are correct that the only way to ensure prompt finalization if you expect premature termination is to surround the computation with runSafeP to bound when finalization occurs.
As a side note, I just want to point out that pipes-safe would never be biased towards upstream or downstream because the underlying Proxy machinery is symmetric. In this case, it's equally bad in both directions :)
Does a MonadMask instance make sense? Part of the trouble with these classes and things like Stream and FreeT is reasoning about what could be happening with the completely general functor argument. I wish I had enough time right now to think this through properly.
Sorry, somehow the order of events made me misunderstand your MonadMask remark, so I ended up reproducing its content. I think the order of events is now sound. The trouble would be how to constrain f.
Sorry this has been a bit confused; I am working in minute intervals. As an experimental half-measure I put up a version with a MonadMask instance restricted to what I can definitely understand, namely where f ~ Of a. I will experiment with whether it actually does anything desirable.
I added bracket and onException to Streaming.Internal as you defined them, except following the absurdly narrow constraint I put on the MonadMask instance.
Don't worry about the incoherent ramblings, it's still productive watching you think :) It's interesting to note that it's really f that becomes problematic. It does seem that we really want to say that f ~ (X, a)+, where X is some associated information at each functorial layer, and I write + to indicate "1 or more". That does seem to rule out MonadMask (Stream (Stream ..) ..), which we'd get with chunksOf, but that really is a stream of non-empty streams, which we can't capture right now. It also seems to rule out groupsOf, which effectively forks the Stream. Hrm.
If you want to leave this out, I actually don't mind. I don't need a MonadMask instance for Stream, it's enough to just have bracket and onException (which I now call onTermination). These can make use of maskStream, which is mask, but specialised to Stream (Of _) _ _. What do you think about that?
I wonder if (MonadMask f, MonadMask m) => MonadMask (Stream f m) is correct? After all, we can have MonadMask Of where mask a = a id... right? (CatchT does this to define a "pure" way to mask asynchronous exceptions).
When promptness and resource management are an issue, I like to have continuation-accepting or Managed-returning function variants, like
```haskell
streamWithOptionsAndParser
  :: (ToRow params, MonadMask m)
  => FoldOptions -> RowParser row -> Connection -> Query -> params
  -> (Stream (Of row) m () -> m r) -> m r
```
They are less flexible than MonadResource but in many cases they are enough.
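Below is a minimal sketch of that style that needs no PostgreSQL, assuming the exceptions package. `withRows` and its log messages are hypothetical. The resource's lifetime is tied to the continuation, not to the stream reaching its end. As a result, the release runs promptly even when `take` stops early.

```haskell
import Control.Monad.Catch (MonadMask, bracket)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical continuation-accepting producer: the "transaction" is opened
-- before the continuation runs and closed as soon as it returns, however much
-- of the stream the continuation consumed.
withRows :: (MonadIO m, MonadMask m)
         => IORef [String] -> (Stream (Of Int) m () -> m r) -> m r
withRows logRef k =
  bracket (liftIO (say "begin"))
          (\_ -> liftIO (say "commit"))
          (\_ -> k (S.each [1 .. 5]))
  where
    say msg = modifyIORef' logRef (++ [msg])

main :: IO ()
main = do
  logRef <- newIORef []
  withRows logRef (S.print . S.take 1) -- prints 1
  modifyIORef' logRef (++ ["slow work"])
  readIORef logRef >>= print           -- ["begin","commit","slow work"]
```

The trade-off is that the stream must not escape the continuation. If it were returned and consumed later, it would read from an already released resource.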
In reply to Daniel Díaz Carrete:
Indeed, maybe this is what I want.
What would be needed to write something like this, or to make it simple to get to it? (Sorry, I haven't had time to focus on this yet.)
I meant to say: a problem with instance (MonadMask f, MonadMask m) => MonadMask (Stream f m) is that it requires a Monad instance for Of to get things started, but one exists only for Of m where m is a Monoid.
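For reference, the Monoid-constrained instance in question is the writer-like Monad for Of in Data.Functor.Of, which the streaming package re-exports. A small sketch of how it behaves follows. `step1` and `step2` are illustrative names only.

```haskell
import Streaming (Of (..))

-- Of's Applicative/Monad instances need Monoid on the first component:
-- return needs mempty, and >>= combines the two first components with mappend.
step1, step2 :: Of [String] Int
step1 = ["opened"] :> 1
step2 = step1 >>= \n -> ["read row"] :> (n + 1)

main :: IO ()
main = case step2 of
  labels :> n -> print (labels, n) -- (["opened","read row"],2)
```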
@ocharles, it's pretty easy to represent a nonempty stream:
```haskell
data NES f m a =
    Step1 !(f (Stream f m a))
  | Effect1 (m (NES f m a))
```
Or, as a not-too-bad shortcut,
```haskell
newtype NES f m a = NES (m (f (Stream f m a)))
```
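To show how the data-type version relates to an ordinary Stream, here is a small sketch with two hypothetical helpers, `fromNES` and `toNES`. Forgetting non-emptiness is always possible. Recovering it requires running effects until the first Step or Return.

```haskell
import Streaming (Of)
import Streaming.Internal (Stream (..))
import qualified Streaming.Prelude as S

-- The data-type version of NES: the top layer can never be Return.
data NES f m a
  = Step1 !(f (Stream f m a))
  | Effect1 (m (NES f m a))

-- Hypothetical helper: forget non-emptiness.
fromNES :: Functor m => NES f m a -> Stream f m a
fromNES (Step1 fs) = Step fs
fromNES (Effect1 m) = Effect (fmap fromNES m)

-- Hypothetical helper: run effects until the first Step (non-empty) or Return (empty).
toNES :: Monad m => Stream f m a -> m (Either a (NES f m a))
toNES (Return a) = return (Left a)
toNES (Effect m) = m >>= toNES
toNES (Step fs) = return (Right (Step1 fs))

describe :: Either a b -> String
describe = either (const "empty") (const "non-empty")

main :: IO ()
main = do
  e <- toNES (S.each ([] :: [Int]) :: Stream (Of Int) IO ())
  putStrLn (describe e) -- empty
  ne <- toNES (S.each [1, 2, 3 :: Int] :: Stream (Of Int) IO ())
  case ne of
    Left _ -> putStrLn "empty"
    Right nes -> S.toList_ (fromNES nes) >>= print -- [1,2,3]
```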