redis4cats

Flaky streaming test

Open gvolpe opened this issue 3 years ago • 2 comments

dev.profunktor.redis4cats.RedisStreamSpec:
==> X dev.profunktor.redis4cats.RedisStreamSpec.append/read to/from a stream  3.219s java.util.NoSuchElementException: null
    at fs2.Stream$CompileOps.$anonfun$lastOrError$3(Stream.scala:4329)
    at scala.Option.fold(Option.scala:251)
    at fs2.Stream$CompileOps.$anonfun$lastOrError$2(Stream.scala:4329)

https://github.com/profunktor/redis4cats/runs/1805553054?check_suite_focus=true

gvolpe avatar Feb 01 '21 09:02 gvolpe

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Apr 02 '21 10:04 stale[bot]

Still seeing this

dev.profunktor.redis4cats.RedisStreamSpec:
==> X dev.profunktor.redis4cats.RedisStreamSpec.append/read to/from a stream  3.212s java.util.NoSuchElementException: null
    at fs2.Stream$CompileOps.$anonfun$lastOrError$3(Stream.scala:4401)
    at scala.Option.fold(Option.scala:251)
    at fs2.Stream$CompileOps.$anonfun$lastOrError$2(Stream.scala:4401)

gvolpe avatar Jul 01 '21 14:07 gvolpe

I was playing around with this. Somehow, setting the read to block = Some(1.millis) removes the flakiness (tested via 100+ runs). This doesn't make sense to me, since the default is block = Some(Duration.Zero), i.e. wait forever.

If we use N messages instead of 1, and say interruptAfter(1000.seconds), then .take(N - 1) makes it "work", while .take(N) leads to hanging forever.

Question: why is there a repeat here? https://github.com/profunktor/redis4cats/blob/296b6fe11e85a13c24c69ad90b018e4576b14e8a/modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2Streaming.scala#L97

wiwa avatar Feb 13 '23 01:02 wiwa

> I was playing around with this. Somehow, setting the read to block = Some(1.millis) removes the flakiness (tested via 100+ runs). This doesn't make sense to me, since the default is block = Some(Duration.Zero), i.e. wait forever.

Fixes that rely on a sleep or similar are themselves flaky; that's why this test is marked as flaky.

> If we use N messages instead of 1, and say interruptAfter(1000.seconds), then .take(N - 1) makes it "work", while .take(N) leads to hanging forever.

Does take(N - 1) get you all the published messages? I think that would be missing the final message, no?

> Question: why is there a repeat here?

I can't remember any details of the streaming module, it's been too long, but since you seem to have it fresh I'll counter-ask, why shouldn't there be a repeat there? :)

gvolpe avatar Feb 14 '23 18:02 gvolpe

Yes, it misses the final message. It's less of a "here's a fix" and more of a "here are some things I noticed about it". That being said, the current test essentially says "block forever" until the interrupt, so I am specifically lowering the sleep from "infinite"/3.seconds to 1.millis.

The reason I don't expect a repeat is that (I believe) list <- Stream.eval(rawStreaming.xRead(offsets.values.toSet, block, count)) already gives me count messages -- I wouldn't expect more than that. But the repeat does let us continue the read from the initial offset...?

wiwa avatar Feb 14 '23 18:02 wiwa

> But the repeat does let us continue the read from the initial offset...?

No, it would not, but there could be something wrong related to the block and count arguments. I am not really familiar with the streaming API, it's mainly experimental in this library.

gvolpe avatar Feb 14 '23 18:02 gvolpe

I think I figured it out: https://github.com/lettuce-io/lettuce-core/discussions/1593

The block = Some(Duration.Zero) means that XREAD before XADD causes the client to block indefinitely, because async().xread() is only async from the point of view of Java, not the underlying Redis client. So, we need to use an async connection pool. Verified that this works if we just use a 2nd connection for XREAD.

As for why block = Some(1.millis) works: it's because XREAD returns with nothing, and the stream will continue XREADing nothing (i.e. 1000 reads/sec) until XADD is hit.

(Edit) This is also why my initial impression that repeat is redundant was wrong: XREAD's count is actually just a "maxCount", and the call returns immediately when there is no data.
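A toy illustration of the count and repeat behaviour described here, in plain Scala with no Redis involved. The `xread` and `readN` helpers are hypothetical stand-ins, not redis4cats or Lettuce calls, and the stream is modelled as an in-memory queue. This is a sketch of the reasoning under those assumptions, not the library's implementation.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object RepeatedRead {
  // Toy XREAD (hypothetical helper): waits up to `blockMillis` for a first entry,
  // then returns at most `count` entries. Returns an empty list on timeout.
  def xread(stream: LinkedBlockingQueue[String], blockMillis: Long, count: Int): List[String] =
    Option(stream.poll(blockMillis, TimeUnit.MILLISECONDS)) match {
      case None => Nil
      case Some(first) =>
        val rest = Iterator
          .continually(Option(stream.poll())) // non-blocking poll for further entries
          .take(count - 1)
          .takeWhile(_.isDefined)
          .collect { case Some(entry) => entry }
          .toList
        first :: rest
    }

  // The "repeat": keep issuing short reads until `n` entries have been seen.
  def readN(stream: LinkedBlockingQueue[String], n: Int): List[String] =
    Iterator.continually(xread(stream, blockMillis = 1, count = 2)).flatten.take(n).toList

  def main(args: Array[String]): Unit = {
    val stream = new LinkedBlockingQueue[String]()
    println(xread(stream, blockMillis = 1, count = 2)) // List(): nothing has been added yet

    val producer = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(50) // the adds arrive after the reader has started polling
        List("a", "b", "c").foreach(e => stream.put(e))
      }
    })
    producer.start()
    println(readN(stream, 3)) // List(a, b, c)
  }
}
```

A single short read that finds nothing comes back empty, so only the repeated reads eventually pick up the entries, at the cost of polling in a loop until the add happens.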

wiwa avatar Feb 14 '23 23:02 wiwa

I don't think repeat is wrong, as we'd want a continuous stream of data from the XREAD source, but definitely some improvements could be made to the logic behind block and count.

> So, we need to use an async connection pool. Verified that this works if we just use a 2nd connection for XREAD.

That makes sense, you can see how transactions and pipelining work in this library. Instead of adding two underlying connections, what you need is to create two different instances of Streaming via mkStreamingConnection, which share the same RedisClient, and use one for reading and the other for writing.
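To make the read/write split concrete, here is a toy model in plain Scala. It assumes a connection can be approximated by a single-threaded command queue; `ToyConnection` and `readThenAdd` are hypothetical names, not part of redis4cats or Lettuce, and the blocking read is capped at 500 ms only so that the shared case terminates. It is a sketch of the failure mode, not of the real streaming API.

```scala
import java.util.concurrent.{Callable, Executors, Future, LinkedBlockingQueue, ThreadFactory, TimeUnit}

// Toy stand-in for one connection: commands run one at a time, in submission order.
final class ToyConnection {
  private val worker = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true) // do not keep the JVM alive after the demo
      t
    }
  })

  def submit[A](cmd: => A): Future[A] =
    worker.submit(new Callable[A] { def call(): A = cmd })
}

object SharedVsSeparate {
  // Issues a blocking read first, then an add, and returns what the read saw.
  def readThenAdd(readConn: ToyConnection, writeConn: ToyConnection): Option[String] = {
    val stream = new LinkedBlockingQueue[String]() // toy stream
    val read   = readConn.submit(Option(stream.poll(500, TimeUnit.MILLISECONDS)))
    writeConn.submit(stream.put("hello"))
    read.get()
  }

  def main(args: Array[String]): Unit = {
    val shared = new ToyConnection
    println(readThenAdd(shared, shared))                       // None: the add waited behind the read
    println(readThenAdd(new ToyConnection, new ToyConnection)) // Some(hello)
  }
}
```

With one shared connection the add is queued behind the blocked read, so the read can only time out; with an unbounded block it would never return. With separate reader and writer connections the add goes through and unblocks the read.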

gvolpe avatar Feb 15 '23 09:02 gvolpe

Yep, fully agree with you. I meant to say that my impression was wrong (and that repeat, of some form, is correct). Sorry for the confusion!

wiwa avatar Feb 15 '23 10:02 wiwa

Ah no problems, thanks for digging into it! I think what we can do immediately is to create a proper withRedisStream helper method for testing that returns two instances of Streaming for read/write. Let me know if you'd like to give that a go.

That may already fix the test, but obviously your findings on the usage of block and count need to be fixed as well.

gvolpe avatar Feb 15 '23 10:02 gvolpe

Documentation for streams needs to be marked as experimental as well, and we could document that two streaming connections need to be created (one for read; one for write), similar to how transactions/pipelining work.

gvolpe avatar Feb 15 '23 10:02 gvolpe

I think I'll have time before the weekend, will take a look in the direction you suggested.

wiwa avatar Feb 16 '23 10:02 wiwa