
producer.connect() - KafkaJSError: The producer is disconnected

Open • zemacnica opened this issue 3 years ago • 27 comments

Describe the bug
There seems to be a problem somewhere in version 1.14.0. I tried to connect and then produce a message, but sometimes I get this error:

(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
    at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
    at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
    at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
    at processTicksAndRejections (internal/process/next_tick.js:81:5)

Also, producer.event.CONNECTED is not emitted.

I tried to produce two messages at once and log the KafkaJS connectionStatus to the console; the result is:

2020-10-05T16:29:20.854Z level=info label=eventhub-client message="Registered handler for all events."
CONNECTING.................  (output from the producer.connect method)
CONNECTING.................
CONNECTED.................
2020-10-05T16:29:20.879Z level=info label=eventhub-client message="Producing 1 message(s)=[{'key':'partition-key1','value':'{\'id\':\'739ec863-be56-43b1-99bd-f4ef2184081c\',\'time\':\'2020-10-05T16:29:20.855Z\',\'specversion\':\'0.2\',\'type\':\'event1\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar1\'}}','headers':{}}] test"
STATUS....................... disconnected (output from validateConnectionStatus in the sendBatch method)
(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
    at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
    at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
    at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
    at processTicksAndRejections (internal/process/next_tick.js:81:5)
(node:78713) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2)
(node:78713) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
CONNECTED.................
2020-10-05T16:29:20.882Z level=info label=eventhub-client message="Producing 2 message(s)=[{'key':'partition-key2','value':'{\'id\':\'996b7604-e715-4a31-97ce-d058abac721d\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event2\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar2\'}}','headers':{}},{'key':'partition-key3','value':'{\'id\':\'f1fcc9e2-1905-417d-a54e-b624e5b299b6\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event3\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar3\'}}','headers':{}}] test2"
STATUS....................... connected

Code sample:

if (!this.producerConnected) {
  this.producer = this.kafka.producer();
  try {
    await this.producer.connect();
    this.producerConnected = true;
  } catch (error) {
    throw new Error(`Couldn't connect producer.`);
  }
}

await this.producer.send({ topic, messages });

Expected behavior
connect() resolves, then the messages are produced.

Observed behavior
Even when the connection promise resolves, producing the first message fails and connectionStatus is not updated; the other messages are produced.

Environment:

  • OS: macOS (Darwin 18.7.0)
  • KafkaJS version 1.14.0
  • Kafka version 5.1.2
  • NodeJS version 11.10.0


zemacnica · Oct 05 '20 16:10

I ran into this as well after upgrading to 1.14.0. It seems like there was an unannounced breaking change that requires producer.connect to be called before sending?

eliw00d · Oct 05 '20 21:10

It seems like there was an unannounced breaking change that requires producer.connect to be called before sending?

This has always been the case. We have never supported sending messages without first connecting the producer. It's possible that a change in the network layer caused a change in behavior, but even if it somehow worked before, it was never supported. Otherwise there would be no point in having a connect method.
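
For reference, a minimal sketch of the supported lifecycle (the broker address, topic, and message contents are placeholders):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ brokers: ['localhost:9092'] }) // placeholder broker
const producer = kafka.producer()

const run = async () => {
  await producer.connect() // must resolve before the first send()
  await producer.send({
    topic: 'topic-test', // placeholder topic
    messages: [{ key: 'key1', value: 'hello' }],
  })
  await producer.disconnect() // once the application is done producing
}

run().catch(console.error)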

I tried to produce two messages at once and log the KafkaJS connectionStatus to the console; the result is:

Could you share a more complete example of what you are doing? I tried to reproduce it myself using the simple producer example from examples/producer.js and was not able to:

node examples/producer.js
info:  ┏ Sending 870 messages #0...
info:  ┃ [0] {
info:  ┃ [1]   "timestamp": "2020-10-06T06:52:26.007Z",
info:  ┃ [2]   "logger": "kafkajs"
info:  ┗ [3] }
info:  ┏ Sending 602 messages #1...
info:  ┃ [0] {
info:  ┃ [1]   "timestamp": "2020-10-06T06:52:26.013Z",
info:  ┃ [2]   "logger": "kafkajs"
info:  ┗ [3] }
info:  ┏ Messages sent #0
info:  ┃ [ 0] {
info:  ┃ [ 1]   "timestamp": "2020-10-06T06:52:27.093Z",
info:  ┃ [ 2]   "logger": "kafkajs",
info:  ┃ [ 3]   "response": [
info:  ┃ [ 4]     {
info:  ┃ [ 5]       "topicName": "topic-test",
info:  ┃ [ 6]       "partition": 0,
info:  ┃ [ 7]       "errorCode": 0,
info:  ┃ [ 8]       "baseOffset": "0",
info:  ┃ [ 9]       "logAppendTime": "-1",
info:  ┃ [10]       "logStartOffset": "0"
info:  ┃ [11]     }
info:  ┃ [12]   ],
info:  ┃ [13]   "msgNumber": 1472
info:  ┗ [14] }
info:  ┏ Messages sent #1
info:  ┃ [ 0] {
info:  ┃ [ 1]   "timestamp": "2020-10-06T06:52:27.105Z",
info:  ┃ [ 2]   "logger": "kafkajs",
info:  ┃ [ 3]   "response": [
info:  ┃ [ 4]     {
info:  ┃ [ 5]       "topicName": "topic-test",
info:  ┃ [ 6]       "partition": 0,
info:  ┃ [ 7]       "errorCode": 0,
info:  ┃ [ 8]       "baseOffset": "870",
info:  ┃ [ 9]       "logAppendTime": "-1",
info:  ┃ [10]       "logStartOffset": "0"
info:  ┃ [11]     }
info:  ┃ [12]   ],
info:  ┃ [13]   "msgNumber": 1472
info:  ┗ [14] }

Specifically, there's clearly something fishy going on in /Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25, since you're getting an unhandled promise rejection, meaning you're not awaiting some promise.
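
For illustration, the difference between a fire-and-forget send and a handled one, assuming a connected producer and placeholder topic and messages:

const topic = 'topic-test'
const messages = [{ value: 'hello' }]

const produce = async () => {
  // Fire-and-forget: a rejection here surfaces as an UnhandledPromiseRejectionWarning.
  producer.send({ topic, messages })

  // Awaited inside try/catch: the rejection is handled where it happens.
  try {
    await producer.send({ topic, messages })
  } catch (error) {
    console.error('Failed to produce', error)
  }
}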

Nevon · Oct 06 '20 06:10

That line 111 is the last line of the code sample, await this.producer.send({ topic, messages });, and yes, it was not wrapped in try/catch, so that's the reason. Now, on line 112, it is wrapped in try/catch, but the result is the same.

producer.connect() doesn't throw an error, producer.event.CONNECTED is not emitted, and the message is not sent, ending with this error:

2020-10-06T18:04:56.121Z level=info label=eventhub-client message="Producing 1 message(s)=[{'key':'partition-key1','value':'{\'id\':\'da1a4ddc-1fe4-44e0-90d3-0145ad205704\',\'time\':\'2020-10-06T18:04:56.100Z\',\'specversion\':\'0.2\',\'type\':\'event1\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar1\'}}','headers':{}}] test"
ERROOOOOR Im handling it >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> { KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
    at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
    at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
    at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:112:27)
    at processTicksAndRejections (internal/process/next_tick.js:81:5) name: 'KafkaJSError', retriable: true, helpUrl: undefined }

It is easier to reproduce the error when the producer is stopped and restarted, because reconnection takes longer.

zemacnica · Oct 06 '20 18:10

This has always been the case. We have never supported sending messages without first connecting the producer. It's possible that a change in the network layer caused a change in behavior, but even if it somehow worked before, it was never supported. Otherwise there would be no point in having a connect method.

Is it best to connect->send->disconnect every time or just connect when the app starts and only disconnect on app shutdown? It seems like I am seeing the same error sometimes even after adding an explicit connect on app start.

eliw00d · Oct 07 '20 21:10

You should just connect at startup and disconnect at shutdown.

There seems to be a bug introduced with the latest version related to reconnects, but regardless, nothing is changing about the intended usage.
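
A minimal sketch of that lifecycle in a long-running service, assuming a kafka client instance as in the earlier example (the signal handling is illustrative, not prescriptive):

const producer = kafka.producer()

const main = async () => {
  await producer.connect() // once, at startup
  // ... serve traffic, calling producer.send() as needed ...
}

// Disconnect once, at shutdown, so in-flight requests can drain.
process.on('SIGTERM', async () => {
  await producer.disconnect()
  process.exit(0)
})

main().catch(console.error)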

Nevon · Oct 08 '20 06:10

I was looking into this issue, and we definitely changed the behavior in 1.14, but it's for the better. Without the changes, you would never be able to stop the producer without side effects, since it would always reconnect even if your server was going down. When you call connect or disconnect you express your intent, and the client will operate on that: a disconnect will prevent it from reconnecting or producing new messages, since this signals that the service or producer is going down.

If you have an API that is producing messages and you are shutting it down, it usually means the server will block new requests and start draining the remaining connections, which is now also the case for the producer.

tulios · Oct 08 '20 19:10

v1.14.0 broke my entire application; our Sentry quota was depleted in one day because of the bump from 1.12.0 -> 1.14.0, so I rolled back to 1.12.0, which resolved the issues with producers sending messages. It seems to me this needs more priority and attention on what/why the 1.14.0 release breaks.

[screenshot: Sentry errors]

crobinson42 · Nov 28 '20 17:11

@crobinson42 Try 1.15, but the cause is very likely that you are not connecting the producer. This was always a requirement but somehow worked on 1.12. Give 1.15 a try; otherwise we will need more context to investigate.

tulios · Nov 30 '20 16:11

You should just connect at startup and disconnect at shutdown.

What happens if no broker is available at startup? We occasionally produce messages from our API server, but I don't want my whole API server to fail to start because Kafka is unavailable, since a large part of the API is usable without Kafka.

backbone87 · Jan 13 '21 01:01

Hmm, I checked with netstat -ant and got this output after starting a service:

user@my-pod:/app# netstat -ant
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State
...
tcp        0      0 10.120.32.80:38454      10.120.38.28:9092       ESTABLISHED
...

Looks good!

But after about 10 minutes I don't see any connections to Kafka. I run the connect function only during initialization. What might it be? Maybe there is something wrong with reconnection?

I am talking about the producer, version 1.15.

tomislater · Apr 06 '21 15:04

I used the tcpdump tool, and it looks like Kafka closes idle connections after ten minutes (by default). After that, my producer cannot send any messages. I call the connect function only on initialization.
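
For context: the broker-side setting that closes idle connections is connections.max.idle.ms, and its default of 600000 ms matches the ten minutes observed above:

# Kafka broker server.properties (default shown)
connections.max.idle.ms=600000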

tomislater · Apr 06 '21 21:04

What happens when you call send after that? A producer is not meant to keep a constantly open connection. If the connection is idle, it should be closed. Then when you try to send, it should reconnect. If reconnecting doesn't work, then there's a problem.

What happens if no broker is available at startup?

producer.connect() will reject. If Kafka is not a requirement for your service to operate, then you'll need to design your application such that it tries to connect when needed instead of connecting at startup.
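
A sketch of that lazy approach, using a hypothetical sendWithLazyConnect helper around a single shared producer:

let connectPromise = null

// Hypothetical helper: connect on first use instead of at startup, so the
// service can boot even when no broker is reachable yet.
const sendWithLazyConnect = async (topic, messages) => {
  if (!connectPromise) {
    connectPromise = producer.connect()
  }
  try {
    await connectPromise
    await producer.send({ topic, messages })
  } catch (error) {
    connectPromise = null // let the next call retry the connection
    throw error
  }
}

Sharing one connect promise also means concurrent callers wait for the same connection attempt instead of racing.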

Nevon · Apr 07 '21 11:04

What happens when you call send after that?

I get this error then: KafkaJSError: The producer is disconnected.

tomislater · Apr 07 '21 12:04

Ran into the same issue after updating to 1.14. From my point of view, having to call connect feels very strange. I would rather expect kafkajs to handle the connection on send/subscribe and throw if the connection can't be established or if the subscriber got disconnected.

As a workaround we created our own abstraction and call our own connectIfNeeded every time, which basically calls the kafkajs connect function and catches/posts-to-Sentry/hides errors. Is there any way to check if the connection is alive? We are on 1.15.0.
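
One way to keep such a flag from going stale is to drive it from the producer's instrumentation events; a sketch, assuming the CONNECT and DISCONNECT events documented by KafkaJS:

let connected = false
producer.on(producer.events.CONNECT, () => { connected = true })
producer.on(producer.events.DISCONNECT, () => { connected = false })

// A connectIfNeeded along the lines described above (hypothetical helper):
const connectIfNeeded = async () => {
  if (!connected) {
    await producer.connect()
  }
}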

Bessonov · Oct 06 '21 10:10

We have the same issue on the producer side. We set the producer as connected when we get a producer instance from the Kafka client, on "kafkajs": "^1.15.0". There is data loss each time the producer restarts.

superLincoln · May 09 '22 02:05

I'm having the same issue when using transactions.

nick-0101 · Jul 01 '22 01:07

Hi all, I have the same issue. Is there any resolution for this?

Mansh05 · Jul 06 '22 20:07

It is probably because you submit messages before the connection has been established; it happens with parallel code but not with serial code. So just make sure a connection has been successfully initialized before sending messages. Also, you can use debug logging to inspect the disconnected-connection information when sending messages in parallel, which is what causes this issue. Good luck.
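
A sketch of the race being described, inside an async function with placeholder topic and messages:

// Racy: the sends can start before connect() has resolved.
producer.connect()
await Promise.all(messages.map(m => producer.send({ topic, messages: [m] })))

// Safe: make sure connect() has resolved before producing in parallel.
await producer.connect()
await Promise.all(messages.map(m => producer.send({ topic, messages: [m] })))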

superLincoln · Jul 07 '22 00:07

Thanks @superLincoln for the answer. Sorry, I gave too little context in my comment.

Actually, the scenario I have is this: my app is deployed in Kubernetes, and with a one-pod deployment the Kafka service runs well; the publisher doesn't get disconnected even under stress testing. But once I spawn multiple replicas, it starts giving me this error.

Both pods point to the same broker and publish to the same topic (it's just a replica anyhow). So, is this a known issue, or is there a workaround for it?

Do let me know of the possibilities; I will try to find a workaround myself.

Mansh05 · Jul 07 '22 12:07

We are also facing the same issue with KafkaJS 1.16.0.

mahajanankur · Aug 17 '22 11:08

We are also facing the same issue. Is there a fix for this?

mahesh-avoma · Jun 28 '23 11:06

Thanks @superLincoln for the answer. Sorry, I gave too little context in my comment.

Actually, the scenario I have is this: my app is deployed in Kubernetes, and with a one-pod deployment the Kafka service runs well; the publisher doesn't get disconnected even under stress testing. But once I spawn multiple replicas, it starts giving me this error.

Both pods point to the same broker and publish to the same topic (it's just a replica anyhow). So, is this a known issue, or is there a workaround for it?

Do let me know of the possibilities; I will try to find a workaround myself.

I guess in this case all consumers have the same consumer group, since it is a scaled service. You can only have as many consumers in the same group as you have partitions on the Kafka instance.

zemacnica · Jun 28 '23 20:06

I guess in this case all consumers have the same consumer group, since it is a scaled service. You can only have as many consumers in the same group as you have partitions on the Kafka instance.

IIRC, you can, they just stay idle. But no error should be thrown.

Bessonov · Jun 28 '23 21:06

@Nevon As you said: when you try to send, it should reconnect; if reconnecting doesn't work, then there's a problem. But KafkaJS is not trying to reconnect; it just checks the connection status and throws one of the two errors below:

  1. The producer is disconnecting; therefore, it can't safely accept messages anymore
  2. The producer is disconnected

Could you revisit the implementation?

const sendBatch = async ({ acks = -1, timeout, compression, topicMessages = [] }) => {
    if (topicMessages.some(({ topic }) => !topic)) {
      throw new KafkaJSNonRetriableError(`Invalid topic`)
    }

    if (idempotent && acks !== -1) {
      throw new KafkaJSNonRetriableError(
        `Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees`
      )
    }

    for (const { topic, messages } of topicMessages) {
      if (!messages) {
        throw new KafkaJSNonRetriableError(
          `Invalid messages array [${messages}] for topic "${topic}"`
        )
      }

      const messageWithoutValue = messages.find(message => message.value === undefined)
      if (messageWithoutValue) {
        throw new KafkaJSNonRetriableError(
          `Invalid message without value for topic "${topic}": ${JSON.stringify(
            messageWithoutValue
          )}`
        )
      }
    }

    validateConnectionStatus()
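    // Note: validateConnectionStatus only checks the status flag set by
    // connect()/disconnect(); it never attempts to reconnect, which is why a
    // producer that is not currently connected fails here with the errors above.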
    const mergedTopicMessages = topicMessages.reduce((merged, { topic, messages }) => {
      const index = merged.findIndex(({ topic: mergedTopic }) => topic === mergedTopic)

      if (index === -1) {
        merged.push({ topic, messages })
      } else {
        merged[index].messages = [...merged[index].messages, ...messages]
      }

      return merged
    }, [])

    return await sendMessages({
      acks,
      timeout,
      compression,
      topicMessages: mergedTopicMessages,
    })
}

smitpatelhv · Jan 31 '24 07:01

I'm encountering the same issue.

EmmanDizon · Apr 24 '24 06:04

I'm encountering the same issue if I loop the sending; if I send only once, there is no error.

EmmanDizon · May 03 '24 04:05