RxNetty
io.reactivex.netty.client.PoolExhaustedException although pool is at 12k and we're receiving 2k QPS
Hi,
We're seeing the following errors on our proxy server:
io.reactivex.netty.client.PoolExhaustedException
at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:147)
at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:117)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8314)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:235)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:145)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$ReplayProducer.request(CachedObservable.java:304)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:244)
at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.unsafeSubscribe(Observable.java:8314)
at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate(OperatorSwitchIfEmpty.java:78)
at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:71)
at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:53)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:110)
at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
at rx.internal.operators.OperatorAny$1.onNext(OperatorAny.java:57)
at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at io.reactivex.netty.protocol.http.UnicastContentSubject$AutoReleaseByteBufOperator$1.onNext(UnicastContentSubject.java:271)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:145)
at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:178)
at io.reactivex.netty.protocol.http.UnicastContentSubject.onNext(UnicastContentSubject.java:295)
at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.invokeContentOnNext(ServerRequestResponseConverter.java:206)
at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:142)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:277)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:264)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:1078)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:527)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:484)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:398)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:370)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
at java.lang.Thread.run(Thread.java:745)
I'm not sure what could be causing this. Our proxy receives around 4k QPS but only redirects half of those (2k) to the RxNetty client, which is configured with withMaxConnections(12000). The target server answers in an average of 30ms. We got this error after 12 hours in production, and restarting the service fixed the problem (until it comes back).
What could cause the pool to be exhausted? Is there something one needs to do to make sure connections are released properly? Thanks
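For reference, the client is built roughly like this (a minimal sketch of the RxNetty 0.4.x builder API; the host and port are placeholders, not our real configuration). Note that at 2k QPS with ~30ms responses, only about 2000 x 0.03 ≈ 60 connections should be in flight at any given time, so a 12k pool should never fill up unless connections aren't being returned to it.

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.client.HttpClient;

public class BackendClientFactory {

    // Sketch only: "backend-host" and 80 are placeholders for the real target server.
    static HttpClient<ByteBuf, ByteBuf> newBackendClient() {
        return RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("backend-host", 80)
                .withMaxConnections(12000) // steady state should only need ~60 connections at 2k QPS * 30ms
                .build();
    }
}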
The connection should be released when the response completes or there is an error. So, if this is happening, it looks like a leak. It's hard to find where it would be coming from without looking at the code, though. Do you use the metrics for the connection pool? If so, do release and acquire align?
I'm facing the same problem these days while we're developing our gateway.
Under high load, at first a few connections get a ConnectionReset exception.
Bit by bit, later connections start getting io.reactivex.netty.client.PoolExhaustedException.
@NiteshKant Sorry, it seems I missed your answer earlier. I don't use the metrics but I could if I knew how ;) I'll dig a bit into this and see if I can find anything.
@Crystark I have seen a similar issue when I was using Ribbon as a client. There was a bug in the way maxConnectionsPerHost was handled, and I have filed the following ticket (also working on a fix right now):
https://github.com/Netflix/ribbon/issues/290
Could this be similar to what you are seeing?
@Crystark rxnetty-spectator module gives information about how to attach metrics listeners
I still need to dig more into this as it's currently a real problem that forces us to restart our services on a regular basis, but I thought I'd mention that this seems linked to #523.
FYI, all of our services that use RxNetty as an HTTP client have this issue.
@mkmainali we're not using Ribbon on our end, and we're having this issue even on a service that uses only one RxNetty HTTP client.
@Crystark After two weeks struggling with RxNetty, we have switched to pure Netty for our gateway implementation. It is direct, clean, and performs very well.
Here's an example version written in Kotlin (not our production version, just experimental code from our research); hope it is useful: https://github.com/Feng-Zihao/protox
Based on the protox model, it's easy to get more than 3K QPS on a normal machine with about 20 threads or fewer (I tested it with 2 * the core count, without deep machine/JVM tuning, and with a constant number of connections).
@Crystark As you're just interested in the HTTP client, please refer to the file below: https://github.com/Feng-Zihao/protox/blob/master/src/main/kotlin/org/protox/http/BackendHandler.kt It's not hard to translate it to Java.
Protox sets autoRead to false to control the I/O manually; the purpose is to reduce memory use.
If you don't need to be concerned about memory use, just set autoRead to true and attach an HttpObjectAggregator to simplify the job.
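For reference, that combination is plain Netty. A rough client-pipeline sketch (the handler list and the 1 MiB aggregation limit are illustrative only, not protox's actual code):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;

public class SimpleBackendBootstrap {

    static Bootstrap newBootstrap(NioEventLoopGroup group) {
        return new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                // true: Netty reads as data arrives; false: you call channel.read() yourself to bound memory use.
                .option(ChannelOption.AUTO_READ, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new HttpClientCodec());
                        // Aggregates the chunked response into one FullHttpResponse (capped at 1 MiB here).
                        ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024));
                        // ... application handler goes here ...
                    }
                });
    }
}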
@Feng-Zihao Thanks for the advice, I'll look into this if I can't find a solution using RxNetty, which I'd like to keep.
@NiteshKant We're currently digging a bit more into this, so if you could let me know anything I can do to find out more about this leak, it would be very helpful.
I've added RxNetty.useMetricListenersFactory(new ServoEventsListenerFactory()); to our proxy server, which we'll be able to monitor through JMX. I wanted to have this show up in Graphite like our other Servo metrics, but it seems this doesn't register the metrics in the MonitorRegistry, and I'm not sure if I can do something about that. We're using the GraphiteMetricObserver combined with AsyncMetricObserver and CounterToRateMetricTransform with Servo's PollScheduler. I haven't yet dug into Spectator enough to switch to it.
Any advice would be welcomed. I'd really like to have RxNetty fully working. Thanks
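For context, the Servo-to-Graphite wiring we use for our other metrics is roughly the following (a sketch from memory of the standard Servo publish API; the prefix, address and intervals are placeholders). Since ServoEventsListenerFactory doesn't seem to register its monitors in the MonitorRegistry, this poller never picks up the RxNetty numbers, which is exactly the problem:

import java.util.concurrent.TimeUnit;

import com.netflix.servo.publish.AsyncMetricObserver;
import com.netflix.servo.publish.BasicMetricFilter;
import com.netflix.servo.publish.CounterToRateMetricTransform;
import com.netflix.servo.publish.MetricObserver;
import com.netflix.servo.publish.MonitorRegistryMetricPoller;
import com.netflix.servo.publish.PollRunnable;
import com.netflix.servo.publish.PollScheduler;
import com.netflix.servo.publish.graphite.GraphiteMetricObserver;

public class GraphitePublisher {

    public static void start() {
        // Push to Graphite, buffered asynchronously, converting counters to rates.
        MetricObserver graphite = new GraphiteMetricObserver("proxy", "graphite.internal:2003");
        MetricObserver async = new AsyncMetricObserver("graphite-async", graphite, 5000);
        MetricObserver rates = new CounterToRateMetricTransform(async, 120, TimeUnit.SECONDS);

        // Poll everything registered in the default MonitorRegistry once a minute.
        PollScheduler.getInstance().start();
        PollScheduler.getInstance().addPoller(
                new PollRunnable(new MonitorRegistryMetricPoller(), BasicMetricFilter.MATCH_ALL, rates),
                60, TimeUnit.SECONDS);
    }
}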
@NiteshKant
Here are some metrics collected manually through JMX. It took approximately 3 minutes to write all the values down, so bear that in mind when reading them. The order I'm presenting them in is the order they were written down in.
COUNTERS
--------
bytesRead 12717694549
bytesWritten 187986085411
connectionCount 15189579
failedConnectionClosed 0
failedConnects 0
failedContentSource 0
failedFlush 446
failedPoolAcquire 0
failedPoolRelease 0
failedResponses 5481888
failedWrites 857516
poolAcquire 101589015
poolEviction 15258924
poolRelease 101676551
poolReuse 86438609
processedRequest 104494834
requestWriteFailed 450
GAUGE
-----
inflightRequest -2725850
liveConnections 5528
pendingConnectionClose 0
pendingConnects 0
pendingFlushes 81049184
pendingPoolAcquire 3
pendingPoolrelease 0
pendingWrite 0
requestBacklog 30233
I hope this can give you some insight on what may be happening. The service hasn't crashed yet. We'll get some more data when it does.
Updated values just now. Our max connections is set to 20k, so it will take a while to break. We'll try with a lower max connections setting to make it break faster.
COUNTERS
--------
bytesRead 19721917559
bytesWritten 299488150851
connectionCount 30460281
failedConnectionClosed 0
failedConnects 0
failedContentSource 0
failedFlush 1075
failedPoolAcquire 0
failedPoolRelease 0
failedResponses 11955234
failedWrites 1758824
poolAcquire 163678049
poolEviction 30557316
poolRelease 163797064
poolReuse 133279320
processedRequest 169876336
requestWriteFailed 1078
GAUGE
-----
inflightRequest -5938256
liveConnections 11326
pendingConnectionClose 0
pendingConnects 2
pendingFlushes 124804329
pendingPoolAcquire 0
pendingPoolrelease 0
pendingWrite 0
requestBacklog 64154
@crystark are these metrics for one RxNetty client or for multiple clients? The high number of pending flushes looks suspicious. I will look at the metrics more.
In order to eliminate any issue related to client shutdown, if you can remove the shutdown of clients, it will help narrow down the problem, I think.
@NiteshKant ~~those are all the client metrics~~. I'm not sure what you mean by client shutdown but I'll look more into this tomorrow when I'm back at work.
@NiteshKant I'm really not sure what you mean by "client shutdown". Can you elaborate? If you mean shutting down the RxNetty client, we should never be doing that.
I see my previous answer wasn't really on topic (sorry it was late). Those are the metrics for only one RxNetty client in our reverse proxy server.
More data. We restarted the server with the client's max connections set to 2000 so it crashes faster. Here is the data right before and right after it crashed (it reached 2k live connections).
Thu Jul 28 10:45:01 UTC 2016
COUNTERS:
bytesRead:value=5366545868
bytesWritten:value=87346338408
connectionCount:value=6054151
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=122
failedPoolAcquires:value=37
failedPoolReleases:value=0
failedResponses:value=1882875
failedWrites:value=313334
poolAcquires:value=47481382
poolEvictions:value=6056595
poolReleases:value=47483216
poolReuse:value=41427256
processedRequests:value=48419617
requestWriteFailed:value=124
GAUGE:
liveConnections:value=1919
pendingFlushes:value=38802682
pendingConnects:value=1
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=10772
inflightRequests:value=-932203
flushTimes:value=5500
connectionTimes:value=946
------------------------------------------------
Thu Jul 28 10:46:01 UTC 2016
COUNTERS:
bytesRead:value=5377140316
bytesWritten:value=87510804902
connectionCount:value=6069020
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=125
failedPoolAcquires:value=132691
failedPoolReleases:value=0
failedResponses:value=1888534
failedWrites:value=314078
poolAcquires:value=47692433
poolEvictions:value=6067022
poolReleases:value=47555589
poolReuse:value=41488729
processedRequests:value=48491275
requestWriteFailed:value=125
GAUGE:
liveConnections:value=2000
pendingFlushes:value=38839631
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=154082
inflightRequests:value=-789577
flushTimes:value=1714
connectionTimes:value=298
Note: I have about half an hour of these stats, extracted every minute, if you need them. We automated the process as we couldn't graph it.
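The extraction itself is just generic JMX attribute reading. Roughly what our polling script does (the object-name pattern is a guess at whatever domain the Servo listeners register under; the same idea works in-process or over a remote JMX connection):

import java.lang.management.ManagementFactory;
import java.util.Set;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MetricDumper {

    public static void dump() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Hypothetical pattern: adjust the domain to whatever the metric listeners actually register under.
        Set<ObjectName> names = server.queryNames(new ObjectName("com.netflix.servo:*"), null);
        for (ObjectName name : names) {
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                if (!attr.isReadable()) {
                    continue;
                }
                System.out.println(name + " " + attr.getName() + "=" + server.getAttribute(name, attr.getName()));
            }
        }
    }
}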
@NiteshKant We updated to RxNetty 0.4.18 with great hopes, but we're still facing the connection leak. Here are the extracted metrics. Max connections was set to 10k on this server.
------------------------------------------------
Tue Aug 30 13:34:13 UTC 2016
COUNTERS:
bytesRead:value=22214935
bytesWritten:value=301571174
connectionCount:value=46530
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=0
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=14530
failedWrites:value=1792
poolAcquires:value=176456
poolEvictions:value=48762
poolReleases:value=177645
poolReuse:value=129285
processedRequests:value=186335
requestWriteFailed:value=0
GAUGE:
liveConnections:value=24
pendingFlushes:value=113821
pendingConnects:value=1
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=88
inflightRequests:value=-7716
flushTimes:value=2180
connectionTimes:value=617
------------------------------------------------
Tue Aug 30 13:35:01 UTC 2016
COUNTERS:
bytesRead:value=38314550
bytesWritten:value=525947318
connectionCount:value=86181
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=1
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=26455
failedWrites:value=3234
poolAcquires:value=303131
poolEvictions:value=88797
poolReleases:value=304584
poolReuse:value=216251
processedRequests:value=319400
requestWriteFailed:value=1
GAUGE:
liveConnections:value=38
pendingFlushes:value=189347
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=2
requestBacklog:value=160
inflightRequests:value=-13495
flushTimes:value=2626
connectionTimes:value=789
------------------------------------------------
Tue Aug 30 13:36:01 UTC 2016
COUNTERS:
bytesRead:value=59477695
bytesWritten:value=810126386
connectionCount:value=128479
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=1
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=37365
failedWrites:value=4676
poolAcquires:value=458178
poolEvictions:value=130065
poolReleases:value=459549
poolReuse:value=329898
processedRequests:value=479510
requestWriteFailed:value=1
GAUGE:
liveConnections:value=54
pendingFlushes:value=288876
pendingConnects:value=1
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=225
inflightRequests:value=-18832
flushTimes:value=2610
connectionTimes:value=704
------------------------------------------------
...
------------------------------------------------
Tue Aug 30 23:16:01 UTC 2016
COUNTERS:
bytesRead:value=12657987871
bytesWritten:value=191209255585
connectionCount:value=29716643
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=952
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=10671653
failedWrites:value=1305752
poolAcquires:value=105677887
poolEvictions:value=29709864
poolReleases:value=105671034
poolReuse:value=75961602
processedRequests:value=110960746
requestWriteFailed:value=953
GAUGE:
liveConnections:value=8398
pendingFlushes:value=66041731
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=4
requestBacklog:value=55868
inflightRequests:value=-5281009
flushTimes:value=3068
connectionTimes:value=773
------------------------------------------------
Tue Aug 30 23:17:01 UTC 2016
COUNTERS:
bytesRead:value=12679116014
bytesWritten:value=191541020749
connectionCount:value=29760853
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=954
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=10688029
failedWrites:value=1307696
poolAcquires:value=105863755
poolEvictions:value=29754374
poolReleases:value=105857076
poolReuse:value=76103210
processedRequests:value=111154980
requestWriteFailed:value=954
GAUGE:
liveConnections:value=8407
pendingFlushes:value=66165040
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=55968
inflightRequests:value=-5288964
flushTimes:value=3066
connectionTimes:value=733
------------------------------------------------
Tue Aug 30 23:18:01 UTC 2016
COUNTERS:
bytesRead:value=12700535635
bytesWritten:value=191875283953
connectionCount:value=29805643
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=957
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=10704510
failedWrites:value=1309768
poolAcquires:value=106048166
poolEvictions:value=29799207
poolReleases:value=106041342
poolReuse:value=76242427
processedRequests:value=111347051
requestWriteFailed:value=957
GAUGE:
liveConnections:value=8418
pendingFlushes:value=66286462
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=56060
inflightRequests:value=-5297203
flushTimes:value=3076
connectionTimes:value=741
------------------------------------------------
AUTO REBOOT DUE TO TOO HIGH liveConnections VALUES HAPPENED HERE
------------------------------------------------
Tue Aug 30 23:19:01 UTC 2016
COUNTERS:
bytesRead:value=2785258
bytesWritten:value=44089930
connectionCount:value=1779
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=0
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=1014
failedWrites:value=154
poolAcquires:value=31794
poolEvictions:value=3352
poolReleases:value=33319
poolReuse:value=30419
processedRequests:value=35612
requestWriteFailed:value=0
GAUGE:
liveConnections:value=11
pendingFlushes:value=28512
pendingConnects:value=2
pendingConnectionClose:value=0
pendingPoolAcquires:value=10
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=10
inflightRequests:value=-1572
flushTimes:value=324
connectionTimes:value=14
Another fact we observed: those liveConnections values increase much more slowly on servers with less than 1000 QPS (around 50x slower). Above that value they start building up much faster. For instance, the metrics I extracted are from a server that receives an average of 1700 QPS and caps at around 2000 QPS.
Would you have any leads to help us figure out what's going wrong? It seems to be due to high request concurrency, and I suspect that some connections being closed early may lead to this.
FYI, we have a combination of load balancers with the reverse proxy, each with their own rules, which could lead to various behaviors of input and output requests. I'd love to be more specific but this would require a more private conversation. Is there any way we could exchange via email or some other private channel?
Thanks
@Crystark I believe that the issue you are seeing is because you are writing too fast / too much data, OR some code path is blocking the eventloop. If you are writing too fast then you need backpressure or pessimistic limits to control the number of concurrent requests being processed by the server. Netty backpressure is hooked up in RxNetty only in 0.5.x.
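As an illustration only (a sketch, not your proxy's code), a pessimistic limit can be expressed in plain RxJava 1.x by bounding flatMap's concurrency; here the backend call is simulated with a delayed Observable:

import java.util.concurrent.TimeUnit;

import rx.Observable;

public class ConcurrencyCapSketch {

    public static void main(String[] args) {
        int maxInFlight = 2; // pessimistic limit; in a proxy this would sit well below withMaxConnections

        Observable.range(1, 10)
                // Each "request" becomes a deferred response; the delay stands in for the ~30ms backend call.
                .flatMap(req -> Observable.just("response-" + req).delay(30, TimeUnit.MILLISECONDS),
                        maxInFlight) // at most maxInFlight inner subscriptions (i.e. backend calls) at once
                .toBlocking()
                .forEach(System.out::println);
    }
}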
Sure, I would love to hear more about your use-case and how you are using RxNetty too. You can email me at nitesh1706 at gmail and we can take it from there.
@NiteshKant: We are using Ribbon and seeing a similar issue where our JVMs end up in a zombie state and every request to that instance fails with a 500 and the error below:
Caused by: com.netflix.client.ClientException: Number of retries on next server exceeded max 2 retries, while making a call for:
We have to manually bounce the JVM to get it back to normal. Here are the Ribbon connection configs: MaxConnectionsPerHost: 600, MaxTotalConnections: 3000 (5 backend servers x MaxConnectionsPerHost).
We were previously using Ribbon 2.0.0; I have tried updating it to 2.2.2 and also updated RxNetty to 0.5.1 because of the connection leak issue #532, since I initially assumed it had something to do with RxNetty. But no luck, and the issue still persists.
I'd appreciate your input on the above issue.
Simple repro:
For our project, https://github.com/Azure/azure-cosmosdb-java, we are trying to upgrade our dependency on RxNetty from 0.4.x to 0.5.2; however, it seems there is a connection leak issue in 0.5.2 that we are facing.
Code to repro:
public class testing {

    private static class DefaultSSLEngineFactory implements Func1<ByteBufAllocator, SSLEngine> {
        private final SslContext sslContex;

        private DefaultSSLEngineFactory() {
            try {
                SslProvider sslProvider = SslContext.defaultClientProvider();
                sslContex = SslContextBuilder.forClient().sslProvider(sslProvider).build();
            } catch (SSLException e) {
                throw new IllegalStateException("Failed to create default SSL context", e);
            }
        }

        @Override
        public SSLEngine call(ByteBufAllocator byteBufAllocator) {
            return sslContex.newEngine(byteBufAllocator);
        }
    }

    public static void main(String[] args) throws Exception {
        // Pool bounded at 1000 connections.
        PoolConfig<ByteBuf, ByteBuf> pConfig = new PoolConfig<>();
        pConfig.maxConnections(1000);

        HttpClient<ByteBuf, ByteBuf> cl = HttpClientImpl.newClient(SingleHostPoolingProviderFactory.create(pConfig),
                Observable.just(new Host(new InetSocketAddress(InetAddress.getByName("github.com"), 443))))
                .secure(new DefaultSSLEngineFactory());

        // Build 4000 lazy requests; each discards its response content when subscribed.
        List<Observable<Void>> list = new ArrayList<>();
        for (int i = 0; i < 4000; i++) {
            Observable<HttpClientResponse<ByteBuf>> rsp =
                    cl.createRequest(HttpMethod.GET, "/").writeBytesContent(Observable.just(new byte[] {}));
            Observable<Void> contentDiscardedObs = rsp.flatMap(hcr -> hcr.discardContent());
            list.add(contentDiscardedObs);
        }

        // Subscribe with at most 1000 concurrent requests, matching the pool size.
        List<Void> res = Observable.merge(list, 1000)
                .observeOn(Schedulers.computation())
                .toList().toBlocking().single();
        System.out.println("DONE");
    }
}
Since the connection pool size is 1000 and we are merging results with 1000 as the degree of concurrency, the above code should work. However, we get the following failure:
Exception in thread "main" java.lang.RuntimeException: io.reactivex.netty.client.pool.PoolExhaustedException: Client connection pool exhausted.
at rx.exceptions.Exceptions.propagate(Exceptions.java:57)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:463)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at testing.main(testing.java:66)
Caused by: io.reactivex.netty.client.pool.PoolExhaustedException: Client connection pool exhausted.
at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$7.call(PooledConnectionProviderImpl.java:193)
at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$7.call(PooledConnectionProviderImpl.java:173)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$4.call(PooledConnectionProviderImpl.java:129)
at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$4.call(PooledConnectionProviderImpl.java:108)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at io.reactivex.netty.protocol.tcp.client.ConnectionRequestImpl$1.call(ConnectionRequestImpl.java:30)
at io.reactivex.netty.protocol.tcp.client.ConnectionRequestImpl$1.call(ConnectionRequestImpl.java:27)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at io.reactivex.netty.protocol.http.client.internal.HttpClientRequestImpl$OnSubscribeFuncImpl.call(HttpClientRequestImpl.java:447)
at io.reactivex.netty.protocol.http.client.internal.HttpClientRequestImpl$OnSubscribeFuncImpl.call(HttpClientRequestImpl.java:420)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowPath(OnSubscribeFromIterable.java:117)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
at rx.Subscriber.request(Subscriber.java:157)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:781)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:857)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:656)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:857)
at rx.internal.operators.OperatorIgnoreElements$1.onCompleted(OperatorIgnoreElements.java:42)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge$ConnectionInputSubscriber.contentComplete(AbstractHttpConnectionBridge.java:508)
at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge.processNextItemInEventloop(AbstractHttpConnectionBridge.java:283)
at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge.access$1200(AbstractHttpConnectionBridge.java:56)
at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge$ConnectionInputSubscriber.onNext(AbstractHttpConnectionBridge.java:431)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373)
at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189)
at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.reactivex.netty.protocol.http.ws.client.Ws7To13UpgradeHandler.channelRead(Ws7To13UpgradeHandler.java:135)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1389)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1159)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1203)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:146)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Can someone please look at this? I only face this problem with 0.5.2, not with 0.4.x. Does anyone have any suggestions? @NiteshKant
I'm actually here because I'm getting the same issue using azure-cosmosdb-java with RxNetty 0.4.20. Would love to hear some suggestions as well.
I am facing this issue with the Netflix Ribbon load balancer. I am getting io.reactivex.netty.client.pool.PoolExhaustedException for requests. Please help if anybody has a solution for this. I am currently using RxNetty 0.5.1.