arrow
arrow copied to clipboard
ARROW-17619: [C++][Parquet] Add DELTA_BYTE_ARRAY encoder to Parquet writer
This adds a DELTA_BYTE_ARRAY encoder.
https://issues.apache.org/jira/browse/ARROW-17619
This is not ready for review yet; it depends on https://github.com/apache/arrow/pull/14191 and https://github.com/apache/arrow/pull/14293 being merged first.
Note: this should support both BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY (see PARQUET-2231).
- Closes: #32863
(Can this patch go ahead now?)
Personally, I think we could make this simpler, e.g. by having a single PutImpl(std::vector<ByteArray>, int num_values). When Put is called with an Arrow array, we could first extract its values into that vector and then call PutImpl.
Or we can:
/// Traits exposing the raw bytes of an indirectly stored Parquet value, so the
/// encoder logic can be written once for several physical types.
///
/// In this snippet the primary template only declares the interface; the
/// explicit specializations provide the definitions.
template <typename ParquetT>
struct ParquetIndirectType {
  /// Pointer to the first byte of `object`'s payload.
  static auto getPtr(ParquetT object) noexcept -> const uint8_t*;

  /// Payload length in bytes. `descr` is available for types whose length is
  /// a column property rather than a per-value property.
  static auto getLen(const parquet::ColumnDescriptor* descr, ParquetT object) noexcept
      -> int;
};
/// ByteArray specialization: both the pointer and the length are read from
/// the value itself; the column descriptor is ignored.
template <>
struct ParquetIndirectType<parquet::ByteArray> {
  static auto getPtr(parquet::ByteArray object) noexcept -> const uint8_t* {
    const uint8_t* const data = object.ptr;
    return data;
  }

  static auto getLen([[maybe_unused]] const parquet::ColumnDescriptor* descr,
                     parquet::ByteArray object) noexcept -> int {
    // Same conversion the implicit return performed, spelled out explicitly.
    return static_cast<int>(object.len);
  }
};
/// FixedLenByteArray specialization: the pointer comes from the value, but the
/// length comes from the column descriptor's type_length().
template <>
struct ParquetIndirectType<parquet::FixedLenByteArray> {
  static auto getPtr(parquet::FixedLenByteArray object) noexcept -> const uint8_t* {
    const uint8_t* const data = object.ptr;
    return data;
  }

