
Distributing Job Planning on Spark

RussellSpitzer opened this issue 3 years ago • 14 comments

Background: One issue we’ve seen come up frequently with our larger table scans in Spark is the amount of time required for job planning. Spark reads have a long startup phase waiting to plan all read tasks before any progress can be made. During this phase resources are held but no progress is made on the job. In our workloads the majority of this time is spent reading and processing ManifestFiles, so we are looking into ways of parallelizing that step and hopefully reducing the time spent in planning.

One recent example of how addressing this can produce dramatic speedups is the work on ExpireSnapshotsAction, which, prior to a fix made by rdblue, spent the majority of its time reading manifest files locally, stalling the job in the planning phase while manifests were read. There the issue was addressed specifically by changing how the metadata dataframe is constructed. We would like to apply this approach to Spark-based reads in general.

Potential Solutions: Ideally we would like to decide heuristically when a distributed planning solution would be useful. When that check passes, we would start a Spark job whose only mission is to produce the tasks for the actual scan.
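
For concreteness, here is a minimal sketch of the kind of heuristic check we have in mind, written against a recent Iceberg API; the class name and size threshold are purely illustrative and not part of any existing API.

```java
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class PlanningHeuristic {
  // Illustrative threshold: only distribute planning once the manifest metadata
  // for the current snapshot is large enough to be worth starting a Spark job.
  private static final long MIN_MANIFEST_BYTES_FOR_DISTRIBUTED_PLANNING = 512L * 1024 * 1024;

  public static boolean shouldDistributePlanning(Table table) {
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      return false; // empty table, nothing to plan
    }

    // Sum the sizes of all manifests reachable from the current snapshot.
    long totalManifestBytes = 0L;
    for (ManifestFile manifest : current.allManifests(table.io())) {
      totalManifestBytes += manifest.length();
    }

    return totalManifestBytes > MIN_MANIFEST_BYTES_FOR_DISTRIBUTED_PLANNING;
  }
}
```

When the check passes, the scan would kick off a Spark job whose only output is the set of scan tasks; otherwise the existing local planning path is used.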

We have prototyped two approaches that we think might be steps in the right direction, but we are hoping for feedback from the community, especially if there is another route we haven’t considered yet.

Both of these implementations work at a basic level, although the second approach does not yet support deletes.

  1. Addition to the TableScan API (https://github.com/apache/iceberg/pull/1420) This modification targets ManifestGroup with the goal of parallelizing only the manifest-reading phase on demand. To do this, we allow TableScan users to specify a function (ManifestProcessor) for turning a list of manifests into a CloseableIterable of manifest entries. The actual reading function is still expected to be provided by the TableScan, but how that function is executed is determined by the user-provided ManifestProcessor (see the first sketch after this list). This requires adding some code to the API module, so it presents a bit of risk there, and it requires making some things serializable which previously were not. On the plus side, this allows the distributed planning and normal planning pathways to share code.

  2. Refactor of PlanFiles for Spark (https://github.com/apache/iceberg/pull/1421) This modification aims to replicate the planning portion of DataTableScan as separate code (SparkPlanUtil). Here we use the data files metadata table to get our original listing of DataFiles. Because the metadata table doesn’t support any kind of pushdown yet, the current implementation has to read all manifest files. We then apply our filters to the resulting DataFile rows and produce ScanTasks in Spark; these results are returned to the driver and used in the scan (see the second sketch after this list). This approach also required changing a bit of serializability, although we have plans for how to reduce that. The biggest pain point here was getting the PartitionSpecId info from the metadata table: currently that information is missing, since the rows only return the information located in the Avro files. A ghost column was added to the table, populated with the specId, to propagate the information, but this approach could definitely be improved.
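
For the first approach, the following is a minimal sketch of what a pluggable ManifestProcessor hook might look like. The names and signatures here are illustrative and will not match PR #1420 exactly; in particular, the real code works with Iceberg's internal manifest entries, which are simplified to DataFile here.

```java
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.CloseableIterable;

// A serializable per-manifest reader, so the reading function provided by the
// TableScan can be shipped to Spark executors when planning is distributed.
interface SerializableManifestReader extends Serializable {
  CloseableIterable<DataFile> read(ManifestFile manifest);
}

// The pluggable hook: given the manifests selected for a scan and the reader,
// decide where and how the reading happens (locally or inside a Spark job).
interface ManifestProcessor extends Serializable {
  CloseableIterable<DataFile> process(List<ManifestFile> manifests, SerializableManifestReader reader);
}

// Trivial local implementation, equivalent to the existing behavior: read every
// manifest on the driver and concatenate the results.
class LocalManifestProcessor implements ManifestProcessor {
  @Override
  public CloseableIterable<DataFile> process(List<ManifestFile> manifests, SerializableManifestReader reader) {
    return CloseableIterable.concat(
        manifests.stream().map(reader::read).collect(Collectors.toList()));
  }
}
```

A distributed implementation would instead parallelize the manifest list with the SparkContext, run the reader on executors, and bring the resulting entries (or fully formed tasks) back to the driver.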
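
For the second approach, here is a rough sketch of the overall flow, assuming the table's files metadata table can be loaded by name; the class and method names are hypothetical and the real SparkPlanUtil in PR #1421 differs in detail.

```java
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DistributedPlanningSketch {
  public static List<Row> planDataFiles(SparkSession spark, String tableName, String filterExpr) {
    // Load the files metadata table for the Iceberg table, e.g. "db.tbl.files".
    Dataset<Row> dataFiles = spark.read()
        .format("iceberg")
        .load(tableName + ".files");

    // Apply the scan filter to the data file rows on the executors. Note that the
    // partition spec id is not exposed by the metadata table today, which is the
    // "ghost column" pain point described above.
    Dataset<Row> matching = dataFiles.filter(filterExpr);

    // Collect the surviving rows back to the driver, where they would be turned
    // into FileScanTasks for the actual read.
    return matching.collectAsList();
  }
}
```

Because the filter is applied only after the metadata table has been read, every manifest is still opened; pushdown into the metadata table would remove that cost.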

If you are interested in this or have feedback, I would love to hear it. Neither of the above PRs is production-ready at the moment, but I wanted to get some feedback before we finish one (or neither) of them.

RussellSpitzer • Sep 03 '20 21:09