picologging
picologging copied to clipboard
Picologging leaks memory when used in an ASGI Framework
Hi, I've been using picologging
in an ASGI Framework called Starlite
. It looks like picologging
creates a memory leak when used in this framework. If I switch to the Python stdlib logging
module, the memory leak goes away.
I'm providing a reproducible example below. The script makes a request to an external API, validates the response data and then saves it to Postgres. I removed the code to save to postgres because all that's needed to exacerbate the memory leak is a logging statement in that function. This also helps showcase the issue since nothing is being done with the data.
To run this, you just need a REDIS
and Postgres
connection string, and the following dependencies:
pip install ...
- starlite[full]
- saq
- uvicorn
- sqlalchemy
- python-dotenv
- httpx
Assuming the python file is called main
, you can run the script in terminal with: uvicorn main:app
And you'll see memory usage climb quickly within a few minutes. If you switch out picologging
for standard Python logging
module, the memory leak goes away.
import asyncio
import datetime
import os
from typing import Any, Optional

import httpx
import picologging
import uvicorn
from dotenv import load_dotenv
from saq import CronJob, Queue, Worker
from sqlalchemy import Column, UniqueConstraint, create_engine
from sqlalchemy.dialects.postgresql import FLOAT, INTEGER, JSONB, TEXT
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.types import TIMESTAMP
from starlite import DTOFactory, LoggingConfig, Starlite, get
from starlite.plugins.sql_alchemy import (
    SQLAlchemyConfig,
    SQLAlchemyPlugin,
    SQLAlchemySessionConfig,
)
########################################################
#################### ENV & LOGGING #####################
# Load environment variables from a local .env file into os.environ.
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")  # Postgres connection string
REDIS_URL = os.getenv("REDIS_URL")  # Redis connection string (used by SAQ below)
# Starlite logging config: routes the "app" logger to the queue_listener
# handler at DEBUG level, with propagation to the root logger disabled.
logging_config = LoggingConfig(
    loggers={
        "app": {
            "level": "DEBUG",
            "handlers": ["queue_listener"],
            "propagate": False,
        }
    }
)
# Module-level logger used throughout this script; picologging is the
# backend under test for the reported memory leak.
logger = picologging.getLogger("app")
########################################################
######## SQLALCHEMY CONFIG & DB MODELS ########
Base = declarative_base()
# Sync engine: only used below for Base.metadata.create_all().
engine = create_engine(url=DATABASE_URL, echo=True, echo_pool=True)
# Async engine: handed to the Starlite SQLAlchemy plugin for request-time use.
async_engine = create_async_engine(
    url=DATABASE_URL,
    future=True,
    echo=False,
    echo_pool=True,
    max_overflow=10,
    pool_size=5,
    pool_timeout=30,
    pool_recycle=300,
    pool_pre_ping=True,
    pool_use_lifo=True,  # use lifo to reduce the number of idle connections
)
# Session factory for the async engine; not referenced elsewhere in this
# file (presumably kept to mirror the original application's setup).
async_session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
    async_engine, expire_on_commit=True, autoflush=True
)
session_config = SQLAlchemySessionConfig(expire_on_commit=True, autoflush=True)
sqlalchemyconfig = SQLAlchemyConfig(
    dependency_key="async_session",
    engine_instance=async_engine,
    session_config=session_config,
    session_maker_app_state_key="db_session_maker",
)
# DTO factory backed by the SQLAlchemy plugin; used to build the
# validation DTO for LUNCValidator further down.
dto_factory = DTOFactory(
    plugins=[
        SQLAlchemyPlugin(
            config=sqlalchemyconfig,
        )
    ]
)
async def upsert_sql(
    model,
    objs: list,
    conflict_mode: str = "do_nothing",
    index_elements: Optional[list] = None,
    constraint: Optional[str] = None,
) -> None:
    """Placeholder for the real Postgres upsert.

    Args:
        model: SQLAlchemy declarative model class to upsert into.
        objs: DTO instances to persist.
        conflict_mode: Conflict strategy; "do_nothing" by default,
            "update" as passed by make_request.
        index_elements: Column names forming the conflict target, if any.
        constraint: Named constraint to use as the conflict target, if any.

    The actual persistence code was removed for the repro: a single
    logging call in this function is enough to reproduce the leak.
    """
    # NOTE(repro): this log line alone exacerbates the memory leak.
    logger.debug("Simply logging in this function causes a big memory leak")
