
add PubSub `replayCapacity` option

Open tim-smart opened this issue 1 year ago • 5 comments

tim-smart avatar Jul 01 '24 07:07 tim-smart

🦋 Changeset detected

Latest commit: 2de7f5b55180637ed3132df2e85d935f9e60aeee

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 30 packages
| Name | Type |
| --- | --- |
| effect | Minor |
| @effect/cli | Major |
| @effect/cluster-browser | Major |
| @effect/cluster-node | Major |
| @effect/cluster-workflow | Major |
| @effect/cluster | Major |
| @effect/experimental | Major |
| @effect/opentelemetry | Major |
| @effect/platform-browser | Major |
| @effect/platform-bun | Major |
| @effect/platform-node-shared | Major |
| @effect/platform-node | Major |
| @effect/platform | Major |
| @effect/printer-ansi | Major |
| @effect/printer | Major |
| @effect/rpc-http | Major |
| @effect/rpc | Major |
| @effect/schema | Major |
| @effect/sql-d1 | Major |
| @effect/sql-drizzle | Major |
| @effect/sql-mssql | Major |
| @effect/sql-mysql2 | Major |
| @effect/sql-pg | Major |
| @effect/sql-sqlite-bun | Major |
| @effect/sql-sqlite-node | Major |
| @effect/sql-sqlite-react-native | Major |
| @effect/sql-sqlite-wasm | Major |
| @effect/sql | Major |
| @effect/typeclass | Major |
| @effect/vitest | Major |

changeset-bot[bot] avatar Jul 01 '24 07:07 changeset-bot[bot]

While the withReplay implementation is more flexible, I would prefer having a separate PubSub.replay for several reasons.

First, PubSub.replay uses a linked list, which makes it faster.

Second, there are issues with consistency when combining withReplay with PubSub.sliding. PubSub.sliding is designed to discard the oldest unprocessed value, and I expect this value to be the one in the replayBuffer. For example:

const pubsub = yield* PubSub.sliding<number>(3).pipe(
    Effect.map(PubSub.withReplay(3))
)

In this scenario, if we emit 1, 2, 3, they are stored in the replayBuffer. When a new subscription S1 starts processing from 1 and we then emit 4, 5, 6, 7, S1 will process 1, 2, 3, 5, 6, 7. However, based on the sliding behavior, the logical expectation would be for S1 to process 1, 3, 4, 5, 6, 7.

Third, the replayBufferSize is not considered when determining whether a dropping or bounded pubsub is full. This undermines the point of these types. If we use dropping, bounded, or sliding, it's usually to save resources such as CPU or memory. Adding replay behaviour that doesn't count the replayBuffer towards capacity kind of "poisons" these pubsub types.

In summary, while PubSub.withReplay is functional, its use with sliding, dropping, and bounded variants reveals design flaws. The only clear and effective use case is with an unbounded pubsub. Therefore, I would prefer a separate PubSub.replay, as it provides a simpler and faster solution with more consistent behavior.
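
One possible reading of the sliding scenario in the second point can be modelled in plain TypeScript. This is only a sketch and not Effect's PubSub implementation: `SlidingQueue`, `replayBuffer`, `live`, and `seenByS1` are hypothetical names, and it assumes the replay buffer is tracked separately from a live sliding queue of capacity 3 while S1 is too slow to drain that queue before 7 is emitted.

```typescript
// Hypothetical model of a sliding queue; NOT Effect's PubSub internals.
class SlidingQueue<A> {
  private items: Array<A> = []
  private readonly capacity: number

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // Always accepts the value; evicts the oldest queued value when full.
  offer(value: A): void {
    if (this.items.length === this.capacity) {
      this.items.shift()
    }
    this.items.push(value)
  }

  // Removes and returns the oldest queued value, if any.
  take(): A | undefined {
    return this.items.shift()
  }
}

// Assumption: 1, 2, 3 were emitted earlier and live only in a separate replay buffer.
const replayBuffer: Array<number> = [1, 2, 3]
const live = new SlidingQueue<number>(3)

// S1 subscribes and is handed the replayed values first.
const seenByS1: Array<number> = [...replayBuffer]

// While S1 is still busy, 4..7 are emitted; the live queue slides and evicts 4.
for (const n of [4, 5, 6, 7]) {
  live.offer(n)
}

// S1 catches up with whatever is left in the live queue.
let next = live.take()
while (next !== undefined) {
  seenByS1.push(next)
  next = live.take()
}

console.log(seenByS1) // [1, 2, 3, 5, 6, 7]
```

In this model the value that slides out is 4, even though the replayed values are older, because the replay buffer is invisible to the sliding queue.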

dilame avatar Jul 01 '24 22:07 dilame

I think it only makes sense to track replays for sliding/bounded/dropping separately in each of these implementations.

So, for PubSub.sliding({capacity: 5, replay: 10}) we are sliding the real "oldest value". For PubSub.bounded({capacity: 5, replay: 10}) we are rejecting the offer if the current replayBuffer length is more than capacity. And for PubSub.dropping({capacity: 5, replay: 10}) we are dropping the offer if the current replayBuffer length is more than capacity.

dilame avatar Jul 01 '24 22:07 dilame

> its use with sliding, dropping, and bounded variants

Bounded and dropping are an easy fix: only add items to the replay buffer if they are successfully added to the pubsub.
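
A minimal sketch of that rule for the dropping case, written as a plain TypeScript model rather than the code in this PR. `DroppingWithReplay`, `offer`, `take`, and `replayed` are hypothetical names; the only point is that a rejected offer never reaches the replay buffer.

```typescript
// Hypothetical model of a dropping queue with a replay buffer; NOT the PR's implementation.
class DroppingWithReplay<A> {
  private readonly queue: Array<A> = []
  private readonly replay: Array<A> = []
  private readonly capacity: number
  private readonly replayCapacity: number

  constructor(capacity: number, replayCapacity: number) {
    this.capacity = capacity
    this.replayCapacity = replayCapacity
  }

  // Returns false when the value is dropped; dropped values are never replayed.
  offer(value: A): boolean {
    if (this.queue.length >= this.capacity) {
      return false
    }
    this.queue.push(value)
    this.replay.push(value)
    // Keep only the most recent `replayCapacity` accepted values.
    if (this.replay.length > this.replayCapacity) {
      this.replay.shift()
    }
    return true
  }

  // Removes and returns the oldest queued value, if any.
  take(): A | undefined {
    return this.queue.shift()
  }

  // Copy of the values a new subscriber would be replayed.
  replayed(): ReadonlyArray<A> {
    return [...this.replay]
  }
}

const pubsub = new DroppingWithReplay<number>(2, 3)
const accepted = [1, 2, 3].map((n) => pubsub.offer(n))
const replayAfterDrop = pubsub.replayed()

console.log(accepted) // [true, true, false]
console.log(replayAfterDrop) // [1, 2]
```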

Sliding is the only tricky one, but I want to see if we can support it as I have a use case for it.

tim-smart avatar Jul 02 '24 03:07 tim-smart

I think I have a good idea.

We can expose the most efficient API for each use case. I offered a good (IMHO) implementation for unbounded pubsub replay, which is better than independent tracking in terms of performance.

We also have a good implementation of tracking replays for bounded and dropping pubsubs in this PR.

We can also write a separate implementation of replays for the sliding variant.

And then we can expose APIs such as:

PubSub.unboundedReplay();
PubSub.boundedReplay();
PubSub.droppingReplay();
PubSub.slidingReplay();

Which would use different replay tracking mechanisms under the hood.

dilame avatar Jul 02 '24 11:07 dilame

> We can expose the most efficient API for each use-case.

From a maintainability perspective, I would prefer not to have a replay version of each AtomicPubSub implementation. Also, things like capacity would be different depending on the strategy, so keeping the replay buffer separate keeps it cleaner.

tim-smart avatar Jul 05 '24 02:07 tim-smart

> I would prefer not to have a replay version of each AtomicPubSub implementation.

Yes, of course, I didn't mean that. I just meant that we could use my implementation of unbounded replay and your implementation for all the other AtomicPubSub implementations, because IMHO my implementation is more efficient for the unbounded type, and we already have it :)

dilame avatar Jul 05 '24 02:07 dilame

> my implementation is more efficient for the unbounded type

I ran some benchmarks and they are pretty much the same, so I think we should just go with the less complex option.

tim-smart avatar Jul 05 '24 02:07 tim-smart

> I ran some benchmarks and they are pretty much the same

Yep, I saw that you implemented ReplayBuffer based on a linked list, so it should be the same in terms of performance, but less efficient in terms of RAM.

Let's say we have PubSub.unbounded({replay: Infinity}) and one slow subscriber. We emit 10000 elements, but we have 20000 in RAM because we keep the same elements in 2 places.

Not sure if it is something we should really care about, living in the era of cheap CPU, but...

dilame avatar Jul 05 '24 03:07 dilame

> We emit 10000 elements, but we have 20000 in RAM because we keep the same elements in 2 places.

It will just be overhead in terms of the replay buffer nodes, as the actual elements will be the same reference.
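
To make the reference-sharing point concrete, here is a rough sketch of a linked-list replay buffer in plain TypeScript. It is not the ReplayBuffer from this PR: `LinkedReplayBuffer`, `ReplayNode`, `pending`, and `replay` are hypothetical names, and the `pending` array merely stands in for the pubsub's own storage.

```typescript
// Hypothetical linked-list replay buffer; NOT the ReplayBuffer from this PR.
interface ReplayNode<A> {
  readonly value: A
  next: ReplayNode<A> | undefined
}

class LinkedReplayBuffer<A> {
  private head: ReplayNode<A> | undefined = undefined
  private tail: ReplayNode<A> | undefined = undefined
  private size = 0
  private readonly capacity: number

  constructor(capacity: number) {
    this.capacity = capacity
  }

  // O(1) append; drops the oldest node once capacity is exceeded.
  append(value: A): void {
    const node: ReplayNode<A> = { value, next: undefined }
    if (this.tail === undefined) {
      this.head = node
    } else {
      this.tail.next = node
    }
    this.tail = node
    this.size++
    if (this.size > this.capacity && this.head !== undefined) {
      this.head = this.head.next
      if (this.head === undefined) {
        this.tail = undefined
      }
      this.size--
    }
  }

  // Walks the list from oldest to newest.
  toArray(): Array<A> {
    const out: Array<A> = []
    for (let n = this.head; n !== undefined; n = n.next) {
      out.push(n.value)
    }
    return out
  }
}

// `pending` stands in for the pubsub's own storage of undelivered messages.
const pending: Array<{ id: number }> = []
const replay = new LinkedReplayBuffer<{ id: number }>(Infinity)

for (let id = 0; id < 3; id++) {
  const message = { id }
  pending.push(message) // first place the message is referenced
  replay.append(message) // second place: a new node, but the same message object
}

const sameReference = replay.toArray().every((m, i) => m === pending[i])
console.log(sameReference) // true
```

Each message costs one extra node object in the replay buffer, while the message itself is referenced from both places rather than copied.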

tim-smart avatar Jul 05 '24 03:07 tim-smart

> It will just be overhead in terms of the replay buffer nodes, as the actual elements will be the same reference.

Yes, it will work for objects, but not for primitives, especially long strings.

dilame avatar Jul 05 '24 03:07 dilame

> Yes, it will work for objects, but not for primitives, especially long strings.

If you have a large string, and rebind it to another variable, it will not clone the string:

const largeString = "....."

const obj = {
  value: largeString // still the same string in memory
}

tim-smart avatar Jul 05 '24 03:07 tim-smart

As far as I understand JS, each time you pass a primitive somewhere, you get a new copy of it in memory.

https://javascript.info/object-copy

dilame avatar Jul 05 '24 04:07 dilame

> As far as I understand JS, each time you pass a primitive somewhere, you get a new copy of it in memory.
>
> https://javascript.info/object-copy

No, only mutating the string will cause a new one to be allocated.

tim-smart avatar Jul 05 '24 04:07 tim-smart

I checked the memory snapshot and I was wrong: it is the same reference.

dilame avatar Jul 05 '24 04:07 dilame

WOW WOW WOW HOT IN HERE!!! 🔥

dilame avatar Jul 08 '24 21:07 dilame