
[SPARK-49547][SQL][PYTHON] Add iterator of `RecordBatch` API to `applyInArrow`

Kimahriman opened this pull request 1 year ago

What changes were proposed in this pull request?

Add an option for applyInArrow to take a function that accepts an iterator of RecordBatch and returns an iterator of RecordBatch, and respect spark.sql.execution.arrow.maxRecordsPerBatch on the input iterator.
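As a rough illustration of the input-side batching this describes (a plain-Python stand-in, not Spark's actual implementation; in the real code the elements would be Arrow record batches), each group's rows would be re-chunked so that no batch exceeds the configured `spark.sql.execution.arrow.maxRecordsPerBatch`:

```python
from itertools import islice
from typing import Iterator, List


def rebatch(rows: Iterator[dict], max_records_per_batch: int) -> Iterator[List[dict]]:
    """Yield batches capped at max_records_per_batch, mimicking how the
    JVM side would split a group's rows before streaming them to the
    Python worker (illustrative sketch only)."""
    it = iter(rows)
    while True:
        batch = list(islice(it, max_records_per_batch))
        if not batch:
            return
        yield batch
```

With `max_records_per_batch=4`, ten rows would arrive as batches of 4, 4, and 2 rather than one ten-row batch.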

Why are the changes needed?

Being limited to returning a single Table requires collecting all results in memory for a single batch. This can require excessive memory for certain edge cases with large groups. Currently the Python worker immediately converts a table into its underlying batches, so there are barely any changes required to accommodate this. Larger changes are required to support max records per batch on the input side.
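The memory difference between the two shapes can be sketched with plain generators (illustrative only; in the PR the elements would be `pyarrow.RecordBatch` objects and the transformation is whatever the user's function does):

```python
from typing import Iterator, List


def double(row: int) -> int:
    # Stand-in for the user's per-row transformation.
    return row * 2


def apply_table(group: List[int]) -> List[int]:
    # Table -> Table style: the entire result is materialized
    # before anything can be returned to the JVM.
    return [double(r) for r in group]


def apply_batches(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    # Iterator -> iterator style: peak memory is roughly one batch,
    # since each output batch can be serialized and dropped before
    # the next one is produced.
    for batch in batches:
        yield [double(r) for r in batch]
```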

Does this PR introduce any user-facing change?

Yes, a new function signature is supported by applyInArrow.

How was this patch tested?

Updated existing UTs to test both the Table and RecordBatch signatures.

Was this patch authored or co-authored using generative AI tooling?

No

Kimahriman avatar Nov 28 '24 13:11 Kimahriman

@zhengruifeng @HyukjinKwon @xinrong-meng this is the alternative/follow-on to https://github.com/apache/spark/pull/48038 that includes implementing the input side JVM batching in addition to the Python API updates

Kimahriman avatar Nov 28 '24 13:11 Kimahriman

What about adding a new test that likely fails with the Table -> Table signature but succeeds with the new iterator signature?

The only thing that should fail with the Table -> Table signature is something that runs out of memory.

Kimahriman avatar Nov 29 '24 12:11 Kimahriman

Gentle ping for potential inclusion in 4.0

Kimahriman avatar Jan 15 '25 16:01 Kimahriman

Gentle ping again now that 4.0 is out

Kimahriman avatar Jun 24 '25 14:06 Kimahriman

Hi, @HyukjinKwon @zhengruifeng could you please review this again? We have indeed encountered this problem in our production jobs when users call applyInPandas and return a large DataFrame.

ConeyLiu avatar Jul 04 '25 07:07 ConeyLiu

Gentle ping again; it's getting tricky to resolve new merge conflicts each time conflicting changes land.

Kimahriman avatar Sep 11 '25 20:09 Kimahriman
@Kimahriman we plan to optimize the batch size in multiple UDF types, e.g.

  • SQL_GROUPED_MAP_PANDAS_UDF
  • SQL_GROUPED_MAP_ARROW_UDF
  • SQL_GROUPED_AGG_ARROW_UDF
  • SQL_GROUPED_AGG_PANDAS_UDF
  • SQL_WINDOW_AGG_PANDAS_UDF
  • SQL_WINDOW_AGG_ARROW_UDF

This is the first one, for SQL_GROUPED_MAP_PANDAS_UDF / SQL_GROUPED_MAP_ARROW_UDF. Can we reuse it for the iterator API?

Regarding this PR, I think we should: (1) exclude any changes to cogroup; (2) add a new eval type for the new iterator API, because it is a user-facing change. For example, we have SQL_SCALAR_PANDAS_ITER_UDF for the iterator API in Pandas UDFs.

zhengruifeng avatar Sep 12 '25 03:09 zhengruifeng

I can try to update with new eval types, using type hints as the mechanism. I do think your update will fit in fine, as it's just a different way to implement the JVM-side batching I implemented here: basically you used a trait instead of a subclass. I also think it would still be beneficial for both the batched and non-batched cases to go through the same code path; otherwise adding more eval types will make it even more complicated.
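One way the type-hint dispatch could look (a hypothetical sketch, not the code in this PR): inspect the annotation on the function's first parameter to decide which signature the user intends, similar in spirit to how pandas iterator UDFs are detected:

```python
import collections.abc
import inspect
import typing
from typing import Iterator, List


def uses_iterator_signature(func) -> bool:
    """Return True if the function's first parameter is annotated as an
    Iterator, signalling the iterator-of-RecordBatch API. Hypothetical
    dispatch logic; Spark's real detection may differ."""
    hints = typing.get_type_hints(func)
    params = list(inspect.signature(func).parameters)
    if not params:
        return False
    annotation = hints.get(params[0])
    return typing.get_origin(annotation) is collections.abc.Iterator


def table_fn(table: List[int]) -> List[int]:
    # Table -> Table style user function.
    return table


def batch_fn(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    # Iterator -> iterator style user function.
    yield from batches
```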

Kimahriman avatar Sep 13 '25 16:09 Kimahriman

I have a new version of this with a new eval type and using type hints ready to go after https://github.com/apache/spark/pull/52303 gets merged

Kimahriman avatar Sep 24 '25 11:09 Kimahriman

Closing in favor of https://github.com/apache/spark/pull/52440

Kimahriman avatar Sep 24 '25 18:09 Kimahriman