
Unexpected results when loading images with readImages

Open sdikby opened this issue 8 years ago • 15 comments

Hi,

I am getting unexpected results when loading images (*.jpg) with readImages() from a directory in HDFS: I get nulls instead of the binary image data.

Below are the results, stored as a *.csv file with image_df.repartition(1).write.format("csv").save("/path/to/output_csv"):

hdfs://path/to/test_data/Koala.jpg,"[[B@26e4b33,768,null,3,null]"
hdfs://path/to/test_data/Hydrangeas.jpg,"[[B@170b0284,768,null,3,null]"
hdfs://path/to/test_data/Lighthouse.jpg,"[[B@53d233ed,768,null,3,null]"
hdfs://path/to/MA/test_data/Desert.jpg,"[[B@dcacdc,768,null,3,null]"
hdfs://path/to/MA/test_data/Jellyfish.jpg,"[[B@17d89ff5,768,null,3,null]"
hdfs://path/to/test_data/Penguins.jpg,"[[B@eed9a0e,768,null,3,null]"
hdfs://path/to/test_data/Chrysanthemum.jpg,"[[B@111d14ca,768,null,3,null]"
hdfs://path/to/test_data/Tulips.jpg,"[[B@c3fe28d,768,null,3,null]"

ENV: CDH 5.7.0, Spark 1.6.0, Anaconda 4.2.0

Any help is appreciated.

sdikby avatar Aug 16 '17 20:08 sdikby

Do you still see nulls if you inspect the DataFrame itself (before writing out)? (e.g. df.show())

cc @thunterdb -- should writing out the binary data as csv work here?

sueann avatar Aug 16 '17 20:08 sueann

This is what I get when doing df.show(). It does not seem to show all of the DataFrame, though:

+--------------------+--------------------+
|            filePath|               image|
+--------------------+--------------------+
|hdfs://hadoop-ser...|[[B@6ad79116,768,...|
|hdfs://hadoop-ser...|[[B@11104c4a,768,...|
|hdfs://hadoop-ser...|[[B@45009706,768,...|
|hdfs://hadoop-ser...|[[B@29bb2e4e,768,...|
|hdfs://hadoop-ser...|[[B@3fdd1849,768,...|
|hdfs://hadoop-ser...|[[B@16175c73,768,...|
|hdfs://hadoop-ser...|[[B@45caa136,768,...|
|hdfs://hadoop-ser...|[[B@11104c4a,768,...|
+--------------------+--------------------+

sdikby avatar Aug 16 '17 20:08 sdikby

ping @sueann @thunterdb

sdikby avatar Aug 18 '17 05:08 sdikby

I have not investigated further than that, but Spark is not behaving as you would like here. It is taking the string representation of the Java object that contains the row, and by default in Java, arrays are printed as just their type and identity hash (the [[B@6ad79116 elements). It seems to be a Spark issue independent of images, which I will try to reproduce.

In the meantime, though, can you store your images in the Parquet format, for example? It is more compact, and high-quality readers exist for various languages.
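For example (just a sketch, with a placeholder path, and sqlContext being the SQLContext available in the pyspark shell):

# Parquet preserves the nested struct and binary columns that CSV cannot
# represent, and the files can be read back later for further processing.
image_df.write.parquet("hdfs:///path/to/image_dataset_parquet")
restored = sqlContext.read.parquet("hdfs:///path/to/image_dataset_parquet")
restored.printSchema()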

thunterdb avatar Aug 18 '17 06:08 thunterdb

The binary image data is also not persisted; as you can see, I am getting nulls everywhere. Do you mean that instead of storing it as a CSV file, I should store it as a Parquet file? If so, do you suppose that each field in my DataFrame is extracted correctly and the problem only happens while writing the file? Or did I misunderstand you?

sdikby avatar Aug 18 '17 06:08 sdikby

@sdikby can you try image_df.printSchema()? It should show something like the following.

root
 |-- filePath: string (nullable = false)
 |-- image: struct (nullable = true)
 |    |-- mode: string (nullable = false)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- data: binary (nullable = false)

In fact, if the schema is correctly inferred, Spark will not allow the DataFrame to be saved as CSV.

java.lang.UnsupportedOperationException: CSV data source does not support
struct<mode:string,height:int,width:int,nChannels:int,data:binary> data type.

Is there any chance you can try a Spark 2.0+ version (ideally the latest, 2.2)? As @thunterdb mentioned, this is likely a Spark 1.6 issue.

phi-dbq avatar Aug 18 '17 07:08 phi-dbq

I don't think I can afford to install a new Spark version. Is there a way I could make it Spark 1.6 compatible? My use case is only to persist image data plus image metadata on HDFS and process them later with Spark (simple image processing algorithms). For that I don't need the whole sparkdl package, only ImageIO.

sdikby avatar Aug 18 '17 08:08 sdikby

@phi-dbq here is what I get when I run image_df.printSchema():

root
 |-- filePath: string (nullable = false)
 |-- image: struct (nullable = true)
 |    |-- mode: string (nullable = false)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- data: binary (nullable = false)

Does this tell you anything?

sdikby avatar Aug 18 '17 15:08 sdikby

@sdikby that looks good. It would be great if you could try loading the files from a local file system instead of HDFS, then see what image_df.show() looks like (it is only supposed to print a few rows).

phi-dbq avatar Aug 18 '17 15:08 phi-dbq

@phi-dbq I have followed your suggestion. For that I used the following instructions:

  1. pyspark --master local[1] --py-files /path/to/ImageIO.py
  2. from ImageIO import readImages
  3. image_df = readImages("file:///path/to/test_data/")
  4. image_df.show()

I get the following result:
+--------------------+--------------------+
|            filePath|               image|
+--------------------+--------------------+
|file:///path/to/tes...|[[B@12b97ab1,768,...|
|file:///path/to/tes...|[[B@3ae4353c,768,...|
|file:///path/to/tes...|[[B@5c631458,768,...|
|file:///path/to/tes...|[[B@6cccccca,768,...|
|file:///path/to/tes...|[[B@79991993,768,...|
|file:///path/to/tes...|[[B@325e4043,768,...|
|file:///path/to/tes...|[[B@49cd088a,768,...|
|file:///path/to/tes...|[[B@33ebc1e0,768,...|
+--------------------+--------------------+

So, does this help to work around this unexpected behavior in Spark 1.6.0? I would be glad to help make it (at least ImageIO) Spark 1.6.0 compatible.

sdikby avatar Aug 19 '17 11:08 sdikby

I was able to reproduce the same behavior using Spark 1.6.0. @sdikby it is strongly recommended that you persist the data in Parquet (instead of CSV).

There are two issues:

  1. A struct-type JVM <-> Python data marshaling bug has been fixed in later versions. For 1.6.0, the workaround is to order the struct fields alphanumerically with respect to the field name. You can modify your ImageIO.py with the following:
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, BinaryType)

imageSchema = StructType([
    StructField("mode", StringType(), False),
    StructField("height", IntegerType(), False),
    StructField("width", IntegerType(), False),
    StructField("nChannels", IntegerType(), False),
    StructField("data", BinaryType(), False),
])
# Add the following line after the imageSchema declaration:
imageSchema = StructType(sorted(imageSchema.fields, key=lambda fd: fd.name))
  2. The DataFrame show function in 1.6.0 was not able to display struct types correctly. This has been fixed in later versions. It does not, however, affect the data in the DataFrame, so please ignore it (a quick check is sketched below).
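For instance, a minimal sketch to confirm the binary data is actually present despite the show() rendering (variable names follow the examples above, and this assumes the sorted-schema workaround from point 1 has been applied):

# Select the nested struct fields directly; row.data should be a non-empty
# byte buffer rather than null.
row = image_df.select("image.height", "image.width",
                      "image.nChannels", "image.data").first()
print("%d x %d x %d, %d bytes" % (row.height, row.width, row.nChannels, len(row.data)))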

phi-dbq avatar Aug 19 '17 18:08 phi-dbq

@phi-dbq Thanks! Your help is very much appreciated. I did what you suggested and saved the data with image_df.write.parquet("/path/to/image_dataset"). Did you intentionally leave the , (comma) after StructField("data", BinaryType(), False)? It was not there before.

And I got the following stack trace (please excuse me, I am not very experienced with Spark and certainly not with debugging Spark apps):

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
   command = pickleSer._read_with_length(infile)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
   return self.loads(obj)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
   return pickle.loads(obj)
ImportError: No module named sparkdl.image.ImageIO

       at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
       at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
       at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       at org.apache.spark.scheduler.Task.run(Task.scala:89)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:745)

17/08/20 00:14:02 ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
17/08/20 00:14:02 ERROR datasources.InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, hadoop-server5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
   command = pickleSer._read_with_length(infile)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
   return self.loads(obj)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
   return pickle.loads(obj)
ImportError: No module named sparkdl.image.ImageIO

       at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
       at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
       at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       at org.apache.spark.scheduler.Task.run(Task.scala:89)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
       at scala.Option.foreach(Option.scala:236)
       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1933)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
       at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
       at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
       at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
       at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
       at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:606)
       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
       at py4j.Gateway.invoke(Gateway.java:259)
       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
       at py4j.commands.CallCommand.execute(CallCommand.java:79)
       at py4j.GatewayConnection.run(GatewayConnection.java:209)
       at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
   command = pickleSer._read_with_length(infile)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
   return self.loads(obj)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
   return pickle.loads(obj)
ImportError: No module named sparkdl.image.ImageIO

       at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
       at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
       at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       at org.apache.spark.scheduler.Task.run(Task.scala:89)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       ... 1 more
17/08/20 00:14:02 ERROR datasources.DefaultWriterContainer: Job job_201708200013_0000 aborted.
17/08/20 00:14:02 WARN spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
Traceback (most recent call last):
 File "/path/to/test_ImageIO.py", line 36, in <module>
   image_df.write.parquet("/path/to/image_dataset")
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 471, in parquet
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o68.parquet.
: org.apache.spark.SparkException: Job aborted.
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
       at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
       at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
       at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
       at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
       at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:606)
       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
       at py4j.Gateway.invoke(Gateway.java:259)
       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
       at py4j.commands.CallCommand.execute(CallCommand.java:79)
       at py4j.GatewayConnection.run(GatewayConnection.java:209)
       at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, hadoop-server5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
   command = pickleSer._read_with_length(infile)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
   return self.loads(obj)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
   return pickle.loads(obj)
ImportError: No module named sparkdl.image.ImageIO

       at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
       at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
       at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       at org.apache.spark.scheduler.Task.run(Task.scala:89)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
       at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
       at scala.Option.foreach(Option.scala:236)
       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
       at org.apache.spark.SparkContext.runJob(SparkContext.scala:1933)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
       ... 28 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
   command = pickleSer._read_with_length(infile)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
   return self.loads(obj)
 File "/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
   return pickle.loads(obj)
ImportError: No module named sparkdl.image.ImageIO

       at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
       at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
       at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
       at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
       at org.apache.spark.scheduler.Task.run(Task.scala:89)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       ... 1 more
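
I wonder if the executors simply cannot import the module: the traceback shows pickle.loads failing on sparkdl.image.ImageIO, so maybe the whole package has to be shipped to the workers rather than the single file. Just a guess on my part, e.g. something like:

# Ship a zip of the sparkdl package so executors can unpickle functions that
# reference sparkdl.image.ImageIO (path is a placeholder).
sc.addPyFile("/path/to/sparkdl.zip")
# or equivalently, launch with: pyspark --py-files /path/to/sparkdl.zip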

sdikby avatar Aug 19 '17 22:08 sdikby

Hi spark-deep-learning team (@sueann @thunterdb @phi-dbq),

since my last comment I have tried to solve my problem in Spark 1.6 with RDDs only, and I have succeeded in implementing a partial first prototype. For that, I followed the logic of ImageIO. The result of transforming an image into image metadata + image byte array is a text file where each line represents one image and each attribute is separated by a custom delimiter.

Now I need your help with the inverse: getting the original image back from one line of the text file. Here is how I am doing it (code sample):

import numpy as np
from PIL import Image
from io import BytesIO

# sparkModeLookup and parsedImageMetaDataRDD are defined earlier in my
# prototype, following the logic of ImageIO.

def imageType(mode):
    """
    Get type information about the image.
    :param mode: image mode string, as stored in the text file.
    :return: ImageType
    """
    return sparkModeLookup[mode]


def imageStructToArray(mode, height, width, nChannels, data):
    """
    Convert the stored image fields to a numpy array.
    :param data: raw image bytes.
    :return: ndarray, image data.
    """
    imType = imageType(mode)
    shape = (height, width, nChannels)
    return np.ndarray(shape, imType.dtype, data)


imageDataRDD = parsedImageMetaDataRDD.map(
    lambda x: imageStructToArray(x[1].encode('utf-8'), int(x[2]), int(x[3]), int(x[4]), x[5].encode('utf-8')))
# x[1]==mode, x[2]==height, x[3]==width, x[4]==nChannels and x[5]==image_data

qw = imageDataRDD.collect()
for i, x in enumerate(qw):
    # img = Image.fromarray(x)
    # img.save("/path/to/image/" + str(i) + ".jpg")
    image = Image.open(BytesIO(x))
    image.save("/path/to/image/" + str(i) + ".jpg")

The part where I collect my data and save it is just to check whether I get the original image back, which unfortunately I don't. Below are two sample images that I get as a result.

Now I would like to ask for your help. What am I doing wrong, and what is the right way to transform a byte array stored in a text file into a numpy array and then back to the original image? Your help is very much appreciated.

sdikby avatar Oct 04 '17 10:10 sdikby

I have isolated the issue. The problem is that Spark does not read and/or write the stored bytearray(imgArray.tobytes()) correctly after extraction. How did I reach this conclusion? Instead of storing the bytearray(imgArray.tobytes()) and the other metadata like height, width and nChannels directly after extraction, I tried to convert these variables straight back into the original image, to check whether the problem was in the conversion and extraction of the metadata and the image bytes. So I did the following:

# mode, height, width, nChannels and data here are the values extracted from
# the image, before anything is written to HDFS.
imType = imageType(mode).dtype
shape = (height, width, nChannels)
im_array = np.ndarray(shape, imType, data)
img = Image.fromarray(im_array)
img.save("/path/to/image.jpg")

And I get my original image back. So the problem is really in reading and/or writing the bytearray(imgArray.tobytes()); the other metadata are read back correctly even after storing them on HDFS.
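
Maybe the fix is to encode the bytes before they go into the delimited text file, so that newlines or the delimiter occurring inside the binary data cannot corrupt the line. A rough sketch of what I have in mind (just an idea, not tested; imgArray, height, width and nChannels are the same variables as above):

import base64
import numpy as np

# When writing: store the pixel buffer as base64 text instead of raw bytes.
encoded = base64.b64encode(imgArray.tobytes()).decode('ascii')

# When reading back: decode and rebuild the array from the stored metadata
# (assuming 8-bit channels; otherwise use the dtype from imageType(mode)).
raw = base64.b64decode(encoded)
im_array = np.frombuffer(raw, dtype=np.uint8).reshape((height, width, nChannels))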

I know that this problem is not directly related to this open issue, but I would be very grateful if someone here could help. Maybe, once this is solved, I could open a pull request for a Spark 1.6 compatible ImageIO ;)

sdikby avatar Oct 04 '17 16:10 sdikby

Hi there - I came across this thread while investigating a similar error: ImportError: No module named sparkdl.image.imageIO. Could you share how you resolved this? Thanks!

hainingren avatar Nov 10 '17 21:11 hainingren