class LUNCValidator(Base):
    """Terra Classic (LUNC) validator row, unique per operator address."""

    __tablename__ = "lunc_validators"

    uid = Column(INTEGER, primary_key=True, nullable=False)
    # Set automatically on UPDATE; NULL until the row is first updated.
    last_updated = Column(TIMESTAMP(timezone=True), onupdate=datetime.datetime.now)
    operatorAddress = Column(TEXT, nullable=False)
    tokens = Column(FLOAT, nullable=False)
    delegatorShares = Column(FLOAT, nullable=False)
    upTime = Column(FLOAT, nullable=False)
    status = Column(TEXT, nullable=False)
    accountAddress = Column(TEXT, nullable=False)
    # Nested objects from the validators API response, stored as JSONB.
    description = Column(JSONB, nullable=False)
    votingPower = Column(JSONB, nullable=False)
    commissionInfo = Column(JSONB, nullable=False)
    rewardsPool = Column(JSONB, nullable=False)
    selfDelegation = Column(JSONB, nullable=False)

    # Conflict target used by the upsert (see make_request's index_elements).
    __table_args__ = (UniqueConstraint("operatorAddress"),)
# DTO used to validate incoming API rows; "uid" (the primary key) is
# excluded, presumably because it is database-generated.
DTO_LUNCVALIDATOR = dto_factory("DTO_LUNCVALIDATOR", LUNCValidator, exclude=["uid"])
# Create the tables synchronously at import time using the sync engine.
Base.metadata.create_all(engine)
########################################################
############ Reusable HTTPX Requests Func ##############
async def fetcha(
    url: str,
    params: Optional[dict] = None,
) -> Any:
    """GET *url* and return the decoded JSON body.

    Args:
        url: Absolute URL to request.
        params: Optional query parameters.

    Returns:
        Whatever ``resp.json()`` decodes — a list for the validators
        endpoint used in this script.
    """
    # A fresh client per call keeps the example self-contained; a real
    # app would reuse one AsyncClient.
    async with httpx.AsyncClient() as client:
        resp = await client.get(url=url, params=params)
        results = resp.json()
        return results
########################################################
######## REQUEST JSON AND STORE TO SQL ################
async def make_request(ctx) -> None:
    """SAQ cron job: fetch validator data, validate it, and 'save' it.

    Args:
        ctx: SAQ job context (unused here, required by the worker API).
    """
    logger.info("Starting to make request")
    url = "https://fcd.terrarebels.net/v1/staking/validators"
    data = await fetcha(url=url)
    # Lazy %-style args avoid building the message string when INFO is off.
    logger.info("%d results in response", len(data))
    index_elements = ["operatorAddress"]
    # Validate data using Starlite DTO's (Pydantic behind the scenes)
    dtos = [DTO_LUNCVALIDATOR(**i) for i in data]
    # Save to SQL Function
    # Comment out this function and the memory leak becomes smaller... no idea why.
    await upsert_sql(
        LUNCValidator,
        dtos,
        conflict_mode="update",
        index_elements=index_elements,
    )
## SAQ CONFIG: SCHEDULES REQUEST EVERY 10 SECONDS
# Job queue backed by the Redis instance at REDIS_URL.
queue = Queue.from_url(REDIS_URL)
tb = CronJob(make_request, cron="* * * * * */10")  # Every 10 seconds
# Worker runs only the cron job; no ad-hoc task functions are registered.
worker = Worker(queue=queue, functions=[], cron_jobs=[tb])
# Keep strong references to background tasks: asyncio.create_task() holds
# only a weak reference, so an otherwise-unreferenced task can be
# garbage-collected before it finishes (see asyncio "Creating Tasks" docs).
_startup_tasks: set = set()


async def tasks_on_startup() -> None:
    """Start the SAQ worker and its scheduler as background tasks."""
    logger.warning("Starting SAQ worker")
    for coro in (worker.start(), worker.schedule()):
        task = asyncio.create_task(coro)
        _startup_tasks.add(task)
        task.add_done_callback(_startup_tasks.discard)
    logger.info("SAQ started and tasks scheduled")
# Strong references so the shutdown tasks cannot be garbage-collected
# mid-flight (asyncio.create_task holds only a weak reference).
_shutdown_tasks: set = set()


async def tasks_on_shutdown() -> None:
    """Abort in-flight jobs and stop the SAQ worker on app shutdown."""
    logger.warning("Stopping SAQ worker")
    for coro in (worker.abort(), worker.stop()):
        task = asyncio.create_task(coro)
        _shutdown_tasks.add(task)
        task.add_done_callback(_shutdown_tasks.discard)
    logger.info("SAQ worker should be dead")
######## BASIC STARLITE APP #########
@get("/")
def hello_world() -> dict[str, str]:
    """Return a static greeting payload for the index route."""
    greeting = {"hello": "world"}
    return greeting
# Wire everything into the ASGI app: the single route, the SAQ lifecycle
# hooks, and the picologging-backed logging config under test.
app = Starlite(
    route_handlers=[hello_world],
    on_startup=[tasks_on_startup],
    on_shutdown=[tasks_on_shutdown],
    logging_config=logging_config,
    debug=True,
)

# Allow `python main.py` in addition to `uvicorn main:app`.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)