sparklyr
Cannot write dataframe to Databricks Unity Catalog table
Hi,
I am unable to write to a table in Databricks Unity Catalog, although I can read data from catalogs/schemas without any trouble. I am using Databricks Connect via Python, and I get the same result whether I use an Azure token or a PAT token. For example, this code:
library(sparklyr)
library(pysparklyr)
library(dplyr)
library(dbplyr)
sc <- spark_connect(
  master = "<my_db_workspace_url>",
  cluster_id = "<cluster_id>",
  token = "<Azure_Token/PAT_Token>",
  method = "databricks_connect"
)
my_table <- tbl(sc, in_catalog("main", "default", "my_table"))
The above works well, but I seem unable to write data. I have tried the following:
sparklyr::copy_to(sc, my_table, in_catalog("main", "default", "my_table2"))
I receive:
> sparklyr::copy_to(sc, my_table, in_catalog("main", "default", "my_table2"))
Error in py_call_impl(callable, call_args$unnamed, call_args$named) :
TypeError: bad argument type for built-in operation
── Python Exception Message ────────────────────────────────────────────────────────────────────────
Traceback (most recent call last):
File "/home/ubuntu/.virtualenvs/r-sparklyr-databricks-13.3/lib/python3.10/site-packages/pyspark/sql/connect/catalog.py", line 216, in tableExists
pdf = self._execute_and_fetch(plan.TableExists(table_name=tableName, db_name=dbName))
File "/home/ubuntu/.virtualenvs/r-sparklyr-databricks-13.3/lib/python3.10/site-packages/pyspark/sql/connect/catalog.py", line 49, in _execute_and_fetch
pdf = DataFrame.withPlan(catalog, session=self._sparkSession).toPandas()
File "/home/ubuntu/.virtualenvs/r-sparklyr-databricks-13.3/lib/python3.10/site-packages/pyspark/sql/connect/dataframe.py", line 1654, in toPandas
query = self._plan.to_proto(self._session.client)
File "/home/ubuntu/.virtualenvs/r-sparklyr-databricks-13.3/lib/python3.10/site-packages/pyspark/sql/connect/plan.py", line 118, in to_proto
plan.root.CopyFrom(self.plan(session))
File "/home/ubuntu/.virtualenvs/r-sparklyr-databricks-13.3/lib/python3.10/site-packages/pyspark/sql/connect/plan.py", line 1818, in plan
plan.catalog.table_exists.table_name = self._table_name
TypeError: bad argument type for built-in operation
── R Traceback ─────────────────────────────────────────────────────────────────────────────────────
▆
1. ├─sparklyr::copy_to(...)
2. └─sparklyr:::copy_to.spark_connection(...)
3. ├─sparklyr::sdf_copy_to(...)
4. └─pysparklyr:::sdf_copy_to.pyspark_connection(...)
5. └─context$catalog$tableExists(name)
6. └─reticulate:::py_call_impl(callable, call_args$unnamed, call_args$named)
Using:
- Python 3.10
- sparklyr 1.8.4
- Databricks Runtime 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)
- Databricks Connect 14.1.0
Any ideas on how I can write to a specific table in Unity Catalog using the catalog.schema.table path format?
https://stackoverflow.com/questions/77398619/how-can-you-write-to-a-databricks-catalog-from-r
@edgararuiz, do you happen to know anything regarding this? :)
Hi, copy_to() focuses on saving temporary tables. Their location is determined by Spark Connect, so all you should have to pass is copy_to(sc, my_table). Are you trying to create a permanent table?
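For a temporary table, the call would be something like this (a minimal sketch, assuming a local data frame named df and the connection sc from above):
# copy_to() registers df as a temporary view; Spark Connect decides where it lives
df_tmp <- sparklyr::copy_to(sc, df, overwrite = TRUE)
# the temporary view can then be queried with dplyr as usual
df_tmp |> dplyr::count()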
@edgararuiz thanks for the response. Yes, I want to create a permanent table in Unity Catalog. Do you know which method I should use for that? I have not been able to locate the correct one myself. I need to be able to specify which catalog and schema the table should be created in, like using the in_catalog('catalog', 'schema', 'table') function from the dbplyr package.
@edgararuiz sorry for pinging you again. But do you have any updates/ideas?
Hi @Zurina & @edgararuiz, IMO the following should work, but it fails:
> spark_write_table(my_table, "main.default.my_table2")
Error in py_get_attr_impl(x, name, silent) :
AttributeError: 'DataFrameWriter' object has no attribute '%>%'
Run `reticulate::py_last_error()` for details.
A workaround might be:
# extract the underlying PySpark DataFrame from the dplyr tbl
my_table_pydf <- sparklyr:::spark_sqlresult_from_dplyr(my_table)$pyspark_obj
# write it as a Delta table via PySpark's DataFrameWriter
reticulate::py_run_string(
  "r.my_table_pydf.write.format('delta').mode('error').saveAsTable('main.default.my_table2')"
)
(Here mode can be: error, append, overwrite, or ignore.)
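If the workaround runs without error, the new table should be readable straight back through dbplyr; a quick sanity check against the same connection:
my_table2 <- tbl(sc, in_catalog("main", "default", "my_table2"))
my_table2 |> head() |> collect()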
@cocinerox, thanks for your input. I agree, that part should work. Your workaround definitely works, but I hope this will be possible to do in native R eventually :)
@Zurina, a "native" R solution:
my_table_pydf <- sparklyr:::spark_sqlresult_from_dplyr(my_table)$pyspark_obj
my_table_pydf |>
  sparklyr::invoke("write") |>
  invoke_obj("format", "delta") |>
  invoke_obj("mode", "error") |>
  sparklyr::invoke("saveAsTable", "main.default.my_table2")
where
# invoke a method and return the underlying PySpark object so the chain can continue
invoke_obj <- function(...) {
  sparklyr::invoke(...)$pyspark_obj
}
Morning, the latest version of pysparklyr now supports spark_write_table(), which itself calls saveAsTable(). I think that will encapsulate the solution above.
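For anyone landing here later, the call would then look roughly like this (a sketch, assuming the upgraded pysparklyr is installed; name takes the three-part catalog.schema.table identifier, and mode is the usual sparklyr write mode):
spark_write_table(
  my_table,
  name = "main.default.my_table2",
  mode = "overwrite"
)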
@edgararuiz It works for me. Thanks!