
io.reactivex.netty.client.PoolExhaustedException although pool is at 12k and we're receiving 2k QPS

Open Crystark opened this issue 8 years ago • 21 comments

Hi,

We're seeing the following errors on our proxy server:

io.reactivex.netty.client.PoolExhaustedException
        at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:147)
        at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:117)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8314)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:235)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:145)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
        at rx.internal.operators.CachedObservable$ReplayProducer.request(CachedObservable.java:304)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.Subscriber.setProducer(Subscriber.java:205)
        at rx.Subscriber.setProducer(Subscriber.java:205)
        at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:244)
        at rx.internal.operators.CachedObservable$CachedSubscribe.call(CachedObservable.java:230)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8314)
        at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.subscribeToAlternate(OperatorSwitchIfEmpty.java:78)
        at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:71)
        at rx.internal.operators.OperatorDoOnEach$1.onCompleted(OperatorDoOnEach.java:53)
        at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
        at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
        at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:110)
        at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
        at rx.internal.operators.OperatorAny$1.onNext(OperatorAny.java:57)
        at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
        at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
        at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
        at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.CachedObservable$ReplayProducer.replay(CachedObservable.java:404)
        at rx.internal.operators.CachedObservable$CacheState.dispatch(CachedObservable.java:220)
        at rx.internal.operators.CachedObservable$CacheState.onNext(CachedObservable.java:191)
        at rx.internal.operators.CachedObservable$CacheState$1.onNext(CachedObservable.java:171)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at io.reactivex.netty.protocol.http.UnicastContentSubject$AutoReleaseByteBufOperator$1.onNext(UnicastContentSubject.java:271)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:145)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:178)
        at io.reactivex.netty.protocol.http.UnicastContentSubject.onNext(UnicastContentSubject.java:295)
        at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.invokeContentOnNext(ServerRequestResponseConverter.java:206)
        at io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.channelRead(ServerRequestResponseConverter.java:142)
        at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
        at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
        at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:277)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:264)
        at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
        at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
        at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
        at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
        at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59)
        at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83)
        at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:154)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:354)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:145)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:1078)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:527)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:484)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:398)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:370)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:145)
        at java.lang.Thread.run(Thread.java:745)

I'm kinda unsure of what could be causing this. Our proxy is getting around 4k QPS but only redirects half of those (2k) to the RxNetty client, which is configured with withMaxConnections(12000). The target server answers in 30ms on average. We got this error after 12 hours in production, and restarting the service fixed the problem (until it comes back).

What could cause the pool to be exhausted? Is there something one needs to do to make sure connections are released properly? Thanks
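
For context, a minimal sketch of how such a client is typically built with the RxNetty 0.4.x builder API (the host and port are placeholders; withMaxConnections is the pool cap in question):

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.client.HttpClient;

// Pooled HTTP client capped at 12k connections; "target-host"/80 stand in
// for the proxied backend.
HttpClient<ByteBuf, ByteBuf> client =
        RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("target-host", 80)
               .withMaxConnections(12000)
               .build();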

Crystark avatar May 12 '16 13:05 Crystark

A connection should be released when the response completes or there is an error. So, if this is happening, it looks like a leak. It's hard to find where it would be coming from, though, without looking at the code. Do you use the metrics for the connection pool? If so, do release and acquire align?
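
A rough sketch of how acquire/release alignment can be checked with the 0.4.x metrics listener API (the listener class and callback names below are recalled from the io.reactivex.netty.metrics package and should be verified against the exact version in use):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.reactivex.netty.metrics.HttpClientMetricEventsListener;

// Counts pool acquires vs. releases; a steadily growing gap means
// connections are being acquired but never released, i.e. a leak.
public class PoolBalanceListener extends HttpClientMetricEventsListener {
    public final AtomicLong acquires = new AtomicLong();
    public final AtomicLong releases = new AtomicLong();

    @Override
    protected void onPoolAcquireSuccess(long duration, TimeUnit timeUnit) {
        acquires.incrementAndGet();
    }

    @Override
    protected void onPoolReleaseSuccess(long duration, TimeUnit timeUnit) {
        releases.incrementAndGet();
    }
}

Attach it with client.subscribe(new PoolBalanceListener()) (the client is a metric events publisher in 0.4.x) and compare the two counters periodically.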

NiteshKant avatar May 13 '16 23:05 NiteshKant

I'm facing the same problem these days while we're developing our gateway.

Under high load, at first a few connections get a ConnectionReset exception.

Bit by bit, later connections get the io.reactivex.netty.client.PoolExhaustedException.

Feng-Zihao avatar May 22 '16 06:05 Feng-Zihao

@NiteshKant Sorry, it seems I missed your answer earlier. I don't use the metrics, but I could if I knew how ;) I'll dig a bit into this and see if I can find anything.

Crystark avatar May 22 '16 09:05 Crystark

@Crystark I have seen a similar issue when I was using Ribbon as a client. There was a bug in the way maxConnectionsPerHost was handled, and I have filed the following ticket (I'm also working on a fix right now):

https://github.com/Netflix/ribbon/issues/290

Could this be similar to what you are seeing?

mkmainali avatar May 26 '16 12:05 mkmainali

@Crystark the rxnetty-spectator module shows how to attach metrics listeners.

NiteshKant avatar May 28 '16 07:05 NiteshKant

I still need to dig more into this, as it's currently a real problem that forces us to restart our services on a regular basis, but I thought I'd mention that this seems linked to #523.

FYI, all of our services that use RxNetty as an HTTP client have this issue.

@mkmainali we're not using Ribbon on our end, and we're having this issue even on a service that uses only one RxNetty HTTP client.

Crystark avatar Jul 20 '16 16:07 Crystark

@Crystark after 2 weeks of struggling with RxNetty, we changed to pure Netty for our gateway implementation. It is extremely direct, clean, and performant.

Here's an example version written in Kotlin (not our production version, just experimental code from our research); I hope it's useful: https://github.com/Feng-Zihao/protox

Based on the protox model, it's easy to get more than 3K QPS on a normal machine with about 20 threads or fewer (I tested with 2 * core count, without deep machine/JVM tuning, and with a constant number of connections).

Feng-Zihao avatar Jul 21 '16 16:07 Feng-Zihao

@Crystark As you're just interested in the HTTP client, please refer to the file below: https://github.com/Feng-Zihao/protox/blob/master/src/main/kotlin/org/protox/http/BackendHandler.kt It's not hard to translate to Java.

Protox sets autoRead to false to control the I/O manually; the purpose is to reduce memory use.

If you don't need to be concerned about memory use, just set autoRead to true and attach an HttpObjectAggregator to simplify the job, as in the sketch below.
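
For reference, a minimal plain-Netty (4.x) sketch of that simpler variant: autoRead left at its default of true and an HttpObjectAggregator in the pipeline so each response arrives as one FullHttpResponse (the 1 MiB aggregation limit is an arbitrary example):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;

// Client bootstrap trading memory for simplicity: Netty reads as fast as
// data arrives (autoRead=true) and aggregates chunks into full responses.
Bootstrap b = new Bootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioSocketChannel.class)
        .option(ChannelOption.AUTO_READ, true) // protox sets this to false instead
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new HttpClientCodec());
                // Aggregates HttpContent chunks into one FullHttpResponse (up to 1 MiB).
                ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024));
            }
        });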

Feng-Zihao avatar Jul 21 '16 16:07 Feng-Zihao

@Feng-Zihao Thanks for the advice; I'll look into it if I can't find a solution using RxNetty, which I'd like to keep.

@NiteshKant We're currently digging a bit more into this, so if you could let me know everything I can do to gather more information on this leak, it would be very helpful.

I've added RxNetty.useMetricListenersFactory(new ServoEventsListenerFactory()); to our proxy server, which we'll be able to monitor through JMX. I wanted this to show up in Graphite like our other Servo metrics, but it seems this doesn't register the metrics in the MonitorRegistry. I'm not sure if I can do something about that. We're using the GraphiteMetricObserver combined with AsyncMetricObserver and CounterToRateMetricTransform with Servo's PollScheduler. For now I haven't dug into Spectator enough to switch to it.
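
For reference, the usual Servo publishing chain looks roughly like this (a sketch: the Graphite address and intervals are placeholders, and the constructor signatures are recalled from Servo and worth double-checking; note the poller only exports monitors actually registered in the MonitorRegistry, which seems to be exactly the missing piece here):

import java.util.concurrent.TimeUnit;

import com.netflix.servo.publish.AsyncMetricObserver;
import com.netflix.servo.publish.BasicMetricFilter;
import com.netflix.servo.publish.CounterToRateMetricTransform;
import com.netflix.servo.publish.MetricObserver;
import com.netflix.servo.publish.MonitorRegistryMetricPoller;
import com.netflix.servo.publish.PollRunnable;
import com.netflix.servo.publish.PollScheduler;
import com.netflix.servo.publish.graphite.GraphiteMetricObserver;

// Poll the MonitorRegistry once a minute, convert counters to rates,
// buffer asynchronously, and push to Graphite.
MetricObserver graphite = new GraphiteMetricObserver("proxy", "graphite-host:2003");
MetricObserver async = new AsyncMetricObserver("graphite-async", graphite, 1000);
MetricObserver rates = new CounterToRateMetricTransform(async, 70, TimeUnit.SECONDS);

PollScheduler.getInstance().start();
PollScheduler.getInstance().addPoller(
        new PollRunnable(new MonitorRegistryMetricPoller(), BasicMetricFilter.MATCH_ALL, rates),
        1, TimeUnit.MINUTES);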

Any advice would be welcome. I'd really like to have RxNetty fully working. Thanks

Crystark avatar Jul 26 '16 13:07 Crystark

@NiteshKant

Here are some metrics collected manually through JMX. It took approximately 3 minutes to write all the values down, so bear that in mind when reading them. The order I'm presenting them in is the order they were written down in.

COUNTERS
--------

bytesRead              12717694549
bytesWritten           187986085411
connectionCount        15189579
failedConnectionClosed 0           
failedConnects         0
failedContentSource    0
failedFlush            446 
failedPoolAcquire      0       
failedPoolRelease      0       
failedResponses        5481888     
failedWrites           857516  
poolAcquire            101589015 
poolEviction           15258924  
poolRelease            101676551 
poolReuse              86438609
processedRequest       104494834      
requestWriteFailed     450        

GAUGE
-----
inflightRequest        -2725850     
liveConnections        5528     
pendingConnectionClose 0            
pendingConnects        0     
pendingFlushes         81049184     
pendingPoolAcquire     3        
pendingPoolRelease     0
pendingWrite           0  
requestBacklog         30233    

I hope this can give you some insight on what may be happening. The service hasn't crashed yet. We'll get some more data when it does.

Crystark avatar Jul 27 '16 13:07 Crystark

Updated values just now. Our max connections is set to 20k, so it will take a while longer to break. We'll try a lower max to make it break faster.

COUNTERS
--------

bytesRead              19721917559
bytesWritten           299488150851
connectionCount        30460281
failedConnectionClosed 0           
failedConnects         0
failedContentSource    0
failedFlush            1075 
failedPoolAcquire      0       
failedPoolRelease      0       
failedResponses        11955234     
failedWrites           1758824  
poolAcquire            163678049 
poolEviction           30557316  
poolRelease            163797064 
poolReuse              133279320
processedRequest       169876336      
requestWriteFailed     1078      

GAUGE
-----
inflightRequest        -5938256     
liveConnections        11326     
pendingConnectionClose 0            
pendingConnects        2     
pendingFlushes         124804329     
pendingPoolAcquire     0        
pendingPoolRelease     0
pendingWrite           0  
requestBacklog         64154

Crystark avatar Jul 27 '16 16:07 Crystark

@crystark are these metrics for one RxNetty client or for multiple clients? The high number of pending flushes looks suspicious. I will look at the metrics more.

To rule out an issue with client shutdown, if you can remove the shutdown of clients, it will help narrow down the problem, I think.

NiteshKant avatar Jul 27 '16 16:07 NiteshKant

@NiteshKant ~~those are all the client metrics~~. I'm not sure what you mean by client shutdown, but I'll look into this more tomorrow when I'm back at work.

Crystark avatar Jul 27 '16 22:07 Crystark

@NiteshKant I'm really not sure what you mean by "client shutdown". Can you elaborate? If you mean shutting down the RxNetty client, we should never be doing that.

I see my previous answer wasn't really on topic (sorry, it was late). Those are the metrics for only one RxNetty client in our reverse proxy server.

Crystark avatar Jul 28 '16 08:07 Crystark

More data. We restarted the server with the client's max connections set to 2000 so it crashes faster. Here is the data from right before and right after it crashed (it reached 2k live connections).

Thu Jul 28 10:45:01 UTC 2016
COUNTERS:
bytesRead:value=5366545868
bytesWritten:value=87346338408
connectionCount:value=6054151
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=122
failedPoolAcquires:value=37
failedPoolReleases:value=0
failedResponses:value=1882875
failedWrites:value=313334
poolAcquires:value=47481382
poolEvictions:value=6056595
poolReleases:value=47483216
poolReuse:value=41427256
processedRequests:value=48419617
requestWriteFailed:value=124
GAUGE:
liveConnections:value=1919
pendingFlushes:value=38802682
pendingConnects:value=1
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=10772
inflightRequests:value=-932203
flushTimes:value=5500
connectionTimes:value=946
------------------------------------------------
Thu Jul 28 10:46:01 UTC 2016
COUNTERS:
bytesRead:value=5377140316
bytesWritten:value=87510804902
connectionCount:value=6069020
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=125
failedPoolAcquires:value=132691
failedPoolReleases:value=0
failedResponses:value=1888534
failedWrites:value=314078
poolAcquires:value=47692433
poolEvictions:value=6067022
poolReleases:value=47555589
poolReuse:value=41488729
processedRequests:value=48491275
requestWriteFailed:value=125
GAUGE:
liveConnections:value=2000
pendingFlushes:value=38839631
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=154082
inflightRequests:value=-789577
flushTimes:value=1714
connectionTimes:value=298

Note: I have about half an hour of these stats, extracted every minute, if you need them. We automated the extraction since we couldn't graph it.
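
A bare-bones version of that kind of extraction, for anyone wanting to reproduce it (a sketch; the JMX endpoint, the com.netflix.servo domain, and the "value" attribute name are assumptions to adapt to the beans actually exposed):

import java.util.Set;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Dumps one snapshot of every matching MBean; run it once a minute to get
// tables like the ones above.
public class MetricsDump {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:7091/jmxrmi"); // placeholder endpoint
        try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
            Set<ObjectName> names = mbsc.queryNames(new ObjectName("com.netflix.servo:*"), null);
            for (ObjectName name : names) {
                // "value" matches the attribute shown in the dumps above; adjust if needed.
                System.out.println(name.getKeyProperty("name") + ":value="
                        + mbsc.getAttribute(name, "value"));
            }
        }
    }
}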

Crystark avatar Jul 28 '16 12:07 Crystark

@NiteshKant We updated to RxNetty 0.4.18 with high hopes, but we're still facing the connection leaks. Here are the extracted metrics. Max connections was set to 10k on this server.

------------------------------------------------
Tue Aug 30 13:34:13 UTC 2016
COUNTERS:
bytesRead:value=22214935
bytesWritten:value=301571174
connectionCount:value=46530
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=0
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=14530
failedWrites:value=1792
poolAcquires:value=176456
poolEvictions:value=48762
poolReleases:value=177645
poolReuse:value=129285
processedRequests:value=186335
requestWriteFailed:value=0
GAUGE:
liveConnections:value=24
pendingFlushes:value=113821
pendingConnects:value=1
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=88
inflightRequests:value=-7716
flushTimes:value=2180
connectionTimes:value=617
------------------------------------------------
Tue Aug 30 13:35:01 UTC 2016
COUNTERS:
bytesRead:value=38314550
bytesWritten:value=525947318
connectionCount:value=86181
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=1
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=26455
failedWrites:value=3234
poolAcquires:value=303131
poolEvictions:value=88797
poolReleases:value=304584
poolReuse:value=216251
processedRequests:value=319400
requestWriteFailed:value=1
GAUGE:
liveConnections:value=38
pendingFlushes:value=189347
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=2
requestBacklog:value=160
inflightRequests:value=-13495
flushTimes:value=2626
connectionTimes:value=789
------------------------------------------------
Tue Aug 30 13:36:01 UTC 2016
COUNTERS:
bytesRead:value=59477695
bytesWritten:value=810126386
connectionCount:value=128479
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=1
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=37365
failedWrites:value=4676
poolAcquires:value=458178
poolEvictions:value=130065
poolReleases:value=459549
poolReuse:value=329898
processedRequests:value=479510
requestWriteFailed:value=1
GAUGE:
liveConnections:value=54
pendingFlushes:value=288876
pendingConnects:value=1
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=225
inflightRequests:value=-18832
flushTimes:value=2610
connectionTimes:value=704
------------------------------------------------

...

------------------------------------------------
Tue Aug 30 23:16:01 UTC 2016
COUNTERS:
bytesRead:value=12657987871
bytesWritten:value=191209255585
connectionCount:value=29716643
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=952
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=10671653
failedWrites:value=1305752
poolAcquires:value=105677887
poolEvictions:value=29709864
poolReleases:value=105671034
poolReuse:value=75961602
processedRequests:value=110960746
requestWriteFailed:value=953
GAUGE:
liveConnections:value=8398
pendingFlushes:value=66041731
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=4
requestBacklog:value=55868
inflightRequests:value=-5281009
flushTimes:value=3068
connectionTimes:value=773
------------------------------------------------
Tue Aug 30 23:17:01 UTC 2016
COUNTERS:
bytesRead:value=12679116014
bytesWritten:value=191541020749
connectionCount:value=29760853
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=954
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=10688029
failedWrites:value=1307696
poolAcquires:value=105863755
poolEvictions:value=29754374
poolReleases:value=105857076
poolReuse:value=76103210
processedRequests:value=111154980
requestWriteFailed:value=954
GAUGE:
liveConnections:value=8407
pendingFlushes:value=66165040
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=55968
inflightRequests:value=-5288964
flushTimes:value=3066
connectionTimes:value=733
------------------------------------------------
Tue Aug 30 23:18:01 UTC 2016
COUNTERS:
bytesRead:value=12700535635
bytesWritten:value=191875283953
connectionCount:value=29805643
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=957
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=10704510
failedWrites:value=1309768
poolAcquires:value=106048166
poolEvictions:value=29799207
poolReleases:value=106041342
poolReuse:value=76242427
processedRequests:value=111347051
requestWriteFailed:value=957
GAUGE:
liveConnections:value=8418
pendingFlushes:value=66286462
pendingConnects:value=0
pendingConnectionClose:value=0
pendingPoolAcquires:value=0
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=56060
inflightRequests:value=-5297203
flushTimes:value=3076
connectionTimes:value=741
------------------------------------------------

AUTO REBOOT DUE TO TOO HIGH liveConnections VALUES HAPPENED HERE

------------------------------------------------
Tue Aug 30 23:19:01 UTC 2016
COUNTERS:
bytesRead:value=2785258
bytesWritten:value=44089930
connectionCount:value=1779
failedConnectionClose:value=0
failedConnects:value=0
failedContentSource:value=0
failedFlushes:value=0
failedPoolAcquires:value=0
failedPoolReleases:value=0
failedResponses:value=1014
failedWrites:value=154
poolAcquires:value=31794
poolEvictions:value=3352
poolReleases:value=33319
poolReuse:value=30419
processedRequests:value=35612
requestWriteFailed:value=0
GAUGE:
liveConnections:value=11
pendingFlushes:value=28512
pendingConnects:value=2
pendingConnectionClose:value=0
pendingPoolAcquires:value=10
pendingPoolReleases:value=0
pendingWrites:value=0
requestBacklog:value=10
inflightRequests:value=-1572
flushTimes:value=324
connectionTimes:value=14

Another fact we observed: those liveConnections values increase much more slowly on servers with less than 1000 QPS (like 50x slower). Above that value, they start building up much faster. For instance, the metrics I extracted are from a server that receives an average of 1700 QPS and peaks around 2000 QPS.

Do you have any leads to help us figure out what's going wrong? It seems to be due to high request concurrency, and I suspect that some connections being closed early may lead to this.

FYI, we have a combination of load balancers in front of the reverse proxy, each with their own rules, which could lead to various behaviors for incoming and outgoing requests. I'd love to be more specific, but that would require a more private conversation. Is there any way we could talk via email or some other private channel?

Thanks

Crystark avatar Aug 31 '16 09:08 Crystark

@Crystark I believe the issue you are seeing is because you are writing data too fast / writing too much data, OR some code path is blocking the event loop. If you are writing too fast, then you need backpressure or pessimistic limits to control the number of concurrent requests being processed by the server. Netty backpressure is hooked up in RxNetty only in 0.5.x.
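
A sketch of the "pessimistic limits" idea in RxJava 1.x terms: cap how many pooled requests are in flight at once and keep that cap below the pool size (incomingRequests and proxyToBackend are hypothetical stand-ins for the proxy's own plumbing):

import rx.Observable;

// Observable.merge(sources, maxConcurrent) subscribes to at most
// maxConcurrent inner Observables at a time, starting the next one only
// when an earlier one terminates.
final int MAX_IN_FLIGHT = 1000; // keep this below withMaxConnections(...)

Observable<Void> throttled = Observable.merge(
        incomingRequests.map(req -> proxyToBackend(req)), // hypothetical: one pooled request per item
        MAX_IN_FLIGHT);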

Sure, I would love to hear more about your use-case and how you are using RxNetty too. You can email me at nitesh1706 at gmail and we can take it from there.

NiteshKant avatar Sep 02 '16 04:09 NiteshKant

@NiteshKant : We are using Ribbon and seeing a similar issue where our JVMs end up in a zombie state in which every request to that instance fails with a 500 and the error below:

Caused by: com.netflix.client.ClientException: Number of retries on next server exceeded max 2 retries, while making a call for:

We have to manually bounce the JVM to get it back to normal. Here are the Ribbon connection configs: MaxConnectionsPerHost: 600, MaxTotalConnections: 3000 (5 backend servers × MaxConnectionsPerHost).

We were previously using Ribbon 2.0.0; I have tried updating it to 2.2.2 and also updated RxNetty to 0.5.1 given the connection leak issue #532, because I initially assumed it was something to do with RxNetty. But no luck, and the issue still persists.

I'd appreciate your input on the above issue.

sripadapavan avatar May 31 '17 05:05 sripadapavan

Simple repro:

For our project, https://github.com/Azure/azure-cosmosdb-java, we are trying to upgrade our RxNetty dependency from 0.4.x to 0.5.2; however, there seems to be a connection leak issue in 0.5.2 that we are facing.

Code to repro:

// Imports added for completeness; the io.reactivex.netty package paths
// below assume the RxNetty 0.5.2 layout and are worth double-checking.
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.reactivex.netty.client.Host;
import io.reactivex.netty.client.pool.PoolConfig;
import io.reactivex.netty.client.pool.SingleHostPoolingProviderFactory;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientImpl;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class testing {

    private static class DefaultSSLEngineFactory implements Func1<ByteBufAllocator, SSLEngine> {
        private final SslContext sslContext;

        private DefaultSSLEngineFactory() {
            try {
                SslProvider sslProvider = SslContext.defaultClientProvider();
                sslContext = SslContextBuilder.forClient().sslProvider(sslProvider).build();
            } catch (SSLException e) {
                throw new IllegalStateException("Failed to create default SSL context", e);
            }
        }

        @Override
        public SSLEngine call(ByteBufAllocator byteBufAllocator) {
            return sslContext.newEngine(byteBufAllocator);
        }
    }

    public static void main(String[] args) throws Exception {

        // Pool capped at 1000 connections against a single host.
        PoolConfig<ByteBuf, ByteBuf> pConfig = new PoolConfig<>();
        pConfig.maxConnections(1000);
        HttpClient<ByteBuf, ByteBuf> cl = HttpClientImpl.newClient(SingleHostPoolingProviderFactory.create(pConfig),
                Observable.just(new Host(new InetSocketAddress(InetAddress.getByName("github.com"), 443))))
                .secure(new DefaultSSLEngineFactory());

        List<Observable<Void>> list = new ArrayList<>();

        // 4000 lazy GET requests; each discards its response content on arrival,
        // which should release the underlying pooled connection.
        for (int i = 0; i < 4000; i++) {

            Observable<HttpClientResponse<ByteBuf>> rsp =
                    cl.createRequest(HttpMethod.GET, "/").writeBytesContent(Observable.just(new byte[] {}));

            Observable<Void> contentDiscardedObs = rsp.flatMap(hcr -> hcr.discardContent());
            list.add(contentDiscardedObs);
        }

        // Subscribe to at most 1000 requests at a time -- exactly the pool size.
        List<Void> res = Observable.merge(list, 1000)
                .observeOn(Schedulers.computation())
                .toList().toBlocking().single();

        System.out.println("DONE");
    }
}

As the connection pool size is 1000 and we are merging results with 1000 as the degree of concurrency, the above code should work. However, we get the following failure:

Exception in thread "main" java.lang.RuntimeException: io.reactivex.netty.client.pool.PoolExhaustedException: Client connection pool exhausted.
	at rx.exceptions.Exceptions.propagate(Exceptions.java:57)
	at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:463)
	at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
	at testing.main(testing.java:66)
Caused by: io.reactivex.netty.client.pool.PoolExhaustedException: Client connection pool exhausted.
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$7.call(PooledConnectionProviderImpl.java:193)
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$7.call(PooledConnectionProviderImpl.java:173)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:286)
	at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
	at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
	at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
	at rx.Subscriber.setProducer(Subscriber.java:211)
	at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
	at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
	at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
	at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$4.call(PooledConnectionProviderImpl.java:129)
	at io.reactivex.netty.client.pool.PooledConnectionProviderImpl$4.call(PooledConnectionProviderImpl.java:108)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at io.reactivex.netty.protocol.tcp.client.ConnectionRequestImpl$1.call(ConnectionRequestImpl.java:30)
	at io.reactivex.netty.protocol.tcp.client.ConnectionRequestImpl$1.call(ConnectionRequestImpl.java:27)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at io.reactivex.netty.protocol.http.client.internal.HttpClientRequestImpl$OnSubscribeFuncImpl.call(HttpClientRequestImpl.java:447)
	at io.reactivex.netty.protocol.http.client.internal.HttpClientRequestImpl$OnSubscribeFuncImpl.call(HttpClientRequestImpl.java:420)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
	at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10256)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowPath(OnSubscribeFromIterable.java:117)
	at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:89)
	at rx.Subscriber.request(Subscriber.java:157)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:781)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
	at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:857)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:656)
	at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
	at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:857)
	at rx.internal.operators.OperatorIgnoreElements$1.onCompleted(OperatorIgnoreElements.java:42)
	at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge$ConnectionInputSubscriber.contentComplete(AbstractHttpConnectionBridge.java:508)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge.processNextItemInEventloop(AbstractHttpConnectionBridge.java:283)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge.access$1200(AbstractHttpConnectionBridge.java:56)
	at io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge$ConnectionInputSubscriber.onNext(AbstractHttpConnectionBridge.java:431)
	at io.reactivex.netty.channel.AbstractConnectionToChannelBridge$ReadProducer.sendOnNext(AbstractConnectionToChannelBridge.java:373)
	at io.reactivex.netty.channel.AbstractConnectionToChannelBridge.newMessage(AbstractConnectionToChannelBridge.java:189)
	at io.reactivex.netty.channel.BackpressureManagingHandler.channelRead(BackpressureManagingHandler.java:77)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.reactivex.netty.protocol.http.ws.client.Ws7To13UpgradeHandler.channelRead(Ws7To13UpgradeHandler.java:135)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1389)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1159)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1203)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.reactivex.netty.channel.BytesInspector.channelRead(BytesInspector.java:56)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:146)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Can someone please look at this? I only face this problem with 0.5.2, not with 0.4.x. Does anyone have any suggestions? @NiteshKant
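
A quick diagnostic for the repro above (a sketch, not a fix): leave headroom between the merge concurrency and the pool size. Since a pooled connection is released only after its response completes, acquires can momentarily race ahead of releases at exactly pool-size concurrency; if a margin makes the error disappear, that points at slow or missing releases rather than sheer demand:

// In the repro's main(), subscribe to fewer requests than the pool allows;
// 900 against a 1000-connection pool is an arbitrary margin.
List<Void> res = Observable.merge(list, 900)
        .observeOn(Schedulers.computation())
        .toList().toBlocking().single();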

moderakh avatar Jul 16 '18 23:07 moderakh

I'm actually here because I'm getting the same issue using azure-cosmosdb-java with RxNetty version 0.4.20. I would love to hear some suggestions as well.

Rakatashii avatar Jun 27 '19 16:06 Rakatashii

I am facing this issue with the Netflix Ribbon load balancer: I am getting io.reactivex.netty.client.pool.PoolExhaustedException for requests. Please help if anybody has a solution for this. I am currently using RxNetty 0.5.1.

sourafmulla avatar Aug 13 '20 17:08 sourafmulla