
Combining two OPs: AttributeError: 'VirtualOp' object has no attribute 'model_config'

Open zhengwangwang opened this issue 2 years ago • 4 comments

A problem with serial/parallel model composition, as in the diagram below: first object detection, then segmentation of the image based on the detection results, then image classification after segmentation; that result is then combined with the output of another object detection OP. The final OP performs no model inference; it only merges the prediction results of its input_ops. (DAG screenshot) Starting the pipeline raises an error. (error screenshot) Code of the combining OP:

class CombineOp(Op):

    def preprocess(self, input_data, data_id, log_id):
        out_data = {}
        for op_name, data in input_data.items():
            if 'bbox_result' in data.keys():
                _LOGGER.info("{}: {}".format(op_name, data['bbox_result']))
                out_data[op_name] = str(data['bbox_result'])
            else:
                data_str = json.dumps(data).replace('"', "'").replace("'[", "[").replace("]'", "]")
                _LOGGER.info("{}: {}".format(op_name, data_str))
                out_data[op_name] = data_str
        # returns (data, is_skip_process, prod_errcode, prod_errinfo)
        return out_data, True, None, ""
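To illustrate what the preprocess above produces, here is a standalone sketch of the same merge logic with made-up upstream outputs (the op names and data are hypothetical, and logging is omitted):

```python
import json

def combine_preprocess(input_data):
    # Merge the outputs of multiple upstream ops into one dict,
    # mirroring CombineOp.preprocess above.
    out_data = {}
    for op_name, data in input_data.items():
        if 'bbox_result' in data:
            out_data[op_name] = str(data['bbox_result'])
        else:
            out_data[op_name] = (json.dumps(data)
                                 .replace('"', "'")
                                 .replace("'[", "[")
                                 .replace("]'", "]"))
    return out_data

merged = combine_preprocess({
    "det": {"bbox_result": [[10, 20, 50, 60]]},
    "cls": {"label": "cat", "score": 0.98},
})
print(merged)  # {'det': '[[10, 20, 50, 60]]', 'cls': "{'label': 'cat', 'score': 0.98}"}
```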

The same error is raised when the classification OP's input_ops contain read_op. Key code:

if type == "start":
    op_map[id] = read_op
elif type == "end":
    input_ops = []
    for input_id in inputIds:
        input_ops.append(op_map[input_id])
    res_op = CombineOp("combine", input_ops=input_ops)
    op_map[id] = res_op
else:
    modelId = op["modelId"]
    input_ops = []
    op_temp = None
    if inputIds is None:
        input_ops.append(read_op)
    else:
        for input_id in inputIds:
            input_ops.append(op_map[input_id])
    if type == "classification":
        op_temp = ClassificationOp(name=modelId, input_ops=input_ops)
    else:
        op_temp = DetectionOp(name=modelId, input_ops=input_ops)
    op_map[id] = op_temp

The start node is read_op and the end node is CombineOp (a combining OP that performs no model inference). Each detection OP assembles its input_ops from its upstream OPs. op_map is a dict of {op id: op instance}.
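A toy run of that wiring may make the op_map structure clearer (the node schema and ids here are my own illustration, with strings and tuples standing in for real Op instances):

```python
# Hypothetical node list: each node has an id, a type, and inputIds.
nodes = [
    {"id": "n0", "type": "start", "inputIds": None},
    {"id": "n1", "type": "detection", "inputIds": ["n0"]},
    {"id": "n2", "type": "classification", "inputIds": ["n1"]},
    {"id": "n3", "type": "end", "inputIds": ["n2"]},
]

op_map = {}
for node in nodes:
    if node["type"] == "start":
        op_map[node["id"]] = "read_op"
    else:
        # In the real code these would be Op instances, not tuples.
        inputs = [op_map[i] for i in node["inputIds"]]
        op_map[node["id"]] = (node["type"], inputs)

print(op_map["n3"])
```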

zhengwangwang avatar Feb 22 '23 08:02 zhengwangwang


Hey, did you ever figure this out? I'm hitting the same problem. @zhengwangwang

smallfish45 avatar Mar 16 '23 09:03 smallfish45

> Hey, did you ever figure this out? I'm hitting the same problem. @zhengwangwang

Not yet, no one has replied.

zhengwangwang avatar Mar 16 '23 12:03 zhengwangwang

> Hey, did you ever figure this out? I'm hitting the same problem. @zhengwangwang

Solved it, after debugging the source. First, because the two parallel branches have different depths, the framework creates virtual Ops to bring both branches to the same level; the code that creates them is in dag.py. Then, in the _init_ops method in pipeline_server.py, only the user-defined ops are initialized from the config, and the virtual OPs are never initialized:

# self._used_op contains only the ops defined in user code; it does not
# include the virtual OPs created in dag.py, which is why
# "'VirtualOp' object has no attribute 'model_config'" is raised later.
for op in self._used_op:
    if not isinstance(op, operator.RequestOp) and not isinstance(
            op, operator.ResponseOp):
        conf = op_conf.get(op.name, default_conf)
        op.init_from_dict(conf)
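For intuition, the level-equalization that makes these virtual ops necessary can be sketched as follows (my own toy illustration, not the dag.py code): the shorter branch is padded with placeholder nodes so every path reaches the merge node at the same depth.

```python
def pad_branches(branches):
    """Pad shorter branches with placeholder 'virtual' nodes so all
    branches reach the merge node at the same depth (illustrative only)."""
    depth = max(len(b) for b in branches)
    return [b + ["virtual"] * (depth - len(b)) for b in branches]

# Branch 1: detection -> segmentation -> classification (3 levels).
# Branch 2: detection only (1 level), so it gets two virtual ops.
padded = pad_branches([["det1", "seg", "cls"], ["det2"]])
print(padded)  # [['det1', 'seg', 'cls'], ['det2', 'virtual', 'virtual']]
```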

The error is raised in operator.py, in the Op class's start_with_thread or start_with_process method (the two are nearly identical): each OP starts one process or thread, or several depending on the concurrency setting. The failing code:

for concurrency_idx in range(self.concurrency):
    p = multiprocessing.Process(
        target=self._run,
        args=(concurrency_idx, self._get_input_channel(),
              self._get_output_channels(), False, trace_buffer,
              self.model_config, self.workdir, self.thread_num,
              self.device_type, self.devices, self.mem_optim,
              self.ir_optim, self.precision, self.use_mkldnn,
              self.mkldnn_cache_capacity, self.mkldnn_op_list,
              self.mkldnn_bf16_op_list, self.is_jump_op(),
              self.get_output_channels_of_jump_ops(),
              self.min_subgraph_size, self.dynamic_shape_info,
              self.use_calib))
    p.daemon = True
    p.start()
    process.append(p)

This is where the processes are created; target is the OP's own _run() method. Because the virtual OPs were never passed through init_from_dict, some of these attributes do not exist, which is the cause of the error. You could also initialize the virtual OPs, but it is unnecessary: Op._run takes 23 parameters, while VirtualOp overrides _run with only 6, so most of the configuration would never be used. My fix is to check whether the object is a virtual OP when creating the process, and if so, pass _run only its 6 arguments. My modified code:

for concurrency_idx in range(self.concurrency):
    if isinstance(self, VirtualOp):
        # VirtualOp overrides _run with a 6-parameter signature
        p = multiprocessing.Process(
            target=self._run,
            args=(concurrency_idx, self._get_input_channel(),
                  self._get_output_channels(), None, False))
    else:
        p = multiprocessing.Process(
            target=self._run,
            args=(concurrency_idx, self._get_input_channel(),
                  self._get_output_channels(), False, trace_buffer,
                  self.model_config, self.workdir, self.thread_num,
                  self.device_type, self.devices, self.mem_optim,
                  self.ir_optim, self.precision, self.use_mkldnn,
                  self.mkldnn_cache_capacity, self.mkldnn_op_list,
                  self.mkldnn_bf16_op_list, self.is_jump_op(),
                  self.get_output_channels_of_jump_ops(),
                  self.min_subgraph_size, self.dynamic_shape_info,
                  self.use_calib))
    p.daemon = True
    p.start()
    process.append(p)

With this change the configuration error no longer occurs, and the overridden _run method receives the correct arguments.

VirtualOp._run itself contains two more bugs. The first is about logging: log = get_log_func(op_info_prefix) calls a function, get_log_func, that does not exist. I changed it to log = logging.getLogger(op_info_prefix), but log is never used afterwards, so the line can simply be deleted. The second is in the call to self._auto_batching_generator: its parameter list has no log_func parameter, only op_info_prefix, so just pass the op_info_prefix variable directly:

batch_generator = self._auto_batching_generator(
    input_channel=input_channel,
    op_name=self.name,
    batch_size=1,
    timeout=None,
    op_info_prefix=op_info_prefix)

# the function is defined as
def _auto_batching_generator(self, input_channel, op_name, batch_size,
                             timeout, op_info_prefix):
    ...  # body elided
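For context, an auto-batching generator of this general shape can be sketched like this (a simplified illustration using a plain queue instead of the pipeline's Channel; all names here are my own, not the framework's):

```python
import queue

def auto_batching_generator(input_queue, batch_size, timeout):
    """Yield lists of up to batch_size items. A timeout (seconds) flushes
    a partial batch; timeout=None blocks until the batch is full."""
    while True:
        batch = []
        while len(batch) < batch_size:
            try:
                item = input_queue.get(timeout=timeout)
            except queue.Empty:
                break  # flush whatever we have on timeout
            if item is None:  # sentinel: end of stream
                if batch:
                    yield batch
                return
            batch.append(item)
        if batch:
            yield batch

q = queue.Queue()
for x in [1, 2, 3, 4, 5, None]:
    q.put(x)
batches = list(auto_batching_generator(q, batch_size=2, timeout=0.1))
print(batches)  # [[1, 2], [3, 4], [5]]
```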

zhengwangwang avatar Mar 31 '23 13:03 zhengwangwang