
[ORC] Count queries without projections encounter Preconditions IllegalStateException issue

Open zhixingheyi-tian opened this issue 3 years ago • 2 comments

Describe the bug

Case:

 test("count() without group by") {
    val df = spark.sql("SELECT count(*) as cnt FROM " +
        "item LIMIT 100")
    df.explain()
    df.show()
  }

Error:

Caused by: java.lang.IllegalStateException
	at org.apache.arrow.util.Preconditions.checkState(Preconditions.java:443)
	at org.apache.arrow.dataset.jni.NativeScanner$1.hasNext(NativeScanner.java:95)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils$UnsafeItr.hasNext(SparkMemoryUtils.scala:313)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)


zhixingheyi-tian · Nov 19 '21 08:11

I have tracked down the cause, as described below.

From the ORC lib source code: https://github.com/apache/orc/blob/22828f79a526069d9629719c9476b7addad91ae6/c%2B%2B/src/Reader.cc#L120-L144

void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns,
                                      const RowReaderOptions& options) {
    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
    if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) {
      for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
          field != options.getInclude().end(); ++field) {
        updateSelectedByFieldId(selectedColumns, *field);
      }
    } else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) {
      for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
          field != options.getIncludeNames().end(); ++field) {
        updateSelectedByName(selectedColumns, *field);
      }
    } else if (options.getTypeIdsSet()) {
      for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
          typeId != options.getInclude().end(); ++typeId) {
        updateSelectedByTypeId(selectedColumns, *typeId);
      }
    } else {
      // default is to select all columns
      std::fill(selectedColumns.begin(), selectedColumns.end(), true);
    }
    selectParents(selectedColumns, *contents->schema.get());
    selectedColumns[0] = true; // column 0 is selected by default
  }

So, for ColumnSelection_NONE, execution falls into the "// default is to select all columns" branch, which means an empty projection ends up selecting every column in the file.
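
To see the two paths concretely, here is a minimal sketch against liborc's public API (untested; the file path is made up). The key point is that a caller which never invokes include() at all lands in ColumnSelection_NONE:

#include <orc/OrcFile.hh>
#include <cstdint>
#include <iostream>
#include <list>

int main() {
  auto reader = orc::createReader(orc::readLocalFile("/tmp/item.orc"),  // hypothetical file
                                  orc::ReaderOptions{});

  // Path 1: no include*() call -> ColumnSelection_NONE.
  // updateSelected() falls through to "select all columns".
  orc::RowReaderOptions defaults;
  auto allCols = reader->createRowReader(defaults);
  std::cout << allCols->getSelectedType().toString() << std::endl;  // full file schema

  // Path 2: explicitly empty field-id list -> ColumnSelection_FIELD_IDS
  // with zero entries; the selection loop never runs, so only column 0
  // (the struct root) remains selected.
  orc::RowReaderOptions empty;
  empty.include(std::list<uint64_t>{});
  auto noCols = reader->createRowReader(empty);
  std::cout << noCols->getSelectedType().toString() << std::endl;
  return 0;
}

So if the Arrow ORC adapter does not pass an include list for an empty projection, it takes Path 1 and materializes every column; presumably this full-width batch, clashing with the empty expected schema, is what trips the Preconditions check in NativeScanner.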

Compare this with Parquet's handling of an empty projection in arrow/cpp/src/parquet/arrow/reader.cc (the std::cout lines below are local debug prints added while troubleshooting):

if (readers.empty()) {
    std::cout <<  "FileReaderImpl::GetRecordBatchReader: if (readers.empty()) :"   << std::endl;
    // Just generate all batches right now; they're cheap since they have no columns.
    int64_t batch_size = properties().batch_size();
    std::cout <<  "FileReaderImpl::GetRecordBatchReader: batch_size :"  << batch_size << std::endl;
    auto max_sized_batch =
        ::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});

    ::arrow::RecordBatchVector batches;

    for (int row_group : row_groups) {
      int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();

      batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);

      if (int64_t trailing_rows = num_rows % batch_size) {
        batches.push_back(max_sized_batch->Slice(0, trailing_rows));
      }
    }

    *out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
        ::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));

    return Status::OK();
  }
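
To make the row-count bookkeeping concrete, here is the arithmetic with made-up numbers (neither value is claimed to be an Arrow default):

#include <cstdint>
#include <iostream>

int main() {
  int64_t batch_size = 4096;   // stand-in for properties().batch_size()
  int64_t num_rows = 10000;    // rows in one Parquet row group (illustrative)

  int64_t full_batches = num_rows / batch_size;    // 2 zero-column batches of 4096 rows
  int64_t trailing_rows = num_rows % batch_size;   // one 1808-row slice of max_sized_batch

  // 2 * 4096 + 1808 == 10000: the total row count survives
  // even though the batches carry no columns.
  std::cout << full_batches * batch_size + trailing_rows << std::endl;
  return 0;
}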

From the Parquet code above, Parquet implements dedicated logic to handle the no-columns case. Note also that Parquet's num_rows calculation happens on the Arrow side, whereas for ORC the row counting happens inside the ORC lib, as shown below:

ORC lib, c++/src/Reader.cc, from https://github.com/apache/orc/blob/22828f79a526069d9629719c9476b7addad91ae6/c%2B%2B/src/Reader.cc#L913

 bool RowReaderImpl::next(ColumnVectorBatch& data) {
    if (currentStripe >= lastStripe) {
      data.numElements = 0;
      if (lastStripe > 0) {
        previousRow = firstRowOfStripe[lastStripe - 1] +
          footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
      } else {
        previousRow = 0;
      }
      return false;
    }
    if (currentRowInStripe == 0) {
      startNextStripe();
    }
    uint64_t rowsToRead =
      std::min(static_cast<uint64_t>(data.capacity),
               rowsInCurrentStripe - currentRowInStripe);
    data.numElements = rowsToRead;
    if (enableEncodedBlock) {
      reader->nextEncoded(data, rowsToRead, nullptr);
    }
    else {
      reader->next(data, rowsToRead, nullptr);
    }
    // update row number
    previousRow = firstRowOfStripe[currentStripe] + currentRowInStripe;
    currentRowInStripe += rowsToRead;
    if (currentRowInStripe >= rowsInCurrentStripe) {
      currentStripe += 1;
      currentRowInStripe = 0;
    }
    return rowsToRead != 0;
  }
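
Note that the per-stripe row counts this loop consumes are also exposed through liborc's public Reader API, so the same totals could be computed on the Arrow side without driving the row reader at all. A sketch (untested; the file path is made up):

#include <orc/OrcFile.hh>
#include <cstdint>
#include <iostream>

int main() {
  auto reader = orc::createReader(orc::readLocalFile("/tmp/item.orc"),
                                  orc::ReaderOptions{});

  // Equivalent of summing footer->stripes(i).numberofrows() above.
  uint64_t total = 0;
  for (uint64_t i = 0; i < reader->getNumberOfStripes(); ++i) {
    total += reader->getStripe(i)->getNumberOfRows();
  }
  std::cout << total << " == " << reader->getNumberOfRows() << std::endl;  // cross-check
  return 0;
}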

zhixingheyi-tian · Nov 24 '21 06:11

@zhouyuan @zhztheplayer

I think I can implement the same logic on the Arrow side, like Parquet does, to avoid modifying the ORC lib code; see the sketch below. At the same time, could we open an Arrow JIRA issue for this?
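
A rough sketch of what that Arrow-side short-circuit might look like, mirroring the Parquet branch above (the helper and its signature are hypothetical, not actual Arrow adapter code):

#include <arrow/record_batch.h>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical helper: given an empty projection, emit zero-column batches
// whose lengths sum to the ORC file's row count, like Parquet does.
arrow::RecordBatchVector MakeEmptyProjectionBatches(
    const std::shared_ptr<arrow::Schema>& empty_schema,  // schema with no fields
    const std::vector<int64_t>& stripe_row_counts,       // e.g. from Reader::getStripe(i)
    int64_t batch_size) {
  auto max_sized = arrow::RecordBatch::Make(empty_schema, batch_size,
                                            arrow::ArrayVector{});
  arrow::RecordBatchVector batches;
  for (int64_t num_rows : stripe_row_counts) {
    batches.insert(batches.end(), num_rows / batch_size, max_sized);
    if (int64_t trailing = num_rows % batch_size) {
      batches.push_back(max_sized->Slice(0, trailing));
    }
  }
  return batches;
}

The batches carry no arrays, but their num_rows still drives the count(*) aggregation, so the ORC column-selection quirk is never hit.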

zhixingheyi-tian · Nov 24 '21 06:11