`uncons`ing a stream and rebuilding it changes the number of effects executed
I am trying to create a function that would modify the last element of the stream. I came up with the following solution using uncons:
-- | Increment the last char of the stream by one
foo :: S.SerialT IO Char -> S.SerialT IO Char
foo stream = S.bracket (putStrLn "init") (\_ -> putStrLn "fini") $ \_ ->
  lift (S.uncons stream) >>= \case
    Nothing -> S.nil
    Just (x, rest) -> lift (S.null rest) >>= \case
      True -> S.yield (succ x)
      False -> S.yield x <> foo rest
Note that I also added a bracket. My idea was that the initialisation and finalisation will happen “around” my stream processing function. However, it is not the case:
main :: IO ()
main = do
  let input = S.fromFoldable ['h', 'e', 'l', 'l', 'o']
  output <- S.toList . foo $ input
  print $ output == "hellp"
  putStrLn output
When I run the program above I get:
init
init
init
init
init
fini
fini
fini
fini
fini
True
hellp
So it looks like initialisation and finalisation are happening for each individual element, which is totally not what I expected.
Is this the intended behaviour? If yes, then, I think, it would be great to update the documentation to explain how this works, and also I would be grateful for suggestions on how to implement the behaviour that I need.
With regard to how it can be written:
import Streamly.Internal.Data.Stream.IsStream (IsStream)
import Streamly.Internal.Data.Stream.StreamD as D

data ModifyLastState a s = MLStop | MLBuffer a s | MLNoBuffer s

{-# INLINE modifyLast #-}
modifyLast :: (IsStream t, Monad m) => (a -> a) -> t m a -> t m a
modifyLast f s = D.fromStreamD $ modifyLastD f $ D.toStreamD s

{-# INLINE [1] modifyLastD #-}
modifyLastD :: Monad m => (a -> a) -> D.Stream m a -> D.Stream m a
modifyLastD f (D.Stream step state) = D.Stream step1 (MLNoBuffer state)
  where
    {-# INLINE [0] step1 #-}
    step1 gst (MLNoBuffer s) = do
      r <- step gst s
      return
        $ case r of
            D.Yield a s1 -> D.Skip (MLBuffer a s1)
            D.Skip s1 -> D.Skip (MLNoBuffer s1)
            D.Stop -> D.Stop
    step1 gst (MLBuffer e s) = do
      r <- step gst s
      return
        $ case r of
            D.Yield a s1 -> D.Yield e (MLBuffer a s1)
            D.Skip s1 -> D.Skip (MLBuffer e s1)
            D.Stop -> D.Yield (f e) MLStop
    step1 _ MLStop = return D.Stop
This is not the intended behavior. I suspect this has something to do with the monadic instance. Will get back to you regarding this.
We could probably add some kind of an update function to the library or some general combinators which buffer and process the stream.
This will not have the same problem.
foo :: S.SerialT IO Char -> S.SerialT IO Char
foo stream =
  S.bracket (putStrLn "init") (\_ -> putStrLn "fini")
    $ \_ -> modifyLast succ stream
I don't think there is an efficient way to do this without using some internal
functions. The INLINE calls are stage-specific to help the stream fuse and
please GHC. You can find more information in the docs directory.
If efficiency is not a concern then there are many ways to do this. One way is to:
- Get the length of the stream
- Zip the stream with an enumeration combinator
- Map over the stream and change the required element
- Map over the stream to discard the enumeration
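The four steps above can be sketched with plain Haskell lists standing in for a stream. This is only an illustration of the recipe: `modifyLastByIndex` is a hypothetical name, no streamly function is used, and a stream-based version would need to consume the input once for the length and once more for the zip.

```haskell
-- Plain-list stand-in for the length / zip / map / map recipe.
-- This is NOT streamly code; the name is made up for illustration.
modifyLastByIndex :: (a -> a) -> [a] -> [a]
modifyLastByIndex f xs = map snd changed        -- step 4: drop the index
  where
    n = length xs                               -- step 1: length
    indexed = zip [1 :: Int ..] xs              -- step 2: pair with an index
    changed = map bump indexed                  -- step 3: change the last one
    bump (i, x)
        | i == n = (i, f x)
        | otherwise = (i, x)
```

For example, `modifyLastByIndex succ "hello"` evaluates to `"hellp"`, and an empty list is returned unchanged.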
Another way is to do what you are doing but there seems to be some problem with that.
Ok, thanks for the advice, I’ll go with an implementation that digs into internals for now then.
Please, let me know what you decide on the issue in general. I think being able to implement this kind of behaviour using the public interface would be useful. Getting the length of the stream is not ideal, of course :).
You are calling the function foo recursively in your code, which nests the bracket calls. That is why you are seeing this behaviour. You should move the recursion inside the bracket, so that it recurses only over the uncons'ing and not over the whole bracketed function.
Performance-wise, an explicit implementation should be way faster than the one that uses bracket.
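The nesting described above can be reproduced without streamly at all. The sketch below is an analogy in plain IO over a `String`, using `bracket_` from `Control.Exception`. The names `nestedBrackets`, `singleBracket`, `logMsg` and `withLog` are hypothetical, and the log is kept in an `IORef` instead of being printed so that it can be inspected.

```haskell
import Control.Exception (bracket_)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Append a message to an in-memory log (stands in for putStrLn).
logMsg :: IORef [String] -> String -> IO ()
logMsg ref msg = modifyIORef ref (++ [msg])

-- Recursing through the bracketed function: one bracket per element.
nestedBrackets :: IORef [String] -> String -> IO String
nestedBrackets ref xs =
    bracket_ (logMsg ref "init") (logMsg ref "fini") $
        case xs of
            [] -> pure []
            [x] -> pure [succ x]
            (x : rest) -> (x :) <$> nestedBrackets ref rest

-- Bracket once, and recurse in a local helper instead.
singleBracket :: IORef [String] -> String -> IO String
singleBracket ref xs =
    bracket_ (logMsg ref "init") (logMsg ref "fini") (pure (go xs))
  where
    go [] = []
    go [x] = [succ x]
    go (x : rest) = x : go rest

-- Run an action against a fresh log; return its result and the log.
withLog :: (IORef [String] -> IO a) -> IO (a, [String])
withLog act = do
    ref <- newIORef []
    r <- act ref
    msgs <- readIORef ref
    pure (r, msgs)
```

With the input `"hello"`, `nestedBrackets` logs five `init` entries followed by five `fini` entries, which is the same shape as the output in the original report, while `singleBracket` logs one of each.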
@pranaysashank Oh, you are right! I was minimising the example I had and minimised it incorrectly. In my original code that was behaving unexpectedly the recursive call was done properly, without multiple brackets. Let me try to reproduce this again...
Ok, the reproduction is going to be more complex, because it involves effects. Basically, what is happening is that I have a stream of 2 elements, I pass them through this “special mapM” twice, and I see the effect performed 5 times in total – 3 of them to the original stream (even though it ends up producing only 2 elements as expected).
Turns out, this has nothing to do with bracket indeed. I thought it was the case because of the state involved, but, actually, bracket works exactly as expected. It looks like the problem is only in uncons.
-- | Like @mapM@ but applies a different function to the last item.
mapLastSpecialM
  :: forall m a b. S.MonadAsync m
  => m ()  -- ^ What to do if the stream is empty.
  -> (Bool -> a -> m b)  -- ^ Function to map over items. @True@ means last.
  -> S.SerialT m a -> S.SerialT m b
mapLastSpecialM empty f = go
  where
    go :: S.SerialT m a -> S.SerialT m b
    go stream = lift (S.uncons stream) >>= \case
      Nothing -> lift empty *> S.nil
      Just (x, rest) -> lift (S.null rest) >>= \case
        False -> S.yieldM (f False x) <> go rest
        True -> S.yieldM (f True x)

-- | Kinda like scanl (+) 0, but multiplies the last item by 10.
weirdSum :: S.SerialT IO Int -> S.SerialT IO Int
weirdSum stream = do
  counter <- lift $ newIORef 0
  mapLastSpecialM
    (error "empty")
    (\isLast x -> do
      let val = if isLast then x * 10 else x
      modifyIORef counter (+ val)
      readIORef counter)
    stream

main :: IO ()
main = do
  let input = S.fromFoldable [1, 2]
  output <- S.toList . weirdSum . weirdSum $ input
  -- Input : [1, 2]
  -- weirdSum [1, 2] = [1, 1 + 2*10] = [1, 21]
  -- weirdSum [1, 21] = [1, 1 + 21*10] = [1, 211]
  print $ output == [1, 211]
  print output  -- prints [1, 411] because `modifyIORef counter (+ 20)` executes twice
I suspect that this is a fundamental problem and an uncons-based implementation just can’t work. I think this is actually the reason why conduit has leftovers in its pipes – we want to inspect the following element and remember it somewhere.
One way to allow an efficient implementation of this function is to have zipBoth :: SerialT m a -> SerialT m b -> SerialT m (EitherBoth a b), where data EitherBoth a b = First a | Second b | Both a b. Then mapLastSpecialM can be implemented through zipBoth stream (tail stream).
The thing to remember is that streams are not persistent. When you check S.null rest, the first element of rest is consumed (for a monadic stream, that means its first monadic action is executed). You then pass the same stream rest to go rest, which causes the same monadic action to execute again.
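To make the non-persistence concrete, here is a minimal sketch using a toy effectful stream built only from `base`. It is a model, not streamly's actual representation: `Stream`, `unconsS`, `nullS`, `fromActions` and `secondActionRuns` are all hypothetical names. It counts how often the second element's action runs when `rest` is first tested for emptiness and then consumed again.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)
import Data.Maybe (isNothing)

-- A toy effectful stream: asking for the head runs the head's action.
newtype Stream a = Stream { unconsS :: IO (Maybe (a, Stream a)) }

-- Build a stream whose elements are produced by the given actions.
fromActions :: [IO a] -> Stream a
fromActions [] = Stream (pure Nothing)
fromActions (act : acts) = Stream $ do
    x <- act
    pure (Just (x, fromActions acts))

-- Emptiness check; it has to run the head's action to find out.
nullS :: Stream a -> IO Bool
nullS s = isNothing <$> unconsS s

-- Count how many times the second element's action is executed when
-- `rest` is inspected with nullS and then unconsed again.
secondActionRuns :: IO Int
secondActionRuns = do
    hits <- newIORef (0 :: Int)
    let second = modifyIORef hits (+ 1) >> pure 'b'
    r <- unconsS (fromActions [pure 'a', second])
    case r of
        Nothing -> pure ()
        Just (_, rest) -> do
            _ <- nullS rest    -- first execution of `second`
            _ <- unconsS rest  -- second execution of `second`
            pure ()
    readIORef hits
```

In this model `secondActionRuns` returns 2: nothing caches the result of the first inspection, so reusing `rest` repeats the effect.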
One way to rewrite this would be to pass the uncons'd element as an arg to the go function and pass it to f depending on whether uncons rest is Nothing or not. It can be internally implemented in a similar way.
S.null should either have docs warning about this or be completely removed.
scan is the solution to most stateful problems. However, the problem with standard scan functions is that they cannot run an action when the stream ends. We have a special scan function that is capable of doing that:
scanlMAfter' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
It runs the b -> m b (called done) action and emits the result when the stream ends. This can be used to flush the buffered state of the scan when the stream ends.
For your specific problem, you can buffer the first element and emit Nothing at the first element. Then emit Just b for each new element processed, where b is the previously buffered item. In the done action you just emit the buffered item as Just. Then apply catMaybes on the resulting stream to get rid of the Maybe.
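The buffering scheme described above can be sketched on plain lists. This is a pure model of the idea and does not call `scanlMAfter'`. The name `modifyLastScan` is hypothetical, and the final `[f <$> buf]` plays the role of the done action that flushes the buffer.

```haskell
import Data.Maybe (catMaybes)

-- Pure-list model of "scan with a flush at the end"; not streamly code.
-- The accumulator holds the previously seen element, if any.
modifyLastScan :: (a -> a) -> [a] -> [a]
modifyLastScan f xs = catMaybes (go Nothing xs)
  where
    -- End of input: flush the buffered element, applying f to it.
    go buf [] = [f <$> buf]
    -- New element: emit the old buffer (Nothing at the very first
    -- element), then buffer the new element.
    go buf (y : ys) = buf : go (Just y) ys
```

For `"hello"` the intermediate list is `Nothing`, `Just 'h'`, `Just 'e'`, `Just 'l'`, `Just 'l'`, `Just 'p'`, and `catMaybes` reduces it to `"hellp"`.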
This can also be used to map over the last n elements. In that case you will have to buffer n elements and emit streams of single elements once the buffer is full. At the end you emit all the buffered items as a stream. Then apply concat on the resulting stream.
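The last-n variant can be modelled the same way on plain lists. Again this is a hedged sketch rather than streamly code: `modifyLastN` is a hypothetical name, and the buffer is a simple list, which is fine for small n but not an efficient queue.

```haskell
-- Pure-list model of "buffer n elements, flush them at the end".
-- Not streamly code; the name is made up for illustration.
modifyLastN :: Int -> (a -> a) -> [a] -> [a]
modifyLastN n f = concat . go []
  where
    -- End of input: emit everything still buffered, with f applied.
    go buf [] = [map f buf]
    go buf (y : ys)
        -- Buffer not full yet: keep collecting, emit nothing.
        | length buf < n = go (buf ++ [y]) ys
        | otherwise =
            case buf of
                -- Buffer full: emit its oldest element as a singleton.
                (b : bs) -> [b] : go (bs ++ [y]) ys
                -- Only reachable when n <= 0: pass elements through.
                [] -> [y] : go [] ys
```

For example, `modifyLastN 2 succ "hello"` evaluates to `"helmp"`. When the input is shorter than n, every element is modified.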
> One way to allow an efficient implementation of this function is to have zipBoth :: SerialT m a -> SerialT m b -> SerialT m (EitherBoth a b), where data EitherBoth a b = First a | Second b | Both a b. Then mapLastSpecialM can be implemented through zipBoth stream (tail stream).
This won't work because we will be using the input stream twice, so the effects will run twice.
> This won't work
Yes, of course :(.
> scanlMAfter'
Huh, yes, this function looks very promising. I can see a couple different ways to implement what I need with its help, however all of those ways, e.g. the one you suggested, are not particularly straightforward, so I’m wondering about efficiency. I guess I’ll just run some benchmarks to see.
Actually, @pranaysashank’s comment made me realise that the issue was indeed merely due to my use of null, which forces the first action, followed by reusing the same stream. All I need to do is make sure the action is not forced twice, so I can simply use uncons again instead of null:
-- | Like @mapM@ but applies a different function to the last item.
mapLastSpecialM
  :: forall m a b. MonadAsync m
  => m ()  -- ^ What to do if the stream is empty.
  -> (Bool -> a -> m b)  -- ^ Function to map over items. @True@ means last.
  -> SerialT m a -> SerialT m b
mapLastSpecialM empty f stream =
    lift (S.uncons stream) >>= \case
      Nothing -> lift empty *> S.nil
      Just (x, rest) -> go x rest
  where
    go :: a -> SerialT m a -> SerialT m b
    go x stream' =
      lift (S.uncons stream') >>= \case
        Nothing -> S.yieldM (f True x)
        Just (x', rest) -> S.yieldM (f False x) <> go x' rest
This function seems to work exactly as expected. The only concern, once again, is its performance compared to a direct StreamD implementation.
You first have to consider whether the performance is good enough for the given use case. If it is, you do not need to do anything, although you can still optimise if you want to.
> all of those ways, e.g. the one you suggested, are not particularly straightforward, so I’m wondering about efficiency.
In general, scan operations can fuse with filter/catMaybes or whatever else, so the Maybe wrapper should get completely eliminated. You may have to make sure that the accumulator of the scan is strict. If it does not fuse, you can compile with the fusion-plugin.
If it still does not fuse, you can use what @adithyaov suggested above. At some point we will officially expose low-level StreamD/StreamK primitives for general use. Writing your own operations using primitives is not a bad thing. We can also see if writing this class of operations in the library itself makes sense.