
Out of memory issue reading a single column from a parquet file

Open · mskyttner opened this issue Jun 03 '19 · 0 comments

Thanks for providing the sergeant package!

My use case is reading a single column of data from a fairly large parquet file. The column I want to read is called occurrenceId; the whole column fits into a character vector of length ~70M in R, where each value is a 41-character string (it is a unique identifier, and I'd like to check it for uniqueness and presence in R). The whole column occupies about 700M of memory when I inspect it in my R session.
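Once the column is loaded, the checks themselves are straightforward; a minimal sketch (occ_ids is a hypothetical name for the character vector holding the column):

# checks to run once the column is in R
# (sketch; occ_ids stands in for the character vector of occurrenceId values)
print(object.size(occ_ids), units = "Mb")   # memory footprint
anyNA(occ_ids)                              # presence: any missing values?
anyDuplicated(occ_ids) == 0                 # uniqueness: no duplicates?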

I can do this with sparklyr, but sergeant's Drill-based approach is appealing because it is more lightweight. I am struggling with an out-of-memory issue, though. I have 16 GB of RAM available, which I think should suffice given the size of the vector in R, so I'm wondering whether this use case is supported or whether I'm doing something wrong.

The dataset I'm using is public. It can be viewed at https://www.gbif.org/dataset/38b4c89f-584c-41bb-bd8f-cd1def33e92f and downloaded in .zip format from http://www.gbif.se/ipt/archive.do?r=artdata

I first tried using vroom and reading directly from the compressed .zip file (details here: https://github.com/r-lib/vroom/issues/116), but the promise of reading from a comparatively smaller parquet file and pulling in only the columns I need made me turn to sergeant. For my attempt to read the parquet file, I first converted the .zip to parquet using sparklyr, like so:

library(sparklyr)
library(dplyr)

# first install Spark 2.4.0 / Hadoop 2.7 with sparklyr::spark_install()

Sys.setenv("SPARK_MEM" = "12g")

config <- spark_config()
config$`sparklyr.shell.driver-memory` <- '12G'
config$`sparklyr.shell.executor-memory` <- '4G'
config$sparklyr.defaultPackages <- "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3"
config$spark.cassandra.cassandra.host <- "localhost"
config$spark.driver.maxResultSize <- "4G"
config$spark.executor.cores <- 3

# is pushdown option TRUE?

sc <- spark_connect(master = "local", config = config)

# for this connection, load all records

system.time(
  spark_read_csv(sc, memory = FALSE,
    name = "artdata", path = "file:///home/roger/artdata/artdata.tsv", delimiter = "\t")
)

#user   system  elapsed 
#6.154    7.060 1559.874 

# generate a parquet file based on the dataframe above

system.time(
  spark_write_parquet(
    tbl(sc, "artdata"),  
    "file:///home/roger/artdata/artdata.parquet")
)

#user   system  elapsed 
#14.634   16.586 3816.375 

# the parquet-file is 3.8 GB on disk, smaller than the zip

spark_tbl_handle <- spark_read_parquet(sc, memory = FALSE,
  "artdata", "file:///home/roger/artdata/artdata.parquet")


has_valid_bor <- function() {
  
  bor <- 
    spark_tbl_handle %>%
    count(basisOfRecord) %>%
    collect() %>%
    mutate(is_ok = basisOfRecord %in% c(
        "humanobservation", 
        "machineobservation"
      )
    )

  bor %>% pull(is_ok) %>% all
}

n_rowcount <- function() {
  
  spark_tbl_handle %>%
  summarise(n = n()) %>%
  pull(n)

}


has_valid_id <- function() {
  
  # ids that are either duplicated or missing should not exist
  ids <- 
    spark_tbl_handle %>%
    count(occurrenceID) %>%
    filter(n > 1 | is.na(occurrenceID)) %>%
    collect()
  
  nrow(ids) == 0
  
}


system.time(
  has_valid_bor()
)

system.time(
  has_valid_id()
)

system.time(
  n_rowcount()
)

sort_artdata <- function() {
  spark_tbl_handle %>%
  arrange(occurrenceID) %>%
  head(10) %>%
  collect()
}

system.time(
  sort_artdata()
)

# sorting in spark takes about 5 minutes...
#user  system elapsed 
#3.182   1.370 282.698

This gives me a parquet file on disk.
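A quick sanity check that the column is readable through the existing Spark handle could look roughly like this (a sketch):

# spot-check the parquet file via the handle created above
spark_tbl_handle %>%
  select(occurrenceID) %>%
  head(5) %>%
  collect()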

I then attempt to use sergeant to read the occurrenceId column, like so:

library(sergeant)
library(tidyverse)

if (Sys.which("docker") == "")
  stop("Please install docker first - see https://docs.docker.com/install/")

# install and run official Apache Drill software
system("docker stop drill; docker rm drill; docker run -i --name drill -e DRILL_HEAP=10G -v /home/markus/tmp/artdata:/tmp -p 8047:8047 --detach drill/apache-drill:1.16.0 /bin/bash")

dc <- drill_connection("localhost") 

df <- drill_query(dc, "SELECT occurrenceId FROM dfs.`/tmp/artdata.parquet`")

The error message that I get is the following:

  |===========================================================================| 100%
Query ==> SELECT occurrenceId FROM dfs.`/tmp/artdata.parquet`
RESOURCE ERROR: There is not enough heap memory to run this query using the web interface. 

Please try a query with fewer columns or with a filter or limit condition to limit the data returned. 
You can also try an ODBC/JDBC client. 

[Error Id: f42e398d-0afb-4931-a800-2a56baaa074c ]

I set DRILL_HEAP to 10G (see the docker run command above), but the query still fails.
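As a diagnostic I could presumably shrink the result set first, along these lines (a sketch; the LIMIT value is arbitrary), but ultimately I need the whole column:

# smaller test query, to see whether the heap limit is only hit at full column size
df_small <- drill_query(
  dc, "SELECT occurrenceId FROM dfs.`/tmp/artdata.parquet` LIMIT 1000000"
)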

Is this use case supported with the sergeant?

Any advice on how I should proceed?

mskyttner · Jun 03 '19 17:06