agent-lightning

Bug Fix: Prevent duplicate rollouts caused by stale task requeue + lat…

Open USTCKevinF opened this issue 4 months ago • 1 comment

This PR addresses an issue with stale task requeueing that could cause duplicate rollouts to be received and stored.

Problem

  • The server requeues tasks when they time out (_check_and_requeue_stale_tasks).
  • If a worker eventually finishes and submits a rollout for a task that has already been requeued, the server still accepted the outdated result in /rollout.
  • This caused the same logical task to be processed more than once.
  • In our training loop, a late duplicate rollout left over from the previous batch caused one of two failures:
    • the following assertion failed, because duplicate rollouts from previous batches were counted:
        assert len(self._completed_rollouts) == self._total_tasks_queued
    • or a KeyError was raised on the rollout ID, because the rollout did not belong to the current batch.
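The race can be reproduced with a toy in-memory model. This is a hypothetical sketch, not the actual server.py: `NaiveServer`, `start_batch`, `requeue_stale` and the rollout IDs are illustrative names, and each batch is reduced to a single task. Because the toy server accepts every submission, a straggler from batch 1 lands in batch 2's results:

```python
from __future__ import annotations

from collections import deque


class NaiveServer:
    """Hypothetical toy server that accepts every submission (the buggy behaviour)."""

    def __init__(self) -> None:
        self.queue: deque[str] = deque()
        self.processing: set[str] = set()
        self.completed_rollouts: dict[str, str] = {}
        self.total_tasks_queued = 0

    def start_batch(self, rollout_ids: list[str]) -> None:
        # A new batch resets the bookkeeping that the training loop asserts on.
        self.queue.extend(rollout_ids)
        self.completed_rollouts.clear()
        self.total_tasks_queued = len(rollout_ids)

    def get_next_task(self) -> str | None:
        if not self.queue:
            return None
        rollout_id = self.queue.popleft()
        self.processing.add(rollout_id)
        return rollout_id

    def requeue_stale(self, rollout_id: str) -> None:
        # Stands in for _check_and_requeue_stale_tasks timing the task out.
        self.processing.discard(rollout_id)
        self.queue.append(rollout_id)

    def store_rollout(self, rollout_id: str, result: str) -> None:
        # Bug: nothing checks which claim produced this result.
        self.processing.discard(rollout_id)
        self.completed_rollouts[rollout_id] = result


server = NaiveServer()
server.start_batch(["b1-t0"])
slow = server.get_next_task()            # worker A claims b1-t0
server.requeue_stale(slow)               # A is too slow; the task is requeued
fast = server.get_next_task()            # worker B claims b1-t0 again
server.store_rollout(fast, "from B")     # batch 1 completes

server.start_batch(["b2-t0"])
server.store_rollout(slow, "from A")     # straggler from batch 1 is accepted
server.store_rollout(server.get_next_task(), "from C")

print(len(server.completed_rollouts), server.total_tasks_queued)  # 2 1
```

The late `b1-t0` entry is what breaks the length assertion; a loop that instead looks that ID up among the current batch's rollouts would hit the KeyError.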

Solution

  • Introduced an attempt_id for each task claim.
    • Every time a task is handed out via /task, a new attempt_id (UUID) is generated.
    • Workers are required to include this attempt_id when submitting rollouts.
  • When the server receives a rollout:
    • It checks if the attempt_id matches the currently active one in _processing_tasks.
    • If it doesn't match (i.e., the task was requeued, so the submitting attempt is no longer the active one), the stale rollout is dropped instead of being stored.
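A minimal sketch of the claim-and-validate check, assuming `_processing_tasks` maps each rollout ID to the attempt_id of its active claim. The `AttemptTracker` class and its `claim`, `requeue` and `accept` methods are hypothetical names, not the server's actual API:

```python
import uuid


class AttemptTracker:
    """Hypothetical sketch: _processing_tasks maps rollout_id -> active attempt_id."""

    def __init__(self) -> None:
        self._processing_tasks: dict[str, str] = {}

    def claim(self, rollout_id: str) -> str:
        # Every hand-out via /task gets a fresh attempt_id; re-claiming a
        # requeued task overwrites the previous one.
        attempt_id = uuid.uuid4().hex
        self._processing_tasks[rollout_id] = attempt_id
        return attempt_id

    def requeue(self, rollout_id: str) -> None:
        # Timed-out task: no attempt is active until it is claimed again.
        self._processing_tasks.pop(rollout_id, None)

    def accept(self, rollout_id: str, attempt_id: str) -> bool:
        # Accept only a submission that comes from the currently active claim.
        if self._processing_tasks.get(rollout_id) != attempt_id:
            return False  # stale or unknown attempt: drop it
        del self._processing_tasks[rollout_id]
        return True


tracker = AttemptTracker()
first = tracker.claim("t0")          # worker A
tracker.requeue("t0")                # A timed out
second = tracker.claim("t0")         # worker B
print(tracker.accept("t0", first))   # False: stale attempt from A
print(tracker.accept("t0", second))  # True
```

In this sketch a submission that arrives after the requeue but before the task is reclaimed is rejected as well, since no attempt is active for that rollout at that point.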

This ensures that at most one valid rollout is stored per logical task, so late, stale results can no longer break training.

Changes

  • Updated Task to include attempt_id.
  • get_next_task assigns a new attempt_id upon each claim.
  • store_rollout validates the attempt_id before accepting results.
  • Updated logging to make it clear when a stale rollout is dropped.
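Taken together, the listed changes might look roughly like the following on the server side. This is a simplified, single-threaded sketch under stated assumptions: apart from `attempt_id`, the `Task` fields, the timeout handling, the logger name and the boolean return values are hypothetical, and the HTTP endpoints and any locking in the real server.py are omitted.

```python
from __future__ import annotations

import dataclasses
import logging
import time
import uuid
from collections import deque
from dataclasses import dataclass

# Hypothetical logger name for this sketch.
logger = logging.getLogger("task_server_sketch")


@dataclass(frozen=True)
class Task:
    rollout_id: str
    payload: dict  # hypothetical field standing in for the task input
    attempt_id: str | None = None  # assigned afresh on every claim


class TaskServer:
    """Single-threaded sketch; HTTP endpoints and locking are omitted."""

    def __init__(self, timeout_s: float) -> None:
        self.timeout_s = timeout_s
        self._queue: deque[Task] = deque()
        # rollout_id -> (task carrying the active attempt_id, claim time)
        self._processing_tasks: dict[str, tuple[Task, float]] = {}
        self._completed_rollouts: dict[str, dict] = {}

    def queue_task(self, task: Task) -> None:
        self._queue.append(task)

    def get_next_task(self) -> Task | None:
        if not self._queue:
            return None
        # Each claim gets a new attempt_id, invalidating any earlier claim.
        task = dataclasses.replace(self._queue.popleft(), attempt_id=uuid.uuid4().hex)
        self._processing_tasks[task.rollout_id] = (task, time.monotonic())
        return task

    def check_and_requeue_stale_tasks(self, now: float | None = None) -> None:
        now = time.monotonic() if now is None else now
        for rollout_id, (task, claimed_at) in list(self._processing_tasks.items()):
            if now - claimed_at > self.timeout_s:
                del self._processing_tasks[rollout_id]
                self._queue.append(dataclasses.replace(task, attempt_id=None))

    def store_rollout(self, rollout_id: str, attempt_id: str, rollout: dict) -> bool:
        entry = self._processing_tasks.get(rollout_id)
        if entry is None or entry[0].attempt_id != attempt_id:
            # Stale or unknown attempt: log it and do not store the result.
            logger.warning("Dropping stale rollout %s (attempt %s)", rollout_id, attempt_id)
            return False
        del self._processing_tasks[rollout_id]
        self._completed_rollouts[rollout_id] = rollout
        return True


# Usage: worker A times out, worker B re-claims, then A reports late.
server = TaskServer(timeout_s=60.0)
server.queue_task(Task("t0", {"prompt": "example"}))
a = server.get_next_task()
server.check_and_requeue_stale_tasks(now=time.monotonic() + 120)  # pretend 2 min passed
b = server.get_next_task()
print(server.store_rollout(b.rollout_id, b.attempt_id, {"reward": 1.0}))  # True
print(server.store_rollout(a.rollout_id, a.attempt_id, {"reward": 0.0}))  # False
```

Clearing `attempt_id` on requeue means a task sitting in the queue never matches any in-flight submission; only the next claim produces a valid ID.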

Why this is important

  • Guarantees consistency between queued tasks and completed rollouts.
  • Prevents assertion errors during training when tasks time out and later resurface.
  • Makes the system more robust in long-running distributed training with occasional straggler workers.

Related issues

  • Internal error AssertionError: assert len(self._completed_rollouts) == self._total_tasks_queued caused by duplicate rollouts from stale tasks.

USTCKevinF, Sep 05 '25 06:09

In server.py, we already have a num_claims. Maybe renaming it to claim_sequence_id would be a good idea? I don't see the API used elsewhere, so it should be safe.

ultmaster, Sep 06 '25 06:09
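If `num_claims` is a per-task counter incremented on every claim (an assumption; its exact semantics in server.py are not shown in this thread), renaming it to `claim_sequence_id` would let the counter itself serve as the attempt identifier instead of a UUID. A hypothetical sketch, where `ClaimState` and `SequenceTracker` are illustrative names:

```python
from dataclasses import dataclass


@dataclass
class ClaimState:
    """Hypothetical per-task state; claim_sequence_id plays the role of a renamed num_claims."""

    claim_sequence_id: int = 0
    active: bool = False


class SequenceTracker:
    def __init__(self) -> None:
        self._tasks: dict[str, ClaimState] = {}

    def claim(self, rollout_id: str) -> int:
        state = self._tasks.setdefault(rollout_id, ClaimState())
        state.claim_sequence_id += 1  # monotonically increasing per task
        state.active = True
        return state.claim_sequence_id

    def requeue(self, rollout_id: str) -> None:
        # Timed-out task: the current claim is no longer active.
        self._tasks[rollout_id].active = False

    def accept(self, rollout_id: str, claim_sequence_id: int) -> bool:
        state = self._tasks.get(rollout_id)
        if state is None or not state.active or state.claim_sequence_id != claim_sequence_id:
            return False  # stale, unknown, or already-completed claim
        state.active = False
        return True


tracker = SequenceTracker()
first = tracker.claim("t0")    # 1: worker A
tracker.requeue("t0")          # A timed out
second = tracker.claim("t0")   # 2: worker B
print(tracker.accept("t0", first), tracker.accept("t0", second))  # False True
```

An integer is shorter and shows the claim order in logs; a UUID stays unique even if the counter for a rollout ID were ever reset while an old claim was still in flight.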