
Dify integration

Open andsty opened this issue 1 year ago • 7 comments

How do I use RAG functionality? I was wondering whether it's possible to connect the VTuber to the dify.ai API: create an app in Dify that contains some documents, then ask a question and get the answer from there. Basically, I'm looking for a way to integrate Dify with the VTuber instead of Ollama.
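For reference, a minimal sketch of what such a Dify chat request could look like (field names follow the public Dify API docs; the app key and user id here are placeholders, and the Dify app must already contain the uploaded documents):

```python
# Hedged sketch: the request shape for Dify's /chat-messages endpoint, which
# could replace an Ollama backend for RAG-style Q&A. The key and user id
# are placeholders.
def build_dify_chat_request(api_key: str, query: str, user: str = "vtuber-user"):
    """Return (url, headers, payload) for a POST to Dify's chat endpoint."""
    url = "https://api.dify.ai/v1/chat-messages"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": {},                  # app-defined input variables
        "query": query,                # the user's question
        "response_mode": "streaming",  # or "blocking"
        "user": user,                  # stable end-user identifier
    }
    return url, headers, payload
```

The answer is then grounded in whatever documents the Dify app's knowledge base holds, which is exactly the RAG behavior asked about.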

andsty avatar Sep 17 '24 18:09 andsty

Hello, I have recently been learning about this project. I just graduated and started working, and my job involves Dify. But I'm a beginner, and I hope to learn this project and the use of Dify, then contribute to it. What should I do?

ShanJianSoda avatar Jun 20 '25 02:06 ShanJianSoda

I built a simple workflow on Dify to retrieve the documents I uploaded, as follows:

Image: knowledge base (dataset) settings

Image: workflow

Code:

"""
Dify Workflow API Test Program
Dify 工作流 API 测试程序

This program is used to test Dify Workflow API calls
此程序用于测试 Dify 工作流 API 调用
"""

API_KEY = "app-****************"

import asyncio
import aiohttp
import json
from typing import Dict, Any, AsyncGenerator, Optional
from loguru import logger


class DifyWorkflowClient:
    """Dify Workflow client"""
    
    def __init__(self, api_key: str = API_KEY, base_url: str = "https://api.dify.ai/v1"):
        """
        Initialize the Dify Workflow client.

        Args:
            api_key: Dify API key
            base_url: Dify API base URL
        """
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        logger.info(f"Initialized Dify Workflow Client: {self.base_url}")
    
    async def run_workflow(
        self,
        inputs: Dict[str, Any],
        response_mode: str = "streaming",
        user: str = "workflow_user_001"
    ) -> AsyncGenerator[str, None]:
        """
        Execute Dify workflow, support streaming and blocking mode
        执行 Dify 工作流,支持 streaming 和 blocking 两种模式
        Args:
            inputs: input parameters (dict) / 输入参数(dict)
            response_mode: 'streaming' or 'blocking' / 'streaming' 或 'blocking'
            user: unique user identifier / 用户唯一标识
        Yields:
            str: streaming content or final result / 流式内容或最终结果
        """
        url = f"{self.base_url}/workflows/run"
        payload = {
            "inputs": inputs,
            "response_mode": response_mode,
            "user": user
        }
        logger.info(f"Calling workflow API: {url}")
        logger.debug(f"Request payload: {json.dumps(payload, ensure_ascii=False, indent=2)}")
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    url,
                    headers=self.headers,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=100)
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"API call failed: {response.status} - {error_text}")
                        raise Exception(f"API call failed: {response.status} - {error_text}")
                    if response_mode == "streaming":
                        # SSE streaming response
                        async for line in response.content:
                            line = line.decode('utf-8').strip()
                            if line.startswith('data: '):
                                data = line[6:]
                                if data == '[DONE]':
                                    break
                                try:
                                    parsed_data = json.loads(data)
                                    event = parsed_data.get('event')
                                    event_data = parsed_data.get('data', {})
                                    if event == 'text_chunk' and 'text' in event_data:
                                        yield event_data['text']
                                    # elif event == 'node_finished':
                                    #     outputs = event_data.get('outputs', {})
                                    #     if isinstance(outputs, dict) and 'text' in outputs:
                                    #         yield outputs['text']
                                    #     elif outputs:
                                    #         yield str(outputs)
                                except Exception as e:
                                    logger.warning(f"JSON parse failed: {data}, error: {e}")
                    else:
                        # Blocking mode: yield the full JSON output
                        data = await response.json()
                        outputs = data.get('data', {}).get('outputs', {})
                        if isinstance(outputs, dict) and 'text' in outputs:
                            yield outputs['text']
                        elif outputs:
                            yield str(outputs)
        except Exception as e:
            logger.error(f"Workflow call exception: {e}")
            raise
    
    async def run_workflow_blocking(
        self,
        inputs: Dict[str, Any],
        user: str = "workflow_user_001"
    ) -> str:
        """
        Execute Dify workflow in blocking mode, return full text
        阻塞模式执行 Dify 工作流,返回完整文本
        """
        result = ""
        async for chunk in self.run_workflow(inputs, "blocking", user):
            result += chunk
        return result


async def test_workflow_streaming():
    """
    Test a streaming workflow call.
    """
    logger.info("=== Test streaming workflow call ===")
    
    client = DifyWorkflowClient()
    
    test_inputs = {
        "query": "为什么用自注意力机制"
    }
    
    try:
        print("Calling workflow...")
        print("Response content:")
        print("-" * 50)
        
        async for chunk in client.run_workflow(
            inputs=test_inputs,
            response_mode="streaming",
            user="workflow_user_001"  # a user identifier, not the API key
        ):
            print(chunk, end="", flush=True)
        
        print("\n" + "-" * 50)
        logger.info("Streaming call finished")
        
    except Exception as e:
        logger.error(f"Streaming call failed: {e}")


async def test_workflow_blocking():
    """
    Test a blocking workflow call.
    """
    logger.info("=== Test blocking workflow call ===")
    
    client = DifyWorkflowClient()
    
    test_inputs = {
        "query": "为什么用自注意力机制"
    }
    
    try:
        print("Calling workflow (blocking mode)...")
        
        response = await client.run_workflow_blocking(
            inputs=test_inputs,
            user="workflow_user_001"  # a user identifier, not the API key
        )
        
        print("Full response:")
        print("-" * 50)
        print(response)
        print("-" * 50)
        logger.info("Blocking call finished")
        
    except Exception as e:
        logger.error(f"Blocking call failed: {e}")


async def test_multiple_queries():
    """
    Test multiple queries in streaming mode.
    """
    logger.info("=== Test multiple queries ===")
    
    client = DifyWorkflowClient()
    
    test_queries = [
        "为什么用自注意力机制",
        "惠惠有什么特点",
        "今天天气怎么样"
    ]
    
    for i, query in enumerate(test_queries, 1):
        logger.info(f"\n--- Query {i}: {query} ---")
        
        try:
            print(f"\nQuestion {i}: {query}")
            print("Answer:", end=" ")
            
            async for chunk in client.run_workflow(
                inputs={"query": query},
                response_mode="streaming",
                user="workflow_user_001"  # a user identifier, not the API key
            ):
                print(chunk, end="", flush=True)
            
            print("\n")
            
        except Exception as e:
            logger.error(f"Query {i} failed: {e}")
        
        # Wait briefly before the next query
        await asyncio.sleep(1)


async def interactive_test():
    """
    Interactive test mode.
    """
    logger.info("=== Interactive test mode ===")
    
    client = DifyWorkflowClient()
    
    print("Entering interactive mode; type 'quit' to exit")
    print("-" * 50)
    
    while True:
        try:
            query = input("\nPlease enter your question: ").strip()
            
            if query.lower() in ['quit', 'exit', '退出']:
                print("Exiting interactive mode")
                break
            
            if not query:
                continue
            
            print("Answer:", end=" ")
            
            async for chunk in client.run_workflow(
                inputs={"query": query},
                response_mode="streaming",
                user="workflow_user_001"  # a user identifier, not the API key
            ):
                print(chunk, end="", flush=True)
            
            print("\n")
            
        except KeyboardInterrupt:
            print("\n\nUser interrupted, exiting")
            break
        except Exception as e:
            logger.error(f"Interactive test failed: {e}")


async def main():
    """
    Main function: test menu.
    """
    logger.info("=== Dify Workflow API Test Program ===")
    
    logger.add(
        "logs/dify_workflow_test.log",
        rotation="1 day",
        retention="7 days",
        level="INFO"
    )
    
    print("Please select a test mode:")
    print("1. Streaming call test")
    print("2. Blocking call test")
    print("3. Multiple queries test")
    print("4. Interactive test")
    print("5. Run all tests")
    print("6. Exit")
    
    try:
        choice = input("\nPlease enter your choice (1-6): ").strip()
        
        if choice == "1":
            await test_workflow_streaming()
        elif choice == "2":
            await test_workflow_blocking()
        elif choice == "3":
            await test_multiple_queries()
        elif choice == "4":
            await interactive_test()
        elif choice == "5":
            await test_workflow_streaming()
            await asyncio.sleep(1)
            await test_workflow_blocking()
            await asyncio.sleep(1)
            await test_multiple_queries()
        elif choice == "6":
            logger.info("Exiting program")
        else:
            logger.warning("Invalid choice, exiting")
            
    except KeyboardInterrupt:
        print("\n\nUser interrupted, exiting")
    except Exception as e:
        logger.error(f"Program exception: {e}")


if __name__ == "__main__":
    # 确保 logs 目录存在
    import os
    os.makedirs("logs", exist_ok=True)
    
    # 运行主函数
    asyncio.run(main()) 

Returned output:

Image
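The streaming branch of the client above can be exercised offline by parsing sample SSE lines. This sketch mirrors its `text_chunk` handling; the sample events here are made up for illustration:

```python
import json

def extract_text_chunks(sse_lines):
    """Yield text from 'text_chunk' events in Dify-style SSE lines."""
    for raw in sse_lines:
        line = raw.strip()
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError:
            continue  # skip malformed lines instead of aborting the stream
        if parsed.get("event") == "text_chunk":
            text = parsed.get("data", {}).get("text")
            if text:
                yield text

# Fabricated sample events for illustration
sample = [
    'data: {"event": "workflow_started", "data": {}}',
    'data: {"event": "text_chunk", "data": {"text": "Self-attention "}}',
    'data: {"event": "text_chunk", "data": {"text": "captures long-range deps."}}',
    "data: [DONE]",
]
print("".join(extract_text_chunks(sample)))
# prints: Self-attention captures long-range deps.
```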

ShanJianSoda avatar Jun 20 '25 06:06 ShanJianSoda

Next, I plan to start a development project to add adapters for Dify and Agno, and possibly other agent platforms such as FastGPT and RAGFlow. If you're interested, please find me in the QQ group, on Discord, or on Zulip to discuss the specific implementation plan. I expect to start development today. Thanks! @ShanJianSoda @andsty

mastwet avatar Oct 29 '25 12:10 mastwet

Closing this issue because Dify now supports an OpenAI-compatible API.

ylxmf2005 avatar Nov 16 '25 04:11 ylxmf2005

There is a difference: Dify's API is not in the standard OpenAI-compatible format. @ylxmf2005

mastwet avatar Nov 17 '25 01:11 mastwet

Please refer to this article: https://www.cnblogs.com/xiao987334176/p/18805992

mastwet avatar Nov 17 '25 08:11 mastwet

There is a difference: Dify's API is not in the standard OpenAI-compatible format. @ylxmf2005

Sorry, I closed the issue before doing enough research, now I reopen it.

I still want to connect through the OpenAI interface. The project https://github.com/onenov/Dify2OpenAI looks useful. It wraps workflows, agents, and chat, which makes Dify behave like a "model" instead of an agent.

This project might become an optional plugin for the next OLV generation. Adapting it will take some time.
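As a rough illustration of that direction, an OpenAI-style request routed through a Dify2OpenAI-like proxy might be shaped like this. The base URL, model name, and the convention of passing the Dify app key as the bearer token are assumptions for illustration, not confirmed against Dify2OpenAI's actual interface:

```python
# Hypothetical sketch only: an OpenAI-compatible chat request aimed at a
# local Dify2OpenAI-style proxy. URL, model name, and key convention are
# assumptions, not Dify2OpenAI's documented interface.
def build_openai_compat_request(dify_app_key: str, question: str,
                                base_url: str = "http://localhost:3000/v1"):
    url = f"{base_url}/chat/completions"
    headers = {
        "Authorization": f"Bearer {dify_app_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "dify",  # placeholder; the proxy maps this to the Dify app
        "messages": [{"role": "user", "content": question}],
        "stream": True,
    }
    return url, headers, payload
```

The appeal is that any existing OpenAI-compatible client in OLV could then talk to Dify without a dedicated adapter.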

ylxmf2005 avatar Nov 17 '25 09:11 ylxmf2005