[SPARK-49547][SQL][PYTHON] Add iterator of `RecordBatch` API to `applyInArrow`
What changes were proposed in this pull request?
Add the option to applyInArrow to take a function that takes an iterator of RecordBatch and returns an iterator of RecordBatch, and respect spark.sql.execution.arrow.maxRecordsPerBatch on the input iterator.
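For illustration, the new signature streams each group through a generator instead of materializing it as one `pyarrow.Table`. A minimal, hedged sketch (the function name and columns are hypothetical; in Spark the elements would be `pyarrow.RecordBatch` objects, shown here as opaque values so the sketch is self-contained):

```python
from typing import Any, Iterator

def scale_batches(batches: Iterator[Any]) -> Iterator[Any]:
    # Proposed iterator signature: consume one RecordBatch at a time and
    # yield results incrementally, so the whole group never has to be
    # materialized in memory at once.
    for batch in batches:
        yield batch  # in real code: transform the pyarrow.RecordBatch

# Hypothetical usage with the grouped-map API:
# df.groupBy("id").applyInArrow(scale_batches, schema="id long, v double")
```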
Why are the changes needed?
Being limited to returning a single Table requires collecting all results in memory as a single batch. This can require excessive memory in edge cases with large groups. Currently the Python worker immediately converts a Table into its underlying batches, so very few changes are needed to accommodate this. Larger changes are required to support max records per batch on the input side.
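To show what respecting `spark.sql.execution.arrow.maxRecordsPerBatch` on the input iterator means, here is a hedged pure-Python sketch of the splitting behavior (names are illustrative; in Spark the actual splitting happens on the JVM side before batches reach the Python worker):

```python
from typing import Iterator, List

def split_group(rows: List[dict], max_records_per_batch: int) -> Iterator[List[dict]]:
    # Yield the group's rows in chunks of at most max_records_per_batch,
    # mirroring how the JVM would feed several small batches to the
    # Python worker instead of one oversized batch per group.
    for start in range(0, len(rows), max_records_per_batch):
        yield rows[start:start + max_records_per_batch]
```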
Does this PR introduce any user-facing change?
Yes, a new function signature is supported by `applyInArrow`.
How was this patch tested?
Updated existing UTs to test both the Table and RecordBatch signatures.
Was this patch authored or co-authored using generative AI tooling?
No
@zhengruifeng @HyukjinKwon @xinrong-meng this is the alternative/follow-on to https://github.com/apache/spark/pull/48038 that includes implementing the input side JVM batching in addition to the Python API updates
What about adding a new test that likely fails with the `Table -> Table` signature but succeeds with the new iterator signature?
The only thing that should fail with the `Table -> Table` signature is something that runs out of memory.
Gentle ping for potential inclusion in 4.0
Gentle ping again now that 4.0 is out
Hi, @HyukjinKwon @zhengruifeng could you please review this again? We indeed encountered this problem in our production jobs when users call applyInPandas, which returns a large DataFrame.
Gentle ping again; it's getting tricky to keep up with and resolve new merge conflicts each time conflicting changes land.
@Kimahriman we plan to optimize the batch size in multiple UDF types, e.g.
- SQL_GROUPED_MAP_PANDAS_UDF
- SQL_GROUPED_MAP_ARROW_UDF
- SQL_GROUPED_AGG_ARROW_UDF
- SQL_GROUPED_AGG_PANDAS_UDF
- SQL_WINDOW_AGG_PANDAS_UDF
- SQL_WINDOW_AGG_ARROW_UDF
this is the first one, for SQL_GROUPED_MAP_PANDAS_UDF / SQL_GROUPED_MAP_ARROW_UDF. Can we reuse it for the iterator API?
Regarding this PR, I think we should:
1. exclude any changes in cogroup;
2. add a new eval type for the new iterator API, because it is a user-facing change. For example, we have SQL_SCALAR_PANDAS_ITER_UDF for the iterator API in Pandas UDFs.
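One way to route a UDF to a separate eval type is to inspect the declared parameter annotation, similar to how the iterator-style Pandas UDFs are distinguished by type hints. A hypothetical sketch (`uses_iterator_api` is not a real PySpark function; this only illustrates the dispatch idea):

```python
import collections.abc
import typing

def uses_iterator_api(func) -> bool:
    # Return True when the first parameter is annotated as Iterator[...],
    # which would select the new iterator eval type instead of the
    # existing Table -> Table one. Illustrative only.
    hints = typing.get_type_hints(func)
    first = next((v for name, v in hints.items() if name != "return"), None)
    return typing.get_origin(first) is collections.abc.Iterator
```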
I can try to update this with a new eval type, using type hints as the mechanism. I do think your update will fit in fine, as it's just a different way to implement the JVM-side batching that I implemented here; basically, you used a trait instead of a subclass. I still think it would be beneficial for both batched and non-batched execution to go through the same code path; otherwise, adding more eval types will make it even more complicated.
I have a new version of this with a new eval type and using type hints ready to go after https://github.com/apache/spark/pull/52303 gets merged
Closing in favor of https://github.com/apache/spark/pull/52440