zio-actors

Implement streaming with state changes

Open nightscape opened this issue 4 years ago • 11 comments

This is a draft of a "correct" streaming API where the state changes with the emitted stream elements. @softinio @mtsokol @mijicd could you have a look if this looks plausible?

I'm not sure what the correct behaviour should be when an element in the stream fails. Should the supervisor be called in that case, or would we just continue streaming?

In order to maintain the current Stateful API, one could introduce an additional method receiveS (or any name you prefer) and do something like the following:

def receiveS[A](state: S, msg: F[A], context: Context): ActorResponse[R, S, A] = oneTimeResponse(receive(state, msg, context))
def receive[A](state: S, msg: F[A], context: Context): RIO[R, (S, A)] = ???

That way users could implement/override the new receiveS if they need the additional functionality, or receive if they don't. The downside of this is that IDEs will not know that one of the two methods has to be overridden, because both contain an implementation. I'd be happy for any suggestions 😃
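To make the two-method idea concrete, here is a minimal sketch in plain Scala 2.13+ without ZIO. Response, OneTime, Streamed, StatefulSketch, Command, Add, Counter and Ticker are hypothetical stand-ins invented for this illustration (they are not the zio-actors types), and the Context parameter and the RIO effect are left out to keep it self-contained.

```scala
// Simplified stand-ins for illustration only; these are NOT the zio-actors types.
sealed trait Response[+S, +A]
final case class OneTime[+S, +A](result: (S, A)) extends Response[S, A]
final case class Streamed[+S, +A](results: LazyList[(S, A)]) extends Response[S, A]

trait StatefulSketch[S, F[_]] {
  // Default: wrap the single-result handler, mirroring oneTimeResponse(receive(...)).
  def receiveS[A](state: S, msg: F[A]): Response[S, A] =
    OneTime(receive(state, msg))

  // Has a body only so that subclasses may override receiveS instead.
  // This is exactly why the compiler or IDE cannot force an override.
  def receive[A](state: S, msg: F[A]): (S, A) =
    throw new NotImplementedError("override receive or receiveS")
}

sealed trait Command[A]
final case class Add(n: Int) extends Command[Int]

// A non-streaming actor overrides only receive.
object Counter extends StatefulSketch[Int, Command] {
  override def receive[A](state: Int, msg: Command[A]): (Int, A) = msg match {
    case Add(n) => (state + n, state + n)
  }
}

// A streaming actor overrides only receiveS; the state changes with every element.
object Ticker extends StatefulSketch[Int, Command] {
  override def receiveS[A](state: Int, msg: Command[A]): Response[Int, A] = msg match {
    case Add(n) => Streamed(LazyList.tabulate(n)(i => (state + i + 1, state + i + 1)))
  }
}
```

An implementation that overrides neither method still compiles and only fails when a message arrives, which is the downside described above.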

nightscape avatar Sep 24 '20 00:09 nightscape

I'm also just writing a RemoteSpec for the ZStream case and it shows that my approach in https://github.com/zio/zio-actors/pull/269 was too naive 😉 I'm getting all kinds of ClassCastExceptions now because it tries to cast a single element from the stream into a ZStream.

nightscape avatar Sep 24 '20 09:09 nightscape

@softinio @mtsokol @mijicd I could use some help 😅 I've added a test that should stream 10 messages back to the requester, but after 2 messages it just stops. I couldn't find out yet what's happening: whether a channel is being closed, or whether my usage of ZStream#repeat or ZStream#takeWhile is incorrect. Would you have any idea?

nightscape avatar Sep 29 '20 09:09 nightscape

@nightscape Hi! Thanks for the reminder! I'll have some time later this week to fully look at it. Do you mind waiting?

mtsokol avatar Sep 29 '20 10:09 mtsokol

Hi @mtsokol, thanks for the offer! I can do other things in the meantime, so later this week is fine :+1:

nightscape avatar Sep 29 '20 14:09 nightscape

Hi @mtsokol, did you already find some time to have a look at this? I don't need a full analysis or solution, but some hints that point me in the right direction could help me to figure this out :smiley:

nightscape avatar Oct 02 '20 18:10 nightscape

@nightscape Sorry for the delay (I started a new semester this week). Thanks for the ping. I wanted to do an in-depth review of the PR, but I ended up fighting with the issue you described (the test that should stream 10 messages but ends after 2). After a while of debugging, I think that the issue is with .takeWhile from here.

Here are snippets reproducing it:

After changing it to takeUntil with the condition e == StreamEnd, all 10 elements from the stream are sent and received. Unfortunately, the test hangs on runCollect after receiving StreamEnd, so there's still something wrong in the handling of this stream (or in the socket communication). I will look at it.

I only focused on this issue, so I'll deliver the rest of the review later.
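As a rough illustration of the "take until the end marker" semantics, here is a sketch on plain Scala iterators rather than ZStream. The takeUntil helper and the TakeUntilSketch object are hypothetical, written for this example only. The helper emits the first element that matches the predicate and then stops without pulling anything further from the source, which matters when the source is a socket that would block on the next read.

```scala
object TakeUntilSketch {
  // Hypothetical helper: emits elements up to AND including the first one
  // that satisfies p, then stops without reading further from `it`.
  def takeUntil[A](it: Iterator[A])(p: A => Boolean): Iterator[A] =
    new Iterator[A] {
      private var done = false
      def hasNext: Boolean = !done && it.hasNext
      def next(): A = {
        if (!hasNext) throw new NoSuchElementException("stream already ended")
        val a = it.next()
        if (p(a)) done = true // the marker itself is still emitted
        a
      }
    }

  def main(args: Array[String]): Unit = {
    val incoming = Iterator("msg-1", "msg-2", "msg-3", "StreamEnd", "never-read")
    val received = takeUntil(incoming)(_ == "StreamEnd").toList
    // The marker is part of the result, so it has to be filtered out afterwards.
    val payload = received.filterNot(_ == "StreamEnd")
    println(received) // List(msg-1, msg-2, msg-3, StreamEnd)
    println(payload)  // List(msg-1, msg-2, msg-3)
  }
}
```

Because the marker is included in the result, the consumer needs a follow-up filter to recover only the payload elements.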

[EDIT] I believe it's due to the implementation of ZStream#takeWhile, here's modified version that works (will ask about it): https://scastie.scala-lang.org/RHDosRy0TdGjMvOP881JWQ

mtsokol avatar Oct 02 '20 22:10 mtsokol

Great find! That actually looks like a bug in takeWhile. Thanks a lot for digging into this! I was always searching for some problem w.r.t. closed connections or the like and didn't take the implementation of any ZStream methods into the set of suspects.

nightscape avatar Oct 03 '20 08:10 nightscape

@nightscape I got this test passing, but there's still one issue (with deserialization, I guess?). The whole stream is producing Strings. If I change the producer side to (stream ++ ZStream("StreamEnd")) instead of (stream ++ ZStream(StreamEnd)), it's all correctly consumed. Also, on the consumer side I added .filterNot(_ == "StreamEnd") after .takeUntil, because takeUntil includes that termination message. After that, the test completes correctly. I don't know the concrete reason for this or how to handle it correctly yet (maybe add a wrapper stream message that is sent as the protocol? WDYT?)

sealed trait StreamProtocol
case class StreamMsg(obj: Any) extends StreamProtocol
case object StreamEnd extends StreamProtocol

And .map this stream in sendEnvelope after .filterNot to get the inner messages? (I guess there's a better solution.) Let me know if you want me to send this snippet or if you have more questions.
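The wrapper idea can be sketched with plain Scala collections, without ZStream or the actual sendEnvelope code. The wrap and unwrap helpers and the StreamProtocolSketch object are hypothetical names for this illustration, and the iterator's standard-library takeWhile stands in for the stream operators discussed above. The point is that a payload that happens to equal the string "StreamEnd" can no longer be mistaken for the end marker.

```scala
sealed trait StreamProtocol
final case class StreamMsg(obj: Any) extends StreamProtocol
case object StreamEnd extends StreamProtocol

object StreamProtocolSketch {
  // Producer side (hypothetical helper): wrap every payload, then append the marker.
  def wrap(payload: List[Any]): List[StreamProtocol] =
    payload.map(p => StreamMsg(p): StreamProtocol) :+ StreamEnd

  // Consumer side (hypothetical helper): stop at the marker and unwrap the payloads.
  def unwrap(frames: Iterator[StreamProtocol]): List[Any] =
    frames
      .takeWhile(_ != StreamEnd)
      .collect { case StreamMsg(obj) => obj }
      .toList

  def main(args: Array[String]): Unit = {
    // A payload equal to the string "StreamEnd" survives the round trip.
    val frames = wrap(List("a", "StreamEnd", "b"))
    println(unwrap(frames.iterator)) // List(a, StreamEnd, b)
  }
}
```

With the payload always wrapped in StreamMsg, the consumer distinguishes data from control messages by type instead of by comparing values.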

mtsokol avatar Oct 03 '20 09:10 mtsokol

@mtsokol is this what you had in mind? https://github.com/zio/zio-actors/pull/270/commits/97ed0eb64e9bed77433fe1438e7321e6b07800b2

nightscape avatar Oct 06 '20 15:10 nightscape

And I guess this should fix the takeWhile problem: https://github.com/zio/zio/pull/4273/files#diff-f8e5bb3d67d9b3c721d3cac422fb50f6R2502 Unfortunately, we can't rely on users using ZIO 1.0.2, so the current implementation is safer.

nightscape avatar Oct 06 '20 15:10 nightscape

From my side everything except supervision of streams is looking good.

nightscape avatar Oct 07 '20 21:10 nightscape