milvus
milvus copied to clipboard
[Bug]: Calling num_entities after concurrent multiprocess inserts reports fewer entities than were inserted
Is there an existing issue for this?
- [X] I have searched the existing issues
Environment
- Milvus version:
- Deployment mode(standalone or cluster):
- SDK version(e.g. pymilvus v2.0.0rc2):
- OS(Ubuntu or CentOS):
- CPU/Memory:
- GPU:
- Others:
See also: #16990
Current Behavior
import time
import os
from multiprocessing import Process
import numpy as np
from pymilvus import (
connections,
utility,
FieldSchema, CollectionSchema, DataType,
Collection,
)
# Workload sizing for the reproduction script.
num_entities = 1000  # rows sent in each insert() call
dim = 2048  # width of each random vector row generated per entity
loopCnt = 10  # insert() calls made by each worker process
mpCnt = 10  # number of worker processes spawned by run()
def insertData(idx):
    """Insert ``loopCnt`` batches of ``num_entities`` rows from worker ``idx``.

    The worker opens its own default connection and inserts rows whose
    primary keys occupy the range
    ``[idx * loopCnt * num_entities, (idx + 1) * loopCnt * num_entities)``.
    Because each worker gets its own range, keys never collide between
    workers. At the end it prints its insert/upsert/error counters together
    with the collection's ``num_entities`` as observed from this process.

    Insert failures are counted and printed rather than raised, so one bad
    batch does not abort the rest of this worker's inserts.

    Args:
        idx: Zero-based worker index; selects this worker's primary-key range.
    """
    connections.connect()
    try:
        hello_milvus = Collection("hello_milvus")
        print("Start inserting entities:", idx)
        # Every worker uses the same seed, so the random payloads repeat
        # across workers; only the primary keys differ.
        rng = np.random.default_rng(seed=19530)
        errCnt = 0
        insertCnt = 0
        updateCnt = 0
        for m in range(loopCnt):
            # Batch m of worker idx covers num_entities consecutive keys.
            start = (idx * loopCnt + m) * num_entities
            entities = [
                list(range(start, start + num_entities)),
                rng.random(num_entities).tolist(),
                rng.random((num_entities, dim)),
            ]
            try:
                insert_result = hello_milvus.insert(entities)
                insertCnt += insert_result.insert_count
                updateCnt += insert_result.upsert_count
            except Exception as r:
                # Best-effort: record the failure and keep inserting.
                errCnt += 1
                print(f"exception {r}")
        print(f"end inserting entities: {idx}, insertCnt: {insertCnt}, updateCnt: "
              f"{updateCnt}, errCnt: {errCnt}, num_entities: {hello_milvus.num_entities}")
    finally:
        # Release the per-process connection even if Collection() or the
        # final num_entities query raises.
        connections.disconnect("default")
def run():
    """Start ``mpCnt`` processes running ``insertData`` and wait for all of them.

    Prints the elapsed wall-clock time, measured from just before the first
    process is started until the last one has been joined.
    """
    def _launch(worker_idx):
        # Create and immediately start one worker, as in a plain start loop.
        proc = Process(target=insertData, args=(worker_idx,))
        proc.start()
        return proc

    print("insert start")
    t0 = time.time()
    workers = [_launch(k) for k in range(mpCnt)]
    for proc in workers:
        proc.join()
    elapsed = time.time() - t0
    print("insert end", elapsed)
def check_at_last():
    """Open a connection in the parent process and print the final entity count."""
    connections.connect()
    print(f"final check: {Collection('hello_milvus').num_entities}")


if __name__ == "__main__":
    # Run all insert workers to completion, then report the collection total.
    run()
    check_at_last()
