kafka-ui
kafka-connect page (listing connectors) fails if a connector is in FAILED state with a big error log
Describe the bug
provectus (kafka-ui) fails to render the page listing the connectors (no connectors are shown) when one connector is in the FAILED state with a big trace (error log).
Steps to Reproduce
Make a connector fail in a way that produces a big error trace: kafka-ui will no longer be able to list the connectors of the Kafka Connect cluster.
Expected behavior
provectus should always be able to list the connectors of kafka-connect, and should not try to fetch the error logs on the connector listing page.
Also, the buffer used to render the error log could be bigger (AKHQ does not fail to render a big error log).
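For reference, the Connect status endpoint that kafka-ui calls for each connector embeds the full stack trace of a failed task in the `trace` field, so a single response can easily exceed the 256 KB buffer limit. A rough illustration of the payload shape (the worker_id, type and trace values here are made up):

```json
{
  "name": "bq_stream_tracking_preprod-1",
  "connector": { "state": "RUNNING", "worker_id": "10.0.0.1:8083" },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.0.0.1:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: ... (hundreds of kilobytes of stack trace) ..."
    }
  ],
  "type": "sink"
}
```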
Additional context
The error logged by provectus (kafka-ui):
org.springframework.web.reactive.function.client.WebClientResponseException: 200 OK from GET http://kafka-connect-np-service.kafka-tooling.svc.cluster.local:60030/connectors/bq_stream_tracking_preprod-1/status; nested exception is org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:229)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler com.provectus.kafka.ui.controller.KafkaConnectController#getAllConnectors(String, String, ServerWebExchange) [DispatcherHandler]
*__checkpoint ⇢ com.provectus.kafka.ui.config.ReadOnlyModeFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.provectus.kafka.ui.config.CustomWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authorization.AuthorizationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authorization.ExceptionTranslationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.authentication.logout.LogoutWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.savedrequest.ServerRequestCacheWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/api/clusters/aiven/connectors?search=" [ExceptionHandlingWebHandler]
Original Stack Trace:
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:229)
at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:207)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1863)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:182)
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:143)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:144)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:123)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:364)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:724)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:99)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Body from GET http://kafka-connect-np-service.kafka-tooling.svc.cluster.local:60030/connectors/bq_stream_tracking_preprod-1/status [DefaultClientResponse]
Original Stack Trace:
at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:99)
at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:92)
at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:58)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:118)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:364)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:724)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)
2022-08-09 14:19:09,913 ERROR [reactor-http-epoll-2] o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler: [29799bba-196] 500 Server Error for HTTP GET "/api/clusters/aiven/connectors?search="
AKHQ, by contrast, is able to show the error and does not fail to list the connectors of the same Kafka Connect cluster.
Thanks for the report, we'll fix this :)
Can I please be assigned to this? :)
@shubhwip hey, sure :)
Hey @Haarolean, I have looked at this, and it seems the error occurs while fetching connectors via `/api/clusters/local/connectors?search=`, because the connector status JSON is very large. We have already specified a generous limit of ~20MB, but somehow the default (maxSize 262144 bytes) is being picked up. So we can fix this by making sure the ~20MB maxSize is actually applied using `ExchangeStrategies.builder()` (sketch below). Correct me if I am wrong here, please :).
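A minimal sketch of what I mean, using a plain Spring `WebClient` rather than the project's actual client factory (class/method names and the size value are illustrative):

```java
import org.springframework.http.codec.ClientCodecConfigurer;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;

public class ConnectWebClientSketch {

  // Build a WebClient whose codecs buffer up to ~20 MB instead of the
  // default 262144 bytes (256 KB) that triggers DataBufferLimitException.
  static WebClient buildConnectClient(String baseUrl) {
    ExchangeStrategies strategies = ExchangeStrategies.builder()
        .codecs((ClientCodecConfigurer codecs) ->
            codecs.defaultCodecs().maxInMemorySize(20 * 1024 * 1024))
        .build();

    return WebClient.builder()
        .baseUrl(baseUrl)
        .exchangeStrategies(strategies)
        .build();
  }
}
```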
Now I am facing an issue: writing a JUnit test that reproduces this erroneous scenario. I tried to reproduce it with a test that creates a thousand connectors and then calls `getConnectors`, but unfortunately my local build is not working, so I am not able to verify the tests (https://discord.com/channels/897805035122077716/897805035122077719/1008216678376681543). Is there any other way to write a test and reproduce the behaviour?
If a test is not necessary for this issue, I can create a PR.
@shubhwip
Let's make it configurable, just like in #2139
Regarding the tests: I've already replied in that thread on how to build the app and run the tests. The test might be overkill; you can open a draft PR for now so we can take a look.
@Haarolean I looked at #2139, but I saw that `maxBuffSize` for the connectors config is already made configurable in [kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java](https://github.com/provectus/kafka-ui/blob/master/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java#L15).
@shubhwip So the thing we're discussing is the exact same property I pointed to, which we previously implemented lol
@raphaelauv you can increase the buffer size via the property `webclient.max-in-memory-buffer-size`. The default value is 20MB. Let me know if you encounter any problems.
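For reference, in a mounted application config file that would look roughly like this (the 60MB value is just an example, not a recommendation):

```yaml
# kafka-ui application config (illustrative value)
webclient:
  max-in-memory-buffer-size: 60MB
```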
Closing as a duplicate, need to add this property to documentation tho, plus, possibly adjust the default value.
Updated wiki: https://github.com/provectus/kafka-ui/wiki/Common-problems#databufferlimitexception-exceeded-limit-on-max-bytes-to-buffer
@Haarolean I am encountering this problem right now.
I understand that I need to change the value of the property `webclient.max-in-memory-buffer-size`, but how can I do it?
Can it be changed via environment variable or do I need to build a custom docker image?
@zevektor hey! Yeah, it can be set as an env variable or a config property :) Replace the dots with underscores and there's your env variable.
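A rough docker-compose sketch of the env-variable form (the image tag and the 60MB value are illustrative; double-check the exact variable spelling against the wiki page linked above):

```yaml
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    environment:
      # maps to the Spring property webclient.max-in-memory-buffer-size (default 20MB)
      WEBCLIENT_MAX_IN_MEMORY_BUFFER_SIZE: 60MB
```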