
Python QueuedIngestClient Report

Open jyk4100 opened this issue 5 months ago • 4 comments

Feature Suggestion

Please correct me if I am wrong.

I'm trying to find out whether a Kusto ingestion performed with the Python client succeeded or failed. If the client (QueuedIngestClient) is initialized with ReportLevel.FailuresAndSuccesses, it should expose methods to validate and report failures without having to create and poll KustoIngestStatusQueues manually.

This appears relatively straightforward in C# (snippet below, from the official Ingest and Validate doc), but I can't find a comparable method on the Python QueuedIngestClient class.

// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Verify the success has also been reported to the queue
var ingestionSuccesses = await client.GetAndDiscardTopIngestionSuccessesAsync();
Ensure.ConditionIsMet(ingestionSuccesses.Any(), "The successful ingestion should have been reported to the successful ingestions queue");
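Since the Python SDK has no equivalent of PeekTopIngestionFailuresAsync / GetAndDiscardTopIngestionFailuresAsync, the closest you can get today is draining KustoIngestStatusQueues yourself. A minimal sketch of such a helper; check_ingestion_results is a hypothetical name, and the qs argument is assumed to behave like KustoIngestStatusQueues (a .success and a .failure queue, each with is_empty() and pop(n)):

```python
def check_ingestion_results(qs, batch_size=10):
    """Drain the success/failure status queues and return their messages.

    `qs` is expected to look like KustoIngestStatusQueues: it has
    `.success` and `.failure` attributes, each with `is_empty()` and
    `pop(n)` methods. Returns (successes, failures) as two lists.
    """
    successes, failures = [], []
    while not qs.success.is_empty():
        successes.extend(qs.success.pop(batch_size))
    while not qs.failure.is_empty():
        failures.extend(qs.failure.pop(batch_size))
    return successes, failures
```

A caller could then raise if `failures` is non-empty, mirroring the C# validation flow above.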

"Bug" Report

++ I also came across this code example, but it looks like an infinite while loop, since there is no exit condition: https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-ingest/tests/sample.py#L133

import pprint
import time
from azure.kusto.ingest.status import KustoIngestStatusQueues

qs = KustoIngestStatusQueues(client)

MAX_BACKOFF = 180

backoff = 1
while True:
    ################### NOTICE ####################
    # in order to get success status updates,
    # make sure ingestion properties set the
    # reportLevel=ReportLevel.FailuresAndSuccesses.
    if qs.success.is_empty() and qs.failure.is_empty():
        time.sleep(backoff)
        backoff = min(backoff * 2, MAX_BACKOFF)
        print("No new messages. backing off for {} seconds".format(backoff))
        continue

    backoff = 1

    success_messages = qs.success.pop(10)
    failure_messages = qs.failure.pop(10)

    pprint.pprint("SUCCESS : {}".format(success_messages))
    pprint.pprint("FAILURE : {}".format(failure_messages))

    # you can of course separate them and dump them into a file for follow up investigations
    with open("successes.log", "w+") as sf:
        for sm in success_messages:
            sf.write(str(sm))

    with open("failures.log", "w+") as ff:
        for fm in failure_messages:
            ff.write(str(fm))
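For reference, a bounded variant of the same polling loop, assuming the same KustoIngestStatusQueues shape; wait_for_status is a hypothetical helper that gives up after timeout_seconds instead of looping forever:

```python
import time


def wait_for_status(qs, timeout_seconds=600, max_backoff=30):
    """Poll the status queues with exponential backoff until a message
    arrives or `timeout_seconds` elapses.

    `qs` is assumed to behave like KustoIngestStatusQueues. Returns
    (successes, failures); both lists are empty on timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    backoff = 1
    while time.monotonic() < deadline:
        if qs.success.is_empty() and qs.failure.is_empty():
            # Sleep, but never past the deadline.
            time.sleep(min(backoff, max(0.0, deadline - time.monotonic())))
            backoff = min(backoff * 2, max_backoff)
            continue
        return qs.success.pop(10), qs.failure.pop(10)
    return [], []  # timed out with no status messages
```

This keeps the sample's backoff behavior but lets a scheduled job fail deterministically instead of hanging.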

jyk4100 avatar Jun 12 '25 19:06 jyk4100

Kusto has two methods of ingestion tracking: Table, and Queues.

The queues method is more complicated; the sample you provided shows how to use it: you have to manually and repeatedly poll the queues for your results.

The table method is not implemented in Python yet, but will be as part of a wider "Ingest V2" effort, scheduled to come out in the next half year.

Either way, I recommend reading this about tracking - https://learn.microsoft.com/en-us/kusto/api/netfx/kusto-ingest-client-status?view=microsoft-fabric#tracking-ingestion-status-kustoqueuedingestclient

Ingestion tracking is needed only in rare cases, which is why it wasn't prioritized for implementation in all of the SDKs.

What's your use case for tracking ingestions in Python?

AsafMah avatar Jun 15 '25 11:06 AsafMah

Our team (part of MSFT) needs to publish aggregate metrics (produced by a scheduled job) on a Grafana dashboard, and the easiest solution is to write the data to a Kusto cluster that is already hooked up to Grafana. We don't want the job to complete and "succeed" when there were data ingestion errors, so our job is manually doing queued ingestion tracking, which can take up to 10 minutes.

jyk4100 avatar Jun 25 '25 17:06 jyk4100

I can suggest a few alternatives.

  1. If your data items are smaller than 4 MB of uncompressed data, you may want to consider streaming ingestion, since that API is synchronous
  2. Assuming ingestion failures are rare, you can consider using .show ingestion failures to alert you when ingestions are not working correctly (without programmatic mitigation)
  3. You can implement queue-based tracking manually (least recommended)
  4. You can switch to Go or C#, where table-based tracking is implemented
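A sketch of the size-based routing behind option 1: payloads under the 4 MB uncompressed limit go through the synchronous streaming path (so success/failure is known immediately), larger ones through the queued path. The stream_ingest and queue_ingest callables stand in for the SDK clients' ingest calls (e.g. KustoStreamingIngestClient.ingest_from_stream and QueuedIngestClient.ingest_from_stream) and are assumptions of this sketch:

```python
import io

STREAMING_LIMIT_BYTES = 4 * 1024 * 1024  # 4 MB uncompressed, per the guidance above


def ingest(payload: bytes, stream_ingest, queue_ingest):
    """Route a payload to streaming or queued ingestion by size.

    `stream_ingest` and `queue_ingest` are callables taking a file-like
    object, standing in for the real SDK clients. Streaming is
    synchronous, so the caller learns about failures immediately;
    queued ingestion still needs separate status tracking.
    """
    source = io.BytesIO(payload)
    if len(payload) < STREAMING_LIMIT_BYTES:
        return stream_ingest(source)
    return queue_ingest(source)
```

In a real setup the two callables would be bound to configured client instances; the routing logic itself stays this simple.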

yogilad avatar Jun 26 '25 07:06 yogilad

Also consider,

  • https://learn.microsoft.com/en-us/azure/data-explorer/monitor-queued-ingestion
  • https://techcommunity.microsoft.com/blog/azuredataexplorer/how-to-monitor-azure-data-explorer-ingestion-using-diagnostic-logs-preview/1107252

yogilad avatar Jun 26 '25 08:06 yogilad

Closing - alternatives given + dup of #470

AsafMah avatar Jul 06 '25 11:07 AsafMah