parquet-dotnet

Does parquet-dotnet support multithreaded reads? How can I read faster?

Open · skyyearxp opened this issue 5 years ago · 1 comment

I use parquet-dotnet to read a big Parquet file, but it seems a little slow: reading all row groups takes 14 seconds with pyarrow but 42 seconds with parquet-dotnet.

```csharp
private void ReadParquet(string file)
{
    // Iterate over all row groups: C# takes 42 s, pyarrow takes 14 s.
    using (var reader = ParquetReader.OpenFromFile(file))
    {
        var dataFields = reader.Schema.GetDataFields();
        var cnt = reader.RowGroupCount;

        for (var i = 0; i < cnt; i++)
        {
            using (var groupReader = reader.OpenRowGroupReader(i))
            {
                var columns = dataFields.Select(groupReader.ReadColumn);

                foreach (var column in columns)
                //Parallel.ForEach(columns, column =>
                {
                    var data = column.Data;
                }
                //);
            }
        }
    }
}
```

https://arrow.apache.org/docs/python/parquet.html

> Multithreaded Reads
>
> Each of the reading functions by default use multi-threading for reading columns in parallel. Depending on the speed of IO and how expensive it is to decode the columns in a particular file (particularly with GZIP compression), this can yield significantly higher data throughput.
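parquet-dotnet does not appear to do this automatically, but you can parallelize at the row-group level yourself. A minimal sketch, assuming the same v3-era `ParquetReader.OpenFromFile` API as the snippet above, and assuming a single reader instance should not be shared across threads (so each task opens its own):

```csharp
using System.Threading.Tasks;
using Parquet;
using Parquet.Data;

// Sketch: read row groups in parallel, one ParquetReader per task.
void ReadParquetParallel(string file)
{
    int rowGroupCount;
    using (var reader = ParquetReader.OpenFromFile(file))
    {
        rowGroupCount = reader.RowGroupCount; // read the footer once to count groups
    }

    Parallel.For(0, rowGroupCount, i =>
    {
        // Each task gets its own reader and file handle.
        using (var reader = ParquetReader.OpenFromFile(file))
        using (var groupReader = reader.OpenRowGroupReader(i))
        {
            foreach (DataField field in reader.Schema.GetDataFields())
            {
                DataColumn column = groupReader.ReadColumn(field);
                // ... process column.Data ...
            }
        }
    });
}
```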

skyyearxp · May 11 '20

I did my own multi-threaded implementation. I first split the fields into groups such that I ended up with one group per logical CPU core.

Example: if your Parquet file has 48 fields and you have a 12-thread CPU (e.g. 6 cores with hyper-threading), you get 12 groups of 4 fields each.
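`UtilityMethods.Split` in the code below isn't shown in the comment, so its exact behavior is my assumption; a minimal sketch of such a chunking helper might look like this:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper in the spirit of UtilityMethods.Split:
// yields consecutive slices of at most chunkSize elements.
static IEnumerable<List<T>> Split<T>(List<T> source, int chunkSize)
{
    if (chunkSize < 1) chunkSize = 1; // guard: more cores than fields
    for (int offset = 0; offset < source.Count; offset += chunkSize)
    {
        yield return source.GetRange(offset, Math.Min(chunkSize, source.Count - offset));
    }
}
```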

Then I run `ParallelAsync.ForeachAsync` to start one task per group. Each task has its own `ParquetReader` that it uses to read the same file in parallel.

```csharp
List<string> fieldsToLoad = ...;
var fieldGroups = new List<(int, List<string>)>();
int i = 0; // group index
foreach (List<string> fields in UtilityMethods.Split(fieldsToLoad, fieldsToLoad.Count / Environment.ProcessorCount))
{
    fieldGroups.Add((i++, fields));
}

Task task = ParallelAsync.ForeachAsync(fieldGroups, Environment.ProcessorCount,
    async fieldGroup =>
    {
        await Task.Run(() =>
        {
            // Each task opens its own read-only stream (the default FileShare.Read
            // lets all tasks read the same file concurrently).
            using (Stream parquetStream = new FileStream(this.OpenFilePath, FileMode.Open, FileAccess.Read))
            using (var parquetReader = new ParquetReader(parquetStream, new ParquetOptions() { TreatByteArrayAsString = true }))
            {
                DataTable result = UtilityMethods.ParquetReaderToDataTable(parquetReader, fieldGroup.Item2, this.CurrentOffset, this.CurrentMaxRowCount, cancellationToken.Token);
                // Do whatever you want with the data now.
            }
        });
    });

await task;
```
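`ParallelAsync.ForeachAsync` is not a BCL API either (the built-in `Parallel.ForEachAsync` only arrived in .NET 6), so here is one possible sketch using a `SemaphoreSlim` to cap concurrency:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelAsync
{
    // Runs body over items with at most maxDegreeOfParallelism tasks in flight.
    public static async Task ForeachAsync<T>(
        IEnumerable<T> items, int maxDegreeOfParallelism, Func<T, Task> body)
    {
        using (var throttle = new SemaphoreSlim(maxDegreeOfParallelism))
        {
            var tasks = new List<Task>();
            foreach (T item in items)
            {
                await throttle.WaitAsync();
                tasks.Add(Task.Run(async () =>
                {
                    try { await body(item); }
                    finally { throttle.Release(); }
                }));
            }
            await Task.WhenAll(tasks);
        }
    }
}
```

Note that this approach opens the file once per field group, so each task pays its own file-open and footer-parse cost; as the pyarrow docs quoted above suggest, the payoff depends on IO speed and how expensive the column decoding is.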

mukunku · May 12 '21