GH-43124: [C++] Initialize offset vector head as 0 after memory allocated in grouper.cc
bug description
In grouper.cc:780, memory is allocated for offset vector of varlen column, however the vector is initialized in encoder_.DecodeFixedLengthBuffers, which will never be called when num_groups==0(see line 786). Then fixedlen_bufs[i][0] will be a uninitialized value that means a random uint32_t. Later this random uint32_t is used to AllocatePaddedBuffer(varlen_size). However an random uint32_t is up to 4GB memory, the program may run normally without being affected.
how to fix
set offset vector head as 0 after memory allocated in case it won't be initialized when num_groups==0
- GitHub Issue: #43124
Thanks for opening a pull request!
If this is not a minor PR. Could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose
Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.
Then could you also rename the pull request title in the following format?
GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
or
MINOR: [${COMPONENT}] ${SUMMARY}
In the case of PARQUET issues on JIRA the title also supports:
PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
See also:
Thanks for the contribution! It would be better if having a test to reproduce the problem?
@mapleFU test code
#include <gtest/gtest.h>
#include <climits>
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <arrow/ipc/writer.h>
#include <arrow/ipc/reader.h>
#include <arrow/api.h>
#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/dictionary.h>
#include <arrow/result.h>
#include <arrow/compute/function.h>
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/function_internal.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/registry.h"
#include <arrow/acero/options.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/status.h>
#include <vector>
#include "arrow/type_fwd.h"
#include <chrono>
#include <unistd.h>
int main(int argc, char* argv[])
{
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
inline void CHECK_OK(const arrow::Status status,const std::string& msg){
if(!status.ok()) {
std::cout<< "status not ok! msg: " << msg << std::endl;
exit(-1);
}
}
namespace arrow_test {
const int const_build_len = 1030;
const int const_match_keys = 11;
const int const_batch_size = 100000;
int basic_test_len = 400 * 1024;
const std::string long_prefix = "123456";
const std::string short_prefix = "short_prefix_";
const std::string key_prefix = long_prefix;
uint64_t milliseconds_now(){
auto now = std::chrono::system_clock::now();
// 将时间戳转换为毫秒数
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
auto value = now_ms.time_since_epoch().count();
return value;
}
class SourceReader : public arrow::RecordBatchReader {
private:
std::shared_ptr<arrow::Schema> schema_;
int64_t batch_size_;
int64_t batch_count_ = 0;
int64_t total_len_ = 0;
std::string prefix_;
int64_t offset_;
bool with_null_;
public:
std::string loooooong_prefix;
SourceReader(const std::string& prefix,
std::vector<std::shared_ptr<arrow::DataType>>& types,
int64_t batch_size,
int total_len,
bool with_null = false,
std::string prefix_binary = long_prefix)
: batch_size_{batch_size},
prefix_(prefix),
total_len_(total_len),
offset_(0),
with_null_(with_null),
loooooong_prefix(prefix_binary) {
int i = 0;
std::vector<std::shared_ptr<arrow::Field>> fields;
for(std::shared_ptr<arrow::DataType> &type : types){
std::string field_name = prefix + "_" + std::to_string(i);
fields.emplace_back(arrow::field(field_name, type));
++i;
}
schema_ = std::make_shared<arrow::Schema>(fields);
}
std::shared_ptr<arrow::Schema> schema() const override { return schema_; }
// 每次吐一个recordBatch, 流式执行
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override {
// 模拟rocksdb iterator, 每扫描batch_size_行构建一个recordBatch, 流式执行,吐给下游执行
if (total_len_ - offset_ <= 0) {
out->reset();
return arrow::Status::OK();
}
int64_t batch_size = std::min(batch_size_, total_len_ - offset_);
auto status = build_row_batch(batch_size, out);
CHECK_OK(status, "next batch: ");
offset_ += batch_size;
std::cout << prefix_ << ": " << offset_ << std::endl;
return arrow::Status::OK();
}
private:
arrow::Status build_row_batch(int64_t batch_size, std::shared_ptr<arrow::RecordBatch>* out){
arrow::Status status;
std::vector<std::shared_ptr<arrow::Array>> arrays(schema_->num_fields());
for(auto array: arrays){
array = nullptr;
}
for(int i = 0 ; i < schema_->num_fields(); ++i){
status = build_column(batch_size, arrays[i], i);
CHECK_OK(status, "build_row_batch: ");
}
*out = arrow::RecordBatch::Make(schema_, batch_size, std::move(arrays));
return arrow::Status::OK();
}
arrow::Status build_large_binary_key_col(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::LargeBinaryBuilder*>((builder).get());
for(size_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(key_prefix + std::to_string(offset_ + i));
}
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
arrow::Status build_binary_key_col(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::BinaryBuilder*>((builder).get());
for(size_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(key_prefix + std::to_string(offset_ + i));
}
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
template<typename T>
arrow::Status build(std::unique_ptr<arrow::ArrayBuilder>& builder, int64_t batch_size, std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<T*>(builder.get());
for(size_t i = 0 ; i < batch_size ; ++i){
auto status = type_builder->Append(static_cast<typename T::value_type>(offset_ + i));
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
arrow::Status build_column(int64_t batch_size, std::shared_ptr<arrow::Array>& array, int col_index);
};
template< >
arrow::Status SourceReader::build<arrow::BinaryBuilder>(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::BinaryBuilder*>((builder).get());
for(size_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(loooooong_prefix + std::to_string(offset_ + i));
}
}
auto status = builder->Finish(&output);
return status;
}
template< >
arrow::Status SourceReader::build<arrow::LargeBinaryBuilder>(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::LargeBinaryBuilder *>((builder).get());
for(size_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(loooooong_prefix + std::to_string(offset_ + i));
}
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
arrow::Status SourceReader::build_column(int64_t batch_size, std::shared_ptr<arrow::Array>& array, int col_index) {
auto type = schema_->field(col_index)->type();
std::unique_ptr<arrow::ArrayBuilder> builder = nullptr;
arrow::MakeBuilder(arrow::default_memory_pool(), type, &builder);
if(col_index == 0){
if(type->id() == arrow::Type::type::BINARY){
return build_binary_key_col(builder, batch_size, array);
}else if(type->id() == arrow::Type::type::LARGE_BINARY){
return build_large_binary_key_col(builder, batch_size, array);
}
}
auto typeId = type->id();
switch(typeId){
case arrow::Type::type::INT32:
return build<arrow::Int32Builder>(builder, batch_size, array);
case arrow::Type::type::UINT64:
return build<arrow::UInt64Builder>(builder, batch_size, array);
case arrow::Type::type::INT64:
return build<arrow::Int64Builder>(builder, batch_size, array);
case arrow::Type::type::FLOAT:
return build<arrow::FloatBuilder>(builder, batch_size, array);
case arrow::Type::type::DOUBLE:
return build<arrow::DoubleBuilder>(builder, batch_size, array);
case arrow::Type::type::BINARY:
return build<arrow::BinaryBuilder>(builder, batch_size, array);
case arrow::Type::type::LARGE_BINARY:
return build<arrow::LargeBinaryBuilder>(builder, batch_size, array);
default:
return arrow::Status::Invalid("unsupport type");
}
}
void run_agg_without_join_key_as_binary(int table_len, bool is_large_binary = false, bool check_result = true) {
auto cup_e = arrow::internal::GetCpuThreadPool();
// 三列,第一列是id,后两列为4kb字符串(> 4kb)
std::vector<std::shared_ptr<arrow::DataType>> types{arrow::binary(), arrow::int64()};
std::shared_ptr<SourceReader> reader = std::make_shared<SourceReader>("table", types, const_batch_size, table_len);
arrow::acero::Declaration data_source{"record_batch_reader_source", arrow::acero::RecordBatchReaderSourceNodeOptions(reader)};
std::vector<arrow::compute::Aggregate> aggregates;
aggregates.emplace_back("hash_max" , /*options*/nullptr, arrow::FieldRef("table_1"), /*new field name*/"col1");
// aggregates.emplace_back("hash_max" , /*options*/nullptr, arrow::FieldRef("table_2"), /*new field name*/"col2");
std::vector<arrow::FieldRef> group_by_fields;
group_by_fields.emplace_back(arrow::FieldRef("table_0"));
arrow::acero::AggregateNodeOptions agg_options{aggregates, group_by_fields};
arrow::acero::Declaration agg{"aggregate", {std::move(data_source)}, std::move(agg_options)};
uint64_t start_time = milliseconds_now();
auto result = arrow::acero::DeclarationToTable(std::move(agg), /*use_threads=*/true);
uint64_t end_time = milliseconds_now();
auto final_table = result.ValueOrDie();
}
TEST(test_arrow_vector_execute, group_basic_key_as_binary){
// this loop is necessary to replay the bug
for (int i = 0 ; i < 10; ++i) {
run_agg_without_join_key_as_binary(/* table_len = */0);
}
}
}
varlen_size may be random uint32_t like this:
:warning: GitHub issue #43124 has been automatically assigned in GitHub to PR creator.
@zanmato1984 Would you like to give this a look?
Yes. I'll take a look.
Hi @flashzxi , I think this is a good fix. I put some comments a while ago.
Are you still willing to move on with it? Or is there anything I can do to help? Thanks.
Hi @flashzxi , I've created a PR in your fork https://github.com/flashzxi/arrow/pull/1 which includes my test to reproduce the issue and verify the fix. Could you merge it? Thanks.
Or at some old friends @pitrou @assignUser , is there a way I can directly make changes to this PR in case the author doesn't respond? I am a committer now and have write access but still didn't figure out how to do that. Thanks.
https://github.com/apache/arrow/pull/43123/commits/bf2aff0822764279eb6834c3346c7c4e684d0b2b
Didn't you updated this successfully?
Hi @flashzxi , I've created a PR in your fork flashzxi#1 which includes my test to reproduce the issue and verify the fix. Could you merge it? Thanks.
Or at some old friends @pitrou @assignUser , is there a way I can directly make changes to this PR in case the author doesn't respond? I am a committer now and have write access but still didn't figure out how to do that. Thanks.
Oops, just made it using push -f. Then @pitrou would you please help to review the test? Thanks!
Didn't you updated this successfully?
That was my other suggestion made directly on the website. But never mind :)
Just for completeness sake: Yes committers can push to PRs from third parties as long as they have set the 'Maintainers are allowed to edit this pull request.' option (which is on by default).
Your process here with multiple pings before you took over the PR looks good :+1:
Kindly ping @pitrou . I'm willing to approve this but I still need someone else to review the test I added to this PR. Could you help? Appreciate it.
@github-actions crossbow submit -g cpp
Revision: 8873730bb8895ca5d992af00539a8da699dda49e
Submitted crossbow builds: ursacomputing/crossbow @ actions-a3633911e3
+1, thanks a lot for picking this up @zanmato1984
Thank you for looking! I'll merge this after the CI and Crossbow are good. (And will try the PR merge script for my first time :) )
Seems all good. Merging.
After merging your PR, Conbench analyzed the 3 benchmarking runs that have been run so far on merge-commit 1302889c01bfbd04dc2dca4995078d2eca9311cd.
There were no benchmark performance regressions. 🎉
The full Conbench report has more details. It also includes information about 6 possible false positives for unstable benchmarks that are known to sometimes produce them.