
Core: Avro writers use BlockingBinaryEncoder to enable array/map size calculations.

Open rustyconover opened this issue 1 year ago • 28 comments

When writing Avro files, Iceberg frequently writes arrays and maps. The current use of binaryEncoder() and directBinaryEncoder() from org.apache.avro.io.EncoderFactory does not write the byte lengths of arrays or maps, since neither encoder buffers its output to calculate a length.

Knowing the length of an array or map is useful to clients decoding the Avro file, since they can skip decoding the entire array or map when it is not needed. This PR changes all Avro writers to use blockingBinaryEncoder(); this encoder does not "block" in the concurrency sense, but it does buffer its output so that the byte lengths of arrays and maps can be calculated and written.

Having the byte lengths of maps and arrays written will speed up the Python decoding of Avro files significantly for tables that have many columns.

See https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder) for details on the differences between the Avro encoders.
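
For illustration, here is a minimal sketch (not the PR's actual code) of the switch in Java; the raw encoder calls and the (field_id, bound) item are simplified stand-ins for what the Iceberg writers do through their schemas:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class BlockingEncoderSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // directBinaryEncoder() streams items as they arrive, so the byte
    // length of an array/map block is never known and never written:
    // BinaryEncoder direct = EncoderFactory.get().directBinaryEncoder(out, null);

    // blockingBinaryEncoder() buffers up to the configured block size so it
    // can prefix each block with its byte length (signalled by a negative
    // item count, per the Avro spec), letting readers skip whole blocks.
    BinaryEncoder encoder = new EncoderFactory()
        .configureBlockSize(1024 * 1024) // e.g. the 1 MB guess discussed below
        .blockingBinaryEncoder(out, null);

    // Write one array item, e.g. a (field_id, bound) pair as in a manifest.
    encoder.writeArrayStart();
    encoder.setItemCount(1);
    encoder.startItem();
    encoder.writeInt(1); // field_id
    encoder.writeBytes(ByteBuffer.wrap(new byte[] {0x01, 0x02})); // bound value
    encoder.writeArrayEnd();
    encoder.flush();
  }
}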

rustyconover avatar Sep 23 '23 19:09 rustyconover

@Fokko can you take a look at this too?

rustyconover avatar Sep 23 '23 19:09 rustyconover

@Fokko The 1 MB was really just a guess.

I think configureBlockSize() sets the largest block of a map/array that will be buffered in memory before being written; an array or map can of course consist of multiple blocks.

Thinking through my use cases, this is how I came up with 1 MB as a reasonable size.

The largest maps I commonly encounter map a field_id to the highest or lowest value for a column in a particular file. The highest or lowest value is a variable-length byte array; let's bound those values at 256 bytes. The field_id also won't be longer than 8 bytes (commonly it will be shorter due to zigzag encoding). So for a table of 200 columns let's try:

(8 bytes field_id + 256 bytes value) * 200 columns = 52,800 bytes.

I'm happy to hear your thoughts on this, but 1 MB seems like a reasonable first guess until we make it a table property. Do we want to make it a table property?
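
If we do make it a table property, a hypothetical sketch could look like this (the property name write.avro.block-size-bytes is invented here for illustration; PropertyUtil is Iceberg's existing helper):

import java.util.Map;
import org.apache.iceberg.util.PropertyUtil;

class BlockSizeProperty {
  // Hypothetical property name, invented for illustration.
  static final String AVRO_BLOCK_SIZE = "write.avro.block-size-bytes";
  static final int AVRO_BLOCK_SIZE_DEFAULT = 1024 * 1024; // the 1 MB guess

  static int blockSize(Map<String, String> tableProperties) {
    return PropertyUtil.propertyAsInt(
        tableProperties, AVRO_BLOCK_SIZE, AVRO_BLOCK_SIZE_DEFAULT);
  }
}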

rustyconover avatar Sep 25 '23 01:09 rustyconover

Also took a stab at it: https://github.com/apache/avro/pull/2521

Still a bit slower due to buffering and copying:

>>> without_buffer = [
...        27.026061125,
...        26.702181417,
...        26.625919333,
...        26.615276875,
...        26.583737958
...]
>>> sum(without_buffer) / 5
26.7106353416

Fokko avatar Sep 25 '23 14:09 Fokko

Removed another copy and introduced ThreadLocal, and we're close to the original:

➜  iceberg git:(fd-add-benchmark) ✗ python3
Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> with_threadlocale = [
...     26.181581084,
...     26.2257085,
...     26.292216292,
...     25.931295708,
...     25.773379833
... ]
>>> sum(with_threadlocale) / 5
26.080836283399996
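
A minimal sketch of the ThreadLocal reuse pattern described above, assumed for illustration rather than taken from the actual Avro PR:

import java.io.ByteArrayOutputStream;

class EncoderBuffers {
  // One scratch buffer per thread, so concurrent writers don't contend
  // and each encode call avoids allocating and copying a fresh buffer.
  private static final ThreadLocal<ByteArrayOutputStream> BUFFER =
      ThreadLocal.withInitial(ByteArrayOutputStream::new);

  static ByteArrayOutputStream buffer() {
    ByteArrayOutputStream out = BUFFER.get();
    out.reset(); // keep the grown backing array, drop the old contents
    return out;
  }
}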

Fokko avatar Sep 25 '23 16:09 Fokko

@Fokko Those times seem close enough to me.

I'm sorry this is a bit complicated to get working.

rustyconover avatar Sep 25 '23 17:09 rustyconover

I just realized that this would also speed up operations like snapshot expiration, where we need to access the manifest files but don't need the metrics.

Fokko avatar Oct 17 '23 12:10 Fokko

Yes it would!

rustyconover avatar Oct 17 '23 13:10 rustyconover

I'd love to take a look early next week.

aokolnychyi avatar Nov 10 '23 00:11 aokolnychyi

@rustyconover @Fokko, do we need any changes in readers to benefit from this? If not, can we run some existing benchmarks to confirm the read improvement is what we anticipate?

Here is what we have:

ManifestReadBenchmark (may need to be extended?)
PlanningBenchmark (Spark specific, can ignore distributed cases)

aokolnychyi avatar Nov 15 '23 19:11 aokolnychyi

Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how does this PR affect it? Won't we still use direct encoders there?

https://github.com/apache/avro/blob/85ddfcdaba60d1cb18a16c9908d737bb32369df7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java#L245

aokolnychyi avatar Nov 15 '23 23:11 aokolnychyi

Also, nice work on the new encoder in Avro, @Fokko! Do you know when that will be available?

aokolnychyi avatar Nov 15 '23 23:11 aokolnychyi

do we need any changes in readers to benefit from this? If not, can we run some existing benchmarks to confirm the read improvement is what we anticipate?

Since we use the decoders from Avro itself, we don't need any changes. The relevant code is here: https://github.com/apache/avro/blob/main/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java#L398-L424

It will speed up reading tremendously when we don't need to read the map[int, bytes] that we use to store statistics. This way you can jump right over it without having to skip each key-value pair individually.
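
A hedged sketch of that skipping with Avro's Java decoder, assuming a map with string keys and bytes values for simplicity (PyIceberg's decoder would do the equivalent):

import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

class SkipStats {
  // Skip over a map without materializing it. When the writer recorded
  // block byte sizes, skipMap() can seek past whole blocks; otherwise it
  // returns the item count and every key/value pair must be skipped
  // individually.
  static void skipMap(InputStream in) throws IOException {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    for (long items = decoder.skipMap(); items != 0; items = decoder.skipMap()) {
      for (long i = 0; i < items; i++) {
        decoder.skipString(); // key (string keys assumed for simplicity)
        decoder.skipBytes();  // value
      }
    }
  }
}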

Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how does this PR affect it? Won't we still use direct encoders there?

This is a good question. The goal of this PR is to write the block sizes for the manifests. @rustyconover any thoughts on this?

Also, nice work on the new encoder in Avro, @Fokko! Do you know when that will be available?

Thanks! I can check in with the Avro community to see if we can do a release.

Fokko avatar Nov 15 '23 23:11 Fokko

@rustyconover @Fokko, I was wondering whether there were any updates. It would be great to have this in.

aokolnychyi avatar Dec 01 '23 23:12 aokolnychyi

@aokolnychyi I think we can start a release sometime soon, but I need to align this with the Avro community. I also wanted to include nanosecond timestamps in there: https://github.com/apache/avro/pull/2608

Fokko avatar Dec 04 '23 14:12 Fokko

Hello @aokolnychyi and @Fokko,

Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how does this PR affect it? Won't we still use direct encoders there?

This is a good question. The goal of this PR is to write the block sizes for the manifests. @rustyconover any thoughts on this?

I am particularly interested in the explicit inclusion of block sizes in manifest files. Currently, PyIceberg must deserialize three maps (high, low, and null counts) even when the query planner may not need that information. If block sizes are explicitly written, I plan to modify the PyIceberg Avro decoder to take a "lazy" approach: copy arrays or maps as byte buffers and defer decoding until client code accesses them. Consequently, if the code never accesses the map, the deserialization code never executes.

This optimization could significantly reduce scan planning time, especially for tables with numerous files across multiple columns. For instance, in a table with 1,250,000 files, copying bytes in memory for later decoding proves much faster than the deserialization of Avro structures.

Currently the PyIceberg Avro code doesn't perform lazy decoding; it just decodes everything. That's because the Java code serializes Avro directly without buffering, so the byte lengths can't be included. I can prepare a PR to do this.
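
Expressed in Java terms purely for illustration (PyIceberg would do this in Python, and decode() below is a hypothetical helper), the lazy approach amounts to:

import java.nio.ByteBuffer;
import java.util.Map;

class LazyBoundsMap {
  private byte[] raw;                        // undecoded Avro bytes, copied at read time
  private Map<Integer, ByteBuffer> decoded;  // filled in only on first access

  LazyBoundsMap(byte[] raw) {
    this.raw = raw;
  }

  Map<Integer, ByteBuffer> get() {
    if (decoded == null) {
      decoded = decode(raw); // decode runs only if the map is ever accessed
      raw = null;            // allow the raw copy to be collected
    }
    return decoded;
  }

  private static Map<Integer, ByteBuffer> decode(byte[] bytes) {
    throw new UnsupportedOperationException("hypothetical decode helper");
  }
}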

Rusty

rustyconover avatar Dec 16 '23 20:12 rustyconover

My point in the earlier message is that I'm not sure this PR would actually have an effect, because the changes are not going to be used by our write path in Java. Am I missing anything here?

aokolnychyi avatar Feb 09 '24 18:02 aokolnychyi

Hi @aokolnychyi, can we change it to a buffered binary writer so that the length counts are written?

rustyconover avatar Feb 12 '24 00:02 rustyconover

@aokolnychyi This is about the Iceberg metadata, not about the data files themselves. It might also be interesting for the data files, but then we should analyze the access patterns first.

Fokko avatar Feb 12 '24 20:02 Fokko

@Fokko, aren't we using DataFileWriter from Avro for Iceberg metadata? Yeah, I fully support the idea; it's just that my preliminary analysis showed it would have no effect on the metadata currently produced by Java (as the PR stands today).

aokolnychyi avatar Feb 22 '24 19:02 aokolnychyi

@aokolnychyi Hmm, I did some quick checks and that seems to be correct. I'm pretty sure it was using the code, because I was seeing exceptions and differences in the benchmarks. Let me dig into this a bit more. I would love to solve this on the Iceberg side.

Fokko avatar Feb 22 '24 21:02 Fokko

Any updates on this?

rustyconover avatar Mar 02 '24 14:03 rustyconover

Curious if there were any updates as well.

aokolnychyi avatar Apr 12 '24 01:04 aokolnychyi

I could re-test it. It would take me a day or two.

rustyconover avatar Apr 12 '24 01:04 rustyconover

Yes, this is still top of mind! I'm going to see what's needed and make sure that it will be included in the next Avro release!

Fokko avatar Apr 25 '24 20:04 Fokko

Here we go: https://github.com/apache/avro/pull/2874

Fokko avatar Apr 26 '24 11:04 Fokko