  static auto getLen(const parquet::ColumnDescriptor* descr,
                     [[maybe_unused]] parquet::FixedLenByteArray object) noexcept -> int {
    // `descr` is dereferenced unconditionally, so callers must pass a non-null
    // descriptor for fixed-length columns.
    const int width = descr->type_length();
    return width;
  }
};
And then use a template so that the logic is written only once. However, it may perform worse than your implementation if the compiler doesn't optimize it well. The current skeleton is OK with me.
(We can write a simple and slow one, and optimize it later?)
The skeleton looks great now. I think we can factor out Put, since we now have four Put(...) implementations...
Seems the skeleton look great now. I think we can extract `Put` because now we have 4 `Put(...)` implemention now...
Will do first thing tomorrow!
@mapleFU I've refactored to templates. Could you please do another pass?
I've updated a test here, as a follow-up to https://github.com/apache/arrow/pull/14341#discussion_r1149520662
// Round-trips DELTA_BYTE_ARRAY against hand-computed expectations for every
// binary-like Arrow type (utf8, large_utf8, binary, large_binary).
//
// The layout checked by CheckEncode is:
//   DELTA_BINARY_PACKED(prefix lengths) ++ DELTA_BINARY_PACKED(suffix lengths)
//   ++ concatenated suffix bytes.
TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
  // Encodes `values` through Put(const arrow::Array&) and compares each section
  // of the output with an independently encoded expectation.
  auto CheckEncode = [](std::shared_ptr<::arrow::Array> values,
                        std::shared_ptr<::arrow::Array> prefix_lengths,
                        std::shared_ptr<::arrow::Array> suffix_lengths,
                        std::string_view suffix_data) {
    auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
    ASSERT_NO_THROW(encoder->Put(*values));
    auto buf = encoder->FlushValues();

    auto prefix_lengths_encoder =
        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
    ASSERT_NO_THROW(prefix_lengths_encoder->Put(*prefix_lengths));
    auto prefix_lengths_buf = prefix_lengths_encoder->FlushValues();

    auto suffix_lengths_encoder =
        MakeTypedEncoder<Int32Type>(Encoding::DELTA_BINARY_PACKED);
    ASSERT_NO_THROW(suffix_lengths_encoder->Put(*suffix_lengths));
    auto suffix_lengths_buf = suffix_lengths_encoder->FlushValues();

    // Guard the slices below: a too-short output must fail the test instead of
    // producing an out-of-range slice.
    const auto lengths_size = prefix_lengths_buf->size() + suffix_lengths_buf->size();
    ASSERT_GE(buf->size(), lengths_size);

    auto encoded_prefix_length_buf = SliceBuffer(buf, 0, prefix_lengths_buf->size());
    EXPECT_TRUE(prefix_lengths_buf->Equals(*encoded_prefix_length_buf));

    auto encoded_suffix_length_buf =
        SliceBuffer(buf, prefix_lengths_buf->size(), suffix_lengths_buf->size());
    EXPECT_TRUE(suffix_lengths_buf->Equals(*encoded_suffix_length_buf));

    auto encoded_values_buf = SliceBuffer(buf, lengths_size);
    EXPECT_EQ(suffix_data, encoded_values_buf->ToString());
  };

  // Copies the values of an Int32Array into a std::vector.
  auto arrayToI32 = [](const std::shared_ptr<::arrow::Array>& lengths) {
    const auto* int32_array = checked_cast<const ::arrow::Int32Array*>(lengths.get());
    std::vector<int32_t> result;
    result.reserve(static_cast<size_t>(int32_array->length()));
    for (int64_t i = 0; i < int32_array->length(); ++i) {
      result.push_back(int32_array->GetView(i));
    }
    return result;
  };

  // Decodes `buf` via DecodeArrow and checks the result equals `values`
  // (nulls are taken from `values`' own validity bitmap).
  auto CheckDecode = [](std::shared_ptr<Buffer> buf,
                        std::shared_ptr<::arrow::Array> values) {
    const int num_values = static_cast<int>(values->length());
    auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY);
    decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));

    typename EncodingTraits<ByteArrayType>::Accumulator acc;
    if (::arrow::is_string(values->type()->id())) {
      acc.builder = std::make_unique<::arrow::StringBuilder>();
    } else {
      acc.builder = std::make_unique<::arrow::BinaryBuilder>();
    }
    ASSERT_EQ(num_values,
              decoder->DecodeArrow(num_values, static_cast<int>(values->null_count()),
                                   values->null_bitmap_data(), values->offset(), &acc));

    std::shared_ptr<::arrow::Array> result;
    ASSERT_OK(acc.builder->Finish(&result));
    ASSERT_EQ(num_values, result->length());
    ASSERT_OK(result->ValidateFull());

    auto upcast_result = CastBinaryTypesHelper(result, values->type());
    ::arrow::AssertArraysEqual(*values, *upcast_result);
  };

  // Runs CheckEncode for all four binary-like types, then CheckDecode on the
  // expected encoding built from the given lengths and suffix bytes.
  auto CheckEncodeDecode = [&](std::string_view values,
                               std::shared_ptr<::arrow::Array> prefix_lengths,
                               std::shared_ptr<::arrow::Array> suffix_lengths,
                               std::string_view suffix_data) {
    const std::vector<std::shared_ptr<::arrow::DataType>> types = {
        ::arrow::utf8(), ::arrow::large_utf8(), ::arrow::binary(),
        ::arrow::large_binary()};

    for (const auto& type : types) {
      SCOPED_TRACE("encode as " + type->ToString());
      CheckEncode(::arrow::ArrayFromJSON(type, values), prefix_lengths, suffix_lengths,
                  suffix_data);
    }

    auto encoded = ::arrow::ConcatenateBuffers({DeltaEncode(arrayToI32(prefix_lengths)),
                                                DeltaEncode(arrayToI32(suffix_lengths)),
                                                std::make_shared<Buffer>(suffix_data)})
                       .ValueOrDie();

    for (const auto& type : types) {
      SCOPED_TRACE("decode as " + type->ToString());
      CheckDecode(encoded, ::arrow::ArrayFromJSON(type, values));
    }
  };

  {
    auto values = R"(["axis", "axle", "babble", "babyhood"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 2, 0, 3])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 2, 6, 5])");
    constexpr std::string_view suffix_data = "axislebabbleyhood";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }
  {
    auto values = R"(["axis", "axis", "axis", "axis"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([4, 0, 0, 0])");
    constexpr std::string_view suffix_data = "axis";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }
  {
    auto values = R"(["axisba", "axis", "axis", "axis"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 4, 4, 4])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 0, 0, 0])");
    constexpr std::string_view suffix_data = "axisba";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }
  {
    auto values = R"(["baaxis", "axis", "axis", "axis"])";
    auto prefix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([0, 0, 4, 4])");
    auto suffix_lengths = ::arrow::ArrayFromJSON(::arrow::int32(), R"([6, 4, 0, 0])");
    constexpr std::string_view suffix_data = "baaxisaxis";
    CheckEncodeDecode(values, prefix_lengths, suffix_lengths, suffix_data);
  }
}
Thanks, @mapleFU. I used your suggestion.
Please do not forget to update parquet.rst for the new encoder implementation.
Thanks for the review @wgtmac !
The ASAN/UBSAN check failed: https://github.com/apache/arrow/actions/runs/4616021690/jobs/8160523964?pr=14341
[----------] 3 tests from DeltaByteArrayEncodingAdHoc
[ RUN ] DeltaByteArrayEncodingAdHoc.ArrowBinaryDirectPut
/arrow/cpp/src/arrow/buffer_builder.h:138:27: runtime error: null pointer passed as argument 2, which is declared to never be null
/usr/include/string.h:44:28: note: nonnull attribute specified here
#0 0x559f81bab658 in arrow::BufferBuilder::UnsafeAppend(void const*, long) /arrow/cpp/src/arrow/buffer_builder.h:138:5
#1 0x7fecaef96580 in parquet::(anonymous namespace)::DeltaLengthByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::Put(parquet::ByteArray const*, int) /arrow/cpp/src/parquet/encoding.cc:2719:11
#2 0x7fecaefc4218 in void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >)::operator()(std::basic_string_view<char, std::char_traits<char> >) const /arrow/cpp/src/parquet/encoding.cc:3149:5
#3 0x7fecaefc3046 in arrow::Status arrow::internal::ArraySpanInlineVisitor<arrow::BinaryType, void>::VisitStatus<void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >), void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()>(arrow::ArraySpan const&, arrow::BinaryArray&&, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()&&)::'lambda'(long)::operator()(long) const /arrow/cpp/src/arrow/visit_data_inline.h:109:18
#4 0x7fecaefc2090 in arrow::Status arrow::internal::VisitBitBlocks<arrow::Status arrow::internal::ArraySpanInlineVisitor<arrow::BinaryType, void>::VisitStatus<void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >), void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()>(arrow::ArraySpan const&, arrow::BinaryArray&&, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()&&)::'lambda'(long), arrow::Status arrow::internal::ArraySpanInlineVisitor<arrow::BinaryType, void>::VisitStatus<void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >), void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()>(arrow::ArraySpan const&, arrow::BinaryArray&&, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()&&)::'lambda'()>(unsigned char const*, long, long, arrow::BinaryArray&&, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()&&) /arrow/cpp/src/arrow/util/bit_block_counter.h:445:11
#5 0x7fecaefc1072 in arrow::Status arrow::internal::ArraySpanInlineVisitor<arrow::BinaryType, void>::VisitStatus<void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >), void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()>(arrow::ArraySpan const&, arrow::BinaryArray&&, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()&&) /arrow/cpp/src/arrow/visit_data_inline.h:103:12
#6 0x7fecaefc0415 in std::enable_if<std::is_same<decltype(return_type_impl(&(std::decay<void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >)>::type::operator()))), arrow::Status>::value, arrow::Status>::type arrow::VisitArraySpanInline<arrow::BinaryType, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >), void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()>(arrow::ArraySpan const&, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'(std::basic_string_view<char, std::char_traits<char> >)&&, void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&)::'lambda'()&&) /arrow/cpp/src/arrow/visit_data_inline.h:195:10
#7 0x7fecaefbe744 in void parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::PutBinaryArray<arrow::BinaryArray>(arrow::BinaryArray const&) /arrow/cpp/src/parquet/encoding.cc:3149:5
#8 0x7fecaefbb3b4 in parquet::(anonymous namespace)::DeltaByteArrayEncoder<parquet::PhysicalType<(parquet::Type::type)6> >::Put(arrow::Array const&) /arrow/cpp/src/parquet/encoding.cc:3293:5
#9 0x559f81b7f9b4 in parquet::test::DeltaByteArrayEncodingAdHoc_ArrowBinaryDirectPut_Test::TestBody()::$_5::operator()(std::shared_ptr<arrow::Array>) const /arrow/cpp/src/parquet/encoding_test.cc:2035:5
#10 0x559f81b7ec85 in parquet::test::DeltaByteArrayEncodingAdHoc_ArrowBinaryDirectPut_Test::TestBody() /arrow/cpp/src/parquet/encoding_test.cc:2068:5
#11 0x7fecb0bf85aa in void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2607:10
#12 0x7fecb0bdcc19 in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2643:14
#13 0x7fecb0bb6a82 in testing::Test::Run() /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2682:5
#14 0x7fecb0bb77e8 in testing::TestInfo::Run() /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2861:11
#15 0x7fecb0bb8003 in testing::TestSuite::Run() /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:3015:28
#16 0x7fecb0bc8981 in testing::internal::UnitTestImpl::RunAllTests() /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:5855:44
#17 0x7fecb0bfb5aa in bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2607:10
#18 0x7fecb0bdf419 in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2643:14
#19 0x7fecb0bc84ea in testing::UnitTest::Run() /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:5438:10
#20 0x7fecb0c32210 in RUN_ALL_TESTS() /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/include/gtest/gtest.h:2490:46
#21 0x7fecb0c321ec in main /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest_main.cc:52:10
#0 0x7fec91e42d8f in
#23 0x7fec91e42e3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x29e3f) (BuildId: 69389d485a9793dbe873f0ea2c93e02efaa9aa3d)
#24 0x559f817bb0d4 in _start (/build/cpp/debug/parquet-internals-test+0x5bc0d4) (BuildId: 4c7595fb7dab36a6ca4c879fa4ce83f9ed7a537e)
SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior /arrow/cpp/src/arrow/buffer_builder.h:138:27 in
/build/cpp/src/parquet
If there are no objections I'd like to merge this before 12.0.0 window closes tomorrow (Tuesday).
Thanks for the review @mapleFU! If CI passes and there's no further comments I'll merge.
If there are no objections I'd like to merge this before 12.0.0 window closes tomorrow (Tuesday).
There is no need to rush this, and I think it can wait for a more comprehensive review. There have been enough delicate issues with the DELTA decoders/encoders in the past to warrant a bit more restraint.
@pitrou do you mean you would prefer this is merged immediately after the 12.0.0 window so we have until 13.0.0 to observe for bugs? Or you mean we need another round of review? Or both :D
I mean a more thorough review to check the implementation for potential issues.
Do we need another review here?
LGTM now. I'm not a committer, so I don't know how this patch can get merged ╮( ̄▽ ̄"")╭
@pitrou Would you mind taking a look at this patch when you have spare time? It has been blocked for a long time.
@mapleFU Yes, I'll do. Sorry!
@rok I've written a somewhat ugly random string generator:
/// Generates a StringArray of `seqNum` values whose consecutive entries tend
/// to share prefixes, to exercise DELTA_BYTE_ARRAY prefix compression.
///
/// Each non-null slot receives the current string `s`. The next `s` is then
/// `prefix` followed by 1-4 random lowercase letters. On roughly half of the
/// steps, `prefix` is replaced by a random-length leading substring of `s`.
///
/// @param gen          Arrow random generator. It supplies the engine seed and,
///                     when nulls are requested, the validity bitmap.
/// @param seqNum       number of values to generate.
/// @param nullPercent  forwarded to gen.NullBitmap() as its null probability;
///                     0 disables nulls entirely.
/// @return the generated array, of length `seqNum`.
/// @throws ParquetException if a builder call fails or the length is wrong.
std::shared_ptr<::arrow::Array> generateShareSeqString(
    ::arrow::random::RandomArrayGenerator& gen, size_t seqNum, double nullPercent) {
  ::arrow::StringBuilder builder;
  // Draw the engine seed before NullBitmap() so the sequence of seeds taken
  // from `gen` stays the same as before.
  auto seed = gen.seed();
  std::shared_ptr<::arrow::Buffer> validity = nullptr;
  if (nullPercent != 0) {
    // In the returned bitmap a SET bit marks a VALID (non-null) slot.
    validity = gen.NullBitmap(seqNum, nullPercent);
  }
  std::mt19937 engine(seed);
  std::uniform_int_distribution<int32_t> igen;  // yields values in [0, INT32_MAX]
  std::string s = "share-seq-string";
  std::string prefix = s;
  for (size_t i = 0; i < seqNum; ++i) {
    if (validity != nullptr && !::arrow::bit_util::GetBit(validity->data(), i)) {
      PARQUET_THROW_NOT_OK(builder.AppendNull());
      continue;
    }
    PARQUET_THROW_NOT_OK(builder.Append(s));

    // Build the random suffix separately. Appending to `s` directly (as before)
    // left the suffix empty and let `s` shrink to "".
    const int length = igen(engine) % 4 + 1;  // random suffix length in [1, 4]
    std::string randomStr;
    randomStr.reserve(static_cast<size_t>(length));
    for (int j = 0; j < length; ++j) {
      randomStr += static_cast<char>('a' + igen(engine) % 26);  // random lowercase letter
    }
    s = prefix + randomStr;

    if (igen(engine) % 2 == 0) {  // randomly change the shared prefix
      // s.length() >= 1 because randomStr is non-empty, so the modulo is safe.
      prefix = s.substr(0, igen(engine) % s.length());
    }
  }
  PARQUET_ASSIGN_OR_THROW(auto array, builder.Finish());
  if (static_cast<size_t>(array->length()) != seqNum) {
    throw ParquetException("Error");
  }
  return array;
}
Maybe you can adapt it a bit and use it.
@rok Is this ready to review again?
@pitrou I believe I need to finish the random array generator. I got sidetracked on a construction project and I'll try to finish this by end of the week.
I will take a look in depth over the weekend. Hope it can be included in the 13.0.0 release as we have been wishing it for a long time.
Well, is the PR ready? @rok
I will take a look in depth over the weekend. Hope it can be included in the 13.0.0 release as we have been wishing it for a long time.
Thanks @wgtmac! I was occupied with wooden parquet for a while, but now I finally have some time and am trying to figure out what's causing the issue with randomly generated data. I'll push what I have by tonight.