Serving
Serving copied to clipboard
两个OP组合; AttributeError: 'VirtualOp' object has no attribute 'model_config'
模型串并联问题,如下图,先目标检测,然后根据检测结果再对图片进行分割,分割之后进行图像分类;然后和另一个目标检测的结果进行结果组合。最后一个OP不进行模型预测,只进行前面input_ops的预测结果进行整合。
启动报错
组合模型的代码
class CombineOp(Op):
def preprocess(self, input_data, data_id, log_id):
out_data = {}
for op_name, data in input_data.items():
if 'bbox_result' in data.keys():
_LOGGER.info("{}: {}".format(op_name, data['bbox_result']))
out_data[op_name] = str(data['bbox_result'])
else:
data_str = json.dumps(data).replace('"', "'").replace("'[", "[").replace("]'", "]")
_LOGGER.info("{}: {}".format(op_name, data_str))
out_data[op_name] = data_str
return out_data, True, None, ""
当图像分类OP的input_ops中包含read_op也会报同样错误. 关键代码
if type == "start":
op_map[id] = read_op
# continue
elif type == "end":
input_ops = []
for input_id in inputIds:
input_ops.append(op_map[input_id])
res_op = CombineOp("combine", input_ops=input_ops)
op_map[id] = res_op
else:
modelId = op["modelId"]
input_ops = []
op_temp = None
if inputIds is None:
input_ops.append(read_op)
else:
for input_id in inputIds:
input_ops.append(op_map[input_id])
if type == "classification":
op_temp = ClassificationOp(name=modelId, input_ops=input_ops)
else:
op_temp = DetectionOp(name=modelId, input_ops=input_ops)
op_map[id] = op_temp
起始节点为read_op, 结束节点为CombineOp(组合OP,不进行模型预测) 目标检测OP根据前置OP组合input_ops op_map为一个字典{OP的id: 对应OP}
Message that will be displayed on users' first issue
老哥有结果了吗,遇到了同样的问题QAQ @ @zhengwangwang
老哥有结果了吗,遇到了同样的问题QAQ @ @zhengwangwang
并没有,没人回复
老哥有结果了吗,遇到了同样的问题QAQ @ @zhengwangwang
有结果了,经过调试源码找到问题。首先是因为并联的两个线路层数不一致,会创建虚拟Op以达到相同的层级。创建虚拟op的代码在tag.py中,然后在pipeline_server.py中_init_ops方法中,只初始化了自己定义的op配置,虚拟OP没有定义
# _used_op是自己代码中定义的op,里面不包含tag中创建的虚拟OP,
# 因此会报错'VirtualOp' object has no attribute 'model_config'
for op in self._used_op:
if not isinstance(op, operator.RequestOp) and not isinstance(
op, operator.ResponseOp):
conf = op_conf.get(op.name, default_conf)
op.init_from_dict(conf)
报错的代码位置在operator.py Op类中的start_with_thread方法或者start_with_process,方法基本一样,就是一个OP启动一个(或多个,看concurrency配置)进程或者线程,具体报错代码如下
for concurrency_idx in range(self.concurrency):
p = multiprocessing.Process(
target=self._run,
args=(concurrency_idx, self._get_input_channel(),
self._get_output_channels(), False, trace_buffer,
self.model_config, self.workdir, self.thread_num,
self.device_type, self.devices, self.mem_optim,
self.ir_optim, self.precision, self.use_mkldnn,
self.mkldnn_cache_capacity, self.mkldnn_op_list,
self.mkldnn_bf16_op_list, self.is_jump_op(),
self.get_output_channels_of_jump_ops(),
self.min_subgraph_size, self.dynamic_shape_info,
self.use_calib))
p.daemon = True
p.start()
process.append(p)
这里是创建进程的,target就是OP自身的_run()方法,由于虚拟OP没有被init_from_dict,所以有些配置是不存在的,也就是报错原因,当然可以给虚拟OP也初始一下,但其实没必要,Op类的_run有23个参数,但是虚拟VirtualOp重载了_run方法,只有6个参数,也就是说有些配置是用不到的。我的做法是在创建线程的时候判断一下这个对象是否是虚拟OP,如果是虚拟OP,那么给_run方法的参数就只能是6个了,我改的代码如下
for concurrency_idx in range(self.concurrency):
if isinstance(self, VirtualOp):
p = multiprocessing.Process(
target=self._run,
args=(concurrency_idx, self._get_input_channel(),
self._get_output_channels(), None, False))
else:
p = multiprocessing.Process(
target=self._run,
args=(concurrency_idx, self._get_input_channel(),
self._get_output_channels(), False, trace_buffer,
self.model_config, self.workdir, self.thread_num,
self.device_type, self.devices, self.mem_optim,
self.ir_optim, self.precision, self.use_mkldnn,
self.mkldnn_cache_capacity, self.mkldnn_op_list,
self.mkldnn_bf16_op_list, self.is_jump_op(),
self.get_output_channels_of_jump_ops(),
self.min_subgraph_size, self.dynamic_shape_info,
self.use_calib))
这样就不会报配置错误了,而且给重载后的run方法传参也是正确的。
VirtualOp的_run函数中有个错误,是关于日志的
log = get_log_func(op_info_prefix)
get_log_func这个函数是不存在的,我给改为log = logging.getLogger(op_info_prefix),但是这个log又没啥用,其实这行代码可以删除
还有一个错误在调用self._auto_batching_generator的参数列表中没有log_func
参数,而是op_info_prefix
参数,直接把变量op_info_prefix
传给他就行了
batch_generator = self._auto_batching_generator(
input_channel=input_channel,
op_name=self.name,
batch_size=1,
timeout=None,
op_info_prefix=op_info_prefix)
## 函数定义为
def _auto_batching_generator(self, input_channel, op_name, batch_size,
timeout, op_info_prefix):
### pass