
Support for dictionary encoded INT96 timestamp in parquet files

Open rui-mo opened this issue 2 years ago • 26 comments

Support a timestamp reader for the Parquet file format that reads dictionary-encoded INT96 timestamps. The Hive configs kReadTimestampUnit and kReadTimestampUnitSession are added to control the precision when reading timestamps from files.

rui-mo avatar Apr 20 '23 07:04 rui-mo


You probably need to make the writer generate INT96 in writeToMemory

@Yuhta I doubt if the Arrow Bridge supports int96 type. But worth checking. The alternative is to check in a file. Arrow Bridge has a similar issue with Parquet Decimal types backed by int64.

majetideepak avatar Apr 21 '23 18:04 majetideepak

@majetideepak Using a fixed file gives less coverage, but if the writer is not working then we have to do it this way for now. Either way we should make sure the result is correct with or without filters.

Yuhta avatar Apr 21 '23 19:04 Yuhta

@majetideepak @Yuhta Thanks for your review! Your comments are well received, and I'm working on them.

rui-mo avatar Apr 23 '23 02:04 rui-mo

Please add some tests in https://github.com/facebookincubator/velox/blob/main/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp

You probably need to make the writer generate INT96 in writeToMemory

@Yuhta I also tried that. Setting enable_deprecated_int96_timestamps makes the Arrow writer generate INT96. Since int128_t is used in the Timestamp reader for now, the decoder calls readInt128(), but little endian is not supported there currently (see IntDecoder.h). I will continue to work on this after the type issue is decided.

rui-mo avatar Apr 28 '23 07:04 rui-mo

I don't think using int128_t will work here: the valueSize_ is different, so you will end up reading a different part of the data and may even read out of bounds.

Hi @Yuhta, I spent more time on Int96Timestamp type support, but it is not easy to make it work end to end. We also ran more tests on the int128_t workaround and found that it works for a pure scan. As posted in this PR, int96 in Parquet is converted to the Velox Timestamp type (which is 16 bytes long) in PageReader (see link), and only numValues * sizeof(Int96Timestamp) bytes of data are read in PageReader.

Below is the stack of current timestamp scan.

 0# facebook::velox::parquet::PageReader::prepareDictionary(facebook::velox::parquet::thrift::PageHeader const&) in ./velox_dwio_parquet_table_scan_test
 1# facebook::velox::parquet::PageReader::seekToPage(long) in ./velox_dwio_parquet_table_scan_test
 2# facebook::velox::parquet::PageReader::rowsForPage(facebook::velox::dwio::common::SelectiveColumnReader&, bool, bool, folly::Range<int const*>&, unsigned long const*&) in ./velox_dwio_parquet_table_scan_test
 3# void facebook::velox::parquet::PageReader::readWithVisitor<facebook::velox::dwio::common::ColumnVisitor<__int128, facebook::velox::common::AlwaysTrue, facebook::velox::dwio::common::ExtractToReader<facebook::velox::dwio::common::SelectiveIntegerColumnReader>, true> >(facebook::velox::dwio::common::ColumnVisitor<__int128, facebook::velox::common::AlwaysTrue, facebook::velox::dwio::common::ExtractToReader<facebook::velox::dwio::common::SelectiveIntegerColumnReader>, true>&) in ./velox_dwio_parquet_table_scan_test

Could you explain more about the possible risks? Thank you.

rui-mo avatar May 18 '23 08:05 rui-mo

@Yuhta Thanks for your reply.

So the assumption here is it is always dictionary-encoded? If this assumption holds all the time, we can probably go this way. It's only a problem when we want to apply a filter on the column of flat values.

Understood the gap here. I guess RLEV1 and Plain encoding are also possible because the column encoding can be set during Parquet write, but we only tested the Parquet generated with default configs.

Make sure to beef up your E2E filter tests with filters on some primary keys (int64 is fine), and also put timestamp in complex types (array, map, struct) in addition to top-level column.

Got it, will do.

rui-mo avatar May 23 '23 09:05 rui-mo

Hi @rui-mo thanks for working on this! I wonder what is the current status of this draft change?

chliang71 avatar Sep 01 '23 17:09 chliang71

It's only a problem when we want to apply a filter on the column of flat values.

Hi @chliang71, as Yuhta pointed out, this PR only works for int96 timestamps when they are dictionary-encoded. For flat values, we need to make the int96 type below work; see https://github.com/facebookincubator/velox/pull/4680/files#diff-ae87451c1577f3b47d2863187de8bf30c7351484d39537419016487cc7b2f71cR38.

struct __attribute__((packed)) Int96Timestamp { int32_t days; uint64_t nanos; };

rui-mo avatar Sep 05 '23 02:09 rui-mo

@rui-mo Thanks for the prompt reply! I am just trying to understand the context. Besides dictionary-encoded int96, it looks like flat-value timestamps of int64 or int96 do not work today either? Given the changes to be made in SelectiveColumnReader.cpp, it looks like TIMESTAMP is entirely missing from the column reader. (Please correct me if I'm wrong.) If this is true, is there any plan to support int64? Thanks!

chliang71 avatar Sep 05 '23 17:09 chliang71

Hi @chliang71, in this PR a timestamp reader is added as TimestampColumnReader, which extends IntegerColumnReader, but an int96_t type is missing and it takes a lot of work to define an int96 type as a struct. Which int64_t timestamp unit are you using, micro or milli? Support for the int64 type could be considered in the future if needed.

rui-mo avatar Sep 11 '23 08:09 rui-mo

Hey @rui-mo , Thanks for the details! We do need to support int96_t timestamp, so this will be greatly helpful to us for sure. But yeah from our side, we will also need to support int64_t type micro timestamp. Would be great to see this extended support. We are also happy to help contribute

chliang71 avatar Sep 11 '23 22:09 chliang71

I think we will need a more general Timestamp Parquet reader. INT96 has been deprecated and the Parquet Timestamp logical type now annotates INT64. According to https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp, a TIMESTAMP(isAdjustedToUTC=true, unit=MILLIS) with the value 172800000 corresponds to 1970-01-03 00:00:00 UTC, because it is equal to 2 * 24 * 60 * 60 * 1000. During the read process, this INT64 (or legacy INT96) value needs to be converted to a Velox Timestamp, which consists of int64_t seconds_ and uint64_t nanos_. This conversion needs to refer to the actual logical type: decoding a value with unit=MILLIS is different from decoding one with unit=NANOS. For example, the value 172800000 above will be converted to the Velox Timestamp {seconds_=172800, nanos_=0}. This conversion can be done in the read() function of the TimestampColumnReader. Plainly memory-copying the 64- or 96-bit values without doing the conversion doesn't seem correct.
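The unit-dependent conversion described above can be sketched roughly as follows. This is only an illustration: TimeUnit, SimpleTimestamp and toTimestamp are hypothetical names, not Velox's actual API, and the sketch ignores isAdjustedToUTC.

```cpp
#include <cstdint>

// Hypothetical stand-ins for illustration only; not Velox types.
enum class TimeUnit { kMillis, kMicros, kNanos };

struct SimpleTimestamp {
  int64_t seconds;
  uint64_t nanos;
};

// Number of file units that make up one second.
inline int64_t unitsPerSecond(TimeUnit unit) {
  switch (unit) {
    case TimeUnit::kMillis:
      return 1'000;
    case TimeUnit::kMicros:
      return 1'000'000;
    case TimeUnit::kNanos:
      return 1'000'000'000;
  }
  return 1;
}

// Converts a raw INT64 value (units since the Unix epoch) to seconds + nanos.
// Uses floor semantics so that values before the epoch keep nanos >= 0.
inline SimpleTimestamp toTimestamp(int64_t value, TimeUnit unit) {
  const int64_t perSecond = unitsPerSecond(unit);
  int64_t seconds = value / perSecond;
  int64_t remainder = value % perSecond;
  if (remainder < 0) {
    --seconds;
    remainder += perSecond;
  }
  const int64_t nanosPerUnit = 1'000'000'000 / perSecond;
  return {seconds, static_cast<uint64_t>(remainder * nanosPerUnit)};
}
```

With unit=MILLIS, toTimestamp(172800000, TimeUnit::kMillis) gives {172800, 0}, matching the example above; the same raw value read as MICROS would give {172, 800000000}, which is why the logical type has to be consulted.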

Also the data may be in flat encodings like PLAIN, RLE/BIT_PACKED, or other V2 integer encodings like DELTA_BINARY_PACKED. The TimestampColumnReader::read() will need to support these encodings.

I don't think using int128_t in prepareRead() has problems. prepareRead's most important job is to prepare the result values buffer. Here the request type is Velox Timestamp, which is 128 bits total. The file type is int64 or int96, but the value will be converted to TWO numbers of 128 bits total. So we will need 128 bits values array anyways. Now let's consider 2 ways of doing it:

  1. do the conversion in read() and the getValues() call would just be a single line of casting.
  2. Read the values in the file type(64 or 96 bits) and store them as is in the values array, then getValues() does the conversion.

I think 1 is better than 2, because in 2 you need to allocate 2 arrays for the results, one of 128 bit wide, the other of 64 or 96 bits wide. This is not good for performance or memory footprint. If we choose 1, then we can just use int128_t to prepare the result values array, and read() would write into the higher 64 bits as int64_t seconds_ and lower 64 bits as uint64_t nanos_. Then in getValues(), we can just use existing code to directly cast the 128 bit value to Timestamp.

@rui-mo @chliang71 Is anyone of you interested in working on this?

yingsu00 avatar Oct 13 '23 07:10 yingsu00

@yingsu00 Thank you so much for the informative guideline.

INT96 has been deprecated and Parquet Timestamp logical type is now annotated with INT64.

Do you think it is still needed to support INT96 in Velox? If not, I shall remove the INT96 support.

This conversion needs to refer to the actual logical type.

For INT64 support, It may take some time for me to try as you suggested. Another question is, there are some filter pushdown logics in integer column reader, what's your suggestion for the filter pushdown of timestamp type? Is it possible to reuse some existing logic?

rui-mo avatar Oct 17 '23 08:10 rui-mo

Do you think it is still needed to support INT96 in Velox? If not, I shall remove the INT96 support.

Do you have existing Parquet files that use INT96? I think some users may still be using it, and therefore we need to support it. But I'd suggest we start with INT64 and add INT96 as the next step. The implementations should be similar, but INT96 may need some additional work.

Another question is, there are some filter pushdown logics in integer column reader, what's your suggestion for the filter pushdown of timestamp type? Is it possible to reuse some existing logic?

There is a kTimestampRange filter kind, but I would suggest converting the filter on the timestamp to a filter on integers before reading the file. For INT64 this is quite easy; for INT96 it's trickier. Then you can just reuse the IntegerColumnReader code for the read() part. This is better than reading and converting all values first and then applying the TimestampRange filter, which loses the benefit of filter pushdown.

I think 1 is better than 2, because in 2 you need to allocate 2 arrays for the results, one of 128 bit wide, the other of 64 or 96 bits wide.

Correction: 2 may be easier to implement. For that, just use the read() of IntegerColumnReader to read the 64bit values into values_ array (you can still use int128 size for it so we don't allocate twice), and for getValues(), you may implement a specialized getFlatValues<int64, Timestamp>() function to do the conversion in place. INT96 can be done in similar way.
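A rough sketch of the in-place conversion idea, assuming the values buffer was sized for 16-byte elements but currently holds packed 64-bit MILLIS values at its start. SimpleTimestamp and widenMillisInPlace are hypothetical names for illustration, not the getFlatValues specialization itself. The loop walks backwards so that no unread 64-bit input is overwritten by a 128-bit output.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical 16-byte timestamp layout for illustration only.
struct SimpleTimestamp {
  int64_t seconds;
  uint64_t nanos;
};
static_assert(sizeof(SimpleTimestamp) == 16, "expected a 16-byte timestamp");

// `buffer` has capacity for numValues * 16 bytes. On entry, its first
// numValues * 8 bytes hold int64 MILLIS values. On exit, it holds numValues
// SimpleTimestamp values. Iterating from the last value to the first keeps
// every write at or beyond the inputs that have already been consumed.
inline void widenMillisInPlace(char* buffer, size_t numValues) {
  for (size_t i = numValues; i-- > 0;) {
    int64_t millis;
    std::memcpy(&millis, buffer + i * sizeof(int64_t), sizeof(millis));
    int64_t seconds = millis / 1000;
    int64_t remainder = millis % 1000;
    if (remainder < 0) {
      --seconds;
      remainder += 1000;
    }
    const SimpleTimestamp ts{
        seconds, static_cast<uint64_t>(remainder) * 1'000'000};
    std::memcpy(buffer + i * sizeof(SimpleTimestamp), &ts, sizeof(ts));
  }
}
```

A forward loop would not work here: writing output 0 would overwrite input 1 before it is read.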

yingsu00 avatar Oct 18 '23 05:10 yingsu00

@rui-mo @yingsu00 I don't think it is possible to evaluate a filter (using the current filter interface) with just days stream or nanos stream. You will need to materialize at least one of them fully in memory and then stream the other one. I would suggest just fully materialize them and convert them to array of Timestamp, this will make it much easier. If we see bottleneck on filtering timestamp we can optimize it later. You can take a look at the DWRF timestamp column reader as an example.

Yuhta avatar Oct 18 '23 14:10 Yuhta

@rui-mo @yingsu00 I don't think it is possible to evaluate a filter (using the current filter interface) with just days stream or nanos stream. You will need to materialize at least one of them fully in memory and then stream the other one. I would suggest just fully materialize them and convert them to array of Timestamp, this will make it much easier. If we see bottleneck on filtering timestamp we can optimize it later. You can take a look at the DWRF timestamp column reader as an example.

@Yuhta Unlike DWRF, which uses 2 streams in the file, Parquet uses a single INT64 integer for timestamps, and its value is the number of units (e.g. MILLIS) since 1970-01-01 00:00:00 (UTC). The unit is defined in its logical type. An example of a logical type is TIMESTAMP(isAdjustedToUTC=true, unit=MILLIS) or TIMESTAMP(isAdjustedToUTC=false, unit=MICROS). For example, the value 172800000 with logical type TIMESTAMP(isAdjustedToUTC=true, unit=MILLIS) corresponds to 1970-01-03 00:00:00 UTC, because it is equal to 2 * 24 * 60 * 60 * 1000. This means that a filter on the Timestamp column can be converted to a filter on BIGINT. For example, c > TIMESTAMP '1970-01-03 00:00:00' can be converted to a filter on the physical value, c > 172800000. This conversion needs to be done in the Parquet readers and does not apply to DWRF.
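A minimal sketch of converting a timestamp lower bound into a bound on the raw INT64 values, under stated assumptions: SimpleTimestamp and both helper functions are hypothetical names, nanosPerUnit is 1'000'000 for MILLIS and 1'000 for MICROS, nanos is in [0, 1'000'000'000), and overflow for extreme timestamps is ignored.

```cpp
#include <cstdint>

// Hypothetical stand-in for illustration only; not a Velox type.
struct SimpleTimestamp {
  int64_t seconds;
  uint64_t nanos;  // Assumed to be in [0, 1'000'000'000).
};

// Returns bound such that (raw > bound) is equivalent to (timestamp > ts),
// where raw counts units of `nanosPerUnit` nanoseconds since the epoch.
// This is floor(ts / unit); because nanos is non-negative, plain integer
// division on the nanos part already rounds toward negative infinity.
inline int64_t exclusiveLowerBound(
    const SimpleTimestamp& ts,
    int64_t nanosPerUnit) {
  const int64_t unitsPerSecond = 1'000'000'000 / nanosPerUnit;
  return ts.seconds * unitsPerSecond +
      static_cast<int64_t>(ts.nanos) / nanosPerUnit;
}

// Returns bound such that (raw >= bound) is equivalent to (timestamp >= ts).
// This is ceil(ts / unit): bump the floor by one when ts has a sub-unit part.
inline int64_t inclusiveLowerBound(
    const SimpleTimestamp& ts,
    int64_t nanosPerUnit) {
  const bool hasSubUnitPart =
      static_cast<int64_t>(ts.nanos) % nanosPerUnit != 0;
  return exclusiveLowerBound(ts, nanosPerUnit) + (hasSubUnitPart ? 1 : 0);
}
```

For c > TIMESTAMP '1970-01-03 00:00:00' on a MILLIS column, exclusiveLowerBound({172800, 0}, 1'000'000) yields 172800000, so the pushed-down integer filter is raw > 172800000. The rounding direction only matters when the filter constant is finer than the file unit.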

yingsu00 avatar Oct 26 '23 06:10 yingsu00

@yingsu00 I see, so instead of using TimestampRange, the column reader will convert it to a private int64 filter. That should work.

Yuhta avatar Oct 26 '23 19:10 Yuhta

We should consider implementing https://github.com/facebookincubator/velox/discussions/2511 first to simplify this work.

majetideepak avatar Nov 02 '23 00:11 majetideepak

We should consider implementing #2511 first to simplify this work.

I misunderstood the scope of #2511. That will not impact this work. We can continue this implementation. Apologies!

majetideepak avatar Nov 10 '23 13:11 majetideepak

@yingsu00, @Yuhta how do we decide whether the evaluation of the timestamp column should follow Instant semantics (timestamps normalized to UTC) or Local semantics (timestamps not normalized to UTC)? I don't see this being handled correctly by Presto Java. For INT96, we seem to use Local semantics, and for INT64 we follow Instant semantics. https://github.com/prestodb/presto/blob/master/presto-parquet/src/main/java/com/facebook/presto/parquet/ColumnReaderFactory.java#L81

majetideepak avatar Nov 10 '23 16:11 majetideepak

Related Presto issue https://github.com/prestodb/presto/issues/22605

yingsu00 avatar Apr 24 '24 21:04 yingsu00

Apologies for the long gap since the last review. It was suggested that we start with int64 timestamps, and https://github.com/facebookincubator/velox/pull/8325 is working on that. Since it is still under review, we would like to work on int96 support at the same time and make some progress on the timestamp reader if possible. This PR focuses on reading dictionary-encoded INT96 timestamps, and we would like to open another PR to support plain-encoded INT96 timestamps after it lands (drafted as https://github.com/oap-project/velox/commit/533bb9eb0794915e4de7921f2d018758c5b6031f).

Now let's consider 2 ways of doing it:

  1. do the conversion in read() and the getValues() call would just be a single line of casting.
  2. Read the values in the file type(64 or 96 bits) and store them as is in the values array, then getValues() does the conversion.

I think 1 is better than 2, because in 2 you need to allocate 2 arrays for the results, one of 128 bit wide, the other of 64 or 96 bits wide. This is not good for performance or memory footprint. If we choose 1, then we can just use int128_t to prepare the result values array, and read() would write into the higher 64 bits as int64_t seconds_ and lower 64 bits as uint64_t nanos_. Then in getValues(), we can just use existing code to directly cast the 128 bit value to Timestamp. 2 may be easier to implement. For that, just use the read() of IntegerColumnReader to read the 64bit values into values_ array (you can still use int128 size for it so we don't allocate twice), and for getValues(), you may implement a specialized getFlatValues<int64, Timestamp>() function to do the conversion in place. INT96 can be done in similar way.

As mentioned in the previous review, I looked into the implementation in more detail. The implementation in this PR creates a timestamp after reading out 12 bytes from Parquet. Velox's timestamp is 128 bits wide, so we can avoid using an additional data array in this way. @yingsu00 @Yuhta @majetideepak Could you please spare some time to review this PR again? Thanks!
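For readers following along, building a timestamp from 12 bytes could look roughly like the sketch below. It is not the code in this PR. It assumes the byte layout commonly written for INT96 timestamps (8 bytes of nanoseconds within the day followed by a 4-byte Julian day number) and a little-endian host; the field order should be verified against the actual files. SimpleTimestamp, decodeInt96 and decodeInt96Values are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical 16-byte timestamp for illustration only; not a Velox type.
struct SimpleTimestamp {
  int64_t seconds;
  uint64_t nanos;
};

constexpr int64_t kJulianDayOfUnixEpoch = 2'440'588;  // 1970-01-01.
constexpr int64_t kSecondsPerDay = 86'400;
constexpr uint64_t kNanosPerSecond = 1'000'000'000;
constexpr size_t kInt96Size = 12;

// Decodes one 12-byte INT96 value. Assumed layout: bytes 0..7 are the
// nanoseconds within the day, bytes 8..11 are the Julian day number.
// memcpy is used because the packed values are not 8-byte aligned.
inline SimpleTimestamp decodeInt96(const char* data) {
  uint64_t nanosOfDay;
  int32_t julianDay;
  std::memcpy(&nanosOfDay, data, sizeof(nanosOfDay));
  std::memcpy(&julianDay, data + sizeof(nanosOfDay), sizeof(julianDay));
  const int64_t seconds =
      (julianDay - kJulianDayOfUnixEpoch) * kSecondsPerDay +
      static_cast<int64_t>(nanosOfDay / kNanosPerSecond);
  return {seconds, nanosOfDay % kNanosPerSecond};
}

// Expands numValues packed 12-byte values into 16-byte timestamps. The input
// advances with a 12-byte stride and the output with a 16-byte stride, so
// exactly numValues * 12 input bytes are touched.
inline void decodeInt96Values(
    const char* packed,
    size_t numValues,
    SimpleTimestamp* out) {
  for (size_t i = 0; i < numValues; ++i) {
    out[i] = decodeInt96(packed + i * kInt96Size);
  }
}
```

Reading the same buffer with a 16-byte stride, as a plain 128-bit integer read would, is what leads to the misaligned and out-of-bounds reads raised earlier in the thread.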

rui-mo avatar May 31 '24 07:05 rui-mo

@Yuhta Appreciate your help. Above comments are fixed. Would you help review again? Thanks!

rui-mo avatar Jun 03 '24 09:06 rui-mo

@Yuhta Above comments were fixed. Would you help take another look? Thank you.

rui-mo avatar Jun 19 '24 12:06 rui-mo

@bikramSingh91 Could you help import and merge this PR? Thanks!

rui-mo avatar Jun 27 '24 03:06 rui-mo

We have ported this PR internally and so far it is running fine. Thanks for working on this @rui-mo! We did encounter one issue, though, related to IntDecoder reading int128.

Since int128_t is used in the Timestamp reader for now, the decoder calls readInt128(), but little endian is not supported there currently (see IntDecoder.h). I will continue to work on this after the type issue is decided.

Any quick insights on what needs to be done here? i.e. If the data file uses INT96 (12 bytes), readInt128() would read 16 bytes? Then will the reader need to re-align the bytes correspondingly, plus use little endian?

chliang71 avatar Jul 08 '24 23:07 chliang71

We did encounter one issue, though, related to IntDecoder reading int128.

@chliang71 Thanks for your feedback. I assume this issue is on plain-encoded timestamp reading, while this PR focuses on dictionary-encoding. There is a draft on plain-encoding support https://github.com/oap-project/velox/commit/533bb9eb0794915e4de7921f2d018758c5b6031f by @mskapilks, which may go into a separate PR after this one.

rui-mo avatar Jul 09 '24 05:07 rui-mo

We did encounter one issue, though, related to IntDecoder reading int128.

@chliang71 Thanks for your feedback. I assume this issue is on plain-encoded timestamp reading, while this PR focuses on dictionary-encoding. There is a draft on plain-encoding support oap-project@533bb9e by @mskapilks, which may go into a separate PR after this one.

I can raise the follow-up PR for that change once this is done.

mskapilks avatar Jul 16 '24 10:07 mskapilks

@Yuhta has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

facebook-github-bot avatar Jul 17 '24 22:07 facebook-github-bot