databases icon indicating copy to clipboard operation
databases copied to clipboard

Question: how to set a custom json_serializer?

Open kamikaze opened this issue 3 years ago • 14 comments

Question: how to set a custom json_serializer? I have to store a datetime data in JSONB column, so I have to override json_serializer to take care of it. Is there any way? thanks

kamikaze avatar Aug 04 '22 21:08 kamikaze

What database backend are you using? For asyncpg, I'm doing it with the init callback (https://magicstack.github.io/asyncpg/current/api/index.html#connection-pools) which you can pass in Database class kwargs.

async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog')

database_connection = Database(
    'postgresql://...,
    init=asyncpg_init,
)

laukhin avatar Aug 08 '22 13:08 laukhin

What database backend are you using? For asyncpg, I'm doing it with the init callback (https://magicstack.github.io/asyncpg/current/api/index.html#connection-pools) which you can pass in Database class kwargs.

async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog')

database_connection = Database(
    'postgresql://...,
    init=asyncpg_init,
)

thanks. that's what I'm using as well.

kamikaze avatar Aug 09 '22 02:08 kamikaze

but it doesn't help. Still fails with: Object of type datetime is not JSON serializable

Model:

class Sample(Base):
    __tablename__ = 'samples'

    samp_id = Column(BIGINT, primary_key=True)
    data = Column(JSONB(astext_type=sa.Text()))

database:

async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec('jsonb', encoder=CustomJsonEncoder, decoder=json.loads, schema='pg_catalog')


database = databases.Database(settings.db_dsn, init=asyncpg_init)

kamikaze avatar Aug 22 '22 13:08 kamikaze

but it doesn't help. Still fails with: Object of type datetime is not JSON serializable

@kamikaze have you figured it out? If not, could you share the code of your CustomJsonEncoder?

anorlovsky avatar Sep 19 '22 14:09 anorlovsky

but it doesn't help. Still fails with: Object of type datetime is not JSON serializable

@kamikaze have you figured it out? If not, could you share the code of your CustomJsonEncoder?

Sure:

class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        elif isinstance(o, date):
            return o.isoformat()

        return json.JSONEncoder.default(self, o)

kamikaze avatar Nov 23 '22 16:11 kamikaze

following doesn't help as well

async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec('jsonb', encoder=partial(json.dumps, cls=CustomJSONEncoder), decoder=json.loads,
                                    schema='pg_catalog')

database = databases.Database(settings.db_dsn, init=asyncpg_init)

kamikaze avatar Jan 03 '23 15:01 kamikaze

also it seems it doesn't call this function when I'm trying to debug

kamikaze avatar Jan 03 '23 16:01 kamikaze

following doesn't help as well

This method actually works for me

Given the following Postgres table:

CREATE TABLE public.my_table (
	id int4 NOT NULL GENERATED ALWAYS AS IDENTITY,
	ts timestamptz NULL,
	ts_jsonb jsonb NULL
);

This code works fine with Python 3.10, databases 0.7.0, asyncpg 0.27.0

import asyncio
import json
from datetime import date, datetime, timezone
from functools import partial

import asyncpg
from databases import Database


class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        elif isinstance(o, date):
            return o.isoformat()

        return super.default(o)


async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec(
        "jsonb",
        encoder=partial(json.dumps, cls=CustomJsonEncoder),
        decoder=json.loads,
        schema="pg_catalog",
    )


async def main():
    db = Database(
        "postgresql+asyncpg://postgres@localhost/encode_databases_issue_504",
        init=asyncpg_init,
    )
    await db.connect()

    ts = datetime.now(timezone.utc)

    await db.execute(
        "insert into my_table (ts, ts_jsonb) values (:ts, :ts_jsonb)",
        dict(ts=ts, ts_jsonb=ts),
    )

    data = await db.fetch_all("select * from my_table")
    for row in data:
        print([row[key] for key in row.keys()])


if __name__ == "__main__":
    asyncio.run(main())

anorlovsky avatar Jan 03 '23 16:01 anorlovsky

ok... I found the reason... I'm using sqlalchemy for query composition instead of raw sql strings... there is a way to override json_serializer when calling create_engine(), but I don't call it. Will try to find the solution... thanks :|

kamikaze avatar Jan 03 '23 17:01 kamikaze

import sqlalchemy as sa

await database.execute(sa.insert(MyModel), values)

Just for reference.. I have this code. So database doesn't call its/asyncpg routine, but calls .values() for SA Insert object

kamikaze avatar Jan 03 '23 17:01 kamikaze

I guess somehow I need to modify PG dialect so it would use custom json serializer

kamikaze avatar Jan 03 '23 17:01 kamikaze

Could you provide a complete example for us to just copy-paste and run it?

anorlovsky avatar Jan 03 '23 17:01 anorlovsky

Could you provide a complete example for us to just copy-paste and run it?

I have modified your example:

import asyncio
import json
from datetime import date, datetime, timezone
from functools import partial

import asyncpg
import sqlalchemy as sa
from databases import Database
from sqlalchemy import Column, MetaData, DateTime, BIGINT
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.ddl import CreateColumn


class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        elif isinstance(o, date):
            return o.isoformat()

        return super.default(o)


async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec(
        "jsonb",
        encoder=partial(json.dumps, cls=CustomJsonEncoder),
        decoder=json.loads,
        schema="pg_catalog",
    )


metadata = MetaData()
Base = declarative_base(metadata=metadata)


@compiles(CreateColumn, 'postgresql')
def use_identity(element, compiler, **kw):
    result = compiler.visit_create_column(element, **kw).replace('SERIAL', 'INT GENERATED BY DEFAULT AS IDENTITY')

    return result.replace('BIGSERIAL', 'BIGINT GENERATED BY DEFAULT AS IDENTITY')


class MyTable(Base):
    __tablename__ = 'my_table'

    id = Column(BIGINT, primary_key=True)
    ts = Column(DateTime(timezone=True), nullable=False)
    ts_jsonb = Column(JSONB, nullable=False)


async def main():
    db = Database(
        "postgresql+asyncpg://postgres@localhost/encode_databases_issue_504",
        init=asyncpg_init,
    )
    await db.connect()

    ts = datetime.now(timezone.utc)

    await db.execute(
        sa.insert(MyTable),
        dict(ts=ts, ts_jsonb=ts)
    )

    data = await db.fetch_all("select * from my_table")
    for row in data:
        print([row[key] for key in row.keys()])


if __name__ == "__main__":
    asyncio.run(main())

kamikaze avatar Jan 03 '23 18:01 kamikaze

Have the same problem :(

With asyncpg + sqlalchemy - error occurs on sqlalchemy.dialect side, so I need to pass some parameters (in my case json_serializer) to databases.backends.postgres.PostgresBackend._get_dialect, but currently its not supported by databases itself.

komarovf avatar Mar 10 '23 15:03 komarovf