qstash-js

Vercel edge function stream not working with qstash

Open nilooy opened this issue 2 years ago • 6 comments

I am trying to use Vercel Edge Functions' streaming with the AI SDK (https://sdk.vercel.ai/docs/guides/openai) for a long-running job (up to 120s) triggered through QStash (https://docs.upstash.com/qstash).

It works perfectly locally, but not when deployed on Vercel: it streams for a few milliseconds and then stops, with no error. I'm not sure exactly where the issue is. At first glance it looks like a QStash problem, but since it works locally and fails only on Vercel, the issue might be on the Vercel side.

I would love any feedback on this approach of fanning out jobs to streaming edge functions.

Calling this to enqueue the job:

import { baseUrl, qstashClient } from "@/lib/queue/qstash";

export default async function handler(req, res) {
  const job = await qstashClient.publishJSON({
    url: baseUrl + "/api/test/chat",
    // or topic: "the name or id of a topic"
    body: {},
    retries: 0,
  });

  return res.status(200).json({ ok: true });
}

The endpoint that runs as the background job:

import { Configuration, OpenAIApi } from "openai-edge";
import { OpenAIStream, StreamingTextResponse } from "ai";
import { validateSignatureEdge } from "./validateSignatureEdge";

// Create an OpenAI API client (that's edge friendly!)
const config = new Configuration({
  organization: process.env.OPENAI_ORG,
  apiKey: process.env.OPENAI_API_KEY,
});

const openai = new OpenAIApi(config);

export const runtime = "edge";

async function handler(req: Request) {
  const response = await openai.createChatCompletion({
    model: "gpt-3.5-turbo",
    stream: true,
    messages: [{ role: "user", content: "how next.js works?" }],
  });
  const stream = OpenAIStream(response, {
    onToken: (token) => {
      console.log({ token });
    },
    onStart: () => {
      console.log("started");
    },
  });
  // Respond with the stream
  return new StreamingTextResponse(stream);
}

// it's a custom signature validator of qstash
export default validateSignatureEdge(handler);

validateSignatureEdge.ts

import { Receiver } from "@upstash/qstash";

// next.js api wrapper
export const validateSignatureEdge = (handler) => async (req, res) => {
  const receiver = new Receiver({
    currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY,
    nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY,
  });

  const body = await req.text();

  // verify() is async; without await, isValid would be a Promise and always truthy
  const isValid = await receiver.verify({
    signature: req.headers.get("Upstash-Signature")!,
    body: body,
  });
  if (!isValid) {
    return new Response("Invalid signature", { status: 401 });
  }

  req.data = JSON.parse(body);

  return handler(req, res);
};

nilooy avatar Jun 26 '23 23:06 nilooy

What is the error?

chronark avatar Jun 27 '23 07:06 chronark

> What is the error?

There's no error showing up. It just starts, and within a few ms it stops streaming (locally, with the same QStash setup, it streams fully).

[Screenshot 2023-06-27 at 10 01 29] The log ends here.

It all happens at the same time, so I doubt any streaming is actually happening. [Screenshot 2023-06-27 at 10 04 26]

nilooy avatar Jun 27 '23 08:06 nilooy

Is it perhaps cached by Vercel? They have some really aggressive caching strategies.

chronark avatar Jun 27 '23 12:06 chronark

> is it perhaps cached by vercel? they have some really aggressive caching strategies

I tried redeploying without cache on Vercel, but no luck. I guess Vercel edge functions don't treat QStash as a valid client for streaming. When I tried without QStash, it ran fine.

nilooy avatar Jun 27 '23 13:06 nilooy

Same problem here, any solution one year later?

Raithfall avatar Apr 22 '24 16:04 Raithfall

I don't think this is anything to do with caching but I'm experiencing the same issue :/

nyacg avatar Jul 27 '24 17:07 nyacg

Hey @nyacg, can you explain the problem in more detail?

From what I understand, the original post was trying to have QStash call an endpoint that returns a streaming response. If you don't pass a callback when sending the request, QStash won't use the body, so if the response status is successful, it does not wait for the response stream to finish. I tried it locally; here is an example:

image
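To make the difference concrete, here is a minimal sketch that models the two behaviours described above using only standard Web APIs. It is not QStash's implementation: `makeStreamingResponse`, `deliverStatusOnly` and `deliverAndCollect` are hypothetical names, and modelling "does not wait for the stream" as cancelling the body is an assumption.

```typescript
// Hypothetical model only; this is NOT how QStash is implemented.

// Build a streaming Response and record how much of it a consumer pulled.
function makeStreamingResponse(chunks: string[]) {
  const stats = { pulled: 0, cancelled: false };
  const encoder = new TextEncoder();
  const body = new ReadableStream<Uint8Array>(
    {
      pull(controller) {
        if (stats.pulled < chunks.length) {
          controller.enqueue(encoder.encode(chunks[stats.pulled]));
          stats.pulled += 1;
        } else {
          controller.close();
        }
      },
      cancel() {
        stats.cancelled = true;
      },
    },
    // highWaterMark 0: produce a chunk only when a reader asks for one
    { highWaterMark: 0 },
  );
  return { response: new Response(body, { status: 200 }), stats };
}

// Without a callback (assumed model): only the status matters,
// so the body is dropped instead of being read to the end.
async function deliverStatusOnly(res: Response): Promise<boolean> {
  const ok = res.ok;
  await res.body?.cancel();
  return ok;
}

// With a callback (assumed model): the whole body is read so that
// it can be forwarded to the callback URL afterwards.
async function deliverAndCollect(res: Response): Promise<string> {
  return res.text();
}
```

In this model, the status-only consumer leaves the producer cancelled before it has emitted every chunk, which matches the symptom of a handler that starts streaming and then stops with no error.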

If you pass a callback however, qstash waits until the stream ends and calls the callback with that body. For example, here is the body returned to my callback handler:

image

But there is no reason to do this: streaming the response wouldn't make a difference, since QStash would still wait for it to finish. It's better to wait for the whole response in your handler.
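A minimal sketch of "wait for the whole response in your handler" could look like the following. `readAll` and `bufferedHandler` are hypothetical helpers, not part of QStash or the AI SDK; the assumption is that the handler has a `ReadableStream<Uint8Array>` (in the original post, presumably the value returned by `OpenAIStream(response)`) and drains it before responding.

```typescript
// Drain a byte stream into a single string.
async function readAll(stream: ReadableStream<Uint8Array>): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let text = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across chunks intact
    text += decoder.decode(value, { stream: true });
  }
  return text + decoder.decode(); // flush any buffered bytes
}

// Hypothetical handler: finish all the work first, then return a plain,
// non-streaming response once the full text is available.
async function bufferedHandler(
  produce: () => ReadableStream<Uint8Array>,
): Promise<Response> {
  const text = await readAll(produce());
  // Persist or forward `text` here; it is complete at this point.
  return new Response(JSON.stringify({ text }), {
    status: 200,
    headers: { "Content-Type": "application/json" },
  });
}
```

Because the handler only returns after the stream has ended, the work does not depend on the caller continuing to read the response body.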

ytkimirti avatar Sep 10 '24 10:09 ytkimirti

Hi there @nilooy,

It looks like the fix above addresses the issue. We are closing this issue. Let us know if you have any other questions.

CahidArda avatar Sep 25 '24 11:09 CahidArda