
Distributing Job Planning on Spark

Open RussellSpitzer opened this issue 5 years ago • 14 comments

Background: One issue we’ve seen come up frequently with our larger table scans in Spark is the amount of time required for job planning. Spark reads have a long startup phase waiting to plan all read tasks before any progress can be made. This phase represents a large amount of time where resources are held but no progress is made on the job. In our workloads we see that the majority of time is spent reading and processing ManifestFiles, so we are looking into ways of parallelizing this step and hopefully reducing the time spent in planning.

One recent example of how this can cause dramatic speedups is the work on ExpireSnapshotsAction, which, prior to a fix made by rdblue, spent the majority of its time reading manifest files locally. Before this fix, the job was stalled during the planning phase while manifests were being read. There the issue was fixed acutely by changing the construction of the metadata dataframe. We would like to take this approach to Spark-based reads in general.

Potential Solutions: Ideally we would like to be able to heuristically decide when a distributed planning solution would be useful. When those checks pass we would start a Spark job whose only mission is to produce the tasks for the actual scan.

We have prototyped two approaches we think might be steps in the right direction, but we were hoping for some feedback from the community, especially if there is another route we haven’t considered yet.

Both of these implementations work at a basic level, although the second approach does not yet have deletes implemented.

  1. Addition to the TableScan API (https://github.com/apache/iceberg/pull/1420). This modification targets ManifestGroup with the goal of parallelizing just the manifest-reading phase on command. To do this, we allow TableScan users to specify a function (ManifestProcessor) for turning a list of manifests into a CloseableIterable of manifest entries. The actual reading function is still expected to be provided by the TableScan, but the execution of that function changes based on the user-provided ManifestProcessor. This requires a bit of additional code in the API module, so it presents some risk there, as well as requiring some things to be serializable which previously were not. On the plus side, this allows the distributed planning and normal planning pathways to share code. (See the first sketch after this list.)

  2. Refactor of planFiles for Spark (https://github.com/apache/iceberg/pull/1421). This modification aims to replicate the planning portion of DataTableScan as separate code (SparkPlanUtil). Here we use the DataFiles metadata table to get our original listing of DataFiles. Because the metadata table doesn’t support any kind of pushdown yet, we have to read all manifest files in the current implementation. Then we apply our filters to the resulting DataFile rows and end up producing ScanTasks in Spark. These results are returned to the driver and used in the scan. This approach also required changing a bit of serializability, although we have plans for how to reduce that. The biggest pain point here was getting the partition spec ID from the metadata table. Currently that information is missing since the row only returns the information located in the Avro files. A ghost column was added to the table and populated with the spec ID to propagate the information, but this approach could definitely be improved. (See the second sketch after this list.)
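To make the shape of option 1 a bit more concrete, here is a minimal sketch of what a pluggable manifest processor could look like. The ManifestProcessor name comes from the description above, but the generics, the method signature, and the local implementation are assumptions for illustration, not the actual API in PR #1420.

```java
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical sketch only; the signature and generics are illustrative.
interface ManifestProcessor<T> extends Serializable {
  // Turns a list of manifests into one iterable of entries, using the read
  // function supplied by the TableScan. A local implementation applies the
  // function on the driver; a Spark implementation would ship it to executors.
  CloseableIterable<T> process(
      List<ManifestFile> manifests,
      Function<ManifestFile, CloseableIterable<T>> readFunc);
}

class LocalManifestProcessor<T> implements ManifestProcessor<T> {
  @Override
  public CloseableIterable<T> process(
      List<ManifestFile> manifests,
      Function<ManifestFile, CloseableIterable<T>> readFunc) {
    // Sequential, driver-side reading: the behavior planning has today.
    return CloseableIterable.concat(
        manifests.stream().map(readFunc).collect(Collectors.toList()));
  }
}
```

In the distributed case, a Spark-backed implementation would need readFunc (and everything it closes over) to be serializable, which is the serialization concern mentioned above.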
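For comparison, here is a rough sketch of what the Spark side of option 2 could look like, assuming the table's files metadata table as the source. The column names follow the files metadata table schema, but the plain SQL filter and the hand-off back to the driver are simplified stand-ins for the expression evaluation and ScanTask construction in PR #1421.

```java
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DistributedPlanningSketch {
  // Returns the metadata rows for data files that survive filtering, computed
  // on the executors rather than by reading manifests on the driver.
  public static List<Row> planMatchingFiles(SparkSession spark, String tableName) {
    // Each row of the files metadata table describes one data file:
    // path, format, partition, record count, column bounds, and so on.
    Dataset<Row> files = spark.read().format("iceberg").load(tableName + ".files");

    // Stand-in for evaluating the scan filter against per-file stats; a real
    // implementation would apply Iceberg evaluators to the bounds columns.
    Dataset<Row> matching = files.filter("record_count > 0");

    // Collect only what the driver needs to build the actual scan tasks.
    return matching
        .select("file_path", "file_format", "record_count", "file_size_in_bytes")
        .collectAsList();
  }
}
```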

If you are interested in this or have feedback, I would love to hear it. Neither of the above PRs is production-ready at the moment, but I wanted to get some feedback before we finish one (or neither) of them.

RussellSpitzer avatar Sep 03 '20 21:09 RussellSpitzer

If it costs a lot of time to read manifest files, why not merge the data files or merge the manifests under a manifest list file?

yiguolei avatar Sep 04 '20 02:09 yiguolei

To give a bit more background, job planning is fast even on huge tables if we have a partition predicate and end up processing 10-15 partitions in a single job. We have RewriteManifestsAction to rewrite metadata and align it with partitions on demand. That covers all common use cases.
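For context, rewriting manifests to align with partitions is available as a Spark action; a hedged sketch of invoking it follows. The SparkActions entry point reflects the actions API as it exists today (it postdates this comment), and the size threshold in rewriteIf is purely illustrative.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;

public class RewriteManifestsExample {
  public static void alignManifestsWithPartitions(Table table) {
    SparkActions.get()
        .rewriteManifests(table)
        // Only rewrite small manifests; the 8 MB threshold is an arbitrary example.
        .rewriteIf(manifest -> manifest.length() < 8L * 1024 * 1024)
        .execute();
  }
}
```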

At the same time, Iceberg supports file filtering within partitions and opens the opportunity for efficient full table scans. So there are two use cases we want to address:

  • job planning for full table scans (less important)
  • job planning for queries with predicates only on sort key in partitioned tables (becomes common in Iceberg tables)

The second use case is the primary one. It is a common pattern to partition tables by date and sort by key and have queries without partition predicates but with predicates on the sort key. Such use cases were not possible without Iceberg since we could not filter files within partitions and we ended up with full table scans. Right now, we can narrow down the number of matching files to 1-2 per partition. So we can scan PB scale tables for a given key faster. If we do so right now, we end up spending most of the time during job planning. That's why it would be better to parallelize that.
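As an illustration of that pattern (the table and column names are made up for this sketch): a query that filters only on the sort key of a date-partitioned table, which Iceberg can prune to a few files per partition using per-file column bounds, at the cost of reading every manifest during planning.

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SortKeyScanExample {
  // db.events is assumed to be partitioned by event_date and sorted by account_id.
  public static Dataset<Row> lookupByKey(SparkSession spark, String accountId) {
    // No predicate on the partition column, only on the sort key; planning must
    // read all manifests to reach the per-file stats that enable pruning.
    return spark.read()
        .format("iceberg")
        .load("db.events")
        .where(col("account_id").equalTo(accountId));
  }
}
```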

aokolnychyi avatar Sep 04 '20 05:09 aokolnychyi

In the future, I see the possibility to load and leverage secondary indexes using this approach too.

aokolnychyi avatar Sep 04 '20 05:09 aokolnychyi

@yiguolei, regarding:

If it costs a lot of time to read manifest files, why not merge the data files or merge the manifests under a manifest list file?

The issue is that that approach also does not scale with very large amounts of metadata. I would really like a solution that allows us to do TableScan-style queries even when the underlying table has hundreds of megabytes, if not gigabytes, of metadata.

RussellSpitzer avatar Sep 04 '20 15:09 RussellSpitzer

I haven't formed an opinion on what approach is more promising right now. Here are my thoughts after a quick look:

Option 1 (extension to the TableScan API)

Pros:

  • Reuses the existing filtering logic.
  • API could be reused by other query engines.
  • Seems to require less code?

Cons:

  • Is a substantial change to the core planning logic that requires thorough testing (both performance and correctness).
  • Requires thinking about serialization, and especially Kryo serialization, during planning (this was not needed before).

Option 2 (metadata tables)

Pros:

  • Reuses the existing logic to read manifests in a scalable way via metadata tables.
  • Reuses the existing logic for wrapping Spark Rows into Iceberg DataFiles.
  • Doesn't touch the core planning API and is more isolated.
  • Could potentially be exposed as an action (does that make sense or not?)

Cons:

  • Requires instantiating evaluators ourselves.
  • Seems to require a bit more code but I feel like it can be simplified.
  • Specific to Spark but could be implemented in other systems that support metadata tables.

Implementation aside, we need to consider when to apply this. Ideally, we would have some sort of threshold for the size of manifests we need to scan after manifest filtering. If we narrow down the scope to a couple of manifests, plan locally; otherwise, plan using Spark. I am not sure it will be that easy, though. Instead, we could support a Spark read option that would tell us which mode to use. The value could be local, distributed, or auto. In auto, the simplest option is to analyze the scan condition: if there is a reasonable partition predicate (e.g. equals or inSet), we could always plan locally; if not, and if distributed is enabled, leverage Spark.
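A small sketch of what such a knob and heuristic could look like; "planning-mode" is a hypothetical read option name and the size threshold is an arbitrary stand-in for whatever heuristic is chosen, not existing Iceberg behavior.

```java
import java.util.List;

import org.apache.iceberg.ManifestFile;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PlanningModeSketch {
  // "auto" could fall back to distributed planning only when the manifests that
  // survive manifest-list filtering are collectively larger than a threshold.
  static boolean shouldPlanDistributed(List<ManifestFile> matchingManifests, long thresholdBytes) {
    long totalBytes = matchingManifests.stream().mapToLong(ManifestFile::length).sum();
    return totalBytes > thresholdBytes;
  }

  // Hypothetical read option exposing the local / distributed / auto switch.
  static Dataset<Row> read(SparkSession spark, String table) {
    return spark.read()
        .format("iceberg")
        .option("planning-mode", "auto")
        .load(table);
  }
}
```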

aokolnychyi avatar Sep 04 '20 23:09 aokolnychyi

I tend to think that Option 2 would require less substantial changes. We want predicate pushdown in metadata tables, a fix for the spec ID in SparkDataFile, and spec ID propagation in the files metadata table anyway. That being said, I would be interested to know whether any other query engines plan to leverage this too. I know planning in Presto is very different, though.

aokolnychyi avatar Sep 23 '20 05:09 aokolnychyi

@aokolnychyi We have this use case too

job planning for queries with predicates only on sort key in partitioned tables

where distributed job planning can be helpful. I think one reason why partition predicates are faster is that we have partition upper and lower bounds aggregated at the manifest file level. If we aggregated these metrics for other fields as well, do you think that would help your case here?

shardulm94 avatar Sep 23 '20 20:09 shardulm94

I think one reason why partition predicates are faster is that we have partition upper and lower bounds aggregated at the manifest file level. If we aggregated these metrics for other fields as well, do you think that would help your case here?

I am afraid that won't help as a single manifest covers a subset of partitions and we tend to have values for the complete range of sort keys in every partition. We really need to read all manifests to get to the file stats.

aokolnychyi avatar Sep 24 '20 01:09 aokolnychyi

So it’s been a while and we’ve gotten a bit of feedback on the PRs. Thanks @kbendick, @aokolnychyi, and @rdblue!

Since at the moment the metadata-tables-based approach seems like the safer path, I’m going to start cleaning up that PR and moving forward with it.

For me, the big benefits of the other approach were mainly around code reuse and the possibility for other systems to use the same method for parallelization. Since we haven’t really heard from anyone using Presto, Flink, or whatnot, I’m going to assume that the benefit is probably not that great. We can always share a common set of ideas across these platforms, with each having its own planning algorithm if need be.

RussellSpitzer avatar Sep 29 '20 14:09 RussellSpitzer

I think that I agree with the metadata table approach. Because Presto can run tasks and planning at the same time, this is less of an issue. And the work done for Spark in option 2 could translate into a parallel scan on a Presto metadata table as well (converting partition predicates to filters on metadata table columns). Flink is much more likely to consume tables incrementally, so I think it wouldn't be a big issue there for now (but would be nice to hear from them).

Risk is lower with option 2, and I think it sounds like the better option. It also pushes on the metadata tables in healthy ways: it would incentivize building pushdown in the files and entries metadata tables and might require adding a delete_files metadata table. Those are good side effects of implementing it this way.

rdblue avatar Sep 29 '20 19:09 rdblue

Sounds like we have an initial plan then. Excited to see progress on this.

aokolnychyi avatar Sep 29 '20 20:09 aokolnychyi

Hello, I see this is work in progress. Any estimates on when this will be merged to master?

MayankSharma-MS avatar Jul 15 '22 07:07 MayankSharma-MS

It is very out of date; we never went forward with finishing it, so my current estimate would be the far future :)

RussellSpitzer avatar Jul 15 '22 18:07 RussellSpitzer

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Feb 22 '24 00:02 github-actions[bot]

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

github-actions[bot] avatar Mar 08 '24 00:03 github-actions[bot]