pulsar-flink
pulsar-flink copied to clipboard
[BUG] OOM occurs when a new task is submitted after the pulsar connector is closed
Describe the bug
On the same flink cluster, after committing the pulsar connector, cancel the task. Resubmit the task, OOM issue will occur. Try a few more times, this problem is bound to appear.
2021-03-17 01:54:12
org.apache.pulsar.client.admin.PulsarAdminException: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: java.lang.OutOfMemoryError: Cannot reserve 16777216 bytes of direct buffer memory (allocated: 252674791, limit: 268435458)
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:228)
at org.apache.pulsar.client.admin.internal.TopicsImpl$7.failed(TopicsImpl.java:324)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$4.failed(JerseyInvocation.java:1030)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.processFailure(ClientRuntime.java:231)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.access$100(ClientRuntime.java:85)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.lambda$failure$1(ClientRuntime.java:183)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:316)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:298)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:268)
at org.apache.pulsar.shade.org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:312)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.failure(ClientRuntime.java:183)
at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$3.onThrowable(AsyncHttpConnector.java:277)
at org.apache.pulsar.shade.org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:277)
at org.apache.pulsar.shade.org.asynchttpclient.netty.request.WriteListener.abortOnThrowable(WriteListener.java:46)
at org.apache.pulsar.shade.org.asynchttpclient.netty.request.WriteListener.operationComplete(WriteListener.java:59)
at org.apache.pulsar.shade.org.asynchttpclient.netty.request.WriteCompleteListener.operationComplete(WriteCompleteListener.java:28)
at org.apache.pulsar.shade.org.asynchttpclient.netty.request.WriteCompleteListener.operationComplete(WriteCompleteListener.java:20)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:183)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:421)
at org.apache.pulsar.shade.org.asynchttpclient.netty.channel.NettyConnectListener.writeRequest(NettyConnectListener.java:80)
at org.apache.pulsar.shade.org.asynchttpclient.netty.channel.NettyConnectListener.onSuccess(NettyConnectListener.java:156)
at org.apache.pulsar.shade.org.asynchttpclient.netty.channel.NettyChannelConnector$1.onSuccess(NettyChannelConnector.java:92)
at org.apache.pulsar.shade.org.asynchttpclient.netty.SimpleChannelFutureListener.operationComplete(SimpleChannelFutureListener.java:26)
at org.apache.pulsar.shade.org.asynchttpclient.netty.SimpleChannelFutureListener.operationComplete(SimpleChannelFutureListener.java:20)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:300)
at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:335)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: java.lang.OutOfMemoryError: Cannot reserve 16777216 bytes of direct buffer memory (allocated: 252674791, limit: 268435458)
at org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
at org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
at org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:300)
at org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:299)
at org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420)
... 23 more
Caused by: java.lang.OutOfMemoryError: Cannot reserve 16777216 bytes of direct buffer memory (allocated: 252674791, limit: 268435458)
at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:120)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:329)
at org.apache.pulsar.shade.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
at org.apache.pulsar.shade.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
at org.apache.pulsar.shade.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
at org.apache.pulsar.shade.io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
at org.apache.pulsar.shade.io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
at org.apache.pulsar.shade.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
at org.apache.pulsar.shade.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
at org.apache.pulsar.shade.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
at org.apache.pulsar.shade.io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:93)
at org.apache.pulsar.shade.io.netty.handler.codec.http.HttpClientCodec$Encoder.encode(HttpClientCodec.java:167)
at org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
... 37 more
taskmanager_019266c3f3a1d106efddbaf14b82e6b3_thread_dump.txt
Residual pulsar-related threads
pulsar-client-io-6-1
pulsar-timer-8-1
pulsar-client-io-6-1
pulsar-timer-8-1
pulsar-client-io-6-1
pulsar-timer-8-1
pulsar-client-io-6-1
pulsar-external-listener-7-1
To Reproduce Steps to reproduce the behavior:
- Go to '...'
- Click on '....'
- Scroll down to '....'
- See error
Expected behavior A clear and concise description of what you expected to happen.
Screenshots If applicable, add screenshots to help explain your problem.
Additional context Add any other context about the problem here.
Every time I close pulsar's task, there will be these two pulsar-client-io-6-1
and pulsar-timer-8-1
threads that are not closed.
@codelipenghui @jiazhai Do you have any suggestions to troubleshoot and solve them?
I think this issue is easy to reproduce under Java 11, It's a direct buffer memory limit in Netty. We can resolve this issue by adding limit support.
Any update to this?
We experienced problem that @syhily described here https://github.com/streamnative/pulsar-flink/issues/170.
After some networking issues after upgrading k8s cluster we started getting java.lang.OutOfMemoryError: Direct buffer memory
We are using flink 1.13 and flink pulsar connector 1.13.1.5-rc3.
Has this been fixed in newer versions? Or are than any future plans for fixing this?