pyobvector
pyobvector copied to clipboard
A Python SDK for OceanBase Multimodal Store—enabling vector search, full-text search, and JSON table operations—offers both Milvus-compatible API and SQLAlchemy-based SQL mode, and supports both Ocean...
pyobvector
A Python SDK for OceanBase Multimodal Store (Vector Store / Full Text Search / JSON Table), based on SQLAlchemy, compatible with Milvus API.
Installation
- git clone this repo, then install with:
uv sync
- install with pip:
pip install pyobvector==0.2.22
Build Doc
You can build the documentation locally with Sphinx:
mkdir build
make html
Release Notes
For detailed release notes and changelog, see RELEASE_NOTES.md.
Usage
pyobvector supports three modes:
- Milvus compatible mode: You can use the `MilvusLikeClient` class to use vector storage in a way similar to the Milvus API.
- SQLAlchemy hybrid mode: You can use the vector storage functions provided by the `ObVecClient` class and execute relational database statements with the SQLAlchemy library. In this mode, you can regard `pyobvector` as an extension of SQLAlchemy.
- Hybrid Search mode: You can use the `HybridSearch` class to perform hybrid search that combines full-text search and vector similarity search, with Elasticsearch-compatible query syntax.
Milvus compatible mode
Refer to tests/test_milvus_like_client.py for more examples.
A simple workflow to perform ANN search with OceanBase Vector Store:
- setup a client:
from pyobvector import *
client = MilvusLikeClient(uri="127.0.0.1:2881", user="test@test")
- create a collection with vector index:
test_collection_name = "ann_test"
# define the schema of collection with optional partitions
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
schema = client.create_schema(partitions=range_part)
# define field schema of collection
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)
schema.add_field(field_name="meta", datatype=DataType.JSON, nullable=True)
# define index parameters
idx_params = client.prepare_index_params()
idx_params.add_index(
field_name='embedding',
index_type=VecIndexType.HNSW,
index_name='vidx',
metric_type="L2",
params={"M": 16, "efConstruction": 256},
)
# create collection
client.create_collection(
collection_name=test_collection_name,
schema=schema,
index_params=idx_params,
)
- insert data to your collection:
# prepare
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
# insert data
client.insert(collection_name=test_collection_name, data=data1)
- do ann search:
res = client.search(collection_name=test_collection_name, data=[0,0,0], anns_field='embedding', limit=5, output_fields=['id'])
# For example, the result will be:
# [{'id': 112}, {'id': 111}, {'id': 10}, {'id': 11}, {'id': 12}]
SQLAlchemy hybrid mode
- setup a client:
from pyobvector import *
from sqlalchemy import Column, Integer, JSON
from sqlalchemy import func
client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
- create a partitioned table with vector index:
# create partitioned table
range_part = ObRangePartition(False, range_part_infos = [
RangeListPartInfo('p0', 100),
RangeListPartInfo('p1', 'maxvalue'),
], range_expr='id')
cols = [
Column('id', Integer, primary_key=True, autoincrement=False),
Column('embedding', VECTOR(3)),
Column('meta', JSON)
]
client.create_table(test_collection_name, columns=cols, partitions=range_part)
# create vector index
client.create_index(
test_collection_name,
is_vec_index=True,
index_name='vidx',
column_names=['embedding'],
vidx_params='distance=l2, type=hnsw, lib=vsag',
)
- insert data to your collection:
# insert data
vector_value1 = [0.748479,0.276979,0.555195]
vector_value2 = [0, 0, 0]
data1 = [{'id': i, 'embedding': vector_value1} for i in range(10)]
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(10, 13)])
data1.extend([{'id': i, 'embedding': vector_value2} for i in range(111, 113)])
client.insert(test_collection_name, data=data1)
- do ann search:
# perform ann search with basic column selection
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
topk=5,
output_column_names=['id'] # Legacy parameter
)
# For example, the result will be:
# [(112,), (111,), (10,), (11,), (12,)]
# perform ann search with SQLAlchemy expressions (recommended)
from sqlalchemy import Table, text, func
table = Table(test_collection_name, client.metadata_obj, autoload_with=client.engine)
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
topk=5,
output_columns=[
table.c.id,
table.c.meta,
(table.c.id + 1000).label('id_plus_1000'),
text("JSON_EXTRACT(meta, '$.key') as extracted_key")
]
)
# For example, the result will be:
# [(112, '{"key": "value"}', 1112, 'value'), ...]
# perform ann search with distance threshold (filter results by distance)
res = client.ann_search(
test_collection_name,
vec_data=[0,0,0],
vec_column_name='embedding',
distance_func=l2_distance,
with_dist=True,
topk=10,
output_column_names=['id'],
distance_threshold=0.5 # Only return results where distance <= 0.5
)
# Only returns results with distance <= 0.5
# For example, the result will be:
# [(10, 0.0), (11, 0.0), ...] # Only includes results with distance <= 0.5
ann_search Parameters
The ann_search method supports flexible output column selection through the output_columns parameter:
- `output_columns` (recommended): Accepts SQLAlchemy Column objects, expressions, or a mix of both
  - Column objects: `table.c.id`, `table.c.name`
  - Expressions: `(table.c.age + 10).label('age_plus_10')`
  - JSON queries: `text("JSON_EXTRACT(meta, '$.key') as extracted_key")`
  - String functions: `func.concat(table.c.name, ' (', table.c.age, ')').label('name_age')`
