bisheng icon indicating copy to clipboard operation
bisheng copied to clipboard

【bug】工作流的构建,条件分支再次连接汇合节点,工作流会直接结束

Open IceHope opened this issue 11 months ago • 3 comments

问题描述

在代码分支feat/0.4.2的基础上,构建一个工作流,工作流的信息如下: %%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; __start__([<p>__start__</p>]):::first start_dd1cc(start_dd1cc) input_7932b(input_7932b<hr/><small><em>__interrupt = before</em></small>) llm_3b72f(llm_3b72f) llm_24b89(llm_24b89) output_9afde(output_9afde) end_a3447(end_a3447) condition_e3386(condition_e3386) output_9afde_fake(output_9afde_fake<hr/><small><em>__interrupt = before</em></small>) __end__([<p>__end__</p>]):::last __start__ --> start_dd1cc; end_a3447 --> __end__; input_7932b --> condition_e3386; llm_24b89 --> output_9afde; llm_3b72f --> output_9afde; output_9afde --> output_9afde_fake; start_dd1cc --> input_7932b; output_9afde_fake -.-> end_a3447; condition_e3386 -.-> llm_3b72f; condition_e3386 -.-> llm_24b89; classDef default fill:#f2f0ff,line-height:1.2 classDef first fill-opacity:0 classDef last fill:#bfb6fc

大概的流程: 开始节点---输入节点---条件节点----2个大模型节点(llm_24b89 +llm_3b72f )---输出节点output_9afde ---结束节点

可参照类似简单的demo复现

输入的内容,经过条件节点之后,会执行条件分支上2个节点(llm_24b89 ,llm_3b72f )其中之一,但是执行之后,工作流直接执行完了,并没有执行汇合的output_9afde 节点。

原因分析:

`在src/backend/bisheng/workflow/graph/graph_engine.py构建工作流的方法:

  def build_more_fan_in_node(self):
    for node_id, source_ids in self.nodes_fan_in.items():
        if not source_ids or len(source_ids) <= 1:
            continue
        wait_nodes, no_wait_nodes = self.parse_fan_in_node(node_id)
        if wait_nodes:
            logger.debug(f'node {node_id} need wait nodes {wait_nodes}')
            self.graph_builder.add_edge(wait_nodes, node_id)
        if no_wait_nodes:
            for one in no_wait_nodes:
                logger.debug(f'node {node_id} no need wait nodes {one}')
                self.graph_builder.add_edge(one, node_id)

在添加output_9afde 节点的时候,条件分支上的2个节点llm_24b89 ,llm_3b72f 当成了并行节点处理,添加边的时候wait_nodes=[llm_24b89 ,llm_3b72f ],但是条件分支只执行1个,output_9afde 永远不会等到2个节点同时执行完,由于langgrah本身的特性,会直接结束执行,导致汇合节点及其之后的节点都不会执行`

可能的思路:

` def parse_fan_in_node(self, node_id: str):
    source_ids = self.nodes_fan_in.get(node_id)
    all_next_nodes = self.nodes_next_nodes.get(node_id)
    wait_nodes = []
    no_wait_nodes = []
    for one in source_ids:
        # output节点有特殊处理逻辑
        if one.startswith(('output_', 'condition_')):
            continue
        if one in all_next_nodes:
            no_wait_nodes.append(one)
        else:
            wait_nodes.append(one)
    return wait_nodes, no_wait_nodes`

在 parse_fan_in_node()解析节点的时候,是否可以加入溯源的判断? 汇合节点之前的节点
 - 如果是并行节点,则是wait_nodes,
 - 如果是条件分支节点,则是no_wait_nodes。
 不过,并行分支里面嵌套条件分支,条件分支再嵌套并行分支呢,可能比较麻烦

希望评估下问题,以及解决办法,谢谢~

IceHope avatar Jan 13 '25 08:01 IceHope

这个我们新版本正在评估,怎么把互斥边上的节点不作为并行节点来处理

zgqgit avatar Jan 16 '25 09:01 zgqgit

这个我们新版本正在评估,怎么把互斥边上的节点不作为并行节点来处理

毕昇最新的代码分支,feat/1.1.0,解决了类似上面简单的场景

1、我的思路理解为:

对于每个汇合的节点,判断前面所有的条件节点到当前节点的路径是否互斥,如果互斥,当成条件分支,每个节点单独连接汇合节点。如果不是互斥,当成并行分支,前面的节点,放在一个数组,一起连接汇合节点

2、核心代码:

判断是否存在从condition节点或者output节点(选择型交互)到此节点的 两条不重复的路径 all_branches = [] for one in self.condition_nodes: branches = self.edges.get_all_edges_nodes(one, node_id) for branch in branches: if node_id not in branch: continue branch.remove(node_id) branch.remove(one) all_branches.append(branch)

    def judge_not_same_branch():
        # 判断所有边中是否存在两条不重复的路径
        for i in range(len(all_branches)):
            for j in range(i + 1, len(all_branches)):
                if not (set(all_branches[i]) & set(all_branches[j])):
                    return True
        return False

3、存在的缺陷

简单场景已经满足,但是有一种情况,仍然有bug,并行分支嵌套条件分支,比如输入节点a---2个并行分支,其中一个分支,存在一个条件节点b,连接2个节点b1和b2,然后跟另一个并行分支的节点c,3个节点一起连接一个汇合节点d。按照代码的思路

首先:条件节点b到汇合节点d的分支为: b-b1-d b-b2-d 去掉开始和结束,剩下 b1 b2 路径互斥的,当成条件分支来处理,也就是b1 b2 c 单独连接d。而实际情况,大的分支应该是并行的,这样连接是错误

4、个人建议

按照目前的代码设计,在b1 b2 之后增加一个汇合节点bb,汇合节点去连接d,就可以解决了。这个汇合节点,比如代码节点,提取条件分支中b1 b2的非空,然后d节点直接引用这个代码节点的输出。 增加之后,条件节点b到汇合节点d的分支为: b-b1-bb-d b-b2-bb-d 去掉开始和结束,剩下 b1-bb b2-bb 有共同的节点,当成并行的,即:builder.add_edge(["bb",”c”], "d") 这样不仅解决了代码缺陷,而且方便后续节点引用。

上面的思路,增加一个汇合节点,目前代码节点,可以实现,类似于扣子的变量聚合节点

5、后续

请问下,上面的并行条件嵌套的bug,是打算代码层面解决呢?运行之前,是无法确定的,没法静态计算,貌似只能动态计算。还是在实际的业务场景,增加汇合去约束呢?

IceHope avatar Mar 19 '25 09:03 IceHope

@zgqgit

dolphin0618 avatar Nov 07 '25 04:11 dolphin0618