
ProgrammingError: Cannot operate on a closed database.

Open · holymode opened this issue 1 year ago

The problem

Sometimes when I use my function to fetch data, it errors out with ProgrammingError: Cannot operate on a closed database. My bot serves many people, so I think this is just some weird race condition. I recently added more caching to my bot to save on requests, and I've noticed the error popping up more and more; before, it was very rare. Traceback:

   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/discord/ext/commands/core.py", line 235, in wrapped
    ret = await coro(*args, **kwargs)
   File "/root/bots/agent-blox/ext/robloxx.py", line 847, in get_roblox_id_info
    count = await get_follow_count(user.id)
   File "/root/bots/agent-blox/ext/robloxx.py", line 320, in get_follow_count
    following_response = await proxy_manager.fetch_with_proxy_cache(
   File "/root/bots/agent-blox/helpers/proxymanager.py", line 166, in fetch_with_proxy_cache
    async with session.request(method.upper(), url, params=params, json=json, headers=headers, data=data, cookies=cookies) as response:
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiohttp/client.py", line 1197, in __aenter__
    self._resp = await self._coro
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiohttp_client_cache/session.py", line 51, in _request
    response, actions = await self.cache.request(
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiohttp_client_cache/backends/base.py", line 139, in request
    response = None if actions.skip_read else await self.get_response(actions.key)
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiohttp_client_cache/backends/base.py", line 147, in get_response
    response = await self.responses.read(key) or await self._get_redirect_response(str(key))
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiohttp_client_cache/backends/base.py", line 169, in _get_redirect_response
    redirect_key = await self.redirects.read(key)
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiohttp_client_cache/backends/sqlite.py", line 180, in read
    row = await cursor.fetchone()
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 65, in fetchone
    return await self._execute(self._cursor.fetchone)
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiosqlite/cursor.py", line 40, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 132, in _execute
    return await future
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/aiosqlite/core.py", line 115, in run
    result = function()
 sqlite3.ProgrammingError: Cannot operate on a closed database.
 
The above exception was the direct cause of the following exception:

 Traceback (most recent call last):
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/discord/ext/commands/bot.py", line 1366, in invoke
    await ctx.command.invoke(ctx)
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/discord/ext/commands/core.py", line 1029, in invoke
    await injected(*ctx.args, **ctx.kwargs)  # type: ignore
   File "/root/bots/agent-blox/.venv/lib/python3.10/site-packages/discord/ext/commands/core.py", line 244, in wrapped
    raise CommandInvokeError(exc) from exc
 discord.ext.commands.errors.CommandInvokeError: Command raised an exception: ProgrammingError: Cannot operate on a closed database.

Expected behavior

Working without throwing any errors.

Steps to reproduce the behavior

My function:

async def fetch_with_proxy_cache(self, url, method: typing.Literal["get", "post"] = "get", use_cache: bool = False, params=None, json=None, headers=None, data=None, cookies=None):
    retries = 0
    while retries < self.max_retries:
        session_class = CachedSession(cache=cache) if use_cache else ClientSession()
        try:
            async with session_class as session:
                async with session.request(method.upper(), url, params=params, json=json, headers=headers, data=data, cookies=cookies) as response:
                    logger.debug(f"Fetching {url} with cache: {use_cache} - Status: {response.status}")
                    if response.status == 429:
                        return await self.handle_rate_limit(url, session, method, params, json, headers, data, cookies)

                    elif response.status < 500:
                        self.rate_limit_counter = 0
                        response_json = await response.json()
                        if response_json is not None:
                            return response

                        logger.debug(f"Received None JSON. Retrying... Attempt {retries + 1}")
                        retries += 1
                    else:
                        logger.debug(f"Request failed: {response.status} - {response.reason}")
                        retries += 1
                        continue

        except aiohttp.ClientError:
            retries += 1
            logger.exception("ClientError")
            continue

    logger.debug(f"Max retries reached. Giving up on {url}")
    return None

Environment

  • aiohttp-client-cache version: 0.11.0
  • Python version: 3.10
  • Platform: Ubuntu 22.04

holymode avatar May 25 '24 19:05 holymode

From the code snippet that you shared, it is not clear how and where you manage the cache variable.

I advise you to start with How to debug small programs and create a minimal reproducible example (MRE) that another developer can copy and execute. Without one, it is usually too difficult to say what is wrong. The package creator has a broad background in the subject, but creating and sharing an MRE on your side would be highly appreciated.

alessio-locatelli avatar May 26 '24 06:05 alessio-locatelli

Just to add more context: this function is used in a loop that runs every 2 minutes and probably makes around 400-600 requests each time.

cache = SQLiteBackend(
    cache_name='cache/aiohttp-requests.db',
    expire_after=default_expire_after,
    urls_expire_after=urls_expire_after,
    allowed_methods=['GET', 'POST'],
    include_headers=False,
)

That's the cache setup.

I will try to write one, but I'm not 100% sure I can recreate the error.

holymode avatar May 26 '24 19:05 holymode

I agree this is a somewhat tricky one to debug. I don't see any problems with the code snippet you provided, and can't reproduce this myself.

Somehow the SQLite connection object is being closed before (or at the same time as) a request is sent. If the connection is closed when a request is made, it is supposed to be reopened, so this sounds like a race condition. Is this loop potentially running from multiple threads?

One thing you could try is passing autoclose=False to the cache object and then manually calling cache.close() when you're done. Let me know if that helps.
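
For example, a minimal sketch of that approach (reusing the cache path from above; the httpbin URL is just a placeholder):

import asyncio
from aiohttp_client_cache import CachedSession, SQLiteBackend

# autoclose=False keeps the SQLite connection open when a session exits
cache = SQLiteBackend(cache_name='cache/aiohttp-requests.db', autoclose=False)

async def main():
    # Short-lived sessions can now share the same cache object safely
    async with CachedSession(cache=cache) as session:
        await session.get('https://httpbin.org/ip')
    async with CachedSession(cache=cache) as session:
        await session.get('https://httpbin.org/ip')
    # Close the backend connection manually on shutdown
    await cache.close()

asyncio.run(main())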

JWCook avatar May 28 '24 17:05 JWCook

I have had a similar problem. The root of the problem might be the same.

My question is whether a user is supposed to be able to pass the cache object to multiple CachedSession instances that are running concurrently. If a single cache variable should only be used by a single CachedSession at a time, then that is fine, end of discussion. However, since SQLiteBackend uses aiosqlite, I assumed it would be fine to have multiple instances. The minimal example below shows that it is not:

import asyncio
from aiohttp_client_cache import CachedSession, SQLiteBackend

# A single cache object shared by many short-lived sessions
cache = SQLiteBackend(cache_name="./test_cache.sqlite")

async def task(n: int):
    url = f"https://httpbin.org/ip?count={n}"
    async with CachedSession(cache=cache) as session:
        return await session.get(url, ssl=False)

async def tasks(arr: list):
    tasks = [task(i) for i in arr]
    return await asyncio.gather(*tasks)


asyncio.run(tasks(list(range(100))))

It throws the following error:

ProgrammingError: Cannot operate on a closed database.

agercas avatar Nov 06 '24 21:11 agercas

@agercas I have fixed your code, and it now works with a single cache instance used by any number of sessions:

import asyncio
from aiohttp_client_cache import CachedSession, SQLiteBackend

cache = SQLiteBackend(cache_name="./test_cache.sqlite")

async def task(n: int, session):
    url = f"https://httpbin.org/ip?count={n}"
    response = await session.get(url, ssl=False)
    print(response.headers)
    return response

async def tasks(sessions_count: int):
    sessions = [CachedSession(cache=cache) for _ in range(sessions_count)]
    tasks = [task(i, s) for i, s in zip(range(sessions_count), sessions)]
    responses = await asyncio.gather(*tasks)
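    # Close the sessions only after all requests have completed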
    for s in sessions:
        await s.close()
    return responses

asyncio.run(tasks(1000))

Explanation

Exiting a CachedSession context manager closes the Cache object.

In your original code, you close each CachedSession object after its request, but other unfinished/unclosed CachedSession objects still depend on the same Cache.

This is what happens in your code example:

  1. "Session 1": connected? -> no, connect to SQLite
  2. "Session 2": connected? -> yes, reuse connection
  3. "Session 1": make a request
  4. "Session 2": going to make a request...
  5. "Session 1": close SQLite connection
  6. "Session 2": making a request... ERROR!!! the cache was closed by another session.

To summarize, you are "shooting yourself in the foot" by creating an unrealistic use case in which CachedSession objects are opened and closed concurrently, one per request, while all holding a reference to the same cache object, which each context manager closes on exit.

You should review your application lifecycle and avoid using the Cache object after closing the CachedSession that used it.
If you really need multiple ClientSession/CachedSession objects in your application, create a separate cache object for each, or delay closing any session while other sessions are still in use, as in the sketch below.
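
For example, a minimal sketch of the simplest safe pattern (same placeholder httpbin URL as above): one long-lived session shared by all concurrent tasks, so the cache is closed exactly once.

import asyncio
from aiohttp_client_cache import CachedSession, SQLiteBackend

async def main():
    # One session and one cache for the application's whole lifetime;
    # the cache is closed exactly once, when this context manager exits
    async with CachedSession(cache=SQLiteBackend(cache_name="./test_cache.sqlite")) as session:
        urls = [f"https://httpbin.org/ip?count={n}" for n in range(100)]
        await asyncio.gather(*(session.get(url, ssl=False) for url in urls))

asyncio.run(main())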

Conclusion

I think this can be closed as "nothing to fix" / "not planned". Let me know if I am missing anything.

On our side, we can think about adding a warning if the cache is accessed after the session context manager has been closed. This would prevent possible confusion for the end user.
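
A hypothetical sketch of such a guard (illustrative names only, not the library's actual internals):

import warnings

class ClosableBackendSketch:
    """Illustrative only; not the library's actual internals."""

    def __init__(self):
        self._closed = False

    async def close(self):
        # ... close the underlying connection here ...
        self._closed = True

    def _warn_if_closed(self):
        # Would be called at the start of every read/write operation
        if self._closed:
            warnings.warn(
                'This cache was already closed (e.g. by exiting a CachedSession '
                'context manager); reusing it may fail with "Cannot operate on '
                'a closed database".',
                RuntimeWarning,
            )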

alessio-locatelli avatar Nov 07 '24 09:11 alessio-locatelli

@alessio-locatelli thanks for the explanation, that makes sense

A short follow-up: would recreating the cache with the same persistent file also be fine? Though this is more a question about SQLite than about CachedSession.

import random
import asyncio
from aiohttp_client_cache import CachedSession, SQLiteBackend

async def task(n: int):
    url = f"https://httpbin.org/ip?count={n}"
    # A new cache object per task, all pointing at the same SQLite file
    cache = SQLiteBackend(cache_name="./test_cache.sqlite")
    async with CachedSession(cache=cache) as session:
        await session.get(url, ssl=False)
        await asyncio.sleep(random.uniform(1, 3))

    return n


async def tasks(arr: list):
    tasks = [task(i) for i in arr]
    return await asyncio.gather(*tasks)


asyncio.run(tasks(list(range(200))))

Otherwise, I agree that the problem is clear.

agercas avatar Nov 07 '24 11:11 agercas

@alessio-locatelli Thanks for the detective work! 🕵️‍♂️

JWCook avatar Nov 07 '24 16:11 JWCook