Add support to Async::Barrier to wait for the first response
There are occasions where any one of several Task activities might produce a result and only the first result matters.
Ideally there's a method like #wait_first which can return the result of the first task to finish.
In some cases it may be necessary to select the first non-nil result, but that could be handled with an Async::Condition instead.
In case someone is looking for an implementation:
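# frozen_string_literal: true
def wait_first(*tasks, task: Async::Task.current)
  c = Async::Notification.new
  # The block parameter must not shadow the outer `task`, which is the parent of the watchers.
  await = lambda do |awaited|
    task.async do
      awaited.wait
      c.signal(awaited)
    rescue StandardError
      # A failed task still counts as finished; the caller re-raises via done.wait.
      c.signal(awaited)
    end
  end
  tasks.each(&await)
  first = c.wait
  [first, tasks - [first]]
end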
Usage:
done, pending = wait_first(task1, task2, task3)
pending.each(&:stop) # cancel others
done.wait # return or re-raise first result
The API is influenced by Python's asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
Personally I'd be happy to have a similar method defined on Async. Something like
Async.wait(*tasks, mode: :all) - awaits all tasks, a shortcut for Barrier
Async.wait(*tasks, mode: :first) - awaits first, either my implementation or a shortcut for some primitive with similar logic
I'd like to volunteer myself for this task. WDYT?
Sure, sounds great. I don't have any strong opinions on the interface, except that if it's fundamentally different from wait we should make a separate method.
What about Async.wait_for?
Waiting for all tasks vs waiting for the first task are quite different things IMHO and the use case is different. Waiting for all tasks can be achieved by tasks.each(&:wait). Waiting for the first N tasks to complete is more complex since you need to track all the tasks.
Needing to spawn N tasks to wait for the first task to finish out of N tasks seems like an anti-pattern.
We might need to consider how to do this more efficiently.
A Barrier-like primitive?
w = Async::Waiter.new # naming is currently out of scope
w.async{}
w.async{}
w.async{}
done, pending = w.wait_first(2) # or maybe w.wait(first: 2). Wait without arguments will wait for all just like Barrier, but with different return value
Even though waiting for all and waiting for N have different use cases, waiting for all is just a special case of waiting for N. The proposed primitive does not care about task results; it lets the user decide how to deal with them and handle possible exceptions by awaiting the done tasks again. When waiting for all, the done list will contain every task and pending will be empty.
The most efficient implementation is something like this:
#!/usr/bin/env ruby
require 'async'
class Waiter
  def initialize
    @finished = Async::Condition.new
  end
  def async(&block)
    Async do
      result = begin
        yield
      rescue => error
        error
      end
      @finished.signal(result)
    end
  end
  def wait(n = 1)
    n.times.collect do
      @finished.wait
    end
  end
end
Async do
  waiter = Waiter.new
  waiter.async do
    sleep 1
    puts "1"; "1"
  end
  waiter.async do
    sleep 2
    puts "2"; "2"
  end
  waiter.async do
    sleep 3
    puts "3"; "3"
  end
  pp waiter.wait(2)
end
But that assumes we can force the user via this funnel.
Async::Barrier already assumes that users are going to call barrier.async, so we could add the interface there: wait(n = nil) (nil for all, an Integer for some subset), or wait_for(n = 1)/wait_some?
If a task spawned with Waiter raises an exception, your implementation will return it as a value. Do I understand that correctly?
What if the task is supposed to return an instance of an exception (I can't imagine a use case, but it's possible in theory)? How would you distinguish it from an actual exception raised from the task?
It also gives no information about pending tasks; the user would have to find out manually what is still in progress after getting the requested first results.
And the last one: exceptions raised from the pending tasks after the user got their first results would be silently swallowed even if the user finds and awaits pending tasks manually.
That's why I believe returning two lists of tasks from wait is better than returning their results. It forces the user to do something with them: collect the results, handle or re-raise exceptions, and either stop the pending tasks or let them keep working if needed.
The other option would be tracking of spawned tasks inside the waiter:
w.wait_for(2) # no need to return anything
# after waiting, the done and pending lists no longer change, but tasks in pending are still running and may finish at any time
w.done # get finished tasks
w.pending # get pending
> If a task spawned with Waiter raises an exception, your implementation will return it as a value. Do I understand that correctly?
It was just an MVP; you'd need to handle this correctly.
> What if the task is supposed to return an instance of an exception (I can't imagine a use case, but it's possible in theory)? How would you distinguish it from an actual exception raised from the task?
This is handled by Fiber#raise but not in this code. Again, we might need to consider how to deal with it.
> It also gives no information about pending tasks; the user would have to find out manually what is still in progress after getting the requested first results.
The idea of a barrier is that it's stateful, you can call wait(2) several times until it's exhausted.
We have introduced Async::LimitedBarrier for this purpose.
https://github.com/socketry/async/blob/main/lib/async/limited_barrier.rb
Aaaaaand it's gone?
I think it was renamed https://github.com/socketry/async/pull/196
I'm still thinking about this.
Oddly enough, this morning when I was half asleep I started going through the entire design in my head, and my subconscious figured out that we don't need Async::Waiter either, since we can do the same implementation on Async::Group just as efficiently. I'll make a PR.
I was wondering about Group as well since it could be pretty flexible in use.