
feat(Stream): share

Open • dilame opened this pull request 1 year ago • 4 comments

Type

  • [ ] Refactor
  • [x] Feature
  • [ ] Bug Fix
  • [ ] Optimization
  • [ ] Documentation Update

Description

Stream.share({ connector: PubSub.unbounded() }) returns a new Stream that multicasts (shares) the original Stream by forking a runIntoQueue or runIntoPubSub process with forkDaemon. As long as there is at least one consumer, the Stream will run and emit data. When all consumers have exited, the forked daemon is killed.

I understand that this PR is somewhat incomplete; I just need to know whether you are interested in this operator at all, so that I can finish it. Please let me know.

Related

I have not filed an issue on GitHub, but I posted a question on Discord, which I will repeat here.

I have a WebSocket connection to a remote server. To subscribe to a topic, I send a subscribe@someTopic message, and to unsubscribe, I send an unsubscribe@someTopic message. Each topic is represented as a broadcasted Stream.

The challenge is that if two different parts of my code subscribe to the same topic, they should use the same WS connection, but should be able to work independently at the same time. This is difficult to achieve with the classic Scope finalization model because if one of the two consumers closes the scope, it will send the unsubscribe@someTopic signal to the server, causing the second consumer to stop receiving events without notice.

Using a global scope is not a viable solution because I need to ensure the WebSocket connection remains clean and unsubscribe from the topic when there are no active consumers.

The share operator addresses this issue by allowing multiple independent consumers to subscribe to the same topic without interfering with each other. This ensures that each consumer can independently manage its subscription and receive events without being affected by other consumers.

And, actually, there are many more scenarios in which such a share operator is invaluable.
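The ref-counting lifecycle described above can be sketched library-free in plain TypeScript (a hypothetical, minimal model; the names RefCountedSource, start, and stop are illustrative and not part of Effect). The upstream is started when the subscriber count goes from 0 to 1 and torn down when it returns to 0, while intermediate unsubscribes leave the remaining consumers untouched:

```typescript
type Listener<A> = (value: A) => void

class RefCountedSource<A> {
  private listeners = new Set<Listener<A>>()

  constructor(
    private readonly start: () => void, // e.g. send "subscribe@someTopic"
    private readonly stop: () => void // e.g. send "unsubscribe@someTopic"
  ) {}

  emit(value: A): void {
    for (const listener of this.listeners) listener(value)
  }

  // Returns an unsubscribe function; only the LAST unsubscribe stops upstream.
  subscribe(listener: Listener<A>): () => void {
    if (this.listeners.size === 0) this.start() // first consumer: start upstream
    this.listeners.add(listener)
    return () => {
      this.listeners.delete(listener)
      if (this.listeners.size === 0) this.stop() // last consumer: stop upstream
    }
  }
}

// Usage: two consumers share one upstream; it starts once and stops once.
const log: Array<string> = []
const source = new RefCountedSource<string>(
  () => log.push("subscribe@someTopic"),
  () => log.push("unsubscribe@someTopic")
)
const seen: Array<string> = []
const unsubA = source.subscribe((v) => seen.push(`A:${v}`))
const unsubB = source.subscribe((v) => seen.push(`B:${v}`))
source.emit("tick")
unsubA() // B keeps receiving; upstream is NOT torn down
source.emit("tock")
unsubB() // last consumer gone: upstream unsubscribes
```

This is exactly the WebSocket behavior wanted above: closing one consumer's scope must not send unsubscribe@someTopic while another consumer is still attached.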

dilame • Jun 24 '24 23:06

🦋 Changeset detected

Latest commit: c14e39f5bf7c686cad82d6f84f26277a5458085a

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 31 packages

| Name | Type |
| --- | --- |
| effect | Minor |
| @effect/cli | Major |
| @effect/cluster-browser | Major |
| @effect/cluster-node | Major |
| @effect/cluster-workflow | Major |
| @effect/cluster | Major |
| @effect/experimental | Major |
| @effect/opentelemetry | Major |
| @effect/platform-browser | Major |
| @effect/platform-bun | Major |
| @effect/platform-node-shared | Major |
| @effect/platform-node | Major |
| @effect/platform | Major |
| @effect/printer-ansi | Major |
| @effect/printer | Major |
| @effect/rpc-http | Major |
| @effect/rpc | Major |
| @effect/schema | Major |
| @effect/sql-d1 | Major |
| @effect/sql-drizzle | Major |
| @effect/sql-kysely | Major |
| @effect/sql-mssql | Major |
| @effect/sql-mysql2 | Major |
| @effect/sql-pg | Major |
| @effect/sql-sqlite-bun | Major |
| @effect/sql-sqlite-node | Major |
| @effect/sql-sqlite-react-native | Major |
| @effect/sql-sqlite-wasm | Major |
| @effect/sql | Major |
| @effect/typeclass | Major |
| @effect/vitest | Major |



changeset-bot[bot] • Jun 24 '24 23:06

I think a better primitive is a kind of ScopedRef that does ref counting. You can then wrap a PubSub or Queue with it and call Stream.fromQueue etc.

It could also be used for other applications.
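The ref-counting primitive suggested here can be sketched library-free in plain TypeScript (a hypothetical model; the name RcRef and its methods are illustrative, not Effect's API). The wrapped resource is built on the first acquire and finalized on the last release, so any number of holders share one instance:

```typescript
class RcRef<A> {
  private refCount = 0
  private resource: A | undefined

  constructor(
    private readonly acquireResource: () => A, // e.g. build a PubSub/Queue
    private readonly releaseResource: (resource: A) => void // e.g. shut it down
  ) {}

  acquire(): A {
    if (this.refCount === 0) this.resource = this.acquireResource()
    this.refCount++
    return this.resource as A
  }

  release(): void {
    this.refCount--
    if (this.refCount === 0 && this.resource !== undefined) {
      this.releaseResource(this.resource)
      this.resource = undefined
    }
  }
}

// Usage: two holders share one underlying resource.
const events: Array<string> = []
const ref = new RcRef(
  () => {
    events.push("acquire")
    return { id: 1 }
  },
  () => events.push("release")
)
const r1 = ref.acquire() // builds the underlying resource
const r2 = ref.acquire() // reuses it; no second acquire
ref.release() // still one holder: no release yet
ref.release() // last holder gone: resource released
```

The RcMap that comes up later in the thread generalizes this idea to one ref-counted resource per key, with scoped acquisition instead of manual acquire/release pairs.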

tim-smart • Jun 25 '24 00:06

As I said, this PR is a WIP. It should have options like:

interface Options<A, E> {
  readonly connector: Effect.Effect<
    PubSub.PubSub<Take.Take<A, E>> | Queue.Queue<Take.Take<A, E>>
  >
  readonly resetOnRefCountZero?: boolean
  readonly resetOnError?: boolean
  readonly resetOnComplete?: boolean
}

Let's say we have

Stream.share({
  connector: PubSub.replay(1),
  resetOnRefCountZero: false
})

That means that if there were 2 consumers, both of them unsubscribed (so there are 0 consumers now), and then 1 consumer subscribed, it will receive the last value from the stream. As I understand it, this is impossible to achieve with the ScopedRef idea. Also, I don't quite understand how to implement such a ScopedRef primitive.
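This replay-across-zero-consumers behavior can be sketched library-free in plain TypeScript (a hypothetical model of the proposed resetOnRefCountZero: false plus replay(1) semantics; ReplayLatest is an illustrative name, not Effect's API). The one-element replay buffer deliberately survives the subscriber count hitting zero:

```typescript
class ReplayLatest<A> {
  private last: { readonly value: A } | undefined
  private listeners = new Set<(value: A) => void>()

  emit(value: A): void {
    this.last = { value } // remember the latest value (replay buffer of 1)
    for (const listener of this.listeners) listener(value)
  }

  subscribe(listener: (value: A) => void): () => void {
    if (this.last !== undefined) listener(this.last.value) // replay on subscribe
    this.listeners.add(listener)
    return () => {
      this.listeners.delete(listener)
      // resetOnRefCountZero: false — `this.last` is deliberately kept here
    }
  }
}

// Usage: a late subscriber, arriving after the count dropped to zero,
// still receives the last emitted value.
const replay = new ReplayLatest<number>()
const a: Array<number> = []
const unsubA = replay.subscribe((v) => a.push(v))
replay.emit(1)
replay.emit(2)
unsubA() // zero consumers now, but the buffer is kept
const b: Array<number> = []
replay.subscribe((v) => b.push(v)) // immediately receives 2
```

With resetOnRefCountZero: true, the buffer (and the upstream subscription) would instead be discarded when the count reaches zero, which is the behavior the ScopedRef idea gives for free.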

dilame • Jun 25 '24 13:06

@tim-smart could you maybe provide an example of code of how to use hypothetical ScopedRef to solve the described problem?

dilame • Jun 25 '24 14:06

> @tim-smart could you maybe provide an example of code of how to use hypothetical ScopedRef to solve the described problem?

https://github.com/Effect-TS/effect/pull/3179

You could do:

import { Effect, PubSub, RcMap, Schedule, Stream } from "effect"

Effect.gen(function*() {
  const map = yield* RcMap.make((topic: string) =>
    Effect.acquireRelease(
      PubSub.unbounded<string>().pipe(
        Effect.tap((pubsub) =>
          pubsub.publish("message").pipe(
            Effect.schedule(Schedule.spaced(500)),
            Effect.forkScoped,
            Effect.interruptible
          )
        )
      ),
      () => Effect.log(`unsubscribing from ${topic}`)
    )
  )

  const subscribe = (topic: string): Stream.Stream<string> =>
    RcMap.get(map, topic).pipe(
      Effect.map((_) => Stream.fromPubSub(_)),
      Stream.unwrapScoped
    )

  yield* subscribe("foo").pipe(
    Stream.take(3),
    Stream.runForEach(Effect.log)
  )

  yield* Effect.log("done")
}).pipe(
  Effect.scoped,
  Effect.runPromise
)

tim-smart • Jul 05 '24 11:07

@tim-smart wow, I didn't notice your message here; I saw it just now, after pushing a refined version of share to ask you for help.

Anyway, as I understand it, this is not supposed to be a replacement for the Stream.share operator, but it could be used under the hood. I'm not sure whether it would make things better or just more complex with no benefit, though. As I understand it, it doesn't support the resetOnRefCountZero: false option.

I'm sorry to disturb you here, but I spent the whole day trying to understand why my second test case doesn't work. It works if I replace this

fiber ??= yield* Effect.forkScoped(runIntoPubSubScoped(upstream, connector))

with this

fiber ??= yield* Effect.forkDaemon(runIntoPubSub(upstream, connector))

This makes me think that something is wrong with my scope management, but everything looks correct from my point of view. Effect doesn't think so, though. :)

I really need someone's help. Could you take a look at it, please?

dilame • Jul 05 '24 21:07

I fixed the test to bypass the bug I found.

dilame • Jul 06 '24 03:07

> Again, this should be Effect<Stream<A, E>, never, R | Scope>.
>
> The context should be captured at the point of creation, so the R is eliminated from the consumption point, and any fibers forked inside the Scope.

I feel there is a misunderstanding about this operator. Forking upstream consumption inside the resulting effect's scope is impossible due to its nature. This operator is designed to subscribe to the upstream only after the first consumer subscribes. Even if I fork upstream consumption during effect initialization, I must kill this fork when the ref count hits zero and restart it when it hits one. This is only feasible within the stream execution pipeline.

As we discussed earlier, streams operate within a Scope they create themselves if using non-scoped Stream.run*.

I hope this clarifies why I fork inside the stream pipeline—it's simply not feasible otherwise. Now, let’s consider using forkScoped instead of forkDaemon:

If you fork upstream consumption with forkScoped inside the stream pipeline, this fork will terminate upon downstream completion regardless of the ref count. This is unacceptable since other shared downstreams might still be consuming it. This contradicts the logic of shareRefCount.

You can run all shared downstreams with Stream.runScoped, but this requires informing users because it’s not obvious. Additionally, it restricts users as they might attach business logic to Stream.ensuring on shared downstreams, needing it to execute upon stream termination. With Stream.runScoped, this only happens upon parent scope termination. Thus, users face a limitation: either use shareRefCount or have Stream.ensuring guaranteed to execute upon stream termination, but not both simultaneously. This is unacceptable.

Even if we run shared downstreams with Stream.runScoped, the subscriber count hits zero only when the scope closes, not upon actual stream consumption termination. The count decreases only with Stream.ensuring, which, in the case of runScoped, executes upon parent scope termination, defeating the purpose of shareRefCount.

Using forkDaemon is the only correct and 100% resource-safe solution, since the daemon fiber is killed from within Stream.ensuring when refCount === 0. The Effect runtime guarantees that Stream.ensuring executes upon scope closure or program termination; eventually it will execute, giving users full control.

Based on this, I believe the implementation is correct.

Regarding wrapping this in Effect, I think it's pointless. Only the counter initializes; no effects execute during initialization. Furthermore, we can't "lift" the context R from Stream to Effect, as it results in a compilation error.

For example, this declaration:

shareRefCount(): Effect.Effect<Stream.Stream<A, E>, never, R>

will result in the following error:

packages/effect/src/internal/stream.ts:5730:5 - error TS2375: Type 'Effect<Stream<A, E, R>, never, never>' is not assignable to type 'Effect<Stream<A, E, never>, never, Scope | R>' with 'exactOptionalPropertyTypes: true'. Consider adding 'undefined' to the types of the target's properties.
  Type 'Stream<A, E, R>' is not assignable to type 'Stream<A, E, never>' with 'exactOptionalPropertyTypes: true'. Consider adding 'undefined' to the types of the target's properties.
    Type 'R' is not assignable to type 'never'.

5730     return Effect.gen(function*(){
         ~~~~~~

  packages/effect/src/internal/stream.ts:5724:10
    5724   <A, E, R>(
                  ~
    This type parameter might need an `extends never` constraint.

This is correct because the resulting Stream actually requires the context R as it runs runIntoPubSub(self, connector) within its pipeline, not upon yield* shareRefCount().

Thus, the correct declaration should be:

declare function shareRefCount(...): Effect.Effect<Stream.Stream<A, E, R>>

which, in my opinion, offers no practical benefit.

The only reason to use this form:

declare function shareRefCount(...): Effect.Effect<Stream.Stream<A, E, R>>

is to create two different instances with separate refCount counters. But this can also be done with:

declare function shareRefCount(...): Stream.Stream<A, E, R>

In the first case, you'd write:

declare const upstream: Stream.Stream<number>
const sharedStreamConstructor = upstream.pipe(Stream.shareRefCount());

const sharedStreamInstance1 = yield* sharedStreamConstructor;
const sharedStreamInstance2 = yield* sharedStreamConstructor;

In the second case, you'd write:

declare const upstream: Stream.Stream<number>
const sharedStreamInstance1 = upstream.pipe(Stream.shareRefCount());
const sharedStreamInstance2 = upstream.pipe(Stream.shareRefCount());

The constructor step seems unnecessary in this chain.

However, if this is necessary, I will do it as you want. I would appreciate an explanation of why you think it's better—I'm genuinely curious. If you don't want to explain, just let me know, and I will still implement it as requested.

dilame • Jul 07 '24 23:07

I also implemented a test that deadlocks when Effect.forkScoped is used under the hood of shareRefCount.

dilame • Jul 07 '24 23:07

> I also implemented a test that deadlocks when Effect.forkScoped is used under the hood of shareRefCount.

You should be using forkIn in this situation, to fork in the parent scope:

import { Effect, Fiber, FiberRef, Stream } from "effect"

const forkingStream = <A, E, R>(self: Stream.Stream<A, E, R>) =>
  Effect.gen(function*() {
    // capture the scope and context at construction time
    const scope = yield* Effect.scope
    const context = yield* Effect.context<R>()
    let fiber: Fiber.RuntimeFiber<void, E> | undefined = undefined
    return Effect.gen(function*() {
      fiber = yield* Stream.runDrain(self).pipe(
        // run with the captured context, then fork into the parent scope
        Effect.locally(FiberRef.currentContext, context),
        Effect.forkIn(scope)
      )
    })
  })

tim-smart • Jul 07 '24 23:07

Do you mean like this (only about forking mechanism)?

dilame • Jul 08 '24 00:07

> Do you mean like this (only about forking mechanism)?

Yes

tim-smart • Jul 08 '24 00:07

@dilame Thank you for your detailed explanation! I'll try to address what I can to lower the cognitive load for @tim-smart, and I'm sure he will correct me if I say something wrong. :)

> I feel there is a misunderstanding about this operator. Forking upstream consumption inside the resulting effect's scope is impossible due to its nature.

> As we discussed earlier, streams operate within a Scope they create themselves if using non-scoped Stream.run*.

> Regarding wrapping this in Effect, I think it's pointless. Only the counter initializes; no effects execute during initialization.

The forking is not the relevant action being addressed (it's just the affected party in this case). You should stagger the construction of the stream so that the environment can be accessed as soon as possible. I don't mean the Scope of the Stream; I mean the scope of the PubSub. By not staggering the construction, you create a brand-new PubSub every time the stream is evaluated, one that lives only for the duration of said stream. Practically speaking, you could remove it entirely without changing the behavior of the util at all.

> Using forkDaemon is the only correct and 100% resource-safe solution

It's really not. I am pretty sure that for n listeners you actually zombify n-1 of them.

datner • Jul 08 '24 00:07