Promptness

Open ocharles opened this issue 8 years ago • 17 comments

Following on from the general discussion in #18, I'd like to hear if you have any thoughts on adding prompt finalisation into the library. I'm currently working on https://github.com/ocharles/streaming-postgresql-simple, and using bracketStream when declaring a transaction for a cursor in order to correctly stream results. Transactions are very expensive, and you definitely do not want them open for longer than they need to be. Sadly, we don't have prompt finalisation, which means the following keeps the transaction open for as long as the whole computation:

runResourceT $ do
  S.print . S.take 1 $
    streamWithOptionsAndParser_
      defaultFoldOptions
      (Pg.fromRow :: RowParser (Pg.Only Int))
      c
      "VALUES (1), (2), (3), (4), (5)"
  liftIO (threadDelay 10000000)

As observed in #18, it's take that is problematic here: it discards the rest of the stream, so the final "prompt" release added by bracketStream is never run.
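
To make the failure mode concrete, here is a minimal sketch using only base. It assumes a cut-down local Stream type rather than the real one from streaming, and the names withRelease, takeS, toListS and demo are hypothetical. The release action is the last effect of the stream, roughly the shape bracketStream produces, and a take-like function drops it along with the unread elements.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)

-- A cut-down stream of elements, standing in for Stream (Of a) m r.
data Stream a m r
  = Step a (Stream a m r)
  | Effect (m (Stream a m r))
  | Return r

-- Emit the elements, then run the release action as the final effect
-- (an assumption about the shape of stream that bracketStream builds).
withRelease :: Monad m => m () -> [a] -> Stream a m ()
withRelease release = go
  where
    go []       = Effect (release >> return (Return ()))
    go (x : xs) = Step x (go xs)

-- Once n elements have been passed on, the remainder of the stream,
-- including its trailing release effect, is simply discarded.
takeS :: Monad m => Int -> Stream a m r -> Stream a m ()
takeS n _ | n <= 0 = Return ()
takeS n s = case s of
  Step a rest -> Step a (takeS (n - 1) rest)
  Effect m    -> Effect (fmap (takeS n) m)
  Return _    -> Return ()

-- Run a stream, collecting its elements.
toListS :: Monad m => Stream a m r -> m [a]
toListS (Step a rest) = fmap (a :) (toListS rest)
toListS (Effect m)    = m >>= toListS
toListS (Return _)    = return []

-- Returns the elements seen and whether the release action ran.
demo :: Int -> IO ([Int], Bool)
demo n = do
  released <- newIORef False
  xs <- toListS (takeS n (withRelease (writeIORef released True) [1 .. 5]))
  ran <- readIORef released
  return (xs, ran)

main :: IO ()
main = do
  demo 1 >>= print  -- ([1],False): release was dropped by takeS
  demo 6 >>= print  -- ([1,2,3,4,5],True): the stream ran to its end
```

In this sketch the release only runs when the consumer walks the stream all the way to its Return, which is why some outer scope (runResourceT in the example above) ends up doing the cleanup.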

Any thoughts?

ocharles avatar Jan 26 '17 11:01 ocharles

The pipes-safe approach to lifting mask seems to work:

data Restore m = Unmasked | Masked (forall x . m x -> m x)

liftMask
    :: forall m f r . (MonadIO m, MonadCatch m, Functor f)
    => (forall s . ((forall x . m x -> m x) -> m s) -> m s)
    -> ((forall x . Stream f m x -> Stream f m x)
        -> Stream f m r)
    -> Stream f m r
liftMask maskVariant k = do
    ioref <- liftIO $ newIORef Unmasked

    let -- mask adjacent actions in base monad
        loop :: Stream f m r -> Stream f m r
        loop (Step f)   = Step (fmap loop f)
        loop (Return r) = Return r
        loop (Effect m) = Effect $ maskVariant $ \unmaskVariant -> do
            -- stash base's unmask and merge action
            liftIO $ writeIORef ioref $ Masked unmaskVariant
            m >>= chunk >>= return . loop

        -- unmask adjacent actions in base monad
        unmask :: forall q. Stream f m q -> Stream f m q
        unmask (Step f)   = Step (fmap unmask f)
        unmask (Return q) = Return q
        unmask (Effect m) = Effect $ do
            -- retrieve base's unmask and apply to merged action
            Masked unmaskVariant <- liftIO $ readIORef ioref
            unmaskVariant (m >>= chunk >>= return . unmask)

        -- merge adjacent actions in base monad
        chunk :: forall s. Stream f m s -> m (Stream f m s)
        chunk (Effect m) = m >>= chunk
        chunk s          = return s

    loop $ k unmask

instance (MonadMask m, MonadIO m, Functor f) => MonadMask (Stream f m) where
    mask                = liftMask mask
    uninterruptibleMask = liftMask uninterruptibleMask

which may be beneficial for at least masking the acquisition of resources.

ocharles avatar Jan 26 '17 12:01 ocharles

Also, re-reading http://www.haskellforall.com/2013/01/pipes-safe-10-resource-management-and.html seems to suggest that it's impossible to have prompt finalisation in the take example. I guess that means the best I can do is:

runResourceT $ do
  S.print . S.take 1 $
    streamWithOptionsAndParser_
      defaultFoldOptions
      (Pg.fromRow :: RowParser (Pg.Only Int))
      c
      "VALUES (1), (2), (3), (4), (5)"
liftIO (threadDelay 10000000)

That is, run my Stream with runResourceT as soon as possible.

ocharles avatar Jan 26 '17 12:01 ocharles

Right, when I was writing this I was just thinking of very simple uses, partly to show how simple, even dumb, preludish uses could be. ResourceT was understood as a means of avoiding opening a closure; I would probably have done better, given my purpose, just using Managed, though this makes the pleasing "writeFile x $ readFile y" impossible. I will have to study this, not having thought about it in a while. Is your suspicion that Pipes.Safe is a better path in any case? I would in a way prefer to use it for reasons of piety. I was also thinking of getting the things like writeFile into a Pipes.Prelude.IO with mostly the same api (using Text instead of String for the simple line-by-line stuff etc.) and putting a MonadResource or MonadSafe instance in an orphan package. This is because more than a few times people have complained that, since Streaming.Prelude has a lot of really simple uses, the pile of dependencies that comes with ResourceT is too large. It is possible for the main business to have very few dependencies. Again, all of this will involve hitting the books; I haven't been thinking about ResourceT/SafeT.

michaelt avatar Jan 26 '17 12:01 michaelt

My current feeling is that I can get what I want if we had the MonadMask instance on Stream, provided it actually makes sense. With that, we can have:

bracket
  :: (Functor f, MonadIO m, MonadMask m, MonadResource m)
  => m a -> (a -> IO ()) -> (a -> Stream f m c) -> Stream f m c
bracket before after action = mask $ \restore -> do
    h <- lift before
    r <- restore (action h) `onException` after h
    liftIO (after h)
    return r

onException :: (Functor f, MonadResource m) => Stream f m a -> IO () -> Stream f m a
m1 `onException` io = do
  key <- lift (register io)
  clean key m1
  where
    clean key = loop
      where
        loop str =
          case str of
            Return r -> Effect (unprotect key >> return (Return r))
            Effect m -> Effect (fmap loop m)
            Step f -> Step (fmap loop f)

onException is basically your bracketStream - run the stream as normal. If we get to the end we don't run the cleanup, but if we are terminated early or crash, run a cleanup. With that and a MonadMask instance on Stream, we can implement bracket.

It would appear that you basically need to export bracket, bracket_, etc, just as pipes-safe does, because you have different semantics in onException.

ocharles avatar Jan 26 '17 12:01 ocharles

To summon one more voice - @Gabriel439, am I correct in thinking that pipes-safe only does prompt finalisation if upstream terminates? That is, in the case of take (or something else downstream) terminating early, your only hope is to surround the offending composition with runSafeP?

ocharles avatar Jan 26 '17 13:01 ocharles

@ocharles: pipes-safe does not do prompt finalization at all. The only way the computation promptly finalizes is if it terminates successfully without upstream or downstream terminating prematurely.

You are correct that the only way to ensure prompt finalization if you expect premature termination is to surround the computation with runSafeP to bound when finalization occurs.

As a side note, I just want to point out that pipes-safe would never be biased towards upstream or downstream because the underlying Proxy machinery is symmetric. In this case, it's equally bad in both directions :)

Gabriella439 avatar Jan 26 '17 17:01 Gabriella439

Does a MonadMask instance make sense? Part of the trouble with these classes and something like Stream and FreeT is reasoning about what could be happening with the completely general functor argument. I wish I could get enough time right now to think this through properly.

michaelt avatar Jan 30 '17 00:01 michaelt

Sorry, somehow the order of events had me misunderstand your MonadMask remark so I ended up reproducing its content. I think the order of events is now sound. The trouble would be how to constrain f.

michaelt avatar Feb 01 '17 14:02 michaelt

Sorry this has been a bit confused as I am working in minute intervals. As an experimental half-measure I put up a version with a MonadMask instance restricted to what I can definitely understand, namely where f ~ (Of a). I will experiment with whether it actually does anything desirable.

michaelt avatar Feb 01 '17 14:02 michaelt

I added bracket and onException to Streaming.Internal as you defined them, except following the absurdly narrow constraint I put on the MonadMask instance.

michaelt avatar Feb 01 '17 15:02 michaelt

Don't worry about the incoherent ramblings, it's still productive watching you think :) It's interesting to note that it's really f that becomes problematic. It does seem that we really want to say that f ~ (X, a)+, where X is some associated information at each functorial layer, and I write + to indicate "1 or more". That does seem to rule out MonadMask (Stream (Stream ..) ..), which we'd get with chunksOf, but that really is a stream of non-empty streams, which we can't capture right now. It also seems to rule out groupsOf, which effectively forks the Stream. Hrm.

If you want to leave this out, I actually don't mind. I don't need a MonadMask instance for Stream, it's enough to just have bracket and onException (which I now call onTermination). These can make use of maskStream, which is mask, but specialised to Stream (Of _) _ _. What do you think about that?

ocharles avatar Feb 02 '17 20:02 ocharles

I wonder if (MonadMask f, MonadMask m) => MonadMask (Stream f m) is correct? After all, we can have MonadMask Of where mask a = a id... right? (CatchT does this to define a "pure" way to mask asynchronous exceptions).

ocharles avatar Feb 02 '17 20:02 ocharles

When promptness and resource management are an issue, I like to have continuation-accepting or Managed-returning function variants, like

streamWithOptionsAndParser :: (ToRow params, MonadMask m) 
                           => FoldOptions -> RowParser row -> Connection -> Query -> params 
                           -> (Stream (Of row) m () -> m r) -> m r

They are less flexible than MonadResource but in many cases they are enough.
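
A minimal sketch of the continuation-accepting shape, using only base. It assumes a cut-down local Stream type and a log in an IORef standing in for the transaction; withRows, headS and demo are hypothetical names, not part of streaming or streaming-postgresql-simple. The resource is released as soon as the continuation returns, however much of the stream was consumed.

```haskell
import Control.Exception (bracket_)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- A cut-down stream of elements, standing in for Stream (Of a) m r.
data Stream a m r
  = Step a (Stream a m r)
  | Effect (m (Stream a m r))
  | Return r

-- Hypothetical stand-in for "open a transaction, stream rows, commit".
-- The stream is only valid inside the continuation k.
withRows :: IORef [String] -> [Int] -> (Stream Int IO () -> IO r) -> IO r
withRows logRef rows k =
  bracket_ (record "begin") (record "commit") (k (each rows))
  where
    record msg = modifyIORef logRef (msg :)
    each = foldr Step (Return ())

-- A consumer that stops early: it only looks at the first element.
headS :: Monad m => Stream a m r -> m (Maybe a)
headS (Step a _) = return (Just a)
headS (Effect m) = m >>= headS
headS (Return _) = return Nothing

-- Returns the first row and the order in which events were logged.
demo :: IO (Maybe Int, [String])
demo = do
  logRef <- newIORef []
  x <- withRows logRef [1 .. 5] headS
  modifyIORef logRef ("later work" :)
  events <- readIORef logRef
  return (x, reverse events)

main :: IO ()
main = demo >>= print  -- (Just 1,["begin","commit","later work"])
```

The cost of this design is that the stream must not escape the continuation: returning it from k would hand back a stream whose resource has already been released.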

danidiaz avatar Mar 09 '17 07:03 danidiaz

Indeed, maybe this is what I want.

ocharles avatar Mar 14 '17 16:03 ocharles

What would be needed to write something like this, or to make it simple to get to it? (Sorry, I haven't had time yet to focus on this.)

michaelt avatar Mar 14 '17 18:03 michaelt

I meant to say, a problem with instance MonadMask f, MonadMask m is that it requires a Monad instance on Of to get things started, but this exists only for the Of m where m is a monoid.
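
To illustrate why the Monoid constraint shows up, here is a sketch that redefines Of locally so it is self-contained (streaming ships its own Of and instances; this copy is only for illustration). The point is that pure has to invent a first component out of nothing, and only mempty is available for that.

```haskell
-- Local copy of the left-strict pair, redefined so the sketch stands alone.
data Of a b = !a :> b
  deriving (Eq, Show)

infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- pure needs a first component from nowhere, so it has to be mempty.
instance Monoid a => Applicative (Of a) where
  pure b = mempty :> b
  (a :> f) <*> (a' :> b) = (a <> a') :> f b

-- Bind behaves like a writer: first components are combined with (<>).
instance Monoid a => Monad (Of a) where
  (a :> b) >>= k = case k b of
    a' :> c -> (a <> a') :> c

-- Hypothetical usage: the list components are appended in order.
demo :: Of [Int] Int
demo = do
  x <- [1] :> 10
  y <- [2] :> 20
  return (x + y)

main :: IO ()
main = print (demo == ([1, 2] :> 30))
```

So a MonadMask f superclass chain would only be satisfiable for Of a when a is a monoid, which excludes most element types one would actually stream.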

michaelt avatar Mar 14 '17 18:03 michaelt

@ocharles, it's pretty easy to represent a nonempty stream:

data NES f m a =
    Step1 !(f (Stream f m a))
  | Effect1 (m (NES f m a))

Or, as a not-too-bad shortcut,

newtype NES f m a = NES (m (f (Stream f m a)))
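
As a sketch of how the first representation could be used, the conversions below go between NES and a local copy of Stream (redefined here so the block is self-contained). toStream, fromStream, example and isNonEmpty are hypothetical names, not functions from streaming.

```haskell
import Data.Functor.Identity (Identity (..))

-- Local copy of the stream type, for illustration only.
data Stream f m r
  = Step !(f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- The non-empty stream from above: there is no Return constructor,
-- so at least one functorial layer must eventually appear.
data NES f m a
  = Step1 !(f (Stream f m a))
  | Effect1 (m (NES f m a))

-- Forget the non-emptiness guarantee.
toStream :: Functor m => NES f m a -> Stream f m a
toStream (Step1 f)   = Step f
toStream (Effect1 m) = Effect (fmap toStream m)

-- Run leading effects until we learn whether a first layer exists.
fromStream :: Monad m => Stream f m a -> m (Either a (NES f m a))
fromStream (Return a) = return (Left a)
fromStream (Step f)   = return (Right (Step1 f))
fromStream (Effect m) = m >>= fromStream

-- Hypothetical example: two elements behind one (pure) effect.
example :: Stream ((,) Int) Identity ()
example = Effect (Identity (Step (1, Step (2, Return ()))))

isNonEmpty :: Stream ((,) Int) Identity () -> Bool
isNonEmpty s = case runIdentity (fromStream s) of
  Left _  -> False
  Right _ -> True

main :: IO ()
main = print (isNonEmpty example, isNonEmpty (Return ()))
```

Note that fromStream has to run effects in m to decide, so non-emptiness can only be established inside the base monad, not purely.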

treeowl avatar Sep 07 '17 17:09 treeowl