
Analogue to createTable and recoverPartitions

Open MichaelChirico opened this issue 7 years ago • 8 comments

In the native Spark API (e.g. SparkR) the process of running daily jobs which build up the same table is pretty straightforward:

library(SparkR)
library(magrittr)
# ... ETL ...

S3PATH = ...
TBL = 'my_schema.my_table'

SDF %>% write %>% partitionBy('column1', 'column2') %>% 
  mode('overwrite') %>% parquet(S3PATH)

sql(sprintf('DROP TABLE IF EXISTS %s', TBL))
createTable(TBL, S3PATH)
recoverPartitions(TBL)

Unfortunately, I don't see a straightforward way to imitate this in the sparklyr API.

spark_write_parquet covers the first bit pretty effectively, but I don't see a way to register a table against an existing S3 path other than manually (and laboriously) constructing a query of the form

CREATE EXTERNAL TABLE IF NOT EXISTS my_schema.my_table (
  {{manually specified schema}}
)
PARTITIONED BY (
  {{manually specified partitions}}
)
STORED AS PARQUET
LOCATION 'S3PATH'

And then, even more tediously,

ALTER TABLE my_schema.my_table
ADD IF NOT EXISTS PARTITION ( {{manually specified again}} )

Is there any plan to have something like spark_register_table and spark_recover_partitions?

MichaelChirico avatar Nov 22 '18 11:11 MichaelChirico

FWIW I'm happy to report that sending

MSCK REPAIR TABLE my_schema.my_table

as SQL works as expected in obviating the ALTER TABLE... step(s)...

so having to manually specify the schema in CREATE EXTERNAL TABLE is the last major barrier to this...
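In case it helps anyone, here is a minimal sketch of issuing that repair statement over the sparklyr connection (assuming `sc` is an open `spark_connect()` connection and using the table name from above):

```r
# sc: an existing sparklyr connection; table name as in the example above
DBI::dbSendStatement(sc, "MSCK REPAIR TABLE my_schema.my_table")
```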

Am I missing something in how spark_write_table works? It doesn't seem very well documented...

MichaelChirico avatar Nov 22 '18 12:11 MichaelChirico

I was trying to do the same job.

I am using the hdfs command-line tool to delete the files in a specific directory, like hdfs dfs -rm -r /your/external_table_partition_path, and then calling spark_write_table with mode = 'append'

harryprince avatar Nov 23 '18 02:11 harryprince

yea I'm resorting to the same... could you elaborate on the hdfs to remove existing files? I couldn't figure anything out that wasn't heavily manual...

how did you construct the partition path? I was hoping to be able to do it without any spark execution since that part's costly...

MichaelChirico avatar Nov 23 '18 02:11 MichaelChirico

@MichaelChirico

just like this:

hdfs dfs -rm -r /abs/dt=2017*

Deleting files directly from HDFS is faster than doing it through Spark, because it skips looking up the Hive partition information in the MySQL metastore (a transactional process).
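For anyone assembling these paths by hand: Hive-style layouts encode each partition as a `key=value` directory under the table path, so the directory to remove can be built from the partition columns. A small shell sketch (the warehouse path here is an assumption, not from this thread):

```shell
# Hive-style layout encodes partitions as key=value directories under the table path.
# Paths are illustrative.
table_path=/warehouse/my_schema/my_table
dt=2017-01-01
partition_path="$table_path/dt=$dt"
echo "$partition_path"
# On a real cluster you would then delete it with (requires hdfs on $PATH):
# hdfs dfs -rm -r "$partition_path"
```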

harryprince avatar Nov 26 '18 03:11 harryprince

mode = 'append' was not working for me since I was getting an error when combining it with partition_by (perhaps a bug? I recall something along the lines of insertInto() and partitionBy() being incompatible, but I use SDF.write.partitionBy(...).mode('append').parquet(...) in the plain Spark API all the time...), so I resorted to writing my own helper function... it's not robust enough to be worth a PR, but hopefully it can be useful anyway.

For appending partitions, I first use spark_write_parquet to write to a temp folder, then remove any existing files at the current partition and migrate them over from temp. That means this approach is not amenable to concurrent runs needing to write to the same partition...

# depends on sparklyr, dplyr, DBI, and glue being attached/available
sdf_write_table <- function(sdf, schema, table, 
                            partition_by = NULL, partition_val) {
  if (!nzchar(Sys.which('aws')))
    stop("This writer relies on aws being on your $PATH, please fix")

  tbl_name = as.character(glue('{schema}.{table}'))

  stopifnot(inherits(sdf, "tbl_spark"))
  conn <- sdf[[c("src", "con")]]

  bucket = Sys.getenv('S3BUCKET')
  # use of as.character necessitated by #1784 
  table_path = as.character(glue("s3a://{bucket}/{schema}/{table}"))
  
  has_partition = !is.null(partition_by)
  temp_path = paste0(table_path, if (has_partition) '/temp')
  
  spark_write_parquet(
    sdf, path = temp_path, mode = 'overwrite', 
    partition_by = partition_by
  )
  
  if (has_partition) {
    # remove existing data in partitions at partition_value
    if (missing(partition_val)) {
      # auto-infer partition values based on input data and stated columns
      partition_val = sdf %>% 
        select(all_of(partition_by)) %>% distinct %>% collect %>% as.list
      n = length(partition_val[[1L]])
    } else {
      # some consistency checks but basically get to the format
      #   to match the missing(partition_val) branch
      if (length(partition_by) > 1L && !is.list(partition_val))
        stop('Multi-column partitions require `partition_val` to be a `list`')
      if (length(partition_val) != length(partition_by))
        stop('Supply the correct number of partition values')
      if (length(partition_by) == 1L && is.atomic(partition_val))
        partition_val = setNames(list(partition_val), partition_by)
      if (!identical(partition_by, names(partition_val)))
        stop('Supply the correct partition column-value mapping')
      if (length(n <- unique(lengths(partition_val))) != 1L)
        stop('Supply the correct number of values for each partition column')
    }
    # as.character prevents e.g. Date columns from ending up
    #   in wonky formats
    partition_val = lapply(partition_val, as.character)

    #transpose makes the below loop prettier but is not strictly necessary
    partition_val = 
      lapply(seq_len(n), function(ii) sapply(partition_val, `[`, ii))
    for (ii in seq_along(partition_val)) {
      # construct path/key1=val1/key2=val2/... paths for each partition
      partition_path = paste(sprintf('%s=%s', partition_by, 
                                     partition_val[[ii]]),
                             collapse = '/')
      # aws s3 cli only accepts s3:// paths but spark_write_parquet only accepts s3a://
      this_partition = gsub('^s3a', 's3',
                            sprintf('%s/%s', table_path, partition_path))
      temp_partition = gsub('^s3a', 's3',
                            sprintf('%s/%s', temp_path, partition_path))
      
      failure = system(sprintf('aws s3 rm %s --recursive', this_partition))
      if (failure) stop('S3 write failed for ', this_partition)
      
      failure = system(sprintf('aws s3 mv %s %s --recursive',
                               temp_partition, this_partition))
      if (failure) stop('S3 migration failed for ', this_partition)
    }
  }
  
  # incomplete subset, e.g. Decimal(38, 8)-class of types requires more
  #   sophisticated logic
  spark_sql_type_map = c(
    LongType = 'bigint', TimestampType = 'timestamp', StringType = 'string',
    DateType = 'date', DoubleType = 'double', BooleanType = 'boolean',
    FloatType = 'float', BinaryType = 'binary'
  )
  this_schema = sapply(sdf_schema(sdf), `[[`, 'type')
  sql_type = spark_sql_type_map[this_schema]
  if (anyNA(sql_type))
    stop('Unrecognized column types, please fix.')
  # keep space-separated to allow for peeling off partition columns
  schema_fmt = paste(names(this_schema), sql_type)
  if (has_partition) {
    # match to get the right ordering
    idx = match(partition_by, names(this_schema))
    tbl_schema = sprintf('(%s) PARTITIONED BY (%s)', 
                         paste(schema_fmt[-idx], collapse = ', '),
                         paste(schema_fmt[idx], collapse = ', '))
  } else tbl_schema = sprintf('(%s)', paste(schema_fmt, collapse = ', '))
  
  # doesn't delete table, just the registration
  dbSendStatement(conn, glue('drop table if exists {tbl_name}'))
  dbSendStatement(conn, glue("
  CREATE EXTERNAL TABLE {tbl_name} 
  {tbl_schema}
  STORED AS PARQUET
  LOCATION '{table_path}'
  "))
  
  if (has_partition) {
    message(glue('Recovering partitions for {tbl_name} with REPAIR TABLE'))
    dbSendStatement(conn, glue('MSCK REPAIR TABLE {tbl_name}'))
  }
  
  return(TRUE)
}
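An example call would look like this (hypothetical names; it assumes S3BUCKET is set in the environment, as the function expects):

```r
# illustrative only -- schema, table, and partition columns are made up
sdf_write_table(
  sdf, schema = 'my_schema', table = 'my_table',
  partition_by = c('column1', 'column2')
)
```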

glue parts can easily be dropped to avoid the extra dependency; dbSendStatement is an internal sparklyr registered method so it works fine.

The major external dependency is on the aws s3 rm and aws s3 mv command-line tools... I could never get any AWS package in R to work 😅

MichaelChirico avatar Nov 26 '18 17:11 MichaelChirico

@MichaelChirico

If the Spark version is 2.0 or greater, partition_by is not necessary as long as the table's partitioning has already been declared.

And your implementation seems too complicated.

harryprince avatar Nov 28 '18 03:11 harryprince

If you work on Spark 1.6, you can follow this:

sc <- sparklyr::spark_connect(master = params$master, spark_home = "/lib/spark/",
                              version = "1.6.0", config = config)
DBI::dbGetQuery(sc, paste0("alter table db_name.tbl_name drop if exists partition(dt='", params$exe_date, "')"))

result %>% 
  spark_write_table(name = "db_name.tbl_name", mode = "append",
                    partition_by = c("dt", "hour"))

And if you work on Spark 2.0, you can follow this:

sc <- sparklyr::spark_connect(master = params$master, spark_home = "/lib/spark/",
                              version = "2.0.0", config = config)
DBI::dbGetQuery(sc, paste0("alter table db_name.tbl_name drop if exists partition(dt='", params$exe_date, "')"))

result %>% 
  spark_write_table(name = "db_name.tbl_name", mode = "append")

harryprince avatar Nov 28 '18 03:11 harryprince

@MichaelChirico @harryprince did either of you find a better way for creating external tables with sparklyr in spark3?

PySpark has spark.catalog.createTable and spark.catalog.recoverPartitions to do this, and SparkR has createTable and recoverPartitions, so it is very simple to register an external table without having to touch the existing data
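For what it's worth, one possible workaround in sparklyr is to reach those same catalog methods through the JVM with invoke(). This is an untested sketch (the table name and path are illustrative) and assumes Spark ≥ 2.2, where Catalog.createTable() accepts a path:

```r
library(sparklyr)

# sc: an open spark_connect() connection
catalog <- sc %>% spark_session() %>% invoke("catalog")

# register the external table at an existing path, then recover its partitions
invoke(catalog, "createTable", "my_schema.my_table", "s3a://bucket/path")
invoke(catalog, "recoverPartitions", "my_schema.my_table")
```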

KevinAppelBofa avatar Aug 31 '22 19:08 KevinAppelBofa