azure-kusto-python
azure-kusto-python copied to clipboard
Python QueuedIngestClient Report
Feature Suggestion
Please correct me if I am wrong.
I came to figure out ways to find out if Kusto ingestion using a Python client succeed or failed. If client (QueuedIngestClient) is initialized with ReportLevel.FailuresAndSuccesses then it should have methods to validate and report failure without having to create and pull from KustoIngestStatusQueues.
It appears there is relatively straightforward on C# (below snip from official doc) Ingest and Validate. However, I am not finding comparable method in the python QueuedIngestClient class.
// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Verify the success has also been reported to the queue
var ingestionSuccesses = await client.GetAndDiscardTopIngestionSuccessesAsync();
Ensure.ConditionIsMet(ingestionSuccesses.Any(), "The successful ingestion should have been reported to the successful ingestions queue");
"Bug" Report
++ I came across this code example but looks like this is an infinite while loop as there is no exit condition based on ` https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-ingest/tests/sample.py#L133
import pprint
import time
from azure.kusto.ingest.status import KustoIngestStatusQueues
qs = KustoIngestStatusQueues(client)
MAX_BACKOFF = 180
backoff = 1
while True:
################### NOTICE ####################
# in order to get success status updates,
# make sure ingestion properties set the
# reportLevel=ReportLevel.FailuresAndSuccesses.
if qs.success.is_empty() and qs.failure.is_empty():
time.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
print("No new messages. backing off for {} seconds".format(backoff))
continue
backoff = 1
success_messages = qs.success.pop(10)
failure_messages = qs.failure.pop(10)
pprint.pprint("SUCCESS : {}".format(success_messages))
pprint.pprint("FAILURE : {}".format(failure_messages))
# you can of course separate them and dump them into a file for follow up investigations
with open("successes.log", "w+") as sf:
for sm in success_messages:
sf.write(str(sm))
with open("failures.log", "w+") as ff:
for fm in failure_messages:
ff.write(str(fm))
Kusto has two methods of ingestion tracking: Table, and Queues.
The queues method is more complicated , the sample you provided shows how to use it - you have to manually and repeatedly check the queues for your results.
The table method is not implemented in python, but will be in a wider effort of "Ingest V2", scheduled to come out in the next half year.
Either way, I recommend reading this about tracking - https://learn.microsoft.com/en-us/kusto/api/netfx/kusto-ingest-client-status?view=microsoft-fabric#tracking-ingestion-status-kustoqueuedingestclient
Usually it's done in rare cases, which is why it wasn't prioritized to be implemented in all of the SDKs.
What's your use case for tracking ingestions in python.
Our team (part of MSFT) needs to publish aggregate metrics (produced by a scheduled job) on grafana dashboard and easiest solution is to write data to Kusto cluster that is already set up with grafana dashboard. We don't want job to complete and "succeed" when there were data ingestion error so our job is manually doing queued ingestion tracking which can take up to 10 minutes.
I can suggest a few alternatives.
- If your data items are smaller than 4MB of uncompressed data you may want to consider streaming the data as the API is synchronous
- Assuming ingestion failures are rare, you can consider using
.show ingestion failuresto alerts you when ingestions are not working correctly (without programmatic mitigation) - You can implement queue-based tracking manually (least recommended)
- You can switch to Go or C# where table-based tracking is implemented
Also consider,
- https://learn.microsoft.com/en-us/azure/data-explorer/monitor-queued-ingestion
- https://techcommunity.microsoft.com/blog/azuredataexplorer/how-to-monitor-azure-data-explorer-ingestion-using-diagnostic-logs-preview/1107252
Closing - alternatives given + dup of #470