milvus icon indicating copy to clipboard operation
milvus copied to clipboard

[Bug]: calling num_entities during a multiprocessing insert returns fewer entities than were inserted

Open XuanYang-cn opened this issue 2 years ago • 8 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues

Environment

- Milvus version:
- Deployment mode(standalone or cluster):
- SDK version(e.g. pymilvus v2.0.0rc2):
- OS(Ubuntu or CentOS): 
- CPU/Memory: 
- GPU: 
- Others: 

See also: #16990

Current Behavior

import time
import os
from multiprocessing import Process

import numpy as np
from pymilvus import (
    connections,
    utility,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

# Tuning knobs: rows per insert call, vector dimension, inserts per worker,
# and number of worker processes (total expected rows = 1000 * 10 * 10 = 100000).
num_entities, dim, loopCnt, mpCnt = 1000, 2048, 10, 10


def insertData(idx):
    """Worker entry point: insert ``loopCnt`` batches of ``num_entities`` rows.

    Worker *idx* owns the contiguous primary-key range
    [idx * loopCnt * num_entities, (idx + 1) * loopCnt * num_entities),
    so no two workers insert the same pk.
    """
    connections.connect()
    collection = Collection("hello_milvus")

    print("Start inserting entities:", idx)
    # Same seed in every worker: the random payload is identical across
    # processes, only the primary keys differ.
    rng = np.random.default_rng(seed=19530)
    insertCnt, updateCnt, errCnt = 0, 0, 0

    base = idx * loopCnt * num_entities  # first pk owned by this worker
    for m in range(loopCnt):
        first_pk = base + m * num_entities
        batch = [
            list(range(first_pk, first_pk + num_entities)),
            rng.random(num_entities).tolist(),
            rng.random((num_entities, dim)),
        ]

        try:
            result = collection.insert(batch)
        except Exception as r:
            errCnt += 1
            print('exception %s' % (r))
        else:
            insertCnt += result.insert_count
            updateCnt += result.upsert_count

    print(f"end inserting entities: {idx}, insertCnt: {insertCnt}, updateCnt: "
          f"{updateCnt}, errCnt: {errCnt}, num_entities: {collection.num_entities}")
    connections.disconnect("default")


def run():
    """Spawn ``mpCnt`` insertData workers, wait for all, and report elapsed time."""
    print("insert start")
    start_time = time.time()

    workers = []
    for worker_id in range(mpCnt):
        worker = Process(target=insertData, args=(worker_id,))
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    print("insert end", time.time() - start_time)


def check_at_last():
    """Reconnect from the parent process and print the final entity count."""
    connections.connect()
    collection = Collection("hello_milvus")
    final_count = collection.num_entities
    print(f"final check: {final_count}")


if __name__ == "__main__":
    # Run the multiprocessing insert, then verify the collection's entity
    # count from a fresh connection in the parent process.
    run()
    check_at_last()

run the first:

insert start
Start inserting entities: 0
Start inserting entities: 1
Start inserting entities: 3
Start inserting entities: 2
Start inserting entities: 4
Start inserting entities: 8
Start inserting entities: 9
Start inserting entities: 7
Start inserting entities: 6
Start inserting entities: 5
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
insert end 37.51774525642395
final check: 100000

run the second time:

insert start
Start inserting entities: 1
Start inserting entities: 3
Start inserting entities: 0
Start inserting entities: 4
Start inserting entities: 2
Start inserting entities: 6
Start inserting entities: 7
Start inserting entities: 5
Start inserting entities: 8
Start inserting entities: 9
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 195498
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199008
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
insert end 37.44102215766907
final check: 199498

Expected Behavior

No response

Steps To Reproduce

No response

Milvus Log

No response

Anything else?

No response

XuanYang-cn avatar May 19 '22 06:05 XuanYang-cn

See also: #16990

XuanYang-cn avatar May 19 '22 06:05 XuanYang-cn

But I can't reproduce it with another multiprocessing script:

import time
import os
import multiprocessing

import numpy as np
from pymilvus import connections
from pymilvus import (
    utility,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)


# Template used by all progress prints in this script.
fmt = "\n=== {:30} ==="
# NOTE(review): search_latency_fmt appears unused in this snippet — confirm.
search_latency_fmt = "search latency = {:.4f}s"
num_entities, dim = 1000, 2048


def _initialize_worker():
    """Pool initializer: open one Milvus connection per worker process."""
    from pymilvus import connections
    connections.connect()
    proc_name = multiprocessing.current_process().name
    print(fmt.format(f"initializing process: {proc_name}"))


class MilvusMultiProcessingInsert:
    """Insert batches into a Milvus collection from a multiprocessing pool.

    The parent process prepares the collection; each pool worker opens its
    own connection (see ``_initialize_worker``) and inserts 10 batches of
    ``num_entities`` rows, after which the parent reports the final count.

    Fixes vs. the original: ``_prepare_collection`` previously hard-coded
    the literal ``"hello_milvus"`` for has/drop/create instead of using
    ``self.collection_name``, silently ignoring the constructor argument;
    it now honors the configured name. Also corrects the
    "Acutal num_entites" typo in the summary output.
    """

    def __init__(self, collection_name: str, number_of_batch: int):
        self.collection_name = collection_name
        # One work item per batch index; pool.map fans these out to workers.
        self.batchs = list(range(number_of_batch))

    def insert_data(self, number: int):
        """Worker task: insert the same 1000-row batch 10 times.

        NOTE(review): every batch uses pk range [0, num_entities), so all
        workers insert duplicate primary keys (allowed with auto_id=False) —
        confirm this is intentional for the repro.
        """
        name = multiprocessing.current_process().name
        hello_milvus = Collection(self.collection_name)
        print(fmt.format(f"No.{number:2}, process: {name}: Start inserting entities"))
        rng = np.random.default_rng(seed=number)
        entities = [
            [i for i in range(num_entities)],
            rng.random(num_entities).tolist(),
            rng.random((num_entities, dim)),
        ]

        for i in range(10):
            insert_result = hello_milvus.insert(entities)
            assert len(insert_result.primary_keys) == num_entities
            print(fmt.format(f"No.{number:2}, process: {name}, No.{i}"))
        print(fmt.format(f"No.{number:2}, process: {name}: Finish inserting entities, num_entities: {hello_milvus.num_entities}"))

    def _prepare_collection(self, is_new_collection: bool):
        """Create (or reuse) the target collection from the parent process.

        Drops an existing collection first when ``is_new_collection`` is True.
        Disconnects afterwards so the pool workers start from a clean slate;
        ``run`` reconnects the parent before the final check.
        """
        print(fmt.format(f"start connecting to Milvus, process: {self.pname}"))
        connections.connect()

        # Use the configured name, not a hard-coded literal.
        if utility.has_collection(self.collection_name) and is_new_collection:
            utility.drop_collection(self.collection_name)
            print(f"Dropping existing collection {self.collection_name}")

        fields = [
            FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="random", dtype=DataType.DOUBLE),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
        ]

        schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

        print(fmt.format(f"Create collection `{self.collection_name}`"))
        h = Collection(self.collection_name, schema, consistency_level="Strong")
        connections.disconnect("default")
        return h

    def insert_all_batches(self):
        """Fan the batch indices out to a 12-worker process pool."""
        with multiprocessing.Pool(processes=12, initializer=_initialize_worker) as pool:
            pool.map(self.insert_data, self.batchs)

    def run(self, is_new_collection=True):
        """Prepare the collection, run all inserts, then report the counts."""
        self.pname = multiprocessing.current_process().name
        hello_milvus = self._prepare_collection(is_new_collection)

        start_time = time.time()
        self.insert_all_batches()
        duration = time.time() - start_time

        # Reconnect: _prepare_collection disconnected the parent's session.
        connections.connect()
        print(f'Inserted {len(self.batchs)} batches of {num_entities} entities in {duration} seconds, process: {self.pname}')
        print(f"Expected num_entities: {len(self.batchs)*num_entities}. "
              f"Actual num_entities: {hello_milvus.num_entities}, process: {self.pname}")


