zio-actors
zio-actors copied to clipboard
Implement streaming with state changes
This is a draft of a "correct" streaming API where the state changes with the emitted stream elements. @softinio @mtsokol @mijicd could you have a look if this looks plausible?
I'm not sure what would be the correct behaviour of any element in the stream failing. Should the supervisor be called in that case or would we just continue streaming?
In order to maintain the current Stateful
API, one could introduce an additional method receiveS
(or any name you prefer) and do something like the following:
def receiveS[A](state: S, msg: F[A], context: Context): ActorResponse[R, S, A] = oneTimeResponse(receive(state, msg, context))
def receive[A](state: S, msg: F[A], context: Context): RIO[R, (S, A)] = ???
That way users could implement/override the new receiveS
if they need the additional functionality or receive
if they don't.
The downside of this is that IDEs will not know that one of the two methods has to be overriden because both contain an implementation.
I'd be happy for any suggestions 😃
I'm also just writing a RemoteSpec
for the ZStream
case and it shows that my approach in https://github.com/zio/zio-actors/pull/269 was too naive 😉
I'm getting all kinds of ClassCastExceptions
now because it tries to cast a single element from the stream into a ZStream
.
@softinio @mtsokol @mijicd I could need some help 😅
I've added a test that should stream 10 messages back to the requester, but after 2 messages it just stops.
I couldn't find out yet what's happening, if there's a channel that's being closed, or my usage of ZStream#repeat
or ZStream#takeWhile
is incorrect.
Would you have any idea?
@nightscape Hi! Thanks for reminder! I'll have some time later this week to fully look at it. Do you mind waiting?
Hi @mtsokol, thanks for the offer! I can do other things in the meantime, so later this week is fine :+1:
Hi @mtsokol, did you already find some time to have a look at this? I don't need a full analyis or solution, but some hints that point me in the right direction could help me to figure this out :smiley:
@nightscape Sorry for the delay (started new semester this week).
Thanks for a ping, I wanted to do in-depth review of the PR but I ended up fighting with that issue you described (the test that should stream 10 messages but ends after 2). After a while of debugging I think that the issue is with .takeWhile
from here.
Here are snippets reproducing it:
- with takeWhile
- with takeUntil WDYT?
After changing it to takeUntill
with e == StreamEnd
condition all 10 elements from stream are send and received. But unfortunately test hangs after receiving StreamEnd
on runCollect
so there's still something about handling this stream (or in socket communication), will look at it.
I only focused on this issue so I'll deliver rest of the review.
[EDIT]
I believe it's due to the implementation of ZStream#takeWhile
, here's modified version that works (will ask about it):
https://scastie.scala-lang.org/RHDosRy0TdGjMvOP881JWQ
Great find! That actually looks like a bug in takeWhile. Thanks a lot for digging into this! I was always searching for some problem w.r.t. closed connections or the like and didn't take the implementation of any ZStream methods into the set of suspects.
@nightscape I got this test passing but there's still one issue (with deserialization I guess?). The whole stream is producing Strings. If I augment producer site with: (stream ++ ZStream("StreamEnd"))
instead of (stream ++ ZStream(StreamEnd))
it's all correctly consumed. Also on consumer site after .takeUntil
I added .filterNot(_ == "StreamEnd")
as takeUntil
adds that termination message. After that test completes correctly.
I don't know concrete reason of this and how to handle it correctly yet
(maybe add wrapper stream message that is being send as protocol? WDYT?)
sealed trait StreamProtocol
case class StreamMsg(obj: Any) extends StreamProtocol
case object StreamEnd extends StreamProtocol
And .map
this Stream in sendEnvelope
after .filterNot
to get inner messages? (I guess there's better solution)
Let me know if you want me to send this snippet or if have more questions.
@mtsokol is this what you had in mind? https://github.com/zio/zio-actors/pull/270/commits/97ed0eb64e9bed77433fe1438e7321e6b07800b2
And I guess this should fix the takeWhile
problem: https://github.com/zio/zio/pull/4273/files#diff-f8e5bb3d67d9b3c721d3cac422fb50f6R2502
Unfortunately, we can't rely on users using ZIO 1.0.2, so the current implementation is safer.
From my side everything except supervision of streams is looking good.