feat(PubSub): unbounded replay
Type
- [ ] Refactor
- [x] Feature
- [ ] Bug Fix
- [ ] Optimization
- [ ] Documentation Update
Description
Implementation of replay last N values feature for PubSub.unbounded(replayBufferSize: N = 0).
Can further be used to create shared Stream https://github.com/Effect-TS/effect/pull/2943.
Related
- Related Issue #
- Closes #
🦋 Changeset detected
Latest commit: 7cdbf8128e34e542985d69dcdbc0f5853709280f
The changes in this PR will be included in the next version bump.
This PR includes changesets to release 30 packages
| Name | Type |
|---|---|
| effect | Minor |
| @effect/cli | Major |
| @effect/cluster-browser | Major |
| @effect/cluster-node | Major |
| @effect/cluster-workflow | Major |
| @effect/cluster | Major |
| @effect/experimental | Major |
| @effect/opentelemetry | Major |
| @effect/platform-browser | Major |
| @effect/platform-bun | Major |
| @effect/platform-node-shared | Major |
| @effect/platform-node | Major |
| @effect/platform | Major |
| @effect/printer-ansi | Major |
| @effect/printer | Major |
| @effect/rpc-http | Major |
| @effect/rpc | Major |
| @effect/schema | Major |
| @effect/sql-d1 | Major |
| @effect/sql-drizzle | Major |
| @effect/sql-mssql | Major |
| @effect/sql-mysql2 | Major |
| @effect/sql-pg | Major |
| @effect/sql-sqlite-bun | Major |
| @effect/sql-sqlite-node | Major |
| @effect/sql-sqlite-react-native | Major |
| @effect/sql-sqlite-wasm | Major |
| @effect/sql | Major |
| @effect/typeclass | Major |
| @effect/vitest | Major |
Not sure what this means? Click here to learn what changesets are.
Click here if you're a maintainer who wants to add another changeset to this PR
I think if we were going to add a replay option, we would need to add it to every strategy.
Also switch to object arguments:
Pubsub.unbounded<string>({ replaySize: 16 })
I'm not sure how replay should behave for all the other strategies.
FMPOV it only makes sense for unbounded, because it's the only unlimited strategy.
For example, PubSub.bounded<string>({capacity: 2, replay: 3}) looks contradictory to me, same as PubSub.dropping<string>({capacity: 2, replay: 3}). But maybe i am wrong.
I would suggest to focus on unbounded strategy in this PR.
I like the idea about object argument. What do you think about replayBufferSize name? I think it exhaustively reflects the concept behind it
PubSub.bounded<string>({capacity: 2, replay: 3})looks contradictory to me
capacity and replay could be separate buffers, so you could have a higher replay buffer compared to the capacity. Or the capacity could be the ceiling.
Also I think the replay doesn't need to be tracked per strategy, but rather on the PubSubImpl. The replay buffer can be independent of the subscriber-based logic.
Also I think the replay doesn't need to be tracked per strategy, but rather on the PubSubImpl. The replay buffer can be independent of the subscriber-based logic.
I don't see a way to implement it in the PubSubImpl within the current architecture because we need to pass a pointer to the node from where to start to the new subscription (https://github.com/dilame/effect/blob/pubsub-unbounded-replay/packages/effect/src/internal/pubsub.ts#L754). The new subscription is being created at the UnboundedPubSub level, and UnboundedPubSub doesn't have any pointers to the PubSubImpl, so even if we track the replay in the PubSubImpl, we will not have access to it from the place where we need it.
I think it is possible to reimplement the whole PubSub module from scratch with a new architecture that will track replay on a higher abstraction level, but is it really needed?
Not sure replay should be part of pub-sub, but if we decide for it then it should not be part of a specific strategy, I'd at least explore the design where we track replay independently
The replay feature is pretty commonly used. In RxJS – ReplaySubject is one of the most important classes.
I implemented this feature for myself, because i really need it in my project. I decided to make it part of PubSub because PubSub.unbounded is the most similar entity to the RxJS Subject. I just don't see any other entities in Effect to make it part of. Maybe you have any thought about it, @mikearnaldi ?
TBH i wasn't sure if it should be a part of unbounded strategy. Maybe we just should make a new strategy specifically for that – PubSub.replay(N) and leave the PubSub.unbounded as is?
Another reason why i think it should be a PubSub is because it will enable us to make a replayable shared stream with the share() operator from my another PR https://github.com/Effect-TS/effect/pull/2943, i think it is a really powerful and flexible symbiosis
Another reason why i think it should be a
PubSubis because it will enable us to make a replayable shared stream with theshare()operator from my another PR #2943, i think it is a really powerful and flexible symbiosis
Another reason why i think it should be a
PubSubis because it will enable us to make a replayable shared stream with theshare()operator from my another PR #2943, i think it is a really powerful and flexible symbiosis
Another reason why i think it should be a
PubSubis because it will enable us to make a replayable shared stream with theshare()operator from my another PR #2943, i think it is a really powerful and flexible symbiosis
wouldn't share be memory unsafe?
wouldn't share be memory unsafe?
I don't see any reason for it. Why do you think it could be?
Actually, i just copied the code forshare() from some of broadcast operators, i don't remember which exactly. But conceptually share is just more flexible broadcast