Out of memory Issue reading single column from parquet file
Thanks for providing the sergeant package!
My use case is reading a single column of data from a fairly large parquet file. The column I want to read is called occurrenceId; it fits into a character vector of length 70M in R, where each value is a string of 41 characters (it is a unique identifier, and I'd like to check it for uniqueness and presence in R). In R the whole column occupies about 700M of memory when I inspect it with ls().
I can do this with sparklyr, but the drill sergeant's approach is appealing, being more lightweight. I am struggling with an out-of-memory issue, though. I have 16 GB of RAM available, which I think should suffice given the size of the vector in R, so now I'm wondering whether this use case is supported or whether I'm doing it wrong.
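As a sanity check on the 700M figure, here is a rough estimate of the column's in-memory size (a sketch: it measures a 1000x smaller vector of unique 41-character strings and scales up; because R stores each unique string as a separate CHARSXP with its own header overhead, the result may come out well above 700 MB, which would be relevant to the out-of-memory question):

```r
# Measure a 1000x smaller vector of unique 41-character strings,
# then scale the measured size up to the full 70M-row column.
n_small <- 70e6 / 1000                            # 70,000 instead of 70M
ids_small <- sprintf("%041d", seq_len(n_small))   # unique 41-char strings
small_bytes <- as.numeric(object.size(ids_small))
est_gb <- small_bytes * 1000 / 1024^3
cat(sprintf("estimated full-column size: ~%.1f GB\n", est_gb))
```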
The dataset I'm using is public; it can be viewed at https://www.gbif.org/dataset/38b4c89f-584c-41bb-bd8f-cd1def33e92f and downloaded in .zip format from http://www.gbif.se/ipt/archive.do?r=artdata
I first tried using vroom, reading directly from the compressed .zip file (details here: https://github.com/r-lib/vroom/issues/116), but the promise of a comparatively smaller parquet file from which I could read just the columns I need made me turn to sergeant. To get a parquet file, I first converted the .zip using sparklyr, like so:
library(sparklyr)
library(dplyr)
# first install Spark 2.4.0 / Hadoop 2.7 with sparklyr::spark_install()
Sys.setenv("SPARK_MEM" = "12g")
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- '12G'
config$`sparklyr.shell.executor-memory` <- '4G'
config$sparklyr.defaultPackages <- "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3"
config$spark.cassandra.cassandra.host <- "localhost"
config$spark.driver.maxResultSize <- "4G"
config$spark.executor.cores <- 3
# is pushdown option TRUE?
sc <- spark_connect(master = "local", config = config)
# for this connection, load all records
system.time(
  spark_read_csv(sc, memory = FALSE,
                 name = "artdata", path = "file:///home/roger/artdata/artdata.tsv",
                 delimiter = "\t")
)
# user  system elapsed
# 6.154  7.060 1559.874
# generate a parquet file based on the dataframe above
system.time(
  spark_write_parquet(
    tbl(sc, "artdata"),
    "file:///home/roger/artdata/artdata.parquet")
)
# user   system elapsed
# 14.634 16.586 3816.375
# the parquet-file is 3.8 GB on disk, smaller than the zip
spark_tbl_handle <- spark_read_parquet(sc, memory = FALSE,
                                       "artdata", "file:///home/roger/artdata/artdata.parquet")
has_valid_bor <- function() {
  bor <- spark_tbl_handle %>%
    count(basisOfRecord) %>%
    collect() %>%
    mutate(is_ok = basisOfRecord %in% c(
      "humanobservation",
      "machineobservation"
    ))
  bor %>% pull(is_ok) %>% all()
}
n_rowcount <- function() {
  spark_tbl_handle %>%
    summarise(n = n()) %>%
    pull(n)
}
has_valid_id <- function() {
  ids <- spark_tbl_handle %>%
    count(occurrenceID) %>%
    # flag IDs that are duplicated OR missing; a comma in filter() means
    # AND, which would only match duplicated NAs
    filter(n > 1 | is.na(occurrenceID)) %>%
    collect()
  nrow(ids) == 0
}
system.time(
  has_valid_bor()
)
system.time(
  has_valid_id()
)
system.time(
  n_rowcount()
)
sort_artdata <- function() {
  spark_tbl_handle %>%
    arrange(occurrenceID) %>%
    head(10) %>%
    collect()
}
system.time(
  sort_artdata()
)
# sorting in spark takes about 5 minutes...
# user  system elapsed
# 3.182  1.370 282.698
This gives me a parquet file on disk. I then attempt to use sergeant to read the occurrenceId column, like so:
library(sergeant)
library(tidyverse)
if (Sys.which("docker") == "")
stop("Please install docker first - see https://docs.docker.com/install/")
# install and run official Apache Drill software
system("docker stop drill; docker rm drill; docker run -i --name drill -e DRILL_HEAP=10G -v /home/markus/tmp/artdata:/tmp -p 8047:8047 --detach drill/apache-drill:1.16.0 /bin/bash")
dc <- drill_connection("localhost")
df <- drill_query(dc, "SELECT occurrenceId FROM dfs.`/tmp/artdata.parquet`")
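One workaround I have considered is paging through the column with LIMIT/OFFSET so that each drill_query() result stays under the web interface's heap limit (a sketch: the 1M chunk size is a guess, and Drill's OFFSET pagination rescans the file each time, so this trades memory for time):

```r
library(sergeant)

dc <- drill_connection("localhost")
chunk_size <- 1e6
offset <- 0
chunks <- list()
repeat {
  sql <- sprintf(
    "SELECT occurrenceId FROM dfs.`/tmp/artdata.parquet` LIMIT %d OFFSET %d",
    chunk_size, offset)
  res <- drill_query(dc, sql)
  if (nrow(res) == 0) break            # no rows left
  chunks[[length(chunks) + 1]] <- res$occurrenceId
  offset <- offset + chunk_size
}
ids <- unlist(chunks, use.names = FALSE)
```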
The error message that I get is the following:
|===========================================================================| 100%
Query ==> SELECT occurrenceId FROM dfs.`/tmp/artdata.parquet`
RESOURCE ERROR: There is not enough heap memory to run this query using the web interface.
Please try a query with fewer columns or with a filter or limit condition to limit the data returned.
You can also try an ODBC/JDBC client.
[Error Id: f42e398d-0afb-4931-a800-2a56baaa074c ]
I tried setting DRILL_HEAP to 10G, as shown above. Is this use case supported with sergeant?
Any advice on how I should proceed?
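One thing I haven't tried yet: my understanding is that Drill reads Parquet into direct (off-heap) memory while the web interface buffers results on the heap, so raising DRILL_HEAP alone may not be the whole story. A variant of the docker invocation above that raises both might look like this (a sketch; DRILL_MAX_DIRECT_MEMORY is Drill's standard environment variable for direct memory, but I haven't verified that this image honors it, and the sizes are guesses chosen to fit in 16 GB of RAM):

```r
# Restart the Drill container with both heap and direct memory raised.
system(paste(
  "docker stop drill; docker rm drill;",
  "docker run -i --name drill",
  "-e DRILL_HEAP=6G -e DRILL_MAX_DIRECT_MEMORY=8G",
  "-v /home/markus/tmp/artdata:/tmp -p 8047:8047",
  "--detach drill/apache-drill:1.16.0 /bin/bash"
))
```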