async

Unsupported use case: mapConcurrently-alike with concurrency limit, and ordered job start

Open • dcoutts opened this issue 7 years ago • 7 comments

Async is a nice abstraction; however, as far as I can see, the following use case is not supported in any straightforward, obvious way:

  • A bunch of tasks to run
  • No more than N running at once (they consume resources other than just CPU, so we want limits that keep us within bounded resources and reduce thrashing overheads).

If it were just this, we could simply use async + a semaphore. There's one more requirement:
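A minimal sketch of that "async + a semaphore" approach, assuming base's `QSem` and async's `mapConcurrently` (the helper name `mapConcurrentlyBounded` is hypothetical). It caps how many jobs run at once, but the order in which the forked threads reach `waitQSem`, and hence the order in which they start, is left to the scheduler:

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)

-- | Hypothetical helper (not part of the async API): run @f@ over every
-- element concurrently, with at most @n@ calls of @f@ in progress at once.
-- Every element still gets its own thread immediately; only the work is
-- throttled. Which waiting thread gets a free slot next depends on when
-- it reached waitQSem, so start order is NOT guaranteed to follow the list.
mapConcurrentlyBounded :: Int -> (a -> IO b) -> [a] -> IO [b]
mapConcurrentlyBounded n f xs = do
  sem <- newQSem n
  -- bracket_ hands the slot back to the semaphore even if f throws.
  mapConcurrently (\x -> bracket_ (waitQSem sem) (signalQSem sem) (f x)) xs
```

Results still come back in list order; only the start order is uncontrolled.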

  • The list of tasks is ordered by how soon their results are likely to be needed, so although we cannot control when a task completes, we want to ensure that tasks start in that order.

This last requirement is what prevents us from fitting the use case into the async abstraction, yet it is not an uncommon requirement. Consider a tool like cabal downloading package tarballs: we want to download, e.g., 2-3 concurrently, but we want to start downloading the ones we'll need first before the ones we'll need later, since we will also start building packages concurrently with the downloading.

The abstraction that cabal-install currently uses is this thing called JobControl. The above use case is handled by putting all the jobs into the job control queue in order, and the N worker threads simply grab the next one available.
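A rough sketch of that worker-pool shape, assuming the async package's `replicateConcurrently_`; `runOrderedPool` is a hypothetical name, and this is not cabal-install's actual JobControl API. Jobs are popped from one shared queue, so they are handed to workers in list order:

```haskell
import Control.Concurrent.Async (replicateConcurrently_)
import Control.Concurrent.MVar
import Control.Monad (forM)

-- | Hypothetical sketch of the JobControl idea (not cabal-install's real
-- API): @n@ worker threads repeatedly take the next job from a shared
-- queue, so jobs are handed out in list order while at most @n@ run.
-- Assumes n >= 1.
runOrderedPool :: Int -> [IO a] -> IO [a]
runOrderedPool n jobs = do
  -- Pair every job with an empty result slot, so results keep list order.
  queued <- forM jobs $ \job -> do
    slot <- newEmptyMVar
    return (job, slot)
  queue <- newMVar queued
  let worker = do
        -- Atomically pop the head of the queue (Nothing when it is empty).
        next <- modifyMVar queue $ \q -> return $ case q of
          []       -> ([], Nothing)
          (j : js) -> (js, Just j)
        case next of
          Nothing          -> return ()
          Just (job, slot) -> (job >>= putMVar slot) >> worker
  -- If any job throws, the remaining workers are cancelled and the
  -- exception is re-thrown here.
  replicateConcurrently_ n worker
  mapM (readMVar . snd) queued
```

List order fixes the order in which jobs are dequeued; two workers that dequeue back to back may still execute their first instructions in either order.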

Of course I'm not proposing to merge the rather different JobControl abstraction into async; rather, the challenge is whether anyone can think of a way to use the existing async abstraction to support this use case, whether any extensions would be needed, or whether any helper utilities would make this easier.

The basic problem is that an async starts a thread immediately, so having created a bunch of them there is no way to control their relative ordering. All of them can contend on a quantity semaphore, but there is no ordering guarantee. One plausible route might be to sequentialise startup and add a synchronisation step between the parent and each child async: for example, the child thread could enqueue itself onto a quantity semaphore before the parent returns the Async, and only then would the parent spawn the next one. This approach would not require any extensions, but could benefit from some helper utilities.
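A sketch of that startup handshake, assuming the stm and async packages; `Gate`, `newGate`, `asyncInOrder` and `mapInOrder` are hypothetical names. It uses a small STM ticket gate instead of `QSem`, because `waitQSem` offers no separate "I am now queued" step that the child could report back to the parent:

```haskell
import Control.Concurrent.Async (Async, async, wait)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Exception (bracket_)

-- | Hypothetical FIFO gate: at most 'gateLimit' holders at once, admitted
-- strictly in the order in which they took a ticket.
data Gate = Gate
  { gateLimit  :: Int
  , nextTicket :: TVar Int -- next ticket number to hand out
  , nowServing :: TVar Int -- lowest ticket not yet admitted
  , holders    :: TVar Int -- number of current holders
  }

newGate :: Int -> IO Gate
newGate n = Gate n <$> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0

-- | Spawn @act@ as an async, but return only once the child has taken its
-- ticket (enqueued itself). Spawning sequentially with this helper thus
-- fixes the admission order. Limitation: a child cancelled while waiting
-- for admission never gives up its ticket, stalling later tickets.
asyncInOrder :: Gate -> IO a -> IO (Async a)
asyncInOrder g act = do
  registered <- newEmptyMVar
  a <- async $ do
    ticket <- atomically $ do
      k <- readTVar (nextTicket g)
      writeTVar (nextTicket g) (k + 1)
      return k
    putMVar registered () -- startup synchronisation with the parent
    bracket_ (admit ticket) release act
  takeMVar registered
  return a
  where
    -- Block until it is this ticket's turn and a slot is free.
    admit ticket = atomically $ do
      serving <- readTVar (nowServing g)
      busy    <- readTVar (holders g)
      check (serving == ticket && busy < gateLimit g)
      writeTVar (nowServing g) (ticket + 1)
      writeTVar (holders g) (busy + 1)
    release = atomically $ modifyTVar' (holders g) (subtract 1)

-- | Usage: spawn in list order, then wait for every result. (No cleanup of
-- the remaining asyncs if one of them fails.)
mapInOrder :: Int -> (x -> IO a) -> [x] -> IO [a]
mapInOrder n f xs = do
  g      <- newGate n
  asyncs <- mapM (asyncInOrder g . f) xs
  mapM wait asyncs
```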

dcoutts • May 28 '18 15:05

I believe the async-pool package does something like that.

sopvop • May 28 '18 15:05

Note, though, that async-pool uses a different representation of the Async type. The question is: can we do something like async-pool or JobControl with the basic async abstraction?

dcoutts • May 28 '18 15:05

@dcoutts Is QSem not suitable here? It does have FIFO semantics. Small race conditions aside, mapConcurrently jobs would roughly acquire the semaphore in job order.

mitchellwrosen • May 29 '18 19:05

@mitchellwrosen If you can add things to the QSem in order, then it's fine; but if you start up a bunch of asyncs and each one queues itself onto the QSem, then you've lost all ordering. There would have to be synchronisation so that the next one is spawned only once the previous one has been enqueued (which is, of course, a bit tricky to arrange).

dcoutts • May 30 '18 18:05

@dcoutts Ah, that's what I meant by "small race conditions aside". Since mapConcurrently and friends at least call forkIO in container order, the threads should roughly grab the semaphore in order as well. But to your point, unfortunately there's no hard guarantee here, just a "soft" one :)

mitchellwrosen • May 30 '18 18:05

You could do this by having a separate controller thread, like this:

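-- needs: Control.Concurrent, Control.Concurrent.Async, Control.Exception, Control.Monad
orderedBounded :: Int -> [IO a] -> IO [a]
orderedBounded k jobs = do
   tickets  <- replicateM (length jobs) newEmptyMVar
   releaseQ <- newChan
   withAsync (controller tickets releaseQ) $ \_ ->
     forConcurrently (zip tickets jobs) $ \(t, job) ->
       bracket_ (takeMVar t) (writeChan releaseQ ()) job
 where
   -- Let the first k jobs go at once (k >= 1), then release the next
   -- ticket, in list order, each time a running job reports completion.
   controller tickets releaseQ = do
     let (first, rest) = splitAt k tickets
     mapM_ (`putMVar` ()) first
     forM_ rest $ \t -> readChan releaseQ >> putMVar t ()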

In the controller thread you can release tickets in whatever order you want, waiting for threads to complete by listening on releaseQ. (You could also do this by making the tickets TMVars and waiting for any of them, but that would involve O(n)-sized transactions, so a Chan is better here.)

But a much nicer way would be to build precisely the abstraction you need, which is:

newTicketQueue
  :: Int -- number of tickets to issue in total
  -> Int -- number of withTickets that can run concurrently
  -> IO [Ticket]

withTicket :: Ticket -> IO a -> IO a

Implementation left as an exercise for the reader :)
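One possible answer to the exercise, sketched under assumptions rather than taken from any library: `Ticket`, `newTicketQueue` and `withTicket` follow the signatures above, while the implementation (a chain of "turn" MVars in front of a `QSem`) and the `orderedMapN` usage helper are hypothetical. Each ticket may compete for a slot only after the previous ticket has obtained one, so admission follows ticket order regardless of which threads call `withTicket` first:

```haskell
import Control.Concurrent.Async (forConcurrently)
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Exception (bracket_)
import Control.Monad (replicateM)

-- | Hypothetical ticket: our turn must arrive before we may compete for a
-- slot; once we hold a slot we pass the turn to the next ticket.
data Ticket = Ticket
  { myTurn   :: MVar ()
  , nextTurn :: MVar ()
  , capacity :: QSem
  }

-- | @newTicketQueue total k@ issues @total@ tickets, in order. At most @k@
-- holders may be inside 'withTicket' at once, and they are admitted strictly
-- in ticket order. Assumes every ticket is eventually used exactly once.
newTicketQueue :: Int -> Int -> IO [Ticket]
newTicketQueue total k = do
  sem   <- newQSem k
  start <- newMVar ()                 -- the first ticket's turn is ready
  rest  <- replicateM total newEmptyMVar
  let turns = start : rest
  return [Ticket cur nxt sem | (cur, nxt) <- zip turns rest]

-- | Run an action while holding the ticket's slot. In this sketch a holder
-- cancelled while still waiting for its turn stalls all later tickets.
withTicket :: Ticket -> IO a -> IO a
withTicket t = bracket_ admit (signalQSem (capacity t))
  where
    admit = do
      takeMVar (myTurn t)             -- wait until earlier tickets are in
      waitQSem (capacity t)           -- then wait for a free slot
      putMVar (nextTurn t) ()         -- let the next ticket compete

-- | Usage sketch: run all jobs concurrently, admitted in list order.
orderedMapN :: Int -> (x -> IO a) -> [x] -> IO [a]
orderedMapN k f xs = do
  tickets <- newTicketQueue (length xs) k
  forConcurrently (zip tickets xs) $ \(t, x) -> withTicket t (f x)
```

Only one thread at a time waits on the `QSem` here, so its FIFO behaviour is never relied upon; the ordering comes entirely from the turn chain.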

simonmar • Jun 02 '18 07:06

I stumbled upon http://hackage.haskell.org/package/unliftio-0.2.13/docs/UnliftIO-Async.html and it suits my needs well.

arrowd • Jun 19 '20 16:06