- `output_column_names` (legacy): Accepts a list of column name strings
  - Example: `['id', 'name', 'meta']`
- Parameter Priority: `output_columns` takes precedence over `output_column_names` when both are provided
- `distance_threshold` (optional): Filter results by distance threshold
  - Type: `Optional[float]`
  - Only returns results where `distance <= threshold`
  - Example: `distance_threshold=0.5` returns only results with distance <= 0.5
  - Use case: Quality control for similarity search, only return highly similar results
- If you want to use the pure `SQLAlchemy` API with the `OceanBase` dialect, you can just get an `SQLAlchemy.engine` via `client.engine`. The engine can also be created as follows:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str)  # pass any extra create_engine() keyword arguments here if needed
- Async engine is also supported:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
- For further usage in pure SQLAlchemy mode, please refer to the SQLAlchemy documentation.
Hybrid Search Mode
pyobvector supports hybrid search that combines full-text search and vector similarity search, with query syntax compatible with Elasticsearch. This allows you to perform semantic search with both keyword matching and vector similarity in a single query.
- setup a client:
from pyobvector import *
from pyobvector.client.hybrid_search import HybridSearch
from sqlalchemy import Column, Integer, VARCHAR
client = HybridSearch(uri="127.0.0.1:2881", user="test@test")
Note: Hybrid search requires OceanBase version >= 4.4.1.0, or SeekDB.
- create a table with both vector index and full-text index:
test_table_name = "hybrid_search_test"
# create table with vector and text columns
client.create_table(
table_name=test_table_name,
columns=[
Column("id", Integer, primary_key=True, autoincrement=False),
Column("source_id", VARCHAR(32)),
Column("enabled", Integer),
Column("vector", VECTOR(3)), # vector column
Column("title", VARCHAR(255)), # text column for full-text search
Column("content", VARCHAR(255)), # text column for full-text search
],
indexes=[
VectorIndex("vec_idx", "vector", params="distance=l2, type=hnsw, lib=vsag"),
],
mysql_charset='utf8mb4',
mysql_collate='utf8mb4_unicode_ci',
)
# create full-text indexes for text columns
from pyobvector import FtsIndexParam, FtsParser
for col in ["title", "content"]:
    client.create_fts_idx_with_fts_index_param(
        table_name=test_table_name,
        fts_idx_param=FtsIndexParam(
            index_name=f"fts_idx_{col}",
            field_names=[col],
            parser_type=FtsParser.IK,  # or other parser types
        ),
    )
- insert data:
client.insert(
table_name=test_table_name,
data=[
{
"id": 1,
"source_id": "3b767712b57211f09c170242ac130008",
"enabled": 1,
"vector": [1, 1, 1],
"title": "企业版和社区版的功能差异",
"content": "OceanBase 数据库提供企业版和社区版两种形态。",
},
{
"id": 2,
"vector": [1, 2, 3],
"enabled": 1,
"source_id": "3b791472b57211f09c170242ac130008",
"title": "快速体验 OceanBase 社区版",
"content": "本文根据使用场景详细介绍如何快速部署 OceanBase 数据库。",
},
# ... more data
]
)
- perform hybrid search with Elasticsearch-compatible query syntax:
# build query body (compatible with Elasticsearch syntax)
query = {
"bool": {
"must": [
{
"query_string": {
"fields": ["title^10", "content"], # field weights
"type": "best_fields",
"query": "oceanbase 数据 迁移",
"minimum_should_match": "30%",
"boost": 1
}
}
],
"filter": [
{
"terms": {
"source_id": [
"3b791472b57211f09c170242ac130008",
"3b7af31eb57211f09c170242ac130008"
]
}
},
{
"bool": {
"must_not": [
{
"range": {
"enabled": {"lt": 1}
}
}
]
}
}
],
"boost": 0.7
}
}
body = {
"query": query,
"knn": { # vector similarity search
"field": "vector",
"k": 1024,
"num_candidates": 1024,
"query_vector": [1, 2, 3],
"filter": query, # optional: apply same filter to KNN
"similarity": 0.2 # similarity threshold
},
"from": 0, # pagination offset
"size": 60 # pagination size
}
# execute hybrid search
results = client.search(index=test_table_name, body=body)
# results is a list of matching documents
Supported Query Types
The hybrid search supports Elasticsearch-compatible query syntax:
- `bool` query: Combine multiple queries with `must`, `must_not`, `should`, `filter`
- `query_string`: Full-text search with field weights, boost, and matching options
- `terms`: Exact match filtering for multiple values
- `range`: Range queries (`lt`, `lte`, `gt`, `gte`)
- `knn`: Vector similarity search (KNN) with:
  - `field`: Vector field name
  - `query_vector`: Query vector
  - `k`: Number of results to return
  - `num_candidates`: Number of candidates to consider
  - `filter`: Optional filter to apply to KNN search
  - `similarity`: Similarity threshold
- Pagination: `from` and `size` parameters
Get SQL Query
You can also get the actual SQL that will be executed:
sql = client.get_sql(index=test_table_name, body=body)
print(sql) # prints the SQL query