async.nvim icon indicating copy to clipboard operation
async.nvim copied to clipboard

Make sure parallel tasks can be throttled to a limit

Open lewis6991 opened this issue 9 months ago • 3 comments

  • Could be done in a lower level form using a mutex pool or a semaphore around vim.system.

lewis6991 avatar Feb 21 '25 10:02 lewis6991

Just as a note, this regression from "async v0" is a blocker for upgrading to v2 in nvim-treesitter.

clason avatar Mar 23 '25 10:03 clason

You can throttle to a limit manually without much work

local function pop(tasks, n)
  local r = {}
  for i = 1, n do
    table.insert(r, table.remove(tasks, 1)
  end
  return r
end

local tasks = ... -- async tasks

local head = pop(tasks, limit)

while next(head) do
  for r in async.iter(head) do
    -- consume
  end

  head = parition(tasks, limit)
end

This also easily allows for an interrupt check to be inserted too.

Doesn't mean we can't add this in, but just demonstrating how the new API is a bit more general than before.

lewis6991 avatar Mar 23 '25 11:03 lewis6991

Yeah, maybe it's cleaner not to put this into the join itself. Makes async v2 less of a drop-in replacement, but that's not a criticism (just means I need more help).

(parition probably should be pop, too?)

clason avatar Mar 23 '25 11:03 clason

See https://github.com/lewis6991/async.nvim/blob/main/lua/async/misc.lua for two variants of this.

clason avatar May 16 '25 15:05 clason

There is now a proper way to do this using semaphores

--- Like async.join, but with a limit on the number of concurrent tasks.
--- @async
--- @param max_jobs integer
--- @param task_funs async.TaskFun[]
function M.join_n(max_jobs, task_funs)
  if #task_funs == 0 then
    return
  end

  local semaphore = async.semaphore(max_jobs)

  for _, task_fun in ipairs(task_funs) do
    semaphore:with(function()
      async.await(task_fun)
    end)
  end
end

lewis6991 avatar Jun 05 '25 15:06 lewis6991

PR to nvim-treesitter? 🥺

clason avatar Jun 05 '25 16:06 clason

Probably best to wait until this is in nvim.

lewis6991 avatar Jun 05 '25 17:06 lewis6991

Yeah, probably. The code is no longer luals compatible, and the semaphore join doesn't interleave tasks like the current code does.

clason avatar Jun 06 '25 07:06 clason

It should interleave tasks, it's intended to work exactly the same. When one task finishes a new one can begin.

And yes, the async stuff makes heavy use of generics which LuaLS doesn't really support. Not sure on how we can integrate this into neovim. Might just need to make heavy use of @type for LuaLS

lewis6991 avatar Jun 06 '25 08:06 lewis6991

It should interleave tasks, it's intended to work exactly the same. When one task finishes a new one can begin.

Hmm, I'm seeing a difference in nvim-treesitter: current main can interleave different tasks for different languages (so "Download A, Download B, Compile A, ...") while the semaphore snippet chains task ("Download A, Compile A, ..., Download B, Compile B, ...).

And yes, the async stuff makes heavy use of generics which LuaLS doesn't really support. Not sure on how we can integrate this into neovim. Might just need to make heavy use of @type for LuaLS

It's not a big deal for me since I'm planning to switch to emmyluals for nvim-treesitter sooner rather than later (only 14 warnings and errors remaining in the runtime, most of which are in and around async.nvim -- the scripts are another matter, though...)

clason avatar Jun 06 '25 08:06 clason

Hmm, I'm seeing a difference in nvim-treesitter: current main can interleave different tasks for different languages (so "Download A, Download B, Compile A, ...") while the semaphore snippet chains task ("Download A, Compile A, ..., Download B, Compile B, ...).

Ok thanks for testing. Admittedly, I haven't tested any of this yet. Can you show me what you tried?

lewis6991 avatar Jun 06 '25 08:06 lewis6991

I just plonked the code into nvim-treesitter (replace async.lua with current master and install.join with your snippet above), then compared :TSInstall! lua vimdoc (or ./scripts/install-parsers.lua lua vimdoc).

clason avatar Jun 06 '25 08:06 clason

I just found that luv has recently added uv.new_sem(), uv.sem_post(), uv.sem_wait() and uv.sem_trywait(). 👀

Though this is slightly different to what is added here as the luv version is designed to work across luv threads.

lewis6991 avatar Jun 06 '25 09:06 lewis6991

It does work, I just told you the wrong code:

        local semaphore = Async.semaphore(3)
        local tasks = {} --- @type async.Task<nil>[]
        for i = 1, 5 do
          tasks[#tasks + 1] = run(function()
            semaphore:with(function()
              ret[#ret + 1] = 'start' .. i
              schedule()
              ret[#ret + 1] = 'end' .. i
            end)
          end)
        end
        Async.join(tasks)

lewis6991 avatar Jun 06 '25 10:06 lewis6991

That doesn't quite seem like something I can copy&paste, but happy to hear that it works!

clason avatar Jun 06 '25 10:06 clason

How about this?

--- Like async.join, but with a limit on the number of concurrent tasks.
--- @async
--- @param max_jobs integer
--- @param task_funs async.TaskFun[]
function M.join_n_3(max_jobs, task_funs)
  if #task_funs == 0 then
    return
  end

  local semaphore = async.semaphore(max_jobs)

  local tasks = {}

  for _, task_fun in ipairs(task_funs) do
    tasks[#tasks + 1] = async.run(function()
      semaphore:with(function()
        async.await(task_fun)
      end)
    end)
  end

  async.join(tasks)
end

lewis6991 avatar Jun 06 '25 10:06 lewis6991

Yep, works!

clason avatar Jun 06 '25 10:06 clason