Allow scheduling one-time tasks for the future

Open fubuloubu opened this issue 2 years ago • 5 comments

I have a scenario where I know a (variable) time in the future at which I want a task to execute exactly once. I want to schedule this task dynamically in the task queue from another task, without using asyncio.sleep, which would keep the task that detects the condition running until the sleep completes.

Do you have any suggestions for how I can do this, or for how to implement this functionality in TaskIQ?

fubuloubu avatar Aug 10 '23 14:08 fubuloubu

Sure. I would suggest using the taskiq scheduler for that.

The generic implementation only uses labels, but you can add an implementation that suits you better. I created an example of a scheduler that can dynamically store tasks in PostgreSQL.

https://gist.github.com/s3rius/f54301d28fe34159f24b7bb9c0d51939

s3rius avatar Aug 10 '23 21:08 s3rius

Ah, okay, yeah, that's a good design. Basically, store the schedules in some sort of long-term storage, then query on a cron for tasks to kiq into the queue.
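
To make that design concrete, here is a minimal sketch of the storage side. It is not the code from the gist: it uses Python's built-in sqlite3 instead of PostgreSQL, and the table name `scheduled_tasks` and the helpers `init_db`, `schedule_once`, and `pop_due` are hypothetical names chosen for illustration. The periodic job would call `pop_due` and kiq each returned task; that dispatch step is left out because it depends on your broker setup.

```python
import json
import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) table that holds one-time schedules."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS scheduled_tasks ("
        " id INTEGER PRIMARY KEY,"
        " task_name TEXT NOT NULL,"
        " run_at REAL NOT NULL,"
        " kwargs TEXT NOT NULL)"
    )
    conn.commit()


def schedule_once(conn: sqlite3.Connection, task_name: str, run_at: float, **kwargs) -> int:
    """Store a task that should run once at `run_at` (a Unix timestamp)."""
    cur = conn.execute(
        "INSERT INTO scheduled_tasks (task_name, run_at, kwargs) VALUES (?, ?, ?)",
        (task_name, run_at, json.dumps(kwargs)),
    )
    conn.commit()
    return cur.lastrowid


def pop_due(conn: sqlite3.Connection, now: float) -> list:
    """Return every task whose time has come and delete it so it runs only once."""
    rows = conn.execute(
        "SELECT id, task_name, kwargs FROM scheduled_tasks"
        " WHERE run_at <= ? ORDER BY run_at",
        (now,),
    ).fetchall()
    conn.executemany(
        "DELETE FROM scheduled_tasks WHERE id = ?",
        [(row[0],) for row in rows],
    )
    conn.commit()
    return [(name, json.loads(raw_kwargs)) for _, name, raw_kwargs in rows]
```

Because the schedule lives in the database rather than in a sleeping coroutine, the task that detects the condition can return as soon as `schedule_once` has committed.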

fubuloubu avatar Aug 11 '23 20:08 fubuloubu

Exactly.

s3rius avatar Aug 11 '23 21:08 s3rius

Why not just use with_labels(delay=N) on a regular call of kicker()? Isn't that the point of it -- to run the task after N amount of time has passed?

pahrohfit avatar Jan 11 '24 18:01 pahrohfit

To follow up on this, we ended up regularly polling a database record that contains the future time at which to process the task, for a number of reasons:

  1. It is resilient to fault resets of the task manager service
  2. The time field in the database record may change (move later), so polling avoids having to reset a delayed task execution
  3. We can control burst load when many records come due at once by running a LIMIT query until the queue is empty (in case that becomes a problem for the task manager service, we have an environment variable we can adjust)
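
A rough sketch of what points 2 and 3 can look like, not our actual code: it uses sqlite3 for brevity, and the `records` table, the `DUE_TASK_BATCH_SIZE` variable name, and the helpers `postpone` and `enqueue_due_batch` are all made up for illustration. The `enqueue` callback stands in for whatever kiqs the task.

```python
import os
import sqlite3

# Hypothetical environment variable that caps how many records one poll handles.
BATCH_SIZE = int(os.environ.get("DUE_TASK_BATCH_SIZE", "100"))


def init_db(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) table of records with a future processing time."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS records ("
        " id INTEGER PRIMARY KEY,"
        " process_at REAL NOT NULL,"
        " processed INTEGER NOT NULL DEFAULT 0)"
    )
    conn.commit()


def postpone(conn: sqlite3.Connection, record_id: int, new_time: float) -> None:
    """Move a pending record's time later; requests for an earlier time are ignored."""
    conn.execute(
        "UPDATE records SET process_at = ?"
        " WHERE id = ? AND processed = 0 AND process_at < ?",
        (new_time, record_id, new_time),
    )
    conn.commit()


def enqueue_due_batch(conn: sqlite3.Connection, now: float, enqueue, batch_size: int = BATCH_SIZE) -> int:
    """Enqueue at most `batch_size` due records and return how many were sent."""
    rows = conn.execute(
        "SELECT id FROM records WHERE processed = 0 AND process_at <= ?"
        " ORDER BY process_at LIMIT ?",
        (now, batch_size),
    ).fetchall()
    for (record_id,) in rows:
        enqueue(record_id)  # stand-in for kiq-ing the real task
        conn.execute("UPDATE records SET processed = 1 WHERE id = ?", (record_id,))
    conn.commit()
    return len(rows)
```

Each poll sends at most one batch, so a large backlog drains over several polls instead of hitting the queue all at once. Since the due time is read from the record on every poll, postponing is a plain UPDATE and there is no delayed message to cancel.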

fubuloubu avatar Jan 11 '24 18:01 fubuloubu