Question: how to set a custom json_serializer?
I have to store datetime values in a JSONB column, so I need to override json_serializer to handle them. Is there any way to do that? Thanks.
What database backend are you using? For asyncpg, I'm doing it with the init callback (https://magicstack.github.io/asyncpg/current/api/index.html#connection-pools), which you can pass in the Database class kwargs:
async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog')

database_connection = Database(
    'postgresql://...',
    init=asyncpg_init,
)
Thanks, that's what I'm using as well, but it doesn't help. It still fails with: Object of type datetime is not JSON serializable
Model:
class Sample(Base):
    __tablename__ = 'samples'
    samp_id = Column(BIGINT, primary_key=True)
    data = Column(JSONB(astext_type=sa.Text()))

Database:
async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec('jsonb', encoder=CustomJsonEncoder, decoder=json.loads, schema='pg_catalog')

database = databases.Database(settings.db_dsn, init=asyncpg_init)
@kamikaze have you figured it out?
If not, could you share the code of your CustomJsonEncoder?
Sure:
class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        elif isinstance(o, date):
            return o.isoformat()
        return json.JSONEncoder.default(self, o)
The following doesn't help either:
async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec('jsonb', encoder=partial(json.dumps, cls=CustomJsonEncoder), decoder=json.loads,
                                    schema='pg_catalog')

database = databases.Database(settings.db_dsn, init=asyncpg_init)
Also, when I try to debug, it seems this function is never called.
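As a sanity check that does not need a database, the encoder callable can be exercised on its own. The sketch below is only an illustration under stated assumptions: the name encode_jsonb is made up here, and the encoder class is a condensed variant of the one posted above. It shows that a partial over json.dumps returns JSON text, while calling the encoder class directly (as in the earlier encoder=CustomJsonEncoder attempt) does not.

```python
import json
from datetime import date, datetime, timezone
from functools import partial


class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        # datetime is a subclass of date, so one check covers both.
        if isinstance(o, date):
            return o.isoformat()
        return super().default(o)


# Hypothetical name: a callable that takes a value and returns JSON text,
# which is the shape a codec encoder needs.
encode_jsonb = partial(json.dumps, cls=CustomJsonEncoder)

payload = {"ts": datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc)}
encoded = encode_jsonb(payload)

# Calling the class itself is not equivalent: JSONEncoder accepts only
# keyword arguments, so a positional value raises TypeError.
try:
    CustomJsonEncoder(payload)
    class_call_failed = False
except TypeError:
    class_call_failed = True
```

If encode_jsonb works here but the error still appears in the application, the serialization is probably happening somewhere other than the asyncpg codec.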
This method actually works for me
Given the following Postgres table:
CREATE TABLE public.my_table (
    id int4 NOT NULL GENERATED ALWAYS AS IDENTITY,
    ts timestamptz NULL,
    ts_jsonb jsonb NULL
);
This code works fine with Python 3.10, databases 0.7.0, and asyncpg 0.27.0:
import asyncio
import json
from datetime import date, datetime, timezone
from functools import partial

import asyncpg
from databases import Database


class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        elif isinstance(o, date):
            return o.isoformat()
        # super() must be called; a bare super.default raises AttributeError
        return super().default(o)


async def asyncpg_init(connection: asyncpg.Connection) -> None:
    # Register a jsonb codec on every new connection
    await connection.set_type_codec(
        "jsonb",
        encoder=partial(json.dumps, cls=CustomJsonEncoder),
        decoder=json.loads,
        schema="pg_catalog",
    )


async def main():
    db = Database(
        "postgresql+asyncpg://postgres@localhost/encode_databases_issue_504",
        init=asyncpg_init,
    )
    await db.connect()
    ts = datetime.now(timezone.utc)
    # Raw SQL string: the value reaches the asyncpg codec registered above
    await db.execute(
        "insert into my_table (ts, ts_jsonb) values (:ts, :ts_jsonb)",
        dict(ts=ts, ts_jsonb=ts),
    )
    data = await db.fetch_all("select * from my_table")
    for row in data:
        print([row[key] for key in row.keys()])


if __name__ == "__main__":
    asyncio.run(main())
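One thing to keep in mind with this approach: the encoder writes datetimes as ISO 8601 strings, so with decoder=json.loads the value read back from ts_jsonb should be a plain string rather than a datetime. If datetimes are wanted on the way out, a decoder along the following lines could be passed instead. This is a rough sketch, not part of the example above; revive_datetimes and decode_jsonb are hypothetical names.

```python
import json
from datetime import datetime, timezone


def revive_datetimes(value):
    """Hypothetical helper: recursively turn ISO 8601 strings into datetimes."""
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return value  # not a timestamp, keep the string as-is
    if isinstance(value, list):
        return [revive_datetimes(item) for item in value]
    if isinstance(value, dict):
        return {key: revive_datetimes(item) for key, item in value.items()}
    return value


def decode_jsonb(text):
    # Hypothetical decoder: parse the JSON text, then revive timestamps.
    return revive_datetimes(json.loads(text))


# Simulate what the encoder would have stored for a single datetime value.
stored = json.dumps(datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc).isoformat())
restored = decode_jsonb(stored)
```

The trade-off is that any stored string that happens to parse as an ISO date is converted too, so this only suits columns where that is acceptable.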
OK, I found the reason: I'm using SQLAlchemy for query composition instead of raw SQL strings. There is a way to override json_serializer when calling create_engine(), but I don't call it. I will try to find a solution. Thanks :|
import sqlalchemy as sa
await database.execute(sa.insert(MyModel), values)
Just for reference, this is the code I have. So databases doesn't call its/asyncpg routine, but calls .values() on the SA Insert object.
I guess I somehow need to modify the PG dialect so that it uses a custom JSON serializer.
Could you provide a complete example for us to just copy-paste and run it?
I have modified your example:
import asyncio
import json
from datetime import date, datetime, timezone
from functools import partial

import asyncpg
import sqlalchemy as sa
from databases import Database
from sqlalchemy import Column, MetaData, DateTime, BIGINT
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql.ddl import CreateColumn


class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        elif isinstance(o, date):
            return o.isoformat()
        # super() must be called; a bare super.default raises AttributeError
        return super().default(o)


async def asyncpg_init(connection: asyncpg.Connection) -> None:
    await connection.set_type_codec(
        "jsonb",
        encoder=partial(json.dumps, cls=CustomJsonEncoder),
        decoder=json.loads,
        schema="pg_catalog",
    )


metadata = MetaData()
Base = declarative_base(metadata=metadata)


@compiles(CreateColumn, 'postgresql')
def use_identity(element, compiler, **kw):
    result = compiler.visit_create_column(element, **kw).replace('SERIAL', 'INT GENERATED BY DEFAULT AS IDENTITY')
    return result.replace('BIGSERIAL', 'BIGINT GENERATED BY DEFAULT AS IDENTITY')


class MyTable(Base):
    __tablename__ = 'my_table'

    id = Column(BIGINT, primary_key=True)
    ts = Column(DateTime(timezone=True), nullable=False)
    ts_jsonb = Column(JSONB, nullable=False)


async def main():
    db = Database(
        "postgresql+asyncpg://postgres@localhost/encode_databases_issue_504",
        init=asyncpg_init,
    )
    await db.connect()
    ts = datetime.now(timezone.utc)
    # Insert via a SQLAlchemy construct instead of a raw SQL string
    await db.execute(
        sa.insert(MyTable),
        dict(ts=ts, ts_jsonb=ts)
    )
    data = await db.fetch_all("select * from my_table")
    for row in data:
        print([row[key] for key in row.keys()])


if __name__ == "__main__":
    asyncio.run(main())
I have the same problem :(
With asyncpg + SQLAlchemy, the error occurs on the sqlalchemy.dialect side, so I need to pass some parameters (in my case json_serializer) to databases.backends.postgres.PostgresBackend._get_dialect, but currently that is not supported by databases itself.
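Until the dialect can be configured, one possible workaround is to make the value JSON-safe before it ever reaches SQLAlchemy, so that a plain json.dumps on the dialect side has nothing to choke on. This is a minimal sketch under that assumption, not something confirmed in this thread; to_jsonable is a hypothetical helper name and the encoder is a condensed variant of the one posted earlier.

```python
import json
from datetime import date, datetime, timezone


class CustomJsonEncoder(json.JSONEncoder):
    def default(self, o):
        # datetime is a subclass of date, so one check covers both.
        if isinstance(o, date):
            return o.isoformat()
        return super().default(o)


def to_jsonable(value):
    """Hypothetical helper: round-trip through the custom encoder so that
    only JSON-native types (str, int, float, bool, None, list, dict) remain."""
    return json.loads(json.dumps(value, cls=CustomJsonEncoder))


ts = datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
safe = to_jsonable({"created": ts, "tags": ["a", "b"]})

# A plain json.dumps, with no custom encoder, now succeeds.
plain = json.dumps(safe)
```

In the example above this would mean passing dict(ts=ts, ts_jsonb=to_jsonable(ts)) to db.execute. I have not verified this against databases itself, and if a custom asyncpg jsonb codec is also registered it is worth checking that the value is not encoded twice.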