ragas icon indicating copy to clipboard operation
ragas copied to clipboard

Multi-threading processing error

Open jietang00723-cloud opened this issue 4 months ago • 1 comment

[ ] I have checked the documentation and related resources and couldn't resolve my bug.

Describe the bug: When I used the evaluate() method to run a multi-threaded evaluation of my RAG application, an exception occurred: Exception raised in Job[0]: AttributeError('NoneType' object has no attribute 'generate'). My LangSmith traces show that the runs were recorded, yet the final result I get is NaN. Can evaluate() not be called from multiple threads? Does it only support coroutines (asyncio) or sequential processing?

Ragas version: 0.2.15 Python version: v3.12.5

Code to Reproduce

def get_eva_result_from_llm(self,data_q:Queue,result_dict,rag_run_config:dict,stop_event=None,**kwargs):
        """Consume evaluation jobs from ``data_q`` until a sentinel arrives.

        Each queue item is an ``(index, cont)`` tuple; ``index is None`` is the
        shutdown sentinel. For every dict ``cont``, empty fields are filled in
        place with the placeholder ``"无"``. The job is then evaluated with
        ``self.ragEva.ragas_evaluation_from_dict`` and the result is stored in
        ``result_dict[index]``. Errors for one job are logged and do not stop the
        worker.

        Args:
            data_q: Queue of ``(index, cont)`` tuples. ``task_done()`` is called
                exactly once for every item taken from it, including the sentinel.
            result_dict: Mapping that receives ``result_dict[index] = result``.
            rag_run_config: Base keyword arguments for
                ``ragas_evaluation_from_dict``. It is NOT mutated:
                ``eval_data`` and ``experiment_name`` are set on a per-job copy,
                because the same dict may be handed to several worker threads.
            stop_event: Optional event. When set, the worker exits before taking
                the next item.
            **kwargs: Accepted for call compatibility; unused.
        """
        import queue  # only needed for queue.Empty raised by a timed-out get()

        self.log.debug(f"开启了评测线程")
        while True:
            if stop_event and stop_event.is_set():
                break
            # The get() sits outside the processing try-block so that a timeout
            # neither calls task_done() nor reads an unbound/stale ``index``.
            try:
                index,cont = data_q.get(timeout=150)
            except queue.Empty:
                self.log.debug("evaluation queue is empty after 150s; waiting again")
                continue
            if index is None:
                # Shutdown sentinel: acknowledge it and stop this worker.
                data_q.task_done()
                break
            try:
                if isinstance(cont,dict):
                    self.log.debug(f"cont: \n{cont}")
                    # Replace empty fields with the placeholder "无" (in place).
                    for key, value in cont.items():
                        if key == "retrieved_contexts":
                            # Elements of this column are themselves lists of
                            # context strings (value[0] is appended to below).
                            if len(value) == 0:
                                value.append(["无"])
                            elif len(value[0]) == 0:
                                value[0].append("无")
                                self.log.info(f"index:{index},cont_copy:{cont}")
                            continue
                        if len(value) == 0:
                            value.append("无")

                    eva_id =  f"{RunAgentConfig.TASK_ID}-{index}"
                    # Per-job copy: the base config is shared between worker threads,
                    # so writing into it would let concurrent jobs overwrite each
                    # other's eval_data / experiment_name before the call below.
                    job_config = {**rag_run_config, "eval_data": cont, "experiment_name": eva_id}
                    result=self.ragEva.ragas_evaluation_from_dict(**job_config)
                    result_dict[index] = result
                    self.log.info(f"eva_id:{eva_id} 评测完成")
                else:
                    self.log.error(f"数据格式不对,评测失败,index:{index},type:{type(cont)},need type:dict()")
            except Exception as e:
                self.log.error(f"评测出错:\n {str(e)}",exc_info=True)
            finally:
                data_q.task_done()

