
[BUG]: Ingestion Issue: Subsequent Document Upload Re-ingests Previous Document with Significant Delay

Open saikatscalers opened this issue 4 months ago • 3 comments

Version

25.6.2

Which installation method(s) does this occur on?

Docker

Describe the bug.

Problem Description:

When uploading documents sequentially to Milvus with nv-ingest's vdb_upload functionality, a second upload issued immediately after a successful first upload re-ingests the first document instead of the new one. A delay of roughly 8-10 minutes after the first document finishes ingesting (even though it is already queryable in Milvus via pymilvus and visible in Attu) is required before a new, distinct document can be ingested successfully. If a new document is submitted before this ~10-minute window elapses, the system erroneously re-ingests the previously uploaded document.

This behavior does not occur when uploading data directly with pymilvus, which suggests an issue specific to nv-ingest's internal state management or job finalization.

Steps to Reproduce:

  1. Start the nv-ingest services (e.g., via docker-compose up -d).
  2. Using the provided Python client code snippet below, perform an ingestion of document_A.pdf (a condensed two-document driver is also sketched right after these steps).
  3. Verify document_A.pdf is successfully ingested into Milvus and queryable.
  4. Immediately after the successful ingestion of document_A.pdf, attempt to ingest document_B.pdf using the same client code.
  5. Observe that document_A.pdf is re-ingested instead of document_B.pdf.
  6. Wait approximately 8-10 minutes after document_A.pdf's initial ingestion is complete.
  7. Attempt to ingest document_B.pdf again.
  8. Observe that document_B.pdf is now successfully ingested.
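
To make the sequence concrete, the steps above condense to roughly the driver below. This is a minimal sketch, not my exact production code: it assumes the client defaults from the nv-ingest README (REST gateway on localhost:7670, Milvus on localhost:19530), and the ingest_document helper plus the parameter values are illustrative.

from nv_ingest_client.client import Ingestor, NvIngestClient

# Assumes the default docker-compose endpoints; adjust hosts/ports as needed.
client = NvIngestClient(message_client_hostname="localhost", message_client_port=7670)

def ingest_document(path: str, collection: str = "test_collection"):
    """Run the same extract -> embed -> vdb_upload pipeline as the snippet further below."""
    ingestor = (
        Ingestor(client=client)
        .files(path)
        .extract(extract_text=True, extract_tables=True, extract_charts=True)
        .embed()
        .vdb_upload(
            collection_name=collection,
            milvus_uri="http://localhost:19530",
            sparse=False,
            dense_dim=2048,
        )
    )
    return ingestor.ingest()

results_a = ingest_document("document_A.pdf")  # steps 2-3: succeeds, doc_A becomes queryable
results_b = ingest_document("document_B.pdf")  # step 4: issued immediately afterwards
# Step 5: the collection receives doc_A's chunks again instead of doc_B's,
# unless ~8-10 minutes pass between the two calls (steps 6-8).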

Expected Behavior:

Each call to the ingestion pipeline with a new file_path (and therefore a unique source_id derived from the file) should process and ingest the new document immediately, without re-ingesting previous documents or requiring a prolonged waiting period between sequential uploads.

Actual Behavior:

A ~10-minute delay occurs after each successful ingestion of a unique document before the nv-ingest service is ready to accept a new, distinct document. Attempts to ingest a new document within this window result in the re-ingestion of the previously processed document.

Code Snippet (Python Client):

import asyncio

from nv_ingest_client.client import Ingestor, NvIngestClient  # client imports as in the nv-ingest README

# Assuming self.client is an instance of NvIngestClient and this code runs inside an async method
# Assuming settings.MILVUS_HOST, settings.MILVUS_PORT, settings.MINIO_ENDPOINT are configured
# Assuming extract_text, extract_tables, etc., are boolean flags
# Assuming dense_dim, recreate_collection, collection_name, file_path are defined

ingestor = (
    Ingestor(client=self.client)
    .files(file_path) # file_path would be unique for each document (e.g., "doc_A.pdf", then "doc_B.pdf")
    .extract(
        extract_text=extract_text,
        extract_tables=extract_tables,
        extract_charts=extract_charts,
        extract_images=extract_images,
        paddle_output_format="markdown",
        extract_infographics=extract_infographics,
        text_depth=text_depth
    ).embed()
    .vdb_upload(
        collection_name=collection_name,
        sparse=False,
        dense_dim=dense_dim,
        recreate=recreate_collection,
        stream=True, # Note: this is for internal batching within a single job, not sequential jobs
        milvus_uri=f"http://{settings.MILVUS_HOST}:{settings.MILVUS_PORT}",
        minio_endpoint=settings.MINIO_ENDPOINT
    )
)

self.logger.info(f"Processing document with collection: {collection_name}")

# Execute the ingestion pipeline
# Run in thread pool to avoid blocking the async event loop
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(None, ingestor.ingest)
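
As an aside, on Python 3.9+ the explicit executor call can be replaced with asyncio.to_thread, which offloads the blocking ingest() call to the same default thread pool:

# Equivalent on Python 3.9+: run the blocking ingest() call in a worker thread
results = await asyncio.to_thread(ingestor.ingest)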

Minimum reproducible example

(Same as the code snippet above.)

Relevant log output

nv-ingest-ms-runtime  | (MetadataInjectionStage pid=13691) Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread [repeated 16x across cluster]
nv-ingest-ms-runtime  | (MetadataInjectionStage pid=13691) Python runtime state: initialized [repeated 16x across cluster]
nv-ingest-ms-runtime  | (MetadataInjectionStage pid=13691) Thread 0x000078c150a4d740 (most recent call first): [repeated 32x across cluster]
nv-ingest-ms-runtime  | (MetadataInjectionStage pid=13691)   <no Python frame> [repeated 15x across cluster]
nv-ingest-ms-runtime  | (MetadataInjectionStage pid=13691)   File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/worker.py", line 946 in main_loop [repeated 15x across cluster]
nv-ingest-ms-runtime  | (MetadataInjectionStage pid=13691)   File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/workers/default_worker.py", line 330 in <module> [repeated 15x across cluster]
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157) 
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157) 
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157) Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157) Python runtime state: initialized
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157) Thread 0x00007f754a673740 (most recent call first): [repeated 2x across cluster]
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157)   <no Python frame>
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157)   File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/worker.py", line 946 in main_loop
nv-ingest-ms-runtime  | (PDFExtractorStage pid=17157)   File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/workers/default_worker.py", line 330 in <module>
nv-ingest-ms-runtime  | (raylet) WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
nv-ingest-ms-runtime  | (raylet) I0000 00:00:1755081054.478291    3391 chttp2_transport.cc:1182] ipv4:172.18.0.19:38741: Got goaway [2] err=UNAVAILABLE:GOAWAY received; Error code: 2; Debug Text: Cancelling all calls {created_time:"2025-08-13T10:30:54.478276086+00:00", http2_error:2, grpc_status:14}
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070) Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070) Python runtime state: initialized
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070) 
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070) Thread 0x000079f55b7fe640 (most recent call first):
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070)   <no Python frame>
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070) 
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070) Thread 0x000079ff4d847740 (most recent call first):
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070)   File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/worker.py", line 946 in main_loop
nv-ingest-ms-runtime  | (ImageDedupStage pid=22070)   File "/opt/conda/envs/nv_ingest_runtime/lib/python3.12/site-packages/ray/_private/workers/default_worker.py", line 330 in <module>

Other/Misc.

No response

saikatscalers avatar Aug 13 '25 12:08 saikatscalers

I am also facing the same issue. Could someone please take a look?

Rohith-Scalers avatar Sep 01 '25 10:09 Rohith-Scalers

Please look into this and close the issue as soon as possible.

saikatscalers avatar Sep 02 '25 09:09 saikatscalers

Update: Findings and Temporary Workaround

Hi all,

I wanted to post an update with a finding and a temporary workaround for this issue.


Diagnosis

After some more investigation, it appears the issue stems from old tasks persisting in the Redis queue. When an ingestion configuration is changed, nv-ingest seems to pick up the old, queued tasks instead of generating new ones based on the updated configuration. This prevents a fresh ingestion from occurring.
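
To check this for yourself, a quick diagnostic along the following lines can help. It is a rough sketch using redis-py against the Redis container from the default docker-compose (assumed reachable on localhost:6379); since queue key names are deployment-specific, it simply prints the length of every list-type key, which is where queued tasks would accumulate:

import redis

# Connect to the nv-ingest Redis instance (adjust host/port to your deployment).
r = redis.Redis(host="localhost", port=6379, db=0)
for key in r.scan_iter("*"):
    if r.type(key) == b"list":
        print(key.decode(), "->", r.llen(key), "pending entries")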


Temporary Workaround

As a temporary solution, I've found that clearing the Redis queue after each ingestion forces the system to start fresh on the next run, correctly applying the new configuration.

I implemented this by adding a flushall method to the class that manages the Redis client. Calling it after an ingestion process completes ensures the next run always starts clean.

async def flushall(self) -> bool:
    """Clear all Redis databases (FLUSHALL) so no stale tasks survive into the next run."""
    try:
        # flushall() is a coroutine on the async Redis client, so it must be awaited.
        await self.client.flushall()
        self.logger.info("Redis FLUSHALL executed - ALL cache and tasks cleared")
        return True
    except Exception as err:
        self.logger.error("Error executing Redis FLUSHALL: %s", err)
        return False
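
For reference, the call site looks roughly like this; redis_manager is a placeholder for whatever object wraps your async Redis client and exposes the flushall() above:

# Flush Redis only after the ingestion job has fully completed.
results = await loop.run_in_executor(None, ingestor.ingest)
flushed = await self.redis_manager.flushall()
if not flushed:
    self.logger.warning("Redis flush failed; the next ingestion may reuse stale tasks")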

This is obviously a "sledgehammer" approach and not a proper fix, but it serves as an effective temporary workaround and might be useful for others encountering this problem.

Hopefully, this information helps narrow down the root cause of why the task queue isn't being invalidated or updated correctly when configurations change. Let me know if any more information would be helpful.

saikatscalers avatar Sep 05 '25 12:09 saikatscalers