
dify connection

Open chengleilovesky opened this issue 1 year ago • 5 comments

I implemented Dify as an LLM service.

import json

import aiohttp
from loguru import logger

# Import paths as of the pipecat release current when this was posted;
# they may differ in other versions.
from pipecat.frames.frames import (
    Frame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMMessagesFrame,
    LLMModelUpdateFrame,
    TextFrame,
    VisionImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
    OpenAILLMContextFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import LLMService


class DifyHttpService(LLMService):
    """DifyHttpService uses aiohttp to send asynchronous HTTP requests and parse the response data.

    This service consumes OpenAILLMContextFrame frames, which contain a reference to an OpenAILLMContext object.
    The OpenAILLMContext object defines the context sent to the LLM for completion.
    This includes user, assistant, and system messages, as well as the tool choice and the tools to use
    (if function calling is requested from the LLM).
    """

    def __init__(self, api_key: str, base_url: str = 'http://localhost/v1', **kwargs):
        super().__init__(**kwargs)
        self.api_key = api_key
        self.base_url = base_url

    async def send_chat_request(self, query: str, response_mode: str = "streaming", conversation_id: str = "",
                                user: str = "abc-123"):
        url = f'{self.base_url}/chat-messages'
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
        }
        data = {
            "inputs": {},
            "query": query,
            "response_mode": response_mode,
            "conversation_id": conversation_id,
            "user": user,
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(url, headers=headers, json=data) as response:
                    response.raise_for_status()
                    # The streaming response arrives as lines of the form "data: {...}".
                    async for line in response.content:
                        decoded_line = line.decode('utf-8').strip()
                        if decoded_line.startswith("data: "):
                            json_data = decoded_line[6:]
                            parsed_data = self.parse_message_data(json_data)
                            answer = parsed_data.get("answer")
                            # Push each non-empty answer chunk downstream as soon as it arrives.
                            if answer:
                                await self.push_frame(TextFrame(answer))
            except aiohttp.ClientResponseError as e:
                logger.error(f"HTTP error occurred: {e.status} - {e.message}")
            except Exception as e:
                logger.error(f"An error occurred: {e}")

    async def _process_context(self, context: OpenAILLMContext):
        await self.push_frame(LLMFullResponseStartFrame())
        logger.debug(f"Generating chat: {context.get_messages_json()}")
        query = self._get_messages_from_str(context)
        await self.start_ttfb_metrics()
        await self.send_chat_request(query)
        await self.stop_ttfb_metrics()
        await self.push_frame(LLMFullResponseEndFrame())

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        context = None

        if isinstance(frame, OpenAILLMContextFrame):
            context = frame.context
        elif isinstance(frame, LLMMessagesFrame):
            context = OpenAILLMContext.from_messages(frame.messages)
        elif isinstance(frame, VisionImageRawFrame):
            context = OpenAILLMContext.from_image_frame(frame)
        elif isinstance(frame, LLMModelUpdateFrame):
            # This class defines no _create_client(), and the request body sent to Dify
            # carries no model field, so there is nothing to switch here.
            logger.debug(f"Ignoring LLM model update: [{frame.model}]")
        else:
            await self.push_frame(frame, direction)

        if context:
            await self._process_context(context)

    def parse_message_data(self, json_data: str) -> dict:
        """
        Parse the JSON data returned by the server and extract the useful fields.

        :param json_data: string containing JSON data
        :return: dictionary with the extracted data
        """
        try:
            # Parse the JSON string into a Python dictionary
            data = json.loads(json_data)

            # Extract the useful fields
            parsed_data = {
                "event": data.get("event"),
                "conversation_id": data.get("conversation_id"),
                "message_id": data.get("message_id"),
                "created_at": data.get("created_at"),
                "task_id": data.get("task_id"),
                "id": data.get("id"),
                "answer": data.get("answer"),
            }

            return parsed_data

        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode JSON: {e}")
            return {}

    def _get_messages_from_str(self, context: OpenAILLMContext) -> str:
        # Only the content of the most recent message is sent to Dify as the query.
        messages = context.get_messages()
        logger.debug(messages)
        return messages[-1]["content"]
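
The streaming loop in send_chat_request boils down to reading "data: " lines and pulling out the "answer" field. Here is a minimal, network-free sketch of just that step, so it can be tried without a Dify server. The helper name `iter_dify_answers` and the sample payloads are made up for illustration; they only mirror the fields the code above reads and are not taken from the Dify documentation.

```python
import json
from typing import Iterable, Iterator


def iter_dify_answers(lines: Iterable[bytes]) -> Iterator[str]:
    """Yield non-empty "answer" chunks from SSE-style "data: ..." lines."""
    prefix = "data: "
    for raw in lines:
        line = raw.decode("utf-8").strip()
        if not line.startswith(prefix):
            continue  # skip blank lines and any other kind of line
        try:
            payload = json.loads(line[len(prefix):])
        except json.JSONDecodeError:
            continue  # ignore a malformed chunk instead of aborting the stream
        if not isinstance(payload, dict):
            continue  # valid JSON, but not an object with fields
        answer = payload.get("answer")
        if answer:
            yield answer


# Hypothetical sample stream (assumption: shaped like what the service receives).
sample = [
    b'data: {"event": "message", "answer": "Hel"}\n',
    b"\n",
    b'data: {"event": "message", "answer": "lo"}\n',
    b'data: {"event": "message_end"}\n',
    b"data: not-json\n",
]
chunks = list(iter_dify_answers(sample))
print("".join(chunks))  # prints "Hello"
```

In the service itself, each yielded chunk would correspond to one TextFrame pushed downstream.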

chengleilovesky • Aug 14 '24 17:08

With the websocket server, interrupting the bot by speaking does not work particularly well. The audio seems to be transmitted too fast, which makes it impossible to interrupt. Could this cause two segments of speech to be output at the same time? With Daily, interruption works very well.
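
To illustrate what "too fast" could mean here: if every audio chunk is written to the socket immediately, the client may already hold the whole utterance by the time an interruption is detected. The sketch below is a guess at the mechanism, not pipecat's transport code. It sends chunks at roughly playback speed and stops as soon as an interruption flag is set. `send_paced`, `fake_send`, and the timings are all hypothetical.

```python
import asyncio


async def send_paced(chunks, chunk_seconds, send, interrupted: asyncio.Event) -> int:
    """Send chunks at about playback speed; stop once `interrupted` is set.

    Returns the number of chunks actually sent.
    """
    sent = 0
    for chunk in chunks:
        if interrupted.is_set():
            break
        await send(chunk)
        sent += 1
        try:
            # Wait for one chunk duration, but wake up early on interruption.
            await asyncio.wait_for(interrupted.wait(), timeout=chunk_seconds)
        except asyncio.TimeoutError:
            pass
    return sent


async def demo():
    interrupted = asyncio.Event()
    received = []

    async def fake_send(chunk):  # stands in for a websocket send (assumption)
        received.append(chunk)

    chunks = [b"x"] * 50  # 50 chunks of 10 ms each
    task = asyncio.create_task(send_paced(chunks, 0.01, fake_send, interrupted))
    await asyncio.sleep(0.05)  # the "user" starts speaking after about 50 ms
    interrupted.set()
    sent = await task
    return sent, len(received)


sent, received_count = asyncio.run(demo())
print(sent, received_count)
```

With pacing, only a few of the 50 chunks have left the sender when the interruption arrives, so there is little or nothing buffered on the client to play over the next response.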

chengleilovesky • Aug 14 '24 17:08

Do you have an example bot.py showing how to use this? I have a similar use case and would like to try your implementation.

ramishi • Aug 16 '24 12:08

There's this PR https://github.com/pipecat-ai/pipecat/pull/378 from @ramishi

aconchillo • Aug 16 '24 15:08

> Do you have an example bot.py showing how to use this? I have a similar use case and would like to try your implementation.

How can I give you an example?

chengleilovesky • Aug 20 '24 08:08

> > Do you have an example bot.py showing how to use this? I have a similar use case and would like to try your implementation.
>
> How can I give you an example?

You can post it here, please, @chengleilovesky, or we can PM if that's better for you.

ramishi • Aug 20 '24 22:08

Closed as a duplicate of PR #378.

chadbailey59 • Jan 09 '25 19:01