[csp] Bug with concurrent select()s using the same channel

Open azerum opened this issue 3 months ago • 1 comments

The issue is that when Promise.race() inside select() completes, select() removes the first race of each participating channel. However, that race might have been added by another select(), called earlier. Here, s1 does not unblock as its race was removed by s2:

import { channel, select } from '@thi.ng/csp'

void main()

async function main() {
    const a = channel(1) // chan-0
    const b = channel(1) // chan-1
    const c = channel(1) // chan-2

    const s1 = select(a, b)
    const s2 = select(a, c)

    c.write(1)
    console.log(await s2)

    a.write(1)
    console.log(await s1)
}

Expected output:

[1, <channel id: chan-2>]
[1, <channel id: chan-0>]

Actual output:

[1, <channel id: chan-2>]
<s1 remains blocked>

This can happen in fairly ordinary async code: two routines concurrently run selects like these, and then c.write() happens somewhere

Suggested fix:

One approach is to provide a way to cancel a particular race, so that each select cancels only its own races. E.g. race() could return a cancel function alongside the promise (or accept an AbortSignal)
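To illustrate the cancel-function idea, here is a minimal sketch on a toy model. ToyChannel, toySelect and the { promise, cancel } shape are hypothetical names for illustration and are not the @thi.ng/csp implementation; the toy has no buffering and simply hands each written value to the oldest pending race.

```typescript
// Hypothetical toy model, NOT the @thi.ng/csp implementation.
type Race<T> = { promise: Promise<T>; cancel: () => void };

class ToyChannel<T> {
  // Pending races in registration order (oldest first).
  private waiters: Array<(value: T) => void> = [];

  constructor(readonly id: string) {}

  // Registers a race and returns a handle that removes exactly this race.
  race(): Race<T> {
    let resolve!: (value: T) => void;
    const promise = new Promise<T>((res) => {
      resolve = res;
    });
    this.waiters.push(resolve);
    const cancel = () => {
      const i = this.waiters.indexOf(resolve);
      if (i >= 0) this.waiters.splice(i, 1);
    };
    return { promise, cancel };
  }

  // Toy simplification: hands the value to the oldest pending race, if any.
  write(value: T): boolean {
    const waiter = this.waiters.shift();
    if (waiter === undefined) return false;
    waiter(value);
    return true;
  }

  get pending(): number {
    return this.waiters.length;
  }
}

async function toySelect<T>(...chans: ToyChannel<T>[]): Promise<[T, ToyChannel<T>]> {
  const races = chans.map((ch) => ({ ch, race: ch.race() }));
  try {
    return await Promise.race(
      races.map(({ ch, race }) => race.promise.then((v): [T, ToyChannel<T>] => [v, ch])),
    );
  } finally {
    // Cancel only the races created by this call, never another select's.
    for (const { race } of races) race.cancel();
  }
}
```

With this shape, the scenario from the report (s1 = toySelect(a, b), s2 = toySelect(a, c), then c.write(1) followed by a.write(1)) leaves s1's race on a in place after s2 completes, so s1 unblocks.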

azerum avatar Oct 08 '25 15:10 azerum

Also, what are your thoughts about cancellation of channel operations? One may want to cancel writing/reading based on an external signal

In Go they pass around a channel that is closed to signal cancellation, and then use it with select:

select {
case channel <- x:
	fmt.Println("Written value")
case <-done:
	fmt.Println("Cancelled")
}

Go's context.Context essentially wraps such a channel, so in the example above one can replace <-done with <-ctx.Done(). This is the idiomatic way to support cancellation across APIs

For better or for worse, in JS the idiomatic way to do cancellation is AbortSignal. The problem is that it is based on events and listeners, so to wait for cancellation you have to subscribe to the event and then remember to remove the listener; otherwise listeners can accumulate and leak memory

So there is no straightforward way to implement a function similar to timeout() that takes an AbortSignal and returns a channel that closes on abort, as it does not know when to remove the listener
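To make the listener problem concrete, here is a small sketch, assuming a hypothetical helper waitForAbort() (not part of @thi.ng/csp). It shows that whoever waits on the signal also has to be the one who decides when the listener can be removed:

```typescript
// Hypothetical helper for illustration only.
interface AbortWait {
  promise: Promise<unknown>; // resolves with signal.reason once the signal aborts
  dispose: () => void; // removes the listener if it is still attached
}

function waitForAbort(signal: AbortSignal): AbortWait {
  if (signal.aborted) {
    return { promise: Promise.resolve(signal.reason), dispose: () => {} };
  }
  let onAbort!: () => void;
  const promise = new Promise<unknown>((resolve) => {
    onAbort = () => resolve(signal.reason);
    // `once: true` detaches the listener after it fires,
    // but does nothing if the signal never aborts.
    signal.addEventListener("abort", onAbort, { once: true });
  });
  return {
    promise,
    dispose: () => signal.removeEventListener("abort", onAbort),
  };
}

type Outcome<T> = { aborted: false; value: T } | { aborted: true; reason: unknown };

// Races some other operation against the signal and always detaches the listener.
async function raceWithAbort<T>(op: Promise<T>, signal: AbortSignal): Promise<Outcome<T>> {
  const wait = waitForAbort(signal);
  try {
    return await Promise.race([
      op.then((value): Outcome<T> => ({ aborted: false, value })),
      wait.promise.then((reason): Outcome<T> => ({ aborted: true, reason })),
    ]);
  } finally {
    // Without this, a long-lived signal gains one listener per call.
    wait.dispose();
  }
}
```

A channel returned from a timeout()-style function has no equivalent of the finally block above, which is why it cannot tell when it is safe to detach.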

I can think of three ways to implement cancellation:

  1. Pass a Channel<never> around that closes to signal cancellation, and ignore AbortSignal. This does not work well if you are ever going to write a public API function that wants idiomatic cancellation and uses channels inside

  2. Leverage the new using keyword from TS 5.2+ to make a Context-like thing that removes the listener on dispose:

async function doSomething(ch: Channel<number>, signal?: AbortSignal) {
  using ctx = contextFromMaybeSignal(signal)
  return await select([ch, ctx])
}

  3. Extend select() to support AbortSignal. I tried that in @azerum/ts-csp, and as a bonus, one can race any (signal: AbortSignal) => Promise<T> function. E.g. there is returnOnAbort(signal: AbortSignal, cancelSignal?: AbortSignal) => Promise<unknown>, which resolves with signal.reason when signal aborts, or can be cancelled via another signal:

async function doSomething(ch: Channel<number>, signal?: AbortSignal) {
  return await select({ 
    ch: ch.raceRead(), 
    aborted: signal === undefined ? null : s => returnOnAbort(signal, s),
  })
}
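
For readers who want to see how the third option can fit together, here is a rough sketch. returnOnAbortSketch and selectSketch are hypothetical stand-ins written for this comment; the real functions in @azerum/ts-csp may be implemented differently. One assumption in particular: a cancelled wait settles with undefined instead of staying pending.

```typescript
// Hypothetical sketch, not the @azerum/ts-csp source.
type RaceFn<T> = (cancelSignal: AbortSignal) => Promise<T>;

function returnOnAbortSketch(signal: AbortSignal, cancelSignal?: AbortSignal): Promise<unknown> {
  return new Promise<unknown>((resolve) => {
    if (signal.aborted) return resolve(signal.reason);
    if (cancelSignal?.aborted) return resolve(undefined);

    // Detach both listeners as soon as either signal fires.
    const cleanup = () => {
      signal.removeEventListener("abort", onAbort);
      cancelSignal?.removeEventListener("abort", onCancel);
    };
    const onAbort = () => {
      cleanup();
      resolve(signal.reason);
    };
    // Assumption: cancellation settles the promise with undefined.
    const onCancel = () => {
      cleanup();
      resolve(undefined);
    };

    signal.addEventListener("abort", onAbort);
    cancelSignal?.addEventListener("abort", onCancel);
  });
}

async function selectSketch<T>(
  arms: Record<string, RaceFn<T> | null>,
): Promise<{ key: string; value: T }> {
  const controller = new AbortController();
  // Arms set to null are skipped, mirroring the `signal === undefined ? null : ...` pattern.
  const active = Object.entries(arms).filter(
    (e): e is [string, RaceFn<T>] => e[1] !== null,
  );
  try {
    return await Promise.race(
      active.map(async ([key, fn]) => ({ key, value: await fn(controller.signal) })),
    );
  } finally {
    // Tell the losing arms to stop and remove their listeners.
    controller.abort();
  }
}
```

The point of the design is that each select owns one AbortController, so cleanup of every losing arm happens in one place.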

And the bonus - racing with other async functions:

import timers from 'node:timers/promises'

await select({
  ch: ch.raceRead(),
  timedOut: s => timers.setTimeout(1000, undefined, { signal: s }),
})

This does add runtime overhead: creating AbortControllers and adding and removing listeners

azerum avatar Oct 14 '25 14:10 azerum