Dify2OpenAI icon indicating copy to clipboard operation
Dify2OpenAI copied to clipboard

workflow场景下,支持流式响应,设置了 stream true 也不生效问题解决

Open randolph555 opened this issue 2 months ago • 1 comments

替换复制下面完整代码即可修复问题

randolph555 avatar Oct 11 '25 01:10 randolph555

// workflowHandler.js

import fetch from "node-fetch";
import { PassThrough } from "stream";
import { log } from '../config/logger.js';
import { logApiCall, generateId, getFileExtension, getFileType } from "./utils.js";

// 上传文件到 Dify,获取文件 ID
async function uploadFileToDify(base64Data, config, userId) {
  try {
    // 解析 base64 数据 URL,提取 contentType 和 base64 字符串
    const matches = base64Data.match(/^data:(.+);base64,(.+)$/);
    if (!matches || matches.length !== 3) {
      throw new Error("Invalid base64 data");
    }
    let contentType = matches[1];
    const base64String = matches[2];
    let fileData = Buffer.from(base64String, "base64");

    // 如果 contentType 是 'image/jpg',将其调整为 'image/jpeg'
    if (contentType === "image/jpg") {
      contentType = "image/jpeg";
    }

    // 从 contentType 确定文件扩展名
    const fileExtension = contentType.split("/")[1]; // 例如 'jpeg'、'png'、'gif'

    // 创建文件名
    const filename = `file.${fileExtension}`;

    // 创建 FormData 并包含 'user' 字段
    const form = new FormData();
    form.append("file", fileData, {
      filename: filename,
      contentType: contentType,
    });
    form.append("user", userId); // 使用提供的用户标识符

    // 记录文件上传请求的详细信息
    log("info", "正在上传文件到 Dify", {
      url: `${config.DIFY_API_URL}/files/upload`,
      headers: {
        Authorization: `Bearer ${config.API_KEY}`,
        ...form.getHeaders(),
      },
      formData: "<<FILE DATA>>", // 出于安全考虑,不记录实际文件数据
    });

    // 发送上传请求
    const response = await fetch(`${config.DIFY_API_URL}/files/upload`, {
      method: "POST",
      headers: {
        Authorization: `Bearer ${config.API_KEY}`,
        ...form.getHeaders(),
      },
      body: form,
    });

    // 记录文件上传响应的详细信息
    log("info", "文件上传响应", {
      status: response.status,
      statusText: response.statusText,
    });

    if (!response.ok) {
      const errorBody = await response.text();
      log("error", "文件上传失败", {
        status: response.status,
        statusText: response.statusText,
        errorBody: errorBody,
      });
      throw new Error(
        `文件上传失败: ${response.status} ${response.statusText}: ${errorBody}`
      );
    }

    const result = await response.json();
    log("info", "文件上传成功", { fileId: result.id });
    return result.id; // 返回文件 ID
  } catch (error) {
    console.error("上传文件出错:", error);
    throw error;
  }
}

