Fix duplicate HTTP task execution via WorkflowRepairService race condition
Summary
Fixes #630
Long-running HTTP tasks (and other async system tasks) were being executed multiple times (typically 4x) due to a race condition in WorkflowRepairService.
Problem
- SystemTaskWorker polls the task → removes it from the queue via `ackTaskReceived()`
- AsyncSystemTaskExecutor increments `pollCount` in memory only
- The HTTP call starts and blocks for minutes
- WorkflowRepairService reads the task from the database → sees `pollCount = 0` (stale value)
- Task not found in queue + `pollCount = 0` → WorkflowRepairService re-queues it (sketched below)
- Another worker picks it up → duplicate execution
- Repeats 3-4 times during long-running operations
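In code terms, the repair decision that fires in this sequence amounts to roughly the following. This is a simplified sketch, not the actual Conductor source; the queue-name variable and the exact DAO calls are illustrative:

```java
// Sketch of the re-queue decision described above (illustrative names only).
boolean inQueue = queueDAO.containsMessage(taskQueueName, task.getTaskId());
if (!inQueue && task.getPollCount() == 0) {
    // The message was already ack'd (deleted) and the persisted pollCount is
    // stale, so the task looks abandoned even though a worker is still running it.
    queueDAO.push(taskQueueName, task.getTaskId(), task.getCallbackAfterSeconds());
}
```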
Solution
Two-part fix:
1. WorkflowRepairService (lines 146-155)
Check `pollCount > 0` before re-queueing. If the task has been polled, a worker is actively processing it, so skip re-queueing.
2. AsyncSystemTaskExecutor (lines 152-154)
Persist the task to the database before calling `systemTask.start()`. This ensures `pollCount` is written to the DB before any blocking operations, preventing WorkflowRepairService from reading stale data (see the sketch after this list).
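Put together, the two changes amount to roughly the following. This is a minimal sketch using names that appear elsewhere in this description (`executionDAOFacade`, `systemTask.start(...)`); the surrounding control flow is omitted and method names on the task model are my reading of it, not verbatim source:

```java
// Part 1 - WorkflowRepairService: skip re-queueing if the task has been polled.
if (task.getPollCount() > 0) {
    // A worker already picked this task up and is presumably still executing it,
    // so re-queueing it here would create a duplicate execution.
    return false;
}

// Part 2 - AsyncSystemTaskExecutor: persist pollCount before the blocking call.
task.incrementPollCount();
executionDAOFacade.updateTask(task);                // pollCount reaches the DB first
systemTask.start(workflow, task, workflowExecutor); // may block for minutes
```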
Tradeoffs to Consider
Current Behavior (Before Fix)
- ❌ Healthy workers executing long tasks → task re-queued immediately → duplicate executions
- ✅ Crashed workers → task re-queued immediately → fast recovery
Proposed Behavior (After Fix)
- ✅ Healthy workers executing long tasks → task NOT re-queued → no duplicates
- ⏱️ Crashed workers → task NOT re-queued → recovery via timeout mechanisms
Recovery for Crashed Workers
If a worker crashes with this change, recovery depends on existing timeout mechanisms:
- Queue unack timeout: Message automatically returns to queue after timeout expires
- Response timeout (`responseTimeoutSeconds`): DeciderService marks the task as TIMED_OUT if no updates are received
- Execution timeout (`timeoutSeconds`): Overall time limit from the first poll
This means recovery would be delayed by the responseTimeout duration (typically 30-60s) rather than immediate.
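For context, both of these timeouts are per-task-definition settings. A minimal sketch of where they would be configured (setter names quoted from memory, values purely illustrative):

```java
import com.netflix.conductor.common.metadata.tasks.TaskDef;

// Illustrative only: these are the knobs the delayed-recovery path above relies on.
TaskDef httpTaskDef = new TaskDef("long_running_http_call");
httpTaskDef.setResponseTimeoutSeconds(60); // DeciderService times the task out after this
httpTaskDef.setTimeoutSeconds(600);        // overall limit measured from the first poll
```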
Questions for Discussion
- What is the relative frequency of long-running tasks vs worker crashes in production?
- Is delayed recovery for crashed workers acceptable to prevent duplicate executions?
- Are there scenarios where aggressive re-queueing is beneficial that we haven't considered?
- Should we add additional logic (e.g., time-based thresholds) to handle both cases?
Changes
Modified:
- `core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowRepairService.java` - Added `pollCount` check (lines 146-155)
- `core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java` - Persist the task before the blocking `start()` call (lines 152-154)
- `core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowRepairService.java` - Added 2 new tests (rough sketch below): `verifyScheduledTaskWithPollCountIsNotRequeued()`, `verifySystemTaskWithPollCountIsNotRequeued()`
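For reference, the new tests assert roughly the following. This is a rough Mockito-style sketch, not the actual test code; the mock wiring is abbreviated and the `verifyAndRepairTask` entry point and signatures are assumptions:

```java
// Inside TestWorkflowRepairService, with queueDAO and the service under test
// wired up as Mockito mocks/fixtures (setup omitted).
@Test
public void verifyScheduledTaskWithPollCountIsNotRequeued() {
    TaskModel task = new TaskModel();
    task.setTaskType("SIMPLE");
    task.setStatus(TaskModel.Status.SCHEDULED);
    task.setPollCount(1); // a worker has already polled this task

    // Simulate the message having been ack'd (deleted) from the queue.
    when(queueDAO.containsMessage(anyString(), anyString())).thenReturn(false);

    workflowRepairService.verifyAndRepairTask(task);

    // Even though the task is missing from the queue, it must NOT be re-queued.
    verify(queueDAO, never()).push(anyString(), anyString(), anyLong());
}
```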
1. Unack Timeout - Why It Doesn't Prevent Duplicates
The unack timeout should work, but there's an issue with the sequence:
All SQL-based queue implementations (Postgres, MySQL, SQLite):
```java
public boolean ack(String queueName, String messageId) {
    return removeMessage(tx, queueName, messageId); // DELETES the message
}
```
The sequence that causes duplicates:
- Worker polls the task → message marked `popped = true`
- SystemTaskWorker calls `ackTaskReceived()` (line 140) → message DELETED from the queue
- Worker starts the HTTP call (takes minutes)
- `processAllUnacks()` runs, but the message is already gone
- WorkflowRepairService sees the task is not in the queue → re-queues it
So the unack timeout can't work because ack() deletes the message before task completion.
Note: setUnackTimeout() is only used by WorkflowSweeper for DECIDER_QUEUE (workflow messages), not for task queue messages.
2. Race Condition with pollCount > 1
The fix handles this - the check is if (task.getPollCount() > 0), so it catches pollCount = 1, 2, 3, etc.
(Is there another race condition we're concerned about? Want to make sure we're not missing an edge case.)
3. Is This Postgres-Specific?
No, this affects all SQL-based implementations:
- Postgres: `ack()` → `removeMessage()`
- MySQL: `ack()` → `removeMessage()`
- SQLite: `ack()` → `removeMessage()`
Redis/DynoQueue might behave differently.
Potential Alternative Fixes
Option A: WorkflowRepairService check (current PR)
- Works for all queue implementations
- Simple: if polled, don't re-queue
- Trade-off: Delayed recovery for crashed workers (via timeout mechanisms)
Option B: Don't call ackTaskReceived() in SystemTaskWorker
- Rely on unack timeout instead
- Issue: Default 60s timeout → still duplicates for 5-min tasks
- Would need dynamic unack timeouts based on task duration (see the sketch after Option C)
Option C: Change SQL queue implementations
- Make `ack()` not delete immediately; keep the message with an `acked=true` flag
- Large architectural change
- Changes queue semantics
Option A seems most practical but open to other approaches if there's a better way.
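For completeness, Option B would look roughly like this in SystemTaskWorker. This is a sketch only; it assumes `setUnackTimeout` takes milliseconds, that `responseTimeoutSeconds` is a reasonable proxy for the task's duration, and that the executor is invoked as shown:

```java
// Option B sketch: instead of ack'ing (deleting) the message up front,
// push the unack timeout out to cover the expected duration and only
// ack once the task has actually finished.
long expectedDurationMs = TimeUnit.SECONDS.toMillis(task.getResponseTimeoutSeconds());
queueDAO.setUnackTimeout(queueName, task.getTaskId(), expectedDurationMs);

asyncSystemTaskExecutor.execute(systemTask, task.getTaskId()); // long-running work
queueDAO.ack(queueName, task.getTaskId()); // ack (delete) only after completion
```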
Update Based on User Feedback
Thanks to @ColinShih for testing the original PR and finding it didn't solve the problem.
The Problem with the Original Fix:
The original fix only checked pollCount > 0 in WorkflowRepairService, but pollCount was being incremented in-memory and only persisted to the database in the finally block after the HTTP call completed. This meant:
- Memory: `pollCount = 1` ✅
- Database: `pollCount = 0` ❌ (stale during execution)
- WorkflowRepairService reads from the database → sees `pollCount = 0` → still re-queues
The Fix:
Added a second part to persist the task to the database immediately after incrementing pollCount and before calling the blocking systemTask.start(). This ensures the database has the correct pollCount value before any long-running operations begin.
Changes:
`AsyncSystemTaskExecutor.java` (lines 152-154): Added `executionDAOFacade.updateTask(task)` before `systemTask.start()`
This is a more general solution than the HTTP-specific fix proposed, as it handles all async system tasks (HTTP, Lambda, etc.).
Staff Feedback: Lock Lease Extension Approach
Received feedback that the current fix has a performance issue - adding a DB write before every start() call is expensive.
Root Cause (Corrected Understanding)
The real issue is workflow lock lease timeout, not just DB state:
- WorkflowSweeper/Decider acquires lock on workflow with default 60s lease
- System task (HTTP) executes and blocks for 5+ minutes
- After 60s, workflow lock expires
- Another process acquires lock on same workflow
- Sees task still in SCHEDULED → executes it again → duplicate
Proposed Better Solution: Lock Lease Extension
AsyncSystemTaskExecutor should periodically extend the workflow lock lease during long-running task execution.
Implementation: Using ScheduledExecutorService
Conductor already uses ScheduledExecutorService for various periodic tasks. We can use the same pattern for lease extension:
Note: LocalOnlyLock uses ScheduledExecutorService but for lock expiration (one-time schedule()), not extension. We need periodic extension using scheduleAtFixedRate():
```java
// In AsyncSystemTaskExecutor or new LockLeaseExtensionService
private static final ScheduledExecutorService LEASE_EXTENDER =
        Executors.newScheduledThreadPool(
                10, ExecutorsUtil.newNamedThreadFactory("lock-lease-extender-"));

private ScheduledFuture<?> startLeaseExtension(String workflowId, long leaseTimeMs) {
    long extensionInterval = leaseTimeMs / 2; // extend every 30s for a 60s lease
    return LEASE_EXTENDER.scheduleAtFixedRate(
            () -> executionLockService.acquireLock(workflowId, 0, leaseTimeMs),
            extensionInterval,
            extensionInterval,
            TimeUnit.MILLISECONDS);
}

// Usage in AsyncSystemTaskExecutor
if (task.getStatus() == TaskModel.Status.SCHEDULED) {
    ScheduledFuture<?> leaseExtender = startLeaseExtension(workflowId, lockLeaseTime);
    try {
        systemTask.start(workflow, task, workflowExecutor);
    } finally {
        leaseExtender.cancel(false);
    }
}
```
Difference from LocalOnlyLock:
- LocalOnlyLock: `schedule()` - runs ONCE to delete the lock after expiry
- Lease extension: `scheduleAtFixedRate()` - runs PERIODICALLY to renew before expiry
Challenges
- Lock interface needs extension: Add an `extendLease()` method to the `Lock` interface, or just re-acquire (see the sketch after this list)
- Redisson support: RLock has watchdog auto-renewal but needs investigation
- Postgres support: pg_advisory_lock doesn't have native lease extension
- All Lock implementations need updating: RedisLock, PostgresLockDAO, LocalOnlyLock, NoopLock
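One possible shape for the interface change, purely as a sketch: `extendLease()` is the hypothetical method from the first challenge above, the existing signatures shown are paraphrased from memory, and the default implementation just re-acquires so current Lock implementations would compile unchanged:

```java
import java.util.concurrent.TimeUnit;

public interface Lock {

    // Existing-style acquire/release methods (paraphrased, not verbatim).
    boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUnit unit);

    void releaseLock(String lockId);

    // Hypothetical addition: renew the lease on a lock this process already holds.
    // The default falls back to re-acquiring (the "or just re-acquire" option above);
    // Redisson's watchdog or a Postgres-specific strategy could override it.
    default boolean extendLease(String lockId, long leaseTime, TimeUnit unit) {
        return acquireLock(lockId, 0, leaseTime, unit);
    }
}
```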
Workaround for Now
Users can increase lock lease time to accommodate long-running tasks:
conductor.lockLeaseTime=360000 # 6 minutes for 5-min HTTP tasks
Questions
- Should we keep the current PR as-is (works but has perf cost) or revert the AsyncSystemTaskExecutor change?
- What's the priority for implementing lock lease extension properly?
- Should we start with Redis-only implementation or all Lock types?
- Is Redisson's auto-renewal (watchdog) acceptable for RedisLock?
Created analysis document in PLANNING/lock-lease-extension-analysis.md with more details.
Longer term - we should extend the lease while working on long-running system tasks.
Sounds like this is outside the current scope of work, but just for future reference, what do you see as the most likely path for implementing this?