duckdb_engine

[Bug]: session.execute with JSON column returns JSON-encoded string not JSON object

Open criccomini opened this issue 2 years ago • 9 comments

What happened?

Follow-on to #504. I noticed that this query still fails with the latest patch:

rows = session.execute(
    f"SELECT metadata FROM catalog WHERE {query}"
).fetchall()

In PG, row[0] is a dict. In duckdb-engine @ f4e900d7475f0fd901db89a6a3163559ee4fcc68, it returns a JSON-encoded string.
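As a stopgap I can decode the value by hand, which confirms the column really comes back as raw JSON text (minimal sketch):

import json

value = rows[0][0]
# duckdb-engine currently hands back the JSON column as a str, so decode it manually;
# under Postgres this value is already a dict and json.loads is not needed.
metadata = json.loads(value) if isinstance(value, str) else value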

DuckDB Engine Version

0.6.7 @ f4e900d7475f0fd901db89a6a3163559ee4fcc68

DuckDB Version

0.6.1

Relevant log output

No response

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

criccomini avatar Jan 08 '23 18:01 criccomini

Okay yeah, this situation will require the fix I mentioned in https://github.com/Mause/duckdb_engine/issues/504#issuecomment-1374787893, unfortunately, which will mean using a dev version of DuckDB or waiting for the next release.

Alternatively, it might be possible to ask SQLAlchemy to deserialize a raw query into a model; I'm not sure if that fits your use case.
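Roughly something like this, perhaps (untested sketch; assumes a mapped CatalogEntry model like the one used later in this thread):

from sqlalchemy import select, text

# Run the raw SQL, but let the ORM map the rows onto the model so the JSON
# column type's result processor deserializes the value into a dict.
stmt = select(CatalogEntry).from_statement(
    text(f"SELECT * FROM catalog WHERE {query}")
)
entries = session.execute(stmt).scalars().all()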

Mause avatar Jan 09 '23 00:01 Mause

Aw, dang.

Yeah, I can work around it using the ORM for now. Thanks anyway. =) Do you happen to know if there's an issue open on duckdb?

criccomini avatar Jan 09 '23 00:01 criccomini

Interestingly, it appears SQLite behaves like duckdb-engine. I wonder if Postgres is behaving differently because it's using JSONB instead of JSON as the column type...
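For reference, a column that is JSONB on Postgres but plain JSON elsewhere can be declared with a variant, something like:

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB

# Generic JSON for SQLite/DuckDB, JSONB when the dialect is Postgres.
metadata_ = sa.Column("metadata", sa.JSON().with_variant(JSONB(), "postgresql"))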

criccomini avatar Jan 09 '23 00:01 criccomini

In case anyone else comes across this, you can use a query like this with the ORM:

rows = session.execute(select(CatalogEntry.metadata_).where(
    text(query)
)).fetchall() or []

This gets around the string vs. JSON object issue.

criccomini avatar Jan 09 '23 00:01 criccomini

> Okay yeah, this situation will require the fix I mentioned in #504 (comment), unfortunately, which will mean using a dev version of DuckDB or waiting for the next release.
>
> Alternatively, it might be possible to ask SQLAlchemy to deserialize a raw query into a model; I'm not sure if that fits your use case.

Now that duckdb 0.10.0 is here, is it possible for duckdb_engine to return a dict for JSON columns?

dhirschfeld avatar Feb 14 '24 02:02 dhirschfeld

I've been looking into it, yeah, and have a hacky draft PR up.

Mause avatar Feb 14 '24 03:02 Mause

One particularity of my situation is that I have a custom encoder/decoder to serialise/deserialise my data. Hopefully it will be possible to provide a custom decoder to correctly deserialise the data in my database 🙏

dhirschfeld avatar Feb 14 '24 08:02 dhirschfeld

@dhirschfeld I would expect that to mostly work already, can you give an example where it doesn't?

Mause avatar Feb 14 '24 08:02 Mause

Hmm, ok. I think it was user error on my part. Using the ORM classes successfully round-trips a Python dictionary (including with custom serializers). In case it's otherwise useful, I've included my test case below:

MCVE
import json
from datetime import datetime as DateTime
from functools import partial
from typing import (
    Any,
    Optional,
    TypeAlias,
)

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import (
    JSONB,
)
from sqlalchemy.orm import (
    DeclarativeBase,
    declared_attr,
    Mapped,
    mapped_column,
    MappedAsDataclass,
    relationship,
)
from typing_extensions import Annotated


SCHEMA = 'test'

float64: TypeAlias = sa.Double
str32 = Annotated[str, 32]
str64 = Annotated[str, 64]
str128 = Annotated[str, 128]
str256 = Annotated[str, 256]


class Base(MappedAsDataclass, DeclarativeBase):

    @declared_attr.directive
    def __tablename__(cls) -> str:
        return cls.__name__

    type_annotation_map = {
        dict[str, Any]: sa.JSON().with_variant(JSONB(), "postgresql"),
        float64: sa.Double,
        str32: sa.String(32),
        str64: sa.String(64),
        str128: sa.String(128),
        str256: sa.String(256),
    }

    insert_timestamp: Mapped[DateTime] = mapped_column(
        sa.DateTime(timezone=True),
        init=False,
        nullable=False,
        server_default=sa.func.current_timestamp(),
    )


input_arguments_pkid = sa.Sequence('input_arguments_pkid', schema=SCHEMA)


class InputArguments(Base):
    __table_args__ = (
        dict(schema=SCHEMA),
    )
    pkid: Mapped[int] = mapped_column(
        input_arguments_pkid,
        init=False,
        server_default=input_arguments_pkid.next_value(),
        primary_key=True,
    )
    kwargs: Mapped[Optional[dict[str, Any]]]


run_record_pkid = sa.Sequence('run_record_pkid', schema=SCHEMA)
kwargs_fk = Annotated[
    int,
    mapped_column(sa.ForeignKey(InputArguments.pkid)),
]


class RunRecord(Base):
    __table_args__ = (
        dict(schema=SCHEMA),
    )
    pkid: Mapped[int] = mapped_column(
        run_record_pkid,
        init=False,
        server_default=run_record_pkid.next_value(),
        primary_key=True,
    )
    input_arguments_id: Mapped[kwargs_fk] = mapped_column(init=False)
    input_arguments: Mapped[InputArguments] = relationship()


@sa.event.listens_for(Base.metadata, "before_create")
def create_schema(target, connection, **kw):
    connection.execute(sa.DDL(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}"))


def enc_hook(obj: Any) -> dict:
    if type(obj) is DateTime:
        return dict(type='DateTime', value=obj.isoformat())
    else:
        raise NotImplementedError(f"Objects of type {obj!r} are not supported")


def dec_hook(obj: Any) -> Any:
    match obj:
        case {'type': 'DateTime'}:
            return DateTime.fromisoformat(obj['value'])
        case _:
            return obj


engine = sa.create_engine(
    "duckdb:///:memory:",
    json_serializer=partial(json.dumps, default=enc_hook),
    json_deserializer=partial(json.loads, object_hook=dec_hook),
    echo=True,
)

Base.metadata.create_all(bind=engine)

kwargs = dict(a=1, b='string', c=DateTime.now())
record = RunRecord(
    input_arguments=InputArguments(kwargs=kwargs)
)
with sa.orm.Session(engine, expire_on_commit=False) as session:
    session.add(record)
    session.commit()


query = (
    sa.select(RunRecord)
    .options(
        sa.orm.joinedload("*"),
    )
    .filter_by(pkid=record.pkid)
)
with sa.orm.Session(engine) as session:
    record_1 = session.scalars(query).unique().one()


assert kwargs == dict(**(record_1.input_arguments.kwargs or {}))

dhirschfeld avatar Feb 19 '24 14:02 dhirschfeld