Strange caveat in docs for foldr (and its impact on real-life usage)?

Open saurabhnanda opened this issue 9 months ago • 20 comments

From https://www.stackage.org/haddock/lts-18.28/streamly-0.7.3/Streamly-Prelude.html#v:foldr

Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

I finally managed to use Streamly to fold over a DB query executed via Opaleye, as such:

import qualified Opaleye as O
import Streamly
import qualified Streamly.Prelude as S

O.runQueryFold conn qry S.nil $ \stream x -> pure $ S.cons x stream

Would this actually stream rows from the DB, or will it end up accumulating everything in memory?

saurabhnanda avatar Feb 16 '25 14:02 saurabhnanda

Questions:

  1. you are using a very old version of streamly - any specific reason for that?
  2. can you provide a pointer to the source of runQueryFold?

harendra-kumar avatar Feb 16 '25 14:02 harendra-kumar

@harendra-kumar

  1. That version of streamly is what comes with Stack LTS 18.28 -- we haven't upgraded our entire toolchain because this seems to be working fine atm
  2. https://hackage.haskell.org/package/opaleye-0.6.1.0/docs/src/Opaleye-RunQuery.html#runQueryFold => https://hackage.haskell.org/package/opaleye-0.6.1.0/docs/src/Opaleye-RunQuery.html#runQueryFoldExplicit => which seems to be using PGS.foldWith_ internally

saurabhnanda avatar Feb 16 '25 14:02 saurabhnanda

This will build the entire stream first and then return it. It is not streaming. You should try using something like unfoldrM to generate a stream.

harendra-kumar avatar Feb 16 '25 14:02 harendra-kumar

@harendra-kumar how did you determine this? And why isn't the type-system preventing me from making this mistake? Is the underlying issue with how Opaleye.runQueryFold is written, or how I'm using Streamly.foldr?

saurabhnanda avatar Feb 16 '25 14:02 saurabhnanda

IO is a strict monad, so your fold is just like a statement in a C program: it will be evaluated completely before you move on to the next statement in IO. Consider this signature:

runQueryFoldExplicit
  :: QueryRunner columns haskells
  -> PGS.Connection
  -> Query columns
  -> b
  -> (b -> haskells -> IO b)
  -> IO b

A call to runQueryFoldExplicit is essentially a loop running the step b -> haskells -> IO b and finally returning IO b. This loop will run just like any for/while loop in C and return you a b value. In your case the step function conses an element onto the stream, so you keep consing until the fold is done and then return the constructed stream as the value b. This output stream can then be consumed in a streaming fashion, but there is no point because you have already consumed the entire input.
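
A minimal sketch of the loop described above, using only base. foldRows is a hypothetical stand-in for runQueryFoldExplicit (the rows come from a list, not from Opaleye), and a plain list plays the role of the stream being consed onto. It only illustrates that the step function has run for every row by the time the result is available, even when the caller then looks at just two elements.

```haskell
import Control.Monad (foldM)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for runQueryFoldExplicit: the rows come from a list
-- instead of a database, but the loop has the same shape.
foldRows :: [row] -> b -> (b -> row -> IO b) -> IO b
foldRows rows z step = foldM step z rows

-- Cons every row onto an accumulator, counting how often the step runs.
collectAll :: IORef Int -> [Int] -> IO [Int]
collectAll counter rows =
  foldRows rows [] $ \acc x -> do
    modifyIORef' counter (+ 1)
    pure (x : acc)

-- Number of rows visited when the caller only wants two of them.
visitedWhenTakingTwo :: IO Int
visitedWhenTakingTwo = do
  counter <- newIORef 0
  acc <- collectAll counter [1 .. 1000]
  let firstTwo = take 2 acc
  length firstTwo `seq` readIORef counter
```

Running visitedWhenTakingTwo gives 1000: the whole input is consumed before anything downstream sees the first element.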

When you are streaming you usually run one stream generation step and then the control is transferred to the next stage in the stream processing pipeline. Then the stream will loop back to the next generation step after that iteration is done. unfoldrM (https://hackage.haskell.org/package/streamly-0.7.3/docs/Streamly-Prelude.html#v:unfoldrM) will do this.
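
To make the contrast concrete, here is a toy pull-based stream written with base only. It is not streamly's implementation; Pull, unfoldPull, takePull and countingStep are hypothetical names. The step function has the same shape as the one passed to unfoldrM in this thread: a seed goes in, and either Nothing or an element plus the next seed comes out.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Toy pull-based stream: running it yields one element plus the rest.
newtype Pull a = Pull (IO (Maybe (a, Pull a)))

-- Build a stream from a seed and a monadic step function.
unfoldPull :: (s -> IO (Maybe (a, s))) -> s -> Pull a
unfoldPull step = go
  where
    go s = Pull $ do
      r <- step s
      pure $ case r of
        Nothing      -> Nothing
        Just (a, s') -> Just (a, go s')

-- Consume at most n elements; generation steps run only on demand.
takePull :: Int -> Pull a -> IO [a]
takePull n (Pull next)
  | n <= 0 = pure []
  | otherwise = do
      r <- next
      case r of
        Nothing        -> pure []
        Just (a, rest) -> (a :) <$> takePull (n - 1) rest

-- Hypothetical row source that counts how often it is asked for a row.
countingStep :: IORef Int -> Int -> IO (Maybe (Int, Int))
countingStep counter i = do
  modifyIORef' counter (+ 1)
  pure $ if i > 1000 then Nothing else Just (i, i + 1)

-- The elements taken, and how many generation steps actually ran.
stepsWhenTakingTwo :: IO ([Int], Int)
stepsWhenTakingTwo = do
  counter <- newIORef 0
  xs <- takePull 2 (unfoldPull (countingStep counter) 1)
  n <- readIORef counter
  pure (xs, n)
```

Here stepsWhenTakingTwo returns ([1,2], 2): the generator runs twice, not 1000 times, because control returns to the consumer after each element.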

harendra-kumar avatar Feb 16 '25 15:02 harendra-kumar

The function runQueryFoldExplicit wants you to do all the processing in the b -> haskells -> IO b step and only return a final value b after the entire processing is done. If you want to use this function and compose your processing in a streaming manner then you have to essentially compose your streams in this step function itself. And you can do that very well using the Fold type in streamly. Fold is basically like Stream, but it is a consumer-side stream rather than a producer-side stream.

What you need to do is convert the runQueryFoldExplicit function into a streaming Fold using:

mkFoldId :: Monad m => (b -> a -> m b) -> m b -> Fold m a b

Then you can use routines from the Fold module to compose and process the stream. Fold and Stream provide equivalent functionality; they just compose in opposite directions. If some Fold functionality is not present in 0.7.3, you can check out newer versions and port it back to 0.7.3, or upgrade.
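
A rough illustration of composing on the consumer side, using base only. The Fold type below is a toy defined for this sketch, not streamly's Fold module, and foldRows is again a hypothetical stand-in for runQueryFoldExplicit. The point is that filtering and mapping are attached to the consumer, so the callback-style loop can drive the whole pipeline without building an intermediate stream.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Control.Monad (foldM)

-- Toy consumer-side fold: a step, an initial state, and an extractor.
data Fold m a b = forall s. Fold (s -> a -> m s) (m s) (s -> m b)

-- Hypothetical stand-in for runQueryFoldExplicit.
foldRows :: [row] -> b -> (b -> row -> IO b) -> IO b
foldRows rows z step = foldM step z rows

-- Drive a Fold with the callback-style loop.
runFoldOverRows :: [row] -> Fold IO row b -> IO b
runFoldOverRows rows (Fold step initial extract) = do
  s0 <- initial
  s <- foldRows rows s0 step
  extract s

-- A strict running sum.
sumF :: Fold IO Int Int
sumF = Fold (\acc x -> pure $! acc + x) (pure 0) pure

-- Transform the input before it reaches the fold.
lmapF :: (a -> b) -> Fold m b r -> Fold m a r
lmapF f (Fold step initial extract) =
  Fold (\s a -> step s (f a)) initial extract

-- Drop inputs that fail the predicate.
filterF :: Applicative m => (a -> Bool) -> Fold m a r -> Fold m a r
filterF p (Fold step initial extract) =
  Fold (\s a -> if p a then step s a else pure s) initial extract

-- Keep even rows, double them, and sum the result.
sumOfDoubledEvens :: [Int] -> IO Int
sumOfDoubledEvens rows = runFoldOverRows rows (filterF even (lmapF (* 2) sumF))
```

For example, sumOfDoubledEvens [1 .. 10] evaluates to 60, and the accumulator stays a single Int for the whole run.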

harendra-kumar avatar Feb 16 '25 15:02 harendra-kumar

@harendra-kumar thanks for the explanation. Yes - it makes sense now. Does something like this look like a better implementation?

  1. Although I'm not sure if I'm mixing Streamly.unfoldrM and Opaleye.foldForward correctly
  2. And how do I ensure that Opaleye.closeCursor is called even if the stream is not fully consumed?

runQueryStream qry = do
  conn <- getDbConnection
  cursor <- liftIO $ Opaleye.declareCursor conn qry 
  pure $ (flip S.unfoldrM) (True, cursor) $ \(hasMore, cur) -> do
    case hasMore of
      False -> Opaleye.closeCursor cur >> pure Nothing
      True -> do
        Opaleye.foldForward cur 1 (\_ haskells -> pure $ Just haskells) Nothing >>= \case
          Left Nothing -> Opaleye.closeCursor cur >> pure Nothing
          Right Nothing -> Opaleye.closeCursor cur >> pure Nothing
          Left (Just haskells) -> pure $ Just (haskells, (False, cur))
          Right (Just haskells) -> pure $ Just (haskells, (True, cur))

saurabhnanda avatar Feb 16 '25 15:02 saurabhnanda

Looks good.

  1. is Right Nothing possible?
  2. don't you need to call closeCursor in Left (Just haskells) case as well?

You can use Streamly.Prelude.bracket or Streamly.Prelude.finally to close the cursor.

Also, make sure to INLINE the functions part of the stream pipeline stages so that the stages can fuse.

harendra-kumar avatar Feb 16 '25 17:02 harendra-kumar

@harendra-kumar does this seem better?

runQueryStream qry = do
  conn <- getDbConnection
  cursor <- liftIO $ Opaleye.declareCursor conn qry 
  pure $ S.finallyIO (Opaleye.closeCursor cursor) $ (flip S.unfoldrM) (True, cursor) $ \(hasMore, cur) -> do
    case hasMore of
      False -> pure Nothing
      True -> do
        Opaleye.foldForward cur 1 (\_ haskells -> pure $ Just haskells) Nothing >>= \case
          Left Nothing -> pure Nothing
          Right Nothing -> pure Nothing
          Left (Just haskells) -> pure $ Just (haskells, (False, cur))
          Right (Just haskells) -> pure $ Just (haskells, (True, cur))

saurabhnanda avatar Feb 17 '25 12:02 saurabhnanda

@harendra-kumar should I add {-# INLINE runQueryStream #-} to ensure that stages can fuse?

saurabhnanda avatar Feb 17 '25 12:02 saurabhnanda

  1. You can use bracketIO instead, and open the cursor inside the bracket itself. In fact this code is very similar to file opening and reading code. It has exactly the same structure. Follow this function here. There you open a file handle, read the file and close it. Here you open a db cursor, read the db and close it.

  2. It will be better if runQueryStream has type ... -> Stream m a instead of ... -> m (Stream m a). You can push the monadic code inside bracketIO and you will get the former type.

As a rule of thumb, any function that returns a stream or takes a stream as an argument should be inlined. After change (2) your function's return type would be Stream m a, so it should be inlined. Note that if it were m (Stream m a) you would not need to inline it, as it won't fuse anyway: in that case it is an action returning a stream and not a stream itself.
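
For illustration, this is roughly what the rule looks like on a small pipeline stage. The sketch assumes the streamly 0.7.x module layout used in this thread (Streamly for SerialT, Streamly.Prelude for the combinators); keepEvensDoubled is a hypothetical name. Whether the stages actually fuse can only be confirmed by inspecting the generated core.

```haskell
import Streamly (SerialT)
import qualified Streamly.Prelude as S

-- A pipeline stage: it takes a stream and returns a stream, so by the rule
-- of thumb above it is marked INLINE to give GHC the chance to fuse it
-- with its neighbours at the call site.
{-# INLINE keepEvensDoubled #-}
keepEvensDoubled :: Monad m => SerialT m Int -> SerialT m Int
keepEvensDoubled = S.map (* 2) . S.filter even
```

A caller would compose it directly with other stream functions, for example S.toList (keepEvensDoubled (S.fromList [1 .. 6])), rather than returning the stream from a monadic action first.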

harendra-kumar avatar Feb 17 '25 13:02 harendra-kumar

@harendra-kumar even I feel weird about m (SerialT IO a), but all of this ultimately needs to align with Servant's streaming. All examples that I have seen online seem to assume SourceIO a and not SourceT m a. So, even though I'd like to stick to my app's custom monad m, I am unable to ultimately get it to work with Servant.

If my call sites look like the following, will it still end up generating efficient code?

do
  stream <- runQueryStream whatever
  pure $ S.map whateverElse $ S.filter whateverElse stream

saurabhnanda avatar Feb 17 '25 14:02 saurabhnanda

I am not sure if this will fuse, because of the monad bind - maybe latest GHCs can do it. To be sure we will have to make a small example and check the core. I have not tried this before.

> All examples that I have seen online seem to assume SourceIO a and not SourceT m a

But anyway, we have type SourceIO = SourceT IO. I do not understand the real reason why runQueryStream could not be of type Stream IO a. In any case, you can write this function with type Stream IO a, and if something demands IO (Stream IO a) then you can use return . runQueryStream.

harendra-kumar avatar Feb 17 '25 14:02 harendra-kumar

@harendra-kumar here's the complete function definition as of now. The only reason this needs to be an m (SerialT IO haskells) is the call to getDbConnection. I can pass in the Connection as an explicit argument from the call site and probably reduce this function's signature to SerialT IO haskells (if I also use S.bracketIO). I'm just scratching my head as to why this would make it more efficient. Shouldn't the Haskell compiler be figuring out all of this for me?

{-# INLINE runQueryStream #-}
runQueryStream :: 
  ( Default Opaleye.QueryRunner columns haskells
  , HasDatabase m
  , Default Opaleye.Unpackspec columns columns
  )
  => Opaleye.Query columns 
  -> m (SerialT IO haskells)
runQueryStream qry = do
  conn <- getDbConnection
  cursor <- liftIO $ Opaleye.declareCursor conn qry
  pure $ S.finallyIO (Opaleye.closeCursor cursor) $ (flip S.unfoldrM) (True, cursor) $ \(hasMore, cur) -> do
    case hasMore of
      False -> pure Nothing
      True -> do
        Opaleye.foldForward cur 1 (\_ haskells -> pure $ Just haskells) Nothing >>= \case
          Left Nothing -> pure Nothing
          Right Nothing -> pure Nothing
          Left (Just haskells) -> pure $ Just (haskells, (False, cur))
          Right (Just haskells) -> pure $ Just (haskells, (True, cur))

saurabhnanda avatar Feb 18 '25 02:02 saurabhnanda

@harendra-kumar here's the updated version of the runQueryStream function...

{-# INLINE runQueryStream #-}
runQueryStream :: ( Default Opaleye.QueryRunner columns haskells
                  , Default Opaleye.Unpackspec columns columns
                  )
               => Connection
               -> Opaleye.Query columns 
               -> SerialT IO haskells
runQueryStream conn qry = do
  S.bracketIO (Opaleye.declareCursor conn qry) Opaleye.closeCursor $ \cursor -> do
    (flip S.unfoldrM) (True, cursor) $ \(hasMore, cur) -> do
      case hasMore of
        False -> pure Nothing
        True -> do
          Opaleye.foldForward cur 1 (\_ haskells -> pure $ Just haskells) Nothing >>= \case
            Left Nothing -> pure Nothing
            Right Nothing -> pure Nothing
            Left (Just haskells) -> pure $ Just (haskells, (False, cur))
            Right (Just haskells) -> pure $ Just (haskells, (True, cur))

...and here's an actual call-site...

locationQueryStream :: (HasDatabase m) => ClientId -> m (SerialT IO Text)
locationQueryStream cid = do
  conn <- getDbConnection
  pure $ S.mapMaybe (fmap extractLocationQueries) $ Foundation.runQueryStream conn qry      
  where
    qry = filterAndSelectQ tableForTrip (\t -> t ^. clientId .== constant cid) (\t -> t ^. locationDataFromGmaps)

    extractLocationQueries :: Aeson.Value -> Text
    extractLocationQueries v = undefined

saurabhnanda avatar Feb 18 '25 02:02 saurabhnanda

You have shifted the problem to locationQueryStream; now it is returning m (SerialT IO Text). There is a simple solution to this. You do not need to call getDbConnection at the call site. You can call it in the alloc part of bracketIO. That is what I was suggesting earlier as well.

runQueryStream qry =
  let alloc = do
        conn <- getDbConnection
        Opaleye.declareCursor conn qry
  in S.bracketIO alloc Opaleye.closeCursor $ \cursor -> do
       -- the same unfoldrM loop over the cursor as in the previous version

The simple rule is that you do not break a stream pipeline to perform an action and return the stream from the monad. If you do that then you are deliberately breaking a fused pipeline. You need to compose all the stream functions without cutting the pipeline by a monad. Stream has an underlying monad and you should perform any action that you need in that monad and not outside of the stream. Stream provides all the functionality you need to perform those actions in the underlying monad. In this case, bracketIO enables you to perform any monadic actions before the pipeline starts.
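
The shape of that idea can be modelled with base only. Everything below (Pull, bracketPull, afterPull, rowsFrom) is a hypothetical toy, not streamly's bracket: it does not handle exceptions or a consumer that stops early. It only shows how moving the allocation into the first step of the stream turns an IO (stream) into a plain stream.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Toy pull-based stream.
newtype Pull a = Pull { pull :: IO (Maybe (a, Pull a)) }

fromListPull :: [a] -> Pull a
fromListPull []       = Pull (pure Nothing)
fromListPull (x : xs) = Pull (pure (Just (x, fromListPull xs)))

toListPull :: Pull a -> IO [a]
toListPull (Pull next) = do
  r <- next
  case r of
    Nothing        -> pure []
    Just (a, rest) -> (a :) <$> toListPull rest

-- Run a finalizer once the stream ends normally.
afterPull :: Pull a -> IO () -> Pull a
afterPull (Pull next) fin = Pull $ do
  r <- next
  case r of
    Nothing        -> fin >> pure Nothing
    Just (a, rest) -> pure (Just (a, afterPull rest fin))

-- Allocation happens in the first step of the stream, so the result is a
-- stream rather than an IO action that returns a stream.
bracketPull :: IO r -> (r -> IO ()) -> (r -> Pull a) -> Pull a
bracketPull alloc release use = Pull $ do
  r <- alloc
  pull (use r `afterPull` release r)

-- Hypothetical "query": logs open/close instead of touching a database.
rowsFrom :: IORef [String] -> Pull Int
rowsFrom logRef =
  bracketPull
    (modifyIORef' logRef ("open" :) >> pure [1, 2, 3])
    (\_ -> modifyIORef' logRef ("close" :))
    fromListPull

-- Log before consumption, the rows, and the log after consumption.
demo :: IO ([String], [Int], [String])
demo = do
  logRef <- newIORef []
  let stream = rowsFrom logRef
  before <- readIORef logRef
  xs <- toListPull stream
  after <- readIORef logRef
  pure (before, xs, reverse after)
```

demo returns ([], [1,2,3], ["open","close"]): nothing is opened when the stream value is built, only when it is consumed.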

Note that breaking fusion may have almost zero impact in many cases where the stream is carrying bigger chunks, which means the loop runs less often. For example, a stream of 1000 items of 1 KB each would not be affected much by breaking the pipeline, but a stream of 1 million 1-byte items would become very inefficient, even though the total amount of data flowing is the same. The overhead of copying intermediate structures when stream fusion is broken is proportional to the number of times the stream loop runs, so fusion matters more in loops with a larger number of iterations. This is more evident in imperative loops, where the programmer sees the per-iteration work and the iteration count right in front of them; a stream is an abstraction that hides the loop, so programmers do not see it as easily.

No, it is not easy for the compiler to do this.

harendra-kumar avatar Feb 18 '25 11:02 harendra-kumar

> You do not need to call getDbConnection at call site. You can call it in the alloc part of bracketIO. That is what I was suggesting earlier as well.

Won't I have to juggle around with liftIO and UnliftIO.withRunInIO to make all of the following align?

getDbConnection :: (HasDatabase m) => m Connection

declareCursor :: Connection -> IO Cursor

closeCursor :: Cursor -> IO ()

foldForward :: Cursor -> Int -> (a -> haskells -> IO a) -> a -> IO (Either a a)

Even if I get a working version of runQueryStream to return a type of SerialT m haskells, I won't know how to connect it to servant's SourceT m a?

> The simple rule is that you do not break a stream pipeline to perform an action and return the stream from the monad. If you do that then you are deliberately breaking a fused pipeline. You need to compose all the stream functions without cutting the pipeline by a monad.

Does this break stream fusion?

--
-- Servant <=> Streamly support
--

instance ToSourceIO a (S.SerialT IO a) where
  toSourceIO :: S.SerialT IO a -> SourceIO a
  toSourceIO stream = 
    Stream.fromStepT $ Stream.Effect $ S.foldr (\chunk nextStep -> Stream.Yield chunk nextStep) Stream.Stop stream

saurabhnanda avatar Feb 18 '25 14:02 saurabhnanda

> Won't I have to juggle around with liftIO and UnliftIO.withRunInIO to make all of the following align?

I did not know the types and UnliftIO issue.

> Does this break stream fusion?

We are handing over data from one stream type to another here, so there is no question of fusion, data will be copied. As long as you stay in the Stream type, all operations are fused. Once you are out of it, then it depends on the type you are using.

As I said earlier, fusion may not matter at all for you. Let me explain in a bit more detail. A fused stream is like a superhighway for data processing; any fusion-breaking point is like a toll booth, or a junction where data items are transferred from one highway to another. You pay some overhead per data item at this junction. Whether this cost matters depends on how much you are paying in the rest of the processing. To give you some idea, we can roughly say that this cost is of the order of 30-50 ns for typical data structures. If your stream is carrying one DB row at a time and the processing of one row is of the order of a few microseconds, the toll is around 1% of the cost of the entire trip, and it falls to about 0.05% once a row costs 100 microseconds or so, so the fusion break won't matter. If the processing of each item takes something of the order of 100 ns or so then it becomes significant. My guess is that your servant processing will take much longer. So do not worry about fusion; worry about optimizing the rest of the processing first, and then about fusion.
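
The arithmetic behind the toll analogy is easy to work out. The sketch below takes the upper end of the 30-50 ns figure quoted above as the per-item toll; the per-item processing costs are example values chosen for illustration, not measurements.

```haskell
-- Overhead of a fusion break as a percentage of per-item processing time.
-- Both arguments are in nanoseconds.
overheadPercent :: Double -> Double -> Double
overheadPercent tollNs workNs = 100 * tollNs / workNs

-- Example per-item costs (illustrative values, not measurements).
examples :: [(String, Double)]
examples =
  [ ("50 ns toll, 100 ns of work per item", overheadPercent 50 100)     -- 50%
  , ("50 ns toll, 5 us of work per item",   overheadPercent 50 5000)    -- 1%
  , ("50 ns toll, 100 us of work per item", overheadPercent 50 100000)  -- 0.05%
  ]
```

So the same toll is half the cost of a 100 ns step, about one percent of a 5 microsecond step, and negligible once each item costs on the order of a database round trip.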

harendra-kumar avatar Feb 18 '25 17:02 harendra-kumar

@harendra-kumar

> > Won't I have to juggle around with liftIO and UnliftIO.withRunInIO to make all of the following align?

> I did not know the types and UnliftIO issue.

Just re-confirming that with foldForward having the following type, I will have to deal with runInIO, right?

foldForward :: Cursor haskells -> Int -> (a -> haskells -> IO a) -> a -> IO (Either a a)

> We are handing over data from one stream type to another here, so there is no question of fusion, data will be copied. As long as you stay in the Stream type, all operations are fused. Once you are out of it, then it depends on the type you are using.

Is there any other way that you can recommend tying a (MonadIO m) => SerialT m a and Servant's SourceT m a such that it is efficient? We are trying to introduce streaming into our Haskell app for real-time operations (earlier we used to use it only for batch jobs which ran in the background), and are looking at writing the core building blocks properly. For writing the ToSourceIO definition above, I took inspiration from the servant-streamly package at https://github.com/blackheaven/servant-streamly/blob/a727245746c27544012fa6915c3bd6b2b1425b0f/src/Servant/Streamly.hs#L31-L35 -- is this the most efficient way to integrate streamly <=> servant?

If there is no way to do this without breaking Stream fusion, then as you said, we have to reduce the impact of the "toll tax". We would change the underlying blocks to pull 100-500 rows from the DB stream in each step/execution. This is pretty much what Rails' find_each does.

PS: I guess now the discussion is going into another territory, but as long as you are okay with continuing this, it is a very good learning experience for me :-)

saurabhnanda avatar Feb 19 '25 03:02 saurabhnanda

If you want to run a monad m function in the IO monad then you will have to "run that monad" to lower it to IO, and if you want to come back again to m then you have to lift IO to m. The transition from m to IO to m requires some sort of unlift functionality like unliftio or monad-control. So if such a situation arises in your usage, you will need it.
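
A hand-rolled sketch of that lowering and lifting, using base only. App, Env, liftApp, withRunInIO', forRowsIO and describeRow are hypothetical names invented for this example; a real application would more likely use the unliftio or monad-control packages mentioned above. It works here because App is just a reader over an environment, so an App action can be run in IO by supplying the captured environment.

```haskell
{-# LANGUAGE RankNTypes #-}

-- Hypothetical application monad: a reader over some environment.
newtype Env = Env { connName :: String }
newtype App a = App { runApp :: Env -> IO a }

instance Functor App where
  fmap f (App g) = App (fmap f . g)

instance Applicative App where
  pure x = App (\_ -> pure x)
  App f <*> App g = App (\e -> f e <*> g e)

instance Monad App where
  App g >>= k = App (\e -> g e >>= \a -> runApp (k a) e)

-- Lift: any IO action can be run inside App.
liftApp :: IO a -> App a
liftApp io = App (const io)

-- Unlift: hand an IO computation a function that lowers App actions to IO
-- by supplying the captured environment.
withRunInIO' :: ((forall a. App a -> IO a) -> IO b) -> App b
withRunInIO' inner = App (\env -> inner (\app -> runApp app env))

-- Hypothetical IO-only API taking an IO callback, in the style of foldForward.
forRowsIO :: [Int] -> (Int -> IO String) -> IO [String]
forRowsIO rows k = mapM k rows

-- An App action we want to run for every row.
describeRow :: Int -> App String
describeRow i = App (\env -> pure (connName env ++ ":" ++ show i))

-- m -> IO (inside the callback) -> m (the overall result).
labelRows :: [Int] -> App [String]
labelRows rows = withRunInIO' (\run -> forRowsIO rows (run . describeRow))
```

Running runApp (labelRows [1, 2]) (Env "db") yields ["db:1","db:2"]: the callback executes in IO, but each row is still processed by an App action.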

You can go with your current code, it is reasonable even with the monad bind in between. I would suggest the following:

  • The core of your processing is the processing of an SQL row in the servant IO code, you need to focus on optimizing that part of the code first. And then possibly measure how much CPU time it is taking.
  • You are only using streamly up to the point where it is handed over to the servant stream, so any performance benefits or guarantees of streamly apply only up to that point. I have not looked into servant's stream representation sufficiently to say whether it fuses or how well it performs. It might be good enough; only measurement can tell. Also, I am not sure how much streaming functionality or how many combinators it provides to do useful stuff.
  • If you are not using any significant processing before the servant stream handover point then you probably do not need streamly. You can just directly glue your foldForward code with servant stream, it would be more efficient. Maybe you want to use streamly for concurrent processing of rows? In that case it can help but then you do not need to worry about fusion at all, because concurrent processing overhead is much higher than a fusion break.
  • Optimize only if it makes a difference. You can try directly calling runQueryFold/Explicit and see how much performance you get; and then use a streaming impl based on foldForward and check the difference. runQueryFold/Explicit is as fast as you can get theoretically.
  • Try chunked functionality, i.e. reading many rows at a time; this will reduce the overhead of the code path from the DB to the actual processing point. You can at least try this and measure how much performance difference it makes. You can use the streamly unfoldMany or concatMap to process a list of rows by mapping a single-row processing function. Chunked processing is almost always beneficial.
  • Use streamly 0.10; it should work with any GHC version, and you can put it as an extra-dep in your stack.yaml. If you use streamly < 0.9 then a later upgrade may become tedious. Also, the newer versions have much more useful functionality.
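
The chunking suggestion can be sketched with base only. The Cursor type here is a hypothetical in-memory model (it is not Opaleye's cursor) that counts how many fetch calls are made; in a real setup each fetch would be a round trip to the database. The single-row handler stays the same and only the fetch size changes.

```haskell
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Hypothetical cursor: the remaining rows plus a count of fetch calls.
data Cursor a = Cursor { remaining :: IORef [a], fetches :: IORef Int }

newCursor :: [a] -> IO (Cursor a)
newCursor rows = Cursor <$> newIORef rows <*> newIORef 0

-- Fetch up to n rows in one call; an empty result means the cursor is done.
fetchChunk :: Cursor a -> Int -> IO [a]
fetchChunk cur n = do
  modifyIORef' (fetches cur) (+ 1)
  atomicModifyIORef' (remaining cur) (\rows -> (drop n rows, take n rows))

-- Process every row with a single-row handler, fetching n rows at a time.
processChunked :: Int -> (a -> IO ()) -> Cursor a -> IO ()
processChunked n handleRow cur = loop
  where
    loop = do
      chunk <- fetchChunk cur n
      case chunk of
        [] -> pure ()
        _  -> mapM_ handleRow chunk >> loop

-- Sum 1000 rows and report (sum, number of fetch calls).
runWithChunkSize :: Int -> IO (Int, Int)
runWithChunkSize n = do
  cur <- newCursor [1 .. 1000]
  total <- newIORef 0
  processChunked n (\x -> modifyIORef' total (+ x)) cur
  (,) <$> readIORef total <*> readIORef (fetches cur)
```

With a chunk size of 1 this makes 1001 fetch calls for 1000 rows (the last one comes back empty); with a chunk size of 100 it makes 11, and the sum is 500500 in both cases.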

I guess it would be more efficient if we discuss this on a call.

harendra-kumar avatar Feb 19 '25 13:02 harendra-kumar