ARROW-17178: [R] Support head() in arrow_dplyr_query with user-defined function

Open paleolimbot opened this issue 3 years ago • 6 comments

This PR adds support for more types of queries that include calls to R code (i.e., map_batches(..., .lazy = TRUE), user-defined functions in mutates, arranges, and filters, and custom extension type behaviour). Previously these queries failed because it wasn't possible to guarantee that the exec plan would be completely executed within a call to RunWithCapturedR(), where we establish an event loop on the main R thread and launch a background thread to do "arrow stuff" that can queue tasks to run on the R thread.

The approach I took here was to stuff more of the ExecPlan-to-RecordBatchReader logic in a subclass of RecordBatchReader that doesn't call plan->StartProducing() until the first batch has been pulled. This lets you return a record batch reader and pass it around at the R level (currently how head/tail/a few other things are implemented), and as long as it's drained all at once (i.e., reader$read_table()) the calls into R will work.
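As a rough illustration of the deferred-start idea described above, here is a toy sketch in plain Python. It is not the C++ reader from this PR: `LazyPlanReader`, `start_plan`, and the `PLAN_RUNNING` status are hypothetical names made up for the example (only `PLAN_NOT_STARTED` and `PLAN_FINISHED` appear in this thread), and `start_plan()` merely stands in for `plan->StartProducing()`.

```python
from typing import Callable, Iterator, List, Optional


class LazyPlanReader:
    """Toy reader that defers starting its 'plan' until the first batch is pulled."""

    def __init__(self, start_plan: Callable[[], Iterator[List[int]]]):
        # Hypothetical stand-in for plan->StartProducing(); not called yet.
        self._start_plan = start_plan
        self._batches: Optional[Iterator[List[int]]] = None
        self.status = "PLAN_NOT_STARTED"

    def read_next_batch(self) -> Optional[List[int]]:
        if self.status == "PLAN_FINISHED":
            return None
        if self._batches is None:
            # First pull: only now is the plan actually started.
            self._batches = self._start_plan()
            self.status = "PLAN_RUNNING"  # assumed intermediate status
        batch = next(self._batches, None)
        if batch is None:
            # Release the plan as soon as it is exhausted.
            self._batches = None
            self.status = "PLAN_FINISHED"
        return batch

    def read_table(self) -> List[List[int]]:
        # Drain everything in one go, analogous to reader$read_table().
        batches = []
        while (batch := self.read_next_batch()) is not None:
            batches.append(batch)
        return batches


started: List[bool] = []


def start_plan() -> Iterator[List[int]]:
    started.append(True)
    return iter([[1, 2], [3, 4]])


reader = LazyPlanReader(start_plan)
started_before = list(started)   # nothing has run yet
status_before = reader.status
table = reader.read_table()
status_after = reader.status
```

Creating the reader has no side effects, so it can be passed around freely; the plan only runs inside whichever call drains it.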

Calls to R code within an exec plan still won't work with reader$read_next_batch() or the C data interface, because there we can't guarantee an event loop.

This also has the benefit of allowing us to inject some cancelability into the ExecPlan, since we can check a StopToken after #13635 (ARROW-11841) for an interrupt (for all exec plans). The biggest benefit is, in my view, that the lifecycle of the ExecPlan is more explicit...before, the plan was stopped when the object was deleted, but it was written in a way that I didn't understand for a long time. I think a reader subclass makes it more explicit and maybe will help to print out nested queries (since they're no longer eagerly evaluated).
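The cancelability point can be sketched in the same toy style. This is only an assumption about the general shape (check for a stop request each time a batch is pulled), not the StopToken API itself: `threading.Event` is used here as a hypothetical stand-in for the stop token, and `Cancelled` and `drain_with_stop_token` are made-up names.

```python
import threading
from typing import Iterable, Iterator, List


class Cancelled(Exception):
    """Raised when a stop was requested before the plan finished."""


def drain_with_stop_token(
    batches: Iterable[List[int]], stop_requested: threading.Event
) -> Iterator[List[int]]:
    for batch in batches:
        # Check the (hypothetical) stop token before handing out each batch.
        if stop_requested.is_set():
            raise Cancelled("interrupted before the plan finished")
        yield batch


stop_requested = threading.Event()
received: List[List[int]] = []
try:
    for batch in drain_with_stop_token([[1], [2], [3]], stop_requested):
        received.append(batch)
        if len(received) == 2:
            stop_requested.set()  # simulate the user interrupting mid-query
    cancelled = False
except Cancelled:
    cancelled = True
```

Because the check happens at the point where batches are pulled, the interrupt is noticed at the next batch boundary rather than only after the whole plan has run.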

An example of something that didn't work before that now does:

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

register_scalar_function(
  "times_32",
  function(context, x) x * 32.0,
  int32(),
  float64(),
  auto_convert = TRUE
)

record_batch(a = 1:1000) %>%
  dplyr::mutate(b = times_32(a)) %>%
  as_record_batch_reader() %>%
  as_arrow_table()
#> Table
#> 1000 rows x 2 columns
#> $a <int32>
#> $b <double>

record_batch(a = 1:1000) %>%
  dplyr::mutate(fun_result = times_32(a)) %>%
  head(11) %>%
  dplyr::collect()
#> # A tibble: 11 × 2
#>        a fun_result
#>    <int>      <dbl>
#>  1     1         32
#>  2     2         64
#>  3     3         96
#>  4     4        128
#>  5     5        160
#>  6     6        192
#>  7     7        224
#>  8     8        256
#>  9     9        288
#> 10    10        320
#> 11    11        352

Created on 2022-07-25 by the reprex package (v2.0.1)

paleolimbot, Jul 25 '22 20:07

https://issues.apache.org/jira/browse/ARROW-17178

github-actions[bot], Jul 25 '22 21:07

:warning: Ticket has not been started in JIRA, please click 'Start Progress'.

github-actions[bot], Jul 25 '22 21:07

@github-actions crossbow submit r-valgrind

paleolimbot, Jul 26 '22 11:07

Unable to match any tasks for `r-valgrind`
The Archery job run can be found at: https://github.com/apache/arrow/actions/runs/2739103985

github-actions[bot], Jul 26 '22 13:07

@github-actions crossbow submit test-r-linux-valgrind

paleolimbot, Jul 26 '22 14:07

Revision: 5427055382398d1e2762247dd53d422329433a10

Submitted crossbow builds: ursacomputing/crossbow @ actions-67d0a81c6d

Task Status
test-r-linux-valgrind Azure

github-actions[bot], Jul 26 '22 15:07

@github-actions crossbow submit test-r-linux-valgrind

paleolimbot, Aug 27 '22 00:08

Revision: 43dec7fc50f1e6a6f4e19619dbde8a1310ad87be

Submitted crossbow builds: ursacomputing/crossbow @ actions-171501a681

Task Status
test-r-linux-valgrind Azure

github-actions[bot], Aug 27 '22 00:08

@github-actions crossbow submit test-r-linux-valgrind

paleolimbot, Aug 29 '22 12:08

Revision: 3997260ad4720c4eaf9861d52fbfcabe971e995a

Submitted crossbow builds: ursacomputing/crossbow @ actions-b55db16b92

Task Status
test-r-linux-valgrind Azure

github-actions[bot], Aug 29 '22 12:08

A quick summary + reprex to augment the bit I wrote above...this PR (1) undoes the kludges I introduced when getting the user-defined function bits to work and not fail the valgrind check, (2) allows nested exec plans with user-defined functions to work, and (3) allows the result of an exec plan to be inspected (e.g., to print/walk its relation tree or calculate its schema).

Reprex to play with:

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

# The result of an ExecPlan is now a subclass of the RecordBatchReader
# that more carefully manages the lifecycle of the underlying exec plan
# (which includes not starting it until the first batch has been pulled
# and releasing it as soon as it is no longer needed)
result <- mtcars |> 
  as_arrow_table() |> 
  filter(mpg > 25) |> 
  as_record_batch_reader()

result
#> ExecPlanReader
#> <Status: PLAN_NOT_STARTED>
#> 
#> mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> 
#> See $metadata for additional Schema metadata
#> 
#> See $Plan() for details.
result$Plan()
#> ExecPlan
#> ExecPlan with 4 nodes:
#> 3:SinkNode{}
#>   2:ProjectNode{projection=[mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb]}
#>     1:FilterNode{filter=(mpg > 25)}
#>       0:TableSourceNode{}
result$PlanStatus()
#> [1] "PLAN_NOT_STARTED"

as_arrow_table(result)
#> Table
#> 6 rows x 11 columns
#> $mpg <double>
#> $cyl <double>
#> $disp <double>
#> $hp <double>
#> $drat <double>
#> $wt <double>
#> $qsec <double>
#> $vs <double>
#> $am <double>
#> $gear <double>
#> $carb <double>
#> 
#> See $metadata for additional Schema metadata
result$PlanStatus()
#> [1] "PLAN_FINISHED"

# head() on a record batch reader is now fully lazy (i.e., it never
# pulls batches from its source until requested)
endless_reader <- as_record_batch_reader(
  function() stop("this will error if called"),
  schema = schema()
)

head(endless_reader)
#> RecordBatchReader

Created on 2022-08-29 by the reprex package (v2.0.1)

paleolimbot, Aug 29 '22 12:08

@westonpace Whenever you get a chance, the C++ code here needs a set of eyes, particularly because it involves the lifecycle of the ExecPlan (notably, getting multiple ExecPlans to run under the same R event loop so that we can evaluate user-defined functions).

I was hoping that this change would also make it clearer when we call StartProducing() and StopProducing(). Previously we did call StopProducing() when the result RecordBatchReader was garbage collected, but since we don't have any control over when that happens, we couldn't guarantee a prompt stop request for something like head() (I know that a prompt stop request doesn't stop the plan immediately, but it sounds like at some point it will).
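To make the "prompt stop for head()" point concrete, here is a toy Python sketch, again not the actual implementation. `plan_batches` and `head` are hypothetical names, the `events` list just records when the pretend StartProducing/StopProducing calls would happen, and the generator's `close()` stands in for an explicit stop request that does not rely on garbage collection.

```python
from typing import Generator, List

events: List[str] = []


def plan_batches() -> Generator[List[int], None, None]:
    # Nothing runs until the first batch is requested.
    events.append("StartProducing")
    try:
        start = 0
        while True:  # an endless source
            yield list(range(start, start + 4))
            start += 4
    finally:
        # Runs when the consumer closes the generator explicitly.
        events.append("StopProducing")


def head(batches: Generator[List[int], None, None], n: int) -> List[int]:
    rows: List[int] = []
    try:
        for batch in batches:
            rows.extend(batch[: n - len(rows)])
            if len(rows) >= n:
                break
    finally:
        # Request the stop as soon as we have enough rows,
        # instead of waiting for the source to be garbage collected.
        batches.close()
    return rows


source = plan_batches()
events_before = list(events)  # still empty: the source has not started
first_rows = head(source, 6)
```

Here the stop request is issued by `head()` itself as soon as it has its rows, so its timing no longer depends on when the source object happens to be collected.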

I was also hoping this change would fix the sporadic memory leaks observed, which as you'll see from the valgrind crossbow jobs above, it does not. The checks above also confirm that those leaks are not a result of the IO thread pool shutting down (although occasionally one can observe a direct leak of an ExecPlan).

paleolimbot, Aug 31 '22 13:08

Just bumping this - as Neal noted, it needs a review of the C++ from @westonpace or @pitrou since it delves into some object lifecycles with which I'm not all that familiar.

paleolimbot, Sep 08 '22 17:09

Benchmark runs are scheduled for baseline = 557acf524f6b73d73bdb9464e947b78b9d02fcea and contender = 59883630fcd737079e18035a3269a31eb7e0495e. 59883630fcd737079e18035a3269a31eb7e0495e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Conbench compare runs links:
[Finished :arrow_down:0.0% :arrow_up:0.0%] ec2-t3-xlarge-us-east-2
[Failed :arrow_down:0.88% :arrow_up:0.0%] test-mac-arm
[Failed :arrow_down:3.1% :arrow_up:0.0%] ursa-i9-9960x
[Finished :arrow_down:0.04% :arrow_up:0.04%] ursa-thinkcentre-m75q

Buildkite builds:
[Finished] 59883630 ec2-t3-xlarge-us-east-2
[Failed] 59883630 test-mac-arm
[Failed] 59883630 ursa-i9-9960x
[Finished] 59883630 ursa-thinkcentre-m75q
[Finished] 557acf52 ec2-t3-xlarge-us-east-2
[Finished] 557acf52 test-mac-arm
[Failed] 557acf52 ursa-i9-9960x
[Finished] 557acf52 ursa-thinkcentre-m75q

Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

ursabot, Sep 16 '22 20:09

['Python', 'R'] benchmarks have high level of regressions. test-mac-arm ursa-i9-9960x

ursabot, Sep 16 '22 20:09