pulsar icon indicating copy to clipboard operation
pulsar copied to clipboard

[Bug] Brokers in a cluster become unhealthy collectively

Open Raven888888 opened this issue 3 years ago • 0 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

Ubuntu 20.04 Pulsar 2.7.0

Cluster setup 5 zookeeper 5 bookkeeper 5 broker

  • with pulsar function and websocket enabled
  • 4 out of 5 are behind a proxy, the last 1 is isolated from external connections in order to focus on pulsar function executions

Each node with at least 64 GB RAM, 16 physical core CPU

Minimal reproduce step

Start the cluster, it should run stably at first.

Create 20k websocket connections to the proxy, thus each of 4 brokers behind proxy establishes 5k websocket connections. Each connection creates a new topic, so total 20k topics. Send data about 1k messages per seconds.

What did you expect to see?

Brokers should handle the load well, and not die collectively (that's the whole point of clustering!).

What did you see instead?

Our cluster was running OK for slightly more than a day. We were using it with Apache Flink, we did occasionally see this error from Flink

java.lang.RuntimeException: Failed to get schema information for persistent://public/xxxxx/xxxxxxxx
	at org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils.uploadPulsarSchema(SchemaUtils.java:64)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.uploadSchema(FlinkPulsarSinkBase.java:302)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase.getProducer(FlinkPulsarSinkBase.java:320)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.invoke(FlinkPulsarSink.java:103)
	at org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink.invoke(FlinkPulsarSink.java:41)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:64)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.pulsar.client.admin.PulsarAdminException$ServerSideErrorException: HTTP 500 Internal Server Error
	at org.apache.pulsar.client.admin.internal.BaseResource.getApiException(BaseResource.java:209)
	at org.apache.pulsar.client.admin.internal.SchemasImpl$1.failed(SchemasImpl.java:85)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.failed(JerseyInvocation.java:839)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation$1.completed(JerseyInvocation.java:820)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.processResponse(ClientRuntime.java:229)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime.access$200(ClientRuntime.java:62)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.lambda$response$0(ClientRuntime.java:173)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:292)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:274)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.Errors.process(Errors.java:244)
	at org.apache.pulsar.shade.org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:288)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRuntime$2.response(ClientRuntime.java:173)
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$apply$1(AsyncHttpConnector.java:212)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$3(AsyncHttpConnector.java:253)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.pulsar.shade.org.asynchttpclient.netty.NettyResponseFuture.loadContent(NettyResponseFuture.java:222)
	at org.apache.pulsar.shade.org.asynchttpclient.netty.NettyResponseFuture.done(NettyResponseFuture.java:257)
	at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.AsyncHttpClientHandler.finishUpdate(AsyncHttpClientHandler.java:241)
	at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.HttpHandler.handleChunk(HttpHandler.java:114)
	at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:143)
	at org.apache.pulsar.shade.org.asynchttpclient.netty.handler.AsyncHttpClientHandler.channelRead(AsyncHttpClientHandler.java:78)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more
Caused by: org.apache.pulsar.shade.javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:914)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyInvocation.access$500(JerseyInvocation.java:77)
	... 54 more

But it seems pretty harmless. Flink job able to run after restart. All brokers are healthy.

At some point after a day, when the load starts peaking, all 4 brokers showing these logs

...
[pulsar-client-io-63-6] ERROR org.apache.pulsar.client.impl.ProducerImpl - [xxxx] [null] Failed to create producer: java.io.IOException: Error while using ZooKeeper -  ledger=24 - operation=Failed to open ledger
...
[pulsar-client-io-63-1]  ERROR org.apache.pulsar.client.impl.ProducerImpl - [xxxx] [xxxx] Failed to create producer: Disconnected from server at xxxxx

All 4 nodes are unhealthy at this stage, and they are not able to recover itself from this state without manual intervention.

After manually restarting brokers, it is back to normal again.

Anything else?

We have serious instability issues with our production Pulsar cluster with 3 nodes. We recently updated to 5 nodes, hoping it can be more stable. Unfortunately, the broker still become unhealthy and cannot cover from it. Manual intervention is costly and not feasible for production environment.

Why brokers become unhealthy collectively? How can we prevent that?

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

Raven888888 avatar Aug 09 '22 16:08 Raven888888

Pulsar 2.7.0 is an outdated version that contains security vulnerabilities (Log4Shell, CVE-2021-22160, CVE-2021-41571). It's strongly recommended upgrading to a more recent version. In your case, it's first recommended to upgrade to Pulsar 2.7.4 . After that, it's possible to continue upgrading to stay on a version that will be maintained for a longer period of time. 2.7.x will become EOL soon. Supported versions are listed on https://pulsar.apache.org/docs/next/security-policy-and-supported-versions/ .

lhotari avatar Aug 11 '22 07:08 lhotari

Pulsar Release 2.7.5 is on the way.

HQebupt avatar Aug 14 '22 03:08 HQebupt

Thanks for your input @lhotari @HQebupt

Though do we have any bug fixes on broker stability from 2.7.0 to 2.7.4/5?

Often when the brokers in the cluster become unhealthy, and we try to restart them, they always hit this exception

15:14:43.615 [main] ERROR org.apache.pulsar.functions.worker.WorkerService - Error Starting up in worker

Which sometimes is due to timeout, or connection refused when connecting to other brokers. What we always end up doing, is shutting off external traffic at proxy level, and let the brokers recover first. For some reasons, brokers unable to startup when there is high external traffic.

Have you encountered similar issue before? This seems to happen to pulsar function enabled broker.

Raven888888 avatar Aug 14 '22 15:08 Raven888888

Though do we have any bug fixes on broker stability from 2.7.0 to 2.7.4/5?

Yes, there's a large number of improvements and fixes.

lhotari avatar Aug 14 '22 18:08 lhotari

Thanks again @lhotari I have upgraded to 2.10.1, and it seems to be much more stable. Closing this. Thanks for your good works!

Raven888888 avatar Aug 18 '22 01:08 Raven888888