[BUG] Static rate limits reset their own limits to 0
Describe the issue
I'm unable to get static rate limits to work at all. They seem to reset their limit (not their value) to 0 as soon as they're used.
Environment
- SDK: Python 1.10.2
- Engine: Cloud
Expected behavior
I don't expect the limit to change without a call to hatchet.rate_limits.put()
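For reference, here is a toy model of the semantics this expectation implies. It assumes a simple fixed-window refill: acquiring units lowers the available `value`, `value` returns to `limit` when the window elapses, and `limit` is only ever set by the put. This is an illustrative sketch, not Hatchet's implementation. `StaticRateLimit` and `try_acquire` are hypothetical names.

```python
class StaticRateLimit:
    """Toy fixed-window model of a static rate limit (not Hatchet's code).

    `limit` is set once, as a put() would set it; acquiring units only
    lowers `value`, and `value` returns to `limit` when a window elapses.
    """

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self.value = limit  # full allowance right after a put
        self.window_start = 0.0

    def try_acquire(self, units: int, now: float) -> bool:
        """Consume `units` if enough remain in the current window."""
        if now - self.window_start >= self.window_seconds:
            # New window: refill the available units (limit is untouched).
            self.value = self.limit
            self.window_start = now
        if units <= self.value:
            self.value -= units
            return True
        return False


rl = StaticRateLimit(limit=1000, window_seconds=60)
print(rl.try_acquire(100, now=0.0), rl.value, rl.limit)  # True 900 1000
```

In this model no code path writes to `limit` after construction, which is the invariant the report expects from the real service.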
Code to Reproduce, Logs, or Screenshots
```python
from hatchet_sdk import Context, EmptyModel, Hatchet
from hatchet_sdk.rate_limit import RateLimit, RateLimitDuration
from pydantic import BaseModel

hatchet = Hatchet(debug=True)


class Input(BaseModel):
    foo: int = 100


workflow = hatchet.workflow(name="test", input_validator=Input)


@workflow.task(
    rate_limits=[
        RateLimit(
            static_key="rate_limit",
            units="input.foo",
        )
    ]
)
async def task(input: Input, ctx: Context):
    print(input)
    return {}


if __name__ == "__main__":
    hatchet.rate_limits.put("rate_limit", 1000, RateLimitDuration.MINUTE)
    worker = hatchet.worker(name="test", workflows=[workflow])
    workflow.run_no_wait(Input())
    worker.start()
```
When I run this, the task gets queued:
And if I look at the Rate Limits section, I see:
If I cancel the task and just run:
```python
hatchet.rate_limits.put("rate_limit", 1000, RateLimitDuration.MINUTE)
```
The rate limit goes back up:
But as soon as I run the worker again, it drops back to 0.
What am I doing wrong here?
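In the repro, the default `Input()` has `foo = 100`, so `units="input.foo"` should consume 100 of the 1000 units available per minute, and the task should be admitted right away instead of staying queued. The hypothetical helper below resolves that simple field-access case. This is an assumption-laden sketch: Hatchet evaluates full CEL expressions server-side, and this helper handles only a literal int or a dotted `input.` path.

```python
from typing import Any, Mapping, Union


def resolve_units(units: Union[int, str], task_input: Mapping[str, Any]) -> int:
    """Resolve a rate-limit `units` setting to a concrete number of units.

    Hypothetical helper: handles only a literal int or a plain field access
    such as "input.foo". Hatchet evaluates real CEL expressions on the
    server, which this does not attempt to reproduce.
    """
    if isinstance(units, int):
        return units
    root, *path = units.split(".")
    if root != "input" or not path:
        raise ValueError(f"unsupported units expression: {units!r}")
    value: Any = task_input
    for part in path:
        value = value[part]  # walk nested fields, e.g. input.a.b
    return int(value)


LIMIT_PER_MINUTE = 1000  # value passed to rate_limits.put() in the repro
units = resolve_units("input.foo", {"foo": 100})  # Input() defaults foo to 100
print(units, units <= LIMIT_PER_MINUTE)  # 100 True
```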
I have the same issue, but on the hatchet-lite Docker image and with the TypeScript SDK.
I also tried aggressively re-setting the rate limit (every 100 ms, as below). The only effect is that the limit goes back to the proper value; the refill still doesn't work while a task with a CEL expression in its units is queued.
```typescript
setInterval(async () => {
  await hatchet.ratelimits.upsert({
    key: JINA_API_TPM_KEY,
    limit: 420,
    duration: RateLimitDuration.MINUTE,
  })
  console.log('JINA_API_TPM_KEY', JINA_API_TPM_KEY)
}, 100)
```
My code:
```typescript
// hatchet, JINA_API_RPM_KEY, JINA_API_TPM_KEY, transformResourceTask,
// prepareForEmbeddingTask and the RateLimitOrderBy* enums are defined
// elsewhere in the project (not shown here).

type SimpleInput = {
  Message: string
}

type DagOutput = {
  reverse: {
    Original: string
    Transformed: string
  }
}

type SimpleOutput = {
  TransformedMessage: string
}

export const embedResourceTask = hatchet.task({
  name: 'embed-resource-task',
  rateLimits: [
    {
      // one request unit per run
      staticKey: JINA_API_RPM_KEY,
      units: 1,
    },
    {
      // CEL expression: consume as many units as the input's token count
      staticKey: JINA_API_TPM_KEY,
      units: 'input.tokenUnits',
    },
  ],
  retries: 3,
  executionTimeout: '5m',
  backoff: {
    maxSeconds: 60,
    factor: 1.5,
  },
  fn: async (input: SimpleInput & { tokenUnits: number }, ctx): Promise<SimpleOutput> => {
    if (!input.Message) {
      throw new Error('Message is required')
    }
    ctx.logger.info('embed-resource-task, tokenUnits', { tokenUnits: input.tokenUnits })

    // Simulate a flaky upstream call: fails when the number is 0..10 (about 11% of runs)
    const randomNumber = Math.floor(Math.random() * 100)
    if (randomNumber <= 10) {
      throw new Error('Simulated failure: random number is at most 10')
    }

    ctx.logger.info('embed-resource-task', { input })
    return {
      TransformedMessage: input.Message.toLowerCase(),
    }
  },
})

export const resourceEmbeddingWorkflow = hatchet.workflow<SimpleInput, DagOutput>({
  name: 'resource-embedding-workflow',
})

export const embedResourcesSpawner = resourceEmbeddingWorkflow.task({
  name: 'embed-resources-spawner',
  executionTimeout: '1h',
  parents: [transformResourceTask, prepareForEmbeddingTask],
  fn: async (input: SimpleInput, ctx): Promise<SimpleOutput> => {
    ctx.logger.info('embed-resources-spawner', { input })
    const nItems = Math.floor(Math.random() * 10) + 1
    ctx.logger.info(`Will spawn ${nItems} items`)

    const items = Array.from({ length: nItems }, (_, i) => ({
      workflow: embedResourceTask.name,
      input: {
        Message: `Item ${i + 1}`,
        tokenUnits: Math.floor(Math.random() * 50) + 5,
      },
      additionalData: {
        parent: ctx.workflowRunId,
      },
    }))

    // Fetch the current rate limits (declared before use, so logging it works)
    const rateLimitUnits = await hatchet.ratelimits.list({
      orderByField: RateLimitOrderByField.Key,
      orderByDirection: RateLimitOrderByDirection.Desc,
    })
    ctx.logger.info('Rate limit units', { rateLimitUnits })
    console.log(
      'rateLimitUnits embedResourceTask',
      rateLimitUnits?.rows?.find((r) => r.key === JINA_API_TPM_KEY)
    )

    const runs = await ctx.bulkRunChildren(items)
    ctx.logger.info('Spawned', { runs })
    const results = runs.map((run) => run.TransformedMessage)
    ctx.logger.info('Results', { results })

    return {
      TransformedMessage: input.Message.toLowerCase(),
    }
  },
})
```
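A quick bound on how many TPM units the spawner can request, computed from the random ranges in the code above and the 420-unit limit from the upsert loop. Assuming the limit actually stayed at 420 with a one-minute refill, a single child (at most 54 units) would always fit in a fresh window, and only a large batch would have to wait for the next refill. A child that stays queued indefinitely therefore points at the limit being reset rather than at oversubscription. This is rough arithmetic in Python, not a model of Hatchet's scheduler; `floor_random_range` is a hypothetical helper.

```python
from typing import Tuple


def floor_random_range(scale: int, offset: int) -> Tuple[int, int]:
    """Inclusive range of Math.floor(Math.random() * scale) + offset.

    Math.random() returns a value in [0, 1), so the floored product
    ranges over 0..scale-1.
    """
    return offset, scale - 1 + offset


# Ranges of the two random values used by embed-resources-spawner above.
items_lo, items_hi = floor_random_range(10, 1)  # nItems: 1..10
units_lo, units_hi = floor_random_range(50, 5)  # tokenUnits: 5..54

TPM_LIMIT = 420  # limit upserted for JINA_API_TPM_KEY

min_batch = items_lo * units_lo  # smallest total demand of one spawner run
max_batch = items_hi * units_hi  # largest total demand of one spawner run

print(f"one child needs at most {units_hi} units; a batch needs {min_batch}..{max_batch}")
print("single child always fits:", units_hi <= TPM_LIMIT)  # True
print("largest batch fits:", max_batch <= TPM_LIMIT)  # False
```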
Like @jfaust, I noticed that the rate limits are set correctly until a task with the CEL expression in its units is enqueued.
My guess is that some function that is called frequently to check status/progress is setting both the limit and the value of the rate limit to 0.
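One way to test that hypothesis is to record snapshots of the rate limit over time, together with the times a put/upsert was issued, and flag every limit change that no put explains. A minimal sketch in plain Python; `Snapshot` and `unexpected_limit_changes` are hypothetical names, and how the snapshots are collected (dashboard or API) is left open.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class Snapshot:
    """One observation of a rate limit (hypothetical record type)."""

    t: float    # observation time, in seconds
    limit: int  # configured limit at that moment
    value: int  # units currently available


def unexpected_limit_changes(
    snapshots: Sequence[Snapshot], put_times: Sequence[float]
) -> List[Snapshot]:
    """Return the snapshots where `limit` changed with no put in between.

    Assumes snapshots are sorted by time and that the limit should change
    only as the result of an explicit put/upsert call.
    """
    flagged: List[Snapshot] = []
    for prev, cur in zip(snapshots, snapshots[1:]):
        explained = any(prev.t < p <= cur.t for p in put_times)
        if cur.limit != prev.limit and not explained:
            flagged.append(cur)
    return flagged
```

Any snapshot this returns is a limit change that happened without a put, which is exactly the behavior reported in this issue.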
This issue has been stale for 30 days. Please update the issue or comment to keep it active. Otherwise, it will be closed in 5 days.
Still an active issue as far as I know
still active