spark-tfrecord
java.lang.RuntimeException: unsupported type ...
Hi @junshi15, I loaded a TFRecord file but got a java.lang.RuntimeException: unsupported type ... error. Is it related to empty feature values?
df = spark.read.format("tfrecord").option("recordType", "Example").load(train_path)
The TFRecord content looks like this:
features {
  feature {
    key: "k1"
    value { float_list { value: 0.0 } }
  }
  feature {
    key: "k2"
    value { float_list { value: 0.0 value: 0.0 } }
  }
  feature {
    key: "k3"
    value { }
  }
}
traceback: `Py4JJavaError: An error occurred while calling o58.load. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.240.15.46, executor 0): java.lang.RuntimeException: unsupported type ... at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.com$linkedin$spark$datasources$tfrecord$TensorFlowInferSchema$$inferField(TensorFlowInferSchema.scala:136) at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$inferFeatureTypes$1.apply(TensorFlowInferSchema.scala:78) at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$inferFeatureTypes$1.apply(TensorFlowInferSchema.scala:76) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.inferFeatureTypes(TensorFlowInferSchema.scala:76) at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.com$linkedin$spark$datasources$tfrecord$TensorFlowInferSchema$$inferExampleRowType(TensorFlowInferSchema.scala:71) at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$1.apply(TensorFlowInferSchema.scala:39) at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$1.apply(TensorFlowInferSchema.scala:39) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214) at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116) at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130) at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131) at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1118) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1111) at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.apply(TensorFlowInferSchema.scala:39) at com.linkedin.spark.datasources.tfrecord.DefaultSource.inferSchema(DefaultSource.scala:47) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202) at scala.Option.orElse(Option.scala:289) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:201) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)`
I have not seen this error before, but it is possibly due to null values. If you can provide a small sample file, I can give it a try.
I have sent an email; please give it a try, thanks.
Thanks for sharing the tfrecord file.
I noticed some empty features in your file. Spark-TFRecord cannot infer the schema from empty features.
...
feature {
  key: "sparse_seq_clk90_b"
  value {
  }
}
feature {
  key: "sparse_seq_clk90_c"
  value {
  }
}
...
Spark-TFRecord does not handle null values well. I saw two issues:
- If there is a null value, the schema cannot be inferred, even if the null value appears only once in the column.
- Schema inference can be worked around by providing an explicit schema with .schema(your_schema) (see the sketch below), but then the failure happens downstream, when reading the dataframe.
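For reference, a minimal sketch of that workaround, using the k1/k2/k3 features from the example above (the field names and types are only a guess and would need to match your actual data; as noted, the read can still fail later on the null values):

```python
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType

# Hypothetical schema for the k1/k2/k3 example; adjust names and types
# to your own features.
schema = StructType([
    StructField("k1", ArrayType(FloatType())),
    StructField("k2", ArrayType(FloatType())),
    StructField("k3", ArrayType(FloatType())),
])

# Providing the schema skips inference, which is what fails on empty features.
df = (spark.read.format("tfrecord")
      .option("recordType", "Example")
      .schema(schema)
      .load(train_path))
```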
This is an enhancement worth spending time on. Unfortunately, we do not have the bandwidth at the moment. Pull requests are encouraged.
But the schema may be very long, and providing it explicitly is not very flexible.
The problem is not only schema inference. Even if you provide the schema, the reader will still fail. So unfortunately null values are not supported well. My suggestion is to not include null/empty values until better support is available.
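If the TFRecords are produced with Spark in the first place, one way to follow that suggestion is to fill null columns with a default before writing. A minimal sketch (the column names, default value, and output path are only illustrative, not taken from the original data):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame standing in for the real features; the second row has a
# null array that would otherwise end up as an empty feature on write.
df = spark.createDataFrame(
    [([0.0], [0.0, 0.0]), ([1.0], None)],
    ["k1", "sparse_seq_clk90_b"],
)

# Replace null arrays with a default so no empty features are written.
cleaned = df.withColumn(
    "sparse_seq_clk90_b",
    F.coalesce(F.col("sparse_seq_clk90_b"), F.array(F.lit(0.0))),
)

(cleaned.write.format("tfrecord")
 .option("recordType", "Example")
 .mode("overwrite")
 .save("/tmp/cleaned_tfrecord"))
```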
OK, I got it. Thanks for your reply; looking forward to better support.