aiohttp

Connection Pool does not reuse the TCP connection

Open mrzhangboss opened this issue 8 years ago • 5 comments

TCPConnector not reusing the TCP connection

TCPConnector does not reuse all of the TCP connections that share the same host and port. Almost every request goes through a single connection, which makes my client slow.

I think TCPConnector should reuse all of the connections to the same host that are in the connection pool.

Let me show what happens in my code.

First, I built a web server with Flask:

    from collections import defaultdict

    from flask import Flask, request
    from werkzeug.serving import WSGIRequestHandler

    app = Flask(__name__)
    WSGIRequestHandler.protocol_version = "HTTP/1.1"  # enable HTTP/1.1 so connections are kept alive
    app.debug = False

    # (client ip, client port) -> number of requests received from that socket
    host_ports = defaultdict(int)


    @app.route('/')
    def hello_world():
        host_ports[request.remote_addr, request.environ.get('REMOTE_PORT')] += 1
        return '%s:%s' % (request.remote_addr, request.environ.get('REMOTE_PORT'))


    @app.route('/sum')
    def count():
        total = sum(host_ports.values())  # also works before any request has arrived
        sort_values = sorted(host_ports.items(), key=lambda x: x[1], reverse=True)
        return 'ip-port num:%d \n req sum: %d \n sorted values:%r' % (len(host_ports), total, sort_values)


    if __name__ == '__main__':
        app.run(host=None, port=8000, threaded=True)

I set Flask to use HTTP/1.1 so that it keeps connections alive.
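To illustrate what keep-alive means for the per-port counting used by the server, here is a minimal standard-library sketch (it is not the Flask server from this issue). The names `Handler`, `seen` and `demo` are made up for the illustration. A single `http.client.HTTPConnection` sends several requests, and the HTTP/1.1 server should see all of them arrive from the same client port.

```python
import http.client
import threading
from collections import Counter
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# (client ip, client port) -> number of requests seen from that socket
seen = Counter()


class Handler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"  # lets the server keep the connection open

    def do_GET(self):
        seen[self.client_address] += 1
        body = b"ok"
        self.send_response(200)
        # Content-Length is required so the client knows where the response ends
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the demo quiet


def demo(requests=5):
    """Send `requests` GETs over one connection and return the per-socket counts."""
    seen.clear()
    server = ThreadingHTTPServer(("127.0.0.1", 0), Handler)  # port 0: pick a free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1])
        for _ in range(requests):
            conn.request("GET", "/")
            conn.getresponse().read()  # read fully so the socket can be reused
        conn.close()
    finally:
        server.shutdown()
        server.server_close()
    return dict(seen)
```

Calling `demo(5)` should return a dictionary with a single `(ip, port)` key whose count is 5, which is the same kind of record the `/sum` endpoint reports.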

Then I tried the aiohttp client with a TCP connection pool via TCPConnector. However, TCPConnector does not reuse all of the TCP connections in the pool; almost every request goes through a single one. Here is my client code:

    import time
    import asyncio
    from aiohttp import ClientSession, TCPConnector

    async def fetch(url, session):
        async with session.get(url) as response:
            print(await response.read())

    connector = TCPConnector(limit=20)  # pool of at most 20 connections
    session = ClientSession(connector=connector)
    nums = 1000
    url = 'http://127.0.0.1:8000/'
    tasks = [fetch(url, session) for x in range(nums)]
    loop = asyncio.get_event_loop()
    begin = time.time()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except Exception as exc:
        print('error', exc)  # report failures instead of silently swallowing them
    finally:
        end = time.time()
        session.close()  # close the session while the loop is still open
        loop.close()
        print('cost', end - begin, 'speed', nums / (end - begin), 'req/s')

The output is cost 40.27701497077942 speed 24.828056416928867 req/s. When I open http://127.0.0.1:8000/sum, the output is:

    ip-port num:20 
    req sum: 1000 
    sorted values:[(('127.0.0.1', 57934), 965), (('127.0.0.1', 57948), 6), (('127.0.0.1', 57937), 3), (('127.0.0.1', 57938), 2), (('127.0.0.1', 57939), 2), (('127.0.0.1', 57946), 2), (('127.0.0.1', 57947), 2), (('127.0.0.1', 57935), 2), (('127.0.0.1', 57945), 2), (('127.0.0.1', 57932), 2), (('127.0.0.1', 57930), 2), (('127.0.0.1', 57940), 2), (('127.0.0.1', 57949), 1), (('127.0.0.1', 57936), 1), (('127.0.0.1', 57944), 1), (('127.0.0.1', 57942), 1), (('127.0.0.1', 57933), 1), (('127.0.0.1', 57943), 1), (('127.0.0.1', 57931), 1), (('127.0.0.1', 57941), 1)] 

Almost all requests use a single TCP connection, 127.0.0.1:57934 on my computer, which is surprising. I tried two ways to fix it:

  1. Use asyncio.Semaphore. This helps, but I don't want to throttle the request rate.
  2. Call session.get(url) twice per request. It helps somewhat, but it is awful.

My environment is Python 3.5.1 and aiohttp==2.2.5.

mrzhangboss • Oct 14 '17 16:10

http://aiohttp.readthedocs.io/en/stable/client_reference.html?highlight=limit_per_host#aiohttp.BaseConnector.limit_per_host

Might help you.

hellysmile • Oct 14 '17 16:10

Thank you for your answer. I added limit_per_host to the connector, connector = TCPConnector(limit=20, limit_per_host=20), but it still does not reuse all of the live connections in the pool. Here is the output on my computer:

    ip-port num:20
     req sum: 1000 
    sorted values:[(('127.0.0.1', 36648), 886), (('127.0.0.1', 36636), 87), (('127.0.0.1', 36645), 5), (('127.0.0.1', 36643), 2), (('127.0.0.1', 36644), 2), (('127.0.0.1', 36647), 2), (('127.0.0.1', 36646), 2), (('127.0.0.1', 36640), 2), (('127.0.0.1', 36642), 1), (('127.0.0.1', 36653), 1), (('127.0.0.1', 36635), 1), (('127.0.0.1', 36652), 1), (('127.0.0.1', 36634), 1), (('127.0.0.1', 36649), 1), (('127.0.0.1', 36637), 1), (('127.0.0.1', 36651), 1), (('127.0.0.1', 36641), 1), (('127.0.0.1', 36650), 1), (('127.0.0.1', 36639), 1), (('127.0.0.1', 36638), 1)]

You can see that it mainly uses one connection the whole time, and the throughput for all 1000 requests is only 36 requests/second.
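To put a number on that skew, the reported `/sum` figures can be summarized with a few lines of Python. This is only a sketch: the helper name `summarize` is made up here, and the list is copied from the output above.

```python
def summarize(sorted_values):
    """Return (connections, total requests, share handled by the busiest one)."""
    counts = [count for _addr, count in sorted_values]
    total = sum(counts)
    return len(counts), total, max(counts) / total


# Figures copied from the /sum output reported with limit=20, limit_per_host=20.
sorted_values = [
    (('127.0.0.1', 36648), 886), (('127.0.0.1', 36636), 87),
    (('127.0.0.1', 36645), 5), (('127.0.0.1', 36643), 2),
    (('127.0.0.1', 36644), 2), (('127.0.0.1', 36647), 2),
    (('127.0.0.1', 36646), 2), (('127.0.0.1', 36640), 2),
    (('127.0.0.1', 36642), 1), (('127.0.0.1', 36653), 1),
    (('127.0.0.1', 36635), 1), (('127.0.0.1', 36652), 1),
    (('127.0.0.1', 36634), 1), (('127.0.0.1', 36649), 1),
    (('127.0.0.1', 36637), 1), (('127.0.0.1', 36651), 1),
    (('127.0.0.1', 36641), 1), (('127.0.0.1', 36650), 1),
    (('127.0.0.1', 36639), 1), (('127.0.0.1', 36638), 1),
]

connections, total, top_share = summarize(sorted_values)
print(connections, total, round(top_share, 3))  # 20 1000 0.886
```

In other words, 20 connections were opened, but one of them carried about 89% of the 1000 requests.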

mrzhangboss • Oct 15 '17 13:10

Try adding limit_per_host=1.

hellysmile • Oct 15 '17 13:10

Thank you, but this is equivalent to using limit=1. I want TCPConnector to keep multiple TCP connections in the pool and reuse them. I found that it works if I add an asyncio.Semaphore to my function, like this:

    async def fetch(url, session, semaphore):
        async with semaphore:
            async with session.get(url) as response:
                print(await response.read())

Only in this way does TCPConnector reuse all of the TCP connections in the pool. I wonder whether something is wrong with how TCPConnector reuses connections from the pool.
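For readers unfamiliar with the pattern, the following standard-library sketch shows how an `asyncio.Semaphore` caps the number of coroutines that are in flight at once. It does not use aiohttp: `asyncio.sleep` stands in for `session.get(url)`, and the names `run_bounded` and `fake_fetch` are made up for the illustration. It demonstrates the throttling only, not the effect on connection reuse reported above.

```python
import asyncio


async def run_bounded(n_tasks, limit):
    """Run n_tasks fake requests with at most `limit` in flight; return the peak."""
    semaphore = asyncio.Semaphore(limit)  # created inside the running loop
    in_flight = 0
    peak = 0

    async def fake_fetch():
        nonlocal in_flight, peak
        async with semaphore:  # waits here once `limit` tasks hold the semaphore
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.001)  # stand-in for the real HTTP request
            in_flight -= 1

    await asyncio.gather(*(fake_fetch() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_bounded(100, 20)))
```

With `limit=20`, the peak number of concurrent fake requests should equal 20, so a semaphore sized to match the connector's `limit` keeps the number of pending requests no larger than the pool.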

mrzhangboss • Oct 15 '17 14:10

I tried the following code, which behaves as expected.

server.py

    from collections import defaultdict

    from flask import Flask, request
    from werkzeug.serving import WSGIRequestHandler

    app = Flask(__name__)
    WSGIRequestHandler.protocol_version = "HTTP/1.1"  # enable HTTP/1.1 so connections are kept alive
    app.debug = False

    # (client ip, client port) -> number of requests received from that socket
    host_ports = defaultdict(int)


    @app.route('/')
    def hello_world():
        host_ports[request.remote_addr, request.environ.get('REMOTE_PORT')] += 1
        return '%s:%s' % (request.remote_addr, request.environ.get('REMOTE_PORT'))


    @app.route('/sum')
    def count():
        total = sum(host_ports.values())  # also works before any request has arrived
        sort_values = sorted(host_ports.items(), key=lambda x: x[1], reverse=True)
        return 'ip-port num:%d \n req sum: %d \n sorted values:%r' % (len(host_ports), total, sort_values)


    if __name__ == '__main__':
        app.run(host=None, port=8000, threaded=True)

client.py

    import time
    import asyncio
    from aiohttp import ClientSession, TCPConnector


    async def fetch(session, url):
        async with session.get(url) as response:
            print(await response.read())

    async def main():
        connector = TCPConnector(limit=1)  # a single pooled connection

        async with ClientSession(connector=connector) as session:
            nums = 1000
            url = 'http://127.0.0.1:8000/'
            tasks = [fetch(session, url) for x in range(nums)]
            begin = time.time()
            # gather accepts bare coroutines; asyncio.wait rejects them on Python 3.11+
            await asyncio.gather(*tasks)
            end = time.time()

        print('cost', end - begin, 'speed', nums / (end - begin), 'req/s')


    asyncio.run(main())

client output

    ...
    cost 6.090178966522217 speed 164.19878717801421 req/s

server /sum output:

    ip-port num:1
     req sum: 1000
     sorted values:[(('127.0.0.1', 54372), 1000)]

Python 3.9.4, aiohttp==3.7.4.post0, Flask==2.0.1

alviezhang • Jun 03 '21 00:06