pulsar-client-dotnet
Add support for Pulsar cluster level auto failover (PIP-121)
Please add support for cluster-level auto failover, similar to the Java client. Related information: Cluster Level Fail Over PIP-121, Java Client MR.
Hi! I'm not actively using Pulsar and I'm not adding new functionality to the library myself (only bug fixes), but I'm always happy to help if someone wants to add something. So if you need this feature, you can send a PR.
Here's where I've got to so far: in the Java code, when the cluster address changes, they do this:
pulsarClient.updateServiceUrl(serviceUrl);
pulsarClient.reloadLookUp();
and updateServiceUrl itself does:
log.info("Updating service URL to {}", serviceUrl);
conf.setServiceUrl(serviceUrl);
lookup.updateServiceUrl(serviceUrl);
cnxPool.closeAllConnections();
and reloadLookUp does:
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
So far I've done the same thing in pulsarClient:
backgroundTask {
    let q = { config with ServiceAddresses = addresses }
    // close the old pool first, then create the new pool and lookup service
    do! connectionPool.CloseAsync()
    connectionPool <- ConnectionPool(q)
    lookupService <- BinaryLookupService(q, connectionPool)
}
but all producers are still connected to the old address. It looks like in the Java code, once all connections are closed, producers/consumers go through the new lookup to get the new addresses (?), but in F# they do not.
I think you need to ensure that by the time a connection is dropped, consumers/producers already have the new address to reconnect to. In your code the connections are dropped first, so the reconnect probably happens immediately, to the same address.
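I tried rewriting it like this:
backgroundTask {
    let q = { config with ServiceAddresses = addresses }
    // keep a reference to the old pool, then swap in the new pool and lookup service
    let oldPool = connectionPool
    connectionPool <- ConnectionPool(q)
    lookupService <- BinaryLookupService(q, connectionPool)
    // only now close the connections held by the old pool
    do! oldPool.CloseAsync()
}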
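The ordering point raised above (the new address must already be visible when the old connections drop) can be modeled in isolation. Below is a minimal Java sketch, in the style of the Java client snippets earlier in the thread. `FakeProducer`, `FakePool`, and the synchronous "reconnect on close" behaviour are hypothetical simplifications, not the API of either the Java or the .NET client, and the sketch does not try to model the established-connection failure reported in the next comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class FailoverOrderingSketch {

    /** Hypothetical stand-in for a producer: on reconnect it reads whatever address is current. */
    static final class FakeProducer {
        private final AtomicReference<String> currentAddress;
        String connectedTo;

        FakeProducer(AtomicReference<String> currentAddress) {
            this.currentAddress = currentAddress;
            this.connectedTo = currentAddress.get();
        }

        // Called by the pool when this producer's connection is dropped.
        void onConnectionClosed() {
            connectedTo = currentAddress.get();
        }
    }

    /** Hypothetical stand-in for a connection pool that notifies producers when it closes. */
    static final class FakePool {
        final List<FakeProducer> producers = new ArrayList<>();

        void closeAll() {
            for (FakeProducer p : producers) {
                p.onConnectionClosed(); // in this sketch the reconnect happens synchronously
            }
        }
    }

    // Original order: drop connections first, then publish the new address.
    static String closeThenSwap(String oldAddr, String newAddr) {
        AtomicReference<String> address = new AtomicReference<>(oldAddr);
        FakePool pool = new FakePool();
        FakeProducer producer = new FakeProducer(address);
        pool.producers.add(producer);

        pool.closeAll();      // producer reconnects immediately...
        address.set(newAddr); // ...before the new address is visible
        return producer.connectedTo;
    }

    // Reordered: publish the new address first, then drop the old connections.
    static String swapThenClose(String oldAddr, String newAddr) {
        AtomicReference<String> address = new AtomicReference<>(oldAddr);
        FakePool pool = new FakePool();
        FakeProducer producer = new FakeProducer(address);
        pool.producers.add(producer);

        address.set(newAddr); // new address is visible first
        pool.closeAll();      // so the reconnect picks it up
        return producer.connectedTo;
    }

    public static void main(String[] args) {
        System.out.println(closeThenSwap("pulsar://localhost:6650", "pulsar://localhost:6651"));
        System.out.println(swapThenClose("pulsar://localhost:6650", "pulsar://localhost:6651"));
    }
}
```

Running `main` prints `pulsar://localhost:6650` for `closeThenSwap` and `pulsar://localhost:6651` for `swapThenClose`: the reconnect can only pick up an address that is already in place when the connection drops.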
Interesting: if I start the demo app, two clusters on ports 6650 and 6651, and a web app that tells Pulsar to switch from 6650 to 6651, it works :) But if I start with 6650 and only then boot up the app that broadcasts the new address 6651, it fails to reconnect to 6651 with this inner exception:
Unhandled exception. System.TimeoutException: Could not send message to broker within given timeout
at <StartupCode$Pulsar-Client>…() in F:\work\pulsar-client-dotnet\src\Pulsar.Client\Internal\ProducerImpl.fs:line 767
So it seems that if there is already an established connection to the broker, my code fails to change the address, but if there is no connection yet (the address is switched before the client connects to the broker), it works fine.
@fjod I think you can already create a PR so I can try it myself and help.