
Seeing jobs dequeued/run multiple times for long-running cron jobs, despite same jobId/start time

Open ashley-walsh opened this issue 2 years ago • 3 comments

Hi there,

I am trying to have a cron job run only once every X hours (or daily, etc.) even though there are multiple instances of my application. When the job is instantaneous (or close to it, e.g. it prints a line and returns), it works as expected: we enqueue the job with a UniqueJobId and our uniqueness constraint is triggered (see below).

...
	foundJobs, err := c.BulkFindJobs(job.ID)
	if err != nil {
		return err
	}

	if len(foundJobs) > 0 && foundJobs[0] != nil {
		logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
		return nil
	}
...

But when the job takes longer (ours takes several minutes to complete), the uniqueness constraint does not hold across instances: the job is dequeued several times and wrongly runs several times (once per instance). We've tried using InvisibleSec, but then the other instances' jobs simply run after that period, i.e. if the job is scheduled for 5:00 and InvisibleSec is 60, one instance's job runs (correctly) at 5:00 and another runs at 5:01. We've also experimented with EnqueueDelay, but that does not seem to work either.

Any help/insight would be greatly appreciated! See below for how we are setting up our cron service.

// called on application start-up
func main() {
	...
	redisClient := application.BuildRedisClient()
	jobsClient := application.BuildJobsClient(redisClient)

	core := core.New(
		core.Config{
			...
			JobsClient:  jobsClient,
			RedisClient: redisClient,
			...
		})

	...

	group.Go(func() error {
		cron.CronHandler(jobsClient, context.Background())
		return nil
	})
}
// JobsHandler, also called on start-up
func JobsHandler(redisClient *redis.ClusterClient, handlerFunc work.ContextHandleFunc) {
	jobWorker := work.NewWorker(&work.WorkerOptions{
		Namespace: jobs.NAMESPACE,
		Queue:     work.NewRedisQueue(redisClient),
		ErrorFunc: func(err error) {
			log.Println(err)
		},
	})

	jobOpts := &work.JobOptions{
		MaxExecutionTime: time.Minute,
		IdleWait:         time.Second,
		NumGoroutines:    4,
		HandleMiddleware: []work.HandleMiddleware{
			logrus.HandleFuncLogger,
			catchPanic,
		},
	}

	for queueName := range jobs.JOB_QUEUES {
		jobWorker.RegisterWithContext(string(queueName), handlerFunc, jobOpts)
	}

	jobWorker.Start()
}
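// cron service
package cron

import (
	"context"
	"log"
	"main/entities/jobs"

	"github.com/robfig/cron/v3"
)

// CronHandler schedules enqueueOurJob at minute 50 of every hour.
// Every application instance runs this same schedule.
func CronHandler(jobsClient jobs.Client, ctx context.Context) {
	c := cron.New()
	if _, err := c.AddFunc("50 * * * *", func() { enqueueOurJob(jobsClient, ctx) }); err != nil {
		log.Printf("cron: invalid schedule: %v", err)
		return
	}
	c.Start() // runs the scheduler in its own goroutine and returns immediately
}

func enqueueOurJob(jobsClient jobs.Client, ctx context.Context) {
	uniqueId := "uniqueId"
	enqueueJobParams, err := jobs.CreateEnqueueJobParams(jobs.CreateEnqueueJobParamsArgs{
		Name:        jobs.OurJob,
		UniqueJobId: &uniqueId,
	}, &jobs.OurJobPayload{})
	if err != nil {
		log.Printf("cron: building params for %v: %v", jobs.OurJob, err)
		return
	}

	if err := jobsClient.EnqueueJob(ctx, *enqueueJobParams); err != nil {
		log.Printf("cron: enqueueing %v: %v", jobs.OurJob, err)
	}
}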

func (c *client) EnqueueJob(ctx context.Context, jobParams EnqueueJobParams) error {
	job := work.NewJob()

	if jobParams.uniqueJobId != nil {
		job.ID = *jobParams.uniqueJobId
	}

	if jobParams.enqueueDelay != nil {
		job = job.Delay(*jobParams.enqueueDelay)
	}

	if err := job.MarshalJSONPayload(string(jobParams.jobPayload)); err != nil {
		return err
	}

	foundJobs, err := c.BulkFindJobs(job.ID)
	if err != nil {
		return err
	}

	// uniqueness constraint
	if len(foundJobs) > 0 && foundJobs[0] != nil {
		logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
		return nil
	}

	err = c.enqueue(job, &work.EnqueueOptions{
		Namespace: NAMESPACE,
		QueueID:   string(jobParams.jobQueueName),
	})
	if err != nil {
		return err
	}

	return nil
}

ashley-walsh, Oct 14 '22 17:10

I think you should use this middleware to wrap your enqueuer: https://pkg.go.dev/github.com/taylorchu/work/middleware/unique.

It maps a job to a unique key and a duration for which that key stays unique.

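The way you did it will have race conditions: BulkFindJobs and the enqueue are separate calls, so two instances can both find no existing job and then both enqueue it.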
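To make the race concrete, here is a small sketch in which a mutex-protected map stands in for the shared Redis queue (all names are hypothetical). It replays one possible interleaving of a separate check and enqueue, then contrasts it with a single atomic check-and-enqueue. This only models the ordering problem; it is not how the unique middleware is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical in-memory stand-in for the shared Redis queue.
type queue struct {
	mu   sync.Mutex
	jobs map[string]int // job ID -> number of copies enqueued
}

func newQueue() *queue { return &queue{jobs: make(map[string]int)} }

// find reports whether a job with this ID is already queued (like BulkFindJobs).
func (q *queue) find(id string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	_, ok := q.jobs[id]
	return ok
}

// enqueue adds one copy of the job unconditionally.
func (q *queue) enqueue(id string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.jobs[id]++
}

// enqueueIfAbsent checks and writes under a single lock, so no other caller
// can run between the check and the write. Across processes, an atomic
// set-if-absent such as Redis SET with NX provides the same property.
func (q *queue) enqueueIfAbsent(id string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.jobs[id]; ok {
		return false
	}
	q.jobs[id] = 1
	return true
}

func main() {
	// Check-then-enqueue: each call is safe on its own, but both instances
	// can finish their check before either one enqueues.
	racy := newQueue()
	foundA := racy.find("uniqueId")
	foundB := racy.find("uniqueId")
	if !foundA {
		racy.enqueue("uniqueId")
	}
	if !foundB {
		racy.enqueue("uniqueId")
	}
	fmt.Println("check-then-enqueue copies:", racy.jobs["uniqueId"])

	// Atomic check-and-enqueue: however the goroutines interleave, one wins.
	safe := newQueue()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			safe.enqueueIfAbsent("uniqueId")
		}()
	}
	wg.Wait()
	fmt.Println("atomic copies:", safe.jobs["uniqueId"])
}
```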

taylorchu, Oct 14 '22 22:10

Thank you so much for the fast response! The issue is that we already have a variety of jobs that will require different unique durations (e.g. the cron job above runs daily, but others run every 3 days). Is there a solution that does not depend on a duration? For example, could uniqueness be based solely on the job ID, i.e. on whether a job with the same ID is currently in the queue?

Or, if we are going about this the wrong way, what is the best way to proceed?

ashley-walsh, Oct 17 '22 16:10

The unique function can also switch on the enqueue options (https://pkg.go.dev/github.com/taylorchu/work#EnqueueOptions), so different jobs can be given different durations.

taylorchu, Oct 17 '22 22:10