First run:
insert start
Start inserting entities: 0
Start inserting entities: 1
Start inserting entities: 3
Start inserting entities: 2
Start inserting entities: 4
Start inserting entities: 8
Start inserting entities: 9
Start inserting entities: 7
Start inserting entities: 6
Start inserting entities: 5
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 100000
insert end 37.51774525642395
final check: 100000
Second run:
insert start
Start inserting entities: 1
Start inserting entities: 3
Start inserting entities: 0
Start inserting entities: 4
Start inserting entities: 2
Start inserting entities: 6
Start inserting entities: 7
Start inserting entities: 5
Start inserting entities: 8
Start inserting entities: 9
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 195498
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199008
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 199498
insert end 37.44102215766907
final check: 199498
Expected Behavior
No response
Steps To Reproduce
No response
Milvus Log
No response
Anything else?
No response
See also: #16990
However, I can't reproduce it with this other multiprocessing script:
import time
import os
import multiprocessing
import numpy as np
from pymilvus import connections
from pymilvus import (
utility,
FieldSchema, CollectionSchema, DataType,
Collection,
)
# Banner template for progress messages: the text is left-aligned in a
# 30-character field between "===" markers, preceded by a newline.
fmt = "\n=== {:30} ==="
# Search-latency message template (not referenced in the visible script).
search_latency_fmt = "search latency = {:.4f}s"
# Rows generated per insert_data() call.
num_entities = 1000
# Dimension of the "embeddings" FLOAT_VECTOR field.
dim = 2048
def _initialize_worker():
    """Pool initializer: open the default Milvus connection in this worker process."""
    worker_name = multiprocessing.current_process().name
    from pymilvus import connections
    connections.connect()
    banner = fmt.format(f"initializing process: {worker_name}")
    print(banner)
class MilvusMultiProcessingInsert:
    """Insert batches into a Milvus collection from a process pool and compare counts.

    Each batch number is handled by one ``insert_data`` call in a pool worker.
    Every such call inserts the same ``num_entities`` rows
    ``inserts_per_batch`` times.
    """

    def __init__(self, collection_name: str, number_of_batch: int, inserts_per_batch: int = 10):
        """
        Args:
            collection_name: Collection to prepare and insert into.
            number_of_batch: Number of batches dispatched to the pool, one
                ``insert_data`` call per batch.
            inserts_per_batch: How many times each batch's rows are inserted.
                Defaults to 10, the value that was previously hard-coded.
        """
        self.collection_name = collection_name
        self.batchs = list(range(number_of_batch))
        self.inserts_per_batch = inserts_per_batch

    def insert_data(self, number: int):
        """Insert batch ``number`` into the collection (runs inside a pool worker).

        Builds ``num_entities`` rows with primary keys ``0..num_entities-1``
        and random data seeded with ``number``. It then inserts that same
        data ``self.inserts_per_batch`` times. The data is not regenerated,
        so primary keys repeat across inserts and across batches. The
        worker's connection is expected to have been opened by the pool
        initializer.
        """
        name = multiprocessing.current_process().name
        hello_milvus = Collection(self.collection_name)
        print(fmt.format(f"No.{number:2}, process: {name}: Start inserting entities"))
        rng = np.random.default_rng(seed=number)
        entities = [
            list(range(num_entities)),
            rng.random(num_entities).tolist(),
            rng.random((num_entities, dim)),
        ]
        for i in range(self.inserts_per_batch):
            insert_result = hello_milvus.insert(entities)
            assert len(insert_result.primary_keys) == num_entities
            print(fmt.format(f"No.{number:2}, process: {name}, No.{i}"))
        print(fmt.format(f"No.{number:2}, process: {name}: Finish inserting entities, num_entities: {hello_milvus.num_entities}"))

    def _prepare_collection(self, is_new_collection: bool):
        """Connect, optionally drop, and create/open ``self.collection_name``.

        Args:
            is_new_collection: When True, an existing collection with this
                name is dropped first.

        Returns:
            A ``Collection`` handle for ``self.collection_name``. The default
            connection is disconnected before returning; ``run`` reconnects
            before querying the handle.
        """
        print(fmt.format(f"start connecting to Milvus, process: {self.pname}"))
        connections.connect()
        # Use the configured name (previously hard-coded as "hello_milvus") so
        # the collection prepared here is the one insert_data() writes to.
        name = self.collection_name
        if is_new_collection and utility.has_collection(name):
            utility.drop_collection(name)
            print(f"Dropping existing collection {name}")
        fields = [
            FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="random", dtype=DataType.DOUBLE),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
        ]
        schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")
        print(fmt.format(f"Create collection `{name}`"))
        h = Collection(name, schema, consistency_level="Strong")
        connections.disconnect("default")
        return h

    def insert_all_batches(self):
        """Run ``insert_data`` for every batch number on a 12-process pool."""
        with multiprocessing.Pool(processes=12, initializer=_initialize_worker) as pool:
            pool.map(self.insert_data, self.batchs)

    def run(self, is_new_collection=True):
        """Prepare the collection, insert all batches, and print expected vs. actual counts.

        Args:
            is_new_collection: Forwarded to ``_prepare_collection``; when True
                the collection is dropped and recreated first.
        """
        self.pname = multiprocessing.current_process().name
        hello_milvus = self._prepare_collection(is_new_collection)
        start_time = time.time()
        self.insert_all_batches()
        duration = time.time() - start_time
        connections.connect()
        print(f'Inserted {len(self.batchs)} batches of {num_entities} entities in {duration} seconds, process: {self.pname}')
        # Each batch inserts num_entities rows inserts_per_batch times. Rows
        # already present when is_new_collection is False are not included.
        expected = len(self.batchs) * self.inserts_per_batch * num_entities
        print(f"Expected num_entities: {expected}. "
              f"Actual num_entities: {hello_milvus.num_entities}, process: {self.pname}")
