
Parallel Consumer does not get records from poll after OFFSET_OUT_OF_RANGE in auto.offset.reset = earliest

Open jikeshdpatel opened this issue 3 years ago • 8 comments

Discussed in https://github.com/confluentinc/parallel-consumer/discussions/351

Originally posted by jikeshdpatel July 14, 2022: Hi @astubbs, this is a great library and improves performance drastically. However, I am seeing a strange issue, and I suspect it is caused by auto.offset.reset = earliest.

  • Populated events in the topic.
  • Started the parallel consumer with auto.offset.reset = earliest and it started consuming from the beginning.
  • Stopped the process at, say, offset 5000 on partition 3, with PC about 1000 events behind; the other 5 partitions had similar stats.
  • Waited until the topic's retention period had passed.
  • Produced more records, waited for another retention period, then produced more records.
  • Started PC and it only consumed from partition 5 (the last partition). I am not sure whether PC was lagging on partition 5.
  • Started a Spring Kafka listener (with auto.offset.reset = earliest) for partition 3 and it started consuming.
  • Stopped Spring Kafka and started PC (with auto.offset.reset = earliest); it now consumes from partitions 5 and 3.
  • Stopped PC and started it with auto.offset.reset = latest; now it starts consuming from all partitions.

The processing order was based on KEY.

Can you please advise on how I should debug this further?

Thank you for your help!

Below is the config:

2022-07-14 17:17:32.036 INFO [main] [org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:372)] - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [broker01:9092] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = client-01 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = app-group-01 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = [hidden] sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = PLAIN security.protocol = SASL_SSL security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.certificate.chain = null ssl.keystore.key = null ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.certificates = null ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-07-14 17:17:32.038 DEBUG [main] [org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:696)] - [Consumer clientId=client-01, groupId=app-group-01] Initializing the Kafka consumer
2022-07-14 17:17:32.135 INFO [main] [org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:61)] - Successfully logged in.
2022-07-14 17:17:32.141 DEBUG [main] [org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:264)] - Created SSL context with keystore null, truststore null, provider SunJSSE.
2022-07-14 17:17:32.192 INFO [main] [org.apache.kafka.common.utils.AppInfoParser$AppInfo.(AppInfoParser.java:119)] - Kafka version: 6.2.1-ccs
2022-07-14 17:17:32.192 INFO [main] [org.apache.kafka.common.utils.AppInfoParser$AppInfo.(AppInfoParser.java:120)] - Kafka commitId: fa4bec046a2df3a6
2022-07-14 17:17:32.192 INFO [main] [org.apache.kafka.common.utils.AppInfoParser$AppInfo.(AppInfoParser.java:121)] - Kafka startTimeMs: 1657833452190
2022-07-14 17:17:32.193 DEBUG [main] [org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:815)] - [Consumer clientId=client-01, groupId=app-group-01] Kafka consumer initialized
2022-07-14 17:17:32.199 INFO [main] [io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.(AbstractParallelEoSStreamProcessor.java:231)] - Confluent Parallel Consumer initialise... Options: ParallelConsumerOptions(consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6c42c856, producer=null, managedExecutorService=java:comp/DefaultManagedExecutorService, managedThreadFactory=java:comp/DefaultManagedThreadFactory, ordering=KEY, commitMode=PERIODIC_CONSUMER_ASYNCHRONOUS, maxConcurrency=100, defaultMessageRetryDelay=PT1S, retryDelayProvider=null, sendTimeout=PT10S, offsetCommitTimeout=PT10S, batchSize=1, thresholdForTimeSpendInQueueWarning=PT10S, maxFailureHistory=10)

Below are the logs for an OFFSET_OUT_OF_RANGE partition: the consumer does reset the offset, but it does not get any records, even though there are records at the new offset.

2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:314)] - [Consumer clientId=client-01, groupId=app-group-01] Fetch READ_UNCOMMITTED at offset 842285 for partition test-topic-1 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch=Optional.empty, recordsSizeInBytes=0)
2022-07-14 17:17:39.407 INFO [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1360)] - [Consumer clientId=client-01, groupId=app-group-01] Fetch position FetchPosition{offset=842285, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}} is out of range for partition test-topic-1, resetting offset
2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.sendListOffsetRequest(Fetcher.java:983)] - [Consumer clientId=client-01, groupId=app-group-01] Sending ListOffsetRequest ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='test-topic', partitions=[ListOffsetsPartition(partitionIndex=1, currentLeaderEpoch=12, timestamp=-2, maxNumOffsets=1)])]) to broker borker01:9092 (id: 21 rack: 0)
2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:522)] - [Consumer clientId=client-01, groupId=app-group-01] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=client-01, correlationId=25) and timeout 30000 to node 21: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='test-topic', partitions=[ListOffsetsPartition(partitionIndex=1, currentLeaderEpoch=12, timestamp=-2, maxNumOffsets=1)])])
2022-07-14 17:17:39.451 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:880)] - [Consumer clientId=client-01, groupId=app-group-01] Received LIST_OFFSETS response from node 21 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=client-01, correlationId=25): ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='test-topic', partitions=[ListOffsetsPartitionResponse(partitionIndex=1, errorCode=0, oldStyleOffsets=[], timestamp=-1, offset=1197212, leaderEpoch=7)])])
2022-07-14 17:17:39.451 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:1035)] - [Consumer clientId=client-01, groupId=app-group-01] Handling ListOffsetResponse response for test-topic-1. Fetched offset 1197212, timestamp -1
2022-07-14 17:17:39.451 DEBUG [pc-broker-poll] [org.apache.kafka.clients.Metadata.updateLastSeenEpochIfNewer(Metadata.java:183)] - [Consumer clientId=client-01, groupId=app-group-01] Not replacing existing epoch 12 with new epoch 7 for partition test-topic-1
2022-07-14 17:17:39.451 INFO [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated(SubscriptionState.java:398)] - [Consumer clientId=client-01, groupId=app-group-01] Resetting offset for partition test-topic-1 to position FetchPosition{offset=1197212, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}}.
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1198)] - [Consumer clientId=client-01, groupId=app-group-01] Added READ_UNCOMMITTED fetch request for partition test-topic-1 at position FetchPosition{offset=1197212, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}} to node borker01:9092 (id: 21 rack: 0)
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.FetchSessionHandler$Builder.build(FetchSessionHandler.java:259)] - [Consumer clientId=client-01, groupId=app-group-01] Built incremental fetch (sessionId=1279947631, epoch=1) for node 21. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:265)] - [Consumer clientId=client-01, groupId=app-group-01] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(test-topic-1), toForget=(), implied=()) to broker borker01:9092 (id: 21 rack: 0)
2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:522)] - [Consumer clientId=client-01, groupId=app-group-01] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=client-01, correlationId=26) and timeout 30000 to node 21: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1279947631, sessionEpoch=1, topics=[FetchTopic(topic='test-topic', partitions=[FetchPartition(partition=1, currentLeaderEpoch=12, fetchOffset=1197212, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:55)] - Poll completed normally (after timeout of PT2S) and returned 0...
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:185)] - Poll completed
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:142)] - Got 0 records in poll result
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:176)] - Subscriptions are paused: false
2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:182)] - Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: running

jikeshdpatel avatar Jul 14 '22 22:07 jikeshdpatel

Are the offsets in your consumer group topic being expired? The default retention for offsets used to be 24 hours. Once the offsets are gone, it should start from the beginning as if no offsets had ever been committed.

I'm not quite following what you're describing. Are you able to write this as an integration test? You can copy the latest version of tests using TransactionMarkersTest as a template.

astubbs avatar Jul 15 '22 12:07 astubbs
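
As a side note on the expired-offsets point above: a quick way to confirm whether a group's committed offsets have dropped below the log start (expired or removed by retention) is to compare them with the earliest offsets still on the broker. This is a minimal illustrative sketch, not part of the thread; the class name is arbitrary, the group and bootstrap server come from the config above, and the SASL/SSL settings are omitted:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker01:9092"); // plus the SASL/SSL settings from the config dump above
            props.put("group.id", "app-group-01");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (Admin admin = Admin.create(props);
                 KafkaConsumer<String, String> probe = new KafkaConsumer<>(props)) {
                // Last committed offsets for the group, as stored on the broker
                Map<TopicPartition, OffsetAndMetadata> committed = admin
                        .listConsumerGroupOffsets("app-group-01")
                        .partitionsToOffsetAndMetadata()
                        .get();
                // Earliest offsets still available on the log (log start offsets)
                Map<TopicPartition, Long> earliest = probe.beginningOffsets(committed.keySet());
                committed.forEach((tp, meta) -> {
                    if (meta == null || meta.offset() < earliest.get(tp)) {
                        // The committed position no longer exists; the next fetch there returns OFFSET_OUT_OF_RANGE
                        System.out.printf("%s: committed=%s, logStart=%d -> out of range%n", tp, meta, earliest.get(tp));
                    }
                });
            }
        }
    }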

Thanks @astubbs for your help. Really appreciate it! I am working on the integration tests, and it might take time to reproduce this through tests. In the meantime: the offsets in my consumer group for the topic have expired, and I expect the same, that it will start from the earliest available offset. I can see that in the logs below, but it did not fetch any records. However, when I start Spring Kafka using the same consumer group, I see the same logs as with the parallel consumer, but it does fetch records from the new offset returned by Kafka.

2022-07-14 17:17:39.407 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:314)] - [Consumer clientId=client-01, groupId=app-group-01] Fetch READ_UNCOMMITTED at offset **842285** for partition test-topic-1 returned fetch data (error=**OFFSET_OUT_OF_RANGE**, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, preferredReadReplica = absent, abortedTransactions = null, divergingEpoch=Optional.empty, recordsSizeInBytes=0)

2022-07-14 17:17:39.451 INFO [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated(SubscriptionState.java:398)] - [Consumer clientId=client-01, groupId=app-group-01] **Resetting offset** for partition test-topic-1 to position FetchPosition{offset=**1197212**, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}}. 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1198)] - [Consumer clientId=client-01, groupId=app-group-01] Added READ_UNCOMMITTED fetch request for partition test-topic-1 at position FetchPosition{offset=1197212, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[borker01:9092 (id: 21 rack: 0)], epoch=12}} to node borker01:9092 (id: 21 rack: 0) 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.FetchSessionHandler$Builder.build(FetchSessionHandler.java:259)] - [Consumer clientId=client-01, groupId=app-group-01] Built incremental fetch (sessionId=1279947631, epoch=1) for node 21. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s) 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:265)] - [Consumer clientId=client-01, groupId=app-group-01] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(test-topic-1), toForget=(), implied=()) to broker borker01:9092 (id: 21 rack: 0) 

2022-07-14 17:17:39.452 DEBUG [pc-broker-poll] [org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:522)] - [Consumer clientId=client-01, groupId=app-group-01] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=client-01, correlationId=26) and timeout 30000 to node 21: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1279947631, sessionEpoch=1, topics=[FetchTopic(topic='test-topic', partitions=[FetchPartition(partition=1, currentLeaderEpoch=12, fetchOffset=1197212, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='') 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.ConsumerManager.poll(ConsumerManager.java:55)] - Poll completed normally (after timeout of PT2S) and **returned 0**... 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:185)] - Poll completed 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.handlePoll(BrokerPollSystem.java:142)] - Got 0 records in poll result 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:176)] - Subscriptions are paused: false 

2022-07-14 17:17:39.608 DEBUG [pc-broker-poll] [io.confluent.parallelconsumer.internal.BrokerPollSystem.pollBrokerForRecords(BrokerPollSystem.java:182)] - Long polling broker with timeout PT2S, might appear to sleep here if subs are paused, or no data available on broker. Run state: running

jikeshdpatel avatar Jul 15 '22 20:07 jikeshdpatel

Hi @astubbs, I have tested it further and it looks like the parallel consumer is getting records. The issue I see is that the current offset in confluent cloud > consumer lag for its group does not move when the parallel consumer instance failed to commit the processed records because the partition was reassigned. What could be the reason the current offset in Confluent Cloud does not move, even though I see the records from that partition being processed?

jikeshdpatel avatar Jul 20 '22 13:07 jikeshdpatel

Hi, I suspect this is because PC doesn't know about "reset to earliest", so the underlying consumer re-downloads the records, but PC has already seen them and therefore ignores them.

If the offset gets reset and the consumer starts from earliest again, do you want PC to also reprocess everything?

Was PC stopped, then the offsets expired, then PC started again? Or did the offsets expire while PC was running?

astubbs avatar Jul 20 '22 14:07 astubbs

Hi @astubbs, thanks for the response. Yes, I want at-least-once processing behavior. If the last committed offset is reset and the system is down for 24 hrs, then it will reprocess everything from the earliest available event (not yet deleted from the topic) instead of processing only the latest events. Although this should rarely happen in prod, I still don't want to lose events by using offset reset to latest.

When I looked at my logs again, it looks like this: when there are locally processed offsets still waiting to be committed to the broker, and the commit does not happen because of a rebalance after scaling (or some other reason), then the next time around PC somehow knows they were processed, even though the offset commit failed, and does not reprocess them; but from then on it keeps committing the last uncommitted offset, which the partition has already moved past. I have attached the last commit failure from partitions 0-4, and those are the offsets I see in Confluent Cloud as the current offset. (I am unable to upload the Confluent Cloud screenshot.)

cp-enricher-pc-60-auth5-1.log:ERROR 2022-07-19T16:59:47.204Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-3 at offset 977994: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2.log:ERROR 2022-07-19T16:59:47.141Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-1 at offset 1269418: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2020220720115455104.log.gz:ERROR 2022-07-20T15:43:00.207Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1124610: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2020220720115455104.log.gz:ERROR 2022-07-20T15:51:58.043Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-4 at offset 1072807: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2120220720120005253.log.gz:ERROR 2022-07-20T15:51:57.968Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1124610: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-2120220720120005253.log.gz:ERROR 2022-07-20T15:51:58.044Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1124610: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-3.log:ERROR 2022-07-19T16:59:47.161Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-2 at offset 1121707: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-4.log:ERROR 2022-07-19T16:59:55.376Z [kafka-coordinator-heartbeat-thread | fs-async-events-pc-auth5][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-4 at offset 1072807: The coordinator is not aware of this member.

cp-enricher-pc-60-auth5-5.log:ERROR 2022-07-19T16:59:47.061Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-0 at offset 1205613: The coordinator is not aware of this member.

cp-enricher-pc-60-earlist-0.log:ERROR 2022-07-19T20:03:53.771Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition sqs-jcard-riskcontrol-enricher-1 at offset 1270220: The coordinator is not aware of this member.

Below are some additional logs. Please let me know if you want the committedMetadata hash.

cp-enricher-pc-60-auth5-1.log:ERROR 2022-07-19T16:50:56.605Z [pc-pool-3-thread-1][consumer1][partition=3][offset=977994]: Retriable exception to process message at 977994 . 

cp-enricher-pc-60-auth5-1.log:ERROR 2022-07-19T16:59:47.204Z [pc-broker-poll][ConsumerCoordinator][][][][] : [Consumer clientId=AAAAA, groupId=BBBBB] Offset commit failed on partition topic-name-3 at offset 977994: The coordinator is not aware of this member.

ERROR 2022-07-19T16:59:47.209Z [pc-broker-poll][BrokerPollSystem][][][][] : Unknown error
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1262) ~[kafka-clients-6.2.1-ccs.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1163) ~[kafka-clients-6.2.1-ccs.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1180) ~[kafka-clients-6.2.1-ccs.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1155) ~[kafka-clients-6.2.1-ccs.jar:?]

jikeshdpatel avatar Jul 20 '22 19:07 jikeshdpatel
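
For reference, the two knobs named in the CommitFailedException message above are standard consumer settings (generic Kafka consumer advice, not something specific to the Parallel Consumer); they are set on the KafkaConsumer configuration that is handed to PC. A small illustrative sketch, with arbitrary example values and an arbitrary class name:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class PollTuning {
        // Settings referenced by the CommitFailedException message; example values only, not recommendations.
        static Properties withPollTuning(Properties consumerProperties) {
            consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // allow more time between polls
            consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);         // fewer records per poll
            return consumerProperties;
        }
    }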

At least once? Are you saying PC has been skipping records?

astubbs avatar Jul 20 '22 20:07 astubbs

Hi @astubbs, sorry for the confusion, but I did not mean PC is skipping records. I meant if I use auto.offset.reset as latest instead of earliest. I guess that's not related to PC.

The issue I am seeing is that once PC is unable to commit the offset, due to rebalancing or some other reason, the current offset does not move. What I mean by "does not move" is that I see that uncommitted offset in the Confluent Cloud consumer dashboard. However, that record is not being reprocessed as a duplicate.

jikeshdpatel avatar Jul 20 '22 21:07 jikeshdpatel

Hi @astubbs, just an update on this. It happens when the last committed offset is out of range. So checking for out-of-range committed offsets during start-up, and resetting them before starting PC, seems to solve the problem:

        Properties consumerProperties = populateConsumerProperties(pcKafkaProperties);
        consumerProperties.put(MAX_POLL_RECORDS_CONFIG, 1);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties);

        AtomicBoolean partitionsAssigned = new AtomicBoolean(false);
        try (kafkaConsumer) {
            kafkaConsumer.subscribe(List.of(pcKafkaProperties.getInputTopics().split(";")), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    logger.info("Partitions are revoked while resetting offset to earliest during start up {} ",
                            partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    Map<TopicPartition, Long> earliestOffsets = new HashMap<>();
                    Map<TopicPartition, OffsetAndMetadata> committedOffsets = kafkaConsumer.committed(new HashSet<>(partitions));
                    Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
                    kafkaConsumer.seekToBeginning(partitions);
                    for (TopicPartition partition: partitions) {
                        long nextOffset = kafkaConsumer.position(partition); // get the beginning offset of the partition
                        logger.info("Partition {} position remote call is made and got nextOffset {} ",
                                partition, nextOffset);
                        earliestOffsets.put(partition, nextOffset);
                        if (committedOffsets == null ||
                                committedOffsets.get(partition) == null ||
                                nextOffset > committedOffsets.get(partition).offset()) {
                            // earliest offset is ahead of last committed offset, it means committed offset needs to be reset
                            resetOffsets.put(partition, new OffsetAndMetadata(nextOffset));
                        }
                    }
                    if (!resetOffsets.isEmpty()) {
                        kafkaConsumer.commitSync(resetOffsets);
                        logger.warn("Offsets are reset to {} . This should not happen in prod, please evaluate" +
                                " apps downtime", resetOffsets);
                    } else {
                        logger.info("All good! No need to reset offsets {} ", resetOffsets);
                    }
                    logger.info("Earliest offsets {} ", earliestOffsets);
                    logger.info("Last committed offsets {} ", committedOffsets);

                    partitionsAssigned.set(true);
                }
            });

            while (!partitionsAssigned.get()) {
                kafkaConsumer.poll(Duration.ofMillis(1000L));
                logger.info("Partitions are not yet assigned during offset reset step");
            }
        }

jikeshdpatel avatar Aug 01 '22 14:08 jikeshdpatel

I'm afraid this all might have been confused by issue #365 (Received duplicated records when rebalance occurs), which is fixed in release 0.5.2.1 (the latest is 0.5.2.2).

What is the retention period of the input topic? OFFSET_OUT_OF_RANGE can occur for an offset below what's available, too, not only above. OFFSET_OUT_OF_RANGE is a normal error.

The issue I see is that the current offset in confluent cloud > consumer lag

That statement doesn't compile - consumer lag is the difference between the partition head offset and the consumer's position in the partition. You cannot compare those two things the way you are.

does not move when the parallel consumer instance failed to commit the processed records because the partition was reassigned

That was the regression bug #365 :/ Sorry about that.

If the last committed offset is reset and the system is down for 24 hrs, then it will reprocess everything from the earliest available event

Just to check - you know you can increase the offset expiry time (offsets.retention.minutes) to whatever you want? It used to default to 24 hours; now it's 7 days, but a lot of people make it effectively infinite and only delete offsets on request (in production).

And if you want that behaviour, as per a normal consumer setup, just set the reset policy to earliest, as you have.

PC somehow knows they were processed, even though the offset commit failed, and does not reprocess them

Ah yes, probably because an earlier offset commit’s metadata included the status of the completed records (in other words they were committed earlier in a metadata commit) - see https://github.com/confluentinc/parallel-consumer#offset_map for more information.
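
That metadata can be inspected directly: reading the group's committed offset for a partition with a plain consumer also returns the metadata string stored alongside it, which is where PC keeps its encoded completion state. A minimal sketch (not from the thread), reusing the topic/group names used earlier and omitting the security settings; the class name is arbitrary:

    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedMetadataDump {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker01:9092"); // plus security settings as in the config dump above
            props.put("group.id", "app-group-01");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(props)) {
                Map<TopicPartition, OffsetAndMetadata> committed =
                        probe.committed(Set.of(new TopicPartition("test-topic", 1)));
                // The metadata string is where PC encodes per-record completion state (the "offset map")
                committed.forEach((tp, meta) -> System.out.printf("%s -> offset=%s, metadata=%s%n",
                        tp,
                        meta == null ? "none" : meta.offset(),
                        meta == null ? "none" : meta.metadata()));
            }
        }
    }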

I meant if I use auto.offset.reset as latest instead of earliest. I guess that's not related to PC.

Yes, that’s unrelated, as above.

The issue I am seeing is that once PC is unable to commit the offset, due to rebalancing or some other reason, the current offset does not move. What I mean by "does not move" is that I see that uncommitted offset in the Confluent Cloud consumer dashboard. However, that record is not being reprocessed as a duplicate.

Yeah, it's probably the issue above; but as mentioned, offsets may have been previously marked as completed in the metadata from older commits.

consumerProperties.put(MAX_POLL_RECORDS_CONFIG, 1);

Why do you set this to one?

code snippet

Do you have this in a repo I can see?

nextOffset > committedOffsets.get(partition).offset()) {
      // earliest offset is ahead of last committed offset, it means committed offset needs to be reset
      resetOffsets.put(partition, new OffsetAndMetadata(nextOffset));

This code seems odd; how is nextOffset the same as earliest? They are not.

Why do you think there would be a problem? Generally, `position()` returns the offset of the next record that would be returned by the broker; it is not the last committed offset. It is totally expected that `position()` could return an offset higher than `committed()` immediately after assignment - if a partition is reassigned to the same consumer, `position()` might be higher than `committed()`: record compaction, record retention periods, etc. If this occurs, it does not need to be manually handled; the next commit will simply be correct.
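
To make the `position()` vs `committed()` distinction concrete, the same probe-consumer approach as the sketch above can read both values for a partition: `position()` reflects the fetch position after any auto.offset.reset is applied, while `committed()` is the group's stored offset. Again a minimal illustrative sketch with an arbitrary class name, using names from this thread and omitting security settings:

    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class PositionVsCommitted {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker01:9092");
            props.put("group.id", "app-group-01");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("test-topic", 1);
                probe.assign(List.of(tp));
                OffsetAndMetadata committed = probe.committed(Set.of(tp)).get(tp); // group's last committed offset (may be null)
                long position = probe.position(tp); // next offset to fetch, after any auto.offset.reset
                System.out.printf("committed=%s, position=%d%n", committed, position);
            }
        }
    }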

Have a play with the latest version and let me know if you still see any issues.

astubbs avatar Aug 18 '22 11:08 astubbs