spark-bigquery-connector
ORC makes all fields nullable
I'm trying to replace a partition in a BQ table using ORC as the intermediate format, but it fails with the following error:
20/01/30 10:07:22 INFO com.google.cloud.spark.bigquery.BigQueryWriteHelper: Submitted load to GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=<...>, projectId=<...>, tableId=<...>$19700101}}. jobId: JobId{project=<...>, job=227412f2-2829-48ad-a9b9-1c4aa82c50fb, location=EU}
Traceback (most recent call last):
File "/tmp/marketing-manual-touchpoints-cost-assignment_3a16cfe0/driver.py", line 28, in <module>
job_module.compute(spark, job_args)
File "/hadoop/spark/tmp/spark-82529fc0-149f-49d8-a20f-9ca8fb2d4b14/userFiles-d918a2cb-bd8a-4a6d-aa3c-853de489335e/marketing.zip/bi_etl/lib/marketing/touchpoints_cost_assignment/assign_cost_to_touchpoints.py", line 605, in compute
File "/hadoop/spark/tmp/spark-82529fc0-149f-49d8-a20f-9ca8fb2d4b14/userFiles-d918a2cb-bd8a-4a6d-aa3c-853de489335e/marketing.zip/bi_etl/lib/marketing/touchpoints_cost_assignment/assign_cost_to_touchpoints.py", line 1142, in compute_manual
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o119.save.
: java.lang.RuntimeException: Failed to write to BigQuery
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:62)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:42)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:86)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Failed to load to <table-name>$19700101 in job JobId{project=<..>, job=227412f2-2829-48ad-a9b9-1c4aa82c50fb, location=EU}. BigQuery error was Provided Schema does not match Table <table-name>$19700101. Field touchpoint_id.web_touchpoint_id.full_visitor_id has changed mode from REQUIRED to NULLABLE
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:98)
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:68)
... 33 more
I've checked the schema right before uploading, and the field isn't nullable.
If I try to replace the whole table (not just a partition), the upload succeeds, but I end up with a table where all fields are nullable.
With Parquet I didn't experience this issue, but I need ORC for another reason: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/101
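For context, the write is roughly of the following shape (an illustrative sketch only; the dataset, table, column and bucket names are placeholders, not the real job's values):

# Illustrative sketch of the failing write path; all names are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("orc-partition-overwrite").getOrCreate()

df = spark.table("staging.touchpoints")  # placeholder source DataFrame

(df.write
   .format("bigquery")
   .option("table", "my_dataset.my_table$19700101")    # partition decorator, as in the log above
   .option("temporaryGcsBucket", "my-staging-bucket")  # staging bucket for the indirect load
   .option("intermediateFormat", "orc")                # the format that loses the REQUIRED mode
   .mode("overwrite")
   .save())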
@simplylizz This doesn't solve the ORC case (yet), but have you tried using Avro as the intermediate format?
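For reference, switching the intermediate format would look roughly like this (an untested sketch; the table and bucket names are placeholders, and the spark-avro artifact must match your Spark/Scala version, e.g. spark-avro_2.11 for Spark 2.4):

# Sketch only: same write as above, with Avro as the intermediate format.
# Requires spark-avro on the classpath, e.g.
#   spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 ...
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.table("staging.touchpoints")  # placeholder source DataFrame

(df.write
   .format("bigquery")
   .option("table", "my_dataset.my_table$19700101")
   .option("temporaryGcsBucket", "my-staging-bucket")
   .option("intermediateFormat", "avro")
   .mode("overwrite")
   .save())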
No, since support for it was added only recently. Also, I'm not sure I'll be able to test this in the foreseeable future.
Thanks for the update.
This is a real issue.
I have an array column which, when it gets inserted into BigQuery, has its column name changed and a nested list.element added inside. As per the suggestion, if I change the intermediate format to ORC, then ORC changes the required columns to nullable, and for Avro I would need to change my cluster setup.
There should be a proper fix for this. :( Can anyone help?
Unfortunately we depend on the underlying libraries for the intermediate format, but we are working on a fix.
Has anyone found a solution for this?
Have you tried the direct write method? It should also provide better write performance.
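For anyone landing here, a rough sketch of what that looks like (assuming a connector version recent enough to support writeMethod=direct; names are placeholders):

# Sketch: direct write via the BigQuery Storage Write API, which skips the
# GCS staging step and the ORC/Parquet/Avro intermediate format entirely.
# Requires a connector version that supports writeMethod=direct.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.table("staging.touchpoints")  # placeholder source DataFrame

(df.write
   .format("bigquery")
   .option("writeMethod", "direct")
   .mode("overwrite")
   .save("my_dataset.my_table"))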