
Consumer comes up with no assigned partitions

Open njhartwell opened this issue 7 years ago • 31 comments

We are using reactive-kafka (com.typesafe.akka:akka-stream-kafka_2.11:0.14) with Kafka 0.10 brokers and frequently see consumers come up (usually immediately after another consumer in the group terminates) with no partitions assigned. Logs look like this:

2017-05-11 01:55:49,394 INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [192.168.1.1:9093, 192.168.1.2:9093, 192.168.1.3:9093]
	check.crcs = true
	client.id = consumer-1
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = my-group
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = [com.my.app.adapters.akkalib.metrics.KafkaDropwizardMetricsReporter]
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = SSL
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = [hidden]
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = /etc/app/server.jks
	ssl.keystore.password = [hidden]
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = /etc/ssl/app.truststore.jks
	ssl.truststore.password = [hidden]
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

2017-05-11 01:55:49,474 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
2017-05-11 01:55:49,477 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
2017-05-11 01:55:49,600 WARN  o.a.k.c.consumer.ConsumerConfig - The configuration 'metric.tag' was supplied but isn't a known config.
2017-05-11 01:55:49,600 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
2017-05-11 01:55:49,600 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
2017-05-11 01:55:51,899 INFO  o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator foo.bar.baz:9093 (id: 123 rack: null) for group my-group.
2017-05-11 01:55:51,976 INFO  o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [] for group my-group
2017-05-11 01:55:51,978 INFO  o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group my-group
2017-05-11 01:55:52,801 WARN  akka.kafka.KafkaConsumerActor - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
2017-05-11 01:55:52,995 INFO  o.a.k.c.c.i.AbstractCoordinator - Successfully joined group my-group with generation 1
2017-05-11 02:52:49,210 INFO  c.m.a.GoogleCredentialService - Refreshing Google auth token
2017-05-11 02:52:49,404 INFO  c.m.a.GoogleCredentialService - Refreshed Google auth token, scheduling to refresh again in 3420s

Relevant code looks like this:

Consumer.committablePartitionedSource(settings, Subscriptions.topics(config.getTopic())).watchTermination(Keep.both())
                .via(killSwitch.flow())
                .via(sourceFilter.limiterFlow())
                .flatMapMerge(1000, r -> r.second()
                        .filter(this::filter)
                        .map(this::parseRecord)
                        .filter(Optional::isPresent)
                        .map(Optional::get)
                        .mapAsync(1, this::sendRequest)
                        .map(this::handleResponse))
                .map(StreamMessage::getKafkaMessage)
                .batch(
                        1000,
                        first -> ConsumerMessage.emptyCommittableOffsetBatch().updated(first.committableOffset()),
                        (batch, elem) -> batch.updated(elem.committableOffset()))
                .mapAsync(10, ConsumerMessage.Committable::commitJavadsl)
                .runWith(Sink.ignore(), materializer);

My apologies for the lack of a runnable test case, etc. (one will follow if need be); I just wanted to get this out there in case there is a known issue behind it or something obviously wrong with how we're using the library. Thanks!

njhartwell avatar May 11 '17 16:05 njhartwell

We are seeing this sometimes as well. I think the wakeup timeout should be much higher. It is currently 3 seconds, but the default heartbeat interval in Kafka is also 3 seconds. The heartbeat frequency is a factor in how long a rebalance takes, so it makes sense to me that the wakeup timeout should be significantly higher, perhaps close to the default session timeout of 10 seconds.
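
For reference, raising it programmatically would look something like this — only a minimal sketch, assuming the withWakeupTimeout setter available on ConsumerSettings in the 0.x line (the same value can also be set as akka.kafka.consumer.wakeup-timeout in application.conf); the servers, group and deserializers are placeholders:

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import scala.concurrent.duration.Duration;

import java.util.concurrent.TimeUnit;

public class WakeupTimeoutExample {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create();

    // Give poll() as long as the default session timeout before waking the
    // consumer up, instead of the default 3 seconds.
    ConsumerSettings<byte[], byte[]> settings =
        ConsumerSettings.create(system, new ByteArrayDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers("192.168.1.1:9093")
            .withGroupId("my-group")
            .withWakeupTimeout(Duration.create(10, TimeUnit.SECONDS));
  }
}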

jpeel avatar Jun 15 '17 20:06 jpeel

I'm facing a similar issue in my project:

09:11:43.738 [test.kafka.default-dispatcher-22] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group test committed offset 17872622155 for partition test-0
09:11:43.753 [test.kafka.default-dispatcher-14] INFO  o.a.k.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 60000 ms.
09:11:43.755 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
09:11:43.759 [test.actor.default-dispatcher-48] DEBUG akka.kafka.internal.ProducerStage - Stage completed
09:11:43.762 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed.
09:11:43.762 [test.kafka.default-dispatcher-14] DEBUG o.a.k.clients.producer.KafkaProducer - The Kafka producer has closed.
09:11:43.763 [test.actor.default-dispatcher-48] DEBUG akka.kafka.internal.ProducerStage - Producer closed

This happens randomly after a couple of hours (or even days) of running. The cause looks to be this:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-0
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.committed(SubscriptionState.java:269)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:739)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:716)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:907)

(com.typesafe.akka:akka-stream-kafka_2.12:0.15 with kafka brokers 0.10)

romanwozniak avatar Jun 19 '17 10:06 romanwozniak

+1

I experience exactly the same problem, quite randomly, after a wakeup:

07/13/2017 23:37:43.090|INFO ||||org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:180)|ProducerConfig values: 
	acks = 1
	batch.size = 16384
	block.on.buffer.full = false
	bootstrap.servers = [kafka01-stage1.messagehub.services.us-south.bluemix.net:9093, kafka02-stage1.messagehub.services.us-south.bluemix.net:9093, kafka03-stage1.messagehub.services.us-south.bluemix.net:9093, kafka04-stage1.messagehub.services.us-south.bluemix.net:9093, kafka05-stage1.messagehub.services.us-south.bluemix.net:9093]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.fetch.timeout.ms = 60000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = PLAIN
	security.protocol = SASL_SSL
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = HTTPS
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	timeout.ms = 30000
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

07/13/2017 23:37:43.094|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)|Kafka version : 0.10.2.1
07/13/2017 23:37:43.098|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)|Kafka commitId : e89bffd6b2eff799
07/13/2017 23:37:43.099|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:83)|Kafka version : 0.10.2.1
07/13/2017 23:37:43.099|INFO ||||org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:84)|Kafka commitId : e89bffd6b2eff799

07/13/2017 23:37:44.534|INFO ||||org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:586)|Discovered coordinator kafka05-stage1.messagehub.services.us-south.bluemix.net:9093 (id: 2147483643 rack: null) for group test-UsageMetricsSpec-group-1.
07/13/2017 23:37:44.539|INFO ||||org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:397)|Revoking previously assigned partitions [] for group test-UsageMetricsSpec-group-1
07/13/2017 23:37:44.539|INFO ||||org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:420)|(Re-)joining group test-UsageMetricsSpec-group-1
[WARN] [07/13/2017 23:37:46.191] [ml-event-client-units-akka.kafka.default-dispatcher-20] [akka://ml-event-client-units/system/kafka-consumer-2] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
07/13/2017 23:37:46.640|INFO ||||org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:388)|Successfully joined group test-UsageMetricsSpec-group-1 with generation 5

After Successfully joined group test-UsageMetricsSpec-group-1 with generation 5, the consumer gets stuck forever.

rafalbigaj avatar Jul 13 '17 21:07 rafalbigaj

In my case increasing the wakeup timeout didn't help. Consumers get stuck rarely, but they still get stuck. It is a big issue because the Kafka brokers think the partitions are assigned to the consumer. The only way to "unclog" the consumer is to trigger a group rebalance. Is the issue in the way reactive-kafka uses the wakeup mechanism, or in the vanilla Kafka consumer implementation?

wojda avatar Aug 05 '17 22:08 wojda

I am also facing this issue. I have seen it three times in the past month. The rebalance happens frequently in my application, but at times I see 0 partitions assigned (I subscribed to 1 partition).

Does it take time to get the update that the partition has already been revoked? Should we poll again if this arises?

anilkumar763 avatar Aug 07 '17 12:08 anilkumar763

I am pretty confident that the issue is that the poll initiated by the KafkaConsumerActor grabs the metadata and joins the group, but the WakeupException interrupts this process after the group is joined and before the ConsumerRebalanceListener callbacks are called, which are what tell the SingleSourceLogic which topic partitions it owns. The SingleSourceLogic makes requests to the KafkaConsumerActor for the topic partitions that it knows about, but those either aren't the right ones or are empty because the callback was never called. To be clear, the KafkaConsumer has been assigned partitions in this case, but because the SingleSourceLogic doesn't know about them, they are never consumed from.

I'm not fully sure how the code should be changed to deal with this problem, but the workaround we are implementing right now is to set the wakeup timeout a little longer (6 seconds or so), set max-wakeups to 1, and restart the stream whenever the maximum number of wakeups is exceeded.
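
In stream terms the workaround is roughly the following — a sketch only, assuming the withWakeupTimeout/withMaxWakeups setters on ConsumerSettings and Akka's RestartSource (available in recent Akka 2.5 releases); topic, servers and the business flow are placeholders:

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;

public class RestartingConsumerExample {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create();
    ActorMaterializer materializer = ActorMaterializer.create(system);

    ConsumerSettings<byte[], byte[]> settings =
        ConsumerSettings.create(system, new ByteArrayDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers("192.168.1.1:9093")
            .withGroupId("my-group")
            .withWakeupTimeout(Duration.create(6, TimeUnit.SECONDS)) // a bit longer than the 3s default
            .withMaxWakeups(1);                                      // fail fast instead of retrying 10 times

    FiniteDuration minBackoff = Duration.create(3, TimeUnit.SECONDS);
    FiniteDuration maxBackoff = Duration.create(30, TimeUnit.SECONDS);

    // Recreate the whole consumer source whenever the stage completes or fails,
    // e.g. after the maximum number of wakeups has been exceeded.
    RestartSource.withBackoff(minBackoff, maxBackoff, 0.2, () ->
        Consumer.committableSource(settings, Subscriptions.topics("my-topic"))
            // ... filtering, processing and offset committing go here ...
    ).runWith(Sink.ignore(), materializer);
  }
}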

jpeel avatar Aug 08 '17 16:08 jpeel

We're facing the same issue.

We have a service which sometimes stops receiving data right after a WakeupException. The issue doesn't happen on every WakeupException, but when it happens it's always after a WakeupException.

The explanation from @jpeel fits pretty well with our observations.

We'll try to implement the proposed workaround, but a more structured way of solving the issue (without restarting the stream) would definitely be appreciated.

Has there been any activity on this issue?

nivox avatar Oct 27 '17 09:10 nivox

Ping @patriknw @kciesielski. Given you're the last to have worked on the code, do you have any insights?

nivox avatar Oct 27 '17 09:10 nivox

Defaults:

wakeup-timeout = 3s
max-wakeups = 10

So you mean that increasing the wakeup-timeout to, let's say, 9s would be different from trying 3 times with 3s?

The reason a long timeout is bad is that it blocks a thread for that time, and that can easily result in starvation and unresponsiveness of other streams running on the same dispatcher. But sure, if that solves the problem please go ahead and create a PR.

Note that the reason for retrying at all, compared to just failing the stream immediately and relying only on stream restarts, is that it can hopefully survive short glitches without losing the state.

patriknw avatar Oct 31 '17 14:10 patriknw

@patriknw I think the point of @jpeel was that sometimes, when the timeout occurs, it somehow leaves the Kafka consumer in a state where no messages are received (even though messages are available) and the stream becomes stuck. The proposed workaround of raising the timeout and restarting the stream circumvents the problem, but indeed it is NOT a solution.

I think that the retry mechanism is the right way to go; as you said, it prevents losing the stream state. Given @jpeel's thesis on why the stream gets stuck, do you think it is plausible? Do you have any idea how to avoid the issue while maintaining the retry mechanism?

I'm willing to try and put together a PR, but I don't really know the internals so any guidance would be welcome.

nivox avatar Oct 31 '17 15:10 nivox

Ok, I read his comment again. Is there a way we can reset things completely after a wakeup interrupt, so that it is treated as a new consumer in the next attempt? Or can we get the assigned partitions with a method call after the wakeup, since we might have missed the callback? Other ideas?

patriknw avatar Nov 07 '17 20:11 patriknw

@patriknw @jpeel I've tried to put together an implementation of the first strategy you proposed: reset things completely after a wakeup. commit

The basic idea is to unsubscribe and immediately re-subscribe to all the topics. This way the assignment callbacks are invoked again and the state of the Single/SubSourceLogic should be consistent. Note that I had to introduce another callback just to clear the state of the Single/SubSourceLogic to keep things consistent.
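
In essence the idea is the following (a hypothetical illustration in terms of the plain Kafka consumer API, not the actual code from the commit):

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

final class ResubscribeSketch {
  // After a WakeupException has interrupted a poll, dropping and re-creating the
  // subscription makes the rebalance callbacks fire again on the next poll, so the
  // source logic is told about the partitions the consumer actually owns.
  static void resubscribe(Consumer<?, ?> consumer,
                          Collection<String> topics,
                          ConsumerRebalanceListener listener) {
    consumer.unsubscribe();
    consumer.subscribe(topics, listener);
  }
}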

While I was working on this, it occurred to me that another strategy we could use is to:

  • register the current assignments before poll()
  • on wakeup, ask for the assignments again
  • manually call the callbacks for new/removed TopicPartition assignments

This strategy would still require some bookkeeping on the KafkaConsumerActor side, but would free the Single/SubSourceLogic from any changes. It requires that the internal state of the KafkaConsumer is not corrupted after the wakeup, which I think is a reasonable assumption to make; see the sketch below.
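
Roughly, the reconciliation amounts to something like this (again a hypothetical sketch against the plain consumer API; the real change lives inside the KafkaConsumerActor):

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

final class ReconcileSketch {
  // Compare the assignment recorded before the poll with what the consumer reports
  // after the WakeupException, and replay the callbacks that were missed.
  static void reconcile(Consumer<?, ?> consumer,
                        Set<TopicPartition> knownBeforePoll,
                        ConsumerRebalanceListener listener) {
    Set<TopicPartition> current = consumer.assignment();

    Set<TopicPartition> revoked = new HashSet<>(knownBeforePoll);
    revoked.removeAll(current);
    Set<TopicPartition> added = new HashSet<>(current);
    added.removeAll(knownBeforePoll);

    if (!revoked.isEmpty()) listener.onPartitionsRevoked(revoked);
    if (!added.isEmpty()) listener.onPartitionsAssigned(added);
  }
}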

I'll try to work on this approach in the next few days; in the meantime, could someone take a look at the current proposal and give me some feedback?

nivox avatar Nov 08 '17 16:11 nivox

@patriknw @jpeel I've implemented the alternative strategy of reconciling the TopicPartition assignments on WakeupException. commit

As I stated in the previous comment, this strategy tries to reconcile the new KafkaConsumer assignments with the Source's by manually calling the RebalanceListener callbacks for all revoked/added TopicPartition assignments.

Can someone take a look at both approaches and give some feedback? For both solutions I've tested that everything works in the normal case, but I'm not sure how to cause the issue manually (stopping the Kafka broker will not work, as the consumer detects that the broker is not there and won't follow the code path leading to the wakeup). Any ideas on how to test this in a controlled environment?

nivox avatar Nov 09 '17 08:11 nivox

If we compare a normal scenario, where the partitions get assigned:

INFO  [2017-10-22 09:16:48,855] [ConsumerCoordinator:256] [] [] - Setting newly assigned partitions [coffee_manager_events_1.0-2, coffee_manager_events_1.0-1, coffee_manager_events_1.0-0, coffee_manager_events_1.0-4, coffee_manager_events_1.0-3] for group coffee-manager
INFO  [2017-10-22 09:16:48,855] [AbstractCoordinator:388] [] [] - Successfully joined group coffee-manager with generation 9
INFO  [2017-10-22 09:16:48,852] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:16:47,504] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:16:47,504] [ConsumerCoordinator:397] [] [] - Revoking previously assigned partitions [] for group coffee-manager

with a bad scenario, where the issue happens:

INFO  [2017-10-22 09:17:53,401] [AbstractCoordinator:388] [] [] - Successfully joined group coffee-manager with generation 11
WARN  [2017-10-22 09:17:53,401] [KafkaConsumerActor:78] [coffeemanager-akka.kafka.default-dispatcher-62041] [akka.tcp://[email protected]/system/kafka-consumer-9] - Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
INFO  [2017-10-22 09:17:53,377] [AbstractCoordinator:388] [] [] - Successfully joined group coffee-manager with generation 11
INFO  [2017-10-22 09:17:52,959] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:17:50,433] [AbstractCoordinator:420] [] [] - (Re-)joining group coffee-manager
INFO  [2017-10-22 09:17:50,433] [ConsumerCoordinator:397] [] [] - Revoking previously assigned partitions [] for group coffee-manager

It certainly points to some flaky consumers, but I'm wondering whether reactive-kafka can be more resilient to this. One way to self-check would be to use the driver's consumer-coordinator-metrics and read the assigned-partitions count, which should go up within a certain period after subscribing; if it doesn't, perhaps let it escalate to the supervisor?
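
As an illustration, the self-check could read the assigned-partitions metric from the Java client's consumer-coordinator-metrics group (a sketch only; it assumes you have a handle on the underlying KafkaConsumer, e.g. via a custom MetricsReporter or by creating the consumer yourself):

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

final class AssignmentCheck {
  // Returns the number of partitions the coordinator believes this consumer owns,
  // or -1 if the metric is not present. The value should become greater than zero
  // shortly after subscribing; if it stays at zero, escalate or restart.
  static double assignedPartitions(KafkaConsumer<?, ?> consumer) {
    for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
      MetricName name = e.getKey();
      if ("consumer-coordinator-metrics".equals(name.group())
          && "assigned-partitions".equals(name.name())) {
        return e.getValue().value();
      }
    }
    return -1;
  }
}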

schrepfler avatar Nov 16 '17 10:11 schrepfler

@schrepfler wouldn't the strategy I proposed in the last message fix the problem?

nivox avatar Nov 16 '17 16:11 nivox

Without knowing too many Kafka internals, I think WakeupExceptions are not such rare events in reality; is it acceptable to reconcile every time? I've noticed some callbacks being created and you mention this Single/SubSourceLogic; is that the best/only technique we have at the driver level? Is there no risk of getting into some bad state, a race condition, or a memory leak?

schrepfler avatar Nov 16 '17 22:11 schrepfler

@nivox The second alternative looks attractive since it contains the problem inside the KafkaConsumerActor. I don't know if there are any drawbacks to always doing that for the WakeupException.

patriknw avatar Nov 20 '17 11:11 patriknw

@nivox I like the second solution. I rebased it and added some more logging around wakeup exceptions. Can you review https://github.com/akka/reactive-kafka/pull/395 ?

chbatey avatar Jan 25 '18 10:01 chbatey

@chbatey: Thanks for taking the issue forward; I was a little unsure how to proceed due to the lack of feedback.

I've taken a look at the pull request and it seems good to me.

nivox avatar Jan 25 '18 13:01 nivox

Any update on this? We are facing the same issue; increasing the wakeup timeout to 6s and setting the max to 1 seems to resolve it, but that doesn't feel really solid to me.

Am I correct to assume that PR https://github.com/akka/reactive-kafka/pull/395 did not fully resolve the issue?

giejay avatar Mar 26 '18 10:03 giejay

We'd like to understand if people still experience this with 0.20 and 0.21.

ennru avatar Jun 12 '18 12:06 ennru

I had this issue on 0.16 and was fortunate to catch 0.22 on its release day. I haven't seen the issue since.

Hallborg avatar Aug 13 '18 13:08 Hallborg

Thank you for this update. Anyone else?

ennru avatar Aug 20 '18 14:08 ennru

I'm using 0.22 and I'm having the same issue.

I'll try changing the settings.

sudpaw avatar Sep 26 '18 10:09 sudpaw

@ennru, we just encountered this issue using v0.22. We stopped consuming a subset of topic partitions after the error: KafkaConsumer poll has exceeded wake up timeout (3 seconds). Waking up consumer to avoid thread starvation. We were using the default configurations.

We have encountered the wake-up timeout error 60 times in the past 30 days, but only once did it stop consuming.

Anecdotally, we used to encounter this issue more often on older versions of alpakka-kafka, and we would also usually lose all topic partitions, not just a subset. As a workaround, we added an idleTimeout flow element to terminate and restart the stream if it stopped consuming from all topic partitions, but of course that didn't work when only a subset of them was lost.
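
For reference, the workaround looked roughly like this (a sketch only; the topic name and timeout are placeholders, settings and materializer are assumed to be built as usual, and the outer restart wiring is the same as in the earlier RestartSource sketch):

// Fail the stream if no record is seen for 10 minutes, so that an outer
// RestartSource (or a supervisor) tears the consumer down and recreates it
// with a fresh partition assignment.
Consumer.committableSource(settings, Subscriptions.topics("my-topic"))
    .idleTimeout(Duration.create(10, TimeUnit.MINUTES))
    // ... processing and offset committing ...
    .runWith(Sink.ignore(), materializer);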

We are looking at #614 in v1.0 to ultimately resolve this issue, but we are interested in whether any workarounds have been effective.

breckcs avatar Nov 28 '18 17:11 breckcs

Thank you for reporting that here. So some state in the actor or the Kafka client seems to be lost upon wakeup in certain cases. I'm not aware of a workaround for this. We've improved the error handling around polling a bit more in 1.0-M1, but you're right, #614 would make the wakeup dance go away completely.

ennru avatar Nov 29 '18 07:11 ennru

@ennru any idea when version 1.0 will be production ready? I see that for now it is just an RC version.

politrons avatar Feb 11 '19 13:02 politrons

It is considered production ready. We do not expect any significant changes and might call it 1.0 before the end of March 2019.

ennru avatar Feb 11 '19 13:02 ennru

I have the same problem. After investigation, I found that there were two consumers. I hope my findings can help you. Command used:

bin/kafka-consumer-groups.sh --bootstrap-server [your kafka server] --group [your group name] --describe

ValueYouth avatar May 27 '19 02:05 ValueYouth

Thank you @ValueYouth, we haven't found any evidence that this is caused by Alpakka Kafka, so your hint might help others to identify the problem.

ennru avatar May 27 '19 05:05 ennru