
[BUG] Static rate limits reset their own limits to 0

Open jfaust opened this issue 7 months ago • 5 comments

Describe the issue

I'm unable to get static rate limits to work at all. They seem to reset their limit (not their value) to 0 as soon as they're used.

Environment

  • SDK: Python 1.10.2
  • Engine: Cloud

Expected behavior

I don't expect the limit to change without a call to hatchet.rate_limits.put().
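For reference, the expected semantics can be sketched with a simple fixed-window model. This is an assumption for illustration, not Hatchet's actual implementation: the hypothetical class `StaticRateLimit` keeps `limit` fixed, decrements `value` by the units a task consumes, and restores `value` to `limit` once the window has elapsed. Nothing in this model except an explicit put would change `limit`.

```python
class StaticRateLimit:
    """Hypothetical fixed-window model of a static rate limit (not Hatchet's code)."""

    def __init__(self, limit: int, window_seconds: float, now: float = 0.0) -> None:
        self.limit = limit                    # capacity per window; only a put should change it
        self.window_seconds = window_seconds  # e.g. 60.0 for a per-minute limit
        self.value = limit                    # assumption: a new window starts full
        self.window_start = now

    def _refill(self, now: float) -> None:
        # Fixed-window refill: once the window has elapsed, restore full capacity.
        if now - self.window_start >= self.window_seconds:
            self.value = self.limit
            self.window_start = now

    def try_acquire(self, units: int, now: float) -> bool:
        """Consume `units` if they fit in the current window. Never modifies `limit`."""
        self._refill(now)
        if units <= self.value:
            self.value -= units
            return True
        return False


# Mirrors the repro: limit 1000 per minute, the task consumes 100 units.
rl = StaticRateLimit(limit=1000, window_seconds=60.0)
print(rl.try_acquire(100, now=1.0), rl.limit, rl.value)  # True 1000 900
```

In this model, consuming units only ever lowers `value`; a `limit` of 0 after a task runs is not something the expected behavior can produce.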

Code to Reproduce, Logs, or Screenshots

from hatchet_sdk import Context, EmptyModel, Hatchet
from hatchet_sdk.rate_limit import RateLimit, RateLimitDuration
from pydantic import BaseModel

hatchet = Hatchet(debug=True)

class Input(BaseModel):
    foo: int = 100

workflow = hatchet.workflow(name="test", input_validator=Input)

@workflow.task(rate_limits=[
    RateLimit(
        static_key="rate_limit",
        units="input.foo"
    )
])
async def task(input: Input, ctx: Context):
    print(input)
    return {}

if __name__ == "__main__":
    hatchet.rate_limits.put("rate_limit", 1000, RateLimitDuration.MINUTE)
    worker = hatchet.worker(name="test", workflows=[workflow])
    workflow.run_no_wait(Input())
    worker.start()

When I run this, the task gets queued: [screenshot]

And if I look at the Rate Limits section, I see: [screenshot]
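With `units="input.foo"` and the default `Input()`, the task should need 100 units out of a limit of 1000, so it has no reason to stay queued. The string form of `units` is a CEL expression evaluated against the task input; the hypothetical helper `resolve_units` below does not implement CEL, it only resolves plain `input.<field>` paths, just to show the arithmetic.

```python
from typing import Any, Mapping, Union


def resolve_units(units: Union[int, str], task_input: Mapping[str, Any]) -> int:
    """Hypothetical stand-in for CEL evaluation: supports ints and 'input.a.b' paths only."""
    if isinstance(units, int):
        return units
    parts = units.split(".")
    if parts[0] != "input":
        raise ValueError(f"unsupported expression: {units!r}")
    value: Any = task_input
    for part in parts[1:]:
        value = value[part]  # raises KeyError if the input lacks the field
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError(f"{units!r} must resolve to an int, got {value!r}")
    return value


# Input() from the repro serialises to {"foo": 100}.
needed = resolve_units("input.foo", {"foo": 100})
limit = 1000
print(needed, needed <= limit)  # 100 True
```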

If I cancel the task and just run:

hatchet.rate_limits.put("rate_limit", 1000, RateLimitDuration.MINUTE)

The rate limit goes back up: [screenshot]

But as soon as I run the worker again, it drops back to 0.

What am I doing wrong here?

jfaust • Jun 06 '25 21:06

I have the same issue, but on the hatchet-lite Docker image and with the TypeScript SDK.

I also tried aggressively re-setting the rate limit. The only effect was that the limit went back to the correct value; the refill still doesn't work while the task with the CEL expression in its units is queued.

setInterval(async () => {
	await hatchet.ratelimits.upsert({
		key: JINA_API_TPM_KEY,
		limit: 420,
		duration: RateLimitDuration.MINUTE,
	})
	console.log('JINA_API_TPM_KEY', JINA_API_TPM_KEY)
}, 100)
[screenshot]

My code example:
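This is consistent with the limit itself being zeroed. Under a fixed-window model (an assumption; the thread doesn't describe Hatchet's exact refill algorithm), a refill restores the available units to the current limit, so a limit of 0 refills to 0 and any task needing at least one unit stays queued indefinitely. The hypothetical helper `first_admission_time` simulates when a queued request would be admitted.

```python
from typing import Optional


def first_admission_time(
    limit: int,
    used_in_first_window: int,
    units: int,
    window_seconds: float = 60.0,
    horizon_windows: int = 10,
) -> Optional[float]:
    """Hypothetical fixed-window simulation of a queued task waiting on a static rate limit.

    Returns the start time of the first window in which `units` fit, or None if the
    task is still queued after `horizon_windows` windows.
    """
    available = limit - used_in_first_window
    for window in range(horizon_windows):
        if units <= available:
            return window * window_seconds
        # Refill at the next window boundary restores `limit` units, never more.
        available = limit
    return None


# Limit 420 per minute (as in the upsert loop), 400 already used, task needs 50 units.
print(first_admission_time(420, 400, 50))  # 60.0
# Same task after the limit itself has been reset to 0: it never runs.
print(first_admission_time(0, 0, 50))      # None
```

The same reasoning implies that a task whose units exceed the limit can never be admitted, however often the window refills.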

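// Not shown in this snippet: `hatchet`, JINA_API_RPM_KEY, JINA_API_TPM_KEY,
// RateLimitOrderByField, RateLimitOrderByDirection, transformResourceTask and
// prepareForEmbeddingTask are imported or defined elsewhere.
type SimpleInput = {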
	Message: string
}
type DagOutput = {
	reverse: {
		Original: string
		Transformed: string
	}
}
type SimpleOutput = {
	TransformedMessage: string
}

export const embedResourceTask = hatchet.task({
	name: 'embed-resource-task',
	rateLimits: [
		{
			staticKey: JINA_API_RPM_KEY,
			units: 1,
		},
		{
			staticKey: JINA_API_TPM_KEY,
			units: 'input.tokenUnits',
		},
	],
	retries: 3,
	executionTimeout: '5m',
	backoff: {
		maxSeconds: 60,
		factor: 1.5,
	},
	fn: async (input: SimpleInput & { tokenUnits: number }, ctx): Promise<SimpleOutput> => {
		if (!input.Message) {
			throw new Error('Message is required')
		}
		ctx.logger.info('embed-resource-task, tokenUnits', { tokenUnits: input.tokenUnits })
		const randomNumber = Math.floor(Math.random() * 100)
		if (randomNumber <= 10) {
			throw new Error('Random number is less than or equal to 10')
		}
		ctx.logger.info('embed-resource-task', { input })
		return {
			TransformedMessage: input.Message.toLowerCase(),
		}
	},
})

export const resourceEmbeddingWorkflow = hatchet.workflow<SimpleInput, DagOutput>({
	name: 'resource-embedding-workflow',
})

export const embedResourcesSpawner = resourceEmbeddingWorkflow.task({
	name: 'embed-resources-spawner',
	executionTimeout: '1h',
	parents: [transformResourceTask, prepareForEmbeddingTask],
	fn: async (input: SimpleInput, ctx): Promise<SimpleOutput> => {
		ctx.logger.info('embed-resources-spawner', { input })
		const nItems = Math.floor(Math.random() * 10) + 1
		ctx.logger.info(`Will spawn ${nItems} items`)
		const items = Array.from({ length: nItems }, (_, i) => ({
			workflow: embedResourceTask.name,
			input: {
				Message: `Item ${i + 1}`,
				tokenUnits: Math.floor(Math.random() * 50) + 5,
			},
			additionalData: {
				parent: ctx.workflowRunId,
			},
		}))

		// Get the current rate limit state. rateLimitUnits must be declared before it is
		// logged: reading a `const` before its declaration throws a ReferenceError.
		const rateLimitUnits = await hatchet.ratelimits.list({
			orderByField: RateLimitOrderByField.Key,
			orderByDirection: RateLimitOrderByDirection.Desc,
		})
		ctx.logger.info('Rate limit units', { rateLimitUnits: rateLimitUnits })
		console.log(
			'rateLimitUnits embedResourceTask',
			rateLimitUnits?.rows?.find((r) => r.key === JINA_API_TPM_KEY)
		)
		const runs = await ctx.bulkRunChildren(items)
		ctx.logger.info('Spawned', { runs })
		const results = runs.map((run) => run.TransformedMessage)
		ctx.logger.info('Results', { results })

		return {
			TransformedMessage: input.Message.toLowerCase(),
		}
	},
})

Like @jfaust, I noticed that the rate limits are set correctly until the task with the CEL expression is enqueued.

I suspect that some function called frequently to check status/progress is setting both the limit and the value of the rate limit to 0.

mattia-consiglio • Jul 31 '25 07:07

This issue has been stale for 30 days. Please update the issue or comment to keep it active. Otherwise, it will be closed in 5 days.

github-actions[bot] • Sep 04 '25 08:09

Still an active issue as far as I know

jfaust • Sep 04 '25 16:09

This issue has been stale for 30 days. Please update the issue or comment to keep it active. Otherwise, it will be closed in 5 days.

github-actions[bot] • Oct 06 '25 08:10

still active

jfaust • Oct 07 '25 20:10