Are multithreaded reads supported? How can I read faster?
I use parquet-dotnet to read a large Parquet file, but it seems rather slow. Reading all row groups takes 14 seconds with pyarrow but 42 seconds with parquet-dotnet.
` private void ReadParquet(string file)
{
    // Iterate over and read all row groups. C#: 42 seconds; Python (pyarrow): 14 seconds.
    using (var reader = ParquetReader.OpenFromFile(file))
    {
        //var schema = reader.Schema;
        var dataFields = reader.Schema.GetDataFields();
        var cnt = reader.RowGroupCount;
        for (var i = 0; i < cnt; i++)
        {
            using (var groupReader = reader.OpenRowGroupReader(i))
            {
                // Select is lazy: each column is actually read when the foreach enumerates it.
                var columns = dataFields.Select(groupReader.ReadColumn);
                foreach (var column in columns)
                //Parallel.ForEach(columns, column =>
                {
                    var data = column.Data;
                }
                //);
                //foreach (DataField field in reader.Schema.GetDataFields())
                //{
                //    var dataColumn = groupReader.ReadColumn(field);
                //}
            }
        }
    }
}`
https://arrow.apache.org/docs/python/parquet.html
Multithreaded Reads

Each of the reading functions by default use multi-threading for reading columns in parallel. Depending on the speed of IO and how expensive it is to decode the columns in a particular file (particularly with GZIP compression), this can yield significantly higher data throughput.
I wrote my own multithreaded implementation. I first split the fields into groups so that I end up with one group per logical CPU core (hardware thread).
Example: if your Parquet file has 48 fields and you have a 12-thread CPU (e.g. 6 cores with hyper-threading), you get 12 groups of 4 fields each.
Then I run ParallelAsync.ForeachAsync to start one thread per group. Each thread has its own ParquetReader, so all of them read the same file in parallel.
List<string> fieldsToLoad = ...

// Number of fields per group: field count divided by the number of logical cores.
// Math.Max keeps the group size at 1 or more when there are fewer fields than cores.
int groupSize = Math.Max(1, fieldsToLoad.Count / Environment.ProcessorCount);

int i = 0;
var fieldGroups = new List<(int, List<string>)>();
foreach (List<string> fields in UtilityMethods.Split(fieldsToLoad, groupSize))
{
    fieldGroups.Add((i++, fields));
}

// Process the groups in parallel, at most one per logical core.
var task = ParallelAsync.ForeachAsync(fieldGroups, Environment.ProcessorCount,
    async fieldGroup =>
    {
        await Task.Run(() =>
        {
            // Each worker opens its own stream and its own ParquetReader on the same file.
            using (Stream parquetStream = new FileStream(this.OpenFilePath, FileMode.Open, FileAccess.Read))
            using (var parquetReader = new ParquetReader(parquetStream, new ParquetOptions() { TreatByteArrayAsString = true }))
            {
                DataTable result = UtilityMethods.ParquetReaderToDataTable(parquetReader, fieldGroup.Item2, this.CurrentOffset, this.CurrentMaxRowCount, cancellationToken.Token);
                // Do whatever you want with the data now.
            }
        });
    });
await task;
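
The snippet above relies on two helpers that are not shown, UtilityMethods.Split and ParallelAsync.ForeachAsync. Neither is part of parquet-dotnet. Below is a rough sketch of what such helpers might look like, using only the .NET base class library. The class name ParallelReadHelpers, the method signatures, and the placeholder field names are all assumptions made for illustration, and the real helpers may behave differently. It assumes Split takes a chunk size (which matches the 48 fields / 12 threads = 4 fields per group example) and that the ForeachAsync-style helper caps the number of bodies running at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the UtilityMethods.Split and ParallelAsync.ForeachAsync
// helpers used above. These are assumptions, not the original implementations.
public static class ParallelReadHelpers
{
    // Splits items into consecutive chunks of at most chunkSize elements.
    public static List<List<T>> Split<T>(IReadOnlyList<T> items, int chunkSize)
    {
        if (chunkSize < 1) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        var chunks = new List<List<T>>();
        for (int start = 0; start < items.Count; start += chunkSize)
        {
            chunks.Add(items.Skip(start).Take(chunkSize).ToList());
        }
        return chunks;
    }

    // Runs body once per item, with at most maxParallel bodies in flight at a time.
    public static async Task ForEachAsync<T>(IEnumerable<T> items, int maxParallel, Func<T, Task> body)
    {
        using (var gate = new SemaphoreSlim(maxParallel))
        {
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync();
                try { await body(item); }
                finally { gate.Release(); }
            }).ToList();
            await Task.WhenAll(tasks);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        // 48 placeholder field names and 12 workers give a chunk size of 4.
        List<string> fields = Enumerable.Range(0, 48).Select(n => "field" + n).ToList();
        int workers = 12;
        int chunkSize = Math.Max(1, fields.Count / workers);
        List<List<string>> groups = ParallelReadHelpers.Split(fields, chunkSize);

        // In the real code, each body would open its own ParquetReader and read its group of fields.
        await ParallelReadHelpers.ForEachAsync(groups, workers, group =>
            Task.Run(() => Console.WriteLine(string.Join(",", group))));

        Console.WriteLine(groups.Count + " groups"); // 48 fields in chunks of 4 -> "12 groups"
    }
}
```

Note that when the field count is not an exact multiple of the worker count, integer division produces more groups than workers (for example, 50 fields on 12 threads gives 13 groups). That is still fine here, because the semaphore limits how many groups are processed at the same time.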