core
Queues support
See https://developers.cloudflare.com/queues/
How would you see the usage within NuxtHub? Feedback welcome ❤️
Consumers could be defined like other Nitro event handlers, in server/queues/consumer-example.ts. It would probably use Nitro's event object syntax, so that the consumer can be configured alongside its code.
For producers, a composable like hubQueues().send() would suffice. Could have some fancy DX around autocompletion for queue bindings both in consumer objects and producer function.
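To make the proposal concrete, here is a rough sketch of what the two sides could look like. Everything in it is hypothetical: `defineQueueConsumer`, the `hubQueue(name)` signature, the `QueueBodies` registry, and the `EMAILS` binding are invented for illustration and are not part of NuxtHub or Nitro. The producer only records messages in memory so that the snippet runs on its own.

```typescript
// Hypothetical API sketch: none of these helpers exist in NuxtHub today.

/** One message as handed to a consumer (loosely mirrors Cloudflare's shape). */
interface QueueMessage<Body> {
  id: string
  body: Body
  ack(): void
  retry(): void
}

interface QueueConsumerDefinition<Body> {
  queue: string // name of the queue binding to consume from
  maxBatchSize?: number // configuration lives next to the code
  handler(messages: QueueMessage<Body>[]): Promise<void>
}

/** Identity helper, so the definition is type-checked where it is written. */
function defineQueueConsumer<Body>(def: QueueConsumerDefinition<Body>): QueueConsumerDefinition<Body> {
  return def
}

interface WelcomeEmail { to: string }

// What server/queues/consumer-example.ts might `export default`:
const consumer = defineQueueConsumer<WelcomeEmail>({
  queue: 'EMAILS',
  maxBatchSize: 10,
  async handler(messages) {
    for (const message of messages) {
      console.log(`sending welcome email to ${message.body.to}`)
      message.ack()
    }
  },
})

// Hypothetical registry of binding names to message types; this is what would
// drive autocompletion in both the consumer and the producer.
interface QueueBodies { EMAILS: WelcomeEmail }

// In-memory stand-in for the real binding, used only to keep the sketch runnable.
const sent: { queue: string, body: unknown }[] = []

function hubQueue<K extends keyof QueueBodies>(name: K) {
  return {
    // A real implementation would forward to the Cloudflare binding's send().
    send(body: QueueBodies[K]): void {
      sent.push({ queue: name, body })
    },
  }
}

hubQueue('EMAILS').send({ to: 'user@example.com' })
```

With a registry like `QueueBodies`, a typo in the binding name or a wrongly shaped message body would fail at compile time rather than at runtime.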
It's worth noting that Cloudflare is introducing Workflows, which I think should be considered alongside the queues implementation, as it may be a better long-term fit for complex applications. See this Tweet and this blog post.
> Consumers could be defined like other Nitro event handlers, in server/queues/consumer-example.ts. It would probably use Nitro's event object syntax, so that the consumer can be configured alongside its code.
I really like this idea!
Regarding the workflow, would you mind creating a new issue for it and linking this one too?
I'm not sure how this works with Pages. Currently I have a separate Worker project which is the queue consumer. Does Pages support this, or is it only pull-based (HTTP)?
It would be nice to be able to merge this consumer code back into my main Nuxt project!
I am not sure that Pages supports queues, but I know it's planned.
I try to avoid having to create two outputs and 2 deployments for one project.
> I'm not sure how this works with Pages. Currently I have a separate Worker project which is the queue consumer. Does Pages support this, or is it only pull-based (HTTP)?
> It would be nice to be able to merge this consumer code back into my main Nuxt project!
I believe you're right: a consumer can only be a separate Worker, or be HTTP pull-based.
With Pages, currently you can only add a producer binding.
I'm sorry if this is not the right place to ask, but I would like to publish a message to a CF queue from a hub project. How would that be possible if hub doesn't have the binding yet, and I don't have access to the wrangler file to create the binding manually? Has anyone had to do this already?
Also, if I may contribute my feedback: Pages currently supports producers, and there is an HTTP API for pulling and acknowledging messages. Simple composables, similar to the KV ones, would be really nice and simple to use.
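For reference, the HTTP pull API mentioned above could be wrapped roughly as follows. This is a sketch based on my reading of the Cloudflare Queues pull consumer docs: the endpoint paths and the `batch_size`, `visibility_timeout_ms`, `acks`, and `lease_id` field names should be checked against the current documentation, and `QueueTarget`, `buildPullRequest`, and `buildAckRequest` are made-up helper names. The queue also has to be configured with an HTTP pull consumer for these endpoints to work.

```typescript
// Sketch only: verify paths and field names against the Cloudflare Queues docs.
const API_BASE = 'https://api.cloudflare.com/client/v4'

interface QueueTarget {
  accountId: string
  queueId: string
  apiToken: string // API token with access to Queues (assumption)
}

interface HttpRequest {
  url: string
  init: { method: 'POST', headers: Record<string, string>, body: string }
}

/** Build a POST request for the queue's `pull` or `ack` endpoint. */
function buildRequest(target: QueueTarget, action: 'pull' | 'ack', payload: unknown): HttpRequest {
  return {
    url: `${API_BASE}/accounts/${target.accountId}/queues/${target.queueId}/messages/${action}`,
    init: {
      method: 'POST',
      headers: {
        'content-type': 'application/json',
        'authorization': `Bearer ${target.apiToken}`,
      },
      body: JSON.stringify(payload),
    },
  }
}

/** Ask for up to `batchSize` messages, hidden from other consumers for `visibilityTimeoutMs`. */
function buildPullRequest(target: QueueTarget, batchSize = 10, visibilityTimeoutMs = 30000): HttpRequest {
  return buildRequest(target, 'pull', {
    batch_size: batchSize,
    visibility_timeout_ms: visibilityTimeoutMs,
  })
}

/** Acknowledge processed messages using the lease_id returned with each pulled message. */
function buildAckRequest(target: QueueTarget, leaseIds: string[]): HttpRequest {
  return buildRequest(target, 'ack', {
    acks: leaseIds.map(lease_id => ({ lease_id })),
  })
}

// Usage, inside an async function:
//   const pull = buildPullRequest(target)
//   const response = await fetch(pull.url, pull.init)
//   const { result } = await response.json() // result.messages[].lease_id (assumed shape)
//   const ack = buildAckRequest(target, result.messages.map(m => m.lease_id))
//   await fetch(ack.url, ack.init)
```

Keeping request construction separate from `fetch` makes the helpers easy to test and to reuse from a cron job or a plain server route.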
@valtlfelipe I use this util to publish to a Queue (based on other nuxthub utils)
import type { Queue } from '@cloudflare/workers-types/experimental';

interface Message {
  orderId: string;
  lang: string;
}

type OrderQueue = Queue<Message>;

// Cache the binding after the first successful lookup.
let queue: OrderQueue;

export function useQueue() {
  if (queue)
    return queue;

  // Look for the QUEUE binding wherever the runtime may expose it.
  // @ts-expect-error globalThis.__env__ is not defined
  // eslint-disable-next-line node/prefer-global/process
  const binding = process.env.QUEUE || globalThis.__env__?.QUEUE || globalThis.QUEUE;
  if (binding) {
    queue = binding as OrderQueue;
    return queue;
  }

  // createError is auto-imported by Nitro.
  throw createError('Cloudflare Queue binding not found');
}
You have to add the Queue binding to the Worker manually in the Cloudflare dashboard.
> @valtlfelipe I use this util to publish to a Queue (based on other nuxthub utils) [...] You should add the Queue binding to the worker in the cloudflare dashboard manually.
Nice workaround. Thanks!
This would be great, thanks. I need to process thousands of lines from a CSV and run each one through hubAI() in parallel using a queue system.
Currently it looks like I'd have to create a separate project for the consumer Worker. It would be great to specify a single server/api/my-consumer.post/queue.ts endpoint for the consumer instead, if Nuxt can detect that an incoming request is a queue job.
Edit: I just tried it in the dashboard and read the docs. CF sends a special "queue" origin rather than "fetch", so it's just a matter of handling that instead.
export default {
  async fetch(request, env, ctx) {
    // No changes in the fetch handler
  },
  async queue(batch, env) {
    for (const message of batch.messages) {
      // Call a queue function in our Nuxt app to consume the message
    }
  },
}
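One way the body of that `queue()` handler could be filled in is to route each batch by its queue name and acknowledge or retry messages individually. The sketch below is an assumption about how such routing might look, not something Nuxt or Nitro provides: `processBatch`, the `handlers` map, and the `background-jobs` queue name are illustrative. The `Message` and `MessageBatch` interfaces are trimmed-down local copies of the Workers runtime types so that the snippet stands alone.

```typescript
// Local, trimmed-down versions of the Workers runtime types.
interface Message<Body = unknown> {
  readonly id: string
  readonly body: Body
  ack(): void
  retry(): void
}

interface MessageBatch<Body = unknown> {
  readonly queue: string // name of the queue this batch came from
  readonly messages: readonly Message<Body>[]
}

type QueueHandler = (body: unknown) => Promise<void>

// Illustrative routing table: queue name -> function that consumes one message.
const handlers: Record<string, QueueHandler> = {
  'background-jobs': async (body) => {
    console.log('processing job', body)
  },
}

/** Run every message through the handler for its queue. Returns the number acked. */
async function processBatch(batch: MessageBatch, routes: Record<string, QueueHandler>): Promise<number> {
  const handler = routes[batch.queue]
  if (!handler) {
    // No consumer registered for this queue: ask for redelivery instead of dropping.
    for (const message of batch.messages)
      message.retry()
    return 0
  }

  let acked = 0
  for (const message of batch.messages) {
    try {
      await handler(message.body)
      message.ack() // only this message is marked as done
      acked++
    }
    catch {
      message.retry() // a failure does not affect the rest of the batch
    }
  }
  return acked
}

// Inside the Worker's default export this would be called as:
//   async queue(batch, env) { await processBatch(batch, handlers) }
```

Acknowledging per message means that one failing row does not force the whole batch to be redelivered.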
Maybe a new server/queues folder? With a server/queues/process.ts which allows for a custom function to determine where/what to call based on the incoming queue batch param?
@alexcroox so you did manage to get it working this way?
No, I don't know the internals of Nuxt/Nitro well enough to modify the Cloudflare adapter so that it listens for either the fetch or the queue message type.
I see. I'm hoping it's possible though. It would be nice to not have to create a separate project for the consumer Worker!
It's now possible to add additional bindings to NuxtHub projects, including the queue binding (though it's unavailable during local development). See https://hub.nuxt.com/changelog/observability-additional-bindings
export default defineNuxtConfig({
  hub: {
    workers: true,
    // database: true,
    // ...
    bindings: {
      queue: {
        BACKGROUND_JOBS: {
          queue_name: 'background-jobs'
        }
      }
    }
  }
})
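Once a binding like `BACKGROUND_JOBS` exists, producing messages could look something like the sketch below, which also covers the earlier CSV use case by sending rows in batches. `enqueueAll` and `chunk` are illustrative helper names, and the `QueueProducer` interface is a minimal local stand-in for the `Queue` type from `@cloudflare/workers-types`. The limit of 100 messages per `sendBatch` call is my understanding of Cloudflare's current limits and should be double-checked.

```typescript
// Minimal structural types so the sketch is self-contained. In a real project
// these come from @cloudflare/workers-types.
interface MessageSendRequest<Body> { body: Body }

interface QueueProducer<Body> {
  send(body: Body): Promise<void>
  sendBatch(messages: Iterable<MessageSendRequest<Body>>): Promise<void>
}

// Assumed Cloudflare limit on messages per sendBatch call; check current limits.
const MAX_BATCH_SIZE = 100

/** Split `items` into consecutive groups of at most `size` elements. */
function chunk<T>(items: readonly T[], size: number): T[][] {
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size)
    chunks.push(items.slice(i, i + size))
  return chunks
}

/** Send every body to the queue in batches. Returns the number of sendBatch calls made. */
async function enqueueAll<Body>(queue: QueueProducer<Body>, bodies: readonly Body[]): Promise<number> {
  const batches = chunk(bodies, MAX_BATCH_SIZE)
  for (const batch of batches)
    await queue.sendBatch(batch.map(body => ({ body })))
  return batches.length
}

// Usage in a Nitro route deployed to Cloudflare (assumed access path):
//   const queue = event.context.cloudflare.env.BACKGROUND_JOBS
//   await enqueueAll(queue, csvRows)
```

Since the binding is unavailable during local development, code like this would need a guard or a stub when running locally.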
We are planning on introducing hubQueue() too in the coming weeks.
> CF sends a special "queue" origin rather than "fetch", so it's just a matter of handling that instead.
You can access this using the Nitro hook cloudflare:queue. https://github.com/nitrojs/nitro/blob/v3/src/presets/cloudflare/runtime/_module-handler.ts#L72-L82
https://nuxt.com/docs/guide/going-further/hooks#server-hooks-runtime
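As a rough illustration of wiring a consumer through that hook, the sketch below registers a handler for `cloudflare:queue`. It assumes the hook payload contains the `batch` and `env` objects, as the linked Nitro source suggests. `registerQueueConsumer` and the local `HookRegistry` interface are invented for this example so that it runs without Nitro installed.

```typescript
// Local stand-ins for the Workers runtime and Nitro hook types.
interface Message { readonly body: unknown, ack(): void, retry(): void }
interface MessageBatch { readonly queue: string, readonly messages: readonly Message[] }

// Assumed payload shape, based on the linked Nitro Cloudflare preset source.
interface QueueHookPayload { batch: MessageBatch, env: Record<string, unknown> }

type QueueHook = (payload: QueueHookPayload) => Promise<void>

// Just enough of the hooks API for this sketch; nitroApp.hooks offers more.
interface HookRegistry {
  hook(name: 'cloudflare:queue', fn: QueueHook): void
}

type OnMessage = (body: unknown, queue: string) => Promise<void>

/** Subscribe `onMessage` to every message delivered through the queue hook. */
function registerQueueConsumer(hooks: HookRegistry, onMessage: OnMessage): void {
  hooks.hook('cloudflare:queue', async ({ batch }) => {
    for (const message of batch.messages) {
      await onMessage(message.body, batch.queue)
      message.ack()
    }
  })
}

// In a Nuxt project this could live in server/plugins/queue.ts:
//   export default defineNitroPlugin((nitroApp) => {
//     registerQueueConsumer(nitroApp.hooks, async (body, queue) => {
//       console.log(`message from ${queue}`, body)
//     })
//   })
```

The Worker still has to be registered as a consumer of the queue on the Cloudflare side for the hook to fire.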
Is there any update? Or is there any manual workaround to make it work locally too?
I'm bumping this issue as well. It's been a while. I'm assuming the acquisition has slowed this down a bit, or it's low priority for now?
Right now we're working on making NuxtHub work across multiple platforms for v1, which will also be deployed directly using the cloud provider's tooling. So you'll be able to add queue bindings easily on Cloudflare with your own wrangler.jsonc config.