swift-kafka-client

Listen and propagate RD_KAFKA_RESP_ERR__PARTITION_EOF

Open blindspotbounty opened this issue 2 years ago • 7 comments

Sometimes it is nice to know that a partition/topic has been read to EOF, and this is supported by librdkafka. It has to be explicitly enabled with the property enable.partition.eof=true, and the resulting error then has to be handled, e.g.:

        // Inside the event polling loop: check whether an error event carries
        // RD_KAFKA_RESP_ERR__PARTITION_EOF and, if so, extract the topic partition.
        for _ in 0..<maxEvents {
            let event = rd_kafka_queue_poll(self.queue, 0)
            defer { rd_kafka_event_destroy(event) }

            let rdEventType = rd_kafka_event_type(event)
            guard let eventType = RDKafkaEvent(rawValue: rdEventType) else {
                fatalError("Unsupported event type: \(rdEventType)")
            }

            switch eventType {
            case .error:
                let err = rd_kafka_event_error(event)
                if err == RD_KAFKA_RESP_ERR__PARTITION_EOF {
                    let topicPartition = rd_kafka_event_topic_partition(event)
                    if let topicPartition {
                        // ... turn the partition EOF into an event
                    }
                }
            default:
                break // other event types elided
            }
        }
        // ... return events

This could probably be supported by extending the current API, e.g.:

public struct KafkaConsumerMessage {
    /// The topic that the message was received from.
    public var topic: String
    /// The partition that the message was received from.
    public var partition: KafkaPartition
    /// The key of the message.
    public var key: ByteBuffer?
    /// The body of the message.
    public var value: ByteBuffer
    /// The offset of the message in its partition.
    public var offset: KafkaOffset
    /// `true` if this message marks the end of a partition (EOF messages carry an empty body).
    var eof: Bool {
        self.value.readableBytesView.isEmpty
    }

    /// Initialize ``KafkaConsumerMessage`` as EOF from an `rd_kafka_topic_partition_t` pointer.
    /// - Note: Traps if the received topic name is not valid UTF-8.
    internal init(topicPartitionPointer: UnsafePointer<rd_kafka_topic_partition_t>) {
        let topicPartition = topicPartitionPointer.pointee
        guard let topic = String(validatingUTF8: topicPartition.topic) else {
            fatalError("Received topic name that is non-valid UTF-8")
        }
        self.topic = topic
        self.partition = KafkaPartition(rawValue: Int(topicPartition.partition))
        self.offset = KafkaOffset(rawValue: Int(topicPartition.offset))
        self.value = ByteBuffer()
    }
}

or changed to an enum:

enum KafkaConsumerMessage {
    case message(topic: String, partition: KafkaPartition, key: ByteBuffer?, value: ByteBuffer, offset: KafkaOffset)
    case eof(topic: String, partition: KafkaPartition, offset: KafkaOffset)
}
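
For illustration, consuming the enum variant could then look roughly like this (just a sketch against the proposed shape; process stands in for application logic and is not existing API):

    // Sketch: iterating the consumer's message sequence with the proposed enum.
    for try await message in consumer.messages {
        switch message {
        case .message(let topic, let partition, _, let value, let offset):
            // A regular record: hand the payload to application logic (hypothetical helper).
            try await process(topic, partition, value, offset)
        case .eof(let topic, let partition, let offset):
            // The partition has been read to its current end; more records may still arrive later.
            print("Reached EOF of \(topic)/\(partition) at offset \(offset)")
        }
    }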

blindspotbounty avatar Aug 28 '23 13:08 blindspotbounty

@felixschlegel Can you check this?

FranzBusch avatar Sep 04 '23 09:09 FranzBusch

Hi @FranzBusch and @felixschlegel!

Could you advise how I can help move this forward, please?

blindspotbounty avatar Oct 03 '23 14:10 blindspotbounty

IMO we should make the KafkaConsumerMessages AsyncSequence throw when we encounter RD_KAFKA_RESP_ERR__PARTITION_EOF since this is an option that is explicitly set by the user.

Alternatively, we could emit this as an event in the KafkaConsumerEvents AsyncSequence though I am more inclined towards just throwing in KafkaConsumerMessages.

On that note: for this feature, we would also have to expose an option like isPartitionEOFEnabled on the KafkaConsumerConfiguration.
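
For reference, the user-facing side of that could look roughly like this (isPartitionEOFEnabled, the throwing behaviour and handle are only illustrative of the proposal, not existing API):

    // Assuming `config` is a KafkaConsumerConfiguration and `consumer` the KafkaConsumer built from it.
    config.isPartitionEOFEnabled = true // proposed option; would map to librdkafka's enable.partition.eof

    do {
        for try await message in consumer.messages {
            try await handle(message) // placeholder for application logic
        }
    } catch {
        // With the proposed behaviour, RD_KAFKA_RESP_ERR__PARTITION_EOF would surface here.
        print("Consumer message sequence finished: \(error)")
    }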

felixschlegel avatar Oct 04 '23 13:10 felixschlegel

Can we ever recover from a partition EOF error? Like, does this happen during a rebalance, or is this really a terminal state?

FranzBusch avatar Oct 04 '23 14:10 FranzBusch

Can we ever recover from a partition EOF error? Like, does this happen during a rebalance, or is this really a terminal state?

Not a terminal state afaik, just means that there are no more messages to read

felixschlegel avatar Oct 04 '23 14:10 felixschlegel

Ah right, it is just when we hit the end of the partition, and we can still continue once more messages have been produced. So finishing the sequence is not the correct thing. I think the only thing we can really do is have an enum on the sequence. If we produce the EOF event into a separate sequence, then there might be reordering problems when consuming both sequences, where the EOF appears before the last message.

Overall, I am wondering what the consumption pattern looks like and what you do when you hit EOF. @blindspotbounty could you provide some examples?

FranzBusch avatar Oct 04 '23 16:10 FranzBusch

Hi @FranzBusch, thank you for looking into this case!

It would be great to have such an enum in KafkaConsumerMessage to determine that a partition has been read up to the end.

The main thing for us is to determine that all messages have been read after adding a new service, or after an old service has recovered/started successfully. Hitting partition EOF on all partitions means that we are now up to date and processing real-time data for those partitions.

Another, less important but still nice-to-have use case is related to offset commits: committing every message is usually too slow, so we commit from time to time (after some number of entries or after some time has passed). However, when there are no more messages, we end up waiting for the next one. Partition EOF solves this, as we know that no more messages will be received for the partition and the offset can therefore be committed.

Currently we solve the main problem by parsing the partition statistics from librdkafka. That adds extra overhead, both in librdkafka to construct the JSON and on our side to parse it. It is also less reliable, because statistics are only emitted periodically, so with a large message flow it can be hard to catch a statistics snapshot with zero lag.
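
To make that concrete, here is a rough sketch of the catch-up pattern we have in mind, assuming the enum-style message proposed above (the eof case, handle and the partition count are illustrative, not existing API):

    // Sketch: track which partitions have been read to their current end ("caught up").
    var partitionsAtEOF = Set<Int>()
    let assignedPartitionCount = 4 // illustrative; in practice taken from the current assignment

    for try await message in consumer.messages {
        switch message {
        case .message(_, let partition, _, let value, _):
            // Fresh data arrived, so this partition is no longer at its end.
            partitionsAtEOF.remove(partition.rawValue)
            try await handle(value) // placeholder for application logic
        case .eof(_, let partition, _):
            partitionsAtEOF.insert(partition.rawValue)
            // Safe point to commit the last processed offset for this partition.
            if partitionsAtEOF.count == assignedPartitionCount {
                // All partitions are caught up: from here on we are processing real-time data.
                print("Caught up on all assigned partitions")
            }
        }
    }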

Looking forward to helping/providing more details if needed!

blindspotbounty avatar Oct 09 '23 12:10 blindspotbounty