
ODBC driver does not effectively take advantage of parallelization in Snowflake (INSERTs are sequential and slow)

Open: tshaken opened this issue on Jul 11 '22

Issue Description and Expected Result

The RStudio odbc driver does not take advantage of parallelization in Snowflake as effectively as it does with SQL Server: INSERTs into Snowflake are executed sequentially, which results in much slower write performance.

Database

Snowflake 6.22.0

Reproducible Example

ODBC Connections

library(tidyverse)
library(odbc)
library(DBI)
library(glue)
library(rbenchmark)
library(parallel)

sqlCon <- dbConnect(odbc(), dsn='XXXXX', timeout=60*60, database='XXXXX')

snowCon <- dbConnect(odbc(), dsn='snowflake', warehouse='XXXXX_WH',
                     PRIV_KEY_FILE_PWD=rstudioapi::askForPassword("Passphrase for key: "),
                     role="XXXXX",
                     database="XXXXX",
                     schema="DBO"

numCores <- 8
repeatQuery <- 10

Test 1: Select performance (SELECT TOP 10 million rows)

qry <- 'SELECT TOP 10000000 *
        FROM "XXXXX_db"."DBO"."V_XXXXX"
        '

read_perf <- benchmark(
  Read_SQLServer = {
    dbGetQuery(sqlCon, qry)
  },
  Read_Snowflake = {
    dbGetQuery(snowCon, qry)
  },
  replications = repeatQuery
)

knitr::kable(read_perf)
  test           replications elapsed relative user.self sys.self user.child sys.child
2 Read_Snowflake           10 211.047    1.268   205.212   16.197          0         0
1 Read_SQLServer           10 166.406    1.000   149.501   16.452          0         0

Test 2: Write performance (100 columns, 100K rows)

source("customWrite.R")

big_table <- as.data.frame(round(MASS::mvrnorm(n=1e5, mu=1:100, Sigma=diag(1:100)),3))

write_perf <- benchmark(
  Write_SQLServer = {
    customWriteTable(sqlCon, targetChar = "t_big_table", sourceDf = big_table, overwrite = TRUE)
  },
  Write_Snowflake = {
    customWriteTable(snowCon, targetChar = "t_big_table", sourceDf = big_table, overwrite = TRUE)
  },
  replications = repeatQuery
)

knitr::kable(write_perf)
  test            replications elapsed relative user.self sys.self user.child sys.child
2 Write_Snowflake           10 356.456    8.875   203.692    3.786      0.205     0.182
1 Write_SQLServer           10  40.165    1.000    14.619    2.335      0.000     0.000

Test 3: Concurrent write performance (100 columns, 360 rows per write, 100 writes across 8 cores)

set.seed(123)

rowIndex <- vector("list", 100)
rowIndex <- lapply(rowIndex, function(x) sample(1:nrow(big_table), 360))

subset_and_write <- function(index, con){
  if(con@info$dbms.name == 'Microsoft SQL Server') {
    con <- dbConnect(odbc(), dsn='BI_17', timeout=60*60)
  }
  
  customWriteTable(con, targetChar = "t_big_table", sourceDf = big_table[index,], append = TRUE)
}

write_concur_perf <- benchmark(
  Write_Concur_SQLServer = {
    mclapply(rowIndex, subset_and_write, con=sqlCon, mc.cores=numCores)
  },
  Write_Concur_Snowflake = {
    mclapply(rowIndex, subset_and_write, con=snowCon, mc.cores=numCores)
  },
  replications = repeatQuery
)

knitr::kable(write_concur_perf)
  test                   replications elapsed relative user.self sys.self user.child sys.child
2 Write_Concur_Snowflake           10 278.674   19.297     1.687    9.049    112.092    22.192
1 Write_Concur_SQLServer           10  14.441    1.000     0.116    4.901     14.682    11.200

Custom Write Table Source File

customWriteTable <- function(conn, targetChar, sourceDf, overwrite=FALSE, append=FALSE, batch_size=50000, ...){
  
  original_settings <- getOption("odbc.batch_rows", 1024)
  
  options(odbc.batch_rows=batch_size)
  
  if(overwrite==TRUE & append==TRUE){
    stop("Only one arg can be TRUE: overwrite and append.")
  }
  
  if(overwrite==TRUE){
    qry <- glue_sql("drop table if exists ", targetChar, .con=conn)
    dbExecute(conn, qry)
    dbWriteTable(conn, targetChar, value=sourceDf, batch_rows = batch_size, ...)
    
  } else if(append==TRUE){
    if(conn@info$dbms.name == 'Microsoft SQL Server') {
      dbWriteTable(conn, value=sourceDf, name = targetChar, overwrite = FALSE, append = TRUE,
                   row.names=FALSE, batch_rows = batch_size)
    } else {
      # dbWriteTable() fails in Snowflake for appends
      dbAppendTable(conn, targetChar, value=sourceDf, batch_rows = batch_size, ...)
    }
  } else{
    dbWriteTable(conn, targetChar, value=sourceDf, batch_rows = batch_size, ...)
  }
  options(odbc.batch_rows=original_settings)
}
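
One caveat with the option handling above: if any of the writes error, the final options() call is skipped and odbc.batch_rows stays at the batch size. A minimal sketch of the same idea using on.exit() so the option is always restored (the function name and trimmed-down arguments here are illustrative only):

customWriteTableSafe <- function(conn, targetChar, sourceDf, batch_size=50000, ...){
  # options() returns the previous values, so they can be restored automatically
  old_opts <- options(odbc.batch_rows = batch_size)
  on.exit(options(old_opts), add = TRUE)  # runs even if the write below errors
  dbWriteTable(conn, targetChar, value = sourceDf, batch_rows = batch_size, ...)
}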

tshaken, Jul 11 '22

A workaround to improve writes to a Snowflake table is to create a Snowflake "stage" and use the PUT and COPY INTO commands. The examples are adapted from https://github.com/snowflakedb/dplyr-snowflakedb (now deprecated).

Step 1: Write the data frame into multiple CSV files on disk
Step 2: PUT the CSV files to a Snowflake stage
Step 3: COPY INTO the target table from the staged files

The example assumes the table (t_big_table) already exists.
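
If the table does not exist yet, one option is to let DBI create an empty table from the data frame's column types first. A sketch, assuming the snowCon connection and big_table data frame defined below:

dbCreateTable(snowCon, "t_big_table", big_table)  # creates an empty table with columns matching big_table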

Write data frame to CSV files on disk

library(tidyverse)
library(odbc)
library(DBI)
library(glue)
library(rbenchmark)
library(parallel)

numCores <- 8

big_table <- as.data.frame(round(MASS::mvrnorm(n=1e5, mu=1:100, Sigma=diag(1:100)),3))


set.seed(123)

rowIndex <- vector("list", 100)
rowIndex <- lapply(rowIndex, function(x) sample(1:nrow(big_table), 360))

subset_and_write_to_CSV <- function(index){

  data.table::fwrite(x=big_table[index,], file = paste0("WriteBack_",index[1], ".csv"),
                     nThread = numCores, verbose=FALSE, quote=TRUE, append =FALSE)
}


mclapply(rowIndex, subset_and_write_to_CSV, mc.cores=numCores)

PUT CSV files to stage

snowCon <- dbConnect(odbc(), dsn='snowflake', warehouse='xx_WH',
                     PRIV_KEY_FILE_PWD=rstudioapi::askForPassword("Passphrase for key: "),
                     role="xxx",
                     database="xxxx",
                     schema="DBO"
)  

dbSendQuery(snowCon, "
              create or replace file format CSV_WRITE_BACK
              type = csv
              field_delimiter = ','
              skip_header = 1
              null_if = ('NULL', 'null')
              empty_field_as_null = true
              compression = gzip
            ")

dbSendQuery(snowCon, "CREATE OR REPLACE STAGE mystage file_format=CSV_WRITE_BACK")

dbSendQuery(snowCon,"
            PUT file:///mnt/xxxxxx/WriteBack_*.csv @mystage overwrite=TRUE
            ")

dbGetQuery(snowCon, "LIST @mystage")

COPY-INTO table

dbSendQuery(snowCon, "copy into t_big_table
                FROM @mystage
                FILE_FORMAT = 'CSV_WRITE_BACK'
            ")

dbGetQuery(snowCon, "SELECT COUNT(*) FROM xxxx.DBO.t_big_table")

dbSendQuery(snowCon, "remove @mystage;")

crossxwill, Jul 15 '22

Can you please simplify your reprex to the smallest possible case, use bench::mark() for timings, and run it with the reprex package?

I'd strongly advise against using DBI with mclapply() as there's zero guarantee that this will work in general. Do you have some evidence to support your claim that the slow performance is due to lack of parallelisation?
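
For example, a stripped-down timing along those lines might look roughly like the sketch below (the DSN, table name, and data are placeholders, not a definitive benchmark):

library(DBI)
library(odbc)

con <- dbConnect(odbc(), dsn = "snowflake")  # placeholder DSN
df  <- data.frame(x = runif(1e4))

bench::mark(
  snowflake_write = dbWriteTable(con, "t_bench", df, overwrite = TRUE),
  iterations = 5,
  check = FALSE
)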

hadley, Jul 17 '22

Not sure if this is related, but even dbAppendTable() takes at least 25 seconds with a trivial number of rows (5) and columns (1) of type numeric.

eriksquires, Apr 20 '23

@eriksquires could you please provide a simple reprex illustrating the problem?

hadley, Apr 24 '23

library(DBI)


#
# Do this in Snowflake
#
#create schema r_testing;


#create or replace TABLE R_TESTING.TINY (
#  PRODUCT_ID NUMERIC
#);


con <- dbConnect(odbc::odbc(), "odb_conn_name", Warehouse = "your_warehouse", timeout = 10)

df <- data.frame(PRODUCT_ID=1:5000)


system.time({dbAppendTable( con, "TINY", df)})
#>    user  system elapsed
#>   1.160   0.363   9.595

# Fully qualified table name is at least 2 seconds faster
system.time({dbAppendTable( con, SQL("DB_TEST_NAME.R_TESTING.TINY"), df)})
#>    user  system elapsed
#>   0.332   0.096   5.065


# Create second schema with the Tiny table
#
# create schema r_testing_second;
# create or replace TABLE R_TESTING_SECOND.TINY (
#  PRODUCT_ID NUMERIC
# );


# Now this query which worked fine above will fail
# no matter what your current schema is, or the default schema in the ODBC.ini file
# or adding Schema = "R_TESTING" to the odbc connection string. 
# 
# It also doesn't matter if the second table is in another schema in a different DB. 
# So long as TINY exists anywhere in your Snowflake account this seems to fail. 
# 
# Note that this issue also doesn't seem to
# respect role visibility.  Even if you can't see the second schema, this will fail.
dbAppendTable( con, "TINY", df)

# Statement above results in these errors, and it's always '18' supplied at the end.
#
#
# Error in result_describe_parameters(rs@ptr, fieldDetails) :
#   Query requires '1' params; '18' supplied.


# Adding the fully qualified name will work.  
dbAppendTable( con, SQL("DB_TEST_NAME.R_TESTING.TINY"), df)

dbDisconnect(con)
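
As a side note, DBI::Id() is another way to fully qualify the target besides SQL(); a sketch using the same placeholder names as above:

dbAppendTable(con, Id(catalog = "DB_TEST_NAME", schema = "R_TESTING", table = "TINY"), df)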

eriksquires, Apr 26 '23

@eriksquires thanks! That really is terrible performance 😞

hadley, Apr 26 '23

Yes, I was using the RJDBC interface for performance reasons, but again, got stuck with dbAppendTable.

eriksquires, Apr 26 '23