Analogue to createTable and recoverPartitions
In the native Spark API (e.g. SparkR), the process of running daily jobs that build up the same table is pretty straightforward:
library(SparkR)
# ... ETL producing a SparkDataFrame SDF ...
S3PATH = ...
TBL = 'my_schema.my_table'
write.df(SDF, path = S3PATH, source = 'parquet', mode = 'overwrite',
         partitionBy = c('column1', 'column2'))
sql(sprintf('DROP TABLE IF EXISTS %s', TBL))
createTable(TBL, S3PATH)
recoverPartitions(TBL)
Unfortunately, I don't see a straightforward way to imitate this in the sparklyr API.
spark_write_parquet covers the first bit pretty effectively, but I don't see a way to register a table pointing at an existing S3 path besides manually (and laboriously) constructing a query of the form
CREATE EXTERNAL TABLE IF NOT EXISTS my_schema.my_table (
{{manually specified schema}}
)
PARTITIONED BY (
{{manually specified partitions}}
)
STORED AS PARQUET
LOCATION 'S3PATH'
And then, even more tediously,
ALTER TABLE my_schema.my_table
ADD IF NOT EXISTS PARTITION ( {{manually specified again}} )
Is there any plan to have something like spark_register_table and spark_recover_partitions?
FWIW I'm happy to report that sending
MSCK REPAIR TABLE my_schema.my_table
as SQL works as expected, obviating the ALTER TABLE ... step(s)...
so having to manually specify the schema in CREATE EXTERNAL TABLE is the last major barrier to this...
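Putting those pieces together, the manual route can at least be driven from sparklyr itself via DBI; a sketch with hypothetical table, path, and column names, assuming a live connection `sc`:

```r
library(sparklyr)
library(DBI)

# sc <- spark_connect(master = 'yarn')  # assumed live connection
tbl = 'my_schema.my_table'
s3_path = 's3a://my-bucket/my_schema/my_table'  # hypothetical location

dbGetQuery(sc, sprintf('DROP TABLE IF EXISTS %s', tbl))
# the column/partition schema still has to be spelled out by hand
dbGetQuery(sc, sprintf(
  "CREATE EXTERNAL TABLE %s (column3 bigint, column4 double)
   PARTITIONED BY (column1 string, column2 string)
   STORED AS PARQUET
   LOCATION '%s'", tbl, s3_path
))
dbGetQuery(sc, sprintf('MSCK REPAIR TABLE %s', tbl))
```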
Am I missing something in how spark_write_table works? It doesn't seem very well documented...
I was trying to do the same job.
I am using an hdfs command to delete the files in a specific directory, like hdfs dfs -rm -r /your/external_table_partition_path
And then calling spark_write_table with mode = 'append'
yea I'm resorting to the same... could you elaborate on using hdfs to remove the existing files? I couldn't figure out anything that wasn't heavily manual...
how did you construct the partition path? I was hoping to be able to do it without any Spark execution since that part's costly...
@MichaelChirico
just like this:
hdfs dfs -rm -r /abs/dt=2017*
Deleting files directly from HDFS is faster than going through Spark, because it skips checking the Hive partition information in the MySQL metastore (a transactional process).
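Combining the two steps, the append workflow described above might look like this (paths and names hypothetical; assumes `hdfs` is on the `$PATH` and `sdf` is an existing `tbl_spark`):

```r
library(sparklyr)

# drop the partition directory we are about to rewrite
#   (-r because a partition is a directory, not a file)
system('hdfs dfs -rm -r /warehouse/my_table/dt=2017-01-01')

# then append the fresh data; Spark recreates the partition directory
sdf %>% spark_write_table(name = 'my_schema.my_table', mode = 'append')
```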
mode = 'append' was not working for me since I was getting an error when combining it with partition_by (perhaps a bug? I recall something along the lines of insertInto() and partitionBy() being incompatible, but I use SDF.write.partitionBy(...).mode('append').parquet(...) in the plain Spark API all the time...), so I resorted to writing my own helper function... it's not robust enough to be worth a PR but hopefully it can be useful anyway.
For appending partitions, I first use spark_write_parquet to write to a temp folder, then remove any existing files at the current partition and migrate from temp. That means this approach is not amenable to different runs needing to write to the same partition...
# deps: sparklyr, dplyr, DBI, glue
sdf_write_table <- function(sdf, schema, table,
                            partition_by = NULL, partition_val) {
  if (!nzchar(Sys.which('aws')))
    stop("This writer relies on aws being on your $PATH, please fix")
  tbl_name = as.character(glue('{schema}.{table}'))
  stopifnot(inherits(sdf, "tbl_spark"))
  conn <- sdf[[c("src", "con")]]
  bucket = Sys.getenv('S3BUCKET')
  # use of as.character necessitated by #1784
  table_path = as.character(glue("s3a://{bucket}/{schema}/{table}"))
  has_partition = !is.null(partition_by)
  # with no partitions, write directly to the final location
  temp_path = paste0(table_path, if (has_partition) '/temp')
  spark_write_parquet(
    sdf, path = temp_path, mode = 'overwrite',
    partition_by = partition_by
  )
  if (has_partition) {
    # remove existing data in partitions at partition_val
    if (missing(partition_val)) {
      # auto-infer partition values based on input data and stated columns
      partition_val = sdf %>%
        select(all_of(partition_by)) %>% distinct() %>% collect() %>% as.list()
    } else {
      # some consistency checks but basically get to the format
      #   matching the missing(partition_val) branch
      if (length(partition_by) > 1L && !is.list(partition_val))
        stop('Multi-column partitions require `partition_val` to be a `list`')
      if (length(partition_val) != length(partition_by))
        stop('Supply the correct number of partition values')
      if (length(partition_by) == 1L && is.atomic(partition_val))
        partition_val = setNames(list(partition_val), partition_by)
      if (!identical(partition_by, names(partition_val)))
        stop('Supply the correct partition column-value mapping')
    }
    # check consistency after both branches so n is always defined below
    if (length(n <- unique(lengths(partition_val))) != 1L)
      stop('Supply the correct number of values for each partition column')
    # as.character prevents e.g. Date columns from ending up
    #   in wonky formats
    partition_val = lapply(partition_val, as.character)
    # transpose makes the below loop prettier but is not strictly necessary
    partition_val =
      lapply(seq_len(n), function(ii) sapply(partition_val, `[`, ii))
    for (ii in seq_along(partition_val)) {
      # construct path/key1=val1/key2=val2/... paths for each partition
      partition_path = paste(sprintf('%s=%s', partition_by,
                                     partition_val[[ii]]),
                             collapse = '/')
      # aws s3 cli only accepts s3:// paths but spark_write_parquet
      #   only accepts s3a://
      this_partition = gsub('^s3a', 's3',
                            sprintf('%s/%s', table_path, partition_path))
      temp_partition = gsub('^s3a', 's3',
                            sprintf('%s/%s', temp_path, partition_path))
      failure = system(sprintf('aws s3 rm %s --recursive', this_partition))
      if (failure) stop('S3 removal failed for ', this_partition)
      failure = system(sprintf('aws s3 mv %s %s --recursive',
                               temp_partition, this_partition))
      if (failure) stop('S3 migration failed for ', this_partition)
    }
  }
  # incomplete subset; e.g. the Decimal(38, 8) class of types requires more
  #   sophisticated logic
  spark_sql_type_map = c(
    LongType = 'bigint', TimestampType = 'timestamp', StringType = 'string',
    DateType = 'date', DoubleType = 'double', BooleanType = 'boolean',
    FloatType = 'float', BinaryType = 'binary'
  )
  this_schema = sapply(sdf_schema(sdf), `[[`, 'type')
  sql_type = spark_sql_type_map[this_schema]
  if (anyNA(sql_type))
    stop('Unrecognized column types, please fix.')
  # keep space-separated to allow for peeling off partition columns
  schema_fmt = paste(names(this_schema), sql_type)
  if (has_partition) {
    # match to get the right ordering
    idx = match(partition_by, names(this_schema))
    tbl_schema = sprintf('(%s) PARTITIONED BY (%s)',
                         paste(schema_fmt[-idx], collapse = ', '),
                         paste(schema_fmt[idx], collapse = ', '))
  } else tbl_schema = sprintf('(%s)', paste(schema_fmt, collapse = ', '))
  # doesn't delete the data, just the table registration
  dbSendStatement(conn, glue('DROP TABLE IF EXISTS {tbl_name}'))
  dbSendStatement(conn, glue("
    CREATE EXTERNAL TABLE {tbl_name}
    {tbl_schema}
    STORED AS PARQUET
    LOCATION '{table_path}'
  "))
  if (has_partition) {
    message(glue('Recovering partitions for {tbl_name} with REPAIR TABLE'))
    dbSendStatement(conn, glue('MSCK REPAIR TABLE {tbl_name}'))
  }
  return(TRUE)
}
The glue parts can easily be dropped to avoid the extra dependency; dbSendStatement is a method sparklyr registers internally, so it works fine.
The major external dependency is on the aws s3 rm and aws s3 mv system tools... I could never get any AWS package in R to work 😅
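For reference, a hypothetical invocation of the helper above (bucket and names are placeholders; `sdf` is an existing `tbl_spark`):

```r
# the helper reads the target bucket from the environment
Sys.setenv(S3BUCKET = 'my-bucket')

# partition_val is omitted, so the dt values are inferred from the data itself
sdf_write_table(sdf, schema = 'my_schema', table = 'my_table',
                partition_by = 'dt')
```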
@MichaelChirico
if your Spark version is greater than 2.0, partition_by is not necessary if the table's partitioning was already declared.
And your implementation seems too complicated.
If you work on Spark 1.6, you can follow this:
sc <- sparklyr::spark_connect(master = params$master, spark_home = "/lib/spark/",
                              version = "1.6.0", config = config)
DBI::dbGetQuery(sc, paste0("ALTER TABLE db_name.tbl_name DROP IF EXISTS PARTITION (dt='", params$exe_date, "')"))
result %>%
  spark_write_table(name = "db_name.tbl_name", mode = "append",
                    partition_by = c("dt", "hour"))
And if you work on Spark 2.0, you can follow this:
sc <- sparklyr::spark_connect(master = params$master, spark_home = "/lib/spark/",
                              version = "2.0.0", config = config)
DBI::dbGetQuery(sc, paste0("ALTER TABLE db_name.tbl_name DROP IF EXISTS PARTITION (dt='", params$exe_date, "')"))
result %>%
  spark_write_table(name = "db_name.tbl_name", mode = "append")
@MichaelChirico @harryprince did either of you find a better way to create external tables with sparklyr on Spark 3?
PySpark has spark.catalog.createTable and spark.catalog.recoverPartitions for this, and SparkR has createTable and recoverPartitions; with those it is very simple to register an external table without having to touch the existing data.
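Until such wrappers exist, one workaround is sparklyr's low-level invoke() API, which can reach those same Catalog methods on the JVM side; a sketch, assuming a live connection `sc` and hypothetical table/path names:

```r
library(sparklyr)

tbl = 'my_schema.my_table'
s3_path = 's3a://my-bucket/my_schema/my_table'  # hypothetical location

catalog = sc %>% spark_session() %>% invoke('catalog')
# analogous to spark.catalog.createTable(tbl, path) in PySpark
invoke(catalog, 'createTable', tbl, s3_path)
# analogous to spark.catalog.recoverPartitions(tbl)
invoke(catalog, 'recoverPartitions', tbl)
```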