ragas icon indicating copy to clipboard operation
ragas copied to clipboard

New executor throws `RuntimeError: ... got Future <..> attached to a differen t loop`

Open joy13975 opened this issue 1 year ago • 4 comments

Describe the bug Calling TestsetGenerator.generate_with_langchain_docs() throws the following error, then subsequent attempts to embed (using the same Langchain cache) would get stuck and never proceed past 0%. Suspecting cache db(sqlite3) corruption but then integration check reported no problem..

This requires deeper investigation but I'm raising it before I forget/lose the details.

Ragas version: 0.1.3 Python version: Python 3.9.13

Code to Reproduce

lc_docs = ...
testset = generator.generate_with_langchain_docs(
        documents=lc_docs,
        test_size=20,
        distributions={simple: 0.5, reasoning: 0.25, conditional: 0.25},
    )

Error trace

Exception in thread Thread-7299:                                                                          
Traceback (most recent call last):                                                                        
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/threading.py", line 980, in _bootstrap_inner  
    self.run()
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 93, in run                                 
    results = self.loop.run_until_complete(self._aresults())                                              
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/base_events.py", line 647, in run_unti
l_complete                                                                                                
    return future.result()                                                                                
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 81, in _aresults                           
    raise e                                                                                               
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 76, in _aresults                           
    r = await future                                                                                      
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/tasks.py", line 611, in _wait_for_one 
    return f.result()  # May raise f.exception().                                                         
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 36, in sema_coro                           
    return await coro                                                                                     
  File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 109, in wrapped_callable_async             
    return counter, await callable(*args, **kwargs)                                                       
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 145, in evolve                   
    ) = await self._aevolve(current_tries, current_nodes)                                                 
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 573, in _aevolve                 
    result = await self._acomplex_evolution(                                                              
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 375, in _acomplex_evolution      
    simple_question, current_nodes, _ = await self.se._aevolve(                                           
  File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 288, in _aevolve                 
    passed = await self.node_filter.filter(merged_node)                                                   
  File "/home/ec2-user/code/ragas/src/ragas/testset/filters.py", line 54, in filter                       
    results = await self.llm.generate(prompt=prompt)                                                      
  File "/home/ec2-user/code/ragas/src/ragas/llms/base.py", line 92, in generate                           
    return await agenerate_text_with_retry(                                                               
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 88, in 
async_wrapped                                                                                             
    return await fn(*args, **kwargs)                                                                      
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 47, in 
__call__                                                                                                  
    do = self.iter(retry_state=retry_state)
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 325, in
 iter                                                                                                     
    raise retry_exc.reraise()                                                                             
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 158, in
 reraise                                                                                                  
    raise self.last_attempt.result()                                                                      
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 439, in res
ult                                                                                                       
    return self.__get_result()                                                                            
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 391, in __g
et_result                                                                                                 
    raise self._exception                                                                                 
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 50, in 
__call__                                                                                                  
    result = await fn(*args, **kwargs)                                                                    
  File "/home/ec2-user/code/ragas/src/ragas/llms/base.py", line 176, in agenerate_text                    
    result = await self.langchain_llm.agenerate_prompt(                                                   
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat
_models.py", line 554, in agenerate_prompt                                                                
    return await self.agenerate(                                                                          
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat
_models.py", line 514, in agenerate                                                                       
    raise exceptions[0]                                                                                   
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat
_models.py", line 630, in _agenerate_with_cache                                                           
    result = await self._agenerate(                                                                       
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 574, in _agenerate                                                                              
    response: genai.types.GenerateContentResponse = await _achat_with_retry(                              
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 185, in _achat_with_retry                                                                       
    return await _achat_with_retry(**kwargs)                                                              
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 88, in 
async_wrapped                                                                                             
    return await fn(*args, **kwargs)
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 47, in 
__call__                                                                                                  
    do = self.iter(retry_state=retry_state)                                                               
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 314, in
 iter                                                                                                     
    return fut.result()                                                                                   
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 439, in res
ult                                                                                                       
    return self.__get_result()                                                                            
  File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 391, in __g
et_result                                                                                                 
    raise self._exception                                                                                 
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 50, in 
__call__                                                                                                  
    result = await fn(*args, **kwargs)                                                                    
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 183, in _achat_with_retry                                                                       
    raise e                                                                                               
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.
py", line 176, in _achat_with_retry                                                                       
    return await generation_method(**kwargs)                                                              
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/generativeai/generative_mode
ls.py", line 410, in send_message_async                                                                   
    response = await self.model.generate_content_async(                                                   
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/generativeai/generative_mode
ls.py", line 275, in generate_content_async                                                               
    response = await self._async_client.generate_content(request)
    return await retry_target(                                                                            
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_unary_a
sync.py", line 160, in retry_target                                                                       
    _retry_error_helper(                                                                                  
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_base.py
", line 212, in _retry_error_helper                                                                       
    raise final_exc from source_exc                                                                       
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_unary_a
sync.py", line 155, in retry_target                                                                       
    return await target()
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/grpc_helpers_async.
py", line 85, in __await__                                                                                
    response = yield from self._call.__await__()                                                          
  File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/grpc/aio/_call.py", line 299, in __
await__                                                                                                   
    response = yield from self._call_response                                                             
RuntimeError: Task <Task pending name='Task-19654' coro=<BaseChatModel._agenerate_with_cache() running at 
/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat_models.
py:630> cb=[_gather.<locals>._done_callback() at /home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/async
io/tasks.py:767]> got Future <Task pending name='Task-19655' coro=<UnaryUnaryCall._invoke() running at /ho
me/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/grpc/aio/_call.py:568>> attached to a differen
t loop
Traceback (most recent call last):
  File "/home/ec2-user/code/parakeet/parakeet/components/ragas_gen_testset.py", line 84, in <module>
    testset = generator.generate_with_langchain_docs( 
  File "/home/ec2-user/code/ragas/src/ragas/testset/generator.py", line 223, in generate_with_langchain_do
cs                                                                                                        
    return self.generate(                            
  File "/home/ec2-user/code/ragas/src/ragas/testset/generator.py", line 316, in generate
    raise ExceptionInRunner()
ragas.exceptions.ExceptionInRunner: The runner thread which was running the jobs raised an exeception. Rea
d the traceback above to debug it. You can also pass `raise_exceptions=False` incase you want to show only
 a warning message instead.

Expected behavior Don't throw error and don't get stuck.

Additional context After getting this, my langchain's sqlite3 cache always gets corrupted i.e. next cached embedding access never completes. However the db integration check reports ok.

joy13975 avatar Feb 29 '24 06:02 joy13975

I can consistently reproduce with a call to evaluate from a Jupyter notebook, with or without is_async=True.

Exception in thread Thread-83:
Traceback (most recent call last):
  File "/Users/rbellamy/.pyenv/versions/3.11.6/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/Users/rbellamy/test/.venv/lib/python3.11/site-packages/ragas/executor.py", line 93, in run
    results = self.loop.run_until_complete(self._aresults())
...
<SNIP/>
...
RuntimeError: Task <Task pending name='Task-178' coro=<as_completed.<locals>.sema_coro() 
  running at /Users/rbellamy/test/.venv/lib/python3.11/site-packages/ragas/executor.py:36> 
  cb=[as_completed.<locals>._on_completion() at /Users/rbellamy/.pyenv/versions/3.11.6/lib/python3.11/asyncio/tasks.py:602]> 
  got Future <Task pending name='Task-227' 
  coro=<UnaryUnaryCall._invoke() running at /Users/rbellamy/test/.venv/lib/python3.11/site-packages/grpc/aio/_call.py:568>> attached to a different loop

Calling code:

def get_eval_chain_results(question, answer, ground_truth_answer, llm, embeddings, splitter):
    # testset = get_testset_data(answer["source_documents"], llm, embeddings, splitter)
    features = Features(
        {
            "question": Value(dtype="string", id=None),
            "answer": Value(dtype="string", id=None),
            "contexts": Sequence(feature=Value(dtype="string", id=None)),
        }
    )
    mapping = {
        "question": [question],
        "answer": [answer["answer"]],
        "contexts": [[source.page_content for source in answer["source_documents"]]],
    }
    metrics = [
        answer_relevancy,
        context_relevancy,
        faithfulness,
    ]

    if ground_truth_answer is not None:
        features["ground_truths"] = Sequence(feature=Value(dtype="string", id=None))
        mapping["ground_truths"] = [[ground_truth_answer]]
        metrics.extend([answer_similarity, context_precision, context_recall])

    try:
        results = evaluate(
            dataset=Dataset.from_dict(
                mapping=mapping,
                features=features,
            ),
            metrics=metrics,
            llm=LangchainLLMWrapper(llm),
            embeddings=embeddings,
            is_async=False,
        )
    except Exception as e:
        results = {
            "answer_relevancy": float("nan"),
            "context_relevancy": float("nan"),
            "faithfulness": float("nan"),
            "answer_similarity": float("nan"),
            "context_precision": float("nan"),
            "context_recall": float("nan"),
        }
        print(f"Exception occurred while evaluating question: {question}")
        print(f"answer: {answer['answer']}")
        print(f"Error: {e}")

    return results

RC1 didn't have this issue.

rbellamy avatar Mar 01 '24 16:03 rbellamy

#689 seems to fix this issue since I can no longer reproduce the error, but I don't know from a theory & technical point of view it is really the correct fix or not. Haven't had time to verfiy exactly

joy13975 avatar Mar 01 '24 16:03 joy13975

@rbellamy could that have anything to do with https://github.com/jupyter/jupyter_console/issues/241?

joy13975 avatar Mar 01 '24 19:03 joy13975

I do not. I think it's more likely something like the issue outlined in using-autoawait-in-a-notebook-ipykernel, and https://github.com/ipython/ipython/issues/11338.

I've followed those directions and things are working with:

import nest_asyncio
nest_asyncio.apply()

rbellamy avatar Mar 01 '24 23:03 rbellamy

The nest_asyncio fix no longer works for me. The symptoms in Jupyter are that the first evaluation will complete, but all subsequent calls to evaluate will fail with some kind of deadlock or race condition in the event loop.

I've tried a bunch of different methods of initiating the loop in my copy of executor.py without appreciable effect. I wish I had more time to look into this - I really want to be able to use this tool during evaluations. It's hard when I can't reliably run it in Jupyter.

rbellamy avatar May 27 '24 14:05 rbellamy

hey @joy13975 I took inspiration from your code and removed the runner as you suggested in the above PR. closing this one for now. Thanks a lot for your input ❤️ .

if you get some time to look at that PR I would really appreciate it too!

jjmachan avatar Jul 12 '24 14:07 jjmachan