feat: add continuous batching for concurrent request processing
## Motivation
Enable the runner to process multiple concurrent inference requests efficiently. Previously, requests were processed sequentially - one had to complete before the next could start. With continuous batching, multiple requests can be processed in parallel, significantly improving throughput.
## Changes

- **New** `BatchGenerationEngine` (`batch_engine.py`): Wraps mlx_lm's `BatchGenerator` to manage continuous batching. Handles request insertion, token generation, and response routing.
- **New** `distributed_sync.py`: Implements `share_object()` for syncing batch state across distributed ranks using `mx.distributed.all_sum()`.
- **Modified** `runner.py`: Changed from a blocking sequential loop to a non-blocking pattern that polls for new tasks between decode steps.
- **Modified** `plan.py`: Updated to forward tasks to runners in the `RunnerRunning` state (not just `RunnerReady`).
- **Modified** `runner_supervisor.py`: Fixed a race condition by keeping tasks in `pending` until completion (not just acknowledgement).
- **Updated** `RunnerRunning`: Added an `active_requests` field to track batch size.
## Why It Works

mlx_lm's `BatchGenerator` handles the complexity of continuous batching internally:
- New prompts are prefilled while existing requests decode
- KV caches are concatenated when new requests join the batch
- All active requests generate one token per decode step
The runner now:
- Drains all pending tasks without blocking (`receive_nowait()`)
- Runs one batch decode step (~1-5 ms)
- Loops back to check for new tasks
This ensures new requests can join the batch mid-generation.
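The drain-then-step pattern can be sketched in a few lines. This is a minimal single-process sketch, not the PR's code: `run_loop`, `FakeEngine`, and the `queue.Queue` task source are stand-ins for the real task channel and `BatchGenerationEngine`.

```python
import queue

def run_loop(tasks: queue.Queue, engine) -> None:
    """Continuous-batching main loop: drain pending tasks without
    blocking, run one decode step, loop back for new arrivals."""
    while True:
        # Phase 1: drain all pending tasks non-blocking.
        while True:
            try:
                engine.insert(tasks.get_nowait())
            except queue.Empty:
                break
        if not engine.has_active():
            break  # nothing in flight; the real runner keeps polling instead
        # Phase 2: one batch decode step (~1-5 ms in the real engine).
        engine.step()

class FakeEngine:
    """Stand-in engine: each request finishes after 3 decode steps."""
    def __init__(self):
        self.active: list[list] = []
        self.done: list[str] = []
    def insert(self, task):
        self.active.append([task, 3])
    def has_active(self):
        return bool(self.active)
    def step(self):
        for req in self.active:
            req[1] -= 1
        for req in [r for r in self.active if r[1] == 0]:
            self.active.remove(req)
            self.done.append(req[0])
```

Because the drain runs between every decode step, a task that arrives while other requests are mid-generation joins the batch on the very next iteration.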
## Test Plan

### Manual Testing

Hardware: MacBook Pro M1 Max (Apple Silicon)

What I did:
- Ran `test_concurrent.py`, which sends 4-8 concurrent streaming requests
- Verified all requests complete with overlapping token delivery times (proving concurrency)
- Observed ~3.5x speedup with 4 requests, ~7x with 8 requests

### Automated Testing

- Added `test_continuous_batching.py` with unit tests for the new batching logic
- Updated `test_event_ordering.py` to work with the new batch engine
🤖 Generated with Claude Code
It's open!
Moving back to draft. Needs some further work.
OK, I had some comments; I'll submit them, but I didn't make it to the runner. Looks good though!
Code Review — PR #1153: feat: add continuous batching for concurrent request processing
CI Status: All checks passing (typecheck, build on aarch64-darwin, x86_64-linux, aarch64-linux).
## Overview
This PR replaces the sequential one-request-at-a-time text generation path in the MLX runner with a continuous batching architecture based on mlx_lm.BatchGenerator. The key additions are:
- `BatchGenerationEngine` (`batch_engine.py`): Wraps `BatchGenerator`, manages the request lifecycle (queue, insert, step, complete), handles distributed sync.
- `distributed_sync.py`: Two-phase `share_object()` for broadcasting data from rank 0 via `all_sum`.
- `TimeBudget` (`time_budget.py`): Iterator that controls generation loop duration, with distributed timing sync.
- Rewritten `runner.py` main loop: Switches from a blocking `for task in tasks` to a two-phase loop: (1) run generation steps under `TimeBudget`, polling for new tasks mid-cycle; (2) process non-generation tasks.
- `plan.py` / `runner_supervisor.py`: Allow task forwarding to `RunnerRunning` runners; keep tasks in `pending` until fully complete.
- `apply.py`: `apply_instance_deleted` now cleans up associated runners from state.
- Tool call & model-specific handling re-implemented: `ToolCallTracker`, `GptOssTracker`, Kimi/GLM patches, and thinking-model support are all re-implemented for the batch path.
Since AlexCheema's last CTO review, the critical issues he identified appear to have been addressed. The generation loop now exists (via _run_generation_steps + TimeBudget), tasks are not marked complete prematurely, and tool calls are handled. I will focus on what remains.
## Critical Issues

### 1. `_narrow_finish_reason` silently drops the `tool_calls` finish reason
In runner.py lines 112-118 (_narrow_finish_reason), the function narrows the finish reason to Literal["stop", "length", "content_filter"] | None. If batch_engine.py:step() returns finish_reason="tool_calls" (which it correctly preserves via get_args(FinishReason)), the _narrow_finish_reason function in runner.py silently converts it to "stop". This means the TokenChunk emitted for a tool-call completion will report finish_reason="stop" instead of "tool_calls", which could confuse downstream OpenAI-compatible clients expecting the correct reason.
`ToolCallTracker.process_token()` at line 158 calls `_narrow_finish_reason(response.finish_reason)` when creating a `TokenChunk` for tool-call parse failures, so this is not merely theoretical: the narrowing is applied in the tool-call code path. If a `ToolCallChunk` is emitted, the finish reason from the batch engine is discarded; if parsing fails, the narrowed (incorrect) reason is propagated.
```python
def _narrow_finish_reason(
    raw: str | None,
) -> Literal["stop", "length", "content_filter"] | None:
    if raw is None or raw in ("stop", "length", "content_filter"):
        return raw
    return "stop"
```
Consider adding "tool_calls" to the accepted set, or at minimum logging when an unknown reason is narrowed.
### 2. Per-request sampler parameters: "last request wins" for the entire batch
In batch_engine.py lines 160-166, sync_and_insert_pending updates the sampler from the last insert's parameters. Since BatchGenerator uses a single sampler for all requests in the batch, this means if request A has temperature=0.0 and request B has temperature=1.0, and they're inserted in the same batch, ALL requests (including A) will use temperature=1.0. This is a semantic correctness issue — users sending requests with different sampling parameters will get unexpected behavior.
```python
# Update sampler from per-request parameters (last request wins for batch)
last = inserts_to_process[-1]
self.batch_gen.sampler = make_sampler(
    temp=last.temperature if last.temperature is not None else 0.7,
    top_p=last.top_p if last.top_p is not None else 1.0,
    top_k=last.top_k if last.top_k is not None else 0,
)
```
This should at minimum be documented prominently (e.g., in the API response or logs), or ideally the batch engine should refuse to batch requests with incompatible sampling parameters.
## Significant Issues

### 3. `KVPrefixCache` removed with no replacement
The old code used KVPrefixCache for efficient handling of repeated/similar prompts. This is entirely removed with no equivalent in the batch path. For workloads with shared system prompts (common in production), this is a meaningful performance regression. The BatchGenerator may have its own internal caching, but this should be verified and documented.
### 4. `share_object` does not guard against non-rank-0 callers passing non-`None` data
In distributed_sync.py, the function asserts that rank 0 provides non-None data, but does not assert that non-rank-0 callers pass None. If a non-rank-0 caller accidentally passes real data, all_sum would silently add those bytes to the rank-0 data, corrupting the result:
```python
if rank == 0:
    assert obj is not None, "Rank 0 must provide data..."
    data = mx.array(list(pickle.dumps(obj)), dtype=mx.uint8)
    ...
else:
    size = int(mx.distributed.all_sum(mx.array([0]), group=group).item())
    ...
```
Add an `assert obj is None` in the `else` branch for defensive programming.
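The two-phase scheme with the defensive assert can be sketched in pure Python by abstracting the collective as an `all_sum` callable over integer lists. The real code uses `mx.distributed.all_sum` on `mx.array`s; all names here are illustrative:

```python
import pickle
from typing import Any, Callable

def share_object(
    obj: Any,
    rank: int,
    all_sum: Callable[[list[int]], list[int]],
) -> Any:
    """Broadcast a picklable object from rank 0 using only an all-sum
    collective. Phase 1 sums the payload size; phase 2 sums the payload
    itself, with non-rank-0 participants contributing zeros."""
    if rank == 0:
        assert obj is not None, "Rank 0 must provide data"
        payload = list(pickle.dumps(obj))
        size = all_sum([len(payload)])[0]
    else:
        # Defensive: real data here would be summed into rank 0's bytes.
        assert obj is None, "Non-rank-0 callers must pass None"
        size = all_sum([0])[0]
        payload = [0] * size
    data = all_sum(payload)
    return pickle.loads(bytes(data))
```

Because every rank except 0 contributes zeros, the elementwise sum reproduces rank 0's bytes exactly; any stray non-`None` argument would corrupt it, which is what the new assert catches.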
### 5. Detokenizer instance sharing across requests
In batch_engine.py line 181, each ActiveRequest is assigned self.tokenizer.detokenizer. If detokenizer is a property that returns a shared instance (not a new copy), all concurrent requests would share the same detokenizer state, corrupting text output. If it returns a new instance each time, this is fine. Worth verifying with StreamingDetokenizer's behavior.
### 6. `emit_task_completion` flag is always `True` for non-`TextGeneration` tasks
In the runner main loop, emit_task_completion is set to True at line 1130 and never set to False within the match/case block. This means every non-TextGeneration task (ConnectToGroup, LoadModel, StartWarmup, Shutdown) will emit both TaskStatusUpdated(Complete) AND RunnerStatusUpdated at the end of the for-loop. Looking at the old code, this matches the previous behavior, but the variable name suggests it was intended to be conditionally set to False for some cases. The Shutdown case in particular already transitions to RunnerShutdown status inside the match block, then the outer emit_task_completion block re-emits the same status.
### 7. Error handling in `_run_generation_steps` re-raises after cleanup
In _run_generation_steps (runner.py around line 460-485), when batch_engine.step() throws an exception, the code sends ErrorChunk and TaskStatusUpdated(Complete) for all in-flight tasks, then re-raises the exception. This will crash the runner process. In the old code, the exception was also re-raised, but only for a single request. With batching, a single exception now kills ALL in-flight requests AND the entire runner process. Consider whether the runner should attempt to recover (e.g., reset the batch engine) rather than crashing.
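One possible recovery shape, sketched below. This is not the PR's code: `safe_generation_step`, `reset`, and the callbacks are hypothetical stand-ins for the real engine/event APIs, and whether resetting the batch engine is safe after an arbitrary failure would need verification:

```python
def safe_generation_step(engine, in_flight: dict, send_error, reset) -> bool:
    """Run one batch decode step, containing a failure to the current
    batch: report errors for all in-flight tasks, reset the engine, and
    let the runner keep serving instead of crashing. Returns True on
    success, False if the batch was torn down."""
    try:
        engine.step()
        return True
    except Exception as exc:  # deliberate containment point
        for task_id in list(in_flight):
            send_error(task_id, exc)  # ErrorChunk + completion in the real code
        in_flight.clear()
        reset()  # hypothetical: rebuild the batch engine's internal state
        return False
```

The trade-off is that a genuinely unrecoverable fault (e.g. corrupted model state) might then fail every subsequent batch; a retry budget before falling back to a crash would bound that risk.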
## Minor Issues

### 8. `conftest.py` at repo root
A new conftest.py is added at the repo root with collect_ignore = ["tests/start_distributed_test.py"]. This is fine functionally, but the comment in the PR description doesn't mention it. Is this meant to be temporary?
### 9. `AGENTS.md` changes bundled into this PR
The PR adds documentation about model storage, API testing, distributed testing, and logs to AGENTS.md. While useful, these are unrelated to continuous batching and would be cleaner in a separate commit/PR.
### 10. `TimeBudget` distributed sync uses `all_sum` for timing
The TimeBudget class uses mx.distributed.all_sum to aggregate timing across ranks, then divides by group.size() * sync_frequency. This computes an average loop time across ranks and sync periods. However, the slowest rank determines the actual throughput — using an average may cause faster ranks to overshoot their iteration count. Consider using all_max (if available) instead of all_sum / count.
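A toy calculation (not the PR's code) illustrates the overshoot: with per-rank step times in milliseconds, the average-based budget permits more iterations than the slowest rank can actually complete in the same wall-clock window.

```python
def steps_for_budget_ms(budget_ms: float, per_rank_step_ms: list[float]) -> tuple[int, int]:
    """Return (iterations allowed by the cross-rank average step time,
    iterations allowed by the slowest rank). Only the second number
    actually fits within the budget on every rank."""
    avg = sum(per_rank_step_ms) / len(per_rank_step_ms)
    worst = max(per_rank_step_ms)
    return int(budget_ms / avg), int(budget_ms / worst)
```

With a 1000 ms budget and two ranks stepping at 1 ms and 4 ms, the average (2.5 ms) permits 400 iterations while the slowest rank completes only 250, so an average-based budget overshoots by 60%. If `all_max` is unavailable, the same effect can be had by `all_gather`-ing per-rank times and taking the max locally.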
### 11. Hardcoded default `temp=0.7`
The default temperature of 0.7 in BatchGenerationEngine.__init__ (line 75) and in sync_and_insert_pending (line 163) should be documented or pulled from a shared constant. The old code path did not have a hardcoded default — it came from the task parameters.
### 12. `_process_generation_results` uses `MpSender.send` (synchronous)
The function signature takes MpSender[Event] but uses synchronous .send() calls. In the test harness this works via EventCollector, but in production the synchronous send to a multiprocessing pipe could block if the pipe buffer is full. The old code also used synchronous sends in the runner process, so this is not a regression, but worth noting.
## What's Good

- **Architecture is sound:** The two-phase main loop (generate, then process tasks) with inline TextGeneration handling during generation is well-designed. It ensures new requests can join the batch without breaking the decode loop, while non-generation tasks (Shutdown, etc.) cause a clean exit.
- **Comprehensive test coverage:** Two new test files with 10+ test cases covering single requests, multiple concurrent requests, tool calls, staggered completions, and edge cases (interrupted tool calls, mixed finish reasons, 10 simultaneous completions). The `ScriptedBatchEngine` pattern is elegant.
- **Tool call and model-specific handling re-implemented:** `ToolCallTracker`, `GptOssTracker`, Kimi token filtering, GLM tokenizer patching, and thinking-model support are all present in the batch path, addressing a major gap from the earlier revision.
- **`apply_instance_deleted` cleanup:** The fix to clean up runner entries when an instance is deleted is correct, well-tested, and orthogonal to the batching work but valuable.
- **Supervisor `pending` fix:** Keeping tasks in `pending` until `TaskStatusUpdated(Complete)` prevents duplicate dispatch; this is the right fix for continuous batching, where acknowledgment and completion are separated.
- **Clean separation:** `BatchGenerationEngine`, `distributed_sync`, and `TimeBudget` are cleanly separated into focused modules with clear responsibilities.
## Verdict
This is a substantial and well-structured PR. The core continuous batching architecture is solid, and the major gaps from the earlier revision (no generation loop, premature task completion, missing tool call handling) have been addressed. The remaining issues are: (1) the _narrow_finish_reason silently dropping tool_calls, (2) the "last request wins" sampler behavior needing documentation or guards, and (3) the removed KVPrefixCache as a performance consideration. None are blockers, but (1) should be fixed before merge.
Review only — not a merge approval.