streamly
streaming a restricted number of IO actions
hello,
I would like to stream the contents of a file, which contains a limited number of instances of a data structure. I would then like to process the file contents through several filters, and zip the results together.
I tried to produce a minimalist example below to exhibit my problem.
The data structure `Foo` simulates the file contents, which are streamed by `streamFoo`. Notice, in the function `test`, the use of `S.take 10` to control the number of IO accesses.
I believe that my problem is not related to zipping. My example below zips two streams, but I believe the issue would be the same with other kinds of stream composition: the number of reads of `Foo` is greater than what I would expect.
```haskell
import Control.Monad.IO.Class (liftIO)
import Data.IORef
import Streamly
import qualified Streamly.Prelude as S

data Foo = Foo
  { counter :: IORef Int
  }

initFoo :: IO Foo
initFoo = do
  c <- newIORef 0
  return $ Foo c

readFoo :: Foo -> IO Int
readFoo foo = do
  c <- readIORef (counter foo)
  modifyIORef (counter foo) (+1)
  putStrLn $ "Foo: counter = " ++ show c
  return c

streamFoo :: MonadAsync m => Foo -> SerialT m Int
streamFoo foo = S.repeatM (liftIO $ readFoo foo)

compute :: MonadAsync m => SerialT m Int -> m [(Int, Int)]
compute stream = S.toList $ zipSerially $ (,)
  <$> (serially $ p0 stream)
  <*> (serially $ p1 stream)

p0, p1 :: MonadAsync m => SerialT m Int -> SerialT m Int
p0 = S.filter even
p1 = S.filter odd

test :: IO ()
test = do
  foo <- initFoo
  (compute $ S.take 10 $ streamFoo foo) >>= print
```
Observed behaviour:

```
>>> test
Foo: counter = 0
Foo: counter = 1
Foo: counter = 2
Foo: counter = 3
Foo: counter = 4
Foo: counter = 5
Foo: counter = 6
Foo: counter = 7
Foo: counter = 8
Foo: counter = 9
Foo: counter = 10
Foo: counter = 11
Foo: counter = 12
Foo: counter = 13
Foo: counter = 14
Foo: counter = 15
Foo: counter = 16
Foo: counter = 17
Foo: counter = 18
Foo: counter = 19
[(0,1),(2,3),(4,5),(6,7),(8,9),(10,11),(12,13),(14,15),(16,17),(18,19)]
```
Expected behaviour:

```
>>> test
Foo: counter = 0
Foo: counter = 1
Foo: counter = 2
Foo: counter = 3
Foo: counter = 4
Foo: counter = 5
Foo: counter = 6
Foo: counter = 7
Foo: counter = 8
Foo: counter = 9
[(0,1),(2,3),(4,5),(6,7),(8,9)]
```
It looks like I did not understand something. Could you please tell me what is wrong here?
A stream is a blueprint to run actions and generate elements. `S.take 10 (S.repeatM (liftIO $ readFoo foo))` is a blueprint to generate 10 elements from a given source.
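Because the blueprint is re-executed by each consumer, two consumers that share the same mutable source do not see the same elements. A minimal base-only sketch of this (no streamly involved; `next` stands in for one step of `S.repeatM (readFoo foo)`, and the names are illustrative):

```haskell
import Data.IORef

-- Run two successive 3-element consumers over a shared counter and
-- return what each one saw.
twoConsumers :: IO ([Int], [Int])
twoConsumers = do
  ref <- newIORef (0 :: Int)
  let next = do
        c <- readIORef ref
        modifyIORef' ref (+ 1)
        return c
  xs <- mapM (const next) [1 .. 3 :: Int]  -- first "take 3"
  ys <- mapM (const next) [1 .. 3 :: Int]  -- second "take 3"
  return (xs, ys)

main :: IO ()
main = twoConsumers >>= print  -- ([0,1,2],[3,4,5])
```

Each consumer runs its own copy of the actions, so six IO actions happen in total, not three.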
Consider `compute`:

```haskell
compute :: MonadAsync m => SerialT m Int -> m [(Int, Int)]
compute stream = S.toList $ zipSerially $ (,)
  <$> (serially $ p0 stream)
  <*> (serially $ p1 stream)
```

Substituting `stream`, `p0` and `p1`:

```haskell
compute2 :: MonadAsync m => m [(Int, Int)]
compute2 = S.toList $ zipSerially $ (,)
  <$> (serially $ S.filter even $ S.take 10 $ S.repeatM (liftIO $ readFoo foo))
  <*> (serially $ S.filter odd  $ S.take 10 $ S.repeatM (liftIO $ readFoo foo))
```

`compute` and `compute2` are equivalent.
You are composing 2 streams of 10 elements each. Hence 20 actions in total.
@damiencourousse in the zip operation

```haskell
compute stream = S.toList $ zipSerially $ (,)
  <$> (serially $ p0 stream)
  <*> (serially $ p1 stream)
```

`serially $ p0 stream` would generate the first element of the stream as `0`, and then the first element of `serially $ p1 stream` would be generated. This element would be `1`, as the counter has already been incremented to `1`. These two would then be zipped to create `(0,1)`. After that, the second element of `serially $ p0 stream` would be generated as `2`, because the counter has already been increased to `2`. And so on.

Thus, the first stream in the zip is `[0,2,4,..]` and the second stream is `[1,3,5..]`; you are zipping these two, each limited to 10 elements, so the output you are getting correctly contains the first ten pairs.

Note that the two streams share the same mutable counter for stream generation: the elements of the second stream are affected by the modifications the first stream makes to the counter.
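This alternating demand pattern can be sketched with plain base as well (a hypothetical `next` again stands in for one `readFoo` step). Because the left branch always happens to pull an even value and the right branch an odd one, the filters never actually discard anything:

```haskell
import Data.IORef

-- Zip pulls one element for the left branch, then one for the right
-- branch, both from the same shared counter.
demandPattern :: Int -> IO [(Int, Int)]
demandPattern n = do
  ref <- newIORef (0 :: Int)
  let next = do
        c <- readIORef ref
        modifyIORef' ref (+ 1)
        return c
  mapM (const ((,) <$> next <*> next)) [1 .. n]

main :: IO ()
main = demandPattern 5 >>= print  -- [(0,1),(2,3),(4,5),(6,7),(8,9)]
```

Five pairs cost ten IO actions, which is exactly why ten pairs cost twenty `readFoo` calls in the original example.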
Thank you @harendra-kumar and @adithyaov,
@adithyaov. Yes, I was missing the substitution, hence my misunderstanding.
I am looking for a way to stream the results of an IO computation into several concurrent (streamed) computations before merging them together into a single result stream.
In conduit, I came up with this:
```haskell
import Conduit
import Control.Monad.IO.Class (liftIO)
import Data.IORef

data Foo = Foo
  { counter :: IORef Int
  }

initFoo :: IO Foo
initFoo = do
  c <- newIORef 0
  return $ Foo c

readFoo :: Foo -> IO Int
readFoo foo = do
  c <- readIORef (counter foo)
  modifyIORef (counter foo) (+1)
  putStrLn $ "Foo: counter = " ++ show c
  return c

streamFoo :: MonadIO m => Foo -> ConduitT i Int m ()
streamFoo foo = repeatMC (liftIO $ readFoo foo)

compute :: Monad m => ConduitT Int o m [(Int, Int)]
compute = getZipConduit $
  zip <$> ZipConduit (p0 .| sinkList) <*> ZipConduit (p1 .| sinkList)

p0, p1 :: Monad m => ConduitT Int Int m ()
p0 = filterC even
p1 = filterC odd

test :: MonadUnliftIO m => m [(Int, Int)]
test = do
  foo <- liftIO initFoo
  runResourceT $ runConduit $ streamFoo foo .| takeC 10 .| compute

-- >>> test
-- Foo: counter = 0
-- Foo: counter = 1
-- Foo: counter = 2
-- Foo: counter = 3
-- Foo: counter = 4
-- Foo: counter = 5
-- Foo: counter = 6
-- Foo: counter = 7
-- Foo: counter = 8
-- Foo: counter = 9
-- [(0,1),(2,3),(4,5),(6,7),(8,9)]
```
With the latest version of streamly, `Streamly.Internal.Data.Fold` could provide something similar, but I would need a single output data stream instead of separate output streams.
```haskell
import Control.Monad.IO.Class (liftIO)
import Data.IORef
import Streamly
import Streamly.Data.Fold (Fold)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Prelude as S

data Foo = Foo
  { counter :: IORef Int
  }

initFoo :: IO Foo
initFoo = do
  c <- newIORef 0
  return $ Foo c

readFoo :: Foo -> IO Int
readFoo foo = do
  c <- readIORef (counter foo)
  modifyIORef (counter foo) (+1)
  putStrLn $ "Foo: counter = " ++ show c
  return c

streamFoo :: MonadAsync m => Foo -> SerialT m Int
streamFoo foo = S.repeatM (liftIO $ readFoo foo)

p0, p1 :: Monad m => Fold m Int [Int]
p0 = FL.lfilter even FL.toList
p1 = FL.lfilter odd FL.toList

f :: Monad m => Fold m Int ([Int], [Int])
f = (,) <$> p0 <*> p1

test :: IO ()
test = do
  foo <- initFoo
  S.fold f (S.take 10 $ streamFoo foo) >>= print

-- >>> test
-- Foo: counter = 0
-- Foo: counter = 1
-- Foo: counter = 2
-- Foo: counter = 3
-- Foo: counter = 4
-- Foo: counter = 5
-- Foo: counter = 6
-- Foo: counter = 7
-- Foo: counter = 8
-- Foo: counter = 9
-- ([0,2,4,6,8],[1,3,5,7,9])
```
Is there a way to do this with streamly?
thank you,
You can change your zip function like this:

```haskell
f :: Monad m => Fold m Int [(Int, Int)]
f = zip <$> p0 <*> p1
```

It will give you equivalent behavior. But note that both the conduit and streamly examples as written above would accumulate the full lists in memory before zipping them.
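The pairing itself can also be checked over a plain list, which makes the behavior of `zip <$> p0 <*> p1` easy to see; here `partition` plays the role of the two filters (a base-only sketch, not streamly API):

```haskell
import Data.List (partition)

-- Split a list into evens and odds, then pair them up,
-- mirroring `zip <$> p0 <*> p1` over the fold results.
zipEvenOdd :: [Int] -> [(Int, Int)]
zipEvenOdd xs = zip evens odds
  where
    (evens, odds) = partition even xs

main :: IO ()
main = print (zipEvenOdd [0 .. 9])  -- [(0,1),(2,3),(4,5),(6,7),(8,9)]
```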
Yes, thank you, I completely missed the use of `zip`.

As far as I understand, the list accumulation is due to the use of `FL.toList`. Right?
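Right, a `toList`-style fold has to retain every element until the input ends, whereas a fold whose accumulator has constant size (a running sum, a count, ...) does not. The distinction can be sketched with plain left folds from base (illustrative names, not streamly API):

```haskell
import Data.List (foldl')

-- Accumulating fold: the result grows with the input,
-- analogous to FL.toList.
collect :: [Int] -> [Int]
collect = foldl' (\acc x -> acc ++ [x]) []

-- Constant-space fold: only a single Int is kept at any time.
sumEvens :: [Int] -> Int
sumEvens = foldl' (\acc x -> if even x then acc + x else acc) 0

main :: IO ()
main = print (sumEvens [0 .. 9])  -- 0 + 2 + 4 + 6 + 8 = 20
```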
One difference between those two examples is that with conduit I get back a stream (`ConduitT`) after zipping, whereas with streamly I get back data in the underlying monad `m`.
Is it possible to use a sort of split/join execution model with streamly?
Closing the issue since my last question is out of the scope of the initial issue.
Thank you!
> One difference in those two examples is that in the case of Conduit, I get back a stream (ConduitT) after zipping,

In this particular example it is a single-element stream: the single element is the accumulated list, so it is equivalent to what you get in the fold example.
> Is it possible to use a sort of split/join execution model with streamly?

It should be possible to do it using some internal functions as of now. An idiomatic way to do it concurrently will be coming soon.
> Is it possible to use a sort of split/join execution model with streamly?
>
> It should be possible to do it using some internal functions as of now. An idiomatic way to do it concurrently will be coming soon.
Great! I'm looking forward to this!
Thank you again for your feedback.
Let us keep this open until we have concurrent split and zip functionality committed.