
Parallel column conversion for query results / appends

Open · hannes opened this issue 8 years ago · 7 comments

We could probably parallelise dbWriteTable, in particular the value conversions, and the same applies on the query side, because every column is essentially independent. Actually, that's a great idea; will probably add this at some point.
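
(A minimal sketch of the column-independence idea in plain R; convert_column is a made-up stand-in for the per-column SEXP conversion that MonetDBLite actually does in C.)

library(parallel)

# hypothetical stand-in for the per-column SEXP -> MonetDB conversion
convert_column <- function( x ){
    if( is.factor( x ) ) x <- as.character( x )
    x
}

df <- mtcars[ rep( 1:32 , 31250 ) , ]

# a data.frame is a list of columns, and each column is independent,
# so the conversions can run side by side.
# mclapply forks and is therefore unix-only; use parLapply on windows
converted <- mclapply( df , convert_column , mc.cores = 4 )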

Importing ten large tables on ten cores is 7x as fast with a MonetDB session that allows multiple connections. Until MonetDBLite can compete with multicore imports, the MonetDB server control code should not be deprecated.

Reproducible script below:

library(DBI)
library(MonetDBLite)
library(MonetDB.R)

# create ten csv files with 1,000,000 records each
for( j in 1:10 ){
    tf <- tempfile()
    write.csv( mtcars[ rep( 1:32 , 31250 ) , ] , tf , row.names = FALSE )
    assign( paste0( "tf" , j ) , tf )
}



# # # # # monetdblite import of 10,000,000 records
db <- dbConnect( MonetDBLite() )
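# the embedded engine runs inside this one R session, so the ten COPY statements below execute one after another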
system.time( {
    for( j in 1:10 ) {
        tablename <- basename( get( paste0( 'tf' , j ) ) )
        dbWriteTable( db , tablename , mtcars[ 0 , ] )
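        # bulk-load the csv; OFFSET 2 skips the header row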
        dbSendUpdate(db, paste0("COPY OFFSET 2 INTO ", tablename, " FROM '", get( paste0( 'tf' , j ) ) , "' using delimiters ',','\\n','\"'  NULL AS ''" ) )
    }
} )




# # # # # external mserver import of 10,000,000 records
batfile <-
    monetdb.server.setup(
        database.directory = paste0( tempdir() , "/MonetDB" ) ,
        monetdb.program.path =
            ifelse(
                .Platform$OS.type == "windows" ,
                "C:/Program Files/MonetDB/MonetDB5" ,
                ""
            ) ,
        dbname = "mydb" ,
        dbport = 50000
    )

dbname <- "mydb"
dbport <- 50000
monetdb.server.start( batfile )
monet.url <- paste0( "monetdb://localhost:" , dbport , "/" , dbname )
mydb <- dbConnect( MonetDB.R() , monet.url , wait = TRUE )
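# remember the server's pid so it can be shut down cleanly at the end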
pid <- as.integer( dbGetQuery( mydb , "SELECT value FROM env() WHERE name = 'monet_pid'" )[[1]] )

library(snow)
cl <- makeCluster( 10 , type = "SOCK" )
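# one socket worker per csv file; each worker opens its own connection to the shared mserver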

myfun <-
    function( myfile , murl ){

        library(DBI)
        library(MonetDBLite)
        library(MonetDB.R)
        tablename <- basename( myfile ) 
        con <- dbConnect( MonetDB.R() , murl , wait = TRUE )
        dbSendUpdate(con, paste0("COPY OFFSET 2 INTO ", tablename, " FROM '", myfile , "' using delimiters ',','\\n','\"'  NULL AS ''" ) )

        TRUE
    }

system.time({
    for( j in 1:10 ) {
        tablename <- basename( get( paste0( 'tf' , j ) ) )
        dbWriteTable( mydb , tablename , mtcars[ 0 , ] )
    }
    clusterApply( cl , sapply( paste0( 'tf' , 1:10 ) , get ) , myfun , murl = monet.url )
})

stopCluster(cl)
monetdb.server.stop( pid )

Results:

# monetdblite time:
#    user  system elapsed 
#   58.28    2.87   98.41 

# external mserver time:
#    user  system elapsed 
#    0.04    0.02   14.12

hannes avatar May 23 '16 09:05 hannes

@ajdamico , 👍!

guilhermejacob avatar Jan 25 '17 16:01 guilhermejacob

Could use dataflow for this.

hannes avatar Jun 15 '17 06:06 hannes

Just to clarify: data import parallelization seems most useful as a monetdb/mserver improvement rather than a MonetDBLite-specific enhancement? Thanks

ajdamico avatar Jun 15 '17 15:06 ajdamico

No, in this case it's lite-specific, because we need to convert SEXPs to MonetDB columns.

hannes avatar Jun 16 '17 07:06 hannes

The way to go here is to add the column conversion for query results / appends to the MAL plan. Need to come up with a reasonable C interface for this.

hannes avatar Jun 20 '17 06:06 hannes

Can't really do this for query results: most of the time is spent in string columns, and those need to be single-threaded because of R's global string hash table...
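
(A rough R-level illustration of the cost asymmetry, not of the threading constraint itself; every string materialised below passes through that global cache.)

n <- 5e6
system.time( as.numeric( seq_len( n ) ) )     # numeric conversion: cheap, independent per column
system.time( as.character( seq_len( n ) ) )   # string conversion: every element hits the global string cache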

hannes avatar Jul 25 '17 19:07 hannes

I like the non-blocking dbSendQuery idea described in https://github.com/r-dbi/DBI/issues/69. A typical interaction could look like this:

q1 <- dbSendQuery(c1, "COPY INTO t1 ...")
q2 <- dbSendQuery(c2, "COPY INTO t2 ...")
r1 <- dbFetch(q1)
r2 <- dbFetch(q2)

The two queries would run at the same time, but dbFetch would only return once the result of the corresponding query (in this case an "OK") is available. Comments?
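
(Until such an API lands in DBI, a rough approximation with today's blocking calls, assuming a unix host, the external mserver from the script above, and a made-up copy_one helper.)

library(DBI)
library(MonetDB.R)
library(parallel)   # mcparallel()/mccollect() fork, so unix-only

copy_one <- function( sql , murl ){
    con <- dbConnect( MonetDB.R() , murl , wait = TRUE )   # fresh connection per forked job
    on.exit( dbDisconnect( con ) )
    dbSendUpdate( con , sql )
}

j1 <- mcparallel( copy_one( "COPY INTO t1 ..." , monet.url ) )
j2 <- mcparallel( copy_one( "COPY INTO t2 ..." , monet.url ) )
mccollect( list( j1 , j2 ) )   # blocks until both COPYs have reported back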

hannes avatar Nov 07 '17 13:11 hannes