When using Langfuse tracing in a large Documents pipeline: task_done() called too many times
Describe the bug
When using Langfuse tracing in a large Documents pipeline, the Langfuse ingestion thread crashes with the following output:
2025-04-11 14:49:08 - langfuse - WARNING - Item exceeds size limit (size: 1736099), dropping input / output / metadata of item until it fits.
2025-04-11 14:49:08 - langfuse - WARNING - Item does not have body or input/output fields, dropping item.
Exception in thread Thread-17:
Traceback (most recent call last):
File "/root/xxx/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/root/xxx/lib/python3.10/site-packages/langfuse/_task_manager/ingestion_consumer.py", line 246, in run
2025-04-11 14:49:08 - haystack.core.pipeline.pipeline - INFO - Running component prompt_builder
self.upload()
File "/root/xxx/lib/python3.10/site-packages/langfuse/_task_manager/ingestion_consumer.py", line 261, in upload
self._ingestion_queue.task_done()
File "/root/xxx/lib/python3.10/queue.py", line 75, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
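For context, the ValueError at the bottom of the traceback is standard library behavior, not Langfuse-specific: on a `queue.Queue`, each `task_done()` call must be balanced by a prior `put()`, and an unbalanced call raises. A minimal stdlib reproduction:

```python
import queue

# queue.Queue keeps a counter of unfinished tasks. Every put()
# increments it; every task_done() decrements it. Calling task_done()
# more times than items were put() raises ValueError.
q = queue.Queue()
q.put("item")
q.task_done()          # balances the single put()
try:
    q.task_done()      # one call too many
except ValueError as e:
    print(e)           # task_done() called too many times
```

This suggests the Langfuse consumer thread is calling `task_done()` for items it never dequeued (or dequeued twice), which can happen if the queue is drained concurrently.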
To Reproduce
retrieval_pipeline = AsyncPipeline()
retrieval_pipeline.add_component("tracer", LangfuseConnector("DB"))
Describe your environment (please complete the following information):
- langfuse 2.60.2
- langfuse-haystack 0.10.1
- haystack-ai 2.12.1
Hey @Silence-Well sorry to hear you are having trouble. Could you provide the complete pipeline you are using that causes the issue so it's easier for us to reproduce?
Also, please show us how you are calling your pipeline.
Hi~ @sjrl After a bunch of tests, I can confirm that this issue pops up when I submit a huge number of documents. When the number of documents is small, the entire program runs without errors. My whole pipeline is pretty complex, so I've shared a portion of it below. Thanks so much for your help!
I have two pipelines:
- retrieval_pipeline
- generation_pipeline
The program first runs the retrieval_pipeline, and after getting the results, it passes them to the generation_pipeline. Since I need to interrupt them in the middle for some business logic, that’s why there are two separate pipelines.
Through some testing, I found that the entire trace for the retrieval_pipeline shows up fine in Langfuse. However, for the generation_pipeline, there’s only one record for message_retriever, and the error gets thrown starting from the line right after "Running component message_retriever."
I tried commenting out the tracking code for the retrieval_pipeline, but the generation_pipeline still throws the same error when I run it, so I’m guessing the issue is with the message_retriever in the generation_pipeline. Langfuse also cuts off tracking at this component.
# init
message_retriever = ChatMessageRetriever(message_store=message_store, last_k=chat_history_count)
generation_message_retriever = ChatMessageRetriever(message_store=message_store, last_k=chat_history_count)
message_writer = ChatMessageWriter(message_store=message_store)
chat_history_db_writer = ChatHistoryDBWriter(db=db)
retrieval_pipeline = AsyncPipeline()
retrieval_pipeline.add_component("tracer", LangfuseConnector("DB"))
retrievers = []
if search_document:
    retrievers.append(MilvusEmbeddingRetriever(xxxx))
retrieval_pipeline.add_component("query_rephrase_prompt_builder", PromptBuilder(query_rephrase_template, required_variables=["query"]))
retrieval_pipeline.add_component("query_rephrase_llm", OpenAIGenerator(xxxxxx, streaming_callback=sync_callback))
retrieval_pipeline.add_component("list_to_str_adapter", OutputAdapter(template="{{ replies[0] }}", output_type=str))
retrieval_pipeline.add_component("memory_retriever", message_retriever)
retrieval_pipeline.add_component("text_embedder", text_embedder)
# connections for query rephrasing
retrieval_pipeline.connect("memory_retriever", "query_rephrase_prompt_builder.memories")
retrieval_pipeline.connect("query_rephrase_prompt_builder.prompt", "query_rephrase_llm")
retrieval_pipeline.connect("query_rephrase_llm.replies", "list_to_str_adapter")
retrieval_pipeline.connect("list_to_str_adapter", "text_embedder.text")
for i, retriever in enumerate(retrievers):
    component_name = f"retriever_{i}"
    retrieval_pipeline.add_component(component_name, retriever)
    retrieval_pipeline.connect("text_embedder.embedding", f"{component_name}.query_embedding")
retrieval_pipeline.async_run(xxxx)
--------
generation_pipeline = Pipeline()
generation_pipeline.add_component("tracer", LangfuseConnector("DB"))
generation_pipeline.add_component("prompt_builder", ChatPromptBuilder(xxxx))
generation_pipeline.add_component("llm", OpenAIChatGenerator(xxxxxx,streaming_callback=sync_callback))
# components for memory
generation_pipeline.add_component("message_retriever", generation_message_retriever)
generation_pipeline.add_component("message_writer", message_writer)
generation_pipeline.add_component("message_joiner", ListJoiner(List[ChatMessage]))
generation_pipeline.add_component("chat_history_db_writer", chat_history_db_writer)
generation_pipeline.connect("prompt_builder.prompt", "llm.messages")
generation_pipeline.connect("llm.replies", "message_joiner")
# connections for memory
generation_pipeline.connect("message_joiner", "message_writer")
generation_pipeline.connect("message_retriever", "prompt_builder.chat_history")
generation_pipeline.connect("llm.replies", "chat_history_db_writer.messages")
messages = [
    system_message,
    user_message,
]
generation_pipeline.async_run(xxx)
Hey @Silence-Well thanks for the additional information! A few more follow-up questions:
- In your statement "I can confirm that this issue pops up when I submit a huge number of documents.", by Documents do you mean retrieved ChatMessages or Haystack Documents?
- What is a huge number of Documents in this case? i.e., what is the exact number you are sending?
- What is the value of chat_history_count?
As another test could you try running these pipelines in sync mode? So using the normal Pipeline and not AsyncPipeline? I'd be curious to know if this issue is partly occurring since things are being run in parallel.
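To illustrate the difference being asked about (a sketch with stand-in functions, since the actual calling code isn't shown in the issue): a synchronous pipeline run is a plain blocking call, while an async entry point returns a coroutine that must be driven by an event loop.

```python
import asyncio

async def run_async(data):
    # stand-in for an AsyncPipeline-style coroutine entry point
    await asyncio.sleep(0)
    return {"result": data}

def run(data):
    # stand-in for a synchronous Pipeline.run()
    return {"result": data}

# Sync pipeline: a plain blocking call.
print(run("query"))                      # {'result': 'query'}
# Async pipeline: the coroutine must be awaited or driven by a loop.
print(asyncio.run(run_async("query")))   # {'result': 'query'}
```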
Hey @sjrl Sorry for not explaining clearly; let me answer and add to your questions:
- By documents, I mean Haystack Documents.
- My splitting method is by word, with a token count of 128 and an overlap of 20. I set top_k to 20.
- chat_history_count is set to 5, but the exception actually shows up on the first try.
- I tried running it with Pipeline, and the same issue still happens.
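For reference, the word-based splitting described above (128-word windows with a 20-word overlap) can be sketched in plain Python. This is only an illustration of the chunking parameters, not Haystack's actual DocumentSplitter implementation:

```python
def split_by_word(text, split_length=128, split_overlap=20):
    """Chunk text into word windows of split_length words, where each
    window repeats the last split_overlap words of the previous one."""
    words = text.split()
    step = split_length - split_overlap   # advance 108 words per chunk
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + split_length]))
        if start + split_length >= len(words):
            break                         # last window reached the end
    return chunks

# 300 distinct words -> windows at offsets 0, 108, 216
text = " ".join(f"w{i}" for i in range(300))
docs = split_by_word(text)
print(len(docs))  # 3
```

With these settings, each Document carries at most 128 words, so a large corpus quickly produces many small Documents, which matches the report that the failure appears only at high document counts.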