[CURATOR-320] Discovery reregister triggered even if retry policy succeeds. Connection looping condition.
ServiceDiscoveryImpl.reRegisterServices() can be triggered by the ConnectionState events RECONNECTED and CONNECTED, which causes reRegisterServices() to run on the ConnectionStateManager thread. If the connection drops while reRegisterServices() is running, the retry policy recovers it. However, the resulting SUSPENDED and RECONNECTED events are queued and not fired until reRegisterServices() completes, because the ConnectionStateManager thread that fires these events is busy. When it does complete, the queued RECONNECTED event fires and reRegisterServices() runs again.
When the ZooKeeper server connection is interrupted, all of the clients call reRegisterServices() simultaneously. This overloads the server with requests, causing connections to time out and reset, which queues up more RECONNECTED events. This state can persist indefinitely.
Because reRegisterServices() will most likely receive a NodeExistsException, it deletes and recreates the node. This effectively causes the services to thrash up and down, wreaking havoc on our service dependency chain.
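The queuing behaviour described above can be illustrated with a minimal, Curator-free sketch. It assumes a single dispatcher that handles one event at a time; `QueuedEventDemo`, `State`, and `simulate` are hypothetical names for illustration only and are not part of Curator.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a single-threaded event dispatcher; not Curator code.
final class QueuedEventDemo {
    enum State { SUSPENDED, RECONNECTED }

    /**
     * Returns how many times the simulated re-register runs when the
     * connection drops (and is recovered by a retry policy) during the
     * first dropsDuringReRegister runs.
     */
    static int simulate(int dropsDuringReRegister) {
        Queue<State> pending = new ArrayDeque<>();
        pending.add(State.RECONNECTED); // the reconnect that starts it all
        int runs = 0;
        int dropsLeft = dropsDuringReRegister;

        State event;
        // One event at a time: nothing queued is delivered until the
        // current handler returns.
        while ((event = pending.poll()) != null) {
            if (event != State.RECONNECTED) {
                continue; // SUSPENDED does not trigger a re-register
            }
            runs++; // simulated re-register
            if (dropsLeft > 0) {
                dropsLeft--;
                // The drop and recovery happen while the handler is busy,
                // so both events wait in the queue.
                pending.add(State.SUSPENDED);
                pending.add(State.RECONNECTED);
            }
        }
        return runs;
    }

    public static void main(String[] args) {
        System.out.println("re-register runs: " + simulate(1));
    }
}
```

In this model a single recovered drop already costs one extra full re-register, and every further drop during a run adds another.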
Originally reported by runningfly, imported from: Discovery reregister triggered even if retry policy succeeds. Connection looping condition.
- status: Open
- priority: Major
- resolution: Unresolved
- imported: 2025-01-21
If it helps, I did write some code to treat the symptoms. I never felt like I had a good enough understanding of Curator to create a proper patch.
These are the workaround changes I made to "org.apache.curator.x.discovery.details.ServiceDiscoveryImpl". They don't completely eliminate the problem, but they do help it recover and keep it from getting stuck in an indefinite loop. The change essentially aborts the reregister if the connection drops and starts over. Every time it fails, it waits a little longer before retrying (5 seconds more per retry, capped at 180 seconds). This helps to soften the load on the ZK server during a sudden connection recovery. It's far from ideal, but it works and has gotten us by for months.
     /**
      * A mechanism to register and query service instances using ZooKeeper
    @@ -66,17 +70,16 @@
         private final Collection> providers = Sets .newSetFromMap(Maps. , Boolean> newConcurrentMap());
         private final boolean watchInstances;
    +
    +    private ExecutorService reRegisterExecutor = ThreadUtils.newSingleThreadExecutor("reRegister");
    +    private volatile Future reRegisterFuture = null;
    +    private int reRegisterRetryCount = 0;
         private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
             @Override
             public void stateChanged(CuratorFramework client, ConnectionState newState) {
                 if ((newState == ConnectionState.RECONNECTED) || (newState == ConnectionState.CONNECTED)) {
    -                try {
    -                    log.debug("Re-registering due to reconnection");
    -                    reRegisterServices();
    -                } catch (Exception e) {
    -                    ThreadUtils.checkInterrupted(e);
    -                    log.error("Could not re-register instances after reconnection", e);
    -                }
    +                log.warn("Reconnection event. Calling re-register.");
    +                reRegisterServices(true);
                 }
             }
         };
    @@ -118,11 +121,7 @@
          */
         @Override
         public void start() throws Exception {
    -        try {
    -            reRegisterServices();
    -        } catch (KeeperException e) {
    -            log.error("Could not register instances - will try again later", e);
    -        }
    +        reRegisterServices(false);
             client.getConnectionStateListenable().addListener(connectionStateListener);
         }
    @@ -368,10 +367,57 @@
             return (entry != null) ? entry.service : null;
         }

    -    private void reRegisterServices() throws Exception {
    -        for (final Entry<T> entry : services.values()) {
    -            synchronized (entry) {
    -                internalRegisterService(entry.service);
    -            }
    -        }
    -    }
    +    private void reRegisterServices(final boolean concurrent) {
    +        synchronized (reRegisterExecutor) {
    +            if (reRegisterFuture != null) {
    +                reRegisterFuture.cancel(true);
    +                log.warn("Re-register restarting.");
    +                reRegisterRetryCount++;
    +            }
    +            reRegisterFuture = reRegisterExecutor.submit(new Runnable() {
    +                @Override
    +                public void run() {
    +                    int count = 0;
    +                    try {
    +                        if (reRegisterRetryCount > 0) {
    +                            int secToWait = reRegisterRetryCount * 5;
    +                            secToWait = secToWait < 180 ? secToWait : 180;
    +                            log.info("Re-register attempt {} will start in {}s", reRegisterRetryCount, secToWait);
    +                            Thread.sleep(secToWait * 1000);
    +                        } else {
    +                            log.info("Re-register will start immediately");
    +                        }
    +                        for (final Entry<T> entry : services.values()) {
    +                            synchronized (entry) {
    +                                if (Thread.interrupted()) {
    +                                    throw new InterruptedException();
    +                                }
    +                                internalRegisterService(entry.service);
    +                            }
    +                            count++;
    +                        }
    +                        log.warn(
    +                                "Re-registered {} services."
    +                                        + (reRegisterRetryCount > 0 ? " After {} retries." : ""),
    +                                count, reRegisterRetryCount);
    +                        synchronized (reRegisterExecutor) {
    +                            reRegisterRetryCount = 0;
    +                            reRegisterFuture = null;
    +                        }
    +                    } catch (InterruptedException ie) {
    +                        log.warn("Re-register interrupted. After registering {} services.", count);
    +                    } catch (Exception e) {
    +                        ThreadUtils.checkInterrupted(e);
    +                        log.error("Could not re-register instances after reconnection", e);
    +                    }
    +                }
    +            });
    +        }
    +        if (concurrent == false) {
    +            try {
    +                reRegisterFuture.get();
    +            } catch (Exception e) {
    +                log.error("Re-register execution exception:", e);
    +            }
    +        }
    +    }
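For readers who want the idea without the surrounding Curator code, here is a minimal standalone sketch of the cancel-and-restart pattern with the same linear backoff (5 seconds per retry, capped at 180 seconds). It is not the patch itself: `BackoffRestarter`, `delaySeconds`, `restart`, and the generation counter are hypothetical names and additions for illustration, and the work to run is passed in as a plain `Runnable`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names are illustrative and not part of Curator.
final class BackoffRestarter {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Object lock = new Object();
    private Future<?> inFlight; // guarded by lock
    private int retryCount;     // guarded by lock
    private long generation;    // guarded by lock; identifies the newest run

    /** Linear backoff: 5 s per retry, capped at 180 s (the values used in the diff). */
    static long delaySeconds(int retryCount) {
        return Math.min(retryCount * 5L, 180L);
    }

    /** Cancels a run that has not finished, then schedules a fresh one after the backoff delay. */
    void restart(final Runnable work) {
        synchronized (lock) {
            if (inFlight != null) {
                inFlight.cancel(true); // interrupts the sleep or the work
                retryCount++;
            }
            final long delay = delaySeconds(retryCount);
            final long myGeneration = ++generation;
            inFlight = executor.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(delay);
                    work.run();
                } catch (InterruptedException e) {
                    return; // superseded by a newer restart
                }
                synchronized (lock) {
                    // Reset only if no newer restart was requested meanwhile.
                    if (myGeneration == generation) {
                        retryCount = 0;
                        inFlight = null;
                    }
                }
            });
        }
    }

    /** Stops the worker thread, interrupting any run in progress. */
    void shutdown() {
        executor.shutdownNow();
    }
}
```

With this schedule the first attempt starts immediately, the third retry waits 15 seconds, and any retry count of 36 or more waits the full 180 seconds. The generation check is an addition in this sketch so that a superseded run cannot reset the counter.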
Running Fly, can you please create a PR with your change? We can work on it together from there.