jetcd
Thread blocked on get key inside a watcher
Versions
- etcd: 3.5.1
- jetcd: 0.7.3
- java: 11.0.8
Describe the bug
I don't know if it's a bug or a misuse of the jetcd SDK, but since we migrated from jetcd version 0.5.7 to 0.7.3, doing a get key within a watcher notification raises the exception io.vertx.core.VertxException: Thread blocked, and the get key is indeed stuck.
To Reproduce
Here is how I reproduce it:
@Test
public void testWatcherAndGet() throws Exception {
    final Client client = Client.builder().endpoints("http://localhost:32784").build();
    final ByteSequence keyToGet = ByteSequence.from("/keytoGet", StandardCharsets.UTF_8);
    final ByteSequence keyToWatch = ByteSequence.from("/keytoWatch", StandardCharsets.UTF_8);
    final ByteSequence keyValueToWatch = ByteSequence.from("keyValuetoWatch", StandardCharsets.UTF_8);
    CountDownLatch countDownLatch = new CountDownLatch(1);
    client.getWatchClient().watch(keyToWatch, WatchOption.DEFAULT, new Watch.Listener() {
        @Override
        public void onNext(WatchResponse watchResponse) {
            try {
                client.getKVClient().get(keyToGet).get(); // stuck there with 0.7.3 but ok with 0.5.7
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        @Override
        public void onError(Throwable throwable) { }
        @Override
        public void onCompleted() { }
    });
    client.getKVClient().put(keyToWatch, keyValueToWatch).get();
    countDownLatch.await();
}
Here is the full exception:
2022-08-18T16:36:31.680Z dev WARN vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2413 ms, time limit is 2000 ms
2022-08-18T16:36:32.681Z dev WARN vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3415 ms, time limit is 2000 ms
2022-08-18T16:36:33.682Z dev WARN vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 4415 ms, time limit is 2000 ms
2022-08-18T16:36:34.688Z dev WARN vertx-blocked-thread-checker i.v.c.i.BlockedThreadChecker Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5419 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
at java.base@11.0.8/jdk.internal.misc.Unsafe.park(Native Method)
at java.base@11.0.8/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
at java.base@11.0.8/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
at java.base@11.0.8/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
at java.base@11.0.8/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
at java.base@11.0.8/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1998)
at app//TestEtcdClient$4.onNext(TestEtcdClient.java:571)
at app//io.etcd.jetcd.impl.WatchImpl$WatcherImpl.onNext(WatchImpl.java:310)
at app//io.etcd.jetcd.impl.WatchImpl$WatcherImpl$$Lambda$183/0x00000008004af840.handle(Unknown Source)
at app//io.vertx.grpc.stub.StreamObserverReadStream.onNext(StreamObserverReadStream.java:37)
at app//io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:474)
at app//io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:661)
at app//io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:646)
at app//io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at app//io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at app//io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:298)
at app//io.vertx.grpc.VertxChannelBuilder$$Lambda$175/0x00000008003db840.handle(Unknown Source)
at app//io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at app//io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
at app//io.vertx.grpc.VertxChannelBuilder.lambda$build$1(VertxChannelBuilder.java:298)
at app//io.vertx.grpc.VertxChannelBuilder$$Lambda$139/0x0000000800375c40.execute(Unknown Source)
at app//io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
at app//io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
at app//io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.messagesAvailable(ClientCallImpl.java:677)
at app//io.grpc.internal.RetriableStream$Sublistener$6.run(RetriableStream.java:1065)
at app//io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
at app//io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
at app//io.grpc.internal.RetriableStream$Sublistener.messagesAvailable(RetriableStream.java:1061)
at app//io.grpc.internal.ForwardingClientStreamListener.messagesAvailable(ForwardingClientStreamListener.java:39)
at app//io.grpc.internal.AbstractStream$TransportState.messagesAvailable(AbstractStream.java:182)
at app//io.grpc.internal.MessageDeframer.processBody(MessageDeframer.java:412)
at app//io.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:275)
at app//io.grpc.internal.MessageDeframer.deframe(MessageDeframer.java:177)
at app//io.grpc.internal.AbstractStream$TransportState.deframe(AbstractStream.java:210)
at app//io.grpc.internal.AbstractClientStream$TransportState.inboundDataReceived(AbstractClientStream.java:361)
at app//io.grpc.internal.Http2ClientStreamTransportState.transportDataReceived(Http2ClientStreamTransportState.java:148)
at app//io.grpc.netty.NettyClientStream$TransportState.transportDataReceived(NettyClientStream.java:348)
at app//io.grpc.netty.NettyClientHandler.onDataRead(NettyClientHandler.java:387)
at app//io.grpc.netty.NettyClientHandler.access$1100(NettyClientHandler.java:91)
at app//io.grpc.netty.NettyClientHandler$FrameListener.onDataRead(NettyClientHandler.java:927)
at app//io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307)
at app//io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
at app//io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415)
at app//io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250)
at app//io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
at app//io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at app//io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
at app//io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
at app//io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at app//io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
at app//io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
at app//io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at app//io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
at app//io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1236)
at app//io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285)
at app//io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
at app//io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
at app//io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at app//io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at app//io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at app//io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at app//io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at app//io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at app//io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at app//io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base@11.0.8/java.lang.Thread.run(Thread.java:834)
Expected behavior
I expect the test not to get stuck and the get key to return its result.
Additional context
NA
@q12321q do you have any time to investigate further ?
Hello @lburgazzoli, are you able to reproduce? I tested it with version 0.6.1 and there is no issue there either. If I wrap
client.getKVClient().get(keyToGet).get();
countDownLatch.countDown();
in a separate thread, the test works in 0.7.3.
My guess is that the switch to Vert.x produced this regression. The watcher notification and the key get both run on the event loop (i.e. a single thread), and the first waits for the second, hence the thread is blocked. This is just a supposition as I'm not very familiar with Vert.x. I can wrap all my watch notifications in separate threads, but that is not an impact-free change, so first I would like to understand whether this is expected behavior from the jetcd point of view and where it should be fixed: on the jetcd library side or on the user side?
Thanks a lot for your help
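To make the suspected mechanism concrete, here is a minimal sketch that uses only the JDK, not jetcd or Vert.x. It assumes the event loop behaves like a single-threaded executor; the class and method names are hypothetical. The first method blocks inside a callback on work that only the same thread can run, so the wait times out. The second hands the blocking wait to a worker pool, so the loop stays free and the call completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventLoopBlockingDemo {

    /** Blocks inside a loop callback on a result that only the loop thread can produce. */
    public static boolean blockingInsideLoopTimesOut() throws Exception {
        // Assumption: a single-threaded executor stands in for the event loop.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Boolean> outcome = CompletableFuture.supplyAsync(() -> {
                // The "response" is queued behind this callback on the same thread.
                CompletableFuture<String> response =
                        CompletableFuture.supplyAsync(() -> "value", loop);
                try {
                    response.get(500, TimeUnit.MILLISECONDS);
                    return false; // the response arrived, so nothing was blocked
                } catch (TimeoutException e) {
                    return true; // the loop thread was waiting on itself
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }, loop);
            return outcome.get(5, TimeUnit.SECONDS);
        } finally {
            loop.shutdownNow();
        }
    }

    /** Moves the blocking wait to a worker pool so the loop callback returns at once. */
    public static String offloadedCallCompletes() throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            loop.execute(() -> workers.execute(() -> {
                // This wait runs on a worker thread; the loop is free to deliver the response.
                CompletableFuture<String> response =
                        CompletableFuture.supplyAsync(() -> "value", loop);
                try {
                    result.complete(response.get(2, TimeUnit.SECONDS));
                } catch (Exception e) {
                    result.completeExceptionally(e);
                }
            }));
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            loop.shutdownNow();
            workers.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking inside loop timed out: " + blockingInsideLoopTimesOut());
        System.out.println("offloaded call returned: " + offloadedCallCompletes());
    }
}
```

In the real reproduction there is no timeout on the inner get, which would explain why the test hangs instead of failing.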
I can't look at the issue now, and probably not for a few days, so I would love it if you could help me on this. Yes, this is a regression and your analysis seems reasonable to me.
Thanks a lot for having a look at this issue. Sure I'll do my best to help you.
@q12321q May I ask how you can avoid this problem? I am also encountering this problem now.
I guess you should use an async call, so don't use client.getKVClient().get(keyToGet).get() but e.g. client.getKVClient().get(keyToGet).thenAccept(...)
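A minimal sketch of that callback style, using only the JDK. The fetch method is a hypothetical stand-in for an asynchronous client call that returns a CompletableFuture, and a single-threaded executor is assumed to play the role of the event loop. The callback registers a continuation with thenAccept and returns immediately, so the loop thread is never parked.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncGetSketch {

    /** Hypothetical stand-in for an async client call; the result is produced on the loop thread. */
    public static CompletableFuture<String> fetch(String key, Executor loop) {
        return CompletableFuture.supplyAsync(() -> "value-of-" + key, loop);
    }

    /** Handles a simulated watch event without blocking the loop thread. */
    public static String handleEventWithoutBlocking() throws Exception {
        // Assumption: a single-threaded executor stands in for the event loop.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            // The "watch callback": register continuations and return immediately.
            loop.execute(() -> fetch("/keytoGet", loop)
                    .thenAccept(seen::set)
                    .whenComplete((ignored, error) -> done.countDown()));
            // Only the calling (test) thread waits, never the loop thread.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new TimeoutException("continuation did not run");
            }
            return seen.get();
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handleEventWithoutBlocking());
    }
}
```

The continuation itself still runs on the loop thread in this sketch, so it should stay short; long-running work would need to be handed off elsewhere.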
I have the same problem with Reactor, but Reactor throws a BlockingException. I have an idea: can you execute it through an async method?
I don't know the get method in etcd, but I guess it's blocking? 👂
Try with something like this:
public void testWatchAndGet(final Client client) throws Exception {
final ByteSequence key = randomByteSequence();
final ByteSequence value = randomByteSequence();
final AtomicReference<KeyValue> ref = new AtomicReference<>();
final Consumer<WatchResponse> consumer = response -> {
for (WatchEvent event : response.getEvents()) {
if (event.getEventType() == EventType.PUT) {
ByteSequence key1 = event.getKeyValue().getKey();
client.getKVClient().get(key1).whenComplete((r, t) -> {
if (t == null && !r.getKvs().isEmpty()) { // r is null when the get failed
ref.set(r.getKvs().get(0));
}
});
}
}
};
try (Watcher watcher = client.getWatchClient().watch(key, consumer)) {
client.getKVClient().put(key, value).get();
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isNotNull());
assertThat(ref.get()).isNotNull();
assertThat(ref.get().getKey()).isEqualTo(key);
assertThat(ref.get().getValue()).isEqualTo(value);
}
}
I have tested this code, the result is as follows
The Vert.x thread is always a single one. I don't know why jetcd has a single consumer thread?
@moremind, @q12321q does my example solve the issue ? if so please close it.
> The Vert.x thread is always a single one. I don't know why jetcd has a single consumer thread?
What is the issue?
> @moremind, @q12321q does my example solve the issue? If so, please close it.
I think it can solve the bug. Thanks for your answer!
> > The Vert.x thread is always a single one. I don't know why jetcd has a single consumer thread?
> What is the issue?
I think the thread 'vert.x-eventloop-thread-0' should not be the only one, but I tested it, and it is.
this is new test
I still don't get what the issue is, as Vert.x uses event loops, so things may end up on the same thread. You are free to open another issue and investigate more, but jetcd does not do anything in particular other than delegate to Vert.x.
> I still don't get what the issue is, as Vert.x uses event loops, so things may end up on the same thread. You are free to open another issue and investigate more, but jetcd does not do anything in particular other than delegate to Vert.x.
Yes, I know. I will refer to Vert.x.
@lburgazzoli for me the issue is that the migration to Vert.x forces us not to block in some callbacks. It's a new requirement and an undocumented breaking change. What will happen if we block the event loop for some time? My guess is that it will also block all the other etcd requests in the meantime?
> @lburgazzoli for me the issue is that the migration to Vert.x forces us not to block in some callbacks. It's a new requirement and an undocumented breaking change.
Unfortunately I'm the only maintainer of the project. I have asked many times for help, but no one stepped up, so I do what I think is right and what helps me reduce the maintenance cost. Every time I've opened issues to discuss similar changes, I have received no feedback, so I assume people do not care or do not use jetcd.
So I'm sorry for the breaking change, but given that I'm alone and don't have much time, I can't do much.
> What will happen if we block the event loop for some time? My guess is that it will also block all the other etcd requests in the meantime?
I think this is something you have to take into account anyway, as in general the expectation is that you use the callback methods to handle responses. That is why we return a CompletableFuture and not a blocking result.
Oh, sorry if my tone was too direct! I didn't open the issue to blame. Thanks a lot for your work, and my bad for not having enough time to help you more on this issue. My goal here is to clearly understand the new implications of the migration to Vert.x and find the best way to remediate.
My feeling here is that we may have to wrap the watcher's callbacks in a thread pool. The whenComplete won't entirely solve the issue, as our application can block for several seconds.
Maybe we should document this behavior, as I feel that we won't be the only ones to have the issue. What do you think?
Thanks for your response. If possible, I would be glad to look into the question.
> Oh, sorry if my tone was too direct! I didn't open the issue to blame. Thanks a lot for your work, and my bad for not having enough time to help you more on this issue. My goal here is to clearly understand the new implications of the migration to Vert.x and find the best way to remediate.
No worries at all
> My feeling here is that we may have to wrap the watcher's callbacks in a thread pool. The whenComplete won't entirely solve the issue, as our application can block for several seconds. Maybe we should document this behavior, as I feel that we won't be the only ones to have the issue. What do you think?
I don't think we should wrap the watcher's callbacks in a thread pool, because if the pool becomes full you may end up blocking anyway, so that should be handled by the application code (i.e. put responses in a queue). Honestly, the watch should be implemented using the Flow APIs, but I had to hold back on this because people are asking to keep supporting Java 8 ...
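One way to read the "put responses in a queue" suggestion is sketched below with only the JDK. It is an assumption about how application code might do it, not something jetcd provides. The class name is hypothetical, events are plain strings, and a single-threaded executor stands in for the event loop. The callback only enqueues the event; a dedicated worker thread drains the queue and does the slow processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WatchEventQueueSketch {

    /** Delivers events on a simulated loop, processes them on a worker, returns the processed order. */
    public static List<String> drainThroughQueue(List<String> events) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(events.size());

        // Worker thread: the only place where slow or blocking work happens.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String event = queue.take();
                    Thread.sleep(10); // stands in for slow application logic
                    processed.add(event);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when asked to
            }
        }, "watch-event-worker");
        worker.setDaemon(true);
        worker.start();

        // Assumption: a single-threaded executor stands in for the event loop.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            for (String event : events) {
                // The "watch callback": enqueue and return without waiting.
                loop.execute(() -> queue.offer(event));
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new TimeoutException("worker did not finish in time");
            }
            return new ArrayList<>(processed);
        } finally {
            loop.shutdownNow();
            worker.interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drainThroughQueue(List.of("put:/a", "put:/b", "delete:/a")));
    }
}
```

The queue here is unbounded, so a slow worker makes it grow rather than block the loop. A bounded queue would need a policy for the case where offer returns false, which is the trade-off raised in the comment above.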
I also encountered this problem. I think callback processing is a good solution
This issue is stale because it has been open 60 days with no activity. Remove stale label or comment or this will be closed in 7 days.
I have a similar problem: my etcd client sometimes gets stuck and never comes back, while the network is working perfectly. In some cases, for example, I used 4 threads to submit get requests through the client, and 2 of them were blocked for no reason while the rest kept working. I am still trying to figure out what happens, but the problem does not seem easy to reproduce. It now happens frequently on our production machine, which is in a private environment, but we are not able to reproduce it on our local machine.
> > I still don't get what the issue is, as Vert.x uses event loops, so things may end up on the same thread. You are free to open another issue and investigate more, but jetcd does not do anything in particular other than delegate to Vert.x.
> Yes, I know. I will refer to Vert.x.
Is there any new progress on this issue? @moremind @lburgazzoli We have encountered a performance issue that we initially suspect is related to this thread, because it is always a single one.
@Wackerle if you think this is an issue, please open a new one or it will be forgotten. More importantly, any help would be really appreciated, as unfortunately I don't have much time to dedicate to troubleshooting issues like this one.