Parsing Custom columns
With the rise of storing vectors and very unique data structures, the base columns from piccolo.columns is simply not good enough to express these datatypes for our DBs.
In my example I am creating a custom column type called Vector:
from piccolo.columns import Column
class Vector(Column):
def __init__(self, length: int, *args, **kwargs):
self.length = length
super().__init__(*args, **kwargs)
@property
def column_type(self):
return f"VECTOR({self.length})"
Now every time I try to save the vector, I must serialise it as such:
Table.vector = a_serialiser_method(Table.vector)
Table.save()
Now because Table.vector = a_serialiser_method(Table.vector) creates side effects, I must unserialise it to the python type.
This also applies to fetching the vector, I must deserialise it to the python equalivent:
rows = Table.objects().all()
for row in rows:
row.vector = a_deserialiser_method(row.vector)
Question
Is there a way to simply add a default serialising/deserialising behaviour for custom columns?
Suggestion
extend Column so that we add default serialiser/deserialiser methods to translate python types to db types and vice versa such as:
from piccolo.columns import Column
class Vector(Column):
def __init__(self, length: int, *args, **kwargs):
self.length = length
super().__init__(*args, **kwargs)
@property
def column_type(self):
return f"VECTOR({self.length})"
def serializer(self): # suggestion 1
"""a method to translate the python datatype to database column type"""
self.value = value.tostring()
return self
def deserializer(self): # suggestion 2
"""a method to translate the database column type to python datatype"""
self.value = self.value.tolist()
return self
and these serializer and deserializer are pre hooks when calling methods such as .todict() or .save()/writing to db.
These methods should also not generate side effects.
In my case, pgvector needs the vector to be a string such as '[1,2,3]' but in the python code it should be accessed as a iterable.
In my case, pgvector needs the vector to be a string such as
'[1,2,3]'but in the python code it should be accessed as a iterable.
@Jacky56 I don't know if this helps, but you can use the json module to transform '[1,2,3]' into an iterable (list) like this
import json
embeddings = '[1,2,3]'
results = json.loads(embeddings)
print(type(results)) # <class 'list'>
You can also look in this discussion. If Piccolo doesn't support a feature, you can always just use raw sql to get the desired result. Hope this helps.
hello @sinisaos
the suggested discussion is what I'm currently doing, but constantly using Table.raw to resolve most uncovered query needs kinda defeats the purpose of the ORM existing.
When I grab an object from the database, the python data structure is as shows:
obj = Table.objects().first()
obj.embedding # piccolo resolves this as a string when converting the custom column
Since its a custom column, therefore the dev should have the ability to edit the behaviour on how the column is serialised to python and deserialised as a sql statement.
which that I suspect a behaviour such as:
obj = Table.objects().first()
obj.embedding # piccolo resolves this as `np.ndarray` as the developer specified how it should be serialised from the defined method on `Column`
@Jacky56 I don't know if this helps, but you can use the methods of the table class to serialize/deserialize. Try something like this for serialization/deserialization data.
Example script
import asyncio
import json
import numpy as np
from piccolo.columns import Column, Varchar
from piccolo.engine import engine_finder
from piccolo.engine.postgres import PostgresEngine
from piccolo.table import Table
DB = PostgresEngine(
config={
"database": "vectordb",
"user": "postgres",
"password": "postgres",
"host": "localhost",
"port": 5432,
}
)
# Custom vector column
class Vector(Column):
def __init__(self, dimensions: int, *args, **kwargs):
self.dimensions = dimensions
super().__init__(*args, **kwargs)
@property
def column_type(self):
return f"VECTOR({self.dimensions})"
# Create a JSON Encoder class
class json_serialize(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
return json.JSONEncoder.default(self, obj)
class Task(Table, db=DB):
"""
An example table.
"""
name = Varchar()
embeddings = Vector(dimensions=3)
# use class mmethods for serialization
@classmethod
def serializer(cls, value): # suggestion 1
"""a method to translate the python datatype to database column type"""
if isinstance(value, np.ndarray):
return json.dumps(value, cls=json_serialize)
else:
return json.dumps(value)
@classmethod
def deserializer(cls, value): # suggestion 2
"""a method to translate the database column type to python datatype"""
return json.loads(value)
async def open_database_connection_pool():
try:
engine = engine_finder()
await engine.start_connection_pool()
except Exception:
print("Unable to connect to the database")
async def close_database_connection_pool():
try:
engine = engine_finder()
await engine.close_connection_pool()
except Exception:
print("Unable to connect to the database")
async def main():
# create table
await Task.create_table(if_not_exists=True)
embedding = np.array([1, 2, 3]) # or [1, 2, 3]
# insert data using serializer
task = Task(name="Task 1", embeddings=Task.serializer(embedding))
await task.save()
# retrive data using deserializer
obj = await Task.objects()
for i in obj:
print(Task.deserializer(i.embeddings))
if __name__ == "__main__":
asyncio.run(main())
Sorry if I missed your point and don't understand your needs.
Thank you for your reply @sinisaos but its not quite whats desired.
The thing I am describing actually exist in the code here Column.get_sql_value, This translate the python type to the sql equivalent
as we can see here for example, it translates the python list to the psql array syntax.
It would be nice to somehow override this method.
For those who still struggled with this problem: derive JSON instead of Column will do the trick:
from typing import Unpack, override
from piccolo.columns import JSON, Serial
from piccolo.columns.base import ColumnKwargs
from piccolo.table import Table
class Vector(JSON):
def __init__(
self,
*,
dim: int = 1536,
**kwargs: Unpack[ColumnKwargs],
) -> None:
super().__init__(default=None, **kwargs)
self.dim = dim
class DenseVector(Vector):
@property
@override
def column_type(self): # type: ignore
return f"VECTOR({self.dim})"
class SparseVector(Vector):
@property
@override
def column_type(self): # type: ignore
return f"SPARSEVEC({self.dim})"
class VectorTable(Table):
id: Serial = Serial(primary=True)
dense: DenseVector = DenseVector()
VectorTable.insert(VectorTable(dense=[0] * 1536)).run_sync()
tables = VectorTable.select().output(load_json=True).run_sync()
For those who still struggled in this problem: derive
JSONinstead ofColumnsuffices:from typing import Unpack, override
from piccolo.columns import JSON, Serial from piccolo.columns.base import ColumnKwargs from piccolo.table import Table
class Vector(JSON): def init( self, *, dim: int = 1536, **kwargs: Unpack[ColumnKwargs], ) -> None: super().init(default=None, **kwargs) self.dim = dim
class DenseVector(Vector): @property @override def column_type(self): # type: ignore return f"VECTOR({self.dim})"
class SparseVector(Vector): @property @override def column_type(self): # type: ignore return f"SPARSEVEC({self.dim})"
class VectorTable(Table): id: Serial = Serial(primary=True) dense: DenseVector = DenseVector()
VectorTable.insert(VectorTable(dense=[0] * 1536)).run_sync() tables = VectorTable.select().output(load_json=True).run_sync()
Also, since pgvector requires a special format for sparse vector, you may also need this:
import json
import torch
def sparsify(vec: torch.Tensor) -> str:
indices = vec.nonzero(as_tuple=True)[0].tolist()
values = vec[indices].tolist()
# SQL arrays start at 1
value = {index + 1: value for index, value in zip(indices, values)}
return f"{value}/{len(vec)}"
def desparsify(sparse_vec: str) -> torch.Tensor:
value, dimensions = sparse_vec.split("/")
dimensions = int(dimensions)
value = json.loads(value)
vec = torch.zeros(dimensions, dtype=torch.float32)
for index, val in value.items():
# SQL arrays start at 1
vec[index - 1] = val
return vec