[Bug]: Filtering by partition key fails to return results for bulk imported data
Is there an existing issue for this?
- [X] I have searched the existing issues
Environment
- Milvus version: 2.4.1
- Deployment mode(standalone or cluster): standalone
- MQ type(rocksmq, pulsar or kafka): -
- SDK version(e.g. pymilvus v2.0.0rc2): pymilvus 2.4.3
- OS(Ubuntu or CentOS): Ubuntu 18.04
- CPU/Memory: CPU (AMD Ryzen Threadripper 1920X 12-Core Processor) / 64 GB memory
- GPU: -
- Others: -
Current Behavior
I import data using the bulk import feature with a Parquet file containing the following fields:
- `item_id` (primary key)
- `partition_id` (partition key)
- `embedding` (float vector)

I can successfully query and filter by `item_id`.
However, when I attempt to filter by `partition_id` (partition key) with an existing value, the result set is empty.
This behavior is observed using both the Python SDK and the Attu UI.
This behavior cannot be reproduced when I insert the same data using the Python SDK.
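For comparison, the SDK insert path mentioned above can be sketched roughly as follows. This is a minimal sketch, not the exact code used for the comparison: `build_rows` and `insert_via_sdk` are hypothetical helper names, and `client` is assumed to be a `pymilvus.MilvusClient` connected to a collection that has the same schema as the bulk-import test data.

```python
import random
import uuid


def build_rows(n: int, dim: int = 768, partition_id: str = "partition-id-1"):
    """Build n rows shaped like the bulk-import test data (hypothetical helper)."""
    return [
        {
            "item_id": uuid.uuid4().hex,
            "partition_id": partition_id,
            "embedding": [random.random() for _ in range(dim)],
        }
        for _ in range(n)
    ]


def insert_via_sdk(client, collection_name: str, rows):
    """Insert rows through the regular SDK path instead of bulk import.

    `client` is assumed to be a pymilvus MilvusClient.
    """
    return client.insert(collection_name=collection_name, data=rows)
```

Calling `insert_via_sdk(client, "test_data", build_rows(100))` and then running the same `partition_id` filter gives the control case against which the bulk-import result can be compared.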
Expected Behavior
When importing data using the bulk import feature and filtering by the partition key with an existing value, the result set should contain all entries for the queried partition key.
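One way to check this expectation is to compare the total row count with the number of rows matched by the partition key filter. The sketch below assumes that `client` is a `pymilvus.MilvusClient` and that the server supports `count(*)` as a query output field; `count_rows` and `check_partition_key_filter` are hypothetical helper names.

```python
def count_rows(client, collection_name: str, filter_expr: str = "") -> int:
    """Return the number of rows matching filter_expr (all rows if empty)."""
    res = client.query(
        collection_name=collection_name,
        filter=filter_expr,
        output_fields=["count(*)"],
    )
    return res[0]["count(*)"]


def check_partition_key_filter(client, collection_name: str, key_field: str, key_value: str):
    """Compare the total row count with the count matched by the partition key filter."""
    total = count_rows(client, collection_name)
    matched = count_rows(client, collection_name, f"{key_field} == '{key_value}'")
    return {"total": total, "matched": matched, "all_matched": total == matched}
```

For the test data in this report, where every row uses the same partition key value, `total` and `matched` would be expected to be equal; the behavior described above corresponds to `matched` being 0.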
Steps To Reproduce
The following script can be used to:
1. create the test data for a bulk import,
2. import the data into a new collection, and
3. query the collection using an existing partition key, which returns an empty result.

Steps:
1. Create the test data by running the script as is (output is written to a new folder `data`).
2. Upload the bulk data file to the Milvus MinIO.
3. Comment out `step_1` and uncomment `step_2` in the script, fill out MILVUS_HOST, ITEM_ID (from write_bulk_data output) and PATH_TO_BULK_DATA_FILE, and run the script to create and populate the collection.
4. Comment out `step_2` and uncomment `step_3` to perform the test query.
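The upload in step 2 could be scripted along the following lines. This is only a sketch under several assumptions that are not part of the report: it uses the third-party `minio` Python package, the endpoint, credentials and bucket name are guesses at the defaults of a standalone deployment (`localhost:9000`, `minioadmin`, `a-bucket`), and the `bulk_data` prefix and all helper names are hypothetical. Adjust them to the actual setup.

```python
from pathlib import Path


def find_parquet_files(data_dir: str):
    """Return all Parquet files below data_dir, sorted for stable output."""
    return sorted(Path(data_dir).rglob("*.parquet"))


def object_name_for(path, data_dir: str, prefix: str = "bulk_data") -> str:
    """Map a local file path to an object name inside the bucket."""
    rel = Path(path).relative_to(data_dir)
    return f"{prefix}/{rel.as_posix()}"


def upload_to_minio(
    data_dir: str,
    endpoint: str = "localhost:9000",   # assumed MinIO endpoint
    access_key: str = "minioadmin",     # assumed credentials
    secret_key: str = "minioadmin",
    bucket: str = "a-bucket",           # assumed bucket name
    prefix: str = "bulk_data",          # hypothetical object prefix
):
    """Upload every Parquet file under data_dir and return the object names."""
    from minio import Minio  # third-party package, imported lazily

    client = Minio(endpoint, access_key=access_key, secret_key=secret_key, secure=False)
    uploaded = []
    for path in find_parquet_files(data_dir):
        name = object_name_for(path, data_dir, prefix)
        client.fput_object(bucket, name, str(path))
        uploaded.append(name)
    return uploaded
```

The object names returned by `upload_to_minio("./data")` are the likely candidates for the file paths passed to the import in step 3.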
```python
import time
import uuid

import numpy as np
from pymilvus import CollectionSchema, MilvusClient, FieldSchema, DataType, BulkInsertState, connections
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
from pymilvus.milvus_client import IndexParams
from pymilvus.orm import utility

COLLECTION_SCHEMA = CollectionSchema(
    fields=[
        FieldSchema(name="item_id", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
        FieldSchema(name="partition_id", dtype=DataType.VARCHAR, max_length=100, is_partition_key=True),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    ]
)


def write_bulk_data(output_path: str):
    writer = LocalBulkWriter(
        schema=COLLECTION_SCHEMA,
        local_path=output_path,
        segment_size=4 * 1024 * 1024,
        file_type=BulkFileType.PARQUET,
    )
    for i in range(100):
        _id = uuid.uuid4().hex
        writer.append_row(
            {
                "item_id": _id,
                "partition_id": "partition-id-1",
                "embedding": np.random.rand(768),
            }
        )

    def callback(location):
        print("Commit completed", location)

    writer.commit(call_back=callback)


def create_collection(client: MilvusClient):
    index_params = IndexParams()
    index_params.add_index(
        field_name="embedding",
        index_type="HNSW",
        index_name="embedding_index",
        metric_type="IP",
        params={"M": 32, "efConstruction": 160},
    )
    client.create_collection(
        collection_name="test_data",
        schema=COLLECTION_SCHEMA,
        index_params=index_params,
        num_partitions=10,
        consistency_level="Strong",
    )


def import_data(address, files):
    try:
        connections.connect(
            address=address,
            db_name="default",
            alias="default",
        )
        task_id = utility.do_bulk_insert(
            collection_name="test_data",
            files=files,
        )
        while True:
            insert_state = utility.get_bulk_insert_state(task_id)
            print(f"Insert state: {insert_state}")
            if insert_state.state in [
                BulkInsertState.ImportFailed,
                BulkInsertState.ImportCompleted,
                BulkInsertState.ImportPersisted,
            ]:
                break
            time.sleep(1)
        while True:
            index_status = utility.index_building_progress(collection_name="test_data", index_name="embedding_index")
            print(
                f"Indexing indexed rows: {index_status.get('indexed_rows', 0)}, pending_index_rows: {index_status.get('pending_index_rows', 0)}"
            )
            if index_status.get("pending_index_rows", 0) <= 0:
                break
            time.sleep(1)
    finally:
        connections.disconnect(alias="default")


def query(client: MilvusClient):
    print("Query [filter by partition_id]:")
    res = client.query(collection_name="test_data", filter="partition_id=='partition-id-1'", consistency_level="Strong")
    print("Result: ", res)


def step_1(output_path: str):
    write_bulk_data(output_path=output_path)


def step_2(milvus_host, files_to_import):
    client = MilvusClient(uri=f"http://{milvus_host}", db_name="default")
    try:
        print("Creating collection...")
        create_collection(client)
        print("Importing data...")
        import_data(milvus_host, files_to_import)
    finally:
        client.close()


def step_3(milvus_host):
    client = MilvusClient(uri=f"http://{milvus_host}", db_name="default")
    try:
        print("Querying data...")
        query(client)
    finally:
        client.close()


if __name__ == "__main__":
    # =========================
    # Step 1: create bulk data
    # =========================
    step_1(output_path="./data")

    # upload data to Milvus Minio

    # ==========================================
    # Step 2: uncomment to import data to Milvus
    # ==========================================
    # MILVUS_HOST = "<MILVUS_HOST>"
    # FILES_TO_IMPORT = ["<PATH TO BULK DATA FILE>"]
    #
    # step_2(
    #     milvus_host=MILVUS_HOST,
    #     files_to_import=FILES_TO_IMPORT,
    # )

    # ===============================
    # Step 3: uncomment to query data
    # ===============================
    # step_3(
    #     milvus_host=MILVUS_HOST,
    # )
```
Milvus Log
No response
Anything else?
No response