kafka-python
Config parameter 'coordinator_not_ready_retry_timeout_ms'
I appreciate the motivation here, but I think a better solution would be to pass a timeout parameter to ensure_coordinator_known. That is the approach taken in the Java client (see KAFKA-4426).
Yes, I can add a timeout parameter to ensure_coordinator_known and call it with the desired timeout value. None can be the default so that the existing behavior is unchanged, and the desired timeout value can be taken from the configuration.
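A minimal sketch of that shape of change, assuming a retry loop like the one in ensure_coordinator_known; the callables `coordinator_known` and `lookup_coordinator` stand in for the coordinator's internal helpers (names are illustrative, not the actual patch), and `timeout_ms=None` keeps the current retry-forever behavior:

```python
import time

from kafka.errors import NodeNotReadyError


def wait_for_coordinator(coordinator_known, lookup_coordinator,
                         retry_backoff_ms=100, timeout_ms=None):
    """Retry coordinator discovery, optionally bounded by timeout_ms.

    coordinator_known: callable returning True once a coordinator is known
    lookup_coordinator: callable that issues one coordinator lookup request
    timeout_ms=None preserves the existing behavior (retry indefinitely).
    """
    deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000.0
    while not coordinator_known():
        if deadline is not None and time.monotonic() >= deadline:
            # Give the caller a chance to react instead of blocking forever.
            raise NodeNotReadyError('group coordinator not available within timeout')
        lookup_coordinator()
        time.sleep(retry_backoff_ms / 1000.0)
```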
Isn't that what I have implemented? I implemented it to handle only NodeNotReadyError, because that is the only error I could reproduce, so I named the timeout node_not_ready_retry_timeout_ms.
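For illustration, this is how a caller might use the proposed configuration. Note that node_not_ready_retry_timeout_ms is the parameter proposed in this PR and is not part of a released kafka-python; the topic, broker, and group names are placeholders:

```python
from kafka import KafkaConsumer
from kafka.errors import NodeNotReadyError

# Only works with the proposed patch applied; released versions would
# reject the unknown config parameter.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='broker:9092',
    group_id='my-group',
    node_not_ready_retry_timeout_ms=30000,  # proposed: stop retrying after 30s
)

try:
    records = consumer.poll(timeout_ms=1000)
except NodeNotReadyError:
    # The hosting code is notified instead of poll() blocking forever.
    consumer.close()
```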
Can this be merged now?
@dpkp I have checked the changes made for KAFKA-4426, but I don't think the problem I described here fits that situation. KAFKA-4426 handles closing the KafkaConsumer in different scenarios, including an unavailable coordinator. In that case the consumer keeps polling without knowing whether the coordinator is available, and the host code using the KafkaConsumer may decide to close it once it is notified that the coordinator is unavailable. IMO Kafka is designed for resiliency on the assumption that a coordinator will eventually be available, so the current implementation is correct for most cases. In my case, however, I want to handle only the situation where the coordinator is unavailable, and let the hosting code deal with it once it becomes aware of that. Since the timeout passed to poll is not propagated to the coordinator poll, it retries indefinitely (until a coordinator becomes available; in my case the coordinator will never become available with the same connection string). This is essentially a design issue, which is why I added another configuration parameter. I don't want to change the resiliency behavior, but I want an option to be notified when the coordinator is unavailable.
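As a caller-side illustration of the problem (not part of this PR), the blocking poll can be bounded from the hosting code with a worker thread, so that a stuck coordinator lookup becomes observable; broker, topic, group names and the 30-second bound are placeholders:

```python
import concurrent.futures

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='broker:9092',
                         group_id='my-group')

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# timeout_ms=1000 only bounds fetching; it is not propagated to the
# coordinator poll, so this call can block much longer.
future = pool.submit(consumer.poll, 1000)
try:
    records = future.result(timeout=30)   # bound the overall wait from the caller
except concurrent.futures.TimeoutError:
    # poll() is most likely stuck retrying the coordinator lookup;
    # the hosting code can now react (alert, reconfigure, shut down).
    print('coordinator still unavailable after 30s')
finally:
    pool.shutdown(wait=False)             # note: the background poll keeps running
```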
I faced the same problem with the Java client. However, the Java client differs from the Python client in ConsumerCoordinator.poll: if the partitions are manually assigned, the Java client skips the coordinator readiness check.
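For reference, this is what the manual-assignment mode being compared looks like in kafka-python; broker, topic, group, and partition values are placeholders:

```python
from kafka import KafkaConsumer, TopicPartition

# Manual assignment: no subscribe(), no rebalance. Per the comment above,
# the Java client's ConsumerCoordinator.poll skips the coordinator
# readiness check in this mode, while the Python client does not.
consumer = KafkaConsumer(bootstrap_servers='broker:9092',
                         group_id='my-group')
consumer.assign([TopicPartition('my-topic', 0)])   # explicit partition list

for record in consumer:
    print(record.topic, record.partition, record.offset)
```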