
Promise attempted to resolve multiple times?

Open scmorse opened this issue 8 years ago • 51 comments

We use no-kafka at my workplace (currently using version 3.0.1), and we are seeing a strange issue:

TypeError: Cannot read property 'resolve' of undefined
 at Connection._receive (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/connection.js:199:30)
 at Connection._receive (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/connection.js:203:14)
 at emitOne (events.js:77:13)
 at Socket.emit (events.js:169:7)
 at readableAddChunk (_stream_readable.js:153:18)
 at Socket.Readable.push (_stream_readable.js:111:10)
 at TCP.onread (net.js:531:20)

The issue happens regularly, 10-50 times per hour, in all deployed instances of our service (I have not been able to reproduce this locally, but my local setup only uses 1 broker and a replication factor of 1). I am curious to know if the package creators have any idea why this might be happening, and what the impact might be.

From looking at the code, it seems that either a Kafka message is being sent for which the library does not expect a response but is receiving one, or Kafka is sending multiple responses for the same message.

scmorse avatar Feb 13 '17 03:02 scmorse

What is your cluster setup? Any complex routing?

oleksiyk avatar Feb 13 '17 06:02 oleksiyk

I talked to our dev-ops person; it seems that our development cluster has 3 brokers and a replication factor of 1.

I copied this repo, put some debug lines in, deployed it to our development environment, and the results are interesting:

[13/Feb/2017:19:43:41.589 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:43:41.592 +00:00] [] [info] - AY763536 connectionId:5 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:43:41.593 +00:00] [] [info] - AY763536 connectionId:6 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.178 +00:00] [] [info] - AY763536 connectionId:5 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.180 +00:00] [] [info] - AY763536 connectionId:6 attempted to resolve message with correlationId:8 but no resolve found!

In connection.js, I made each Connection instance have a unique connectionId. Only 3 connections log messages about receiving duplicate responses, so I'm guessing that each of them corresponds to a connection to one of the 3 brokers.
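Roughly what the instrumentation looks like (a minimal sketch with assumed internals; only the connectionId and the "no resolve found" logging are mine, and the real lib/connection.js differs in the details):

var nextConnectionId = 0;

function Connection(options) {
    this.connectionId = ++nextConnectionId; // unique id per Connection instance
    this.queue = {};                        // correlationId -> { resolve, reject }
    // ... socket setup omitted
}

Connection.prototype._receive = function (data) {
    // a Kafka response starts with a 4-byte length followed by a 4-byte correlationId
    var correlationId = data.readInt32BE(4);
    var pending = this.queue[correlationId];

    if (!pending) {
        // this branch produced the "no resolve found" lines above; the unpatched
        // code dereferences the missing entry and instead throws
        // "Cannot read property 'resolve' of undefined"
        console.info('connectionId:' + this.connectionId +
            ' attempted to resolve message with correlationId:' + correlationId +
            ' but no resolve found!');
        return;
    }
    // ... normal path: resolve the pending promise with the response payload
};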

Looking at the relevant sent/received messages for one of the connections:

[13/Feb/2017:19:43:41.585 +00:00] [] [info] - AY763536 connectionId:4 sending message with correlationId:8, messageHex:00100000000000080010746e732d6b61666b612d636c69656e74, message:\x00\x00\x00\x00\x00\x00\x00tns-kafka-client
[13/Feb/2017:19:43:41.588 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:43:41.588 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:43:41.589 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!
[13/Feb/2017:19:46:36.171 +00:00] [] [info] - AY763536 connectionId:4 sending message with correlationId:8, messageHex:00100000000000080010746e732d6b61666b612d636c69656e74, message:\x00\x00\x00\x00\x00\x00\x00tns-kafka-client
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 received a reply for correlationId:8 resultHex:0000000800000000000a002e71612d666f7267652d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003071612d646576656c6f702d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200216465762d7363616e2d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002e6465762d73736f722d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d657200246465762d646576656c6f702d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572002171612d666f7267652d6d6172742d646973636f766572792d636f6e73756d6572730008636f6e73756d6572003c71612d646576656c6f702d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002e6465762d7363616e2d65732d696e64657865722d61737365742d7461672d6368616e67652d636f6e73756d6572730008636f6e73756d6572003c6465762d7265706f72742d74616767696e672d706c7567696e2d61737365742d6173736f63696174696f6e2d6368616e67652d636f6e73756d6572730008636f6e73756d6572002471612d666f7267652d65732d696e64657865722d6368616e67652d636f6e73756d6572730008636f6e73756d6572, result:\x00\x00\x00\x00\x00\x00\x00\x00 \x00.qa-forge-es-indexer-asset-tag-change-consumers\x00consumer\x000qa-develop-es-indexer-asset-tag-change-consumers\x00consumer\x00!dev-scan-mart-discovery-consumers\x00consumer\x00.dev-ssor-es-indexer-asset-tag-change-consumers\x00consumer\x00$dev-develop-mart-discovery-consumers\x00consumer\x00!qa-forge-mart-discovery-consumers\x00consumer\x00<qa-develop-tagging-plugin-asset-association-change-consumers\x00consumer\x00.dev-scan-es-indexer-asset-tag-change-consumers\x00consumer\x00<dev-report-tagging-plugin-asset-association-change-consumers\x00consumer\x00$qa-forge-es-indexer-change-consumers\x00consumer
[13/Feb/2017:19:46:36.174 +00:00] [] [info] - AY763536 connectionId:4 attempted to resolve message with correlationId:8 but no resolve found!

So the same connection object is sending two messages with the same correlation ID (strange, right? The messages are sent 3 minutes apart), and each time we receive 2 replies. The second reply seems to have exactly the same contents as the first, so the fact that the second reply has essentially been ignored seems okay.

scmorse avatar Feb 13 '17 21:02 scmorse

Is it producer? Simple or Group consumer? Can you share the code?

oleksiyk avatar Feb 14 '17 05:02 oleksiyk

Also, is it SSL connection or not? Can you show initial (connecting/connected) log messages?

oleksiyk avatar Feb 14 '17 14:02 oleksiyk

Is it producer? Simple or Group consumer? Can you share the code?

I can't really share the code, but we do use a GroupConsumer and a Producer, although I think the GroupConsumer is where these issues are happening. The Consumer uses a DefaultAssignmentStrategy. Our connectionString is of the form ip1:port,ip2:port,ip3:port. Our topics have 100 partitions, and 2 group consumers.

Also, is it SSL connection or not? Can you show initial (connecting/connected) log messages?

No SSL. And I set the log level to 4 (debug), but I don't see any (connecting/connected) log messages.


I confirmed that each of the connections having this issue with duplicate responses is stored in the brokerConnections object in client.js.

It is unexpected, though, that an instance of Connection would send two messages with the same correlationId, isn't it? It seems to be the exact same message, but sent 3 minutes apart, as shown in the logs in my previous comment. And each time the message is sent, it gets 2 or more replies.

I also see about 40 different instances of the Connection object being opened. Does this seem normal, or higher than you would expect?

Possibly related to this error, the Kafka logs show some rebalancing, and I'm not entirely sure why that is happening, since we have fairly stable consumers that don't go on/offline all that often, and I don't think the brokers are changing very frequently either.

I would like to try to bring up a Kafka cluster with 3 brokers on my local machine using docker, and make a small code sample to see if the error is reproducible, but that would take some time that I can't really commit to this issue right now. For the time being, the error seems benign, so we are ignoring it.

scmorse avatar Feb 15 '17 20:02 scmorse

Side note, you don't happen to have any plans to implement an equivalent of the ConsumerRebalanceListener, do you? I'm guessing not. The dev-ops person who manages our Kafka clusters thinks that committing the most recent offset when partitions get revoked might help this issue, although we already commit the latest offset after each batch of messages that we process.

When a rebalance happens and a client loses a partition, do no-kafka GroupConsumers finish processing all messages they have already started before giving up control of those partitions?

The duplicated messages seem to be metadata messages rather than actual kafka messages that were put onto a topic, anyway, so maybe the rebalancing isn't related to this issue.

scmorse avatar Feb 15 '17 21:02 scmorse

Are there messages such as Rejoining group on RebalanceInProgress or others?

oleksiyk avatar Feb 16 '17 07:02 oleksiyk

When a rebalance happens and a client loses a partition, do no-kafka GroupConsumers finish processing all messages they have already started before giving up control of those partitions?

This is not controlled by GroupConsumer. If there was a successful fetch request, then all of those messages are passed to the processing handler; if the Kafka server has started a rebalance, the fetch request will fail with something like NotLeaderForPartition.

oleksiyk avatar Feb 16 '17 07:02 oleksiyk

If I correctly understand those message hex dumps, they are ListGroups requests. Are you using GroupAdmin and its listGroups method?

oleksiyk avatar Feb 16 '17 07:02 oleksiyk

If I correctly understand those message hex dumps, they are ListGroups requests. Are you using GroupAdmin and its listGroups method?

Yes, we use that as part of our health check mechanism, to verify that our communication with Kafka is working. Thank you for pointing out that those are ListGroups messages; I saw the responses but wasn't sure what they were from, I guess I just thought they were part of the metadata.
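For reference, the health check is roughly this (a sketch assuming no-kafka's GroupAdmin with init() and listGroups(); the connection string and return shape are placeholders):

var Kafka = require('no-kafka');

var admin = new Kafka.GroupAdmin({
    connectionString: 'ip1:port,ip2:port,ip3:port'
});

// Health check: if we can list consumer groups, we consider the Kafka connection healthy.
// In practice init() is called once at startup, not on every check.
function kafkaHealthCheck() {
    return admin.init()
        .then(function () { return admin.listGroups(); })
        .then(function (groups) { return { healthy: true, groups: groups }; })
        .catch(function (err) { return { healthy: false, error: err.message }; });
}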

I'm still not sure of the root cause of the duplicate responses to the listGroups call, but it seems like an error we can safely catch and ignore, since communication with kafka is still working in this instance, which is all we really care about.

scmorse avatar Feb 16 '17 15:02 scmorse

Are there messages such as Rejoining group on RebalanceInProgress or others?

No, but there may be a logging configuration problem.

scmorse avatar Feb 16 '17 15:02 scmorse

Can you stop using the listGroups request and/or GroupAdmin and see if it resolves the duplicate messages problem?

oleksiyk avatar Feb 16 '17 17:02 oleksiyk

I would like to confirm that the problem happens only to GroupAdmin and not GroupConsumer or Producer

oleksiyk avatar Feb 16 '17 17:02 oleksiyk

@scmorse This might be trivial, but do you have different environments (like dev and test) that use different topic names with the same group-id on the same broker?

apalanki avatar Feb 20 '17 22:02 apalanki

Can you stop using the listGroups request and/or GroupAdmin and see if it resolves the duplicate messages problem?

I removed the call to listGroups but kept the initialization of the GroupAdmin, and the problem went away. No more log messages about "cannot read property 'resolve' of undefined".

I would like to confirm that the problem happens only to GroupAdmin and not GroupConsumer or Producer

Yes, that seems to be the case. It's not even in the initialization of the group consumer, it's something specifically about the listGroups call.

This might be trivial, but do you have different environments (like dev and test) that use different topic names with the same group-id on the same broker?

We do have different environments, but the topic names and the group names encode the environment name, so they should be unique per environment. We do use just one constant clientId, though, and I thought Kafka might be confusing our clients and that might be causing the duped message. But I tried using a unique clientId as well and the error persisted.

@oleksiyk is it best practice to use a random clientId each time the app starts, or is it okay to have multiple consumers with the same clientId?

I have replaced the listGroups call with a call to describeGroup, and the error has gone away, so that is the workaround we are going to go with for now.
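Concretely, the workaround is just swapping one GroupAdmin call for another in the health check (assuming no-kafka's describeGroup(groupId) method; the group id below is only an example taken from the logs above):

// Same shape as the listGroups-based check, but describeGroup asks about one
// specific consumer group instead of listing every group on the broker.
function kafkaHealthCheck() {
    return admin.init()
        .then(function () {
            return admin.describeGroup('dev-develop-mart-discovery-consumers');
        })
        .then(function (group) { return { healthy: true, group: group }; })
        .catch(function (err) { return { healthy: false, error: err.message }; });
}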

scmorse avatar Mar 02 '17 21:03 scmorse

is it best practice to use a random clientId each time the app starts, or is it okay to have multiple consumers with the same clientId?

@scmorse I use the same clientId each time.

oleksiyk avatar Mar 03 '17 05:03 oleksiyk

Yes, that seems to be the case. It's not even in the initialization of the group consumer, it's something specifically about the listGroups call.

The problem is that GroupAdmin doesn't refresh broker metadata, so once the Kafka cluster rebalances it sends the listGroups request to an outdated list of brokers, which triggers the error. I will fix that.

oleksiyk avatar Mar 03 '17 05:03 oleksiyk

@oleksiyk The problem went away when I started using describeGroup, though. Wouldn't that call have the same metadata issue if GroupAdmins weren't refreshing their metadata?

scmorse avatar Mar 03 '17 14:03 scmorse

Switching from listGroups to describeGroup has gotten rid of the cannot read property 'resolve' of undefined errors, but has surfaced a few protocol errors:

RangeError: Trying to access beyond buffer length
    at _Reader.Reader.demand (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:79:15)
    at _Reader.define.read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/index.js:118:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:497:27)
    at _Reader.wrapper [as raw] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:47:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:496:27)
    at _Reader.wrapper [as string] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:497:27)
    at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:56:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:496:27)
    at _Reader.wrapper [as DescribeGroupResponse_ConsumerGroupMemberItem] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:497:27)
    at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:83:18)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:496:27)
    at _Reader.wrapper [as DescribeGroupResponse_GroupItem] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:497:27)
    at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:95:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:495:27)
    at _Reader.wrapper [as DescribeGroupResponse] (/app/node_modules/lodash/lodash.js:5356:16)
    at /app/node_modules/tns-kafka/node_modules/no-kafka/lib/client.js:743:55
    at bound (domain.js:287:14)
    at runBound (domain.js:300:12)
    at tryCatcher (/app/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/app/node_modules/bluebird/js/release/promise.js:512:31)
    at Promise._settlePromise (/app/node_modules/bluebird/js/release/promise.js:569:18)
    at Promise._settlePromise0 (/app/node_modules/bluebird/js/release/promise.js:614:10)
    at Promise._settlePromises (/app/node_modules/bluebird/js/release/promise.js:693:18)
    at Async._drainQueue (/app/node_modules/bluebird/js/release/async.js:133:16)
    at Async._drainQueues (/app/node_modules/bluebird/js/release/async.js:143:10)
    at Immediate.Async.drainQueues [as _onImmediate] (/app/node_modules/bluebird/js/release/async.js:17:14)
    at processImmediate [as _immediateCallback] (timers.js:383:17)

And

RangeError: index out of range
    at checkOffset (buffer.js:688:11)
    at Buffer.readInt32BE (buffer.js:853:5)
    at _Reader.define.read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/index.js:101:47)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:496:27)
    at _Reader.wrapper (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:65:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:497:27)
    at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:56:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:496:27)
    at _Reader.wrapper [as DescribeGroupResponse_ConsumerGroupMemberItem] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:497:27)
    at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:83:18)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:496:27)
    at _Reader.wrapper [as DescribeGroupResponse_GroupItem] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Reader.loop (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:94:12)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/common.js:69:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:497:27)
    at _Reader.wrapper [as array] (/app/node_modules/lodash/lodash.js:5356:16)
    at _Reader.Protocol.define.read (/app/node_modules/tns-kafka/node_modules/no-kafka/lib/protocol/admin.js:95:14)
    at _Reader._read (/app/node_modules/tns-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
    at apply (/app/node_modules/lodash/lodash.js:495:27)
    at _Reader.wrapper [as DescribeGroupResponse] (/app/node_modules/lodash/lodash.js:5356:16)
    at /app/node_modules/tns-kafka/node_modules/no-kafka/lib/client.js:743:55
    at bound (domain.js:287:14)
    at runBound (domain.js:300:12)
    at tryCatcher (/app/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/app/node_modules/bluebird/js/release/promise.js:512:31)
    at Promise._settlePromise (/app/node_modules/bluebird/js/release/promise.js:569:18)
    at Promise._settlePromise0 (/app/node_modules/bluebird/js/release/promise.js:614:10)
    at Promise._settlePromises (/app/node_modules/bluebird/js/release/promise.js:693:18)
    at Async._drainQueue (/app/node_modules/bluebird/js/release/async.js:133:16)
    at Async._drainQueues (/app/node_modules/bluebird/js/release/async.js:143:10)
    at Immediate.Async.drainQueues [as _onImmediate] (/app/node_modules/bluebird/js/release/async.js:17:14)
    at processImmediate [as _immediateCallback] (timers.js:383:17)

These errors seem to occur much less frequently than the listGroups error, although I suspect the issues may be related.

Both of these errors go through admin.js:95:14 (DescribeGroupResponse) > admin.js:83:18 (DescribeGroupResponse_GroupItem) > admin.js:56:14 (DescribeGroupResponse_ConsumerGroupMemberItem). Looks like a protocol-level error, although I'm sure @oleksiyk, you would have more insight.

(Using Kafka 0.9.0.1, btw)

scmorse avatar Mar 06 '17 20:03 scmorse

Can you try https://github.com/oleksiyk/kafka/commit/238fe7a50ae1cba147b17eb72824551b1bab1afc and see if it fixes the protocol error?

oleksiyk avatar Mar 06 '17 20:03 oleksiyk

I'm having a difficult time deploying the package referencing a git URL (it works locally, I just can't deploy it); would you mind publishing the updated code as a prerelease?

npm version patch
npm version prerelease
npm publish --tag debug-protocol-fix

(I put the --tag on there so that people who installed the 'latest' won't get this version until we verify it works)
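For what it's worth, a tagged prerelease like that should then be installable by dist-tag rather than by version (assuming it's published under the usual package name):

npm install no-kafka@debug-protocol-fix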

scmorse avatar Mar 06 '17 21:03 scmorse

Done, try it.

[email protected]

oleksiyk avatar Mar 07 '17 05:03 oleksiyk

No luck. Pretty soon after it deployed, it logged another RangeError: Trying to access beyond buffer length error.

scmorse avatar Mar 07 '17 18:03 scmorse

I need to replicate it. Until I see what I get back from Kafka I can't really guess what's wrong with that packet.

oleksiyk avatar Mar 07 '17 18:03 oleksiyk

How do you initiate each describeGroup call? Do you create a GroupAdmin instance each time or use the same one? What is your connectionString? Does it have any repeating/duplicate hosts? Maybe one mentioned by IP and another by domain name?

oleksiyk avatar Mar 07 '17 19:03 oleksiyk

I'm also getting

TypeError: Cannot read property 'resolve' of undefined
    at Connection._receive (/app/node_modules/wix-bootstrap-greynode/node_modules/greynode-core/node_modules/no-kafka/lib/connection.js:205:30)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at readableAddChunk (_stream_readable.js:176:18)
    at Socket.Readable.push (_stream_readable.js:134:10)
    at TCP.onread (net.js:547:20)

I don't use any GroupAdmin or DescribeGroup, only GroupConsumer and Producer. I have a suspicion that it happens on rebalancing: when I had a single node process consuming messages this didn't happen, but once more instances joined consuming the same topic & groupId these errors started to appear.

hugebdu avatar Aug 31 '17 09:08 hugebdu

More instances in the same node process or different node processes?

oleksiyk avatar Aug 31 '17 09:08 oleksiyk

same one

hugebdu avatar Aug 31 '17 09:08 hugebdu

Once all instances had joined (deployed), it looks like the errors stopped appearing. Still examining ...

hugebdu avatar Aug 31 '17 09:08 hugebdu

@oleksiyk So I commented out your printout, plus added the data argument to it. Obviously I get a huge blob of binary data, which doesn't look nice in the log. However, I do see some readable stuff in between, and it is definitely the complete listing of the topics we have in our cluster. I'm not that familiar with the Kafka protocol; I will try to find out which message is resolved twice in my case. Any hints are more than welcome :)

hugebdu avatar Sep 05 '17 14:09 hugebdu

Regardless of my findings to come, if any, WDYT about wrapping this.queue[correlationId].resolve(new Buffer(data.slice(4, length + 4))); in try...catch...report in the meantime, to keep the node process from dying ungracefully?
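Something like this (just a sketch; the surrounding variables come from the quoted line in lib/connection.js, and the warning call is a placeholder):

try {
    this.queue[correlationId].resolve(new Buffer(data.slice(4, length + 4)));
} catch (err) {
    // queue entry missing (or already settled) - report it instead of letting
    // the exception take down the node process
    console.warn('Failed to resolve correlationId ' + correlationId + ':', err.message);
}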

hugebdu avatar Sep 05 '17 14:09 hugebdu

Can you save the data of the duplicate message into a separate file (fs.writeFile, etc.) and share it here in any way?

oleksiyk avatar Sep 05 '17 15:09 oleksiyk

@oleksiyk here you go. Recorded four messages. kafka_messages.tar.gz

hugebdu avatar Sep 06 '17 06:09 hugebdu

@oleksiyk did you have a chance to take a look at my dumps? Anything interesting? I didn't have much progress. My plan is to add an additional copy of the queue with TTL eviction and to try to correlate duplicate messages using that new queue - WDYT?

hugebdu avatar Sep 11 '17 07:09 hugebdu

@hugebdu what were the log messages right before the error occurred? How easily can you replicate it? Can you describe that process?

oleksiyk avatar Sep 11 '17 11:09 oleksiyk

  1. I don't have logs enabled (or maybe I've missed it).
  2. Pretty easy. I have a few instances of exactly the same micro-service (same groupId, same consumer). Once I start restarting them randomly, I start to see those errors.

hugebdu avatar Sep 11 '17 13:09 hugebdu

How do you restart them? Which instances trigger the error - those that keep running or those that are being restarted?

oleksiyk avatar Sep 11 '17 13:09 oleksiyk

They run in Docker; docker start and stop will do. I think I've seen it both in the instances that did the restart and in those that didn't move.

hugebdu avatar Sep 11 '17 13:09 hugebdu

@hugebdu How many Kafka servers are running? Is it single topic subscription or multiple topics?

oleksiyk avatar Sep 11 '17 17:09 oleksiyk

Three brokers in the Kafka cluster. A single topic, that is.

hugebdu avatar Sep 11 '17 17:09 hugebdu

@oleksiyk so I added another queue to connection to try and catch duplicate responses for the same correlation id (see here)

Here is the stderr.log. Look for "Wrong correlationId received"; I did remove some unrelated lines.

hugebdu avatar Sep 12 '17 12:09 hugebdu

"Socket timeout" - do you stop Kafka servers (brokers)?

oleksiyk avatar Sep 12 '17 12:09 oleksiyk

Nope

hugebdu avatar Sep 12 '17 13:09 hugebdu

I guess a consumer instance sends a metadata request that times out for some reason (there is definitely something going on with the network connection); that correlationId's entry (and its promise) is then removed from this.queue, but a moment later the response is received from the Kafka broker, and that triggers the "wrong correlationId" error.
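In other words, roughly this sequence (a sketch with assumed internals; no-kafka uses Bluebird, so a Bluebird-style timeout is shown):

var Promise = require('bluebird');

// Sending side (simplified): register the pending promise, then give it a timeout.
Connection.prototype.send = function (correlationId, buffer) {
    var self = this;
    return new Promise(function (resolve, reject) {
        self.queue[correlationId] = { resolve: resolve, reject: reject };
        self.socket.write(buffer);
    })
    .timeout(self.options.timeout)                // Bluebird .timeout()
    .catch(Promise.TimeoutError, function (err) {
        delete self.queue[correlationId];         // pending entry dropped on timeout
        throw err;
    });
};

// If the broker's reply for that correlationId arrives after the timeout fired,
// _receive() no longer finds an entry in this.queue - which is what surfaces as
// "Cannot read property 'resolve' of undefined" / "Wrong correlationId received".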

oleksiyk avatar Sep 12 '17 13:09 oleksiyk

As I mentioned before, we run our node processes in Docker and we do suspect some networking problems there (we're planning to upgrade to the latest Docker - there were many network-stack-related fixes).

Meanwhile, WDYT about wrapping this.queue[correlationId] with try...catch...log?

hugebdu avatar Sep 12 '17 13:09 hugebdu

Meanwhile we're preparing some sanity & stress tests to verify that no messages are lost, including on those errors, which currently cause the node process to restart (via node cluster).

hugebdu avatar Sep 12 '17 13:09 hugebdu

I think we can safely ignore the "wrong correlationId" error in this case. No need for try/catch I think, just drop that packet and log a warning message. I will update the code.

oleksiyk avatar Sep 12 '17 15:09 oleksiyk

Great, I will wait for the fix. Thanks a lot Oleksiy! Great package (so far the best for Node out there I could find) and great attitude!

hugebdu avatar Sep 12 '17 16:09 hugebdu

I've published it as 3.2.2. It silently drops that packet without a log message (this has to be changed later when it comes to reworking logging globally in no-kafka).

oleksiyk avatar Sep 12 '17 16:09 oleksiyk

Awesome, will upgrade tomorrow. Thanks a lot

hugebdu avatar Sep 12 '17 16:09 hugebdu

@oleksiyk UPD: no more errors in my case. 10x!

hugebdu avatar Sep 18 '17 06:09 hugebdu