
java.lang.RuntimeException: unsupported type ...

Open seiyagoo opened this issue 4 years ago • 6 comments

Hi @junshi15, I loaded a TFRecord file but got a java.lang.RuntimeException: unsupported type ... error. Is it related to empty feature values?

df = spark.read.format("tfrecord").option("recordType", "Example").load(train_path)

The TFRecord looks like this:

features {
  feature {
    key: "k1"
    value { float_list { value: 0.0 } }
  }
  feature {
    key: "k2"
    value { float_list { value: 0.0 value: 0.0 } }
  }
  feature {
    key: "k3"
    value { }
  }
}
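
For reference, a record with this shape, including the empty feature k3, can be reproduced with a short TensorFlow snippet (a sketch; the output path is illustrative):

import tensorflow as tf

# Build an Example matching the record above; k3 is an empty feature,
# i.e. a Feature message with no bytes_list/float_list/int64_list set.
example = tf.train.Example(features=tf.train.Features(feature={
    "k1": tf.train.Feature(float_list=tf.train.FloatList(value=[0.0])),
    "k2": tf.train.Feature(float_list=tf.train.FloatList(value=[0.0, 0.0])),
    "k3": tf.train.Feature(),
}))

with tf.io.TFRecordWriter("/tmp/sample.tfrecord") as writer:
    writer.write(example.SerializeToString())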

Traceback:

Py4JJavaError: An error occurred while calling o58.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.240.15.46, executor 0): java.lang.RuntimeException: unsupported type ...
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.com$linkedin$spark$datasources$tfrecord$TensorFlowInferSchema$$inferField(TensorFlowInferSchema.scala:136)
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$inferFeatureTypes$1.apply(TensorFlowInferSchema.scala:78)
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$inferFeatureTypes$1.apply(TensorFlowInferSchema.scala:76)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.inferFeatureTypes(TensorFlowInferSchema.scala:76)
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.com$linkedin$spark$datasources$tfrecord$TensorFlowInferSchema$$inferExampleRowType(TensorFlowInferSchema.scala:71)
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$1.apply(TensorFlowInferSchema.scala:39)
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$$anonfun$1.apply(TensorFlowInferSchema.scala:39)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
  at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
  at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116)
  at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1116)
  at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130)
  at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2130)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
  at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1118)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1111)
  at com.linkedin.spark.datasources.tfrecord.TensorFlowInferSchema$.apply(TensorFlowInferSchema.scala:39)
  at com.linkedin.spark.datasources.tfrecord.DefaultSource.inferSchema(DefaultSource.scala:47)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:202)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:201)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:392)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)

seiyagoo · Jan 07 '21 07:01

I have not seen this error before, but it is possibly due to null values. If you can provide a small sample file, I can give it a try.

junshi15 · Jan 07 '21 22:01

I have sent an email with a sample file. Please give it a try, thanks.

seiyagoo · Jan 15 '21 08:01

Thanks for sharing the tfrecord file.

I noticed some empty features in your file. Spark-TFRecord cannot infer the schema from empty features.

...
  feature {
    key: "sparse_seq_clk90_b"
    value {
    }
  }
  feature {
    key: "sparse_seq_clk90_c"
    value {
    }
  }
...
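
If it helps, empty features like these can be located with a quick scan before handing the file to Spark-TFRecord (a sketch; the path is illustrative):

import tensorflow as tf

# Print the name of every feature whose value has no list kind set.
for raw in tf.data.TFRecordDataset("/tmp/sample.tfrecord"):
    example = tf.train.Example.FromString(raw.numpy())
    for name, feature in example.features.feature.items():
        # An empty feature leaves the protobuf oneof "kind" unset.
        if feature.WhichOneof("kind") is None:
            print("empty feature:", name)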

Spark-TFRecord does not handle null values well. I see two issues:

  1. If there is a null value, the schema cannot be inferred, even if the null value appears only once in the column.
  2. The above can be worked around by providing a schema via .schema(your_schema), but then the failure happens downstream, when the DataFrame is read (see the sketch below).
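
For concreteness, the workaround in point 2 looks roughly like this (a sketch; the column names and types are illustrative and must match your actual features):

from pyspark.sql.types import ArrayType, FloatType, StructField, StructType

# Hypothetical schema matching the k1/k2/k3 example above.
your_schema = StructType([
    StructField("k1", ArrayType(FloatType()), nullable=True),
    StructField("k2", ArrayType(FloatType()), nullable=True),
    StructField("k3", ArrayType(FloatType()), nullable=True),
])

# spark and train_path as in the original snippet above.
df = (spark.read.format("tfrecord")
      .option("recordType", "Example")
      .schema(your_schema)
      .load(train_path))
# As noted above, the read may still fail downstream on the empty values.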

This is an enhancement worth spending time on. Unfortunately, we do not have the bandwidth at the moment. Pull requests are encouraged.

junshi15 · Jan 18 '21 22:01

But the schema may be very long, and providing it by hand is not flexible.

seiyagoo · Jan 19 '21 03:01

The problem is not only the schema. Even if you provide the schema, the reader will still fail. So unfortunately, null values are not well supported. My suggestion is to not include null/empty values until better support is available.
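
One way to follow this suggestion is to strip (or fill) the empty features when writing the file in the first place (a sketch; the paths are illustrative):

import tensorflow as tf

# Rewrite a TFRecord file with empty features removed. An alternative is
# to fill them with a default value so the column stays present in every record.
with tf.io.TFRecordWriter("/tmp/cleaned.tfrecord") as writer:
    for raw in tf.data.TFRecordDataset("/tmp/sample.tfrecord"):
        example = tf.train.Example.FromString(raw.numpy())
        feature_map = example.features.feature
        empty_keys = [k for k, v in feature_map.items()
                      if v.WhichOneof("kind") is None]
        for k in empty_keys:
            del feature_map[k]
        writer.write(example.SerializeToString())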

junshi15 · Jan 19 '21 03:01

OK, got it. Thanks for your reply, and looking forward to better support.

seiyagoo · Jan 19 '21 03:01