
Parsing Custom columns

Jacky56 opened this issue 1 year ago · 7 comments

With the rise of storing vectors and other unusual data structures, the base columns from piccolo.columns are simply not expressive enough to represent these datatypes in our DBs.

In my example I am creating a custom column type called Vector:

from piccolo.columns import Column

class Vector(Column):
    def __init__(self, length: int, *args, **kwargs):
        self.length = length
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.length})"

Now every time I want to save the vector, I must serialise it first, like so:

Table.vector = a_serialiser_method(Table.vector)
Table.save()

Because Table.vector = a_serialiser_method(Table.vector) mutates the attribute (a side effect), I then have to deserialise it back to the Python type afterwards.

This also applies to fetching the vector: I must deserialise it to the Python equivalent:

rows = Table.objects().all()
for row in rows:
  row.vector = a_deserialiser_method(row.vector)

Question

Is there a way to simply add a default serialising/deserialising behaviour for custom columns?

Suggestion

Extend Column so that we can add default serialiser/deserialiser methods to translate Python types to DB types and vice versa, for example:

from piccolo.columns import Column

class Vector(Column):
    def __init__(self, length: int, *args, **kwargs):
        self.length = length
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.length})"

    def serializer(self, value):  # suggestion 1
        """A method to translate the Python datatype to the database column type."""
        return value.tostring()

    def deserializer(self, value):  # suggestion 2
        """A method to translate the database column type to the Python datatype."""
        return value.tolist()

and have these serializer and deserializer methods act as pre-hooks when calling methods such as .to_dict() or .save() / writing to the DB. These methods should also not generate side effects.
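
For illustration, if such hooks existed the manual translation steps above would disappear. This is purely hypothetical API usage (Piccolo does not provide these hooks today; MyTable is a made-up table with a Vector column):

import numpy as np

row = MyTable(vector=np.array([1.0, 2.0, 3.0]))
await row.save()  # serializer() would run automatically before the write

row = await MyTable.objects().first()
row.vector  # deserializer() would have already turned this back into an np.ndarray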

Jacky56 avatar Oct 26 '24 22:10 Jacky56

In my case, pgvector needs the vector to be a string such as '[1,2,3]', but in the Python code it should be accessible as an iterable.

Jacky56 avatar Oct 26 '24 22:10 Jacky56

In my case, pgvector needs the vector to be a string such as '[1,2,3]', but in the Python code it should be accessible as an iterable.

@Jacky56 I don't know if this helps, but you can use the json module to transform '[1,2,3]' into an iterable (list) like this

import json

embeddings = '[1,2,3]'
results = json.loads(embeddings)
print(type(results))  # <class 'list'>

You can also look at this discussion. If Piccolo doesn't support a feature, you can always just use raw SQL to get the desired result. Hope this helps.
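
For example, a pgvector similarity search could go through Piccolo's raw() escape hatch. This is only a sketch: the table and column names are made up, the <-> distance operator comes from pgvector, and depending on the driver you may need the explicit ::vector cast shown here:

# Hypothetical table/column names, for illustration only.
rows = await MyTable.raw(
    "SELECT * FROM my_table ORDER BY embedding <-> {}::vector LIMIT 5",
    "[1,2,3]",
)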

sinisaos avatar Oct 27 '24 09:10 sinisaos

Hello @sinisaos, the suggested discussion is what I'm currently doing, but constantly falling back to Table.raw for queries the ORM doesn't cover somewhat defeats the purpose of the ORM existing.

When I grab an object from the database, the Python data structure is as follows:

obj = Table.objects().first()
obj.embedding # piccolo resolves this as a string when converting the custom column

Since it's a custom column, the dev should have the ability to customise how the column is deserialised into a Python value and serialised into a SQL statement.

So I would expect behaviour such as:

obj = Table.objects().first()
obj.embedding # piccolo resolves this as `np.ndarray` because the developer specified how it should be deserialised via the method defined on `Column`

Jacky56 avatar Oct 27 '24 19:10 Jacky56

@Jacky56 I don't know if this helps, but you can use class methods on the table class to serialize/deserialize. Try something like this for serializing/deserializing data.

Example script
import asyncio
import json

import numpy as np
from piccolo.columns import Column, Varchar
from piccolo.engine import engine_finder
from piccolo.engine.postgres import PostgresEngine
from piccolo.table import Table

DB = PostgresEngine(
    config={
        "database": "vectordb",
        "user": "postgres",
        "password": "postgres",
        "host": "localhost",
        "port": 5432,
    }
)


# Custom vector column
class Vector(Column):
    def __init__(self, dimensions: int, *args, **kwargs):
        self.dimensions = dimensions
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.dimensions})"


# Create a JSON Encoder class
class json_serialize(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)


class Task(Table, db=DB):
    """
    An example table.
    """

    name = Varchar()
    embeddings = Vector(dimensions=3)

    # use class methods for serialization
    @classmethod
    def serializer(cls, value):  # suggestion 1
        """a method to translate the python datatype to database column type"""
        if isinstance(value, np.ndarray):
            return json.dumps(value, cls=json_serialize)
        else:
            return json.dumps(value)

    @classmethod
    def deserializer(cls, value):  # suggestion 2
        """a method to translate the database column type to python datatype"""
        return json.loads(value)


async def open_database_connection_pool():
    try:
        engine = engine_finder()
        await engine.start_connection_pool()
    except Exception:
        print("Unable to connect to the database")


async def close_database_connection_pool():
    try:
        engine = engine_finder()
        await engine.close_connection_pool()
    except Exception:
        print("Unable to connect to the database")


async def main():
    # create table
    await Task.create_table(if_not_exists=True)

    embedding = np.array([1, 2, 3])  # or [1, 2, 3]
    # insert data using serializer
    task = Task(name="Task 1", embeddings=Task.serializer(embedding))
    await task.save()
    # retrieve data using deserializer
    obj = await Task.objects()
    for i in obj:
        print(Task.deserializer(i.embeddings))


if __name__ == "__main__":

    asyncio.run(main())

Sorry if I've missed your point or misunderstood your needs.

sinisaos avatar Oct 28 '24 08:10 sinisaos

Thank you for your reply @sinisaos, but it's not quite what's desired.

The thing I am describing actually exists in the code here: Column.get_sql_value. This translates the Python type to the SQL equivalent.

As we can see there, for example, it translates a Python list into the Postgres array syntax.

It would be nice to somehow override this method.
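
For illustration, overriding it on a custom column might look something like this. It's only a sketch, assuming (based on the linked code) that get_sql_value receives the Python value and that whatever it returns is what ends up in the SQL statement; it also only covers the write path, not reading values back:

from piccolo.columns import Column

class Vector(Column):
    def __init__(self, length: int, *args, **kwargs):
        self.length = length
        super().__init__(*args, **kwargs)

    @property
    def column_type(self):
        return f"VECTOR({self.length})"

    def get_sql_value(self, value):
        # Serialise a Python iterable (list, np.ndarray, ...) into the
        # '[1,2,3]' string form that pgvector expects.
        return "[" + ",".join(str(v) for v in value) + "]"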

Jacky56 avatar Oct 29 '24 00:10 Jacky56

For those still struggling with this problem: deriving from JSON instead of Column will do the trick:

from typing import Unpack, override

from piccolo.columns import JSON, Serial
from piccolo.columns.base import ColumnKwargs
from piccolo.table import Table


class Vector(JSON):
    def __init__(
        self,
        *,
        dim: int = 1536,
        **kwargs: Unpack[ColumnKwargs],
    ) -> None:
        super().__init__(default=None, **kwargs)
        self.dim = dim


class DenseVector(Vector):
    @property
    @override
    def column_type(self):  # type: ignore
        return f"VECTOR({self.dim})"


class SparseVector(Vector):
    @property
    @override
    def column_type(self):  # type: ignore
        return f"SPARSEVEC({self.dim})"


class VectorTable(Table):
    id: Serial = Serial(primary=True)
    dense: DenseVector = DenseVector()


VectorTable.insert(VectorTable(dense=[0] * 1536)).run_sync()
tables = VectorTable.select().output(load_json=True).run_sync()
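
One extra assumption worth stating (not shown in the snippet above): the pgvector extension has to be installed in the database before a table with VECTOR(...) / SPARSEVEC(...) columns can be created, e.g.:

VectorTable.raw("CREATE EXTENSION IF NOT EXISTS vector").run_sync()
VectorTable.create_table(if_not_exists=True).run_sync()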

observerw avatar Jun 14 '25 09:06 observerw

Also, since pgvector requires a special format for sparse vectors, you may also need this:

import ast
import torch

def sparsify(vec: torch.Tensor) -> str:
    indices = vec.nonzero(as_tuple=True)[0].tolist()
    values = vec[indices].tolist()
    # SQL arrays start at 1
    value = {index + 1: value for index, value in zip(indices, values)}

    return f"{value}/{len(vec)}"


def desparsify(sparse_vec: str) -> torch.Tensor:
    value, dimensions = sparse_vec.split("/")
    dimensions = int(dimensions)
    # ast.literal_eval parses the int-keyed dict produced by sparsify
    # (json.loads would reject the unquoted keys)
    value = ast.literal_eval(value)

    vec = torch.zeros(dimensions, dtype=torch.float32)
    for index, val in value.items():
        # SQL arrays start at 1
        vec[index - 1] = val

    return vec
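
Continuing from the helpers above (my own illustration, reusing the same torch import), a roundtrip looks like:

vec = torch.tensor([0.0, 0.5, 0.0, 2.0])
encoded = sparsify(vec)        # "{2: 0.5, 4: 2.0}/4"
decoded = desparsify(encoded)  # back to a dense float32 tensor
assert torch.equal(vec, decoded)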

observerw avatar Jun 14 '25 10:06 observerw