[ORC] Count queries without projections fail with a Preconditions IllegalStateException
Describe the bug
Case:
test("count() without group by") {
  val df = spark.sql("SELECT count(*) as cnt FROM " +
    "item LIMIT 100")
  df.explain()
  df.show()
}
Error:
Caused by: java.lang.IllegalStateException
at org.apache.arrow.util.Preconditions.checkState(Preconditions.java:443)
at org.apache.arrow.dataset.jni.NativeScanner$1.hasNext(NativeScanner.java:95)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils$UnsafeItr.hasNext(SparkMemoryUtils.scala:313)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
I have tracked down the cause, as described below.
From the ORC library source code, https://github.com/apache/orc/blob/22828f79a526069d9629719c9476b7addad91ae6/c%2B%2B/src/Reader.cc#L120-L144:
void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns,
                                    const RowReaderOptions& options) {
  selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
  if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) {
    for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
        field != options.getInclude().end(); ++field) {
      updateSelectedByFieldId(selectedColumns, *field);
    }
  } else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) {
    for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
        field != options.getIncludeNames().end(); ++field) {
      updateSelectedByName(selectedColumns, *field);
    }
  } else if (options.getTypeIdsSet()) {
    for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
        typeId != options.getInclude().end(); ++typeId) {
      updateSelectedByTypeId(selectedColumns, *typeId);
    }
  } else {
    // default is to select all columns
    std::fill(selectedColumns.begin(), selectedColumns.end(), true);
  }
  selectParents(selectedColumns, *contents->schema.get());
  selectedColumns[0] = true; // column 0 is selected by default
}
So when no columns are included (ColumnSelection_NONE), execution falls through to the final else branch, "// default is to select all columns", and every column ends up selected.
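To make that concrete, here is a minimal standalone sketch (assuming a local file item.orc): a default-constructed RowReaderOptions with no include*() call stays in ColumnSelection_NONE, so the reader materializes every column even though a count(*) needs none of them:
#include <orc/OrcFile.hh>

int main() {
  orc::ReaderOptions readerOpts;
  auto reader = orc::createReader(orc::readLocalFile("item.orc"), readerOpts);
  orc::RowReaderOptions rowOpts;  // no include*() call => ColumnSelection_NONE
  auto rowReader = reader->createRowReader(rowOpts);
  // This batch carries vectors for ALL columns, not an empty projection.
  auto batch = rowReader->createRowBatch(1024);
  uint64_t rows = 0;
  while (rowReader->next(*batch)) {
    rows += batch->numElements;  // counting still pays the full column-read cost
  }
  return 0;
}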
Compare this with how Parquet handles an empty projection, in arrow/cpp/src/parquet/arrow/reader.cc (the std::cout lines are debug prints I added while troubleshooting):
if (readers.empty()) {
  std::cout << "FileReaderImpl::GetRecordBatchReader: if (readers.empty()) :" << std::endl;
  // Just generate all batches right now; they're cheap since they have no columns.
  int64_t batch_size = properties().batch_size();
  std::cout << "FileReaderImpl::GetRecordBatchReader: batch_size :" << batch_size << std::endl;
  auto max_sized_batch =
      ::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});
  ::arrow::RecordBatchVector batches;
  for (int row_group : row_groups) {
    int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
    batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);
    if (int64_t trailing_rows = num_rows % batch_size) {
      batches.push_back(max_sized_batch->Slice(0, trailing_rows));
    }
  }
  *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
      ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));
  return Status::OK();
}
As this code shows, Parquet implements dedicated logic to handle the no-column case. Also, for Parquet the num_rows calculation happens on the Arrow side, whereas for ORC the row count is computed inside the ORC library, as below.
ORC library, c++/src/Reader.cc, from https://github.com/apache/orc/blob/22828f79a526069d9629719c9476b7addad91ae6/c%2B%2B/src/Reader.cc#L913:
bool RowReaderImpl::next(ColumnVectorBatch& data) {
  if (currentStripe >= lastStripe) {
    data.numElements = 0;
    if (lastStripe > 0) {
      previousRow = firstRowOfStripe[lastStripe - 1] +
          footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
    } else {
      previousRow = 0;
    }
    return false;
  }
  if (currentRowInStripe == 0) {
    startNextStripe();
  }
  uint64_t rowsToRead =
      std::min(static_cast<uint64_t>(data.capacity),
               rowsInCurrentStripe - currentRowInStripe);
  data.numElements = rowsToRead;
  if (enableEncodedBlock) {
    reader->nextEncoded(data, rowsToRead, nullptr);
  } else {
    reader->next(data, rowsToRead, nullptr);
  }
  // update row number
  previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
  currentRowInStripe += rowsToRead;
  if (currentRowInStripe >= rowsInCurrentStripe) {
    currentStripe += 1;
    currentRowInStripe = 0;
  }
  return rowsToRead != 0;
}
@zhouyuan @zhztheplayer
I think I can implement the same logic on the Arrow side, following the Parquet approach, so that we avoid modifying the ORC library code. In the meantime, could we open an Arrow JIRA issue for this?
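For reference, here is a hedged sketch of what such a fix could look like on the Arrow side; stripe_row_counts stands in for whatever stripe-metadata accessor the ORC reading path exposes (the names here are illustrative, not the actual Arrow ORC adapter API):
#include <arrow/record_batch.h>
#include <arrow/result.h>

arrow::Result<arrow::RecordBatchVector> MakeEmptyProjectionBatches(
    const std::vector<int64_t>& stripe_row_counts, int64_t batch_size) {
  // Zero-column batches are cheap: just a schema plus a row count.
  auto empty_schema = arrow::schema(arrow::FieldVector{});
  auto max_sized =
      arrow::RecordBatch::Make(empty_schema, batch_size, arrow::ArrayVector{});
  arrow::RecordBatchVector batches;
  for (int64_t num_rows : stripe_row_counts) {
    // Emit full-sized batches, then one short batch for the remainder,
    // mirroring the Parquet row-group loop quoted above.
    batches.insert(batches.end(), num_rows / batch_size, max_sized);
    if (int64_t trailing = num_rows % batch_size) {
      batches.push_back(max_sized->Slice(0, trailing));
    }
  }
  return batches;
}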