
Orc makes all fields nullable

Open · simplylizz opened this issue 5 years ago · 7 comments

I'm trying to replace a partition in a BQ table using ORC as the intermediate format, but it fails with the following error:

20/01/30 10:07:22 INFO com.google.cloud.spark.bigquery.BigQueryWriteHelper: Submitted load to GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=<...>, projectId=<...>, tableId=<...>$19700101}}. jobId: JobId{project=<...>, job=227412f2-2829-48ad-a9b9-1c4aa82c50fb, location=EU}
Traceback (most recent call last):
  File "/tmp/marketing-manual-touchpoints-cost-assignment_3a16cfe0/driver.py", line 28, in <module>
    job_module.compute(spark, job_args)
  File "/hadoop/spark/tmp/spark-82529fc0-149f-49d8-a20f-9ca8fb2d4b14/userFiles-d918a2cb-bd8a-4a6d-aa3c-853de489335e/marketing.zip/bi_etl/lib/marketing/touchpoints_cost_assignment/assign_cost_to_touchpoints.py", line 605, in compute
  File "/hadoop/spark/tmp/spark-82529fc0-149f-49d8-a20f-9ca8fb2d4b14/userFiles-d918a2cb-bd8a-4a6d-aa3c-853de489335e/marketing.zip/bi_etl/lib/marketing/touchpoints_cost_assignment/assign_cost_to_touchpoints.py", line 1142, in compute_manual
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 736, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o119.save.
: java.lang.RuntimeException: Failed to write to BigQuery
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:62)
	at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:42)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:86)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Failed to load to <table-name>$19700101 in job JobId{project=<..>, job=227412f2-2829-48ad-a9b9-1c4aa82c50fb, location=EU}. BigQuery error was Provided Schema does not match Table <table-name>$19700101. Field touchpoint_id.web_touchpoint_id.full_visitor_id has changed mode from REQUIRED to NULLABLE
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:98)
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:68)
	... 33 more

I've checked the schema right before uploading, and the field isn't nullable.
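
For reference, a minimal sketch of such a check in PySpark; the nested field mirrors the one named in the error, and the data is illustrative:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# A struct column whose inner field is non-nullable (REQUIRED in BigQuery
# terms), mirroring touchpoint_id.web_touchpoint_id.full_visitor_id.
schema = StructType([
    StructField("touchpoint_id", StructType([
        StructField("web_touchpoint_id", StructType([
            StructField("full_visitor_id", StringType(), nullable=False),
        ]), nullable=False),
    ]), nullable=False),
])
df = spark.createDataFrame([((("v1",),),)], schema)
df.printSchema()  # prints "nullable = false" at every level
```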

If I replace the whole table (not just a partition), the upload succeeds, but I get a table where all fields are nullable.

With Parquet I didn't experience this issue, but I have a separate reason for using ORC: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/101
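
For context, the failing write has this shape. A minimal sketch (continuing from the schema sketch above), assuming the connector's indirect write path; dataset, table, and bucket names are placeholders:

```python
# Overwrite a single ingestion-time partition via the $YYYYMMDD decorator,
# staging through GCS as ORC (the indirect write path). Names are placeholders.
(df.write
    .format("bigquery")
    .option("table", "my_dataset.my_table$19700101")
    .option("temporaryGcsBucket", "my-staging-bucket")
    .option("intermediateFormat", "orc")  # the format that drops REQUIRED modes
    .mode("overwrite")
    .save())
```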

simplylizz avatar Jan 30 '20 10:01 simplylizz

@simplylizz This doesn't solve the ORC case (yet), but have you tried using Avro as the intermediate format?
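
Switching the intermediate format is a one-option change. A sketch; note that on Spark 2.4 the matching spark-avro package must also be on the classpath, which is the "cluster setup" concern raised later in this thread:

```python
# Same write as the ORC sketch above, staged as Avro instead. Requires the
# org.apache.spark:spark-avro package at runtime (e.g. via --packages);
# table and bucket names are placeholders.
(df.write
    .format("bigquery")
    .option("table", "my_dataset.my_table$19700101")
    .option("temporaryGcsBucket", "my-staging-bucket")
    .option("intermediateFormat", "avro")
    .mode("overwrite")
    .save())
```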

davidrabinowitz avatar Jul 31 '20 16:07 davidrabinowitz

No, since its support was only implemented recently. Also, I'm not sure I'll be able to test this in the foreseeable future.

simplylizz avatar Jul 31 '20 17:07 simplylizz

Thanks for the update.

davidrabinowitz avatar Jul 31 '20 18:07 davidrabinowitz

This is a real issue. I have an array column that, when inserted into BigQuery, changes the column name and adds list.element inside. As suggested, if I change the intermediate format to ORC, then ORC changes the required columns to nullable, and for Avro I need to change my cluster setup.

There should be a proper fix for this. :( Can anyone help?

bsikander avatar Aug 05 '21 18:08 bsikander

Unfortunately we depend on the underlying libraries for the intermediate format, but we are working on a fix.

davidrabinowitz avatar Aug 12 '21 13:08 davidrabinowitz

Has anyone found a solution for this?

devmessias avatar Sep 27 '22 22:09 devmessias

Have you tried the direct write method? It should also provide better write performance.
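
A sketch of the direct path, assuming a connector version with BigQuery Storage Write API support; it bypasses GCS staging entirely, so no intermediate format is involved:

```python
# Direct write via the BigQuery Storage Write API: no staging bucket and
# no intermediateFormat, so the ORC/Parquet schema quirks don't apply.
# Table name is a placeholder.
(df.write
    .format("bigquery")
    .option("writeMethod", "direct")
    .option("table", "my_dataset.my_table")
    .mode("append")
    .save())
```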

davidrabinowitz avatar Sep 27 '22 22:09 davidrabinowitz