[BUG]: Ingestion Issue: Subsequent Document Upload Re-ingests Previous Document with Significant Delay
Version
25.6.2
Which installation method(s) does this occur on?
Docker
Describe the bug.
Problem Description:
When attempting to upload documents sequentially to Milvus using nv-ingest's vdb_upload functionality, subsequent document uploads immediately after a successful first upload result in the re-ingestion of the first document instead of the new one. A significant delay of approximately 8-10 minutes is required after the initial document's ingestion (even after it's queryable in Milvus via pymilvus and visible in Attu) before a new, distinct document can be successfully ingested. If a new document is attempted before this ~10-minute window, the system erroneously re-ingests the previously uploaded document.
This behavior does not occur when directly uploading data using pymilvus, suggesting an issue specific to nv-ingest's internal state management or job finalization.
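For comparison, this is the kind of direct pymilvus write that does not show the delay; a minimal sketch, assuming Milvus on its default port and a simple test collection with a dense "vector" field and a "source" scalar field (the collection and field names are illustrative, not nv-ingest's schema):

from pymilvus import MilvusClient

# Direct insert into Milvus, bypassing nv-ingest entirely.
milvus = MilvusClient(uri="http://localhost:19530")
milvus.insert(
    collection_name="direct_upload_test",
    data=[{"vector": [0.0] * 1024, "source": "document_B.pdf"}],
)

# The new row is queryable right away; there is no 8-10 minute gap between inserts.
print(milvus.query(
    collection_name="direct_upload_test",
    filter='source == "document_B.pdf"',
    output_fields=["source"],
))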
Steps to Reproduce:
- Start the nv-ingest services (e.g., via docker-compose up -d).
- Using the provided Python client code snippet below, perform an ingestion of document_A.pdf.
- Verify document_A.pdf is successfully ingested into Milvus and queryable (a verification sketch follows this list).
- Immediately after the successful ingestion of document_A.pdf, attempt to ingest document_B.pdf using the same client code.
- Observe that document_A.pdf is re-ingested instead of document_B.pdf.
- Wait approximately 8-10 minutes after document_A.pdf's initial ingestion is complete.
- Attempt to ingest document_B.pdf again.
- Observe that document_B.pdf is now successfully ingested.
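To check which document actually landed in the collection (steps 3 and 5 above), the rows can be inspected with pymilvus; a minimal sketch, assuming the default Milvus port and substituting the actual collection name; the metadata field holding the source path ("source" below) is an assumption whose real name should be confirmed in Attu:

from pymilvus import MilvusClient

milvus = MilvusClient(uri="http://localhost:19530")

# Pull a handful of rows and print which source document they came from.
# The field name "source" is an assumption; check the real schema in Attu.
rows = milvus.query(
    collection_name="your_collection_name",
    filter="",          # empty filter with a limit returns arbitrary rows
    output_fields=["source"],
    limit=10,
)
for row in rows:
    print(row)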
Expected Behavior:
Each call to the ingestion pipeline with a new file_path (and thus a unique source_id implicit in the file content) should process and ingest the new document immediately without re-ingesting previous documents or requiring a prolonged waiting period between sequential uploads.
Actual Behavior:
A ~10-minute delay occurs after each successful ingestion of a unique document before the nv-ingest service is ready to accept a new, distinct document. Attempts to ingest a new document within this window result in the re-ingestion of the previously processed document.
Code Snippet (Python Client):
import asyncio
import time

from nv_ingest_client.client import Ingestor

# Assuming self.client is an instance of NvIngestClient
# Assuming settings.MILVUS_HOST, settings.MILVUS_PORT, settings.MINIO_ENDPOINT are configured
# Assuming extract_text, extract_tables, etc., are boolean flags
# Assuming dense_dim, recreate_collection, collection_name, file_path are defined

ingestor = (
    Ingestor(client=self.client)
    .files(file_path)  # file_path is unique for each document (e.g., "doc_A.pdf", then "doc_B.pdf")
    .extract(
        extract_text=extract_text,
        extract_tables=extract_tables,
        extract_charts=extract_charts,
        extract_images=extract_images,
        paddle_output_format="markdown",
        extract_infographics=extract_infographics,
        text_depth=text_depth,
    )
    .embed()
    .vdb_upload(
        collection_name=collection_name,
        sparse=False,
        dense_dim=dense_dim,
        recreate=recreate_collection,
        stream=True,  # Note: this is for internal batching within a single job, not sequential jobs
        milvus_uri=f"http://{settings.MILVUS_HOST}:{settings.MILVUS_PORT}",
        minio_endpoint=settings.MINIO_ENDPOINT,
    )
)

self.logger.info(f"Processing document with collection: {collection_name}")

# Execute the ingestion pipeline.
# Run in a thread pool to avoid blocking the async event loop.
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(None, ingestor.ingest)
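On Python 3.9+ (the runtime in the logs below is Python 3.12), the executor call can be written more compactly with asyncio.to_thread; an equivalent sketch, assuming the same ingestor object:

# Equivalent to the run_in_executor call above: run the blocking ingest in a worker thread.
results = await asyncio.to_thread(ingestor.ingest)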
Minimum reproducible example
Same as the Code Snippet (Python Client) above.
Relevant log output
nv-ingest-ms-runtime | (MetadataInjectionStage pid=13691) Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread [repeated 16x across cluster]
nv-ingest-ms-runtime | (MetadataInjectionStage pid=13691) Python runtime state: initialized [repeated 16x across cluster]
nv-ingest-ms-runtime | (MetadataInjectionStage pid=13691) Thread 0x000078c150a4d740 (most recent call first): [repeated 32x across cluster]
nv-ingest-ms-runtime | (MetadataInjectionStage pid=13691) <no Python frame> [repeated 15x across cluster]
nv-ingest-ms-runtime | (MetadataInjectionStage pid=13691) File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/worker.py", line 946 in main_loop [repeated 15x across cluster]
nv-ingest-ms-runtime | (MetadataInjectionStage pid=13691) File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/workers/default_worker.py", line 330 in <module> [repeated 15x across cluster]
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157)
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157)
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157) Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157) Python runtime state: initialized
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157) Thread 0x00007f754a673740 (most recent call first): [repeated 2x across cluster]
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157) <no Python frame>
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157) File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/worker.py", line 946 in main_loop
nv-ingest-ms-runtime | (PDFExtractorStage pid=17157) File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/workers/default_worker.py", line 330 in <module>
nv-ingest-ms-runtime | (raylet) WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
nv-ingest-ms-runtime | (raylet) I0000 00:00:1755081054.478291 3391 chttp2_transport.cc:1182] ipv4:172.18.0.19:38741: Got goaway [2] err=UNAVAILABLE:GOAWAY received; Error code: 2; Debug Text: Cancelling all calls {created_time:"2025-08-13T10:30:54.478276086+00:00", http2_error:2, grpc_status:14}
nv-ingest-ms-runtime | (ImageDedupStage pid=22070) Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread
nv-ingest-ms-runtime | (ImageDedupStage pid=22070) Python runtime state: initialized
nv-ingest-ms-runtime | (ImageDedupStage pid=22070)
nv-ingest-ms-runtime | (ImageDedupStage pid=22070) Thread 0x000079f55b7fe640 (most recent call first):
nv-ingest-ms-runtime | (ImageDedupStage pid=22070) <no Python frame>
nv-ingest-ms-runtime | (ImageDedupStage pid=22070)
nv-ingest-ms-runtime | (ImageDedupStage pid=22070) Thread 0x000079ff4d847740 (most recent call first):
nv-ingest-ms-runtime | (ImageDedupStage pid=22070) File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/worker.py", line 946 in main_loop
nv-ingest-ms-runtime | (ImageDedupStage pid=22070) File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/workers/default_worker.py", line 330 in <module>
Other/Misc.
No response
I am also facing the same issue. Can anyone please check?
Please check and close the issue as soon as possible.
Update: Findings and Temporary Workaround
Hi all,
I wanted to post an update with a finding and a temporary workaround for this issue.
Diagnosis
After some more investigation, it appears the issue stems from old tasks persisting in the Redis queue. When an ingestion configuration is changed, nv-ingest seems to pick up the old, queued tasks instead of generating new ones based on the updated configuration. This prevents a fresh ingestion from occurring.
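One way to confirm this locally is to inspect the Redis instance that backs nv-ingest's task queue and check whether any list keys still hold entries after a job completes. A minimal sketch with redis-py, assuming Redis is reachable on localhost:6379 (adjust to your docker-compose setup); the key names are discovered at runtime rather than assumed:

import redis

# Connect to the Redis instance backing nv-ingest's task queue.
r = redis.Redis(host="localhost", port=6379)

# List every key and, for list-typed keys (task queues), report how many
# entries are still pending after an ingestion has finished.
for key in r.scan_iter("*"):
    if r.type(key) == b"list":
        print(key.decode(), "->", r.llen(key), "pending entries")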
Temporary Workaround
As a temporary solution, I've found that clearing the Redis queue after each ingestion forces the system to start fresh on the next run, correctly applying the new configuration.
I implemented this by adding a flushall method to the class that manages the Redis client. Calling this method after an ingestion process completes ensures the next run is always clean.
async def flushall(self) -> bool:
    """Clear all Redis databases (FLUSHALL)."""
    try:
        # 'await' is required because the Redis client here is async.
        await self.client.flushall()
        self.logger.info("Redis FLUSHALL executed - ALL cache and tasks cleared")
        return True
    except Exception as err:
        self.logger.error("Error executing Redis FLUSHALL: %s", err)
        return False
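For context, here is roughly how the workaround slots into the client flow above; a minimal sketch, assuming redis_manager is a hypothetical instance of the class holding this method, whose self.client is a redis.asyncio.Redis connection to the same Redis container nv-ingest uses:

# Run the blocking ingestion in a worker thread, then clear the Redis queue
# so the next run starts from a clean state.
results = await loop.run_in_executor(None, ingestor.ingest)
await redis_manager.flushall()  # redis_manager is hypothetical; see the lead-in above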
This is obviously a "sledgehammer" approach and not a proper fix, but it serves as an effective temporary workaround and might be useful for others encountering this problem.
Hopefully, this information helps narrow down the root cause of why the task queue isn't being invalidated or updated correctly when configurations change. Let me know if any more information would be helpful.