add PubSub `replayCapacity` option
🦋 Changeset detected
Latest commit: 2de7f5b55180637ed3132df2e85d935f9e60aeee
The changes in this PR will be included in the next version bump.
This PR includes changesets to release 30 packages
| Name | Type |
|---|---|
| effect | Minor |
| @effect/cli | Major |
| @effect/cluster-browser | Major |
| @effect/cluster-node | Major |
| @effect/cluster-workflow | Major |
| @effect/cluster | Major |
| @effect/experimental | Major |
| @effect/opentelemetry | Major |
| @effect/platform-browser | Major |
| @effect/platform-bun | Major |
| @effect/platform-node-shared | Major |
| @effect/platform-node | Major |
| @effect/platform | Major |
| @effect/printer-ansi | Major |
| @effect/printer | Major |
| @effect/rpc-http | Major |
| @effect/rpc | Major |
| @effect/schema | Major |
| @effect/sql-d1 | Major |
| @effect/sql-drizzle | Major |
| @effect/sql-mssql | Major |
| @effect/sql-mysql2 | Major |
| @effect/sql-pg | Major |
| @effect/sql-sqlite-bun | Major |
| @effect/sql-sqlite-node | Major |
| @effect/sql-sqlite-react-native | Major |
| @effect/sql-sqlite-wasm | Major |
| @effect/sql | Major |
| @effect/typeclass | Major |
| @effect/vitest | Major |
While the withReplay implementation is more flexible, I would prefer having a separate PubSub.replay for several reasons.
First, PubSub.replay uses a linked list, which makes it faster.
Second, there are issues with consistency when combining withReplay with PubSub.sliding. PubSub.sliding is designed to discard the oldest unprocessed value, and I expect this value to be the one in the replayBuffer. For example:
const pubsub = yield* PubSub.sliding<number>(3).pipe(
  Effect.map(PubSub.withReplay(3))
)
In this scenario, if we emit 1, 2, 3, they are stored in the replayBuffer. When a new subscription S1 starts processing from 1, and we emit 4, 5, 6, 7, S1 will process 1, 2, 3, 5, 6, 7.
However, based on the sliding behavior, the logical expectation would be for S1 to process 1, 3, 4, 5, 6, 7
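To make the scenario above concrete, here is a minimal sketch that models a replay log kept separately from a subscriber's sliding queue. It is a plain TypeScript simulation, not the Effect implementation: `SlidingQueue`, `ReplayLog`, and `simulate` are hypothetical names, and it assumes S1 receives the replay snapshot on subscription and does not drain its queue until all of 4, 5, 6, 7 have been published.

```typescript
// Simplified model only: this is NOT the Effect PubSub implementation.
// `SlidingQueue` and `ReplayLog` are hypothetical names for illustration.

class SlidingQueue<A> {
  private readonly capacity: number
  private items: A[] = []

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // When full, discard the oldest unprocessed item to make room.
  offer(value: A): void {
    if (this.items.length === this.capacity) {
      this.items.shift()
    }
    this.items.push(value)
  }

  // Remove and return everything currently queued.
  takeAll(): A[] {
    const out = this.items
    this.items = []
    return out
  }
}

class ReplayLog<A> {
  private readonly capacity: number
  private items: A[] = []

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // Keep only the most recent `capacity` items.
  record(value: A): void {
    this.items.push(value)
    if (this.items.length > this.capacity) {
      this.items.shift()
    }
  }

  snapshot(): A[] {
    return [...this.items]
  }
}

function simulate(): number[] {
  const replay = new ReplayLog<number>(3)
  // 1, 2, 3 are published before anyone subscribes, so they live only in the replay log.
  for (const n of [1, 2, 3]) replay.record(n)

  // S1 subscribes: it is handed the replay snapshot plus its own sliding queue.
  const pending = replay.snapshot()
  const queue = new SlidingQueue<number>(3)

  // 4, 5, 6, 7 arrive before S1 drains its queue; 4 slides out when 7 arrives.
  for (const n of [4, 5, 6, 7]) {
    replay.record(n)
    queue.offer(n)
  }

  return [...pending, ...queue.takeAll()]
}

const processed = simulate()
console.log(processed.join(", ")) // 1, 2, 3, 5, 6, 7
```

In this model the sliding step never touches the replayed values, which is why 4 is lost while 1, 2, 3 all survive.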
Third, the replayBufferSize is not considered when determining whether a dropping or bounded pubsub is full. This undermines the point of these types. If we use dropping, bounded, or sliding, it's usually to save resources like CPU or memory. Adding replay behaviour that doesn't count the replayBuffer toward capacity kind of "poisons" these pubsub types.
In summary, while PubSub.withReplay is functional, its use with sliding, dropping, and bounded variants reveals design flaws. The only clear and effective use case is with unbounded pubsub. Therefore, I would prefer a separate PubSub.replay, as it provides a simpler and faster solution with more consistent behavior.
I think it only makes sense to track replays for sliding/bounded/dropping separately in each of these implementations.
So, for PubSub.sliding({capacity: 5, replay: 10}) we are sliding the real "oldest value".
For PubSub.bounded({capacity: 5, replay: 10}) we are rejecting the offer if the current replayBuffer length is more than capacity.
And for PubSub.dropping({capacity: 5, replay: 10}) we are dropping the offer if the current replayBuffer length is more than capacity.
> its use with sliding, dropping, and bounded variants
Bounded and dropping are an easy fix: only add items to the replay buffer if they are successfully added to the pubsub.
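A minimal sketch of that fix for the dropping case, written as a plain TypeScript model rather than the PR's actual code. `DroppingChannel` and its methods are hypothetical names; the only point illustrated is that a rejected offer never reaches the replay buffer.

```typescript
// Simplified model only, not the Effect implementation.
// `DroppingChannel` is a hypothetical name used for illustration.

class DroppingChannel<A> {
  private readonly capacity: number
  private readonly replayCapacity: number
  private queue: A[] = []
  private replay: A[] = []

  constructor(capacity: number, replayCapacity: number) {
    this.capacity = capacity
    this.replayCapacity = replayCapacity
  }

  // Returns false when the value is dropped because the queue is full.
  offer(value: A): boolean {
    if (this.queue.length >= this.capacity) {
      return false // dropped: never recorded for replay
    }
    this.queue.push(value)
    // Record for replay only after the offer succeeded.
    this.replay.push(value)
    if (this.replay.length > this.replayCapacity) {
      this.replay.shift()
    }
    return true
  }

  // Remove the oldest queued value, if any.
  take(): A | undefined {
    return this.queue.shift()
  }

  replaySnapshot(): A[] {
    return [...this.replay]
  }
}

const channel = new DroppingChannel<number>(2, 10)
const results = [1, 2, 3].map((n) => channel.offer(n))
console.log(results.join(", ")) // true, true, false
console.log(channel.replaySnapshot().join(", ")) // 1, 2
```

A bounded variant would differ only in what happens on a full queue (the offer waits instead of returning `false`); the replay recording would still sit after the successful enqueue.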
Sliding is the only tricky one, but I want to see if we can support it as I have a use case for it.
I think I have a good idea.
We can expose the most efficient API for each use case. I offered a good (IMHO) implementation for unbounded pubsub replay, which is better than independent tracking in terms of performance.
We also have a good implementation of tracking replays for bounded and dropping pubsubs in this PR.
We can also write a separate implementation of replays for the sliding variant.
And then we can expose APIs such as:
PubSub.unboundedReplay();
PubSub.boundedReplay();
PubSub.droppingReplay();
PubSub.slidingReplay();
Which would use different replay tracking mechanisms under the hood.
> We can expose the most efficient API for each use-case.
From a maintainability perspective, I would prefer not to have a replay version of each AtomicPubSub implementation.
Also, things like capacity would be different depending on the strategy, so keeping the replay buffer separate keeps it cleaner.
> I would prefer not to have a replay version of each AtomicPubSub implementation.
Yes, of course, I didn't mean that. I just mean that we could use my implementation of unbounded replay, and your implementation for all the other AtomicPubSub implementations, just because IMHO my implementation is more efficient for the unbounded type, and we already have it :)
> my implementation is more efficient for unbounded type
I ran some benchmarks and they are pretty much the same, so I think we should just go with the less complex option.
> I ran some benchmarks and they are pretty much the same
Yep, I saw that you implemented ReplayBuffer based on a linked list, so it should be the same from a performance point of view, but less efficient in terms of RAM.
Let's say we have PubSub.unbounded({replay: Infinity}) and one slow subscriber. We emit 10000 elements, but we have 20000 in RAM because we keep the same elements in 2 places.
Not sure if it is something we should really care about in an era of cheap CPU, but...
> We emit 10000 elements, but we have 20000 in RAM because we keep the same elements in 2 places.
It will just be overhead in terms of the replay buffer nodes, as the actual elements will be the same reference.
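A minimal sketch of that point, assuming a linked-list replay buffer sitting alongside a subscriber queue. It is a plain TypeScript model, not the PR's ReplayBuffer: `ReplayNode`, `LinkedReplayBuffer`, and `Message` are hypothetical names. Each published message is allocated once, and the replay buffer only adds one small node per retained message.

```typescript
// Simplified model only, not the Effect implementation.
// `ReplayNode`, `LinkedReplayBuffer`, and `Message` are hypothetical names.

interface ReplayNode<A> {
  readonly value: A
  next: ReplayNode<A> | undefined
}

class LinkedReplayBuffer<A> {
  private readonly capacity: number
  private head: ReplayNode<A> | undefined = undefined
  private tail: ReplayNode<A> | undefined = undefined
  private size = 0

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // Append a node that points at `value`; the value itself is not copied.
  push(value: A): void {
    const node: ReplayNode<A> = { value, next: undefined }
    if (this.tail === undefined) {
      this.head = node
    } else {
      this.tail.next = node
    }
    this.tail = node
    this.size++
    // Evict the oldest node once the buffer exceeds its capacity.
    if (this.size > this.capacity && this.head !== undefined) {
      this.head = this.head.next
      this.size--
      if (this.head === undefined) this.tail = undefined
    }
  }

  toArray(): A[] {
    const out: A[] = []
    for (let n = this.head; n !== undefined; n = n.next) out.push(n.value)
    return out
  }
}

interface Message {
  readonly id: number
  readonly payload: string
}

const subscriberQueue: Message[] = []
const replayBuffer = new LinkedReplayBuffer<Message>(2)

for (let id = 1; id <= 3; id++) {
  const message: Message = { id, payload: "x".repeat(1000) }
  subscriberQueue.push(message) // stored for the slow subscriber
  replayBuffer.push(message) // same object, referenced a second time
}

const replayed = replayBuffer.toArray()
console.log(replayed.map((m) => m.id).join(", ")) // 2, 3
console.log(replayed[0] === subscriberQueue[1]) // true
```

The identity check can only be demonstrated for objects; JavaScript offers no way to observe whether two equal strings share storage, so that part has to be checked with a heap snapshot.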
> It will just be overhead in terms of the replay buffer nodes, as the actual elements will be the same reference.
Yes, it will work for objects, but not for primitives, especially long strings.
> Yes, it will work for objects, but not for primitives, especially long strings.
If you have a large string, and rebind it to another variable, it will not clone the string:
const largeString = "....."
const obj = {
  value: largeString // still the same string in memory
}
As far as I understand JS, each time you pass a primitive somewhere, you get a new copy of it in memory.
https://javascript.info/object-copy
> As far as I understand JS, each time you pass a primitive somewhere, you get a new copy of it in memory.
> https://javascript.info/object-copy
No, only creating a modified string (for example, by concatenation) will cause a new one to be allocated.
I checked the memory snapshot and I was wrong: it is the same reference.
WOW WOW WOW HOT IN HERE!!! 🔥