Queues support

Open atinux opened this issue 1 year ago • 13 comments

See https://developers.cloudflare.com/queues/

How would you see the usage within NuxtHub? Feedback welcome ❤️

atinux avatar Jun 04 '24 09:06 atinux

Consumers could be defined similarly to other Nitro event handlers, in server/queues/consumer-example.ts. It would probably use something like Nitro's event object syntax, so that the consumer's configuration can live alongside the code.

For producers, a composable like hubQueues().send() would suffice. Could have some fancy DX around autocompletion for queue bindings both in consumer objects and producer function.

It's worth noting that Cloudflare is introducing a workflows feature, which I think should be considered alongside implementing queues, as it may be the better long-term option for complex applications. See this Tweet and this blog post.

RihanArfan avatar Jun 18 '24 21:06 RihanArfan

Consumers could be defined similarly to other Nitro event handlers, in server/queues/consumer-example.ts. It would probably use something like Nitro's event object syntax, so that the consumer's configuration can live alongside the code.

I really like this idea!

Regarding the workflow, would you mind creating a new issue for it and linking this one too?

atinux avatar Jun 20 '24 06:06 atinux

I'm not sure how this works with Pages. Currently I have a separate Worker project which is the queue consumer. Does Pages support this, or is it only pull-based (HTTP)?

It would be nice to be able to merge this consumer code back into my main Nuxt project!

Gerbuuun avatar Jun 20 '24 11:06 Gerbuuun

I am not sure that Pages supports queue consumers, but I know it's planned.

I'm trying to avoid having to create two outputs and two deployments for one project.

atinux avatar Jun 25 '24 08:06 atinux

I'm not sure how this works with Pages. Currently I have a separate Worker project which is the queue consumer. Does Pages support this, or is it only pull-based (HTTP)?

It would be nice to be able to merge this consumer code back into my main Nuxt project!

I believe you're right that a consumer can only be a separate Worker or be HTTP pull-based.

With Pages, you can currently only add a producer binding.

RihanArfan avatar Jun 25 '24 11:06 RihanArfan

I'm sorry if this isn't the right place to ask, but I would like to publish a message to a CF queue from a hub project. How would that be possible if we don't have the binding in hub yet and I don't have access to the wrangler file to create the binding manually? Has anyone had to do this already?

Also, if I may contribute my feedback: Pages currently supports producers, and there is an HTTP API for pulling and acknowledging messages. Simple composables, similar to the KV ones, would be really nice and easy to use.
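
For reference, the HTTP pull flow mentioned above could look roughly like the sketch below. It only builds the two requests (pull and ack) so they can be passed to fetch. The endpoint paths and JSON field names are assumptions based on Cloudflare's HTTP pull consumer documentation and should be checked against the current API reference; PullConfig, buildPullRequest and buildAckRequest are hypothetical names, not NuxtHub APIs.

```typescript
// Sketch only: endpoint paths and JSON field names are assumptions taken from
// Cloudflare's HTTP pull consumer docs. Verify them before relying on this.

interface PullConfig {
  accountId: string; // Cloudflare account ID
  queueId: string;   // ID of a queue configured with an HTTP pull consumer
  apiToken: string;  // API token with permission to read from the queue
}

interface HttpRequest {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

const API_BASE = 'https://api.cloudflare.com/client/v4';

// Shared builder for both endpoints (hypothetical helper).
function buildRequest(cfg: PullConfig, action: 'pull' | 'ack', payload: unknown): HttpRequest {
  return {
    url: `${API_BASE}/accounts/${cfg.accountId}/queues/${cfg.queueId}/messages/${action}`,
    init: {
      method: 'POST',
      headers: {
        'authorization': `Bearer ${cfg.apiToken}`,
        'content-type': 'application/json',
      },
      body: JSON.stringify(payload),
    },
  };
}

// Ask for up to `batchSize` messages, hidden from other consumers
// for `visibilityTimeoutMs` while they are being processed.
function buildPullRequest(cfg: PullConfig, batchSize = 10, visibilityTimeoutMs = 30000): HttpRequest {
  return buildRequest(cfg, 'pull', {
    batch_size: batchSize,
    visibility_timeout_ms: visibilityTimeoutMs,
  });
}

// Acknowledge processed messages and request a retry for failed ones,
// both identified by the lease ID returned from the pull call.
function buildAckRequest(cfg: PullConfig, ackLeaseIds: string[], retryLeaseIds: string[] = []): HttpRequest {
  return buildRequest(cfg, 'ack', {
    acks: ackLeaseIds.map(lease_id => ({ lease_id })),
    retries: retryLeaseIds.map(lease_id => ({ lease_id })),
  });
}
```

A caller would do something like `const { url, init } = buildPullRequest(cfg)` followed by `await fetch(url, init)`, process the returned messages, and then send the request from buildAckRequest with their lease IDs.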

valtlfelipe avatar Jun 28 '24 11:06 valtlfelipe

@valtlfelipe I use this util to publish to a Queue (based on other nuxthub utils)

import type { Queue } from '@cloudflare/workers-types/experimental';

interface Message {
  orderId: string;
  lang: string;
}
type OrderQueue = Queue<Message>;

let queue: OrderQueue;

export function useQueue() {
  if (queue)
    return queue;

  // @ts-expect-error globalThis.__env__ is not defined
  // eslint-disable-next-line node/prefer-global/process
  const binding = process.env.QUEUE || globalThis.__env__?.QUEUE || globalThis.QUEUE;

  if (binding) {
    queue = binding as OrderQueue;
    return queue;
  }

  throw createError('Cloudflare Queue binding not found');
}

You need to add the Queue binding to the Worker manually in the Cloudflare dashboard.

Gerbuuun avatar Jun 28 '24 11:06 Gerbuuun

@valtlfelipe I use this util to publish to a Queue (based on other nuxthub utils) [...] You need to add the Queue binding to the Worker manually in the Cloudflare dashboard.

Nice workaround. Thanks!

valtlfelipe avatar Jun 28 '24 11:06 valtlfelipe

This would be great, thanks. I need to process thousands of lines from a CSV and run each through hubAI() in parallel using a queue system.

Currently it looks like I'd have to create a separate project for the consumer worker. It would be great to instead specify a single server/api/my-consumer.post/queue.ts endpoint for the consumer, if Nuxt can somehow detect that an incoming request is a queue job.

Edit: I just tried it in the dashboard and read the docs. CF sends a special "queue" origin rather than "fetch", so it's just a matter of handling that instead.

export default {
  async fetch(request, env, ctx) {
    // No changes in the fetch handler
  },
  async queue(batch, env) {
    for (const message of batch.messages) {
      // Call a queue function in our Nuxt app to consume the message
    }
  },
} 

Maybe a new server/queues folder, with a server/queues/process.ts that exports a custom function to decide what to call based on the incoming queue batch param?
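
To illustrate the server/queues/process.ts idea, here is a minimal sketch of a dispatcher that routes a batch to a handler based on the queue name. The interfaces are local stand-ins for the Cloudflare runtime types, and the handlers registry, the processBatch name and the 'background-jobs' queue name are all hypothetical. Nothing here is an existing Nuxt or NuxtHub API.

```typescript
// Minimal local stand-ins for the Cloudflare runtime types (the real ones live
// in @cloudflare/workers-types); only the members used here are declared.
interface QueueMessage<T = unknown> {
  id: string;
  body: T;
  ack(): void;   // mark this message as successfully processed
  retry(): void; // ask the queue to redeliver this message
}

interface QueueBatch<T = unknown> {
  queue: string; // name of the queue the batch came from
  messages: QueueMessage<T>[];
}

type QueueHandler = (message: QueueMessage) => Promise<void> | void;

// Hypothetical registry: queue name -> handler. With a server/queues folder,
// each file could contribute one entry.
const handlers: Record<string, QueueHandler> = {
  'background-jobs': async (message) => {
    console.log('processing job', message.id);
  },
};

// Route every message in the batch to the handler for its queue,
// acking on success and requesting a retry on failure.
async function processBatch(
  batch: QueueBatch,
  registry: Record<string, QueueHandler> = handlers,
): Promise<void> {
  const handler = registry[batch.queue];
  for (const message of batch.messages) {
    if (!handler) {
      message.retry(); // no handler registered for this queue
      continue;
    }
    try {
      await handler(message);
      message.ack();
    } catch {
      message.retry();
    }
  }
}
```

The Worker's queue(batch, env) handler would then only need to call processBatch(batch). Acking per message rather than per batch means one failing message does not force the whole batch to be redelivered.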

alexcroox avatar Apr 30 '25 09:04 alexcroox

@alexcroox so you did manage to get it working this way?

runarlevi avatar May 03 '25 09:05 runarlevi

No, I don't know the internals of Nuxt/Nitro well enough to modify the Cloudflare adapter to listen for either the fetch or the queue message type.

alexcroox avatar May 03 '25 10:05 alexcroox

I see. I'm hoping it's possible though; it would be nice to not have to create a separate project for the consumer worker!

runarlevi avatar May 03 '25 10:05 runarlevi

It's now possible to add additional bindings to NuxtHub projects, including the queue binding (though it's unavailable during local development): https://hub.nuxt.com/changelog/observability-additional-bindings

export default defineNuxtConfig({
  hub: {
    workers: true,
    // database: true,
    // ...
    bindings: {
      queue: {
        BACKGROUND_JOBS: {
          queue_name: 'background-jobs'
        }
      }
    }
  }
})

We are planning on introducing hubQueue() too in the coming weeks.

CF sends a special "queue" origin rather than "fetch", so it's just a matter of handling that instead.

You can access this using the Nitro hook cloudflare:queue. https://github.com/nitrojs/nitro/blob/v3/src/presets/cloudflare/runtime/_module-handler.ts#L72-L82

https://nuxt.com/docs/guide/going-further/hooks#server-hooks-runtime
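
As a rough sketch of what consuming messages through that hook might look like: the function below registers a listener for cloudflare:queue and acks or retries each message. The payload shape ({ batch, env, context }) is an assumption based on the linked Nitro source, the interfaces are local stand-ins so the snippet type-checks on its own, and runJob and registerQueueConsumer are hypothetical names.

```typescript
// Local stand-ins so the sketch is self-contained. The payload shape is an
// assumption based on the linked Nitro source; check it for your Nitro version.
interface QueueMessage {
  id: string;
  body: unknown;
  ack(): void;
  retry(): void;
}

interface QueuePayload {
  batch: { queue: string; messages: QueueMessage[] };
  env: Record<string, unknown>;
  context: unknown;
}

type QueueListener = (payload: QueuePayload) => Promise<void> | void;

interface NitroAppLike {
  hooks: { hook(name: 'cloudflare:queue', fn: QueueListener): void };
}

// Hypothetical job runner; replace with real work.
async function runJob(body: unknown): Promise<void> {
  console.log('running job', body);
}

// Registers the queue consumer on the Nitro runtime hook.
function registerQueueConsumer(nitroApp: NitroAppLike): void {
  nitroApp.hooks.hook('cloudflare:queue', async ({ batch }) => {
    for (const message of batch.messages) {
      try {
        await runJob(message.body);
        message.ack();
      } catch {
        message.retry();
      }
    }
  });
}

// In a Nuxt project this would presumably live in server/plugins/queue.ts as:
//   export default defineNitroPlugin(registerQueueConsumer)
```

The Worker still needs a consumer configured for the queue on the Cloudflare side; the hook only receives batches that Cloudflare actually delivers to the deployed Worker.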

RihanArfan avatar May 03 '25 18:05 RihanArfan

Is there any update? Or is there any manual workaround to make it work locally too?

bgervan avatar Jul 19 '25 09:07 bgervan

I'm bumping this issue as well. It's been a while. I'm assuming the acquisition has slowed this down a bit, or is it low priority for now?

nonwip avatar Sep 22 '25 19:09 nonwip

Right now we're working on making NuxtHub work across multiple platforms for v1, which will also be deployed directly using the cloud provider's tooling. So you'll be able to add queue bindings easily on Cloudflare with your own wrangler.jsonc config.

RihanArfan avatar Sep 25 '25 12:09 RihanArfan