Any advice on testing worker code that calls JobCompleteTx?

Open advdv opened this issue 1 year ago • 5 comments

I'm very excited about this project, thank you and hats off!

I have a work function that calls JobCompleteTx to atomically complete the job as well as perform other work on the tx. This seems pretty hard to unit test right now, because JobCompleteTx can only complete if the job actually exists in the database.

Of course, JobCompleteTx can be mocked or replaced in the tests. Or I could insert actual jobs, but that is a bit harder to do in my specific case because it's a periodic job. Or maybe I'm overlooking something?

advdv avatar Sep 20 '24 06:09 advdv

@advdv Presumably you've tried inserting a job, but aren't able to target it because JobCompleteTx requires that it be running. Is that right?

Hmm, we might be missing a helper that allows you to transition a job from available to running for testing purposes.

brandur avatar Sep 21 '24 18:09 brandur

I did not try to insert the job; I ended up going down a unit-testing route. I would imagine that for more of an integration testing setup it would definitely be useful to have a "PrepareJobForTesting" helper function. I now use a special tag that I can inspect to skip the call to "JobCompleteTx" during testing. Something like this (NOTE that the tx is real though):

// SkipAtomicCompleteTag can be added to a job's tags to indicate that it should not be completed in the same
// tx as the work. This is mainly useful for testing.
const SkipAtomicCompleteTag = "__no_atomic_complete"

// work runs fn within a transaction that is committed together with the job completion, or rolled back
// if an error occurs.
func work[TArgs river.JobArgs](
	ctx context.Context,
	rw *pgxpool.Pool,
	job *river.Job[TArgs],
	fn func(pgx.Tx, *river.Job[TArgs]) error,
) error {
	tx, err := rw.Begin(ctx)
	if err != nil {
		return fmt.Errorf("failed to start tx: %w", err)
	}
	defer tx.Rollback(ctx)

	if err := fn(tx, job); err != nil {
		return err
	}

	if !slices.Contains(job.Tags, SkipAtomicCompleteTag) {
		// in tests, we don't usually have an actual job in the database, so we need to be able to skip this:
		// https://github.com/riverqueue/river/issues/605
		if _, err = river.JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job); err != nil {
			return fmt.Errorf("failed to complete job: %w", err)
		}
	}

	if err = tx.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit: %w", err)
	}

	return nil
}

For my tests, I set up jobs with a helper like this:

func NewJobT[TArg river.JobArgs](a TArg) *river.Job[TArg] {
	return &river.Job[TArg]{Args: a, JobRow: &rivertype.JobRow{
		Tags: []string{worker.SkipAtomicCompleteTag},
	}}
}

advdv avatar Sep 23 '24 06:09 advdv

Please check out the new rivertest.Worker type in #753. I think it makes testing workers much easier! It should go out in the next release tomorrow.

The one caveat is that for JobCompleteTx you'll still need a real job row that's been inserted and manipulated into the correct running state. We'll be working on a follow-up to make that easier to generate, but you can probably hack it without too much trouble in the meantime.

Will leave this issue open until we've addressed the ease of getting a job suitable for testing JobCompleteTx with the test worker.

bgentry avatar Feb 14 '25 23:02 bgentry
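
For readers wondering what "manipulated into the correct running state" might look like, here is a rough sketch of one possible hack, not River's API. It assumes the river_job table has state, attempt, and attempted_at columns and that a plain UPDATE is enough for a test database; check this against the schema version you run. markJobRunning, execer, and fakeExec are hypothetical names invented for illustration, and the fake only exists so the sketch runs without a database.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"fmt"
)

// markRunningSQL flips an inserted job from available to running.
// ASSUMPTION: the column names match your River schema version.
const markRunningSQL = `
UPDATE river_job
SET state = 'running', attempt = attempt + 1, attempted_at = now()
WHERE id = $1 AND state = 'available'`

// execer is the one method of *sql.Tx (or *sql.DB) this helper needs.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// markJobRunning is a hypothetical test helper, not part of River.
// It fails if no row was updated, so a missing job is caught early.
func markJobRunning(ctx context.Context, tx execer, jobID int64) error {
	res, err := tx.ExecContext(ctx, markRunningSQL, jobID)
	if err != nil {
		return fmt.Errorf("mark job %d running: %w", jobID, err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected for job %d: %w", jobID, err)
	}
	if n != 1 {
		return fmt.Errorf("job %d was not in the available state", jobID)
	}
	return nil
}

// fakeExec stands in for a real transaction and reports a fixed row count.
type fakeExec struct{ rows int64 }

func (f fakeExec) ExecContext(_ context.Context, _ string, _ ...any) (sql.Result, error) {
	return driver.RowsAffected(f.rows), nil
}

func main() {
	ctx := context.Background()
	fmt.Println(markJobRunning(ctx, fakeExec{rows: 1}, 42)) // one row updated
	fmt.Println(markJobRunning(ctx, fakeExec{rows: 0}, 42)) // no matching job
}
```

In a real test you would pass the test transaction instead of fakeExec, and probably reload the job row afterwards so the value handed to JobCompleteTx also reflects the running state.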

Hi @bgentry, thank you for the changes in #753 and #765. It's great to have built-in test helpers in River that keep the test environment similar to prod.

Would it be possible to expand upon Example_completeJobWithinTx with a recommended test pattern for using the WorkTx helper added in #765?

The test JobCompleteTxWithInsertedJobRow, which shows an example of WorkJobTx, appears to use a closure to pass the same tx used for the test into the job's Work method. I haven't been able to get JobCompleteTx to work in an Example_completeJobWithinTx-style job where the worker has a dbPool *pgxpool.Pool in its struct, from which it then creates a new job-scoped transaction.

alexstuckeyjump avatar Jun 04 '25 14:06 alexstuckeyjump

@alexstuckeyjump if you want your real running workers to operate with a full pgx pool but want to test them within a transaction, the typical pattern is to type your worker's db field as a common interface that covers the methods shared by a pool and a transaction. This will vary by driver (pgx vs dbsql) but the sqlc output here shows an example of how you could set that up. With that approach, your worker could take either a pool or a tx and it wouldn't know or care about the difference.

This should work well with rivertest.Worker, so long as your workers aren't doing anything that truly needs a pool instead of a tx or conn (such as checking out conns or reading pool stats).

bgentry avatar Jun 12 '25 14:06 bgentry
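
The common-interface pattern described above can be sketched roughly as follows. To keep the example dependency-free it uses the standard library's database/sql types rather than pgx, so it is closer to the dbsql case; with pgx the interface would list the pgx Exec/Query/QueryRow methods instead. DBTX, ReportWorker, and the reports table are hypothetical names chosen for illustration, not anything defined by River.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// DBTX is a hypothetical interface covering the query methods that
// *sql.DB (a pool) and *sql.Tx (a transaction) have in common.
type DBTX interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

// Compile-time checks that both the pool and the transaction satisfy DBTX.
var (
	_ DBTX = (*sql.DB)(nil)
	_ DBTX = (*sql.Tx)(nil)
)

// ReportWorker is a hypothetical worker whose db field accepts either one.
type ReportWorker struct {
	db DBTX
}

// NewReportWorker takes a pool in production and a test tx in tests.
func NewReportWorker(db DBTX) *ReportWorker {
	return &ReportWorker{db: db}
}

// markDone runs the same statement whether db is a pool or a transaction.
func (w *ReportWorker) markDone(ctx context.Context, reportID int64) error {
	_, err := w.db.ExecContext(ctx, "UPDATE reports SET done = true WHERE id = $1", reportID)
	if err != nil {
		return fmt.Errorf("mark report %d done: %w", reportID, err)
	}
	return nil
}

func main() {
	// Nil handles are used only to show that both types are accepted;
	// real code would pass an opened pool or a started transaction.
	var pool *sql.DB
	var tx *sql.Tx
	fmt.Println(NewReportWorker(pool) != nil, NewReportWorker(tx) != nil)
}
```

The worker never checks which concrete type it received, which is what lets a test hand it the same transaction that the test harness later rolls back.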