
Sequential jobs that insert new jobs only picked up after 1 second

Open marttinen opened this issue 6 months ago • 2 comments

Hello, we're using sequences to ensure only one job is running at a given time "per entity". We have roughly 20 workers, all of which use an ID field tagged `river:"sequence"`. Some of them generate follow-up jobs for the same entity: think regular maintenance jobs, or jobs that first create things, call an external service, and then start things. This works fine in production (so far), but our test suite is becoming slower and slower.

The docs state:

If there are no actively running jobs in a sequence, the first job in that sequence may encounter a higher latency before being moved to available by the sequence maintenance process. This latency does not apply to subsequent jobs in the sequence if they are already enqueued when the previous job completes; such subsequent jobs will be scheduled immediately.

but this latency seems to always apply to jobs that are created from within a sequenced job for the same entity.

For example (I'm using the same TaskArgs here, but this also happens with different TaskArgs/workers):

type TaskArgs struct {
	EntityId          int `json:"entityId" river:"sequence"`
	RescheduleCounter int `json:"rescheduleCounter"`
}

func (worker *TaskWorker) Work(ctx context.Context, job *river.Job[TaskArgs]) error {
	// ...
	client := river.ClientFromContext[pgx.Tx](ctx)
	_, err := client.Insert(context.Background(), TaskArgs{...}, nil)
	if err != nil {
		return err
	}
	// ...
}

When run, you can see the ~1 second delay in the logging. For example, 10 jobs usually take around 10-12 seconds:

$ go test
2025/06/24 10:57:12 INFO Work() started entityId=13 rescheduleCounter=10
2025/06/24 10:57:12 INFO Scheduled next job nextArgs="{EntityId:13 RescheduleCounter:9}"
2025/06/24 10:57:13 INFO Work() started entityId=13 rescheduleCounter=9
2025/06/24 10:57:13 INFO Scheduled next job nextArgs="{EntityId:13 RescheduleCounter:8}"
2025/06/24 10:57:14 INFO Work() started entityId=13 rescheduleCounter=8
<snip>
2025/06/24 10:57:21 INFO We're done! entityId=13
PASS
ok  	rivertestexecution	10.267s

I've built a small river test execution project to recreate the issue in isolation. It has a basic worker implementation that calls itself X times to showcase the issue.

Is there a way to work around this timer / issue?

marttinen avatar Jun 24 '25 09:06 marttinen

Hi @marttinen, sorry for the delay on this. As a Pro customer you can always feel free to email us at [email protected] for direct support, but I really appreciate you putting together an extensive reproduction like this!

Your report actually led me to uncover a related bug that hadn't shipped yet (introduced by mistake only a few days ago) so I was able to get that one fixed preemptively. Thanks 😅

After fixing that issue, my hunch was that yours is actually a result of your client's fetch poll interval, which defaults to 1 second. Here's your test suite running as is:

➜  rivertestexecution git:(main) ✗  go test -v -count 1
=== RUN   TestRunSequencedJobs
2025/06/24 23:02:41 INFO Work() started entityId=81 rescheduleCounter=10
2025/06/24 23:02:41 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:9}"
2025/06/24 23:02:42 INFO Work() started entityId=81 rescheduleCounter=9
2025/06/24 23:02:42 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:8}"
2025/06/24 23:02:43 INFO Work() started entityId=81 rescheduleCounter=8
2025/06/24 23:02:43 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:7}"
2025/06/24 23:02:44 INFO Work() started entityId=81 rescheduleCounter=7
2025/06/24 23:02:44 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:6}"
2025/06/24 23:02:45 INFO Work() started entityId=81 rescheduleCounter=6
2025/06/24 23:02:45 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:5}"
2025/06/24 23:02:46 INFO Work() started entityId=81 rescheduleCounter=5
2025/06/24 23:02:46 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:4}"
2025/06/24 23:02:47 INFO Work() started entityId=81 rescheduleCounter=4
2025/06/24 23:02:47 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:3}"
2025/06/24 23:02:48 INFO Work() started entityId=81 rescheduleCounter=3
2025/06/24 23:02:48 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:2}"
2025/06/24 23:02:49 INFO Work() started entityId=81 rescheduleCounter=2
2025/06/24 23:02:49 INFO Scheduled next job nextArgs="{EntityId:81 RescheduleCounter:1}"
2025/06/24 23:02:50 INFO Work() started entityId=81 rescheduleCounter=1
2025/06/24 23:02:50 INFO We're done! entityId=81
--- PASS: TestRunSequencedJobs (12.07s)
PASS
ok      rivertestexecution      12.238s

And if I change FetchPollInterval to 100ms, I see this:

➜  rivertestexecution git:(main) ✗  go test -v -count 1
=== RUN   TestRunSequencedJobs
time=2025-06-24T23:02:17.864-05:00 level=ERROR msg="jobexecutor.JobExecutor: Unhandled job kind" kind=fake_job job_id=2
time=2025-06-24T23:02:17.864-05:00 level=WARN msg="jobexecutor.JobExecutor: Job errored; retrying" error="job kind is not registered in the client's Workers bundle: fake_job" job_id=2 job_kind=fake_job
2025/06/24 23:02:18 INFO Work() started entityId=1 rescheduleCounter=10
2025/06/24 23:02:18 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:9}"
2025/06/24 23:02:18 INFO Work() started entityId=1 rescheduleCounter=9
2025/06/24 23:02:18 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:8}"
2025/06/24 23:02:19 INFO Work() started entityId=1 rescheduleCounter=8
2025/06/24 23:02:19 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:7}"
2025/06/24 23:02:19 INFO Work() started entityId=1 rescheduleCounter=7
2025/06/24 23:02:19 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:6}"
2025/06/24 23:02:19 INFO Work() started entityId=1 rescheduleCounter=6
2025/06/24 23:02:19 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:5}"
2025/06/24 23:02:19 INFO Work() started entityId=1 rescheduleCounter=5
2025/06/24 23:02:19 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:4}"
2025/06/24 23:02:20 INFO Work() started entityId=1 rescheduleCounter=4
2025/06/24 23:02:20 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:3}"
2025/06/24 23:02:20 INFO Work() started entityId=1 rescheduleCounter=3
2025/06/24 23:02:20 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:2}"
2025/06/24 23:02:20 INFO Work() started entityId=1 rescheduleCounter=2
2025/06/24 23:02:20 INFO Scheduled next job nextArgs="{EntityId:1 RescheduleCounter:1}"
2025/06/24 23:02:20 INFO Work() started entityId=1 rescheduleCounter=1
2025/06/24 23:02:20 INFO We're done! entityId=1
--- PASS: TestRunSequencedJobs (5.57s)
PASS
ok      rivertestexecution      5.747s

That gets us down to a considerably shorter runtime, but honestly still much longer than I was expecting. To explain, here's the sequence of events as I understand it:

  1. First job gets inserted and, thanks to listen/notify, gets picked up ~immediately.
  2. Second job is inserted non-transactionally while the first job is executing. No insert notification is emitted because the job is `pending` (not `available`).
  3. Asynchronously, and after a short delay from the batching completer, the first job gets marked as completed; the sequence promotion occurs in the same transaction, marking the next job as `available`. As of now, no listen/notify messages are emitted here either.
  4. The client doesn't try to fetch again until its next FetchPollInterval tick following the 100ms FetchCooldown. When it does finally fetch again, that next job is ready to go.
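
For reference, the fetch timing from step 4 is controlled on the client config. Here's a minimal sketch of where those knobs live, based on the client setup in your reproduction (lowering `FetchCooldown` below its 100ms default is my own addition here, not something the diff below changes):

```go
package main

import (
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

func newClient(dbPool *pgxpool.Pool, workers *river.Workers) (*riverpro.Client[pgx.Tx], error) {
	return riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{
		Config: river.Config{
			// How often the client polls for jobs when no
			// listen/notify wakeup arrives (default: 1 second).
			FetchPollInterval: 100 * time.Millisecond,
			// Minimum spacing between consecutive fetches
			// (default: 100ms).
			FetchCooldown: 50 * time.Millisecond,
			Queues: map[string]river.QueueConfig{
				river.QueueDefault: {MaxWorkers: 10},
			},
			Workers: workers,
		},
	})
}
```
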

If I change your example so the worker inserts the new jobs as part of a transaction that also completes the running job, and also lower the FetchPollInterval to 100ms, I see a much better result:

➜  rivertestexecution git:(main) ✗  go test -v -count 1
=== RUN   TestRunSequencedJobs
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=10
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:9}"
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=9
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:8}"
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=8
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:7}"
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=7
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:6}"
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=6
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:5}"
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=5
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:4}"
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=4
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:3}"
2025/06/24 23:15:27 INFO Work() started entityId=89 rescheduleCounter=3
2025/06/24 23:15:27 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:2}"
2025/06/24 23:15:28 INFO Work() started entityId=89 rescheduleCounter=2
2025/06/24 23:15:28 INFO Scheduled next job nextArgs="{EntityId:89 RescheduleCounter:1}"
2025/06/24 23:15:28 INFO Work() started entityId=89 rescheduleCounter=1
2025/06/24 23:15:28 INFO We're done! entityId=89
--- PASS: TestRunSequencedJobs (2.32s)
PASS
ok      rivertestexecution      2.491s

Here's what that diff looks like:

git diff
diff --git a/task.go b/task.go
index 18ca085..46931b8 100644
--- a/task.go
+++ b/task.go
@@ -5,8 +5,10 @@ import (
 	"log/slog"
 
 	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/riverqueue/river"
 	"riverqueue.com/riverpro"
+	"riverqueue.com/riverpro/driver/riverpropgxv5"
 )
 
 type TaskArgs struct {
@@ -29,6 +31,7 @@ func (TaskArgs) SequenceOpts() riverpro.SequenceOpts {
 
 type TaskWorker struct {
 	river.WorkerDefaults[TaskArgs]
+	dbPool *pgxpool.Pool
 }
 
 func (worker *TaskWorker) Work(ctx context.Context, job *river.Job[TaskArgs]) error {
@@ -40,11 +43,22 @@ func (worker *TaskWorker) Work(ctx context.Context, job *river.Job[TaskArgs]) er
 	}
 
 	nextArgs := TaskArgs{EntityId: job.Args.EntityId, RescheduleCounter: job.Args.RescheduleCounter - 1}
-	client := river.ClientFromContext[pgx.Tx](ctx)
-	_, err := client.Insert(context.Background(), nextArgs, nil)
+	client := riverpro.ClientFromContext[pgx.Tx](ctx)
+	tx, err := worker.dbPool.Begin(context.Background())
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback(context.Background())
+
+	_, err = client.InsertTx(context.Background(), tx, nextArgs, nil)
 	if err != nil {
 		return err
 	}
 	slog.Info("Scheduled next job", "nextArgs", nextArgs)
-	return nil
+
+	if _, err := river.JobCompleteTx[*riverpropgxv5.Driver](ctx, tx, job); err != nil {
+		return err
+	}
+
+	return tx.Commit(context.Background())
 }
diff --git a/taskqueue.go b/taskqueue.go
index 28f4611..e8b82f6 100644
--- a/taskqueue.go
+++ b/taskqueue.go
@@ -2,6 +2,7 @@ package rivertestexecution
 
 import (
 	"context"
+	"time"
 
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
@@ -12,10 +13,11 @@ import (
 
 func NewTaskQueue(db *pgxpool.Pool) *riverpro.Client[pgx.Tx] {
 	workers := river.NewWorkers()
-	river.AddWorker(workers, &TaskWorker{})
+	river.AddWorker(workers, &TaskWorker{dbPool: db})
 
 	riverClient, err := riverpro.NewClient(riverpropgxv5.New(db), &riverpro.Config{
 		Config: river.Config{
+			FetchPollInterval: 100 * time.Millisecond,
 			Queues: map[string]river.QueueConfig{
 				river.QueueDefault: {MaxWorkers: 10},
 			},

I know it's not a totally satisfying answer, and there may be more things we can look into to further lower latency here, but AFAICT this is mainly the result of having a single client with longer default fetch intervals. There's more we could investigate about how the various buffers and delays add up to a fair bit of latency; we've previously discussed looking into that.

Hoping to have a bit more time to keep digging into this tomorrow.

bgentry avatar Jun 25 '25 04:06 bgentry

Hey @bgentry, thank you for the detailed feedback so far.

So far I've only tweaked the FetchPollInterval, and I can reproduce the performance improvement in the test project (results very similar to yours). I'll look into using transactions next.

marttinen avatar Jun 25 '25 09:06 marttinen