feat(PubSub): unbounded replay

Open dilame opened this issue 1 year ago • 10 comments

Type

  • [ ] Refactor
  • [x] Feature
  • [ ] Bug Fix
  • [ ] Optimization
  • [ ] Documentation Update

Description

Implements replay of the last N values for PubSub.unbounded(replayBufferSize: N = 0). This can later be used to create a shared Stream: https://github.com/Effect-TS/effect/pull/2943.

Related

  • Related Issue #
  • Closes #

dilame avatar Jun 06 '24 14:06 dilame

🦋 Changeset detected

Latest commit: 7cdbf8128e34e542985d69dcdbc0f5853709280f

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 30 packages
Name Type
effect Minor
@effect/cli Major
@effect/cluster-browser Major
@effect/cluster-node Major
@effect/cluster-workflow Major
@effect/cluster Major
@effect/experimental Major
@effect/opentelemetry Major
@effect/platform-browser Major
@effect/platform-bun Major
@effect/platform-node-shared Major
@effect/platform-node Major
@effect/platform Major
@effect/printer-ansi Major
@effect/printer Major
@effect/rpc-http Major
@effect/rpc Major
@effect/schema Major
@effect/sql-d1 Major
@effect/sql-drizzle Major
@effect/sql-mssql Major
@effect/sql-mysql2 Major
@effect/sql-pg Major
@effect/sql-sqlite-bun Major
@effect/sql-sqlite-node Major
@effect/sql-sqlite-react-native Major
@effect/sql-sqlite-wasm Major
@effect/sql Major
@effect/typeclass Major
@effect/vitest Major

changeset-bot[bot] avatar Jun 06 '24 14:06 changeset-bot[bot]

I think if we were going to add a replay option, we would need to add it to every strategy.

Also switch to object arguments:

PubSub.unbounded<string>({ replaySize: 16 })

tim-smart avatar Jun 09 '24 22:06 tim-smart

I'm not sure how replay should behave for the other strategies. From my point of view it only makes sense for unbounded, because that is the only strategy without a capacity limit. For example, PubSub.bounded<string>({capacity: 2, replay: 3}) looks contradictory to me, as does PubSub.dropping<string>({capacity: 2, replay: 3}). But maybe I am wrong. I would suggest focusing on the unbounded strategy in this PR.

I like the idea of an object argument. What do you think about the name replayBufferSize? I think it fully reflects the concept behind it.

dilame avatar Jun 09 '24 23:06 dilame

PubSub.bounded<string>({capacity: 2, replay: 3}) looks contradictory to me

capacity and replay could be separate buffers, so you could have a higher replay buffer compared to the capacity. Or the capacity could be the ceiling.

Also I think the replay doesn't need to be tracked per strategy, but rather on the PubSubImpl. The replay buffer can be independent of the subscriber-based logic.
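
A minimal sketch of the "separate buffers" idea, to make the discussion concrete. It is not Effect's implementation: `ReplayBuffer`, `DroppingHub`, and the `{ capacity, replay }` option names are hypothetical, and the dropping behaviour is simplified to plain arrays. The point is only that the replay buffer is filled on publish and read on subscribe, without touching the per-subscriber capacity logic.

```typescript
// Hypothetical sketch: a replay buffer kept separately from subscriber queues.
class ReplayBuffer<A> {
  private readonly items: Array<A> = []
  constructor(private readonly size: number) {}

  // Remember a published value, discarding the oldest beyond `size`.
  offer(value: A): void {
    if (this.size <= 0) return
    this.items.push(value)
    if (this.items.length > this.size) this.items.shift()
  }

  // Copy of the values a late subscriber should see first.
  snapshot(): ReadonlyArray<A> {
    return this.items.slice()
  }
}

interface HubSubscription<A> {
  readonly replayed: ReadonlyArray<A> // values published before subscribing
  readonly live: Array<A> // values published after subscribing
}

class DroppingHub<A> {
  private readonly capacity: number
  private readonly replay: ReplayBuffer<A>
  private readonly subscriptions = new Set<HubSubscription<A>>()

  constructor(options: { readonly capacity: number; readonly replay: number }) {
    this.capacity = options.capacity
    this.replay = new ReplayBuffer<A>(options.replay)
  }

  publish(value: A): void {
    this.replay.offer(value)
    for (const sub of this.subscriptions) {
      // Dropping semantics: ignore the value when the live queue is full.
      if (sub.live.length < this.capacity) sub.live.push(value)
    }
  }

  subscribe(): HubSubscription<A> {
    const sub: HubSubscription<A> = { replayed: this.replay.snapshot(), live: [] }
    this.subscriptions.add(sub)
    return sub
  }
}
```

With `{ capacity: 2, replay: 3 }`, a late subscriber in this sketch gets up to three replayed values while its live queue is still capped at two, so the two numbers do not conflict.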

tim-smart avatar Jun 10 '24 08:06 tim-smart

Also I think the replay doesn't need to be tracked per strategy, but rather on the PubSubImpl. The replay buffer can be independent of the subscriber-based logic.

I don't see a way to implement it in the PubSubImpl within the current architecture because we need to pass a pointer to the node from where to start to the new subscription (https://github.com/dilame/effect/blob/pubsub-unbounded-replay/packages/effect/src/internal/pubsub.ts#L754). The new subscription is being created at the UnboundedPubSub level, and UnboundedPubSub doesn't have any pointers to the PubSubImpl, so even if we track the replay in the PubSubImpl, we will not have access to it from the place where we need it.
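
To illustrate what "a pointer to the node from where to start" means, here is a simplified sketch of a linked-list hub. It assumes the unbounded strategy stores published values as linked nodes and that each subscription holds a cursor into that list; the names `LinkedHub`, `LinkedSubscription`, and `replayStart` are hypothetical and the real code in the linked branch differs.

```typescript
// Hypothetical sketch: replay via a start pointer into a linked list.
interface ListNode<A> {
  readonly value: A | undefined
  next: ListNode<A> | undefined
}

class LinkedSubscription<A> {
  // `cursor` is the last node already consumed (or the node before the first one to read).
  constructor(private cursor: ListNode<A>) {}

  // Returns the next value, or undefined when nothing new has been published.
  poll(): A | undefined {
    const next = this.cursor.next
    if (next === undefined) return undefined
    this.cursor = next
    return next.value
  }
}

class LinkedHub<A> {
  // Sentinel node so that tail and replayStart are never undefined.
  private tail: ListNode<A> = { value: undefined, next: undefined }
  // Node just before the oldest value that is still replayable.
  private replayStart: ListNode<A> = this.tail
  private replayCount = 0

  constructor(private readonly replaySize: number) {}

  publish(value: A): void {
    const node: ListNode<A> = { value, next: undefined }
    this.tail.next = node
    this.tail = node
    if (this.replayCount < this.replaySize) {
      this.replayCount++
    } else {
      // Replay window is full: advance its start by one node.
      this.replayStart = this.replayStart.next!
    }
  }

  // A new subscription starts at the replay pointer instead of the tail.
  subscribe(): LinkedSubscription<A> {
    return new LinkedSubscription<A>(this.replayStart)
  }
}
```

In this sketch the subscription must be constructed with access to `replayStart`, which is why the component that creates subscriptions also has to own (or be handed) the replay pointer.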

I think it is possible to reimplement the whole PubSub module from scratch with a new architecture that will track replay on a higher abstraction level, but is it really needed?

dilame avatar Jun 10 '24 13:06 dilame

Not sure replay should be part of pub-sub, but if we decide to add it, it should not be part of a specific strategy. I'd at least explore the design where we track replay independently.

mikearnaldi avatar Jun 12 '24 09:06 mikearnaldi

The replay feature is pretty commonly used. In RxJS, ReplaySubject is one of the most important classes. I implemented this feature for myself because I really need it in my project. I decided to make it part of PubSub because PubSub.unbounded is the entity most similar to the RxJS Subject. I just don't see any other entity in Effect to make it part of. Maybe you have some thoughts about it, @mikearnaldi?

TBH I wasn't sure whether it should be part of the unbounded strategy. Maybe we should just add a new strategy specifically for that, PubSub.replay(N), and leave PubSub.unbounded as is?

dilame avatar Jun 12 '24 09:06 dilame

Another reason why I think it should be a PubSub is that it will let us make a replayable shared stream with the share() operator from my other PR, https://github.com/Effect-TS/effect/pull/2943. I think it is a really powerful and flexible combination.

dilame avatar Jun 12 '24 09:06 dilame

Another reason why I think it should be a PubSub is that it will let us make a replayable shared stream with the share() operator from my other PR, #2943. I think it is a really powerful and flexible combination.

wouldn't share be memory unsafe?

mikearnaldi avatar Jun 12 '24 13:06 mikearnaldi

wouldn't share be memory unsafe?

I don't see any reason for it. Why do you think it could be? Actually, I just copied the code for share() from one of the broadcast operators; I don't remember exactly which. But conceptually share is just a more flexible broadcast.

dilame avatar Jun 12 '24 13:06 dilame