
Core: Interface based DataFile reader and writer API - PoC

Open pvary opened this issue 9 months ago • 11 comments

Here is what the PR does:

  • Created 3 interface classes which are implemented by the file formats:
    • ReadBuilder - Builder for reading data from data files
    • AppenderBuilder - Builder for writing data to data files
    • ObjectModel - Providing ReadBuilders, and AppenderBuilders for the specific data file format and object model pair
  • Updated the Parquet, Avro, and ORC implementations for these interfaces, and deprecated the old reader/writer APIs
  • Created interface classes which will be used by the actual readers/writers of the data files:
    • AppenderBuilder - Builder for writing a file
    • DataWriterBuilder - Builder for generating a data file
    • PositionDeleteWriterBuilder - Builder for generating a position delete file
    • EqualityDeleteWriterBuilder - Builder for generating an equality delete file
    • No ReadBuilder here - the file format reader builder is reused
  • Created a WriterBuilder class which implements the interfaces above (AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder) based on a provided file format specific AppenderBuilder
  • Created an ObjectModelRegistry which stores the available ObjectModels, and from which engines and users can request readers (ReadBuilder) and writers (AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder); a usage sketch follows this list
  • Created the appropriate ObjectModels:
    • GenericObjectModels - for reading and writing Iceberg Records
    • SparkObjectModels - for reading (vectorized and non-vectorized) and writing Spark InternalRow/ColumnarBatch objects
    • FlinkObjectModels - for reading and writing Flink RowData objects
    • An arrow object model is also registered for vectorized reads of Parquet files into Arrow ColumnarBatch objects
  • Updated the production code where the reading and writing happens to use the ObjectModelRegistry and the new reader/writer interfaces to access data files
  • Kept the testing code intact to ensure that the new API/code does not break anything
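
To make the shape of the new API concrete, here is a rough engine-side read sketch; the registry lookup and the builder method names below are illustrative assumptions, not necessarily the exact signatures in this PR:

    // Illustrative only: the registry lookup and the builder methods are assumptions
    // about the shape of the API, not exact signatures from this PR.
    CloseableIterable<InternalRow> rows =
        ObjectModelRegistry.<InternalRow>readBuilder(
                FileFormat.PARQUET,             // file format of the data file
                InternalRow.class.getName(),    // object model the engine reads into
                inputFile)                      // org.apache.iceberg.io.InputFile
            .project(expectedSchema)            // Iceberg read schema
            .build();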

pvary avatar Feb 17 '25 15:02 pvary

I will start to collect the differences here between the different writer types (appender/dataWriter/equalityDeleteWriter/positionalDeleteWriter) for reference:

  • Writer context is different between delete and data files. This contains table properties/configurations which can differ between delete and data files, for example for Parquet: RowGroupSize/PageSize/PageRowLimit/DictSize/Compression, etc. ORC and Avro have similar configs that change (see the sketch after this list)
  • Specific writer functions for position deletes to write out the PositionDelete records
  • Positional delete PathTransformFunction to convert the writer's data type for the file path to the file format's data type
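
As a concrete example of the first point, the Parquet row group size comes from different table properties for data and delete files, so a shared writer context cannot simply reuse one configuration. A minimal sketch using the existing TableProperties keys (the default value and the fallback behavior here are assumptions for illustration):

    import java.util.Map;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.util.PropertyUtil;

    // Sketch: data and delete writers resolve the Parquet row group size from different
    // table property keys; the 128 MB default and the fallback choice are illustrative.
    class WriterContextSketch {
      static long parquetRowGroupSize(Map<String, String> props, boolean forDeleteFile) {
        long dataFileSize =
            PropertyUtil.propertyAsLong(
                props, TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, 134217728L);
        return forDeleteFile
            ? PropertyUtil.propertyAsLong(
                props, TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES, dataFileSize)
            : dataFileSize;
      }
    }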

pvary avatar Feb 21 '25 14:02 pvary

While I think the goal here is a good one, the implementation looks too complex to be workable in its current form.

The primary issue that we currently have is adapting object models (like Iceberg's internal StructLike, Spark's InternalRow, or Flink's RowData) to file formats, so that you can write the object-model-to-format glue code separately and have it work throughout an engine's support. I think a diff from the InternalData PR demonstrates it pretty well:

-    switch (format) {
-      case AVRO:
-        AvroIterable<ManifestEntry<F>> reader =
-            Avro.read(file)
-                .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
-                .createResolvingReader(this::newReader)
-                .reuseContainers()
-                .build();
+    CloseableIterable<ManifestEntry<F>> reader =
+        InternalData.read(format, file)
+            .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
+            .reuseContainers()
+            .build();
 
-        addCloseable(reader);
+    addCloseable(reader);
 
-        return CloseableIterable.transform(reader, inheritableMetadata::apply);
+    return CloseableIterable.transform(reader, inheritableMetadata::apply);
-
-      default:
-        throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
-    }

This shows:

  • Rather than a switch, the format is passed to create the builder
  • There is no longer a callback passed to create readers for the object model (createResolvingReader)

In this PR, there are a lot of other changes as well. I'm looking at one of the simpler Spark cases in the row reader.

The builder is initialized from DataFileServiceRegistry and now requires a format, class name, file, projection, and constant map:

    return DataFileServiceRegistry.readerBuilder(
            format, InternalRow.class.getName(), file, projection, idToConstant)

There are also new static classes in the file. Each creates a new service and each service creates the builder and object model:

  public static class AvroReaderService implements DataFileServiceRegistry.ReaderService {
    @Override
    public DataFileServiceRegistry.Key key() {
      return new DataFileServiceRegistry.Key(FileFormat.AVRO, InternalRow.class.getName());
    }

    @Override
    public ReaderBuilder builder(
        InputFile inputFile,
        Schema readSchema,
        Map<Integer, ?> idToConstant,
        DeleteFilter<?> deleteFilter) {
      return Avro.read(inputFile)
          .project(readSchema)
          .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant));
    }

The createResolvingReader line is still there, just moved into its own service class instead of in branches of a switch statement.

In addition, there are now a lot more abstractions:

  • A builder for creating an appender for a file format
  • A builder for creating a data file writer for a file format
  • A builder for creating an equality delete writer for a file format
  • A builder for creating a position delete writer for a file format
  • A builder for creating a reader for a file format
  • A "service" registry (what is a service?)
  • A "key"
  • A writer service
  • A reader service

I think that the next steps are to focus on making this a lot simpler, and there are some good ways to do that:

  • Focus on removing boilerplate and hiding the internals. For instance, Key, if needed, should be an internal abstraction and not complexity that is exposed to callers
  • The format-specific data and delete file builders typically wrap an appender builder. Is there a way to handle just the reader builder and appender builder?
  • Is the extra "service" abstraction helpful?
  • Remove ServiceLoader and use a simpler solution. I think that formats could simply register themselves like we do for InternalData (see the sketch after this list). I think it would be fine to have a trade-off that Iceberg ships with a list of known formats that can be loaded, and if you want to replace that list it's at your own risk.
  • Standardize more across the builders for FileFormat. How idToConstant is handled is a good example. That should be passed to the builder instead of making the whole API more complicated. Projection is the same.
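
One possible shape for the InternalData-style registration suggested above; the ObjectModelRegistry.register call and the factory signature are hypothetical, and only the Avro/Spark glue code is taken from the excerpt earlier in this comment:

    // Hypothetical registration sketch; the register method and factory signature are
    // assumptions, only the Avro/Spark glue code is reused from the PR excerpt above.
    ObjectModelRegistry.register(
        FileFormat.AVRO,
        InternalRow.class.getName(),
        (inputFile, readSchema, idToConstant) ->
            Avro.read(inputFile)
                .project(readSchema)
                .createResolvingReader(
                    schema -> SparkPlannedAvroReader.create(schema, idToConstant)));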

rdblue avatar Feb 22 '25 00:02 rdblue

While I think the goal here is a good one, the implementation looks too complex to be workable in its current form.

I'm happy that we agree on the goals. I created a PR to start the conversation. If there are willing reviewers, we can introduce more invasive changes to achieve a better API. I'm all for it!

The primary issue that we currently have is adapting object models (like Iceberg's internal StructLike, Spark's InternalRow, or Flink's RowData) to file formats, so that you can write the object-model-to-format glue code separately and have it work throughout an engine's support.

I think we need to keep these direct transformations to prevent the performance loss that would be caused by multiple transformations between object model -> common model -> file format.

We have a matrix of transformations which we need to encode somewhere (a sketch of one possible encoding follows the table):

  Source    Target
  Parquet   StructLike
  Parquet   InternalRow
  Parquet   RowData
  Parquet   Arrow
  Avro      ...
  ORC       ...
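
One self-contained way to picture how this matrix could be encoded (a sketch of the idea only, not the registry implementation in this PR): a lookup keyed by the (file format, object model) pair that returns the factory producing the format- and model-specific reader builder.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.io.InputFile;

    // Sketch only: the real registry uses richer builder types, but the matrix of
    // (file format, object model) -> glue code boils down to a keyed lookup like this.
    class TransformationMatrixSketch<B> {
      private final Map<String, Function<InputFile, B>> factories = new HashMap<>();

      private static String key(FileFormat format, String objectModel) {
        return format.name() + "/" + objectModel;
      }

      void register(FileFormat format, String objectModel, Function<InputFile, B> factory) {
        factories.put(key(format, objectModel), factory);
      }

      B builderFor(FileFormat format, String objectModel, InputFile file) {
        Function<InputFile, B> factory = factories.get(key(format, objectModel));
        if (factory == null) {
          throw new UnsupportedOperationException(
              "No registered reader for " + format + " and object model " + objectModel);
        }
        return factory.apply(file);
      }
    }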

[..]

  • Rather than a switch, the format is passed to create the builder
  • There is no longer a callback passed to create readers for the object model (createResolvingReader)

The InternalData reader has one advantage over the data file readers/writers: its object model is fixed. For the DataFile readers/writers we have multiple object models to handle.

[..] I think that the next steps are to focus on making this a lot simpler, and there are some good ways to do that:

  • Focus on removing boilerplate and hiding the internals. For instance, Key, if needed, should be an internal abstraction and not complexity that is exposed to callers

If we allow adding new builders for the file formats, we can remove a good chunk of the boilerplate code. Let me see how this would look.

  • The format-specific data and delete file builders typically wrap an appender builder. Is there a way to handle just the reader builder and appender builder?

We need to refactor the Avro positional delete write for this, or add a positionalWriterFunc. We also need to consider the format-specific configurations, which differ between the appenders and the delete files (DELETE_PARQUET_ROW_GROUP_SIZE_BYTES vs. PARQUET_ROW_GROUP_SIZE_BYTES).

  • Is the extra "service" abstraction helpful?

If we are ok with having a new Builder for the readers/writers, then we don't need the service. It was needed to keep the current APIs and the new APIs compatible.

  • Remove ServiceLoader and use a simpler solution. I think that formats could simply register themselves like we do for InternalData. I think it would be fine to have a trade-off that Iceberg ships with a list of known formats that can be loaded, and if you want to replace that list it's at your own risk.

Will do

  • Standardize more across the builders for FileFormat. How idToConstant is handled is a good example. That should be passed to the builder instead of making the whole API more complicated. Projection is the same.

Will see what can be achieved

pvary avatar Feb 24 '25 10:02 pvary

@rdblue: Addressed most of your comments. Could you check if you agree with the general approach? If the community is satisfied with the approach, we might want to proceed by separating out different parts of the changeset here to make review easier.

WDYT?

pvary avatar Feb 28 '25 16:02 pvary

Here are a few important open questions:

  1. We should decide on the expected filtering behavior. Currently the filters are applied on a best-effort basis by the file format readers. We might decide on stricter behavior and require the file formats to apply all filters when provided. I would suggest doing this in another PR even if we choose to change the current state.
  2. Batch sizes are currently parameters which can be set for non-vectorized readers too. We could make the batch size a reader property and tell the readers to parse the reader properties when a batch read happens. I would prefer the current solution, as the expectation for the readers is self-documented (see the sketch after this list).
  3. Parquet/ORC configuration. Currently the Spark batch reader uses different configuration objects for Parquet and ORC, as requested by @aokolnychyi. @rdblue suggested using a common configuration instead. I'm still learning the Spark code, so I don't have a strong opinion here.
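
For question 2, the trade-off is whether the batch size is a typed builder call or an opaque reader property. A minimal sketch on the existing Parquet builder (the property key in option (b) is hypothetical):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.parquet.Parquet;

    class BatchSizeSketch {
      static void configure(InputFile file, Schema schema) {
        // (a) Typed builder call, as in the current proposal; the expectation towards the
        // reader is visible in the API (recordsPerBatch exists on Parquet.ReadBuilder).
        Parquet.ReadBuilder typed =
            Parquet.read(file).project(schema).recordsPerBatch(4096);

        // (b) Alternative: pass it as a reader property and let the format parse it when
        // a batched read happens (the property key here is hypothetical).
        Parquet.ReadBuilder untyped =
            Parquet.read(file).project(schema).set("read.batch-size", "4096");
      }
    }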

pvary avatar Mar 19 '25 15:03 pvary

Overall, +1 from me 🎉

I made a prototype Lance implementation here: 84bf5c5

@westonpace: Thanks for the feedback, I really appreciate that you took time to implement the API for Lance and shared your learnings!

That being said, I think a really cool addition in the future would be a base implementation that uses Arrow. As long as a reader/writer can produce/consume VectorSchemaRoot and it puts the field ids in the Arrow field schema, then 80% of the glue code will be provided for them. The name mapping, field id handling, constant handling, and spark<->arrow conversion could all be part of the base implementation.

Are you suggesting that we should use Arrow as an intermediate format? So basically Iceberg would implement the transformations between an Arrow VectorSchemaRoot and the engine-specific ObjectModels (Generic/Spark/Flink), and the file formats would implement the transformation between the file format's internal model and the Arrow VectorSchemaRoot? What do you think about the overhead (memory/CPU) of the double transformation? Do you have experience with this on the hot path for reading/writing? I specifically tried to avoid the double transformation to ensure that the performance doesn't suffer.

Thanks, Peter

pvary avatar Apr 09 '25 08:04 pvary

Are you suggesting that we should use Arrow as an intermediate format?

That is one way, and personally, I'd be a fan, but I wasn't sure it was practical or not in the code base.

I was thinking more that there could just be an abstract base class for formats that are Arrow-native. E.g. "here is the reader API and you need to implement these 10 methods" but also "if you speak Arrow, extend this, which implements the 10 methods for you as long as you implement these 3 Arrow methods" (I'm just making the 3 and 10 up here).

What do you think about the overhead (memory/CPU) of the double transformation? Do you have experience with this on the hot path for reading/writing?

Arrow is designed to be zero-copy. Currently, I believe there are no transformations on the read path. The data starts out as an arrow-rs RecordBatch. When we cross the JNI boundary we use the C data interface so only the metadata is marshaled to create the VectorSchemaRoot. Then, and this is where I'm a little less certain, I believe it is zero-copy to create Spark's ColumnarBatch from VectorSchemaRoot (ArrowColumnVector is a subclass of ColumnVector that satisfies the APIs against the underlying vector directly instead of copying it). So at most there should be a few hundred bytes of allocation (for things like lists of children in structs) per batch.

The write path does have one transformation. Spark is not providing us a ColumnarBatch on write (and even if it did I doubt it would be backed by ArrowColumnVector) and so a copy is required to go from InternalRow to VectorSchemaRoot. However, the copy from VectorSchemaRoot to Rust's RecordBatch is zero-copy so there is only one transformation total.
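
For reference, the VectorSchemaRoot-to-ColumnarBatch step can be done with stock Spark classes, which is why it stays effectively zero-copy. A minimal sketch, assuming the VectorSchemaRoot is already populated:

    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.spark.sql.vectorized.ArrowColumnVector;
    import org.apache.spark.sql.vectorized.ColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    class ArrowToSparkSketch {
      // Wraps each Arrow vector in an ArrowColumnVector; the underlying buffers are not
      // copied, only small wrapper objects are allocated per batch.
      static ColumnarBatch wrap(VectorSchemaRoot root) {
        ColumnVector[] columns =
            root.getFieldVectors().stream()
                .map(ArrowColumnVector::new)
                .toArray(ColumnVector[]::new);
        ColumnarBatch batch = new ColumnarBatch(columns);
        batch.setNumRows(root.getRowCount());
        return batch;
      }
    }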

westonpace avatar Apr 09 '25 15:04 westonpace

Are you suggesting that we should use Arrow as an intermediate format?

That is one way, and personally, I'd be a fan, but I wasn't sure it was practical or not in the code base.

This would be an interesting next step to consider. It might be easier for Spark, but for other engines that do not rely on Arrow for reads, this next layer would be an additional step. They would need to create their own zero-copy implementation for their internal format on top of Arrow to make this work effectively. I'm not sure this is an easy option in every engine. On the other hand, this path would have the added benefit of removing the burden from the engines of implementing the ObjectModel for every file format. They would only need to implement the Arrow reader/writer once, and the rest would be handled by the code provided by the supported file formats.

I would defer this discussion to the dev list for a later stage.

I think the PR is big enough at this point to avoid any further feature creep. It creates a nice, clean API that could be used later for whatever the community decides to use it for.

pvary avatar Apr 10 '25 08:04 pvary

Extracted the proposed interfaces to a different PR, so it is easier to review them. See: #12774

pvary avatar Apr 11 '25 13:04 pvary

Right now, there are two questions that Peter and I are discussing offline.

  1. Is it necessary to have a globally shared FileAccessFactoryRegistry in the iceberg-data module and have each engine register its factory classes (e.g. FlinkObjectModels)?

The usage pattern in the engine (like Flink) is not much different between the two models (with or without registry).

Here is the usage pattern with the registry model.

      WriteBuilder<?, RowType, RowData> builder =
          FileAccessFactoryRegistry.writeBuilder(
              format,
              FlinkObjectModels.FLINK_OBJECT_MODEL,
              EncryptedFiles.plainAsEncryptedOutput(outputFile));

Here is the usage pattern without the registry model (from Peter's testing PR #13257).

      WriteBuilder<?, RowType, RowData> builder =
          FlinkFileAccessor.INSTANCE.writeBuilder(
              format, EncryptedFiles.plainAsEncryptedOutput(outputFile));

From my perspective, I don't see much value in sharing the Flink and Spark factory registration globally. Each engine should know the factory it should use.

  2. Peter was thinking about consolidating engine integrations in the file format module.

Let's say we want to add support for the Vortex file format. Here are the steps needed after this effort:

  1. [api] add vortex to FileFormat enum
  2. [vortex] implement VortexFileAccessFactory
  3. [flink/spark] implement vortex row reader and writer
  4. [flink/spark] add the factory implementation and register to the global factory (if keeping the registry model)

Peter thought steps 2-4 could be consolidated into the iceberg-vortex module. But it means that the iceberg-vortex module would depend on the iceberg-flink and iceberg-spark modules.

From my perspective, it does not seem right to have a file format module (like iceberg-vortex) depend on engine modules. The reverse dependency model (engine depends on file format module) makes more sense to me. The engine integration code for a file format (readers for engine data types like Flink RowData) should live in the engine module. I know this means that new file format support would require changes in the engine modules. But that would be the same for engines living outside the Iceberg repo.

stevenzwu avatar Jun 23 '25 22:06 stevenzwu

Let's say we want to add support for the Vortex file format. Here are the steps needed after this effort:

  1. [api] add vortex to FileFormat enum
  2. [vortex] implement VortexFileAccessFactory
  3. [flink/spark] implement vortex row reader and writer
  4. [flink/spark] add the factory implementation and register to the global factory (if keeping the registry model)

Peter thought steps 2-4 could be consolidated into the iceberg-vortex module. But it means that the iceberg-vortex module would depend on the iceberg-flink and iceberg-spark modules.

From my perspective, it does not seem right to have a file format module (like iceberg-vortex) depend on engine modules. The reverse dependency model (engine depends on file format module) makes more sense to me. The engine integration code for a file format (readers for engine data types like Flink RowData) should live in the engine module. I know this means that new file format support would require changes in the engine modules. But that would be the same for engines living outside the Iceberg repo.

There is a part of the code where both Vortex/Spark and Vortex/Flink are needed. We can't get away from it. We have:

  1. Iceberg File Format API + Vortex specific code for the readers/writers
  2. Vortex + Spark specific code for the Spark readers
  3. Vortex + Flink specific code for the Flink readers

We can add 2 to Spark and 3 to Flink, or we can decouple Spark and Flink and have an independent module for 2 and 3. Another option is to merge 1, 2, and 3 together.

Maybe the best would be to keep all of them separate, but if the size is not too big, I would prefer to put all of them into a single module and ship them in a single jar.

If we keep Vortex/Spark and Vortex/Flink, or for that matter ORC/Spark and ORC/Flink, in their own separate modules, then the registry-based solution enables us to avoid changing the Spark/Flink code when we decide to add Vortex or remove ORC support.

pvary avatar Jun 24 '25 12:06 pvary