【bug】工作流的构建,条件分支再次连接汇合节点,工作流会直接结束
问题描述
在代码分支feat/0.4.2的基础上,构建一个工作流,工作流的信息如下:
%%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; __start__([<p>__start__</p>]):::first start_dd1cc(start_dd1cc) input_7932b(input_7932b<hr/><small><em>__interrupt = before</em></small>) llm_3b72f(llm_3b72f) llm_24b89(llm_24b89) output_9afde(output_9afde) end_a3447(end_a3447) condition_e3386(condition_e3386) output_9afde_fake(output_9afde_fake<hr/><small><em>__interrupt = before</em></small>) __end__([<p>__end__</p>]):::last __start__ --> start_dd1cc; end_a3447 --> __end__; input_7932b --> condition_e3386; llm_24b89 --> output_9afde; llm_3b72f --> output_9afde; output_9afde --> output_9afde_fake; start_dd1cc --> input_7932b; output_9afde_fake -.-> end_a3447; condition_e3386 -.-> llm_3b72f; condition_e3386 -.-> llm_24b89; classDef default fill:#f2f0ff,line-height:1.2 classDef first fill-opacity:0 classDef last fill:#bfb6fc
大概的流程: 开始节点---输入节点---条件节点----2个大模型节点(llm_24b89 +llm_3b72f )---输出节点output_9afde ---结束节点
可参照类似简单的demo复现
输入的内容,经过条件节点之后,会执行条件分支上2个节点(llm_24b89 ,llm_3b72f )其中之一,但是执行之后,工作流直接执行完了,并没有执行汇合的output_9afde 节点。
原因分析:
`在src/backend/bisheng/workflow/graph/graph_engine.py构建工作流的方法:
def build_more_fan_in_node(self):
for node_id, source_ids in self.nodes_fan_in.items():
if not source_ids or len(source_ids) <= 1:
continue
wait_nodes, no_wait_nodes = self.parse_fan_in_node(node_id)
if wait_nodes:
logger.debug(f'node {node_id} need wait nodes {wait_nodes}')
self.graph_builder.add_edge(wait_nodes, node_id)
if no_wait_nodes:
for one in no_wait_nodes:
logger.debug(f'node {node_id} no need wait nodes {one}')
self.graph_builder.add_edge(one, node_id)
在添加output_9afde 节点的时候,条件分支上的2个节点llm_24b89 ,llm_3b72f 当成了并行节点处理,添加边的时候wait_nodes=[llm_24b89 ,llm_3b72f ],但是条件分支只执行1个,output_9afde 永远不会等到2个节点同时执行完,由于langgrah本身的特性,会直接结束执行,导致汇合节点及其之后的节点都不会执行`
可能的思路:
` def parse_fan_in_node(self, node_id: str):
source_ids = self.nodes_fan_in.get(node_id)
all_next_nodes = self.nodes_next_nodes.get(node_id)
wait_nodes = []
no_wait_nodes = []
for one in source_ids:
# output节点有特殊处理逻辑
if one.startswith(('output_', 'condition_')):
continue
if one in all_next_nodes:
no_wait_nodes.append(one)
else:
wait_nodes.append(one)
return wait_nodes, no_wait_nodes`
在 parse_fan_in_node()解析节点的时候,是否可以加入溯源的判断? 汇合节点之前的节点
- 如果是并行节点,则是wait_nodes,
- 如果是条件分支节点,则是no_wait_nodes。
不过,并行分支里面嵌套条件分支,条件分支再嵌套并行分支呢,可能比较麻烦
希望评估下问题,以及解决办法,谢谢~
这个我们新版本正在评估,怎么把互斥边上的节点不作为并行节点来处理
这个我们新版本正在评估,怎么把互斥边上的节点不作为并行节点来处理
毕昇最新的代码分支,feat/1.1.0,解决了类似上面简单的场景
1、我的思路理解为:
对于每个汇合的节点,判断前面所有的条件节点到当前节点的路径是否互斥,如果互斥,当成条件分支,每个节点单独连接汇合节点。如果不是互斥,当成并行分支,前面的节点,放在一个数组,一起连接汇合节点
2、核心代码:
判断是否存在从condition节点或者output节点(选择型交互)到此节点的 两条不重复的路径
all_branches = []
for one in self.condition_nodes:
branches = self.edges.get_all_edges_nodes(one, node_id)
for branch in branches:
if node_id not in branch:
continue
branch.remove(node_id)
branch.remove(one)
all_branches.append(branch)
def judge_not_same_branch():
# 判断所有边中是否存在两条不重复的路径
for i in range(len(all_branches)):
for j in range(i + 1, len(all_branches)):
if not (set(all_branches[i]) & set(all_branches[j])):
return True
return False
3、存在的缺陷
简单场景已经满足,但是有一种情况,仍然有bug,并行分支嵌套条件分支,比如输入节点a---2个并行分支,其中一个分支,存在一个条件节点b,连接2个节点b1和b2,然后跟另一个并行分支的节点c,3个节点一起连接一个汇合节点d。按照代码的思路
首先:条件节点b到汇合节点d的分支为: b-b1-d b-b2-d 去掉开始和结束,剩下 b1 b2 路径互斥的,当成条件分支来处理,也就是b1 b2 c 单独连接d。而实际情况,大的分支应该是并行的,这样连接是错误
4、个人建议
按照目前的代码设计,在b1 b2 之后增加一个汇合节点bb,汇合节点去连接d,就可以解决了。这个汇合节点,比如代码节点,提取条件分支中b1 b2的非空,然后d节点直接引用这个代码节点的输出。 增加之后,条件节点b到汇合节点d的分支为: b-b1-bb-d b-b2-bb-d 去掉开始和结束,剩下 b1-bb b2-bb 有共同的节点,当成并行的,即:builder.add_edge(["bb",”c”], "d") 这样不仅解决了代码缺陷,而且方便后续节点引用。
上面的思路,增加一个汇合节点,目前代码节点,可以实现,类似于扣子的变量聚合节点
5、后续
请问下,上面的并行条件嵌套的bug,是打算代码层面解决呢?运行之前,是无法确定的,没法静态计算,貌似只能动态计算。还是在实际的业务场景,增加汇合去约束呢?
@zgqgit