feat(Stream): raceAll implementation
Type
- [ ] Refactor
- [x] Feature
- [ ] Bug Fix
- [ ] Optimization
- [ ] Documentation Update
Description
I implemented raceAll using high-level API, not sure if it's a correct way. I just don't understand the low-level API yet:)
🦋 Changeset detected
Latest commit: 45c3fadbe799fd12afab09b6d2d4f9c795d0b245
The changes in this PR will be included in the next version bump.
This PR includes changesets to release 30 packages
| Name | Type |
|---|---|
| effect | Minor |
| @effect/cli | Major |
| @effect/cluster-browser | Major |
| @effect/cluster-node | Major |
| @effect/cluster-workflow | Major |
| @effect/cluster | Major |
| @effect/experimental | Major |
| @effect/opentelemetry | Major |
| @effect/platform-browser | Major |
| @effect/platform-bun | Major |
| @effect/platform-node-shared | Major |
| @effect/platform-node | Major |
| @effect/platform | Major |
| @effect/printer-ansi | Major |
| @effect/printer | Major |
| @effect/rpc-http | Major |
| @effect/rpc | Major |
| @effect/schema | Major |
| @effect/sql-d1 | Major |
| @effect/sql-drizzle | Major |
| @effect/sql-mssql | Major |
| @effect/sql-mysql2 | Major |
| @effect/sql-pg | Major |
| @effect/sql-sqlite-bun | Major |
| @effect/sql-sqlite-node | Major |
| @effect/sql-sqlite-react-native | Major |
| @effect/sql-sqlite-wasm | Major |
| @effect/sql | Major |
| @effect/typeclass | Major |
| @effect/vitest | Major |
Not sure what this means? Click here to learn what changesets are.
Click here if you're a maintainer who wants to add another changeset to this PR
Now it should be a correct way
@tim-smart I think i'm done, could you review, please?
@tim-smart a strange thing is happening.
https://effect.website/play#a7005d4f9f10
The program is finished, but the process is hanging. I didn't get the reason yet, will investigate, but it looks like a deep bug somewhere out of this PR scope
I improved the docs, refactored the takeWhile predicate function body to be more readable.
It stucks on the docs generation now, and the reason is this behaviour that looks like a bug
@tim-smart a strange thing is happening.
https://effect.website/play#a7005d4f9f10
The program is finished, but the process is hanging. I didn't get the reason yet, will investigate, but it looks like a deep bug somewhere out of this PR scope
That seems to be exiting fine. Not sure what the docs issue is then.
No, it's not exiting. I am checking it locally on my machine.
I'm sorry, i provided a wrong code. This one is not exiting
https://effect.website/play#2bd210998527
It does exiting as expected if run it with NodeRuntime.runMain, but doesn't with Effect.runPromise
NodeRuntime waits until program is finished and forcefully kills the process. Effect.runPromise just runs the program.
I assume the issue is somewhere here https://github.com/Effect-TS/effect/blob/main/packages/effect/src/internal/core.ts#L1532-L1537
Yes, the issue is here.
I modified the node_modules/effect/dist/esm/internal/core.js like this
export const never = /*#__PURE__*/async(() => {
const interval = setInterval(() => void 0, 2 ** 31 - 1);
console.log('set interval');
return sync(() => {
console.log('clear interval');
clearInterval(interval)
});
});
and the output is
tsx ./src/bugs/effect.interrupt.bug.ts
set interval
0
1
2
3
4
5
There are no clear interval log
I modified the code like this, and it works now. Looks like a dirty workaround, not sure if it's a good idea to do it. What do you think about it?
export const raceAll = <S extends ReadonlyArray<Stream.Stream<any, any, any>>>(
...streams: S
): Stream.Stream<
Stream.Stream.Success<S[number]>,
Stream.Stream.Error<S[number]>,
Stream.Stream.Context<S[number]>
> =>
Effect.all([Deferred.make<void>(), Deferred.make<void>()]).pipe(
Effect.map(([halt, forever]) => {
let winner: number | null = null;
return Stream.mergeAll(
streams.map((stream, index) =>
stream.pipe(
Stream.takeWhile(() => {
if (winner === null) {
winner = index;
Deferred.unsafeDone(halt, Exit.void);
return true;
}
return winner === index;
}),
Stream.interruptWhen(
Deferred.await(halt).pipe(
Effect.flatMap(() =>
winner === index ? Deferred.await(forever) : Effect.void,
),
),
),
),
),
{ concurrency: streams.length },
);
}),
Stream.unwrap,
);
I think we need @mikearnaldi here, because the bug looks serious
Here is an even easier code to reproduce
import { Effect, Schedule, Stream } from 'effect';
const program = Stream.fromSchedule(Schedule.spaced('1 second')).pipe(
Stream.interruptWhen(Effect.never),
Stream.take(1),
Stream.runCollect,
);
Effect.runPromise(program);