Allow SparkSQL to override target split size with session property
The target split size can be set in SparkSQL via spark.sql.iceberg.db.tbl.read-split-size, a config that is scoped to a specific table (the db.tbl portion names the table it applies to).
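For example, a consumer session might set it like this (the table name and value below are placeholders; the value is assumed to be in bytes, matching the read.split.target-size table property):

```sql
-- Override the target split size to 512 MB for scans of db.tbl in this session only.
-- Other tables keep whatever read.split.target-size is set on them.
SET spark.sql.iceberg.db.tbl.read-split-size=536870912;

SELECT count(*) FROM db.tbl;
```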
Why would we want to support this property at the session level? The split size is specific to data volume, which varies from table to table. Spark's built-in tables rely solely on the session config, but that is not the case for Iceberg. We do have SQL properties to override what's set in the table, but I am not sure this is a good use case.
Hi @aokolnychyi, I think there is legitimate value in this.
We are migrating hundreds of Hive tables to Iceberg, and ensuring that the SparkSQL consumers of these tables don't fail is our top priority. A SparkSQL job that used to read a Hive table with a particular "spark.sql.files.maxPartitionBytes" value can fail once the table becomes Iceberg if the Iceberg split size is very different, because many more splits are generated and that causes job failures.
It is even more complicated if different downstream jobs have different "spark.sql.files.maxPartitionBytes" values (I am not sure if this really happens, but in theory it could).
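To illustrate the mismatch (the values and table name here are made up): the Hive-backed scan is tuned through the session config, while the Iceberg-backed scan only honors the table property, so today the only knob is to alter the table itself:

```sql
-- Hive-backed scan: split size follows the session config.
SET spark.sql.files.maxPartitionBytes=1073741824;   -- 1 GB

-- Iceberg-backed scan: split size follows the table property (128 MB by default)
-- and ignores the session config above.
ALTER TABLE db.tbl SET TBLPROPERTIES ('read.split.target-size'='1073741824');
```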
Can we identify exact scenarios where the default split size performs poorly and check whether we can solve the underlying problem? For instance, if the scheduler is FIFO, can we use the default cluster parallelism and the size of the data to be processed to come up with an optimal split size? We first find matching files and then plan splits, so the split size can be dynamic; we just need a good way to estimate it correctly.
I am not going to oppose a SQL config but I don't think we should rely on an internal SQL property for built-in file sources.
Thoughts, @puchengy @RussellSpitzer @szehon-ho @singhpk234 @rdblue?
We first find matching files and then plan splits, so the split size can be dynamic; we just need a good way to estimate it correctly.
+1 on @aokolnychyi's suggestion of dynamic split size generation. This would help solve the problem from both sides: a small table with a large split size, and a large table with a small split size. Since Iceberg is responsible for scan planning, it's best if it takes the call itself on how to make these splits.
Hi @aokolnychyi,
I am not going to oppose a SQL config but I don't think we should rely on an internal SQL property for built-in file sources.
Trying to understand your stance here. Do you mean you are fine with the current change but against making it correlate to "spark.sql.files.maxPartitionBytes"? If so, I am fine with that.
Can we identify exact scenarios when the default split size performs poorly and check if we can solve the underlying problem?
I can share two scenarios. They don't really lead to poor performance, but they made our platform team's life harder ("harder" meaning the migration work became more challenging).
(1) As mentioned above, when a SparkSQL job used to consume a Hive table with a large "spark.sql.files.maxPartitionBytes" value (for example, 1 GB), changing the underlying table to Iceberg (which defaults to a 128 MB split size) immediately increases the split count by 8x (in theory). This increases driver memory consumption and can cause the driver to OOM. To fix that, we have to increase driver memory manually for that job.
(2) We have a strict SLA with our customers, which usually means that when we make a change to a SparkSQL job (e.g. switching the underlying table from Hive to Iceberg), we try to keep the output the same (number of files and size of each file). In an Iceberg migration, when the source table is changed from Hive to Iceberg, the change in split count directly increases the SparkSQL job's output file count by 8x (in theory). We could make the case that the increase is OK, but that enlarges the surface of the work and slows down the migration. To fix it, we have to add an explicit coalesce to match the old behavior.
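To put rough numbers on (2) (purely illustrative, assuming ~100 GB of input and a shuffle-free read-write job with placeholder table names):

```sql
-- Hive read with spark.sql.files.maxPartitionBytes = 1 GB      -> ~100 splits -> ~100 output files
-- Iceberg read with read.split.target-size = 128 MB (default)  -> ~800 splits -> ~800 output files
-- Workaround: pin the output back to the old task count with a coalesce hint.
INSERT OVERWRITE TABLE db.target
SELECT /*+ COALESCE(100) */ *
FROM db.source;
```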
Note that I am also in favor of dynamic split size generation, but this property will still be valuable for folks who are adopting Iceberg at a large scale, don't want to worry too much about the nuances, and just want to match the existing behavior (i.e. no need to change workload configuration or queries).
Do you mean you are fine with the current change but against making it correlate to "spark.sql.files.maxPartitionBytes"?
Yeah, I don't mind adding an Iceberg SQL property if it benefits you and other folks also support it, but I would like to think through other alternatives to make sure we are not overlooking a better approach. I don't think it is a good idea to support properties meant for built-in sources, though. Split planning is a bit different. If we support one config, will we have to support others?
This increases driver memory consumption and can cause the driver to OOM. To fix that, we have to increase driver memory manually for that job.
Is there a way to set this value correctly during the migration or is the split size different for different workloads?
To fix it, we have to add an explicit coalesce to match the old behavior.
I assume there is no shuffle in that read-write job so that AQE cannot coalesce/split tasks during writes?
Let's hear what others think. I am OK with adding this property to unblock you, but it would be great to explore the automatic split configuration. I created #7465 for that.
@aokolnychyi Thanks for your understanding.
Split planning is a bit different. If we support one config, will we have to support others?
Unfortunately, that is the case. But we don't have to proactively pull in other configs if no one needs them.
Is there a way to set this value correctly during the migration or is the split size different for different workloads?
Yes, there is a way, but automating it intelligently will need more work (which is why I am trying to explore this possibility). Also, higher driver memory means more resource usage, which adds another layer of complexity for user education (why it is OK to bump the memory, why the cost will not be high, etc.).
I haven't seen a case where the split size differs between workloads, but I would not be surprised if one exists, since on our platform customers are allowed to set any configs they would like.
I assume there is no shuffle in that read-write job so that AQE cannot coalesce/split tasks during writes?
Yes, you are correct.
I think one use case for a different split size is the more advanced GDPR use cases (row-level deletes).
If you have fully optimized the job to use a broadcast join (no shuffle), you can control the number of written files via the split size. This can be different from a regular read, where performance is the main concern.
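As a rough sketch (table names and sizes are placeholders, and this assumes the join really stays broadcast and shuffle-free): bumping the split size before the row-level delete reduces the number of rewritten files, because each read split turns into roughly one write task:

```sql
-- Target ~512 MB per split, and therefore roughly one rewritten file per 512 MB,
-- assuming no shuffle between the scan and the write.
ALTER TABLE db.events SET TBLPROPERTIES ('read.split.target-size'='536870912');

DELETE FROM db.events
WHERE user_id IN (SELECT user_id FROM db.gdpr_delete_requests);
```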
@aokolnychyi @RussellSpitzer @szehon-ho @singhpk234 @rdblue Hi there, a gentle ping on this diff. Given that we have had some discussion and general agreement on this topic already, I am wondering what we need to do to proceed and merge the change? Thanks
I thought this was already supported, but I don't see it. The way we did this at Netflix was to add a table-specific property to SQL, like spark.sql.iceberg.db.table.split-size=....
This is something that should be set per table because it is data-specific, which is why we don't have a global option for it.
@rdblue Hi Ryan, thanks for sharing this. I can implement this if it is not supported.
@rdblue @aokolnychyi @singhpk234 @szehon-ho Hi, I updated the PR as Ryan suggested. The current implementation allows spark.sql.iceberg.db.tbl.read-split-size to be set for each individual table. Could some of you take a look? Thanks!
@puchengy, we discussed this during the sync (it is unfortunate we missed you). The consensus was to invest in an algorithm to pick the split size automatically. We also agreed that offering just a single SQL property to override the split size is probably not a good idea, given that we plan to pick the split size automatically.
That said, we should discuss whether offering a generic way to override a table property via SQL would make sense. In my view, this is the most reasonable short-term solution for this problem. The only concern I have is how to handle special characters.
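One hypothetical shape, just to frame the discussion (this key pattern does not exist today), which also shows where the special-character problem bites:

```sql
-- Hypothetical pattern: spark.sql.iceberg.<catalog>.<db>.<table>.<table-property>
SET spark.sql.iceberg.prod.db.tbl.read.split.target-size=268435456;

-- The ambiguity: dots separate catalog/db/table but also appear inside property
-- names like read.split.target-size, so without escaping there is no way to tell
-- where the table identifier ends and the property key begins.
```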
@rdblue @RussellSpitzer @danielcweeks, could you share your thoughts?
I am going to drop this PR from 1.3.0 as it probably needs a bit more discussion.
@aokolnychyi Thanks for sharing this. I would love to go with the generic "override a table property via SQL" approach and would be happy to offer the implementation, if you and @rdblue @RussellSpitzer @danielcweeks think this is something the community can adopt. Otherwise, we can continue discussing other solutions.
If we are fine with the approach, then my next question is about implementation: I don't yet know a good way to implement this, so if you do, please let me know. Otherwise I can investigate and propose one.
Thanks all for participating in this discussion.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.