effect icon indicating copy to clipboard operation
effect copied to clipboard

feat(Stream): raceAll implementation

Open dilame opened this issue 1 year ago • 2 comments

Type

  • [ ] Refactor
  • [x] Feature
  • [ ] Bug Fix
  • [ ] Optimization
  • [ ] Documentation Update

Description

I implemented raceAll using high-level API, not sure if it's a correct way. I just don't understand the low-level API yet:)

dilame avatar Jun 30 '24 12:06 dilame

🦋 Changeset detected

Latest commit: 45c3fadbe799fd12afab09b6d2d4f9c795d0b245

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 30 packages
Name Type
effect Minor
@effect/cli Major
@effect/cluster-browser Major
@effect/cluster-node Major
@effect/cluster-workflow Major
@effect/cluster Major
@effect/experimental Major
@effect/opentelemetry Major
@effect/platform-browser Major
@effect/platform-bun Major
@effect/platform-node-shared Major
@effect/platform-node Major
@effect/platform Major
@effect/printer-ansi Major
@effect/printer Major
@effect/rpc-http Major
@effect/rpc Major
@effect/schema Major
@effect/sql-d1 Major
@effect/sql-drizzle Major
@effect/sql-mssql Major
@effect/sql-mysql2 Major
@effect/sql-pg Major
@effect/sql-sqlite-bun Major
@effect/sql-sqlite-node Major
@effect/sql-sqlite-react-native Major
@effect/sql-sqlite-wasm Major
@effect/sql Major
@effect/typeclass Major
@effect/vitest Major

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

changeset-bot[bot] avatar Jun 30 '24 12:06 changeset-bot[bot]

Now it should be a correct way

dilame avatar Jul 02 '24 11:07 dilame

@tim-smart I think i'm done, could you review, please?

dilame avatar Jul 04 '24 04:07 dilame

@tim-smart a strange thing is happening.

https://effect.website/play#a7005d4f9f10

The program is finished, but the process is hanging. I didn't get the reason yet, will investigate, but it looks like a deep bug somewhere out of this PR scope

dilame avatar Jul 04 '24 05:07 dilame

I improved the docs, refactored the takeWhile predicate function body to be more readable. It stucks on the docs generation now, and the reason is this behaviour that looks like a bug

dilame avatar Jul 04 '24 05:07 dilame

@tim-smart a strange thing is happening.

https://effect.website/play#a7005d4f9f10

The program is finished, but the process is hanging. I didn't get the reason yet, will investigate, but it looks like a deep bug somewhere out of this PR scope

That seems to be exiting fine. Not sure what the docs issue is then.

tim-smart avatar Jul 04 '24 05:07 tim-smart

No, it's not exiting. I am checking it locally on my machine.

dilame avatar Jul 04 '24 05:07 dilame

I'm sorry, i provided a wrong code. This one is not exiting

https://effect.website/play#2bd210998527

dilame avatar Jul 04 '24 05:07 dilame

It does exiting as expected if run it with NodeRuntime.runMain, but doesn't with Effect.runPromise

dilame avatar Jul 04 '24 05:07 dilame

NodeRuntime waits until program is finished and forcefully kills the process. Effect.runPromise just runs the program.

I assume the issue is somewhere here https://github.com/Effect-TS/effect/blob/main/packages/effect/src/internal/core.ts#L1532-L1537

dilame avatar Jul 04 '24 05:07 dilame

Yes, the issue is here.

I modified the node_modules/effect/dist/esm/internal/core.js like this

export const never = /*#__PURE__*/async(() => {
  const interval = setInterval(() => void 0, 2 ** 31 - 1);
  console.log('set interval');
  return sync(() => {
    console.log('clear interval');
    clearInterval(interval)
  });
});

and the output is

tsx ./src/bugs/effect.interrupt.bug.ts
set interval
0
1
2
3
4
5

There are no clear interval log

dilame avatar Jul 04 '24 05:07 dilame

I modified the code like this, and it works now. Looks like a dirty workaround, not sure if it's a good idea to do it. What do you think about it?

export const raceAll = <S extends ReadonlyArray<Stream.Stream<any, any, any>>>(
  ...streams: S
): Stream.Stream<
  Stream.Stream.Success<S[number]>,
  Stream.Stream.Error<S[number]>,
  Stream.Stream.Context<S[number]>
> =>
  Effect.all([Deferred.make<void>(), Deferred.make<void>()]).pipe(
    Effect.map(([halt, forever]) => {
      let winner: number | null = null;
      return Stream.mergeAll(
        streams.map((stream, index) =>
          stream.pipe(
            Stream.takeWhile(() => {
              if (winner === null) {
                winner = index;
                Deferred.unsafeDone(halt, Exit.void);
                return true;
              }
              return winner === index;
            }),
            Stream.interruptWhen(
              Deferred.await(halt).pipe(
                Effect.flatMap(() =>
                  winner === index ? Deferred.await(forever) : Effect.void,
                ),
              ),
            ),
          ),
        ),
        { concurrency: streams.length },
      );
    }),
    Stream.unwrap,
  );

dilame avatar Jul 04 '24 05:07 dilame

I think we need @mikearnaldi here, because the bug looks serious

dilame avatar Jul 04 '24 06:07 dilame

Here is an even easier code to reproduce

import { Effect, Schedule, Stream } from 'effect';

const program = Stream.fromSchedule(Schedule.spaced('1 second')).pipe(
  Stream.interruptWhen(Effect.never),
  Stream.take(1),
  Stream.runCollect,
);

Effect.runPromise(program);

dilame avatar Jul 04 '24 06:07 dilame