if __name__ == "__main__":
    # Reuses an existing collection; pass is_new_collection=True to drop
    # and recreate it before inserting.
    multiprocessing_insert = MilvusMultiProcessingInsert("hello_milvus", 10)
    multiprocessing_insert.run(is_new_collection=False)

XuanYang-cn avatar May 19 '22 06:05 XuanYang-cn

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

stale[bot] avatar Jun 18 '22 07:06 stale[bot]

keep it

xiaofan-luan avatar Jun 20 '22 05:06 xiaofan-luan

image

DataCoord flushed this segment, yet DataNode is still receiving insert data for it.

XuanYang-cn avatar Jun 22 '22 10:06 XuanYang-cn

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

stale[bot] avatar Jul 22 '22 14:07 stale[bot]

/unstale

XuanYang-cn avatar Jul 29 '22 07:07 XuanYang-cn

any progress on it?

xiaofan-luan avatar Aug 10 '22 15:08 xiaofan-luan

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

stale[bot] avatar Sep 10 '22 08:09 stale[bot]

/unstale

XuanYang-cn avatar Sep 15 '22 03:09 XuanYang-cn

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

stale[bot] avatar Oct 15 '22 06:10 stale[bot]

@wangting0128 did we see the case concurrent insert causing entity count not match?

xiaofan-luan avatar Nov 24 '22 12:11 xiaofan-luan

@xiaofan-luan The stability of multi-threaded insertion is verified, but the number of entities present after insertion is not.

wangting0128 avatar Nov 28 '22 02:11 wangting0128

DataNode and DC changed a lot after this issue.

Let me try on 2.2 and master to check if reproducing.

XuanYang-cn avatar Nov 28 '22 02:11 XuanYang-cn

Not reproducing in master:

(py10)  ~/Scripts/issue17107  python test_mp_insert.py --new
YX: ['test_mp_insert.py', '--new']
Create collection hello_milvus
insert start
Start inserting entities: 0
Start inserting entities: 1
Start inserting entities: 2
Start inserting entities: 3
Start inserting entities: 5
Start inserting entities: 8
Start inserting entities: 9
Start inserting entities: 4
Start inserting entities: 7
Start inserting entities: 6
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
insert end 9.528656482696533
final check: 100000
(py10)  ~/Scripts/issue17107  python test_mp_insert.py      
['test_mp_insert.py']
insert start
Start inserting entities: 0
Start inserting entities: 1
Start inserting entities: 2
Start inserting entities: 3
Start inserting entities: 4
Start inserting entities: 7
Start inserting entities: 8
Start inserting entities: 9
Start inserting entities: 5
Start inserting entities: 6
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
insert end 10.268097877502441
final check: 200000

XuanYang-cn avatar Nov 28 '22 03:11 XuanYang-cn

Not reproducing in 2.2.0 either

XuanYang-cn avatar Nov 28 '22 03:11 XuanYang-cn

Cannot reproduce, closing now

XuanYang-cn avatar Nov 28 '22 03:11 XuanYang-cn