pulsar-client-dotnet

Add support for Pulsar cluster level auto failover (PIP-121)

Open LeoSht opened this issue 2 years ago • 5 comments

Please add support for cluster-level auto failover, similar to the Java client. Related information: Cluster Level Fail Over (PIP-121), Java client MR.

LeoSht · Jan 17 '23 19:01

Hi! I'm not actively using Pulsar, and I'm not adding new functionality to the library myself (only bug fixes), but I'm always happy to support anyone who wants to add something. So if you need this feature, you can send a PR.

Lanayx · Jan 17 '23 19:01

Here is how far I've got. In the Java code, when the cluster address changes, they do this:

   pulsarClient.updateServiceUrl(serviceUrl);
   pulsarClient.reloadLookUp();

In updateServiceUrl:

    log.info("Updating service URL to {}", serviceUrl);
    conf.setServiceUrl(serviceUrl);
    lookup.updateServiceUrl(serviceUrl);
    cnxPool.closeAllConnections();

and in reloadLookUp:

    if (conf.getServiceUrl().startsWith("http")) {
        lookup = new HttpLookupService(conf, eventLoopGroup);
    } else {
        lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
                externalExecutorProvider.getExecutor());
    }
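For illustration, here is a heavily simplified Java sketch of that two-step sequence. The classes `Client`, `LookupService`, `HttpLookup` and `BinaryLookup` are hypothetical stand-ins, not the real Pulsar client types; only the order of operations and the `startsWith("http")` scheme check are taken from the snippets above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT the real Pulsar Java client classes.
public class FailoverSketch {
    static abstract class LookupService {
        String serviceUrl;
        LookupService(String serviceUrl) { this.serviceUrl = serviceUrl; }
        void updateServiceUrl(String url) { this.serviceUrl = url; }
        abstract String kind();
    }

    static class HttpLookup extends LookupService {
        HttpLookup(String url) { super(url); }
        String kind() { return "http"; }
    }

    static class BinaryLookup extends LookupService {
        BinaryLookup(String url) { super(url); }
        String kind() { return "binary"; }
    }

    static class Client {
        String serviceUrl;
        LookupService lookup;
        // Stand-in for the connection pool: just the names of open connections.
        final List<String> connections = new ArrayList<>();

        Client(String serviceUrl) {
            this.serviceUrl = serviceUrl;
            this.lookup = createLookup(serviceUrl);
        }

        // Same scheme check as reloadLookUp: "http..." -> HTTP lookup, anything else -> binary protocol.
        static LookupService createLookup(String url) {
            return url.startsWith("http") ? new HttpLookup(url) : new BinaryLookup(url);
        }

        // Step 1, mirroring updateServiceUrl: store the URL, update the lookup, drop connections.
        void updateServiceUrl(String url) {
            serviceUrl = url;
            lookup.updateServiceUrl(url);
            connections.clear();
        }

        // Step 2, mirroring reloadLookUp: rebuild the lookup for the (possibly new) scheme.
        void reloadLookUp() {
            lookup = createLookup(serviceUrl);
        }
    }

    public static void main(String[] args) {
        Client client = new Client("pulsar://localhost:6650");
        client.connections.add("broker-1");
        client.updateServiceUrl("http://localhost:8080");
        client.reloadLookUp();
        System.out.println(client.lookup.kind() + " " + client.lookup.serviceUrl + " " + client.connections.size());
    }
}
```

Running `main` switches a `pulsar://` client to an `http://` URL, so after `reloadLookUp` the lookup is an `HttpLookup` pointing at the new URL and no old connections remain.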

So far I have done the same in pulsarClient:

    backgroundTask {
        let q = { config with ServiceAddresses = addresses }
        do! connectionPool.CloseAsync()
        connectionPool <- ConnectionPool(q)
        lookupService <- BinaryLookupService(q, connectionPool)
    }

But all producers are still connected to the old address. It looks like in the Java code, once all connections are closed, producers and consumers go through the new lookup to get the new addresses (?), but in F# they do not.

fjod · Jul 16 '23 11:07

I think you need to ensure that by the time a connection is dropped, consumers and producers already have the new address to reconnect to. In your code the connections are dropped first, so the reconnect probably happens immediately, to the same address.
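To make the ordering argument concrete, here is a toy Java model. `Pool`, `Client` and `Producer` are hypothetical classes, not the pulsar-client-dotnet API, and real reconnects are asynchronous and go through lookup; the model only assumes that a producer reconnects the moment its pool is closed, using whatever pool the client holds at that instant. Closing before swapping leaves the producer on the old address; swapping before closing moves it to the new one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the reconnect race; NOT the pulsar-client-dotnet API.
public class ReconnectOrderSketch {
    static class Pool {
        final String address;
        final List<Runnable> onDrop = new ArrayList<>();
        Pool(String address) { this.address = address; }

        // Closing drops every connection; each drop handler runs (and may re-register) immediately.
        void close() {
            List<Runnable> handlers = new ArrayList<>(onDrop);
            onDrop.clear();
            for (Runnable handler : handlers) handler.run();
        }
    }

    static class Client {
        Pool pool;
        Client(String address) { pool = new Pool(address); }

        // Original order: producers reconnect while client.pool still points at the old pool.
        void closeThenSwap(String newAddress) {
            pool.close();
            pool = new Pool(newAddress);
        }

        // Reordered: publish the new pool first, then close the old one.
        void swapThenClose(String newAddress) {
            Pool old = pool;
            pool = new Pool(newAddress);
            old.close();
        }
    }

    static class Producer {
        final Client client;
        String connectedTo;
        Producer(Client client) { this.client = client; connect(); }

        // Reads the client's current pool and reconnects as soon as that pool drops us.
        void connect() {
            Pool p = client.pool;
            connectedTo = p.address;
            p.onDrop.add(this::connect);
        }
    }

    public static void main(String[] args) {
        Client a = new Client("pulsar://localhost:6650");
        Producer pa = new Producer(a);
        a.closeThenSwap("pulsar://localhost:6651");
        System.out.println("close then swap: " + pa.connectedTo);

        Client b = new Client("pulsar://localhost:6650");
        Producer pb = new Producer(b);
        b.swapThenClose("pulsar://localhost:6651");
        System.out.println("swap then close: " + pb.connectedTo);
    }
}
```

In the first case the producer ends up on `:6650` again; in the second it ends up on `:6651`.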

Lanayx · Jul 16 '23 15:07

> once connection is dropped consumers/producers should already have new address for reconnection

I tried rewriting it like this:

    backgroundTask {
        let q = { config with ServiceAddresses = addresses }
        let oldPool = connectionPool
        connectionPool <- ConnectionPool(q)
        lookupService <- BinaryLookupService(q, connectionPool)
        do! oldPool.CloseAsync()
    }

Interestingly, if I start the demo app plus two clusters on ports 6650 and 6651, and then a web app that tells Pulsar to switch from 6650 to 6651, it works :) But if I start with 6650 and only then boot up the app that broadcasts the new address 6651, it fails to reconnect to 6651 with this inner exception:

Unhandled exception. System.TimeoutException: Could not send message to broker within given timeout
  at <StartupCode$Pulsar-Client>[…]() in F:\work\pulsar-client-dotnet\src\Pulsar.Client\Internal\ProducerImpl.fs:line 767

So it seems that if a connection to the broker is already established, my code fails to switch the address. If there is no connection yet (the address is switched before the client connects to the broker), it works fine.

fjod · Aug 13 '23 11:08

@fjod I think you can open a PR now, so I can try it myself and help.

Lanayx · Aug 13 '23 17:08