asynch icon indicating copy to clipboard operation
asynch copied to clipboard

Why aren't the connections to Clickhouse closed?

Open IzyI opened this issue 3 years ago • 3 comments

Hi. I have this code.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import sentry_sdk
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
import uuid
from asynch.cursors import DictCursor
import motor.motor_asyncio
from settings import MONGODB_USER, MONGODB_PASS, MONGODB_PORT, MONGODB_HOST, MONGODB_NAME, CLICKHOUSE_HOST, \
    CLICKHOUSE_USER, CLICKHOUSE_PASS, CLICKHOUSE_PORT, CLICKHOUSE_NAME
from asynch import connect

MONGO_DETAILS = f"mongodb://{MONGODB_USER}:{MONGODB_PASS}@{MONGODB_HOST}:{MONGODB_PORT}"
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS)
db = client[MONGODB_NAME]


async def connect_database_click():
    return await connect(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        database=CLICKHOUSE_NAME,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASS,
    )


async def insert_1_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            f'INSERT INTO events1 '
            f'(id,user_id,data VALUES',
            params
        )


async def insert_2_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            f'INSERT INTO events2 '
            f'(id,user_id,data VALUES',
            params
        )


async def insert_3_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            f'INSERT INTO events3 '
            f'(id,user_id,data VALUES',
            params
        )


async def insert_4_events(params, db_clickhouse):
    async with db_clickhouse.cursor(cursor=DictCursor) as cursor:
        await cursor.execute(
            f'INSERT INTO events4 '
            f'(id,user_id,data VALUES',
            params
        )


class EventScheme(BaseModel):
    user_id: Optional[str]
    data: Optional[dict]


app = FastAPI()
sentry_sdk.init(dsn="https://*******@sny.*****.com/14")


@app.get('/sentry-debug')
async def trigger_error():
    return 1 / 0


@app.get('/ping')
async def ping():
    return {"result": True}


@app.post("/save-event")
async def save_event(event: EventScheme):
    db_clickhouse = await connect_database_click()

    data = event.dict()
    del data['token']
    params = [{
        'id': str(uuid.uuid4()),
        "user_id": data.get('user_id'),

    }]

    if data.get('name') == 'ping':
        params['data'] = data.get('data_ping')
        await insert_1_events(params, db_clickhouse)
    elif data.get('name') == 'first_run':
        params['data'] = data.get('data_first_run')
        await insert_2_events(params, db_clickhouse)
    elif data.get('name') == 'first':
        params['data'] = data.get('data_first')
        await insert_3_events(params, db_clickhouse)
    elif data.get('name') == 'first':
        params['data'] = data.get('data_first')
        await insert_4_events(params, db_clickhouse)
    else:
        print('NONE EVENTS')

    await db_clickhouse.close()
    # пихаем в монгу
    try:
        await db.events.insert_one(data)
    except Exception as e:
        print(e)
    return {'success': True}


app = SentryAsgiMiddleware(app)

It works fine, but for some reason it doesn't close the clickhouse connections. Why is this happening? It gives 200 for all requests until the number of connections exceeds the limit, then an error occurs:

DB::Exception: Too many simultaneous queries. Maximum: 100

IzyI avatar Feb 11 '22 12:02 IzyI

There are a lot of requests.

IzyI avatar Feb 11 '22 13:02 IzyI

I tested but not that error, is there too many requests at same time?

long2ice avatar Feb 13 '22 02:02 long2ice

You can try latest source code

long2ice avatar May 11 '22 13:05 long2ice