// 处理 Workflow 类型请求
async function handleRequest(req, res, config, requestId, startTime) {
  try {
    const apiPath = "/workflows/run";
    const data = req.body;
    const messages = data.messages;
    let inputs = {};
    let files = [];

    // 记录收到的请求头和请求体
    log("info", "收到请求", {
      requestId,
      headers: req.headers,
      body: data,
    });

    const userId = "apiuser"; // 如果可用,替换为实际的用户 ID
    const lastMessage = messages[messages.length - 1];
    
    // 第一步:先扫描所有消息中的图片内容
    log("info", "开始扫描所有消息中的图片", { requestId, messageCount: messages.length });
    for (const message of messages) {
      if (Array.isArray(message.content)) {
        for (const content of message.content) {
          if (content.type === "image_url" && content.image_url && content.image_url.url) {
            const imageUrl = content.image_url.url;
            
            // 检查URL是否为base64数据
            if (imageUrl.startsWith('data:')) {
              // 是base64数据,需要上传
              const fileExt = getFileExtension(imageUrl);
              const fileType = getFileType(fileExt);
              log("info", "检测到base64数据,准备上传", { requestId, fileType, fileExt });
              const fileId = await uploadFileToDify(
                imageUrl,
                config,
                userId
              );
              // 构建输入格式
              const fileInput = {
                transfer_method: "local_file",
                upload_file_id: fileId,
                type: fileType,
              };
              inputs["file_input"] = fileInput;
            } else {
              // 是真正的URL,直接使用remote_url方式
              const fileExt = getFileExtension(imageUrl);
              const fileType = getFileType(fileExt);
              log("info", "检测到远程文件URL", { requestId, url: imageUrl.substring(0, 30) + '...', fileType, fileExt });
              const fileInput = {
                transfer_method: "remote_url",
                url: imageUrl,
                type: fileType,
              };
              inputs["file_input"] = fileInput;
            }
          }
        }
      }
    }
    
    // 第二步:从最后一条消息中提取查询文本
    if (Array.isArray(lastMessage.content)) {
      for (const content of lastMessage.content) {
        // 处理字符串类型的内容(OpenAI格式)
        if (typeof content === "string") {
          // 将字符串类型的内容设置为输入变量
          inputs["text_input"] = content;
        }
        // 处理对象类型的内容
        else if (content.type === "text") {
          // 假设文本内容是输入变量,需要根据您的应用逻辑调整
          inputs["text_input"] = content.text;
        }
        // 注意:这里不再重复处理image_url,因为已经在上面处理过了
      }
    } else {
      // 假设消息内容是输入变量,需要根据您的应用逻辑调整
      inputs["text_input"] = lastMessage.content;
    }

    // 日志记录
    log("info", "处理 Workflow 类型消息", {
      requestId,
      inputs,
      files,
    });

    const stream = data.stream !== undefined ? data.stream : false;

    // 构建请求体
    const requestBody = {
      inputs: inputs,
      response_mode: "streaming",
      user: userId,
      files: files, // 如果需要,可以将 files 数组添加到请求体中
    };

    // 记录将要发送到 Dify 的请求载荷
    log("info", "发送请求到 Dify", {
      requestId,
      url: config.DIFY_API_URL + apiPath,
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${config.API_KEY}`,
      },
      body: requestBody,
    });

    // 发送请求到 Dify
    const resp = await fetch(config.DIFY_API_URL + apiPath, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${config.API_KEY}`,
      },
      body: JSON.stringify(requestBody),
    });

    // 记录 API 调用时间
    const apiCallDuration = Date.now() - startTime;
    logApiCall(requestId, config, apiPath, apiCallDuration);

    // 记录 Dify 的响应状态
    log("info", "收到 Dify 响应", {
      requestId,
      status: resp.status,
      statusText: resp.statusText,
    });

    if (!resp.ok) {
      const errorBody = await resp.text();
      log("error", "Dify API 请求失败", {
        requestId,
        status: resp.status,
        statusText: resp.statusText,
        errorBody: errorBody,
      });
      res.status(resp.status).send(errorBody);
      return;
    }

    let isResponseEnded = false;

    if (stream) {
      res.setHeader("Content-Type", "text/event-stream");
      let buffer = "";
      const responseStream = resp.body
        .pipe(new PassThrough())
        .on("data", (chunk) => {
          buffer += chunk.toString();
          let lines = buffer.split("\n");

          for (let i = 0; i < lines.length - 1; i++) {
            let line = lines[i].trim();

            if (!line.startsWith("data:")) continue;
            line = line.slice(5).trim();
            let chunkObj;
            try {
              if (line.startsWith("{")) {
                chunkObj = JSON.parse(line);
              } else {
                continue;
              }
            } catch (error) {
              console.error("解析 chunk 出错:", error);
              continue;
            }

            // 记录每个 chunk 的内容
            log("debug", "处理 chunk", {
              requestId,
              chunkObj,
            });

            // ✅ 关键修改:处理 text_chunk 事件(逐字流式输出)
            if (chunkObj.event === "text_chunk") {
              const textContent = chunkObj.data?.text || "";
              if (textContent && !isResponseEnded) {
                const chunkId = `chatcmpl-${Date.now()}`;
                res.write(
                  "data: " +
                    JSON.stringify({
                      id: chunkId,
                      object: "chat.completion.chunk",
                      created: Math.floor(Date.now() / 1000),
                      model: data.model,
                      choices: [
                        {
                          index: 0,
                          delta: {
                            content: textContent,
                          },
                          finish_reason: null,
                        },
                      ],
                    }) +
                    "\n\n"
                );
              }
            } else if (chunkObj.event === "workflow_started") {
              // 处理 workflow_started 事件
            } else if (chunkObj.event === "node_started") {
              // 处理 node_started 事件
            } else if (chunkObj.event === "node_finished") {
              // node_finished 事件不再输出内容,因为已经通过 text_chunk 输出了
            } else if (chunkObj.event === "workflow_finished") {
              if (!isResponseEnded) {
                res.write(
                  "data: " +
                    JSON.stringify({
                      id: `chatcmpl-${Date.now()}`,
                      object: "chat.completion.chunk",
                      created: Math.floor(Date.now() / 1000),
                      model: data.model,
                      choices: [
                        {
                          index: 0,
                          delta: {},
                          finish_reason: "stop",
                        },
                      ],
                    }) +
                    "\n\n"
                );
                res.write("data: [DONE]\n\n");
                res.end();
                isResponseEnded = true;
              }
            } else if (chunkObj.event === "ping") {
              // 处理 ping 事件
            } else if (chunkObj.event === "error") {
              console.error(`Error: ${chunkObj.code}, ${chunkObj.message}`);
              res
                .status(500)
                .write(
                  `data: ${JSON.stringify({ error: chunkObj.message })}\n\n`
                );

              if (!isResponseEnded) {
                res.write("data: [DONE]\n\n");
              }

              res.end();
              isResponseEnded = true;
            }
          }

          buffer = lines[lines.length - 1];
        });

      // 记录响应结束
      responseStream.on("end", () => {
        log("info", "响应结束", { requestId });
        if (!isResponseEnded) {
          res.write("data: [DONE]\n\n");
          res.end();
        }
      });
    } else {
      let result = "";
      let usageData = "";
      let buffer = "";
      let hasError = false;

      // 记录普通响应开始
      log("info", "开始处理普通响应", {
        requestId,
        timestamp: new Date().toISOString(),
      });

      const responseStream = resp.body;
      responseStream.on("data", (chunk) => {
        buffer += chunk.toString();
        let lines = buffer.split("\n");

        for (let i = 0; i < lines.length - 1; i++) {
          const line = lines[i].trim();
          if (line === "") continue;
          let chunkObj;
          try {
            const cleanedLine = line.replace(/^data: /, "").trim();
            if (cleanedLine.startsWith("{") && cleanedLine.endsWith("}")) {
              chunkObj = JSON.parse(cleanedLine);
            } else {
              continue;
            }
          } catch (error) {
            console.error("解析 JSON 出错:", error);
            continue;
          }

          // 记录每个 chunk 的内容
          log("debug", "处理 chunk", {
            requestId,
            chunkObj,
          });

          if (chunkObj.event === "workflow_finished") {
            const outputs = chunkObj.data.outputs;
            if (config.OUTPUT_VARIABLE) {
              result = outputs[config.OUTPUT_VARIABLE];
            } else {
              result = outputs;
            }
            usageData = {
              total_tokens: chunkObj.data.total_tokens || 110,
            };
          } else if (chunkObj.event === "ping") {
            // 处理 ping 事件
          } else if (chunkObj.event === "error") {
            hasError = true;
            console.error(`Error: ${chunkObj.code}, ${chunkObj.message}`);
            break;
          }
        }

        buffer = lines[lines.length - 1];
      });

      responseStream.on("end", () => {
        if (hasError) {
          res
            .status(500)
            .json({ error: "An error occurred while processing the request." });
        } else {
          const formattedResponse = {
            id: `chatcmpl-${generateId()}`,
            object: "chat.completion",
            created: Math.floor(Date.now() / 1000),
            model: data.model,
            choices: [
              {
                index: 0,
                message: {
                  role: "assistant",
                  content: result,
                },
                logprobs: null,
                finish_reason: "stop",
              },
            ],
            usage: usageData,
            system_fingerprint: "fp_2f57f81c11",
          };
          const jsonResponse = JSON.stringify(formattedResponse, null, 2);

          // 记录发送的响应
          log("info", "发送响应", {
            requestId,
            response: formattedResponse,
          });

          res.set("Content-Type", "application/json");
          res.send(jsonResponse);
        }
      });
    }
  } catch (error) {
    console.error("处理 Workflow 请求时发生错误:", error);

    // 记录错误
    log("error", "处理 Workflow 请求时发生错误", {
      requestId,
      error: error.message,
      stack: error.stack,
    });

    res.status(500).json({ error: error.message });
  }
}

export default {
  handleRequest,
};

**关键修改:

添加了 text_chunk 事件的处理,这是 Dify 发送逐字流式输出的事件 从 chunkObj.data.text 中提取文本内容并立即发送 node_finished 事件不再输出内容(因为内容已经通过 text_chunk 输出了) 实现真正的流式响应!**

randolph555 avatar Oct 11 '25 02:10 randolph555