def test_after_use_llm(self,test_case_path=None,config=None):
        """Run the two-stage evaluation pipeline over the configured test cases.

        Stage 1 workers (``self.evadataTool.get_eva_data_to_queue``) read rows
        from ``request_q`` and write evaluation payloads to ``response_q``.
        Stage 2 workers (``self.get_eva_result_from_llm``) evaluate those payloads
        and store the results in ``result_dict``. Both stages share one thread
        pool and are stopped with ``(None, None)`` sentinels. The collected
        results are concatenated in index order and passed to
        ``self.ragEva.get_result_to_html``.

        Args:
            test_case_path: Excel file with the test cases; defaults to
                ``self.rag_run_config["test_case_path"]``.
            config: Optional config applied through ``self.updata_config`` first.

        Returns:
            The (empty) ``result_dict`` when no test cases were loaded,
            otherwise ``None``.

        Raises:
            Exception: The first exception raised by a worker. It is re-raised
                only after every worker has received its sentinel and finished.
        """
        self.log.info("开始工作")

        if test_case_path is None:
            test_case_path = self.rag_run_config.get("test_case_path","空")
        if config is not None:
            self.updata_config(config)
        request_q = Queue(maxsize=10)  # bounded: put() blocks once 10 items are pending
        response_q =Queue(maxsize=10)
        manager = Manager()
        result_dict = manager.dict()  # Manager-backed dict shared by all workers

        self.log.debug("原始测试用例数据加载中")
        df = self.evadataTool.get_data_from_excel(test_case_path)
        # NOTE(review): only the first 4 rows are evaluated -- this looks like a
        # debugging limit; confirm before removing. With at most 4 rows plus the
        # stage-1 sentinels, request_q (maxsize=10) never blocks the main thread.
        df = df.head(4)
        if df.empty:
            self.log.error(f"test case path is null,case path :{test_case_path}")
            return result_dict
        self.log.debug(f"原始数据加载完毕,共 {len(df)} 条记录")
        max_workers = 6
        stage1_workers = max(1, max_workers // 3)
        stage2_workers = max(1, max_workers - stage1_workers)
        try:
            # One pool runs both stages; leaving the with-block joins every worker.
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Stage 1: convert raw rows from request_q into payloads on response_q.
                stage1_futures = [
                    executor.submit(self.evadataTool.get_eva_data_to_queue,request_q,response_q)
                    for _ in range(stage1_workers)
                ]
                # Stage 2: evaluate payloads. Each worker gets its own shallow copy
                # of the run config so per-job keys written into it cannot race.
                stage2_futures = [
                    executor.submit(self.get_eva_result_from_llm,response_q,result_dict,dict(self.rag_run_config))
                    for _ in range(stage2_workers)
                ]
                for i,row in df.iterrows():
                    request_q.put((i,row))
                for _ in range(stage1_workers):
                    request_q.put((None,None))
                # Wait for every producer WITHOUT raising yet. If one failed, the
                # consumers still need their sentinels; otherwise they block on
                # response_q forever and the executor's shutdown never returns.
                stage1_errors = [f.exception() for f in stage1_futures]
                for _ in range(stage2_workers):
                    response_q.put((None,None))
                stage2_errors = [f.exception() for f in stage2_futures]
                first_error = next(
                    (err for err in stage1_errors + stage2_errors if err is not None),
                    None,
                )
                if first_error is not None:
                    raise first_error

            self.log.info(f"requeset_q: {request_q.qsize()},respons_q:{response_q.qsize()}")
            try:
                if len(result_dict) == 0:
                    self.log.warning("no evaluation results were collected; skipping the HTML report")
                else:
                    sorted_dict = {i:result_dict[i] for i in sorted(result_dict)}
                    df = pd.concat(sorted_dict.values(),ignore_index=True)
                    self.ragEva.get_result_to_html(df)
            except Exception:
                # The report is best-effort: a failure must not skip the cleanup
                # below, but it is logged instead of being silently discarded.
                self.log.error("failed to build the evaluation report", exc_info=True)
        finally:
            # Always release resources, then log every live thread and its stack.
            manager.shutdown()
            self.cleanup_async()
            for thread in threading.enumerate():
                self.log.info(f"  - {thread.name} (ID: {thread.ident}, 存活: {thread.is_alive()})")
                stack = sys._current_frames().get(thread.ident)
                if stack:
                    self.log.info("调用栈:")
                    for filename, lineno, name, line in traceback.extract_stack(stack):
                        self.log.info(f"  {filename}:{lineno} - {name}() - {line or ''}")
                else:
                    self.log.info("  无法获取调用栈信息")

                self.log.info("-" * 60)

Error trace

 Job[0]: AttributeError('NoneType' object has no attribute 'generate')

Expected behavior: The evaluation can be run with multi-threading, and the evaluation results are stable, with no NaN values.

Additional context Add any other context about the problem here.

jietang00723-cloud avatar Aug 27 '25 02:08 jietang00723-cloud