
Picologging leaks memory when used in an ASGI Framework

Open · ThinksFast opened this issue 1 year ago · 9 comments

Hi, I've been using picologging in an ASGI framework called Starlite. It looks like picologging causes a memory leak when used in this framework; if I switch to the Python stdlib logging module, the leak goes away.

I'm providing a reproducible example below. The script makes a request to an external API, validates the response data, and then saves it to Postgres. I've removed the actual code that saves to Postgres, because a single logging statement in that function is all it takes to exacerbate the memory leak. That also helps showcase the issue, since nothing is actually done with the data.

To run this, you just need Redis and Postgres connection strings, and the following dependencies:

pip install ...

  • starlite[full]
  • saq
  • uvicorn
  • sqlalchemy
  • python-dotenv
  • httpx

Assuming the Python file is called main.py, you can run the script in a terminal with: uvicorn main:app

You'll see memory usage climb quickly within a few minutes. If you switch picologging out for the standard Python logging module, the memory leak goes away.
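If you'd rather watch a number than eyeball top, sampling the process RSS works. A minimal sketch, assuming psutil is installed (it is not in the dependency list above):

import os

import psutil


def log_rss() -> None:
    # Print the resident set size of the current process in MiB.
    rss_mib = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
    print(f"RSS: {rss_mib:.1f} MiB")

Calling this from the cron job on each run makes the growth obvious. The full repro script: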

import asyncio
import datetime
import os

import httpx
import picologging
import uvicorn
from dotenv import load_dotenv
from saq import CronJob, Queue, Worker
from sqlalchemy import Column, UniqueConstraint, create_engine
from sqlalchemy.dialects.postgresql import FLOAT, INTEGER, JSONB, TEXT
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.types import TIMESTAMP
from starlite import DTOFactory, LoggingConfig, Starlite, get
from starlite.plugins.sql_alchemy import (
    SQLAlchemyConfig,
    SQLAlchemyPlugin,
    SQLAlchemySessionConfig,
)

########################################################
#################### ENV & LOGGING #####################
load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")
REDIS_URL = os.getenv("REDIS_URL")

logging_config = LoggingConfig(
    loggers={
        "app": {
            "level": "DEBUG",
            "handlers": ["queue_listener"],
            "propagate": False,
        }
    }
)
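# NOTE: with picologging installed, Starlite resolves the "queue_listener"
# handler to picologging's queue-based handler (my understanding of the
# default; without picologging it falls back to the stdlib equivalent).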

logger = picologging.getLogger("app")


########################################################
######## SQLALCHEMY CONFIG & DB MODELS  ########
Base = declarative_base()
engine = create_engine(url=DATABASE_URL, echo=True, echo_pool=True)

async_engine = create_async_engine(
    url=DATABASE_URL,
    future=True,
    echo=False,
    echo_pool=True,
    max_overflow=10,
    pool_size=5,
    pool_timeout=30,
    pool_recycle=300,
    pool_pre_ping=True,
    pool_use_lifo=True,  # use lifo to reduce the number of idle connections
)

async_session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
    async_engine, expire_on_commit=True, autoflush=True
)

session_config = SQLAlchemySessionConfig(expire_on_commit=True, autoflush=True)

sqlalchemyconfig = SQLAlchemyConfig(
    dependency_key="async_session",
    engine_instance=async_engine,
    session_config=session_config,
    session_maker_app_state_key="db_session_maker",
)

dto_factory = DTOFactory(
    plugins=[
        SQLAlchemyPlugin(
            config=sqlalchemyconfig,
        )
    ]
)


async def upsert_sql(
    model,
    objs: list,
    conflict_mode: str = "do_nothing",
    index_elements: list = None,
    constraint: str = None,
) -> None:

    # I've removed the actual code to save to SQL here, because...
    logger.debug("Simply logging in this function causes a big memory leak")


class LUNCValidator(Base):
    __tablename__ = "lunc_validators"

    uid = Column(INTEGER, primary_key=True, nullable=False)
    last_updated = Column(TIMESTAMP(timezone=True), onupdate=datetime.datetime.now)
    operatorAddress = Column(TEXT, nullable=False)
    tokens = Column(FLOAT, nullable=False)
    delegatorShares = Column(FLOAT, nullable=False)
    upTime = Column(FLOAT, nullable=False)
    status = Column(TEXT, nullable=False)
    accountAddress = Column(TEXT, nullable=False)
    description = Column(JSONB, nullable=False)
    votingPower = Column(JSONB, nullable=False)
    commissionInfo = Column(JSONB, nullable=False)
    rewardsPool = Column(JSONB, nullable=False)
    selfDelegation = Column(JSONB, nullable=False)

    __table_args__ = (UniqueConstraint("operatorAddress"),)


DTO_LUNCVALIDATOR = dto_factory("DTO_LUNCVALIDATOR", LUNCValidator, exclude=["uid"])

Base.metadata.create_all(engine)

########################################################
############ Reusable HTTPX Requests Func ##############


async def fetcha(
    url: str,
    params: dict = None,
) -> list:
    # Fetch a URL and return the decoded JSON body (a list for this API).
    async with httpx.AsyncClient() as client:
        resp = await client.get(url=url, params=params)

    return resp.json()


########################################################
######## REQUEST JSON AND STORE TO SQL ################
async def make_request(ctx) -> None:
    logger.info("Starting to make request")

    url = "https://fcd.terrarebels.net/v1/staking/validators"
    data = await fetcha(url=url)

    logger.info(f"{len(data)} results in response")

    index_elements = ["operatorAddress"]

    # Validate data using Starlite DTO's (Pydantic behind the scenes)
    dtos = [DTO_LUNCVALIDATOR(**i) for i in data]

    # Save to SQL Function
    # Comment out this function and the memory leak becomes smaller... no idea why.
    await upsert_sql(
        LUNCValidator,
        dtos,
        conflict_mode="update",
        index_elements=index_elements,
    )


## SAQ CONFIG: SCHEDULES REQUEST EVERY 10 SECONDS

queue = Queue.from_url(REDIS_URL)

tb = CronJob(make_request, cron="* * * * * */10")  # Every 10 seconds
worker = Worker(queue=queue, functions=[], cron_jobs=[tb])


async def tasks_on_startup() -> None:
    logger.warning("Starting SAQ worker")

    asyncio.create_task(worker.start())
    asyncio.create_task(worker.schedule())
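    # NOTE: fire-and-forget tasks; asyncio holds only weak references to
    # running tasks, so keeping these in a variable would be safer.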

    logger.info("SAQ started and tasks scheduled")


async def tasks_on_shutdown() -> None:
    logger.warning("Stopping SAQ worker")

    asyncio.create_task(worker.abort())
    asyncio.create_task(worker.stop())

    logger.info("SAQ worker should be dead")


######## BASIC STARLITE APP #########


@get("/")
def hello_world() -> dict[str, str]:
    """Handler function that returns a greeting dictionary."""
    return {"hello": "world"}


app = Starlite(
    route_handlers=[hello_world],
    on_startup=[tasks_on_startup],
    on_shutdown=[tasks_on_shutdown],
    logging_config=logging_config,
    debug=True,
)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
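For reference, the stdlib baseline I compared against only swaps the logger object; a minimal sketch (one caveat, as I understand the defaults: Starlite's LoggingConfig prefers picologging whenever it is installed, so a pure stdlib run also means uninstalling picologging):

import logging

# Same logger name as above, but backed by the stdlib logging module.
logger = logging.getLogger("app")

With that change, memory stays flat.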

ThinksFast · Jun 13 '23 21:06