
How to deal with error code `Local_State` with reason `Erroneous state` after upgrading to version 1.9.0

Open plachor opened this issue 2 years ago • 11 comments

Hi @mhowlett,

Recently we observed offsets being reset to earliest on random (partition, consumer-group) pairs in our production environment. This was a huge issue for us, as we were flooded with millions of messages, causing lag on our consumers. Since we guarantee ordered processing without gaps, this was a challenge for us.

We believe this was an outcome of the bug fixed here: https://github.com/edenhill/librdkafka/pull/3774#issuecomment-1177475167. I tried to ask there whether under such conditions we are now guaranteed to observe an exception, but perhaps @edenhill does not follow comments on merge requests that have already been merged.

Description

Judging by your comments, we think 1.9.0 addresses that case, so we upgraded Confluent.Kafka in our development environment and started testing.

After the update we observe a KafkaException with code Local_State and reason Erroneous state. From what I checked, it correlates with RD_KAFKA_RESP_ERR__STATE, which was involved in that merge request: https://github.com/edenhill/librdkafka/pull/3774

How to reproduce

We use auto-commit.

After upgrading to 1.9.0 we observe KafkaException(Local_State) when calling StoreOffset during batch processing. We already described our approach to batch processing here: https://github.com/confluentinc/confluent-kafka-dotnet/issues/1164#issuecomment-610308425.

We consume a batch of messages for a given time window (let's say a minute). Once we have aggregated the batch we start processing it; after all messages have been processed (which can take minutes) we store offsets, preserving consumption order, for all consumed messages.
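
To illustrate, a minimal sketch of this pattern (assuming EnableAutoCommit = true with EnableAutoOffsetStore = false, since we call StoreOffset ourselves; broker address, group id, topic name and batch window are placeholders):

using System;
using System.Collections.Generic;
using Confluent.Kafka;

class BatchConsumerSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",        // placeholder
            GroupId = "batch-consumer-group",           // placeholder
            EnableAutoCommit = true,                    // background commit of *stored* offsets
            EnableAutoOffsetStore = false,              // offsets stored explicitly after processing
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("my-topic");                 // placeholder topic

        while (true)
        {
            // 1. Aggregate a batch for a fixed time window (e.g. one minute).
            var batch = new List<ConsumeResult<Ignore, string>>();
            var deadline = DateTime.UtcNow + TimeSpan.FromMinutes(1);
            while (DateTime.UtcNow < deadline)
            {
                var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
                if (result != null) batch.Add(result);
            }

            // 2. Process the whole batch (this can take minutes).
            Process(batch);

            // 3. Store offsets in consumption order; the auto-commit thread commits them later.
            //    In 1.9.0 this is where KafkaException(Local_State) is thrown if the partition
            //    is no longer assigned to this instance.
            foreach (var result in batch)
            {
                consumer.StoreOffset(result);
            }
        }
    }

    static void Process(List<ConsumeResult<Ignore, string>> batch) { /* application logic */ }
}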

In the meantime we lose partition assignments, and that is when we observe this KafkaException with code Local_State.

We see that this exception is not observed during batch aggregation in the consume phase, which makes us think it is probably handled silently underneath, and that subsequent Consume calls simply return messages from partitions that are currently assigned.

Questions:

How should we deal with this exception:

  1. Is it safe to log it, continue calling StoreOffset for the other messages aggregated in the batch, and afterwards start aggregating a new batch by calling Consume again?
  2. What will the consumer instance do under the hood once it observes this exception during offset store (will it empty the locally fetched queues for unassigned partitions)?
  3. Are we guaranteed to reprocess messages in order after this exception, without unsubscribing the consumer instance that observed it?
  4. Could our approach lead us into an error state if, during batch aggregation and processing, two rebalances happened that ended with the partition reassigned to us (but the processed offset is no longer valid from the perspective of the current assignment)?
  5. Assuming 4 is possible, we could have duplicates in the aggregated batch. Would the last stored offset for a given topic-partition then always win when it comes to committed offsets?
  6. What was happening in such a case before 1.9.0? We never observed errors on StoreOffset before (even though unassignments must have happened then too); could it result in gaps, with an instance that no longer owned a partition storing offsets that were not consumed/processed by the current owner?
  7. Does the commit occur during Consume calls, or is it a completely separate thread that can run independently of Consume, so that we could end up committing only part of the stored offsets of a given aggregated batch?
  8. There are callbacks such as SetPartitionsLostHandler; can they be used along with auto-commit just to notify ourselves that partitions were unassigned (see the sketch after this list)?
  9. Are all callbacks always triggered by calling Consume, or can they be triggered from a different thread outside of Consume calls?
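
Regarding question 8, this is roughly what we mean; a sketch only, where the handlers merely log the change and do not call (Un)Assign (server, group and topic names are placeholders):

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "batch-consumer-group",      // placeholder
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false
};

// Handlers used purely as notifications alongside auto-commit; they take no action,
// so the default assignment behaviour is preserved.
using var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsRevokedHandler((c, revoked) =>
        Console.WriteLine($"Partitions revoked: {string.Join(", ", revoked)}"))
    .SetPartitionsLostHandler((c, lost) =>
        Console.WriteLine($"Partitions lost: {string.Join(", ", lost)}"))
    .Build();

consumer.Subscribe("my-topic");            // placeholder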

Checklist

Please provide the following information:

  • [+] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [1.9.0] Confluent.Kafka nuget version.
  • [2.6.0] Apache Kafka version.
  • [-] Client configuration.
  • [Linux containers] Operating system.
  • [-] Provide logs (with "debug" : "..." as necessary in configuration).
  • [-] Provide broker log excerpts.
  • [-] Critical issue.

plachor avatar Jul 14 '22 09:07 plachor

I would really appreciate your thoughts here @mhowlett; the recent issue with offset resets exposed our vulnerability there, and we would like to upgrade to 1.9.0 as soon as possible to reduce the chances of it occurring again.

On the other hand, we would not like to fall into the pitfall of misunderstanding how this error should be dealt with.

plachor avatar Jul 15 '22 12:07 plachor

I've finished another round of tests and have some new conclusions:

  1. Since we use the round-robin assignor, during batch aggregation I now interrupt consuming when I detect that I have lost assignments, or am about to lose them, so that I don't even start processing the current batch. Since round-robin is eager, there is no partial unassignment.
  2. After throwing an exception from the SetPartitionsRevokedHandler or SetPartitionsLostHandler callbacks I have never managed to reproduce the KafkaException with code Local_State, no matter how long I process the batch. This leads me to the conclusion that StoreOffset works from a stale view when Consume has not been called for a while, and in fact still allows storing offsets for partitions that are no longer owned by the current instance.
  3. Does the auto-commit background process check the current assignment, or will it commit anything in the local stored-offsets queue? If the latter, calling StoreOffset after processing a batch (with no Consume calls during batch processing) will not protect us from committing illegal offsets to unassigned partitions, leading to unordered messages.

To observe this I'm consuming a topic with lots of messages and lots of rebalances happening in the background, so these are not regular conditions; however, we can end up with them in rare cases when there are partial network partitions between the cluster and the consumer instances, plus a deployment.

plachor avatar Jul 19 '22 06:07 plachor

I confirm I have the same error "Local: Erroneous state" after migrating to 1.9.0. In my case I don't see any offset reset, but we are getting duplicate messages, same partition, same offset, right after the "Erroneous state" error is logged.

alfhv avatar Jul 19 '22 21:07 alfhv

Further conclusions regarding https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/ConsumerBuilder.cs#L432-L465, the SetPartitionsRevokedHandler method on the consumer builder:

 <summary>
     Specify a handler that will be called immediately prior to partitions being revoked
     from the consumer's current assignment, allowing the application to take action
     (e.g. commit offsets to a custom store) before the consumer gives up ownership of
     the partitions.

     Kafka supports two rebalance protocols: EAGER (range and roundrobin assignors) and
     COOPERATIVE (incremental) (cooperative-sticky assignor). Use the PartitionAssignmentStrategy
     configuration property to specify which assignor to use.

     ## EAGER Rebalancing (range, roundrobin)

     The second parameter provides the entire set of partitions the consumer is currently
     assigned to, and the current position of the consumer on each of these partitions.
     The consumer will stop consuming from all partitions following execution of this
     handler.

     ## COOPERATIVE (Incremental) Rebalancing

     The second parameter provides the subset of the partitions assigned to the consumer
     which are being revoked, and the current position of the consumer on each of these
     partitions. The consumer will stop consuming from this set of partitions following
     execution of this handler, and continue reading from any remaining partitions.
 </summary>
 <remarks>
     May execute as a side-effect of the Consumer.Consume/Close/Dispose call (on the same
     thread).

     (Incremental)Assign/Unassign must not be called in the handler.

     Exceptions: Any exception thrown by your partitions revoked handler will be wrapped
     in a ConsumeException with ErrorCode ErrorCode.Local_Application and thrown by the
     initiating call to Consume.
 </remarks>
  1. Reading this comment, I understand that when the round-robin assignor is used the consumer loses its whole assignment; there is no way to lose it partially.
  2. I imagine I should already be unassigned once I leave the Consume call that triggered my callback.
  3. If by any chance I throw an exception from within that callback, I should observe a ConsumeException with ErrorCode.Local_Application and my custom exception as the inner exception.

But my observations show that for batch processing:

  • ad1) I'm able to reproduce gaps (gap = an offset never processed even once). That leaves me with only one conclusion: locally fetched offsets are not properly erased, and if by chance I end up reassigned to the same partition I might start consuming from the point I was at before I observed PartitionsRevoked.
  • ad2) After observing PartitionsRevoked and the Consume call ending, I still see the old Assignment on the consumer for a short time. I would expect Assignment to be empty by the time I leave the Consume call.
  • ad3) However, my exception is simply propagated up from the Consume call, so I have doubts whether this XML comment is correct at all.

Finally I have tried to approach it the other way: every time I observe PartitionsRevoked or PartitionsLost, I perform a manual seek to the offsets that were last successfully committed on the broker before resuming Consume calls, something like:

// Re-synchronize positions with the broker after a rebalance callback.
var assignment = consumer.Assignment;
// Fetch the last committed offsets for the currently assigned partitions.
var committed = consumer.Committed(assignment, TimeSpan.FromSeconds(30));
foreach (var offset in committed)
{
    // Rewind the local position to the committed offset.
    consumer.Seek(offset);
}

This approach has not resulted in gaps or unordered processing. It rolls back consumed offsets, so I end up with duplicates, but I'm not jumping forward or missing any messages.

However, I'm mostly afraid of causing cascading rebalances here, as in my use case the consumer group can scale dynamically during its lifetime.

So, if I understand the XML comment above SetPartitionsRevokedHandler correctly, I assume it should work by design as follows (sketched in code below the list):

  1. since we use round-robin, we simply throw from the PartitionsRevoked and PartitionsLost callbacks
  2. we handle the ConsumeException with ErrorCode.Local_Application carrying our inner exception
  3. we resume calling Consume and should start from the last committed offsets on the newly assigned partitions
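
In code, my understanding of that design is roughly the following; a sketch only, not verified, with placeholder broker, group and topic names (per my observation above, the exception may also surface unwrapped, so I catch both):

using System;
using System.Collections.Generic;
using Confluent.Kafka;

static void AbortBatch(IConsumer<Ignore, string> c, List<TopicPartitionOffset> partitions)
    => throw new OperationCanceledException($"assignment changed: {string.Join(", ", partitions)}");

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "batch-consumer-group",      // placeholder
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin
};

using var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsRevokedHandler(AbortBatch)   // 1. abort the in-flight batch on revoke
    .SetPartitionsLostHandler(AbortBatch)      //    and on lost
    .Build();

consumer.Subscribe("my-topic");                // placeholder

while (true)
{
    try
    {
        var result = consumer.Consume(TimeSpan.FromSeconds(1));
        // ... aggregate / process / StoreOffset as described above ...
    }
    catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.Local_Application)
    {
        // 2. our exception, wrapped by the client as documented; drop the in-flight batch
    }
    catch (OperationCanceledException)
    {
        // observed in practice: the exception may also propagate unwrapped
    }
    // 3. resume calling Consume; it should continue from the last committed
    //    offsets on the newly assigned partitions
}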

Any thoughts?

plachor avatar Jul 21 '22 12:07 plachor

I also observed that even though I call consumer.Consume(TimeSpan.Zero), I was able to reproduce a situation where either PartitionsRevoked or PartitionsLost occurred and Consume still returned a record.

In such a case, when TimeSpan.Zero is passed, I would expect Consume not to return a message.

plachor avatar Jul 22 '22 06:07 plachor

What I have observed so far: the consumer group receives a batch of messages. We can see records on the __commit_offset topics in the Kafka cluster, we can see logs on the service side (Kibana), and we can see activity in the Kafka metrics (Grafana).

Some time later, about an hour, part of that batch is received again by the consumer group: we can see logs on the service side (Kibana) and network activity in Grafana, but we don't see any commit on the __commit_offset topics and we don't see any movement in the Kafka metrics in Grafana.

Question: is it possible that the consumer, for some unknown reason, starts asking for old, already committed offsets, and Kafka delivers those messages without any trace on the cluster side in the __commit_offset logs and without altering the consumer offset (leaving no trace in the Kafka metrics)?

alfhv avatar Jul 27 '22 13:07 alfhv

We see the same issue in our setup as well, starting from the 1.9.0 release. @mhowlett, do you have any news about this?

vhatsura avatar Aug 06 '22 17:08 vhatsura

In the librdkafka PR linked to in the description, librdkafka was updated to raise this error if an attempt is made to commit offsets for partitions that aren't currently assigned. You could be seeing the error because you are doing that, or due to a bug in librdkafka that allows this to happen (or some other reason, but the former seems likely given it was an update to librdkafka in 1.9.0). Either way, the only negative consequence (in the first two cases) is that the offsets aren't committed and another consumer in the group will re-process the messages, so you can safely just ignore the exception.
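
For example, something along these lines (illustrative only; batch and consumer are whatever your application already has):

foreach (var result in batch)
{
    try
    {
        consumer.StoreOffset(result);
    }
    catch (KafkaException ex) when (ex.Error.Code == ErrorCode.Local_State)
    {
        // The partition was revoked; another member of the group now owns it and
        // will re-process these messages, so not storing the offset is harmless.
    }
}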

If you provide a small test application that demonstrates the issue, we'd likely get to looking at it sooner.

mhowlett avatar Aug 07 '22 12:08 mhowlett

I think this check is not well-compatible with the existing consume-revoke-assign callback semantics under the STW (eager) protocol. Under the STW protocol each rebalance involves a revoke-assign pair for an unchanged partition; if all partitions are revoked, it appears we only get the revoke callback and no assign callback. So we only know for certain that a given partition has been revoked after Consume exits (i.e., once we know there won't be an assign callback for this partition within the same rebalance round).

Given that in most rebalance rounds most partitions do not move, and a round can last a long time, we currently continue background processing of already-read events opportunistically during rebalancing. We only stop when a partition is actually revoked. In most cases we find out during the assign callback which partitions were actually revoked. However, if there is no assign callback, we only stop processing events from revoked partitions after Consume exits, and the background activity can call StoreOffset in the time window between the end of rebalancing and this post-consume processing. Previously this was handled internally; now it throws a very generic error. Do you have any advice on this? Could the error be made more specific to the condition (Local_UnknownPartition, or something like that)? Or perhaps the assign callback could be called with an empty list for STW protocols when all partitions are revoked, although that could be disruptive to old code.

sershe-ms avatar Sep 21 '22 15:09 sershe-ms

I did not read your comment closely, but I certainly agree the API semantics we have here are crap.

Note: To get precisely the old behavior, just ignore the Erroneous state exception.

You might be interested in https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol - we will be implementing this.

mhowlett avatar Sep 21 '22 15:09 mhowlett

Update: while testing a mitigation I noticed that the issue with the way we use it is not limited to the narrow case above; it even happens in the middle of a rebalance round, forcing one (unless choosing to ignore the error, which is what "Failed" means here) to never attempt to store an offset during consume callbacks. This is the STW protocol, so it is entirely within one Consume:

Native revoked callback with TestTopic,5
Failed to store 16582536 due to a concurrent revocation of TestTopic,5
Failed to store 16582537 due to a concurrent revocation of TestTopic,5
Native assigned callback with TestTopic,5
Ignoring revoke/assign pair for TestTopic,5

sershe-ms avatar Sep 22 '22 19:09 sershe-ms

it even happens in the middle of the rebalancing round

seems like that is expected?

I don't think there is likely an issue in librdkafka here (aside from the confusing semantics, of course), and I'm going through tidying up the community issues, so closing this. If you believe there is a real issue, feel free to open it again and try to provide some additional precise info about expected and actual behavior.

mhowlett avatar Oct 03 '22 16:10 mhowlett

After trying to deploy at scale we still see this error, even after Consume, including when calling other APIs. It seems like no amount of synchronization would handle this. It may be related to paused and resumed partitions.

Scenarios I've discovered are:

  1. Synchronization doesn't appear to always work: we only ignore the error during Consume and reset the flag after Consume exits, yet we still occasionally get the erroneous-state error when trying to store an offset. I can see in the logs that it happens after Consume has already exited, rebalancing is done, and the client still has the same partition assigned.

  2. Seek also fails on the consume thread, so no race with Consume is possible at all. We pause partition 1; at some point we call Consume; rebalancing happens and we get a callback that 1 is revoked, then that 1 is assigned; Consume exits; we call Resume on the same thread (no parallel Consume is possible at this point) and Resume does not fail; we then call Seek on the same thread, and Seek fails with the erroneous-state error. In this case we log whether the partition is still in Assignment when the error occurs, and it still is (see the call sequence sketched below).
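
Roughly, the call sequence in scenario 2 is the following (simplified; consumer is the already-built consumer, single thread, no concurrent Consume, and the offset value is a placeholder):

var partition = new TopicPartition("TestTopic", 1);

consumer.Pause(new[] { partition });

// A rebalance completes inside this call: the revoked handler fires for the
// partition, then the assigned handler fires for the same partition.
consumer.Consume(TimeSpan.FromSeconds(1));

// Same thread, after Consume has returned:
consumer.Resume(new[] { partition });                        // does not fail
consumer.Seek(new TopicPartitionOffset(partition, 100));     // fails: Local_State / "Erroneous state"

// Yet the partition is still reported as assigned at this point:
Console.WriteLine(consumer.Assignment.Contains(partition));  // prints True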

sershe-ms avatar Nov 23 '22 03:11 sershe-ms

Given that a rebalance is not an error condition, I would expect to be able to commit messages (manually) that my application has consumed before the rebalance began. According to Confluent docs on eager (a.k.a. "stop the world") rebalancing:

Each consumer in the group has max.poll.interval.ms to wrap up their current processing and send their JoinGroup request.

With this library, it seems that I immediately hit the "Local_State" error instead of being allowed to wrap up processing. I understand I can ignore the error, but ideally, I would like to prevent the message from being processed again. Has anyone been able to achieve this using this library?
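
What I have in mind is roughly the sketch below: manual commits, committing from inside the revoked handler as the XML docs describe (names are placeholders, and I have not verified that this avoids the error):

using System.Collections.Generic;
using Confluent.Kafka;

// Offsets of messages the application has finished processing, maintained by the
// processing loop (offset + 1 of each processed message).
var processedOffsets = new List<TopicPartitionOffset>();

using var consumer = new ConsumerBuilder<Ignore, string>(new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",   // placeholder
        GroupId = "my-group",                  // placeholder
        EnableAutoCommit = false               // manual commits
    })
    .SetPartitionsRevokedHandler((c, revoked) =>
    {
        // Per the handler docs we still own the partitions here, so wrap up and
        // commit whatever has been processed before ownership is given up.
        if (processedOffsets.Count > 0)
        {
            c.Commit(processedOffsets);
            processedOffsets.Clear();
        }
    })
    .Build();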

kmcclellan avatar Apr 16 '24 18:04 kmcclellan