
[Bug]: concurrent write data, data loss

Open freefoxcm opened this issue 2 years ago • 1 comments

Is there an existing issue for this?

  • [X] I have searched the existing issues

Environment

- Milvus version:2.1
- Deployment mode(standalone or cluster):cluster
- SDK version(e.g. pymilvus v2.0.0rc2): milvus-sdk-java 2.1.0-beta4
- OS(Ubuntu or CentOS): Centos
- CPU/Memory: 56core 512GB
- GPU: Tesla P4 
- Others: 
Dimensional:1024 Number of fields:3 Volume of data:10000000 BatchSize:1000

Current Behavior

Writing 10 million 1024-dimensional vectors to Milvus concurrently, 1000 rows per batch, only about 9 million rows end up written; the rest are lost. If we lower the batch size to 10 rows per insert, no data is lost. Test code: https://github.com/freefoxcm/milvus-batchwriting.git

The concurrency is driven by the Akka framework. On my development machine the concurrency does not go very high (probably no more than about 10 concurrent inserts) and there seems to be no problem; on the server, where it reaches roughly 200+ concurrent inserts, the data loss begins to appear.

Expected Behavior

No response

Steps To Reproduce

No response

Milvus Log

No response

Anything else?

No response

freefoxcm avatar Jul 26 '22 10:07 freefoxcm

/assign @yhmo pls help investigate

xiaofan-luan avatar Jul 26 '22 13:07 xiaofan-luan

@freefoxcm In your example code, /milvus-batchwriting/src/main/java/org/vectors/milvus/actors/IndexActor.java line 67, you ought to check the response of the insert API:

            R<MutationResult> insertR = milvusClient.insert(insertParam);
            if(R.Status.Success.getCode() != insertR.getStatus().intValue()) {
                System.out.println("Insert ERROR! " + insertR.getMessage());
                break;
            }

I don't have Akka set up locally, so I created a thread class to run inserts in parallel:

package org.vectors.milvus.actors;

import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.MutationResult;
import io.milvus.param.R;
import io.milvus.param.dml.InsertParam;
import io.milvus.response.MutationResultWrapper;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class InsertActor implements Runnable {
    private MilvusServiceClient milvusClient;
    private String collectionName;
    private int batchSize;
    private int repeat;
    private int dim;
    private List<Long> ids;
    private List<String> tags;
    private List<List<Float>> vectors;

    public InsertActor(MilvusServiceClient milvusClient, String collectionName, int batchSize, int repeat, int dim) {
        this.milvusClient = milvusClient;
        this.collectionName = collectionName;
        this.batchSize = batchSize;
        this.repeat = repeat;
        this.dim = dim;
        this.ids = generateIDs();
        this.tags = generateTags();
        this.vectors = generateFloatVectors();
    }

    @Override
    public void run() {
        for (int n = 0; n < this.repeat; ++n) {
            long timeStart = System.currentTimeMillis();
            List<InsertParam.Field> fieldsInsert = new ArrayList<>();
            fieldsInsert.add(new InsertParam.Field("id", this.ids));
            fieldsInsert.add(new InsertParam.Field("tags", this.tags));
            fieldsInsert.add(new InsertParam.Field("feature", this.vectors));

            InsertParam insertParam = InsertParam.newBuilder()
                    .withCollectionName(this.collectionName)
                    .withFields(fieldsInsert)
                    .build();

            R<MutationResult> insertR = milvusClient.insert(insertParam);
            if(R.Status.Success.getCode() != insertR.getStatus().intValue()) {
                System.out.println("Insert ERROR! " + insertR.getMessage());
                break;
            }
            long timeEnd = System.currentTimeMillis();
            MutationResultWrapper insertResultWrapper = new MutationResultWrapper(insertR.getData());
            System.out.println(insertResultWrapper.getInsertCount() + " rows inserted. No." + n + ", " + (timeEnd - timeStart) + "ms");
        }
    }

    private List<List<Float>> generateFloatVectors() {
        Random ran = new Random();
        List<List<Float>> vectors = new ArrayList<>();
        for (int n = 0; n < this.batchSize; ++n) {
            List<Float> vector = new ArrayList<>();
            for (int i = 0; i < this.dim; ++i) {
                vector.add(ran.nextFloat());
            }
            vectors.add(vector);
        }

        return vectors;
    }

    private List<Long> generateIDs() {
        List<Long> ids = new ArrayList<>();
        for (int n = 0; n < this.batchSize; ++n) {
            ids.add((long) n);
        }
        return ids;
    }

    private List<String> generateTags() {
        List<String> tags = new ArrayList<>();
        for (int n = 0; n < this.batchSize; ++n) {
            tags.add(String.valueOf(n));
        }
        return tags;
    }
}

Then I tested inserting with this class:

        int threadCount = 10;
        int batchSize = entityBatchSize;
        int repeat = (int) ((entitySize/threadCount)/batchSize);
        System.out.println(threadCount + " threads," + "each thread do insert " + repeat + " times, each time insert " + batchSize + " vectors");

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            InsertActor actor = new InsertActor(milvusClient, collectionName, batchSize, repeat, entityDim);
            threads.add(new Thread(actor));
        }

        for (Thread thread : threads) {
            thread.start();
        }

        for (Thread thread : threads) {
            thread.join();
        }

I got exception thrown from insert() api:

ERROR: InsertRequest failed:
IO error: While open a file for random read: /tmp/milvus/rdb_data/000615.sst: Too many open files

This is an exception thrown from RocksDB (integrated into standalone Milvus), which means RocksDB opened more file descriptors than the ulimit allows. When insert() fails, those entities are missed and the final row count is less than 10M. I guess your experiment encountered a similar problem, which is why the row count is incorrect. Check the insert response to learn why it failed; if it failed with "Too many open files", you need to enlarge the ulimit value.
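To enlarge the limit, one can check and raise the open-file descriptor limit on the host running Milvus. A minimal sketch (the PID and the value 65535 are illustrative; the limit that matters is the one of the Milvus server process, not your shell):

```shell
# Show the current soft limit on open file descriptors for this shell:
ulimit -n

# Inspect a running Milvus process's effective limits by PID (hypothetical PID):
#   cat /proc/<milvus-pid>/limits | grep "open files"

# Raise the soft limit for the current session before starting Milvus
# (the hard limit must allow it; raising the hard limit may require root):
#   ulimit -n 65535

# To make the change permanent on most Linux distributions, add entries
# to /etc/security/limits.conf and re-login:
#   *  soft  nofile  65535
#   *  hard  nofile  65535
```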

yhmo avatar Aug 12 '22 03:08 yhmo

ok I'll try

freefoxcm avatar Aug 19 '22 02:08 freefoxcm

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

stale[bot] avatar Sep 18 '22 03:09 stale[bot]