
Improve the IO performance of batch (single output file) reporting by managing the IO from a separate thread and single IO write buffer

Open bbandlamudi opened this issue 8 years ago • 6 comments

This is only applicable to ExportBatchToFileTask and the corresponding pre/post batch tasks, i.e., PreBatchExportFileToTask and PostBatchExportFileToTask.

The current implementation opens a new writer for every single task instance and appends the text to the single report file. This implementation has been in place for a while and its performance is acceptable.

We recently ran some tests with static string buffers on smaller data sets and found that a single flush, instead of incremental flushes, performs significantly better. This is evident in certain use cases and can make a huge difference in overall performance.
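The two IO patterns being compared can be sketched as below. This is a minimal illustration, not corb code - the class and method names (`FlushComparison`, `appendPerTask`, `appendSingleWriter`) are made up for this example:

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlushComparison {

    // Current pattern: open a new writer (and therefore flush/close) for every task.
    static long appendPerTask(Path file, int rows) throws IOException {
        long start = System.nanoTime();
        for (int i = 0; i < rows; i++) {
            try (BufferedWriter w = new BufferedWriter(new FileWriter(file.toFile(), true))) {
                w.write("row " + i);
                w.newLine();
            } // close() forces a flush once per task
        }
        return System.nanoTime() - start;
    }

    // Proposed pattern: one buffered writer that flushes only when its buffer fills,
    // plus a single final flush on close.
    static long appendSingleWriter(Path file, int rows) throws IOException {
        long start = System.nanoTime();
        try (BufferedWriter w = new BufferedWriter(new FileWriter(file.toFile(), true))) {
            for (int i = 0; i < rows; i++) {
                w.write("row " + i);
                w.newLine();
            }
        } // one final flush on close
        return System.nanoTime() - start;
    }

    public static void main(String[] args) throws IOException {
        Path perTask = Files.createTempFile("corb-per-task", ".txt");
        Path single = Files.createTempFile("corb-single", ".txt");
        System.out.println("per-task writers: " + appendPerTask(perTask, 10_000) / 1_000_000 + " ms");
        System.out.println("single writer:    " + appendSingleWriter(single, 10_000) / 1_000_000 + " ms");
    }
}
```

The gap between the two numbers varies by OS and filesystem, which matches the mixed Windows/Linux results reported later in this thread.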

However, we obviously cannot use static string buffers across large volumes of data. So, we need to think about creating a BatchWriterWorkerThread, backed by a FIFO queue, that is responsible for all writes to the single file. The tasks will push their content onto the FIFO queue, and the worker thread will continuously write to a buffered writer, which automatically flushes when its buffer is full. When the corb job is completed, a final flush needs to be performed and the writer closed.

Tricky items:

  1. Who is going to manage the worker thread? We may not be able to write a simple plugin to the existing corb, as the worker thread needs to be managed at a higher level than individual tasks.
  2. Who is going to close the worker thread and perform the final flush?
  3. How do we handle pause/resume/stop/restart etc. seamlessly without any data loss?
  4. We should be able to write our own worker thread, but it may not be a bad idea to look for 'simple' but robust open source implementations and extend them if possible.
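On item 4, one 'simple but robust' building block already in the JDK is `java.util.concurrent.BlockingQueue`, which avoids hand-rolled polling and double-checked thread starts. A rough sketch (class name and the poison-pill shutdown convention are hypothetical, and the lifecycle questions in items 1-3 are still open):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBackedFileWriter {
    // Sentinel compared by identity, so real data can never collide with it.
    private static final String POISON = new String("EOF");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Thread worker;

    public QueueBackedFileWriter(Path file) {
        worker = new Thread(() -> {
            // try-with-resources performs the final flush and close when the loop exits
            try (BufferedWriter w = new BufferedWriter(new FileWriter(file.toFile(), true))) {
                while (true) {
                    String line = queue.take(); // blocks until work arrives, no busy-wait
                    if (line == POISON) {
                        break; // identity check: shutdown sentinel, not user data
                    }
                    w.write(line);
                    w.newLine(); // BufferedWriter flushes on its own when the buffer fills
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "batch-file-writer");
        worker.start();
    }

    // Called by the tasks; safe from any thread.
    public void push(String line) {
        if (line != null && !line.isEmpty()) {
            queue.add(line);
        }
    }

    // Called once by whoever owns the job lifecycle (item 2), after all tasks finish.
    public void closeAndWait() throws InterruptedException {
        queue.add(POISON);
        worker.join(); // final flush happens when the worker's writer closes
    }
}
```

The blocking `take()` also sidesteps the idle-timeout/restart dance that a plain `ConcurrentLinkedQueue` forces on the worker.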

bbandlamudi avatar Nov 03 '17 18:11 bbandlamudi

Sample code below - TaskFactory may need to set this instance on ExportBatchToFileTask, and then maybe on the corresponding pre/post batch tasks as well. Maybe the instance needs to be managed centrally (not static), and Manager/Monitor needs to ensure/wait until all pending writes have been completely flushed. Also, we probably need to make this optional, i.e., use the thread below only if specified.

```java
package test;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BatchToFileWriter {

    protected static final String NEWLINE = System.getProperty("line.separator", "\n");

    private final File file;
    private final Queue<String> writeQueue;
    private volatile Thread writerThread = null; // volatile: checked by producer threads
    private volatile boolean completed = false;

    public BatchToFileWriter(File file) {
        this.file = file;
        writeQueue = new ConcurrentLinkedQueue<>();
    }

    public void push(String str) {
        if (str != null && str.length() > 0) {
            writeQueue.add(str);
            startIfRequired();
        }
    }

    public void completed() {
        completed = true; // will not stop accepting new requests, only helps with timeout
    }

    private void startIfRequired() {
        if (!writeQueue.isEmpty() && (writerThread == null || !writerThread.isAlive())) {
            synchronized (this) {
                if (!writeQueue.isEmpty() && (writerThread == null || !writerThread.isAlive())) {
                    writerThread = new Thread(new WriterRunnable(file));
                    writerThread.start();
                }
            }
        }
    }

    public void waitToFinish(long timeout) throws InterruptedException {
        long start = System.currentTimeMillis();
        while (!writeQueue.isEmpty() && System.currentTimeMillis() - start < timeout) {
            startIfRequired(); // restart the worker if it timed out while items remain
            Thread.sleep(1L);
        }
    }

    class WriterRunnable implements Runnable {
        File f;
        long timeout = 1000L; // idle timeout - don't need to wait too long

        public WriterRunnable(File f) {
            this.f = f;
        }

        public void run() {
            try (OutputStream writer = new BufferedOutputStream(new FileOutputStream(f, true))) {
                long idleSince = System.currentTimeMillis();
                while (true) {
                    String next = writeQueue.poll(); // null-check instead of isEmpty()+poll()
                    if (next != null) {
                        writer.write(next.getBytes(StandardCharsets.UTF_8));
                        writer.write(NEWLINE.getBytes(StandardCharsets.UTF_8));
                        idleSince = System.currentTimeMillis(); // reset the idle timer after each write
                    } else if (!completed && System.currentTimeMillis() - idleSince < timeout) {
                        Thread.sleep(1L);
                    } else {
                        break;
                    }
                }
                writer.flush();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```

bbandlamudi avatar Nov 04 '17 21:11 bbandlamudi

Hi bbandlamudi, I am using the CORB2 utility for unloading data from MarkLogic. This utility helps convert the documents into a delimited file, which is really good. At present I am facing a problem with performance. Is it possible to have something that would work on a Hadoop cluster, similar to the feature MLCP (MarkLogic Content Pump) provides to divert the workload to a Hadoop cluster for distributed computing?

abidzaveri avatar Nov 05 '17 15:11 abidzaveri

hello @abidzaveri, you could subscribe to http://developer.marklogic.com/mailman/listinfo/corb2, as it may be easier to discuss over email, or maybe open a new issue for this specific topic.

Please note that MLCP is still the preferred tool for large-scale data movement; it is officially supported by MarkLogic and comes with lots of features. We may not have sufficient bandwidth to add an extensive set of new features to corb, and we prefer not to add features that are already supported by out-of-the-box tools like MLCP or the DMSDK framework.

That said, could you elaborate on the performance issues you are having and how much data you are trying to export? What throughput are you getting vs. expecting? Why/how do you think Hadoop will help your case, and is there something we can address within the existing implementation without Hadoop, since this is purely a performance concern? Maybe we can write a custom plugin, i.e., implement/extend the existing ExportBatchToFileTask to talk to Hadoop - this should be relatively easy to do, and several developers have written their own custom tasks.

We would love to hear new ideas/use cases from the community, and if anyone has written extensions to corb that have broader appeal (stable, lightweight, and not interfering with existing features), we may consider adding them to corb in the future. In fact, most of the features added to corb are driven by real-world use cases.

Hadoop support is a great idea, and we can think about writing Hadoop-specific tasks (or consider adding yours to a corb release if you already have one).

bbandlamudi avatar Nov 05 '17 16:11 bbandlamudi

@hansenmc @vjsaradhi - based on the testing, the current implementation of using a new IO writer for every write, and flushing each time, gives pretty bad performance on Windows. On Linux systems it made little difference, though using a single IO writer and letting it handle flushing (with or without a background thread and FIFO queue) still comes out ahead in terms of performance.

We shouldn't hold up the current release, but I am targeting this enhancement for v2.4.1. We will make the newer implementation optional, as we traditionally do with corb :)

bbandlamudi avatar Nov 06 '17 14:11 bbandlamudi

Sounds good @bbandlamudi.

Another TODO item for us should probably be to document and highlight some of the newer options that enable features which are currently not default, but would improve performance, stability or functionality (improved batch writer, disk-queue, command-file, etc).

hansenmc avatar Nov 06 '17 15:11 hansenmc

@hansenmc - The improved batch writer may only be useful when corb runs on Windows, since on Mac/Linux the results are pretty close across the current implementation, a global writer, and a background thread for writes. However, we should still fix it - I am leaning towards @vjsaradhi's suggestion that we use a global (scoped to ExportBatchToFileTask) buffered IOWriter and let it handle the flushing, as that may be easier than managing a background thread. We still need to figure out how to do the final flush though :)
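The global-writer alternative could look something like the sketch below - class and method names are hypothetical, and the shutdown hook is just one possible answer to the final-flush question (an explicit close from the Manager would still be the primary path):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Path;

public class SharedBatchWriter {
    private final BufferedWriter writer;

    public SharedBatchWriter(Path file) throws IOException {
        writer = new BufferedWriter(new FileWriter(file.toFile(), true));
        // Safety net for the final flush if the JVM exits before close() is called;
        // the Manager should still call close() explicitly when the job completes.
        Runtime.getRuntime().addShutdownHook(new Thread(this::closeQuietly));
    }

    // Every task shares this one instance; synchronization keeps interleaved lines intact.
    public synchronized void writeLine(String line) throws IOException {
        writer.write(line);
        writer.newLine(); // BufferedWriter flushes on its own when the buffer fills
    }

    public synchronized void close() throws IOException {
        writer.close(); // performs the final flush; a second close() is a no-op
    }

    private void closeQuietly() {
        try {
            close();
        } catch (IOException ignored) {
            // nothing useful to do during JVM shutdown
        }
    }
}
```

Compared to the background-thread design, tasks block briefly on the synchronized write instead of handing off to a queue, which trades a little task-thread latency for a much simpler lifecycle.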

The performance of the global writer is pretty close to, and actually slightly better than, the background-thread-based implementation on Windows (so, is Windows bad at multi-threading, or is Windows IO slower, causing more thread contention?).

Note: IO still needs to be synchronized, whichever approach we take.

bbandlamudi avatar Nov 06 '17 15:11 bbandlamudi