[Bug] Endless pipeline loop when trying to import corrupted document

Open marcominerva opened this issue 1 year ago • 15 comments

Context / Scenario

In service mode (using a queue), when trying to import a corrupted document (e.g., an invalid PDF file), the decoder throws an exception, but the message is then put back in the queue, generating an endless loop:

warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.100334.6972903.f0f10e85d780488d927f6c03891d20d5' processing failed with exception, 
      putting message back in the queue.
      Message content: {"index":"default","document_id":"98d5b5a2d38e4cf49dbda5b50c4502c2202404161003346000416","execution_id":"e6f9118e170745168f1d0bc1a9cd39b5","steps":["extract","partition","gen_embeddings","save_records"]}

What happened?

If decoding fails with an exception, I expect the document to be marked as failed and not processed again. I also see that the DataPipelineStatus class has a Failed property, but it is always false:

https://github.com/microsoft/kernel-memory/blob/babedc0ef8aeceacc61d607f2fe5fd43d27aa3f8/service/Abstractions/Pipeline/DataPipeline.cs#L432-L448

Importance

I cannot use Kernel Memory

Platform, Language, Versions

Kernel Memory v0.36.240415.2

Relevant log output

dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
      Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}
      UglyToad.PdfPig.Core.PdfDocumentFormatException: Could not find the version header comment at the start of the document.
         at UglyToad.PdfPig.Parser.FileStructure.FileHeaderParser.Parse(ISeekableTokenScanner scanner, IInputBytes inputBytes, Boolean isLenientParsing, ILog log)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.OpenDocument(IInputBytes inputBytes, ISeekableTokenScanner scanner, InternalParsingOptions parsingOptions)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(IInputBytes inputBytes, ParsingOptions options)
         at UglyToad.PdfPig.Parser.PdfDocumentFactory.Open(Stream stream, ParsingOptions options)
         at UglyToad.PdfPig.PdfDocument.Open(Stream stream, ParsingOptions options)
         at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(Stream data, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder.DecodeAsync(BinaryData data, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Handlers.TextExtractionHandler.ExtractTextAsync(FileDetails uploadedFile, BinaryData fileContent, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Handlers.TextExtractionHandler.InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.RunPipelineStepAsync(DataPipeline pipeline, IPipelineStepHandler handler, CancellationToken cancellationToken)
         at Microsoft.KernelMemory.Pipeline.DistributedPipelineOrchestrator.<>c__DisplayClass5_0.<<AddHandlerAsync>b__0>d.MoveNext()
      --- End of stack trace from previous location ---
         at Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues.<>c__DisplayClass19_0.<<OnDequeue>b__0>d.MoveNext()
info: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message received
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text, pipeline 'default/577dbc0439a74a0b8a4996be204b45e3202404161021070293739'
dbug: Microsoft.KernelMemory.Handlers.TextExtractionHandler[0]
      Extracting text from file 'Iceberg.pdf' mime type 'application/pdf' using extractor 'Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder'
dbug: Microsoft.KernelMemory.DataFormats.Pdf.PdfDecoder[0]
      Extracting text from PDF file
warn: Microsoft.KernelMemory.Pipeline.Queue.DevTools.SimpleQueues[0]
      Message '20240416.102107.1238584.0a0332828e5f4538ba32bc09b4998acc' processing failed with exception, putting message back in the queue. Message content: {"index":"default","document_id":"577dbc0439a74a0b8a4996be204b45e3202404161021070293739","execution_id":"86d031771dc445d1bd085f110075266e","steps":["extract","partition","gen_embeddings","save_records"]}

marcominerva avatar Apr 16 '24 08:04 marcominerva

Does this happen only with SimpleQueues? By design SimpleQueues doesn't support poison queues, so that at dev time one can debug without having to worry about the number of retries.

With AzureQueue and RabbitMQ the number of retries should be capped, so that eventually the service stops retrying.
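
A minimal sketch of the capped-retry idea, in plain Python rather than Kernel Memory's actual code. The `drain` function, the `attempts` field, and `MAX_RETRIES` are hypothetical names used only for illustration: a failing message is requeued until it has been attempted a fixed number of times, then moved to a poison list instead of looping forever.

```python
from collections import deque

MAX_RETRIES = 3  # hypothetical cap, not a Kernel Memory setting


def drain(queue, poison, handler, max_retries=MAX_RETRIES):
    """Process messages until the queue is empty.

    A message whose handler raises is requeued until it has been
    attempted max_retries times, then moved to the poison list.
    """
    while queue:
        message = queue.popleft()
        message["attempts"] = message.get("attempts", 0) + 1
        try:
            handler(message)
        except Exception as exc:
            if message["attempts"] >= max_retries:
                message["error"] = str(exc)
                poison.append(message)  # give up: no more retries
            else:
                queue.append(message)  # put back for another attempt


def decode(message):
    # Stand-in for a decoder that rejects corrupted files.
    if message["corrupted"]:
        raise ValueError("Could not find the version header comment")


queue = deque([
    {"document_id": "good", "corrupted": False},
    {"document_id": "bad", "corrupted": True},
])
poison = []
drain(queue, poison, decode)
```

Without the `attempts` check, the corrupted message would be appended back on every failure and `drain` would never return, which is the loop reported above.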

dluc avatar Apr 16 '24 17:04 dluc

I have tried with Azure Queues and verified that the message is moved to a poison queue after a certain number of retries. However, the number of retries is hard-coded:

https://github.com/microsoft/kernel-memory/blob/f266b25b1c385871abc6253a93c21b44fdf16837/extensions/AzureQueues/AzureQueuesPipeline.cs#L48

What do you think about adding this parameter in AzureQueuesConfig?

As for RabbitMQ, it seems that the message is always put back in the queue:

https://github.com/microsoft/kernel-memory/blob/f266b25b1c385871abc6253a93c21b44fdf16837/extensions/RabbitMQ/RabbitMQPipeline.cs#L105-L143

marcominerva avatar Apr 17 '24 07:04 marcominerva

No problem making it configurable if it helps. If I remember correctly, Azure Queues uses a count to decide when to discard a message, while RabbitMQ uses an expiration date (time to live). I would check the logs for this message "Message '{0}' received, expires at {1}" and see if the message actually expires.

dluc avatar Apr 17 '24 16:04 dluc

If I correctly understand the meaning of the requeue parameter:

https://github.com/microsoft/kernel-memory/blob/f266b25b1c385871abc6253a93c21b44fdf16837/extensions/RabbitMQ/RabbitMQPipeline.cs#L139

It puts a new message back in the queue, with a new expiration time, endlessly.
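
To make the concern concrete, here is a toy simulation in plain Python (no RabbitMQ client involved; the function and its parameters are hypothetical). It assumes the behavior described above: if every requeue stamps a fresh expiration time, a time-to-live check never fires, whereas keeping the original expiration lets the message eventually expire.

```python
def attempts_until_expired(ttl, retry_interval, reset_expiration, max_attempts=100):
    """Simulate a message whose processing always fails.

    Returns the number of attempts made before the message expires,
    or None if it is still alive after max_attempts attempts.
    """
    now = 0
    expires_at = now + ttl
    for attempt in range(1, max_attempts + 1):
        if now >= expires_at:
            return attempt - 1  # expired before this attempt could run
        # Processing fails here, and the message is requeued.
        now += retry_interval
        if reset_expiration:
            expires_at = now + ttl  # the requeued copy gets a new lifetime
    return None


# Original expiration kept: the message dies once the TTL has elapsed.
kept = attempts_until_expired(ttl=60, retry_interval=10, reset_expiration=False)

# Expiration reset on every requeue: the message never expires.
reset = attempts_until_expired(ttl=60, retry_interval=10, reset_expiration=True)
```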

marcominerva avatar Apr 17 '24 17:04 marcominerva

By the way, there are some issues in RabbitMQPipeline:

https://github.com/microsoft/kernel-memory/blob/f266b25b1c385871abc6253a93c21b44fdf16837/extensions/RabbitMQ/RabbitMQPipeline.cs#L111

It seems that all the properties of BasicProperties are null.

marcominerva avatar Apr 18 '24 08:04 marcominerva

@dluc I have made a PR to set the missing properties in RabbitMQ: https://github.com/microsoft/kernel-memory/pull/454.

After that, I think RabbitMQPipeline should also support a maximum number of retries and poison queues, as AzureQueuesPipeline does. For RabbitMQ, it seems that the correct approach is based on Quorum Queues and x-delivery-limit: https://www.rabbitmq.com/blog/2020/04/20/rabbitmq-gets-an-ha-upgrade#quorum-queues-in-the-management-ui.
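
As a rough sketch of what that could look like, the snippet below builds the optional arguments for declaring such a queue. The argument keys `x-queue-type`, `x-delivery-limit`, and `x-dead-letter-exchange` are RabbitMQ's own; the helper name and the exchange name `km.poison` are hypothetical, and none of this is taken from Kernel Memory.

```python
def quorum_queue_arguments(delivery_limit, dead_letter_exchange):
    """Build optional arguments for a quorum queue with a delivery limit.

    Once a message has been redelivered more than delivery_limit times,
    RabbitMQ drops it, or dead-letters it when a dead letter exchange
    is configured.
    """
    if delivery_limit < 1:
        raise ValueError("delivery_limit must be at least 1")
    return {
        "x-queue-type": "quorum",
        "x-delivery-limit": delivery_limit,
        "x-dead-letter-exchange": dead_letter_exchange,
    }


# "km.poison" is a made-up exchange name for illustration.
args = quorum_queue_arguments(delivery_limit=5, dead_letter_exchange="km.poison")
```

With a client such as pika, a dictionary like this would be passed as the `arguments` parameter of `queue_declare`, and the queue would need to be declared durable, since quorum queues cannot be transient.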

marcominerva avatar May 02 '24 10:05 marcominerva

There are a couple of approaches; IIRC, all of them require multiple queues per queue. Worth reading:

  • https://medium.com/nmc-techblog/re-routing-messages-with-delay-in-rabbitmq-4a52185f5098
  • https://dockyard.com/blog/2021/01/14/make-rabbitmq-retries-easier-with-delayed-message-exchange-and-elixir-s-broadway
  • https://dev.to/bhimani07/rabbitmq-retry-strategy-delay-with-max-retry-threshold-50o6

dluc avatar May 06 '24 18:05 dluc

So, do you prefer using such solutions instead of the quorum queue?

marcominerva avatar May 07 '24 08:05 marcominerva

Quorum queues don't seem to offer delayed delivery, do they?

When a message delivery fails we need to allow a configurable retry strategy, e.g.

  • retry 10 times, waiting 1 minute
  • retry for 24 hours, using exponential backoff

AFAIK with RabbitMQ it will require multiple queues.
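
The two example strategies above could be modeled roughly as follows. This is a sketch in plain Python with hypothetical names (`FixedRetry`, `ExponentialBackoff`, `schedule`) and an assumed one-hour cap on a single delay; it only computes delays and says nothing about how the queues would deliver them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FixedRetry:
    """Retry a fixed number of times, waiting the same delay each time."""
    max_retries: int
    delay_seconds: float

    def next_delay(self, retries_done, elapsed_seconds):
        # None means "give up and dead-letter the message".
        if retries_done >= self.max_retries:
            return None
        return self.delay_seconds


@dataclass(frozen=True)
class ExponentialBackoff:
    """Retry with doubling delays until a total time budget is used up."""
    base_seconds: float
    max_total_seconds: float
    max_delay_seconds: float = 3600.0  # assumed cap on a single delay

    def next_delay(self, retries_done, elapsed_seconds):
        delay = min(self.base_seconds * 2 ** retries_done, self.max_delay_seconds)
        if elapsed_seconds + delay > self.max_total_seconds:
            return None
        return delay


def schedule(policy):
    """List every delay the policy would produce for a message that always fails."""
    delays, elapsed = [], 0.0
    while True:
        delay = policy.next_delay(len(delays), elapsed)
        if delay is None:
            return delays
        delays.append(delay)
        elapsed += delay


# "retry 10 times, waiting 1 minute"
fixed = schedule(FixedRetry(max_retries=10, delay_seconds=60))

# "retry for 24 hours, using exponential backoff"
backoff = schedule(ExponentialBackoff(base_seconds=60, max_total_seconds=24 * 3600))
```

Keeping both strategies behind the same `next_delay` method is one way to make the choice configurable without the queue code knowing which one is in use.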

dluc avatar May 07 '24 20:05 dluc

Understood. So at least we need something like this, right?

https://stackoverflow.com/a/73358042/1728189

marcominerva avatar May 08 '24 13:05 marcominerva

Hi @dluc! Finally, I can return to this issue.

What do you think about starting to implement in RabbitMQ the same behavior we have in Azure Queues?

https://github.com/microsoft/kernel-memory/blob/8c0ad8c144c5b9b2b8f1fa0de72ba54e2aa723a0/extensions/AzureQueues/AzureQueuesPipeline.cs#L192-L231

marcominerva avatar Jun 03 '24 08:06 marcominerva

Hey @marcominerva, that would be nice, if possible. It might be a lot of work considering the extra queues to introduce, so before starting, do you think we could break it down into multiple steps, to have separate PRs?

dluc avatar Jun 03 '24 21:06 dluc

Yes. I can start by, for example, creating the extra queues and then opening a draft PR, so you can see the work in progress. Do you agree?

marcominerva avatar Jun 04 '24 08:06 marcominerva

Yes, please go ahead. What would be the names of the queues? I'm assuming that when these extra queues are missing (e.g., if deleted), they will be automatically recreated, without causing errors.

dluc avatar Jun 04 '24 17:06 dluc

Actually, adding poison queue support like in Azure Queues required fewer changes than I thought, so I have made a single PR with all the changes: https://github.com/microsoft/kernel-memory/pull/648.

Let me know if it is OK, or if you prefer to split it anyway.

marcominerva avatar Jun 05 '24 15:06 marcominerva

I think we can close this now. All pipeline modes support dead lettering (and backoff) to avoid infinite retries.

dluc avatar Oct 08 '24 02:10 dluc