
Connection time limit

Open aheckmann-pebble opened this issue 8 years ago • 15 comments

When a misconfigured connection string is pushed to production, I'd like to know about it as soon as I can. After reaching some configurable time threshold, I'd like this driver to reject attempts to connect to Kafka. Currently this driver will keep retrying forever, silently swallowing connection failure.

Thoughts?

aheckmann-pebble avatar Jun 01 '16 16:06 aheckmann-pebble

I'm streaming all logs to Elastic and then analysing them for failures. This way I'm immediately aware of any problems, but I'm also free to not bother restarting services once the issue is resolved.

oleksiyk avatar Jun 01 '16 17:06 oleksiyk

That may work OK, but it depends on the latency between the app and the logging service. Ideally we would know after 10-15 seconds that we couldn't connect, so we can immediately stop a bad configuration from rolling out to all production services.

aheckmann-pebble avatar Jun 01 '16 17:06 aheckmann-pebble

:+1: For a connection retry limit.

sahilthapar avatar Jun 15 '16 09:06 sahilthapar

We too are streaming all logs to ES. But really the application needs to know what is going on as well. We need to stop consuming from the source when we can't write to Kafka. We don't want our ops people having to intervene via log alerting. It is so much better if the application can itself be resilient. As for restarting, all our systems run in Marathon/Mesos, so in our case restarting is not a manual process.

Any kind of callback when the broker cannot be contacted would be immensely helpful.

memelet avatar Jul 04 '16 07:07 memelet

I'm not sure no-kafka will do this. There are so many possible scenarios to handle that if I add an immediate callback for a failed broker, I will then receive new issues requesting new options for this callback, such as firing only on the 3rd failed request, or waiting 5 seconds in the hope that the Kafka cluster will rebalance and remove the failed broker, and so on.

oleksiyk avatar Jul 04 '16 07:07 oleksiyk

But the callback would be what would allow for that kind of extensibility. With a simple callback all these strategies could be implemented, or not. I'm not asking for the callback to change any behavior of nokafka, nor even be synchronous. Just a notification that the broker is disconnected so the application can take appropriate action.

memelet avatar Jul 04 '16 11:07 memelet

When the broker is down it is up to the Kafka cluster to take action and reassign partitions to a new broker/leader. And then you'll probably need a callback that the broker is up again? What if that broker was removed and replaced permanently?

oleksiyk avatar Jul 04 '16 19:07 oleksiyk

@oleksiyk that's actually what the Cassandra driver offers: http://docs.datastax.com/en/latest-nodejs-driver-api/Client.html#event:hostAdd

Most use cases won't need this, but if this isn't extremely difficult to implement, it'll open up opportunities for apps to deal with it the way they want.

ismriv avatar Jul 04 '16 21:07 ismriv

Here is a scenario that just happened to us:

A client could not connect to the broker for many hours

Metadata request failed: NoKafkaConnectionError [kafka.service.consul:9092]: Connection was aborted before connection was established.
{ [NoKafkaConnectionError: Connection was aborted before connection was established.]
  name: 'NoKafkaConnectionError',
  server: 'kafka.service.consul:9092',
  message: 'Connection was aborted before connection was established.' } 

This was logged and routed to ELK and alerted on. But the broker was up, and the DNS 'kafka.service.consul' resolved the correct IP. Yet nokafka could not connect to the broker. After restarting the service nokafka connected to the broker and all was well.

But this required manual intervention. Ideally the service using nokafka would determine that it has not been able to reach the broker for too long and terminate itself (and let Marathon restart it).

This is the kind of behavior that a status callback would enable. (The callback would contain all the logic; nokafka would simply need to notify on state.)

Of course the alternative is to intercept the log messages and parse them (and hope they do not change in a later release).

memelet avatar Jul 14 '16 03:07 memelet
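The "terminate and let Marathon restart it" policy needs only a little application code once some health signal exists. A minimal sketch, assuming a hypothetical `checkBrokerHealth()` probe (for example, a metadata request wrapped in a timeout); none of this is part of no-kafka itself:

```javascript
// Pure helper: has the service been without a healthy broker connection
// for longer than the allowed window?
function shouldTerminate(lastHealthyAt, now, maxDisconnectedMs) {
  return now - lastHealthyAt > maxDisconnectedMs;
}

// Usage sketch (checkBrokerHealth is hypothetical): poll periodically and
// exit non-zero when stale, so the orchestrator restarts the service.
//
// let lastHealthyAt = Date.now();
// setInterval(async () => {
//   try {
//     await checkBrokerHealth();
//     lastHealthyAt = Date.now();
//   } catch (e) {
//     if (shouldTerminate(lastHealthyAt, Date.now(), 60000)) process.exit(1);
//   }
// }, 5000);
```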

The error Connection was aborted before connection was established. can only be raised if the no-kafka client was manually closed, e.g. .end() called on the instance too fast.

oleksiyk avatar Jul 14 '16 08:07 oleksiyk

Emitting an event when unable to connect after a configurable n attempts sounds like it could meet our needs. FWIW, this general problem space has been solved before within the NodeJS mongodb driver as well (connection timeouts, cluster changes, reconnection events, etc). Might be a good place to go for guidance since all their drivers support these behaviors.

aheckmann-pebble avatar Jul 14 '16 19:07 aheckmann-pebble

Is there any fix available for the "connection timeout limit", apart from intercepting the logs and taking the necessary action (as stated above)?

zer0Id0l avatar Sep 30 '16 22:09 zer0Id0l

Today we hit this problem as well.

Potentially, this can lead to a memory leak, data loss, or both.

We call producer.send() many times per second. It accumulates all the promises in RAM until the connection is back. At some point the node.js process will be killed with an OutOfMemoryException or similar, losing all the in-memory data.

Also, this means the producer {retries} option makes no sense when connectivity breaks suddenly. It never gets to retry producer.send() because it gets stuck in the endless client.updateMetadata() call.
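One way an application can avoid accumulating pending promises without bound is to race each send against a timeout, so stuck sends reject promptly and their callers can drop or buffer the message instead. A sketch, assuming only that `producer.send()` returns a promise:

```javascript
// Reject a send once it has been outstanding longer than `ms`, instead of
// letting it hang forever while the driver retries the connection.
function sendWithTimeout(producer, topic, message, ms) {
  let timer;
  const timeout = new Promise((resolve, reject) => {
    timer = setTimeout(
      () => reject(new Error('send timed out after ' + ms + 'ms')), ms);
  });
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([producer.send(topic, message), timeout])
    .finally(() => clearTimeout(timer));
}
```

The underlying send may still complete later inside the driver; this only bounds how long the application waits on it.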

Proposal

How about a new option which would limit the connection retries, while by default still attempting to reconnect forever?

var producer = new Kafka.Producer({
    reconnectionDelay: {retries: 60, min: 1000, max: 1000}, // reject after ~1 minute
});

var producer = new Kafka.Producer({
    reconnectionDelay: {retries: 0, min: 1000, max: 1000}, // defaults. Never rejects
});

koresar avatar Oct 28 '16 04:10 koresar
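One possible reading of the proposed `{retries, min, max}` shape is exponential backoff between `min` and `max`, giving up (and rejecting pending sends) after `retries` attempts when `retries > 0`. A sketch of that interpretation only; this is not no-kafka's actual implementation:

```javascript
// Compute the delay before reconnect attempt `attempt` (1-based).
// Returns null when the retry budget is exhausted, signalling the caller
// to stop reconnecting and reject whatever is queued.
function nextDelay(attempt, { retries, min, max }) {
  if (retries > 0 && attempt > retries) return null; // give up
  // Exponential backoff starting at `min`, capped at `max`.
  return Math.min(max, min * Math.pow(2, attempt - 1));
}
```

With `min === max` (as in the proposal above) this degenerates to a fixed 1-second delay, so `{retries: 60, min: 1000, max: 1000}` rejects after roughly a minute.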

Put up the PR to implement rejections. See #131 please.

koresar avatar Oct 28 '16 06:10 koresar

I don't know if it's still relevant to anyone, but since no-kafka uses bluebird promises, you can use the timeout function, like this:

producer.init().timeout(3000).catch(() => console.log("Unable to connect"))

orgoldfus avatar Feb 20 '17 12:02 orgoldfus