Spark: Make maxRecordsPerMicrobatch a soft limit
About the change
Make maxRecordsPerMicrobatch a soft limit. When the configured maximum number of records is smaller than the total record count of a single file, honoring the limit exactly under the current behavior would require reading that file partially. The cutoff does not fall on a row group boundary or on any other unit that our scan tasks can split on, so there is no well-defined place to stop. It is better to treat the limit as soft: when adding a file to a micro-batch, include it if it fits within the limit; otherwise, include the whole file anyway and close that micro-batch.
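The planning rule above can be sketched as follows. This is a minimal illustration, assuming each file's record count is known before planning and that files are never split; `SoftLimitBatchPlanner`, `planBatch` and `planBatchHardLimit` are hypothetical names, not Iceberg's actual `SparkMicroBatchStream` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Iceberg's API: choose which whole files go into the
// next micro-batch, starting at startIndex, given a per-batch record limit.
public class SoftLimitBatchPlanner {

  // Soft limit: add files until the running total reaches or exceeds maxRecords.
  // A file that overshoots the limit is still included whole, and the batch ends there,
  // so a non-empty batch is always produced while files remain.
  public static List<Integer> planBatch(long[] fileRecordCounts, int startIndex, long maxRecords) {
    List<Integer> batch = new ArrayList<>();
    long total = 0;
    for (int i = startIndex; i < fileRecordCounts.length; i++) {
      batch.add(i);
      total += fileRecordCounts[i];
      if (total >= maxRecords) {
        break; // limit reached or overshot by this whole file: close the batch
      }
    }
    return batch;
  }

  // Hard limit, for contrast: a file is added only if it fits. A single file larger
  // than maxRecords is never admitted, so the batch stays empty and the stream stalls.
  public static List<Integer> planBatchHardLimit(long[] fileRecordCounts, int startIndex, long maxRecords) {
    List<Integer> batch = new ArrayList<>();
    long total = 0;
    for (int i = startIndex; i < fileRecordCounts.length; i++) {
      if (total + fileRecordCounts[i] > maxRecords) {
        break; // file does not fit and cannot be split, so it is left out
      }
      batch.add(i);
      total += fileRecordCounts[i];
    }
    return batch;
  }

  public static void main(String[] args) {
    long[] files = {3, 4, 10, 2}; // record count per file (made-up example values)
    long limit = 5;
    int next = 0;
    while (next < files.length) {
      List<Integer> batch = planBatch(files, next, limit);
      System.out.println("batch files " + batch);
      next = batch.get(batch.size() - 1) + 1; // resume after the last file taken
    }
  }
}
```

Running `main` prints `batch files [0, 1]`, `batch files [2]` and `batch files [3]`: the 10-record file exceeds the limit of 5 but still forms its own batch. With `planBatchHardLimit`, planning from index 2 returns an empty list every time, which is the stuck-stream case this change addresses.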
This change is motivated by two major factors:
- The stream currently gets stuck, which leads to a poor user experience: https://github.com/apache/iceberg/pull/12217#discussion_r1962211721
- A soft limit is what other systems enforce, e.g. the Delta docs:
maxBytesPerTrigger: How much data gets processed in each micro-batch. This option sets a “soft max”, meaning that a batch processes approximately this amount of data and may process more than the limit in order to make the streaming query move forward in cases when the smallest input unit is larger than this limit. This is not set by default.
Testing done
Modified the existing unit test that reproduces the stuck stream; it now passes.
The following failure appears to be unrelated to this change:
TestRewriteDataFilesAction > testParallelPartialProgressWithMaxFailedCommitsLargerThanTotalFileGroup() > formatVersion = 2 FAILED
java.lang.RuntimeException: partial-progress.enabled is true but 1 rewrite commits failed. This is more than the maximum allowed failures of 0. Check the logs to determine why the individual commits failed. If this is persistent it may help to increase partial-progress.max-commits which will split the rewrite operation into smaller commits.
at org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.doExecuteWithPartialProgress(RewriteDataFilesSparkAction.java:400)
at org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.execute(RewriteDataFilesSparkAction.java:187)