decaton
Expose TopicPartition and offset from ProcessingContext
We are developing a Kafka consumer with decaton that works as follows:
- Consume messages from Kafka
- Write the messages into a file, building a large file that contains 10K or more messages
  - We would like to create 100MB+ files; if a single message is 300B and the compression ratio is 50%, one file contains about 700K messages
  - Thus we specify a huge decaton.max.pending.records, like 100K or 1M
- We'd like to commit offsets only for messages that have been persisted in the file
  - We use external storage, and we assume the data is persisted when the file descriptor is closed successfully
  - So we don't want to commit offsets until we close it
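The 700K estimate can be reproduced with a quick calculation. This is only a sketch: the function name is illustrative, and reading "100MB" as 100 MiB is an assumption made here.

```kotlin
// Rough estimate of how many messages fit into one output file.
// The function name is illustrative; the figures are the ones quoted in this issue.
fun messagesPerFile(targetFileBytes: Long, messageBytes: Long, compressionRatio: Double): Long {
    // Size of one message after compression, e.g. 300 B * 0.5 = 150 B.
    val compressedMessageBytes = messageBytes * compressionRatio
    return (targetFileBytes / compressedMessageBytes).toLong()
}

// Assumption: "100MB" is read as 100 MiB.
val estimatedMessages = messagesPerFile(100L * 1024 * 1024, 300L, 0.5) // 699050, i.e. about 700K
```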
We used deferred completion and first tried to keep every deferred completion, but we found that this requires a huge heap and otherwise ends in OOME. Finally, we solved this by keeping the deferred completion for the smallest offset for each topic partition per file.
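A minimal sketch of that idea follows. It is not our actual code: `Completion`, `Partition`, and `SmallestOffsetTracker` are hypothetical stand-ins rather than decaton or Kafka types. It also assumes that completions for larger offsets can be completed right away, because the offset commit cannot advance past the smallest uncompleted offset of a partition.

```kotlin
// Hypothetical stand-in for a deferred completion handle (not decaton's type).
fun interface Completion {
    fun complete()
}

// Hypothetical stand-in for Kafka's TopicPartition.
data class Partition(val topic: String, val partition: Int)

// For one output file, holds only the completion with the smallest offset
// per partition. Assumption: leaving that single offset uncompleted is
// enough to block offset commits for every later offset of the partition.
class SmallestOffsetTracker {
    private data class Held(val offset: Long, val completion: Completion)

    private val held = HashMap<Partition, Held>()

    // Registers a task whose message was written to the current file.
    @Synchronized
    fun add(partition: Partition, offset: Long, completion: Completion) {
        val current = held[partition]
        when {
            current == null -> held[partition] = Held(offset, completion)
            offset < current.offset -> {
                // The new task is earlier: hold it, then release the old one.
                held[partition] = Held(offset, completion)
                current.completion.complete()
            }
            // A smaller offset is already held, so this one can be released.
            else -> completion.complete()
        }
    }

    // Call only after the file has been closed successfully.
    @Synchronized
    fun onFileClosed() {
        held.values.forEach { it.completion.complete() }
        held.clear()
    }

    @Synchronized
    fun heldCount(): Int = held.size
}
```

With this shape, memory use grows with the number of partitions written to a file rather than with the number of messages in it.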
This worked well; however, we need ugly and fragile code to obtain the topic partition and offset of the current message from the processing context, like:
    if (context is ProcessingContextImpl) {
        try {
            val clazz = ProcessingContextImpl::class.java
            val requestField = clazz.getDeclaredField("request")
            requestField.isAccessible = true
            val request = requestField.get(context) as TaskRequest
            val topicPartition = request.topicPartition()
            val offset = request.recordOffset()
            return Pair(topicPartition, offset)
        } catch (e: ReflectiveOperationException) {
            logger.debug("Fallback to take topicPartitionAndOffset from logging context", e)
        }
    }
    context.loggingContext().use {
        val topic = MDC.get(LoggingContext.TOPIC_KEY)
        if (topic == null) {
            logger.warn(
                "Cannot find topicPartitionAndOffset from MDC, " +
                    "configure decaton.logging.mdc.enabled=true",
            )
            return Pair(TopicPartition("dummy", 0), 0)
        }
        val partition = MDC.get(LoggingContext.PARTITION_KEY).toInt()
        val offset = MDC.get(LoggingContext.OFFSET_KEY).toLong()
        return Pair(TopicPartition(topic, partition), offset)
    }
I'm wondering if ProcessingContext could provide this information officially. It would be useful for advanced use cases like ours.
If you would rather not expose Kafka's interfaces, I'd like to have an object with methods to
- check whether task A comes from the same group as another task B (topic partition for Kafka: A.topic-partition == B.topic-partition?)
- check whether task A precedes another task B in the same group (A.offset < B.offset?)
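To make the two checks concrete, here is one possible shape for such an object. Everything in it is hypothetical: `TaskPosition`, `KafkaTaskPosition`, and the method names are illustrations for discussion, not existing decaton API.

```kotlin
// Hypothetical interface; not part of decaton's API.
interface TaskPosition {
    // True if both tasks come from the same group (topic partition for Kafka).
    fun isSameGroupAs(other: TaskPosition): Boolean

    // True if this task precedes `other` within the same group.
    fun isPriorTo(other: TaskPosition): Boolean
}

// One possible Kafka-backed implementation that keeps Kafka types out of
// the public interface.
data class KafkaTaskPosition(
    private val topic: String,
    private val partition: Int,
    private val offset: Long,
) : TaskPosition {
    override fun isSameGroupAs(other: TaskPosition): Boolean =
        other is KafkaTaskPosition && topic == other.topic && partition == other.partition

    // Tasks from different groups are never ordered relative to each other.
    override fun isPriorTo(other: TaskPosition): Boolean =
        isSameGroupAs(other) && offset < (other as KafkaTaskPosition).offset
}
```

With something like this, the workaround only has to compare positions and never needs to read the topic partition or offset directly.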
Thanks for reporting the issue.
I have a question about the situation.
> Finally, we solved this by keeping the deferred completion for the smallest offset for each topic partition per file.
Does that mean that just holding DeferredCompletion instances causes OOME (i.e. even if we don't hold the task data itself)?
I wonder whether solving #217 is enough for your problem, or whether we would still need a workaround to track the smallest DeferredCompletion even after addressing #217.
Right, DeferredCompletion and the related objects (CompletableFuture and TaskRequest) consume memory. The list that holds the DeferredCompletion instances consumes memory as well.
So we still need a workaround for DeferredCompletion even if #217 is solved.
#217 is an improvement for the case where messages are consumed much faster than the processor can handle them with a huge decaton.max.pending.records.
I'll share the heap analysis image with you in a different channel.