# `KafkaSupervisor` not entering idle mode when last message is sent inside a Kafka transaction
### Affected Version
Reproduced with:
- v29.0.1
- v30.0.0
### Description
We observed that the `KafkaSupervisor` does not enter idle mode after processing all messages, even though no new messages are produced within the `inactiveAfterMillis` duration.
We traced this problem down to cases where the last message was produced inside a Kafka transaction.
In that case, the offset lag reported in the supervisor status only drops to a minimum of 1 instead of 0 after all messages have been processed.
This is caused by `currentOffsets` being less than `latestOffsets` in the supervisor status.
Our assumption is that the offset lag is computed incorrectly because `latestOffsets` may point to Kafka transaction markers (more on that in the *Root cause* section).
Note: This issue only appears when the last message has been written as part of a Kafka transaction. `latestOffsets` will then point to the offset after the last written (possibly aborted) message, which corresponds to the transaction marker.
When producing a non-transactional message after an arbitrary number of transactional messages, the lag is correctly calculated as 0 and idle mode is entered.
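To make the arithmetic explicit, here is a minimal sketch of the lag computation as we understand it from the reported status fields. This is our reconstruction for illustration, not Druid's actual code:

```python
# Our reading of the supervisor's per-partition lag report (not Druid's code):
# lag = latest offset fetched from Kafka - the task's current offset.
def compute_lag(latest_offsets: dict, current_offsets: dict) -> dict:
    return {
        partition: max(latest - current_offsets.get(partition, 0), 0)
        for partition, latest in latest_offsets.items()
    }

# With a transaction marker at the end of the partition, latestOffsets points
# one past the marker, so the reported lag can never reach 0:
print(compute_lag({"0": 2}, {"0": 1}))  # {'0': 1}
```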
### Minimal example
The following was tested against a single-topic, single-partition configuration. See the last section for a full reproduction of this behaviour.
Suppose we insert one single message into a Kafka topic as part of a Kafka transaction.
This message will have offset 0.
Since the Kafka supervisor reports the next offset to be processed, we would expect it to report `currentOffsets == latestOffsets == 1`.
But the status is actually reported as:

- `currentOffsets` (actually the next to-be-processed offset): 1
- `latestOffsets` (expected next to-be-processed offset): 2

This leads to a lag of 1, which would indicate that there is 1 message left to consume. But as there are no more messages, this is unexpected behaviour.
**Example status after single message with offset 0**

```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": {
        "0": 0
      },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 3568,
      "type": "ACTIVE",
      "currentOffsets": {
        "0": 1
      },
      "lag": {
        "0": 1
      }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": {
    "0": 2
  },
  "minimumLag": {
    "0": 1
  },
  "aggregateLag": 1,
  "offsetsLastUpdated": "2024-07-03T15:05:02.179Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
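The `latestOffsets` value of 2 matches the partition's high watermark, which can be checked independently of Druid. The following probe sketch (connection and auth details are placeholders) contrasts the broker-reported high watermark with the last record a `read_committed` consumer actually receives:

```python
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "<redacted>",
    "group.id": "offset-probe",
    "isolation.level": "read_committed",
    "auto.offset.reset": "earliest",
})
tp = TopicPartition("test-idle-mode", 0)
consumer.assign([tp])

# After one committed transactional message the broker reports high watermark 2:
# offset 0 holds the message, offset 1 holds the commit marker.
low, high = consumer.get_watermark_offsets(tp, cached=False)
print("high watermark:", high)  # 2

# Polling only ever yields the record at offset 0, so the consumer position
# stops at 1 and never reaches the reported high watermark.
msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    print("last consumable offset:", msg.offset())  # 0
consumer.close()
```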
We further found that it is possible to increment `latestOffsets` even more by producing messages in an aborted transaction.
`currentOffsets` is not changed by this, as it seems to correctly report the offset of the last processed message + 1.
But `latestOffsets` is now reported as 4, leading to a lag of 3 even though there are still no new messages to process.
**Updated status after producing another message and aborting transaction**

```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": {
        "0": 0
      },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 1607,
      "type": "ACTIVE",
      "currentOffsets": {
        "0": 1
      },
      "lag": {
        "0": 3
      }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": {
    "0": 4
  },
  "minimumLag": {
    "0": 3
  },
  "aggregateLag": 3,
  "offsetsLastUpdated": "2024-07-03T15:37:32.514Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
We can rule out issues with the `isolation.level`, because `latestOffsets` only gets updated once the transaction is either committed or aborted. So the process does seem to adhere to `isolation.level = read_committed`.
By producing a new message without a transaction, it is now possible to reduce the lag to 0.
This results in `currentOffsets == latestOffsets == 5` (after processing just 2 valid messages!).
**Status after producing non-transactional message**

```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": {
        "0": 0
      },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 917,
      "type": "ACTIVE",
      "currentOffsets": {
        "0": 5
      },
      "lag": {
        "0": 0
      }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": {
    "0": 5
  },
  "minimumLag": {
    "0": 0
  },
  "aggregateLag": 0,
  "offsetsLastUpdated": "2024-07-03T15:49:02.588Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
### Root cause
Based on the sudden jumps in the offsets, we repeated the experiment by consuming the topic with `isolation.level = read_uncommitted` in a separate process.
This results in the following offsets after the respective produce calls:
| Produce call | Offset |
| --- | --- |
| (1) first message (transaction committed) | [0] |
| (2) failed message (transaction aborted) | [2] |
| (3) non-transactional message (no transaction) | [4] |
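A sketch of the probe consumer used for this experiment (connection details are placeholders). Note that even with `read_uncommitted` the transaction markers themselves are never delivered; their offsets simply show up as gaps:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<redacted>",
    "group.id": "uncommitted-probe",
    "isolation.level": "read_uncommitted",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["test-idle-mode"])

# Collect the offsets of the three records produced above.
seen = []
while len(seen) < 3:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    seen.append(msg.offset())
consumer.close()

print(seen)  # [0, 2, 4] -- offsets 1 and 3 belong to the transaction markers
```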
This raises the question of what happened to offsets 1 and 3, which is where the Kafka transactions come in. These offsets are used by Kafka for the transaction markers that signal whether a transaction was committed or aborted. These marker records are not intended to be processed by the consumer as regular messages.
So the topic actually looks like this:
| Produce call | Offset | Relevant position for `currentOffsets`/`latestOffsets` @ produce call |
| --- | --- | --- |
| (1) first message (transaction committed) | [0] | `currentOffsets` (1, 2) |
| (1) [TRANSACTION COMMITTED] | [1] | `latestOffsets` (1) |
| (2) failed message (transaction aborted) | [2] | |
| (2) [TRANSACTION ABORTED] | [3] | `latestOffsets` (2) |
| (3) non-transactional message (no transaction) | [4] | `currentOffsets` (3), `latestOffsets` (3) |
Unfortunately, the `KafkaSupervisor` seems to use the offsets of the transaction markers to determine `latestOffsets`.
The transaction marker offsets will never be processed by the ingestion task and can therefore never be reached by `currentOffsets`.
This is why we are left with a partition lag > 0 even though there are no messages left to process.
It makes it impossible for the current implementation of the idle mode to detect that there are no more messages to process (whenever `latestOffsets` points to a transaction marker): the supervisor will just keep running and spawning tasks forever.
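For illustration, a hypothetical sketch of the idle-mode condition as we infer it from the observed behaviour (the names are ours, not Druid's): a lag pinned above 0 by a trailing transaction marker means the idle branch is never taken.

```python
import time

def should_become_idle(aggregate_lag: int, last_active_time_ms: int,
                       inactive_after_millis: int) -> bool:
    if aggregate_lag > 0:
        # latestOffsets pointing at a transaction marker keeps the lag > 0,
        # so with trailing transactional messages this branch is taken forever.
        return False
    return time.time() * 1000 - last_active_time_ms >= inactive_after_millis
```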
### Possible solution
The method for determining `latestOffsets` needs to be updated so that it determines the latest available offset that can actually be consumed by the indexing task (in particular, ignoring transaction markers).
Unfortunately, I was not able to identify the exact code sections that would need to be adapted, but it very likely involves `KafkaRecordSupplier.seekToLatest` (src) or the places where it is called by the supervisor.
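To illustrate the direction in Python terms (the actual fix would live in the Java `KafkaRecordSupplier`; the approach below is an assumption, not a reviewed fix): determine the offset after the last record a `read_committed` consumer can actually receive, rather than the broker's end offset.

```python
from confluent_kafka import Consumer, TopicPartition

def latest_consumable_offset(consumer: Consumer, topic: str, partition: int) -> int:
    """Offset after the last record a read_committed consumer can receive."""
    tp = TopicPartition(topic, partition)
    low, high = consumer.get_watermark_offsets(tp, cached=False)
    tp.offset = low
    consumer.assign([tp])
    next_offset = low  # fall back to the low watermark for an empty partition
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing consumable left below the high watermark: any remaining
            # offsets (e.g. trailing transaction markers) are never delivered.
            break
        if msg.error() is None:
            next_offset = msg.offset() + 1
        if msg.offset() + 1 >= high:
            break
    return next_offset
```

Scanning the whole partition like this is obviously too expensive for a real implementation; it only demonstrates which offset the supervisor would have to compare `currentOffsets` against for the lag to reach 0.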
### Reproduction
The code examples (Python) have authentication details redacted. Adapt the snippets to your setup.
- Create an empty Kafka topic `test-idle-mode` (a minimal sketch for this step is shown below).
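  For completeness, a sketch of this step using `confluent_kafka`'s `AdminClient` (the broker address is a placeholder; any other tool to create the topic works just as well):

  ```python
  from confluent_kafka.admin import AdminClient, NewTopic

  admin = AdminClient({"bootstrap.servers": "<redacted>"})
  # Single partition, single replica, matching the setup described above.
  futures = admin.create_topics([NewTopic("test-idle-mode", num_partitions=1, replication_factor=1)])
  futures["test-idle-mode"].result()  # blocks; raises if the creation failed
  ```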
- Insert a single message in a transaction -> it will have offset 0.

  **[`Python`] Produce message in transaction**

  ```python
  # Produce one message in a transaction
  import json

  from confluent_kafka import Producer

  producer = Producer({
      "bootstrap.servers": "<redacted>",
      ...,  # auth redacted
      "transactional.id": "idletest",
  })
  producer.init_transactions()
  producer.begin_transaction()
  producer.produce(
      topic="test-idle-mode",
      key="",
      value=json.dumps(
          {"__time": 1577833200000, "value": 1}
      ),
      on_delivery=print,
  )
  producer.commit_transaction()
  producer.flush()
  producer.poll(0)
  ```
- Create a `KafkaSupervisor` based on the topic `test-idle-mode`.

  **Ingestion spec**

  ```json
  {
    "type": "kafka",
    "spec": {
      "dataSchema": {
        "dataSource": "test-idle-mode",
        "timestampSpec": {
          "column": "__time",
          "format": "millis",
          "missingValue": null
        },
        "dimensionsSpec": {
          "dimensions": [
            {
              "type": "long",
              "name": "value",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": false
            }
          ],
          "dimensionExclusions": [
            "__time"
          ],
          "includeAllDimensions": false,
          "useSchemaDiscovery": false
        },
        "metricsSpec": [],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": {
            "type": "none"
          },
          "rollup": false,
          "intervals": []
        },
        "transformSpec": {
          "filter": null,
          "transforms": []
        }
      },
      "ioConfig": {
        "topic": "test-idle-mode",
        "topicPattern": null,
        "inputFormat": {
          "type": "json",
          "keepNullColumns": false,
          "assumeNewlineDelimited": false,
          "useJsonNodeReader": false
        },
        "replicas": 1,
        "taskCount": 1,
        "taskDuration": "PT3600S",
        "consumerProperties": {
          "bootstrap.servers": <redacted>,
          "druid.dynamic.config.provider": {
            "type": "environment",
            "variables": { <auth - redacted> }
          },
          "idleConfig": {
            "enabled": true,
            "inactiveAfterMillis": 60000
          }
        },
        "autoScalerConfig": null,
        "pollTimeout": 100,
        "startDelay": "PT5S",
        "period": "PT30S",
        "useEarliestOffset": true,
        "completionTimeout": "PT1800S",
        "lateMessageRejectionPeriod": null,
        "earlyMessageRejectionPeriod": null,
        "lateMessageRejectionStartDateTime": null,
        "configOverrides": null,
        "idleConfig": null,
        "stopTaskCount": null,
        "stream": "test-idle-mode",
        "useEarliestSequenceNumber": true
      },
      "tuningConfig": {
        "type": "kafka",
        "appendableIndexSpec": {
          "type": "onheap",
          "preserveExistingMetrics": false
        },
        "maxRowsInMemory": 150000,
        "maxBytesInMemory": 0,
        "skipBytesInMemoryOverheadCheck": false,
        "maxRowsPerSegment": 5000000,
        "maxTotalRows": null,
        "intermediatePersistPeriod": "PT10M",
        "maxPendingPersists": 0,
        "indexSpec": {
          "bitmap": {
            "type": "roaring"
          },
          "dimensionCompression": "lz4",
          "stringDictionaryEncoding": {
            "type": "utf8"
          },
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "indexSpecForIntermediatePersists": {
          "bitmap": {
            "type": "roaring"
          },
          "dimensionCompression": "lz4",
          "stringDictionaryEncoding": {
            "type": "utf8"
          },
          "metricCompression": "lz4",
          "longEncoding": "longs"
        },
        "reportParseExceptions": false,
        "handoffConditionTimeout": 900000,
        "resetOffsetAutomatically": false,
        "segmentWriteOutMediumFactory": null,
        "workerThreads": null,
        "chatRetries": 8,
        "httpTimeout": "PT10S",
        "shutdownTimeout": "PT80S",
        "offsetFetchPeriod": "PT30S",
        "intermediateHandoffPeriod": "P2147483647D",
        "logParseExceptions": false,
        "maxParseExceptions": 2147483647,
        "maxSavedParseExceptions": 0,
        "numPersistThreads": 1,
        "skipSequenceNumberAvailabilityCheck": false,
        "repartitionTransitionDuration": "PT120S"
      }
    },
    "context": null,
    "suspended": false
  }
  ```
- When checking the supervisor after the task has started, `currentOffsets` is 1 (last message offset 0 + 1 = 1) but `latestOffsets` is 2 instead of 1. The resulting status is the same as the one shown under **Example status after single message with offset 0** in the *Minimal example* section.
{ "dataSource": "test-idle-mode", "stream": "test-idle-mode", "partitions": 1, "replicas": 1, "durationSeconds": 3600, "activeTasks": [ { "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih", "startingOffsets": { "0": 0 }, "startTime": "2024-07-03T15:04:34.022Z", "remainingSeconds": 3568, "type": "ACTIVE", "currentOffsets": { "0": 1 }, "lag": { "0": 1 } } ], "publishingTasks": [], "latestOffsets": { "0": 2 }, "minimumLag": { "0": 1 }, "aggregateLag": 1, "offsetsLastUpdated": "2024-07-03T15:05:02.179Z", "suspended": false, "healthy": true, "state": "RUNNING", "detailedState": "RUNNING", "recentErrors": [] }
- Insert a single message in a transaction but abort it -> the unavailable message will have offset 2.

  **[`Python`] Produce message and abort transaction**

  ```python
  # Produce one message in a transaction, then abort the transaction
  import json

  from confluent_kafka import Producer

  producer = Producer({
      "bootstrap.servers": "<redacted>",
      ...,  # auth redacted
      "transactional.id": "idletest",
  })
  producer.init_transactions()
  producer.begin_transaction()
  producer.produce(
      topic="test-idle-mode",
      key="",
      value=json.dumps(
          {"__time": 1577833201000, "value": 1}
      ),
      on_delivery=print,
  )
  # Make sure the message is sent to the broker before aborting the transaction
  producer.flush()
  producer.poll(0)
  producer.abort_transaction()
  producer.flush()
  producer.poll(0)
  ```
- The supervisor status still reports `currentOffsets == 1`, but `latestOffsets` is now 4. The resulting status is the same as the one shown under **Updated status after producing another message and aborting transaction** above.
{ "dataSource": "test-idle-mode", "stream": "test-idle-mode", "partitions": 1, "replicas": 1, "durationSeconds": 3600, "activeTasks": [ { "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih", "startingOffsets": { "0": 0 }, "startTime": "2024-07-03T15:04:34.022Z", "remainingSeconds": 1607, "type": "ACTIVE", "currentOffsets": { "0": 1 }, "lag": { "0": 3 } } ], "publishingTasks": [], "latestOffsets": { "0": 4 }, "minimumLag": { "0": 3 }, "aggregateLag": 3, "offsetsLastUpdated": "2024-07-03T15:37:32.514Z", "suspended": false, "healthy": true, "state": "RUNNING", "detailedState": "RUNNING", "recentErrors": [] }
- Produce a non-transactional message (it will have offset 4).

  **[`Python`] Produce message without transaction**

  ```python
  # Produce one message without a transaction
  import json

  from confluent_kafka import Producer

  producer = Producer({
      "bootstrap.servers": "<redacted>",
      ...,  # auth redacted
  })
  producer.produce(
      topic="test-idle-mode",
      key="",
      value=json.dumps(
          {"__time": 1577833202000, "value": 1}
      ),
      on_delivery=print,
  )
  producer.flush()
  producer.poll(0)
  ```
- The new supervisor status reports `currentOffsets == latestOffsets == 5`, and the supervisor is now finally able to enter idle mode. The resulting status is the same as the one shown under **Status after producing non-transactional message** above.
{ "dataSource": "test-idle-mode", "stream": "test-idle-mode", "partitions": 1, "replicas": 1, "durationSeconds": 3600, "activeTasks": [ { "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih", "startingOffsets": { "0": 0 }, "startTime": "2024-07-03T15:04:34.022Z", "remainingSeconds": 917, "type": "ACTIVE", "currentOffsets": { "0": 5 }, "lag": { "0": 0 } } ], "publishingTasks": [], "latestOffsets": { "0": 5 }, "minimumLag": { "0": 0 }, "aggregateLag": 0, "offsetsLastUpdated": "2024-07-03T15:49:02.588Z", "suspended": false, "healthy": true, "state": "RUNNING", "detailedState": "RUNNING", "recentErrors": [] }