`groupWithin` improvements
Summary
Apologies in advance for the wall of text: hopefully this time it will be worth it (if not, this is probably my last attempt at this for a while)
This PR is a followup from #3162 (thanks all for the feedback)
Goals:
- improving timeout behaviour (increased accuracy)
- improving performance: over 100% faster in certain scenarios (with better memory utilization: this is what I'm gathering from GC benchmark stats)
- fixing interruption propagation misbehaviour
- simplifying logic (race condition)
Timeout behaviour
The current implementation has a small flaw in the timeout logic: when entering the timeout state, the stream waits for the first element, then tries to calculate how many elements are available using the supply semaphore. It finally uses that figure to decide where to split the buffer.
The problem is that even if the buffer has collected enough elements to make up a chunk, these won't always be part of the emitted chunk, because of the above logic. I suspect this is what's happening in the "accumulation and splitting" test, which is flakier than it should be (some level of flakiness is probably acceptable since it's fundamentally a time-based test, but with the current logic it's very easy to get a 50%, or even higher, failure rate).
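To make the flaw concrete, here is a plain-Scala illustration (the numbers are hypothetical, and this is not the actual `groupWithin` code): the buffer already holds a full chunk, but a stale availability figure causes only part of it to be emitted.

```scala
// Hypothetical numbers, illustrating the splitting flaw only
val buffer = Vector(1, 2, 3, 4, 5) // 5 elements already collected
val chunkSize = 5                  // a full chunk is available
val available = 3                  // stale figure read from the supply semaphore

val (emitted, rest) = buffer.splitAt(math.min(chunkSize, available))
// emitted == Vector(1, 2, 3): only 3 elements are flushed despite
// a full chunk of 5 sitting in the buffer
```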
When running the test below (a slightly modified version of the original test that includes an additional sleep), the situation is even worse: it's nearly impossible to get the test to pass.
```scala
test("accumulation and splitting 2".only) {
  val t = 200.millis
  val size = 5
  val sleep = Stream.sleep_[IO](2 * t)
  val longSleep = Stream.sleep_[IO](10 * t)

  def chunk(from: Int, size: Int) =
    Stream.range(from, from + size).chunkAll.unchunks

  // this test example is designed to have good coverage of
  // the chunk manipulation logic in groupWithin
  val source =
    chunk(from = 1, size = 3) ++
      sleep ++
      chunk(from = 4, size = 1) ++ longSleep ++
      chunk(from = 5, size = 11) ++
      chunk(from = 16, size = 7)

  val expected = List(
    List(1, 2, 3),
    List(4),
    List(5, 6, 7, 8, 9),
    List(10, 11, 12, 13, 14),
    List(15, 16, 17, 18, 19),
    List(20, 21, 22)
  )

  source.groupWithin(size, t).map(_.toList).assertEmits(expected)
}
```
This PR introduces a new mechanism to improve the accuracy of the timeout logic. Instead of calculating the number of elements to be flushed, we flush them and then lower the supply accordingly. We also use `acquireN` instead of `tryAcquireN`, since the number of elements flushed is known.
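As a sketch of that mechanism using cats-effect's `Ref` and `Semaphore` (the names and shape below are illustrative, not the actual PR code):

```scala
import cats.effect.IO
import cats.effect.kernel.Ref
import cats.effect.std.Semaphore

// Illustrative sketch only: on timeout, flush the whole buffer first,
// then lower the supply by the exact number of elements flushed.
def onTimeout[A](
    buffer: Ref[IO, Vector[A]],
    supply: Semaphore[IO],
    emit: Vector[A] => IO[Unit]
): IO[Unit] =
  buffer.getAndSet(Vector.empty).flatMap { flushed =>
    // the flushed count is known at this point, so acquireN
    // can be used instead of tryAcquireN
    emit(flushed) >> supply.acquireN(flushed.size.toLong)
  }
```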
I'm also using a `SignallingRef` for state management and to provide an upper bound to the fiber that waits for the first chunk. I need this to comply with one of the test cases.
The result is more accurate behaviour (see below): I've run both "accumulation and splitting" tests a number of times and have had zero failures so far.
Performance
Benchmark figures are better compared to the existing implementation: consistently faster, over 100% faster (ops/sec), especially on large streams, and with better GC stats (see screenshots below).
Example
- test: stress test (short execution): all elements are processed (with the range parameter set to 10 million)
- suggested implementation: 15 seconds
- existing implementation: 30 seconds
suggested implementation
existing implementation
Simplified logic
This implementation is hopefully simpler to follow.
- removing the custom error propagation (relies on `concurrently`)
- limiting the use of `Semaphore` to what's needed (`demand` is gone in favour of a bounded queue)
- reducing the demand/supply `Option[Either[T, A]]` state flags to a single `boolean`
- relying on the fs2 streams API to await a chunk instead of splitting the state `Vector`
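As a minimal illustration of the `concurrently`-based propagation mentioned above (this is not the PR code, just the general fs2 behaviour it relies on): a failure in the background stream terminates the foreground stream, with no custom error channel needed.

```scala
import cats.effect.IO
import fs2.Stream

// A failing background stream (standing in for the producer)
val producer: Stream[IO, Nothing] =
  Stream.raiseError[IO](new RuntimeException("producer failed"))

// The output stream is interrupted and fails with the producer's error,
// courtesy of `concurrently`
val out: Stream[IO, Int] =
  Stream.iterate(0)(_ + 1).covary[IO].concurrently(producer)
```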
Tests
This PR includes new tests:
- `accumulation and splitting 2` (I'm happy to remove this, or include the `longSleep` in the original test, it was just for illustration purposes)
- `stress test (short execution): all elements are processed` (a copy of the benchmark tests with an integrity check at the end). I found this useful as it allowed me to verify the behaviour over a long stream of elements, uncovering a couple of bugs in the permits management logic. As mentioned elsewhere, it is possible to write an implementation that passes all other tests consistently but fails this one
- `stress test (long execution): all elements are processed`: basically the same as the one above, but it pauses after each element. In theory this allows testing the timeout logic more frequently than the previous test. Marked as `ignored` since it takes ~15 mins to complete with TestControl
- `if the buffer fills up at the same time when the timeout expires there won't be a deadlock` (thanks @armanbilge for explaining the problem and for suggesting a nice way to reproduce the bug). This test allowed me to confirm the race condition bug and to make sure the fix was working as expected
- upstream error/interruption propagation error tests (thanks @SystemFw for explaining what the correct behaviour should be on my previous attempt)
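A `TestControl`-based deadlock check along the lines described above could be sketched like this (a hedged sketch only, not the actual test; the exact rates and sizes are illustrative):

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.testkit.TestControl
import fs2.Stream

// Arrange for the buffer to fill at the same instant the timeout fires,
// and assert the stream still terminates.
val program: IO[Unit] =
  Stream
    .range(0, 100)
    .covary[IO]
    .metered(10.millis)          // one element every 10ms
    .groupWithin(10, 100.millis) // buffer of 10 fills exactly as the timeout expires
    .compile
    .drain

// TestControl runs the program on a mock clock, so a deadlock surfaces
// as a non-terminating program instead of a slow test
val check: IO[Unit] = TestControl.executeEmbed(program)
```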
Notes
- may be a fix for #2432 (let me know what your experience is; I haven't seen a single failure yet when running these tests in isolation or as part of a whole suite. EDIT: I've made a small change and I've seen the first failure (on CI) after several successful runs, so it is still a flaky test. The failure rate however has decreased significantly; I'm still seeing several successful test runs locally, as shown in the screenshot above)
- fixes #3169 (unless #3183 gets merged first; note though that the interruption behaviour is broken not only on upstream termination but also when `chunkSize == 0`. I believe this is a bug and I assume the behaviour of this implementation is the intended behaviour. I should be able to change it easily if that's not the case. Either way I'm happy to add a test to document the behaviour)
Thank you
> avoid deconstructing tuple in for-compr to avoid CI js failure
Sorry, this is my trigger.
This issue is not related to Scala.js. The core Scala language (i.e. syntax) actually works 100% identically for all three platforms (if you ever find a case where it doesn't, that's a legitimate bug and we should fix it :).
Sometimes, a JS CI job will happen to fail first. That doesn't mean it's a JS-specific issue.
/rant
@armanbilge I had a look, this time it sounds like a Scala.js problem on Scala 3 (setting the version to 2 and running the CI command `++ 3.2.2 Test/scalaJSLinkerResult` doesn't cause issues).
I tried to set the compiler flag `-source:future`, but that brings up a bunch of other unrelated errors/warnings that perhaps deserve a separate PR.
Also, I've found a similar issue here: it sounds like a compiler issue from the discussion. Do you think it's worth spending more time on this?
> I had a look, this time it sounds like a Scala.js problem on Scala 3
You can't reproduce it with Scala JVM?
> You can't reproduce it with Scala JVM?
Nope: the command is not available on jvm
`scalaJSLinkerResult` is just running `compile`, try compiling on JVM :)
Yeah, `compile` works on the JVM (I think it's a subcommand of `test` and I've run these tests a number of times)
`Test/compile` on Scala 3 JVM works, and `Test/compile` on Scala 3 JS doesn't work? That would be a bug.
> `Test/compile` on Scala 3 JVM works, and `Test/compile` on Scala 3 JS doesn't work? That would be a bug.
I've run the JVM tests on Scala 2; let me try with Scala 3.
@armanbilge yeah, you're right: I'm getting the same error.
As mentioned earlier though, using the `-source:future` compiler flag doesn't work:
```
[warn] 60 warnings found
[error] 55 errors found
```
I'm leaning towards keeping it as it is, unless you know how to fix it, because I have no idea: the issue linked earlier labels it as a won't-fix. Hopefully deconstructing vs non-deconstructing the tuple is not a deal breaker. Either way, should we move this to Discord? The conversation is moving away from the PR
> Hopefully deconstructing vs non-deconstructing the tuple is not a deal breaker.
It's not a problem at all, just a symptom of cross-compiling across Scala 2 and Scala 3. Sorry that I derailed things
> It's not a problem at all, just a symptom of cross-compiling across Scala 2 and Scala 3. Sorry that I derailed things
No worries: it's just that I've opened a bunch of PRs doing the same thing and it might get confusing. Thanks for explaining though, learned something again
@armanbilge I was tweaking the benchmark parameters and I've discovered a flaw in this implementation.
Currently the buffer window is set to 100 micros, meaning every test run will generate very small buffers (with 1 or 2 elements), simply because the timeout is too short for the buffer to accumulate a significant number of elements. So I've decided to extend the timeout to 1 second, and I've also increased the total number of elements and the buffer size to 1 million and 100k respectively.
This is what I've found out:
- using a `Queue` from cats for backpressure and buffering adds a significant overhead in terms of memory (perhaps this was obvious, I didn't know)
And these are the results:
visual vm & benchmarks (only one test case)
(no longer relevant) suggested implementation (cats bounded Queue) - replaced

existing implementation - main branch

suggested implementation (Semaphore + Vector) - this branch

In light of this, I've decided to restore the `Vector` (`Chunk` doesn't seem to improve performance, at least with the existing benchmarks; will test with different benchmark params) + demand semaphore (renamed to `backpressure`, explained below).
This is the commit with the change explained above (there's also another small improvement where `supply == 0` is checked only once, plus some refactoring).
So to summarize, compared to the existing implementation I'm seeing better performance overall and increased accuracy:
- ops/sec: 70%-90% better (see benchmarks above: nearly twice as fast)
- memory: pretty much the same (see visual vm screenshots)
- timeout logic (tested via the "accumulation and splitting" tests): lower failure rate (from 60-80% to maybe less than 1-2%: hard to give an exact figure, but failures are few and far between. I haven't seen a single failure locally yet, but I've seen failures on CI: example)
On top of that a few improvements:
- timeout logic is a bit clearer (at least to me)
- `splitAt` logic is gone, the entire buffer is replaced instead: since the buffer is bounded, it will contain at most `chunkSize` elements
- end-of-demand tracking is gone, the semaphore is purely used for backpressure (as opposed to backpressure + upstream interruption via `forall`)
- error/interruption propagation relies on existing combinators (`concurrently`)
- edge case (`chunkSize == 0`) is explicitly handled (not sure if this is the best way to handle it, i.e. should the stream terminate immediately, or should we enforce the group size to be positive? Either way, changing it to whatever makes the most sense is easy. Note that the existing implementation causes the stream to never end)
- edge cases (`chunkSize == 1` and `timeout == 0`) use an existing combinator (`chunkN`; not sure about this since it might lead to different behaviour in terms of other properties of `groupWithin` depending on the parameters. If that's not desirable it can be changed easily)
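A sketch of how those guards might look (the exact conditions are my reading of the description above, not the actual PR code; `general` stands in for the full timeout-based implementation):

```scala
import scala.concurrent.duration.{Duration, FiniteDuration}
import fs2.{Chunk, Stream}

// Illustrative guard structure only
def groupWithinSketch[F[_], A](
    s: Stream[F, A],
    chunkSize: Int,
    timeout: FiniteDuration
)(general: => Stream[F, Chunk[A]]): Stream[F, Chunk[A]] =
  if (chunkSize <= 0) Stream.empty // terminate immediately (main currently never ends)
  else if (chunkSize == 1 || timeout == Duration.Zero)
    s.chunkN(chunkSize)            // delegate to the existing combinator
  else general                     // the full timeout-based logic
```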
So if the above makes sense and you guys are happy with these changes I think that the bulk of the work is done and what's left is:
- [ ] decide the desired behaviour around edge cases (and write tests/scaladocs, at least for the `chunkSize == 0` scenario)
- [ ] (possibly) parameterise benchmarks including a longer timeout window to have more realistic scenarios
- [x] test `Chunk` vs `Vector` with a timeout longer than 100 micros (apparently `Chunk` produces results similar to `Queue`)
- [ ] mark the "accumulation & splitting" test(s) as flaky (even though the behaviour is more accurate, the truth is it can still fail)
- [ ] remove tests added here #3183
- [x] sync this branch with main
Notes
- I don't think this CI failure is caused by a deadlock or defect in this implementation. The same error (still on rootJS tests) occurred (see that CI failure's logs) on the same test on #3183, which uses the same implementation currently on the `main` branch (not to mention that one of the tests added in this PR explicitly checks that no deadlock occurs)
Thanks for that update!
> In light of this, I've decided to restore the `Vector` (`Chunk` doesn't seem to improve performance, at least with the existing benchmarks, will test with different benchmark params)
I have a question about this: I understand that `Chunk` didn't improve performance in your benchmarks, but does it degrade performance? The reason I ask is because there is a big difference between e.g. a `Chunk[Byte]` and a `Vector[Byte]`: the chunk can be backed by a primitive array but the vector will be boxed.
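For example (a small illustration of the boxing point, using fs2's `Chunk.array` constructor):

```scala
import fs2.Chunk

// Chunk can wrap a primitive Array[Byte] directly: no per-element boxing
val unboxed: Chunk[Byte] = Chunk.array(Array[Byte](1, 2, 3))

// Vector[Byte] stores each element as a boxed java.lang.Byte on the JVM
val boxed: Vector[Byte] = Vector[Byte](1, 2, 3)
```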
> I have a question about this: I understand that `Chunk` didn't improve performance in your benchmarks, but does it degrade performance? The reason I ask is because there is a big difference between e.g. a `Chunk[Byte]` and a `Vector[Byte]`: the chunk can be backed by a primitive array but the vector will be boxed.
@armanbilge sorry for the late reply.
I see, thanks for explaining. When I ran the benchmark last time using `fs2.Chunk`, I noticed behaviour similar (memory-wise) to what I observed with the cats `Queue` (the VisualVM stats/graphs were similar).
In terms of ops/sec, instead, I didn't notice any particular difference, but I was running the existing benchmarks using `Int` values.
I'm wondering if the difference between `Chunk[Byte]` vs `Vector[Byte]` calls for a separate method, or maybe better, a separate extended API specifically targeting `Stream[F, Byte]`.
> I'm wondering if the difference between `Chunk[Byte]` vs `Vector[Byte]` calls for a separate method, or maybe better, a separate extended API specifically targeting `Stream[F, Byte]`
It doesn't just apply to `Byte`; it applies to all primitive types. The specific needs of FS2 are precisely why `Chunk` exists and is used in APIs instead of e.g. `Vector`. Here's a presentation by Michael discussing some of this :)
https://www.youtube.com/watch?v=wOybldcyMLs