
[Question] How to detect connection failures and is there a hard limit for re-connection?

Open quixoticaxis opened this issue 3 years ago • 7 comments

The consumer currently attempts to re-connect to brokers after encountering connectivity issues. Is it possible to apply a global timeout/limit to consumer's re-connection or disable it completely? Is it possible to somehow detect the connection loss in the user code?

As it stands, messages that are processed late are by definition incorrect in the system I work on, so I have a side channel that can invalidate the output for my clients. In general, I would prefer getting an exception from Consume over losing time on re-connection attempts that take longer than a few seconds when none of the brokers are available.

quixoticaxis avatar Jul 08 '22 09:07 quixoticaxis


hi @quixoticaxis

About your first question (applying a timeout/limit to the consumer connection), you can use the SessionTimeoutMs property in ConsumerConfig, e.g.

```csharp
var consumerConfig = new ConsumerConfig
{
    SessionTimeoutMs = (int)TimeSpan.FromSeconds(10).TotalMilliseconds
};
```

About your second question (detecting whether the bootstrap server is available or not), I'll refer you to this link. In short, you should use AdminClient to manage your topics, brokers, etc.
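A minimal sketch of that approach (the broker address is a placeholder; GetMetadata throws a KafkaException when no broker responds within the timeout):

```csharp
using System;
using Confluent.Kafka;

class BrokerCheck
{
    static void Main()
    {
        var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };

        using var adminClient = new AdminClientBuilder(config).Build();
        try
        {
            // GetMetadata queries the cluster; it throws if no broker
            // responds within the given timeout.
            var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(5));
            Console.WriteLine($"Cluster reachable, {metadata.Brokers.Count} broker(s) visible.");
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Cluster unreachable: {e.Error.Reason}");
        }
    }
}
```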

amirvalhalla avatar Jul 19 '22 09:07 amirvalhalla

@amirvalhalla AFAIU, session timeout defines the time frame during which the heartbeat message is awaited, but it does not stop the client.

quixoticaxis avatar Jul 19 '22 10:07 quixoticaxis

@quixoticaxis, since I don't know what kind of business you're writing code for, I'm suggesting you use AdminClient to manage your brokers, etc. Whenever all of your brokers (or a specific broker) become unavailable, you can clean up/delete your topics; that way your clients can't get any more messages from Kafka. Then, whenever your brokers come back, your producer will create the topics again and the cycle will resume.

amirvalhalla avatar Jul 19 '22 10:07 amirvalhalla

@amirvalhalla while it is possible in theory, I would much prefer to have some kind of consumer callback (or at least an error message to look for) for the case of complete connection loss; otherwise I would have to query the cluster with AdminClient in a loop.

quixoticaxis avatar Jul 19 '22 10:07 quixoticaxis

@mhowlett, hello, were you asking (in #725; unfortunately, the link to the wiki is dead) for scenarios where detecting connection failures would be useful? As I mentioned above, we have a system that tries to minimize the probability of our clients getting stale data, so ideally we would like to kill our application (and thus let a separate health-check mechanism invalidate the data) when we are either too slow to process the data or have connectivity issues that prevent the Kafka consumer from accessing the cluster. The current re-connection mechanism blocks Consume indefinitely, so we need to design a workaround. The only way I see for now is parsing the errors from the error handler and stopping the system when the consumer deems all brokers to be down.
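For reference, a sketch of that error-handler workaround, with placeholder broker/group/topic names. Note that librdkafka reports Local_AllBrokersDown as a non-fatal, informational error (brokers may come back), so treating it as a stop signal is an application-level policy choice, not the library's:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "staleness-sensitive-group"
        };

        using var cts = new CancellationTokenSource();

        using var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((_, e) =>
            {
                // Local_AllBrokersDown fires when every broker connection is lost.
                // For this use case we treat it as fatal and stop consuming.
                if (e.Code == ErrorCode.Local_AllBrokersDown || e.IsFatal)
                {
                    Console.Error.WriteLine($"Connectivity lost: {e.Reason}");
                    cts.Cancel();
                }
            })
            .Build();

        consumer.Subscribe("my-topic");
        try
        {
            while (true)
            {
                // Throws OperationCanceledException once cts.Cancel() is called.
                var result = consumer.Consume(cts.Token);
                Console.WriteLine(result.Message.Value);
            }
        }
        catch (OperationCanceledException)
        {
            // Fall through; the process exits and the external health check fires.
        }
        finally
        {
            consumer.Close();
        }
    }
}
```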

quixoticaxis avatar Jul 19 '22 19:07 quixoticaxis

so ideally we would like to kill our application (and thus enable a separate health check mechanism to invalidate the data) in case we are either too slow to process the data, or have connectivity issues that prevent Kafka consumer from accessing the cluster.

why not just ignore messages conditional on Message.Timestamp?
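A minimal sketch of that timestamp-based filtering (the staleness threshold is an assumption, not something from this thread):

```csharp
using System;
using Confluent.Kafka;

static class StalenessFilter
{
    // Returns true when the message is fresh enough to process,
    // based on the broker/producer timestamp carried on the message.
    public static bool IsFresh<TKey, TValue>(
        ConsumeResult<TKey, TValue> result, TimeSpan threshold)
    {
        var age = DateTime.UtcNow - result.Message.Timestamp.UtcDateTime;
        return age <= threshold;
    }
}
```

Usage would be something like `if (StalenessFilter.IsFresh(result, TimeSpan.FromSeconds(5))) { /* process */ }` inside the consume loop.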

The current re-connection mechanism blocks Consume indefinitely

there are variants of Consume that have a timeout.
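For example, the `Consume(TimeSpan)` overload returns null instead of blocking indefinitely (topic name is a placeholder):

```csharp
using System;
using Confluent.Kafka;

class TimedConsumeLoop
{
    static void Run(IConsumer<Ignore, string> consumer)
    {
        consumer.Subscribe("my-topic");
        while (true)
        {
            // Returns null when no message arrives within the timeout,
            // rather than blocking until the cluster is reachable again.
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            if (result == null)
            {
                // Could be an idle topic or a connectivity problem;
                // Consume alone cannot distinguish the two.
                Console.WriteLine("No message within 5s.");
                continue;
            }
            Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
        }
    }
}
```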

mhowlett avatar Aug 07 '22 15:08 mhowlett

why not just ignore messages conditional on Message.Timestamp?

Yes, we can filter out stale data, but we cannot be sure that the most recently processed data is not itself stale, because we don't know whether we are still receiving messages or not. We can work around this by generating heartbeat messages every few seconds to all topics.
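The heartbeat workaround described above could be sketched like this (broker address, topic list, and interval are all illustrative assumptions):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

class Heartbeat
{
    // Publishes a heartbeat record to each topic every few seconds so that
    // consumers can distinguish "no new data" from "no connectivity":
    // a consumer that stops seeing heartbeats knows its view is stale.
    static async Task Run(string[] topics, CancellationToken token)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        while (!token.IsCancellationRequested)
        {
            foreach (var topic in topics)
            {
                await producer.ProduceAsync(
                    topic,
                    new Message<Null, string> { Value = "heartbeat" },
                    token);
            }
            await Task.Delay(TimeSpan.FromSeconds(3), token);
        }
    }
}
```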

there are variants of Consume that have a timeout.

Thank you. I know, and we use it, but it would help us in the described case only if we implement heartbeat for all topics.

quixoticaxis avatar Aug 07 '22 22:08 quixoticaxis