if __name__ == "__main__":
    # Insert 10 batches into "hello_milvus" without dropping it first.
    MilvusMultiProcessingInsert("hello_milvus", 10).run(is_new_collection=False)
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Rotten issues close after 30d of inactivity. Reopen the issue with /reopen
.
keep it
DataCoord had already flushed this segment, yet the DataNode was still receiving insert data for it.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Rotten issues close after 30d of inactivity. Reopen the issue with /reopen
.
/unstale
Any progress on this?
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Rotten issues close after 30d of inactivity. Reopen the issue with /reopen
.
/unstale
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Rotten issues close after 30d of inactivity. Reopen the issue with /reopen
.
@wangting0128 have we seen cases where concurrent inserts cause the entity count to not match?
@xiaofan-luan The stability of multi-threaded insertion has been verified, but the number of entities after insertion has not been verified.
DataNode and DataCoord (DC) have changed a lot since this issue was filed.
Let me try on 2.2 and master to check whether it still reproduces.
It does not reproduce on master:
(py10) ~/Scripts/issue17107 python test_mp_insert.py --new
YX: ['test_mp_insert.py', '--new']
Create collection hello_milvus
insert start
Start inserting entities: 0
Start inserting entities: 1
Start inserting entities: 2
Start inserting entities: 3
Start inserting entities: 5
Start inserting entities: 8
Start inserting entities: 9
Start inserting entities: 4
Start inserting entities: 7
Start inserting entities: 6
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 82517
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 91000
insert end 9.528656482696533
final check: 100000
(py10) ~/Scripts/issue17107 python test_mp_insert.py
['test_mp_insert.py']
insert start
Start inserting entities: 0
Start inserting entities: 1
Start inserting entities: 2
Start inserting entities: 3
Start inserting entities: 4
Start inserting entities: 7
Start inserting entities: 8
Start inserting entities: 9
Start inserting entities: 5
Start inserting entities: 6
end inserting entities: 0, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 2, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 3, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 8, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 4, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 1, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 7, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 5, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 9, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
end inserting entities: 6, insertCnt: 10000, updateCnt: 0, errCnt: 0, num_entities: 182474
insert end 10.268097877502441
final check: 200000
It does not reproduce on 2.2.0 either.
Cannot reproduce; closing now.