async icon indicating copy to clipboard operation
async copied to clipboard

Add support to Async::Barrier to wait for the first response

Open tadman opened this issue 5 years ago • 9 comments

There are occasions where any one of several Task activities might produce a result and only the first result matters.

Ideally there's a method like #wait_first which can return the result of the first task to finish.

In some cases it may be necessary to select the first non-nil result, but that could be handled with an Async::Condition instead.

tadman avatar Apr 03 '20 02:04 tadman

Just in case if someone is looking for an implementation:

# frozen_string_literal: true

def wait_first(*tasks, task: Async::Task.current)
  c = Async::Notification.new

  await = lambda do |task|
    task.async do
      task.wait
      c.signal(task)
    rescue StandardError
      c.signal(task)
    end
  end

  tasks.each(&await)

  first = c.wait

  [first, tasks - [first]]
end

Usage:

done, pending = wait_first(task1, task2, task3)
pending.each(&:stop) # cancel others
done.wait # return or re-raise first result

The API is influenced by python's asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)

Personally I'd be happy to have a similar method defined on Async. Something like Async.wait(*tasks, mode: :all) - awaits all tasks, a shortcut for Barrier Async.wait(*tasks, mode: :first) - awaits first, either my implementation or a shortcut for some primitive with similar logic

I'd like to volunteer myself for this task. WDYT?

zhulik avatar Jul 26 '22 23:07 zhulik

Sure sounds great, I don't have any strong opinions on the interface except that if it's fundamentally different from wait we should make a separate method.

ioquatix avatar Jul 27 '22 01:07 ioquatix

What about Async.wait_for?

zhulik avatar Jul 27 '22 10:07 zhulik

Waiting for all tasks vs waiting for the first task are quite different things IMHO and the use case is different. Waiting for all tasks can be achieved by tasks.each(&:wait). Waiting for the first N tasks to complete is more complex since you need to track all the tasks.

Needing to spawn N tasks to wait for the first task to finish out of N tasks seems like an anti-pattern.

We might need to consider how to do this more efficiently.

ioquatix avatar Jul 27 '22 10:07 ioquatix

A Barrier-like primitive?

w = Async::Waiter.new # naming is currently out of scope
w.async{}
w.async{}
w.async{}
done, pending = w.wait_first(2) # or maybe w.wait(first: 2). Wait without arguments will wait for all just like Barrier, but with different return value

Even though waiting for all and waiting for N have different use cases, waiting for all is just a special case of waiting for N. The proposed primitive does not care about task results letting the user decide how to deal with them and handle possible exceptions by awaiting the done tasks again. In case of waiting for all, the done list will contain all tasks and pending will be empty

zhulik avatar Jul 27 '22 10:07 zhulik

The most efficient implementation is something like this:

#!/usr/bin/env ruby

require 'async'

class Waiter
  def initialize
    @finished = Async::Condition.new
  end

  def async(&block)
    Async do
      result = begin
        yield
      rescue => error
        error
      end

      @finished.signal(result)
    end
  end

  def wait(n = 1)
    n.times.collect do
      @finished.wait
    end
  end
end

Async do
  waiter = Waiter.new
  waiter.async do
    sleep 1
    puts "1"; "1"
  end
  waiter.async do
    sleep 2
    puts "2"; "2"
  end
  waiter.async do
    sleep 3
    puts "3"; "3"
  end
  pp waiter.wait(2)
end

But that assumes we can force the user via this funnel.

ioquatix avatar Jul 27 '22 11:07 ioquatix

Async::Barrier already makes the assumption that users are going to call barrier.async so we could add the interface there for wait(n = nil) (nil for all, Integer for some subset)? or wait_for(n = 1)/wait_some?

ioquatix avatar Jul 27 '22 11:07 ioquatix

In case if a task spawned with Waiter raises an exception your implementation will return it as a value, do I understend it correctly?

What if the task is supposed to return an instance of an exception(can't imagine a use case but it's possible in theory)? How to distinguish it from an actual exception raised from the task?

It also gives no information about pending tasks, the user would have to find what is still in progress manually after getting requested first results

And the last one: exceptions raised from the pending tasks after the user got their first results would be silently swallowed even if the user finds and awaits pending tasks manually.

Thats why I believe returning two lists of tasks from wait is better than returning their results. It forces the user to do something with them. Collect the results, handle or reraise exceptions and either stop, or let pending tasks work further if they need so

The other option would be tracking of spawned tasks inside the waiter:

w.wait_for(2) # no need to return anything 
# after waiting the done and pending lists do not change anymore, but tasks from pending still running and may finish any time 
w.done # get finished tasks
w.pending # get pending

zhulik avatar Jul 27 '22 11:07 zhulik

In case if a task spawned with Waiter raises an exception your implementation will return it as a value, do I understend it correctly?

It was just a MVP, you'd need to handle this correctly.

What if the task is supposed to return an instance of an exception(can't imagine a use case but it's possible in theory)? How to distinguish it from an actual exception raised from the task?

This is handled by Fiber#raise but not in this code. Again, we might need to consider how to deal with it.

It also gives no information about pending tasks, the user would have to find what is still in progress manually after getting requested first results

The idea of a barrier is that it's stateful, you can call wait(2) several times until it's exhausted.

ioquatix avatar Jul 27 '22 21:07 ioquatix

We have introduced Async::LimitedBarrier for this purpose.

https://github.com/socketry/async/blob/main/lib/async/limited_barrier.rb

ioquatix avatar Oct 31 '22 23:10 ioquatix

Aaaaaand it's gone?

tadman avatar Nov 06 '22 19:11 tadman

I think it was renamed https://github.com/socketry/async/pull/196

trevorturk avatar Nov 06 '22 20:11 trevorturk

I'm still thinking about this.

Oddly enough, this morning when I was half asleep I started going through the entire design in my head and my sub-concious figured out we don't need Async::Waiter either, since we can do the same implementation on Async::Group just as efficiently. I'll make a PR.

ioquatix avatar Nov 06 '22 21:11 ioquatix

I was wondering about Group as well since it could be pretty flexible in use.

trevorturk avatar Nov 06 '22 21:11 trevorturk