tutorial icon indicating copy to clipboard operation
tutorial copied to clipboard

Running Milvus similarity search in Ray

Open SandorSeres opened this issue 2 years ago • 0 comments

Hi,
I have a basic similarity-search test program that creates a collection.
-----------------------------------
import time
from pymilvus import (Collection, CollectionSchema, DataType, FieldSchema,
                      connections)
def connect(host: str, port: int, *, alias: str = 'default',
            retry_delay: float = 5, max_attempts=None):
    """Connect to a Milvus server, retrying until the connection is registered.

    Args:
        host: Server host name or IP address.
        port: Server port.
        alias: pymilvus connection alias to register. Defaults to 'default'.
        retry_delay: Seconds to sleep after a failed attempt. Defaults to 5.
        max_attempts: Give up after this many failed attempts. ``None``
            (the default) retries forever, which matches the original
            behaviour of this function.

    Returns:
        The connection alias that was registered.

    Raises:
        Exception: The last connection error, re-raised once ``max_attempts``
            failed attempts have been made.
    """
    failures = 0
    while not connections.has_connection(alias):
        print('Connecting to server...')
        try:
            connections.connect(alias, host=host, port=port)
            print(connections.has_connection(alias))
            print(connections.get_connection_addr(alias))
            print('Connected...')
        except Exception as e:
            print(f'Failed to connect: {e}')
            failures += 1
            # Only bail out when the caller asked for a bounded retry.
            if max_attempts is not None and failures >= max_attempts:
                raise
            print(f'Retrying in {retry_delay} seconds...')
            time.sleep(retry_delay)
    return alias

def create_collection(collection_name):
    """Build the demo schema and return a Collection bound to it.

    The schema has three fields: an auto-generated INT64 primary key ``pk``,
    a VARCHAR ``topic`` (max length 100) and a 10-dimensional FLOAT_VECTOR
    ``embeddings``. The collection is opened through the 'default'
    connection alias with "Strong" consistency.

    Args:
        collection_name: Name passed to ``Collection``.

    Returns:
        The ``Collection`` object.

    Raises:
        Exception: Whatever the failing stage raised. The stage name and the
            error are printed first, then the error is re-raised. Previously
            the error was swallowed and the function went on to fail with an
            unrelated NameError/UnboundLocalError on a variable that had
            never been assigned.
    """
    try:
        param = [
            FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="topic", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=10),
        ]
    except Exception as e:
        print("param", e)
        raise
    try:
        schema = CollectionSchema(param, "Similarity search in the given schema to find the")
    except Exception as e:
        print("schema", e)
        raise
    try:
        collection = Collection(collection_name, schema, consistency_level="Strong", using='default')
    except Exception as e:
        print("collection", e)
        raise
    return collection

# Register the 'default' connection first; connect() returns the alias name.
connect_handle = connect("192.168.1.18", 19530)
# Create the handle for the 'data' collection over the 'default' connection
# and print it.
collection = create_collection( collection_name = 'data')
print(collection)
-----------------------------
It works without problems.
But:
If I run this in a **Ray distributed environment**, it does not work.
The code is:
---------------------------------------
import time

import pymilvus
import ray
from pymilvus import (Collection, CollectionSchema, DataType, FieldSchema,
                      connections)
@ray.remote
def connect(host: str, port: int):
    """Ray task that registers the 'default' Milvus connection.

    Keeps trying until ``connections.has_connection('default')`` is true,
    sleeping 5 seconds after every failed attempt, then returns the alias
    name "default".

    NOTE(review): the connection is registered through pymilvus'
    ``connections`` object in whichever process executes this task. The log
    in this report shows ``create_collection`` running under a different
    pid, which is presumably why it cannot see the connection. Confirm.
    """
    alias = 'default'
    while True:
        if connections.has_connection(alias):
            return alias
        print('Connecting to server...')
        try:
            connections.connect(alias, host=host, port=port)
            address = connections.get_connection_addr(alias)
            print(address)
            print('Connected...')
        except Exception as err:
            print(f'Failed to connect: {err}')
            print('Retrying in 5 seconds...')
            time.sleep(5)

@ray.remote
def create_collection(collection_name, host=None, port=None):
    """Ray task: build the demo schema and return a Collection bound to it.

    pymilvus connections are looked up by alias through the ``connections``
    object of the current process. A connection opened by a different Ray
    task may live in another worker process (the log in this report shows
    different pids), so this task can open its own connection: pass ``host``
    and ``port`` and the 'default' alias is connected here if this process
    does not have it yet, e.g.
    ``create_collection.remote(collection_name='data', host=h, port=p)``.

    The schema has three fields: an auto-generated INT64 primary key ``pk``,
    a VARCHAR ``topic`` (max length 100) and a 10-dimensional FLOAT_VECTOR
    ``embeddings``.

    Args:
        collection_name: Name passed to ``Collection``.
        host: Optional Milvus host. When given together with ``port``, the
            'default' connection is established in this worker if missing.
        port: Optional Milvus port; see ``host``.

    Returns:
        The ``Collection`` object.
        NOTE(review): Ray has to serialize the return value to hand it back
        to the caller, and a Collection used in another process would need
        its own connection there. Consider returning plain data (e.g. the
        collection name) instead. Confirm against pymilvus.

    Raises:
        Exception: Whatever the failing stage raised. The stage name and the
            error are printed first, then the error is re-raised. Previously
            the error was swallowed and the task went on to fail with an
            unrelated NameError/UnboundLocalError.
    """
    # Make sure the connection exists in *this* worker process.
    if host is not None and port is not None and not connections.has_connection('default'):
        connections.connect('default', host=host, port=port)
    try:
        param = [
            FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="topic", dtype=DataType.VARCHAR, max_length=100),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=10),
        ]
    except Exception as e:
        print("param", e)
        raise
    try:
        schema = CollectionSchema(param, "Similarity search in the given schema to find the")
    except Exception as e:
        print("schema", e)
        raise
    try:
        collection = Collection(collection_name, schema, consistency_level="Strong", using='default')
    except Exception as e:
        print("collection", e)
        raise
    return collection
 
# Submit connect as a Ray task and wait for it to finish.
connect_handle = connect.remote("192.168.1.18", 19530)
ray.get(connect_handle)
# .remote() hands back an ObjectRef, so the print below shows the ObjectRef
# (as seen in the log output), not the value returned by the task.
# NOTE(review): the log shows connect and create_collection running under
# different pids; presumably the connection opened by the connect task is
# not visible in the process that runs create_collection. Confirm.
collection = create_collection.remote( collection_name = 'data')
print(collection)
------------------------------
The error is:
2023-01-25 09:34:02,613 WARNING services.py:1732 -- WARNING: The object store is using /tmp instead of /dev/shm because /dev/shm has only 4445294592 bytes available. This will harm performance! You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you can increase /dev/shm size by passing '--shm-size=4.74gb' to 'docker run' (or add it to the run_options list in a Ray cluster config). Make sure to set this to more than 30% of available RAM.
2023-01-25 09:34:02,739 INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
ObjectRef(16310a0f0a45af5cffffffffffffffffffffffff0100000001000000)
(connect pid=67324) Connecting to server...
(connect pid=67324) {'address': '192.168.1.18:19530', 'user': ''}
(connect pid=67324) Connected...
(create_collection pid=67326) collection <ConnectionNotExistException: (code=1, message=should create connect first.)>
-------------------------------------------
I think the problem lies in how the connection handle is shared between tasks.
I also tried using an actor, but ran into the same problem.
Any ideas?
Thanks

SandorSeres avatar Jan 25 '23 09:01 SandorSeres