New executor throws `RuntimeError: ... got Future <..> attached to a different loop`
Describe the bug
Calling TestsetGenerator.generate_with_langchain_docs() throws the error below; after that, subsequent attempts to embed (using the same LangChain cache) get stuck and never proceed past 0%. I suspected cache DB (sqlite3) corruption, but an integrity check reported no problem.
This requires deeper investigation, but I'm raising it before I forget or lose the details.
Ragas version: 0.1.3
Python version: Python 3.9.13
Code to Reproduce
lc_docs = ...
testset = generator.generate_with_langchain_docs(
    documents=lc_docs,
    test_size=20,
    distributions={simple: 0.5, reasoning: 0.25, conditional: 0.25},
)
Error trace
Exception in thread Thread-7299:
Traceback (most recent call last):
File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/threading.py", line 980, in _bootstrap_inner
self.run()
File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 93, in run
results = self.loop.run_until_complete(self._aresults())
File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 81, in _aresults
raise e
File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 76, in _aresults
r = await future
File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/tasks.py", line 611, in _wait_for_one
return f.result()  # May raise f.exception().
File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 36, in sema_coro
return await coro
File "/home/ec2-user/code/ragas/src/ragas/executor.py", line 109, in wrapped_callable_async
return counter, await callable(*args, **kwargs)
File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 145, in evolve
) = await self._aevolve(current_tries, current_nodes)
File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 573, in _aevolve
result = await self._acomplex_evolution(
File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 375, in _acomplex_evolution
simple_question, current_nodes, _ = await self.se._aevolve(
File "/home/ec2-user/code/ragas/src/ragas/testset/evolutions.py", line 288, in _aevolve
passed = await self.node_filter.filter(merged_node)
File "/home/ec2-user/code/ragas/src/ragas/testset/filters.py", line 54, in filter
results = await self.llm.generate(prompt=prompt)
File "/home/ec2-user/code/ragas/src/ragas/llms/base.py", line 92, in generate
return await agenerate_text_with_retry(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 88, in async_wrapped
return await fn(*args, **kwargs)
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 47, in __call__
do = self.iter(retry_state=retry_state)
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 325, in iter
raise retry_exc.reraise()
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 158, in reraise
raise self.last_attempt.result()
File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 50, in __call__
result = await fn(*args, **kwargs)
File "/home/ec2-user/code/ragas/src/ragas/llms/base.py", line 176, in agenerate_text
result = await self.langchain_llm.agenerate_prompt(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat_models.py", line 554, in agenerate_prompt
return await self.agenerate(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat_models.py", line 514, in agenerate
raise exceptions[0]
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat_models.py", line 630, in _agenerate_with_cache
result = await self._agenerate(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.py", line 574, in _agenerate
response: genai.types.GenerateContentResponse = await _achat_with_retry(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.py", line 185, in _achat_with_retry
return await _achat_with_retry(**kwargs)
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 88, in async_wrapped
return await fn(*args, **kwargs)
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 47, in __call__
do = self.iter(retry_state=retry_state)
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 314, in iter
return fut.result()
File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/tenacity/_asyncio.py", line 50, in __call__
result = await fn(*args, **kwargs)
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.py", line 183, in _achat_with_retry
raise e
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_google_genai/chat_models.py", line 176, in _achat_with_retry
return await generation_method(**kwargs)
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/generativeai/generative_models.py", line 410, in send_message_async
response = await self.model.generate_content_async(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/generativeai/generative_models.py", line 275, in generate_content_async
response = await self._async_client.generate_content(request)
return await retry_target(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_unary_async.py", line 160, in retry_target
_retry_error_helper(
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
raise final_exc from source_exc
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/retry/retry_unary_async.py", line 155, in retry_target
return await target()
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/google/api_core/grpc_helpers_async.py", line 85, in __await__
response = yield from self._call.__await__()
File "/home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/grpc/aio/_call.py", line 299, in __await__
response = yield from self._call_response
RuntimeError: Task <Task pending name='Task-19654' coro=<BaseChatModel._agenerate_with_cache() running at /home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/langchain_core/language_models/chat_models.py:630> cb=[_gather.<locals>._done_callback() at /home/ec2-user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/tasks.py:767]> got Future <Task pending name='Task-19655' coro=<UnaryUnaryCall._invoke() running at /home/ec2-user/code/parakeet/.venv/lib/python3.9/site-packages/grpc/aio/_call.py:568>> attached to a different loop
Traceback (most recent call last):
File "/home/ec2-user/code/parakeet/parakeet/components/ragas_gen_testset.py", line 84, in <module>
testset = generator.generate_with_langchain_docs(
File "/home/ec2-user/code/ragas/src/ragas/testset/generator.py", line 223, in generate_with_langchain_docs
return self.generate(
File "/home/ec2-user/code/ragas/src/ragas/testset/generator.py", line 316, in generate
raise ExceptionInRunner()
ragas.exceptions.ExceptionInRunner: The runner thread which was running the jobs raised an exeception. Read the traceback above to debug it. You can also pass `raise_exceptions=False` incase you want to show only a warning message instead.
Expected behavior: Don't throw an error and don't get stuck.
Additional context: After getting this, my LangChain sqlite3 cache always behaves as if corrupted, i.e. the next cached embedding access never completes. However, the DB integrity check reports OK.
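(For anyone who wants to repeat that check, this is a sketch of a SQLite integrity check against the LangChain cache file; the ".langchain.db" path is an assumption and should be whatever was passed to SQLiteCache:)

# Verify the LangChain SQLite cache with SQLite's built-in integrity check.
import sqlite3

conn = sqlite3.connect(".langchain.db")  # assumed cache path
print(conn.execute("PRAGMA integrity_check;").fetchone())  # ('ok',) means no corruption detected
conn.close()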
I can consistently reproduce with a call to evaluate from a Jupyter notebook, with or without is_async=True.
Exception in thread Thread-83:
Traceback (most recent call last):
File "/Users/rbellamy/.pyenv/versions/3.11.6/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/Users/rbellamy/test/.venv/lib/python3.11/site-packages/ragas/executor.py", line 93, in run
results = self.loop.run_until_complete(self._aresults())
...
<SNIP/>
...
RuntimeError: Task <Task pending name='Task-178' coro=<as_completed.<locals>.sema_coro()
running at /Users/rbellamy/test/.venv/lib/python3.11/site-packages/ragas/executor.py:36>
cb=[as_completed.<locals>._on_completion() at /Users/rbellamy/.pyenv/versions/3.11.6/lib/python3.11/asyncio/tasks.py:602]>
got Future <Task pending name='Task-227'
coro=<UnaryUnaryCall._invoke() running at /Users/rbellamy/test/.venv/lib/python3.11/site-packages/grpc/aio/_call.py:568>> attached to a different loop
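(For reference, the error itself is the generic asyncio complaint that a future bound to one event loop is awaited from another. A minimal, self-contained illustration of that mechanism, not ragas code, is:)

# A Future created on one event loop is awaited by a Task running on a
# second loop; asyncio raises "got Future <...> attached to a different loop".
import asyncio

loop_a = asyncio.new_event_loop()
fut = loop_a.create_future()  # bound to loop_a

async def await_it():
    return await fut  # awaited from whichever loop runs this coroutine

loop_b = asyncio.new_event_loop()
try:
    loop_b.run_until_complete(await_it())  # runs on loop_b -> RuntimeError
except RuntimeError as e:
    print(e)
finally:
    loop_a.close()
    loop_b.close()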
Calling code:
# Imports assumed for this snippet (from `datasets` and `ragas`):
from datasets import Dataset, Features, Sequence, Value
from ragas import evaluate
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import (
    answer_relevancy,
    answer_similarity,
    context_precision,
    context_recall,
    context_relevancy,
    faithfulness,
)


def get_eval_chain_results(question, answer, ground_truth_answer, llm, embeddings, splitter):
    # testset = get_testset_data(answer["source_documents"], llm, embeddings, splitter)
    features = Features(
        {
            "question": Value(dtype="string", id=None),
            "answer": Value(dtype="string", id=None),
            "contexts": Sequence(feature=Value(dtype="string", id=None)),
        }
    )
    mapping = {
        "question": [question],
        "answer": [answer["answer"]],
        "contexts": [[source.page_content for source in answer["source_documents"]]],
    }
    metrics = [
        answer_relevancy,
        context_relevancy,
        faithfulness,
    ]
    if ground_truth_answer is not None:
        features["ground_truths"] = Sequence(feature=Value(dtype="string", id=None))
        mapping["ground_truths"] = [[ground_truth_answer]]
        metrics.extend([answer_similarity, context_precision, context_recall])
    try:
        results = evaluate(
            dataset=Dataset.from_dict(
                mapping=mapping,
                features=features,
            ),
            metrics=metrics,
            llm=LangchainLLMWrapper(llm),
            embeddings=embeddings,
            is_async=False,
        )
    except Exception as e:
        results = {
            "answer_relevancy": float("nan"),
            "context_relevancy": float("nan"),
            "faithfulness": float("nan"),
            "answer_similarity": float("nan"),
            "context_precision": float("nan"),
            "context_recall": float("nan"),
        }
        print(f"Exception occurred while evaluating question: {question}")
        print(f"answer: {answer['answer']}")
        print(f"Error: {e}")
    return results
RC1 didn't have this issue.
#689 seems to fix this issue, since I can no longer reproduce the error, but I don't know whether it is really the correct fix from a theoretical and technical point of view. I haven't had time to verify exactly.
@rbellamy could that have anything to do with https://github.com/jupyter/jupyter_console/issues/241?
I don't think so. I think it's more likely something like the issue outlined in using-autoawait-in-a-notebook-ipykernel, and https://github.com/ipython/ipython/issues/11338.
I've followed those directions and things are working with:
import nest_asyncio
nest_asyncio.apply()
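(In the notebook, that amounts to running something along these lines early in the session, before any ragas calls; the evaluate arguments are elided here and this is just my cell layout:)

# Patch the already-running Jupyter event loop once, up front, so ragas'
# executor can call run_until_complete on it without conflict.
import nest_asyncio

nest_asyncio.apply()

# ...later cells then call ragas as usual, e.g.:
# results = evaluate(dataset=my_dataset, metrics=metrics, llm=llm, embeddings=embeddings)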
The nest_asyncio fix no longer works for me. The symptoms in Jupyter are that the first evaluation will complete, but all subsequent calls to evaluate will fail with some kind of deadlock or race condition in the event loop.
I've tried a bunch of different methods of initiating the loop in my copy of executor.py without appreciable effect. I wish I had more time to look into this - I really want to be able to use this tool during evaluations. It's hard when I can't reliably run it in Jupyter.
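(To make "initiating the loop" concrete, the variants I experimented with were all roughly of this shape; this is a generic sketch, not the actual executor.py code, and as noted above it did not resolve the problem for me:)

# Run a coroutine on a brand-new event loop inside a dedicated thread, so
# nothing shares the notebook's running loop.
import asyncio
import threading

def run_coroutine_in_new_loop(coro):
    result = {}

    def runner():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result["value"] = loop.run_until_complete(coro)
        finally:
            loop.close()

    t = threading.Thread(target=runner)
    t.start()
    t.join()
    return result["value"]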
Hey @joy13975, I took inspiration from your code and removed the runner as you suggested in the above PR. Closing this one for now. Thanks a lot for your input ❤️.
If you get some time to look at that PR, I would really appreciate it too!