Core: Avro writers use BlockingBinaryEncoder to enable array/map size calculations.
Iceberg frequently writes arrays and maps when producing Avro files. The current use of binaryEncoder() and directBinaryEncoder() from org.apache.avro.io.EncoderFactory does not write the byte length of arrays or maps, because neither encoder buffers its output to calculate a length.
Knowing the length of an array or map is useful to clients decoding the Avro file, since they can skip the entire array or map when it is not needed while reading the file. This PR changes all Avro writers to use blockingBinaryEncoder(). Despite its name, this encoder does not "block" in the concurrency sense; it buffers the output of objects so that the lengths of arrays and maps can be calculated and written.
Having the byte lengths of maps and arrays written will speed up the Python decoding of Avro files significantly for tables that have many columns.
See https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder) for details on the differences between the Avro encoders.
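For illustration, here is a minimal sketch of the two encoder flavors from the writer's side (not code from this PR; the class name and the tiny map being written are made up for the example):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class EncoderSketch {

  // Writes a tiny map<string, bytes>; the caller-side API is the same for
  // every encoder, only the bytes produced differ.
  static void writeMap(BinaryEncoder encoder) throws IOException {
    encoder.writeMapStart();
    encoder.setItemCount(2);
    encoder.startItem();
    encoder.writeString("a");
    encoder.writeBytes(new byte[] {1, 2, 3});
    encoder.startItem();
    encoder.writeString("b");
    encoder.writeBytes(new byte[] {4, 5, 6});
    encoder.writeMapEnd();
    encoder.flush();
  }

  public static void main(String[] args) throws IOException {
    OutputStream direct = new ByteArrayOutputStream();
    OutputStream blocking = new ByteArrayOutputStream();

    // Writes entries straight through: only the entry count is encoded, so a
    // reader cannot tell how many bytes the map occupies without decoding it.
    writeMap(EncoderFactory.get().directBinaryEncoder(direct, null));

    // Buffers each block in memory first, so the block's byte length is
    // written ahead of the entries and a reader can skip the whole block.
    writeMap(new EncoderFactory().blockingBinaryEncoder(blocking, null));
  }
}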
@Fokko can you take a look at this too?
@fokko The 1MB was really just a guess.
I think that configureBlockSize() sets the largest block of a map/array that will be buffered in memory before being written; of course, an array or map can consist of multiple blocks.
Thinking about my use cases, this is how I came up with my guess that 1 MB is a reasonable size. The largest maps I commonly encounter are the maps from the field_id to the highest or lowest value for a column in a particular file. The highest or lowest value is a byte array of variable length; let's bound those values at 256 bytes. The field_id also won't be longer than 8 bytes (commonly it will be shorter due to zigzag encoding). So for a table of 200 columns, let's try:
8 bytes (field_id) * 256 bytes (value length) * 200 (column count) = 409,600 bytes.
I'm happy to hear your thoughts on this, but 1 MB seems like a reasonable first guess, until we make it a table property. Do we want to make it a table property?
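For reference, wiring that 1 MB guess into the encoder factory would look roughly like this (a sketch only; the helper class is made up, and the value could later come from a table property):

import java.io.IOException;
import java.io.OutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

// Sketch: a factory whose blocking encoders buffer at most ~1 MB of a single
// array/map block before flushing it with its byte length. A fresh
// EncoderFactory is used because the shared EncoderFactory.get() instance
// cannot be reconfigured.
public final class BlockingEncoders {
  private static final EncoderFactory FACTORY =
      new EncoderFactory().configureBlockSize(1024 * 1024);

  static BinaryEncoder newEncoder(OutputStream out) throws IOException {
    return FACTORY.blockingBinaryEncoder(out, null);
  }
}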
Also took a stab at it: https://github.com/apache/avro/pull/2521
Still a bit slower due to buffering and copying:
>>> without_buffer = [
... 27.026061125,
... 26.702181417,
... 26.625919333,
... 26.615276875,
... 26.583737958
...]
>>> sum(without_buffer) / 5
26.7106353416
Removed another copy and introduced ThreadLocal, and we're close to the original:
➜ iceberg git:(fd-add-benchmark) ✗ python3
Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> with_threadlocale = [
... 26.181581084,
... 26.2257085,
... 26.292216292,
... 25.931295708,
... 25.773379833
... ]
>>> sum(with_threadlocale) / 5
26.080836283399996
@fokko Those times seem close enough to me.
I'm sorry this is a bit complicated to get working.
I just realized that this would also speed up operations like snapshot expiration, because we do need to access the manifest files but don't need to use the metrics.
Yes it would!
I'd love to take a look early next week.
@rustyconover @Fokko, do we need any changes in readers to benefit from this? If not, can we run some existing benchmarks to showcase the read improvement is as we anticipate?
Here is what we have:
ManifestReadBenchmark (may need to be extended?)
PlanningBenchmark (Spark specific, can ignore distributed cases)
Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how is this PR affecting it? Won't we still use direct encoders there?
https://github.com/apache/avro/blob/85ddfcdaba60d1cb18a16c9908d737bb32369df7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java#L245
Also, nice work on a new encoder in Avro, @Fokko! Do you know when will that be available?
do we need any changes in readers to benefit from this? If not, can we run some existing benchmarks to showcase the read improvement is as we anticipate?
Since we use the decoders from Avro itself, we don't need any changes. The relevant code is here: https://github.com/apache/avro/blob/main/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java#L398-L424
It will speed up the reading tremendously when we don't need to read the map[int, bytes] that we use to store statistics. This way you can jump right over it without having to skip each key-value pair individually.
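As a rough illustration (not actual Iceberg or PyIceberg code), this is what that skip looks like against Avro's Java Decoder API, assuming the usual Iceberg encoding of an int-keyed map as an array of key/value records:

import java.io.IOException;
import org.apache.avro.io.Decoder;

public final class SkipSketch {
  // If the writer emitted block byte lengths, skipArray() jumps over each
  // block wholesale and the inner loop never runs; otherwise every key and
  // value has to be decoded just to be thrown away.
  static void skipBoundsMap(Decoder decoder) throws IOException {
    long remaining = decoder.skipArray();
    while (remaining > 0) {
      for (long i = 0; i < remaining; i++) {
        decoder.readInt();   // field id (key)
        decoder.skipBytes(); // bound value
      }
      remaining = decoder.skipArray();
    }
  }
}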
Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how is this PR affecting it? Won't we still use direct encoders there?
This is a good question. The goal of this PR is to write the block sizes for the manifests. @rustyconover any thoughts on this?
Also, nice work on a new encoder in Avro, @Fokko! Do you know when will that be available?
Thanks! I can check in with the Avro community to see if we can do a release.
@rustyconover @Fokko, I was wondering whether there were any updates. It would be great to have this in.
@aokolnychyi I think we can start a release somewhere soon, but I need to align this with the Avro community. I also wanted to include nanosecond timestamp in there: https://github.com/apache/avro/pull/2608
Hello @aokolnychyi and @Fokko,
Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how is this PR affecting it? Won't we still use direct encoders there?
This is a good question. The goal of this PR is to write the block sizes for the manifests. @rustyconover any thoughts on this?
I am particularly interested in the explicit inclusion of block sizes in manifest files. Currently, PyIceberg requires deserialization of three maps (high, low, and null counts) even when the query planner may not necessarily need that information. If block sizes are explicitly written, I plan to modify the PyIceberg Avro decoder to adopt a "lazy" approach. It would involve copying arrays or maps as byte buffers, deferring the decoding process until they are accessed by client code. Consequently, if the code never accesses the map, the deserialization code would not execute.
This optimization could significantly reduce scan planning time, especially for tables with numerous files across multiple columns. For instance, in a table with 1,250,000 files, copying bytes in memory for later decoding proves much faster than the deserialization of Avro structures.
Currently the PyIceberg Avro code isn't performing lazy decoding; it just decodes everything. This is because the Java code serializes Avro directly without buffering, so the byte lengths can't be included. I can prepare a PR to do this.
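The lazy approach itself is simple; it is sketched here in Java for consistency with the other snippets, though the real change would live in PyIceberg's Python decoder (the class is hypothetical):

import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical wrapper: keep the raw, undecoded bytes of a map/array block
// (possible once block byte lengths are written) and run the expensive
// decode step only the first time client code actually asks for the value.
public final class Lazy<T> {
  private final ByteBuffer rawBlock;
  private final Function<ByteBuffer, T> decode;
  private T value; // cached after the first access

  public Lazy(ByteBuffer rawBlock, Function<ByteBuffer, T> decode) {
    this.rawBlock = rawBlock;
    this.decode = decode;
  }

  public T get() {
    if (value == null) {
      value = decode.apply(rawBlock); // runs at most once
    }
    return value;
  }
}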
Rusty
My point in the earlier message is that I am not sure this PR would actually have an effect, because the changes are not going to be used by our write path in Java. Am I missing anything here?
Hi @aokolnychyi, can we change it to use a buffered binary writer? That way we would get the length counts written.
@aokolnychyi This is about the Iceberg metadata, not about the data files themselves. It might also be interesting for the data files, but then we should analyze the access patterns first.
@Fokko, aren't we using DataFileWriter from Avro for Iceberg metadata? Yeah, I fully support the idea, it is just that my preliminary analysis showed it would have no effect on the metadata currently produced by Java (as the PR stands today).
@aokolnychyi Hmm, I did some quick checks and that seems to be correct. I'm pretty sure that it was using the code because I was seeing exceptions and differences in the benchmarks. Let me dig into this a bit more. I would love to solve this on the Iceberg side.
Any updates on this?
Curious if there were any updates as well.
I could re-test it. It would take me a day or two.
Yes, this is still top of mind! I'm going to see what's needed and make sure that it will be included in the next Avro release!
Here we go: https://github.com/apache/avro/pull/2874