[Bug] Brokers in a cluster become unhealthy collectively
### Search before asking
- [X] I searched in the issues and found nothing similar.
### Version
Ubuntu 20.04, Pulsar 2.7.0
Cluster setup: 5 ZooKeeper, 5 BookKeeper, 5 brokers
- with Pulsar Functions and WebSocket enabled
- 4 out of 5 brokers are behind a proxy; the last one is isolated from external connections so it can focus on Pulsar Function execution
Each node has at least 64 GB RAM and a 16-core (physical) CPU.
### Minimal reproduce step
1. Start the cluster; it runs stably at first.
2. Create 20k WebSocket connections to the proxy, so that each of the 4 brokers behind the proxy establishes about 5k WebSocket connections. Each connection creates a new topic, for 20k topics in total.
3. Send data at about 1k messages per second (see the sketch below).
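For reference, each of these connections is roughly equivalent to the following sketch against Pulsar's WebSocket producer endpoint, using the standard javax.websocket client API (the proxy host, tenant/namespace and topic names are placeholders; pacing, ack handling and connection bookkeeping are omitted):

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

// Minimal sketch: one WebSocket producer connection per topic, publishing small
// JSON frames with a base64-encoded payload, as Pulsar's WebSocket API expects.
@ClientEndpoint
public class WsProducerLoadSketch {

    @OnMessage
    public void onMessage(String ack) {
        // The broker replies with a JSON ack per published message; ignored here.
    }

    public static void main(String[] args) throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();

        // In the real test there are 20k connections spread over the 4 proxied brokers.
        for (int i = 0; i < 20_000; i++) {
            URI producerUri = URI.create(
                    "ws://proxy-host:8080/ws/v2/producer/persistent/public/default/topic-" + i);
            Session session = container.connectToServer(WsProducerLoadSketch.class, producerUri);

            String payload = Base64.getEncoder()
                    .encodeToString("hello".getBytes(StandardCharsets.UTF_8));
            session.getBasicRemote().sendText("{\"payload\": \"" + payload + "\"}");
        }
    }
}
```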
### What did you expect to see?
Brokers should handle the load well, and not die collectively (that's the whole point of clustering!).
### What did you see instead?
Our cluster ran OK for slightly more than a day. We were using it with Apache Flink, and we occasionally saw this error from Flink:
```
java.lang.RuntimeException: Failed to get schema information for persistent://public/xxxxx/xxxxxxxx
at org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils.uploadPulsarSchema(SchemaUtils.java:64)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.uploadSchema(FlinkPulsarSinkBase.java:302)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.getProducer(FlinkPulsarSinkBase.java:320)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.invoke(FlinkPulsarSink.java:103)
at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.invoke(FlinkPulsarSink.java:41)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:64)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.pulsar.client.admin.PulsarAdminException$ServerSideErrorException: HTTP 500 Internal Server Error
at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:209)
at org.apache.pulsar.client.admin.internal.SchemasImpl$1.failed(SchemasImpl.java:85)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:839)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:820)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.apache.pulsar.shade.org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173)
at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$3(AsyncHttpConnector.java:253)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.pulsar.shade.org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222)
at org.apache.pulsar.shade.org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257)
at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241)
at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114)
at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143)
at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Caused by: org.apache.pulsar.shade.javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:914)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.access$500(JerseyInvocation.java:77)
... 54 more
```
But it seemed pretty harmless: the Flink job was able to run again after a restart, and all brokers stayed healthy.
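For context, the failing call in that stack trace goes through the schema admin endpoint, which can be probed directly with the admin client whenever the error appears (a minimal sketch; the service URL and topic name are placeholders):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.schema.SchemaInfo;

// Minimal sketch for checking the schema REST endpoint that the Flink connector
// hits under the hood. Service URL and topic name are placeholders.
public class SchemaProbe {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://broker-host:8080")
                .build()) {
            SchemaInfo info = admin.schemas().getSchemaInfo("persistent://public/default/my-topic");
            System.out.println("schema type: " + info.getType());
        }
    }
}
```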
At some point after a day, when the load started peaking, all 4 brokers showed these logs:
```
...
[pulsar-client-io-63-6] ERROR org.apache.pulsar.client.impl.ProducerImpl - [xxxx] [null] Failed to create producer: java.io.IOException: Error while using ZooKeeper - ledger=24 - operation=Failed to open ledger
...
[pulsar-client-io-63-1] ERROR org.apache.pulsar.client.impl.ProducerImpl - [xxxx] [xxxx] Failed to create producer: Disconnected from server at xxxxx
```
All 4 nodes are unhealthy at this stage, and they are not able to recover from it without manual intervention.
After manually restarting the brokers, everything is back to normal again.
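To detect this state without tailing logs, each broker can be probed individually with the admin health check; roughly like this (the per-broker admin URLs are placeholders and error handling is kept minimal):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;

// Rough sketch: poll each broker's admin endpoint directly (bypassing the proxy)
// so an unhealthy broker is noticed before clients start failing en masse.
public class BrokerHealthProbe {
    public static void main(String[] args) throws Exception {
        String[] brokerAdminUrls = {          // placeholders, one per broker
                "http://broker-1:8080", "http://broker-2:8080",
                "http://broker-3:8080", "http://broker-4:8080", "http://broker-5:8080"
        };
        for (String url : brokerAdminUrls) {
            try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(url).build()) {
                // Publishes and consumes on an internal health-check topic on that broker.
                admin.brokers().healthcheck();
                System.out.println(url + " OK");
            } catch (PulsarAdminException e) {
                System.out.println(url + " UNHEALTHY: " + e.getMessage());
            }
        }
    }
}
```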
### Anything else?
We have had serious instability issues with our production Pulsar cluster since it ran with 3 nodes. We recently expanded it to 5 nodes, hoping it would be more stable. Unfortunately, the brokers still become unhealthy and cannot recover from it. Manual intervention is costly and not feasible in a production environment.
Why do the brokers become unhealthy collectively? How can we prevent that?
### Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
Pulsar 2.7.0 is an outdated version that contains security vulnerabilities (Log4Shell, CVE-2021-22160, CVE-2021-41571). It's strongly recommended to upgrade to a more recent version. In your case, the first step is to upgrade to Pulsar 2.7.4. After that, you can continue upgrading to stay on a version that will be maintained for a longer period of time; 2.7.x will become EOL soon. Supported versions are listed at https://pulsar.apache.org/docs/next/security-policy-and-supported-versions/ .
Pulsar release 2.7.5 is on the way.
Thanks for your input @lhotari @HQebupt
Though, are there any bug fixes for broker stability from 2.7.0 to 2.7.4/5?
Often, when the brokers in the cluster become unhealthy and we try to restart them, they hit this exception:
```
15:14:43.615 [main] ERROR org.apache.pulsar.functions.worker.WorkerService - Error Starting up in worker
```
Sometimes this is due to a timeout, or a connection refused when connecting to other brokers. What we always end up doing is shutting off external traffic at the proxy level and letting the brokers recover first. For some reason, the brokers are unable to start up while there is high external traffic.
Have you encountered a similar issue before? It seems to happen to brokers with Pulsar Functions enabled.
> Though, are there any bug fixes for broker stability from 2.7.0 to 2.7.4/5?
Yes, there's a large number of improvements and fixes.
Thanks again @lhotari. I have upgraded to 2.10.1, and it seems much more stable. Closing this. Thanks for your good work!