
On the multifile problem in spark-fits

Open JulienPeloton opened this issue 7 years ago • 4 comments

The current implementation is not great for reading many files (100+).

Current implementation, and why this is not great.

The way we read and distribute the data from many files is by:

  1. list all the files to read,
  2. for each file, create an RDD from the HDU data,
  3. take the union of the RDDs one at a time, each union wrapping the previous result:

// Union if more than one file
for ((file, index) <- fns.slice(1, nFiles).zipWithIndex) {
  rdd = if (implemented) {
    rdd.union(loadOneHDU(file))
  } else {
    rdd.union(loadOneEmpty)
  }
}

While this is very simple and works well for a small number of files, it breaks down when the number of files gets large (100+). There are two problems: (1) the RDD lineage gets very long (for fault tolerance, the DAG keeps track of every single RDD), so Spark overhead dominates everything else, and (2) for files smaller than the partition size, the union of RDDs creates one (nearly empty) partition per file, which is highly sub-optimal. Here is the DAG for reading 3 files, where you can see how the final RDD is created:

[Screenshot, 2018-10-18 07:54:17: DAG for reading 3 files]

Moreover, for a large number of files (i.e. a large number of rdd.union calls), you are not just sub-optimal, you run into a deeper problem:

Exception in thread "dag-scheduler-event-loop" java.lang.StackOverflowError
       at java.io.ObjectStreamClass$WeakClassKey.<init>(ObjectStreamClass.java:2505)
       at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:348)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
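
The trace above alternates between writeObject0 and defaultWriteFields, which is the pattern of default Java serialization recursing through a deeply nested object graph. The following is a minimal sketch of that mechanism outside Spark. The `Node` class is a hypothetical stand-in for a lineage node that points at its parent; it is not a Spark class, and the depths used are only illustrative.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object DeepSerializationDemo {
  // Hypothetical stand-in for a lineage node: each node references its parent.
  final class Node(val parent: Node) extends Serializable

  // Build a chain of `depth` nodes with a loop (no recursion on our side).
  def chain(depth: Int): Node = {
    var node: Node = null
    var i = 0
    while (i < depth) {
      node = new Node(node)
      i += 1
    }
    node
  }

  // Returns true if default Java serialization of the chain overflows the stack.
  // ObjectOutputStream descends one level per nested field, so the recursion
  // depth grows with the length of the chain.
  def overflows(depth: Int): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(chain(depth))
      false
    } catch {
      case _: StackOverflowError => true
    }
  }
}
```

With default JVM stack sizes, a short chain serializes without trouble, while a chain of a million nodes is far deeper than the stack allows. The exact depth at which this fails depends on the JVM and its stack size setting.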

Small fix, and why this is not great either.

Instead of performing multiple RDD unions, one can perform a single union of all the RDDs at once (changing step 3 above). For that, one can use the union method of the SparkContext directly:

// Union if more than one file
val rdd = if (implemented) {
  sqlContext.sparkContext.union(
    fns.map(file => loadOneHDU(file))
  )
} else {
  sqlContext.sparkContext.union(
    fns.map(file => loadOneEmpty)
  )
}

The DAG gets updated nicely:

[Screenshot, 2018-10-18 07:53:59: DAG with a single union]

and I could go up to 1000 files. While I no longer encounter the StackOverflowError, for more than 1000 files I got this nasty error:

2018-10-18 08:33:30 WARN  TransportChannelHandler:78 - Exception in connection from /134.158.75.162:47942
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:357)
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
	at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$deserialize$1$$anonfun$apply$1.apply(NettyRpcEnv.scala:271)

Of course! Looking at the DAG, it is obvious that this series of unions cannot scale... We need to implement a different approach.
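
To make the difference between the two union strategies concrete, here is a toy model that does not use Spark at all. `Plan`, `FileScan` and `Union` are hypothetical stand-ins for an RDD lineage, and the model assumes one scan per file. It only measures the shape of the tree: how deep it is and how many per-file scans it contains.

```scala
object UnionShapeModel {
  // Toy stand-ins for an RDD lineage; these are NOT Spark classes.
  sealed trait Plan
  final case class FileScan(path: String) extends Plan     // one RDD per file
  final case class Union(children: Seq[Plan]) extends Plan // union of RDDs

  final case class Shape(depth: Int, fileScans: Int)

  // Walk the plan with an explicit work list, so the model itself cannot
  // overflow the stack on very deep plans.
  def shape(plan: Plan): Shape = {
    var maxDepth = 0
    var scans = 0
    var todo: List[(Plan, Int)] = List((plan, 1))
    while (todo.nonEmpty) {
      val (node, d) = todo.head
      todo = todo.tail
      maxDepth = math.max(maxDepth, d)
      node match {
        case Union(children) => todo = children.map(c => (c, d + 1)).toList ::: todo
        case FileScan(_)     => scans += 1
      }
    }
    Shape(maxDepth, scans)
  }

  // Strategy 1: rdd = rdd.union(next), repeated once per additional file.
  def chained(files: Seq[String]): Plan =
    files.tail.foldLeft[Plan](FileScan(files.head)) { (acc, f) =>
      Union(Seq(acc, FileScan(f)))
    }

  // Strategy 2: a single union over all per-file scans.
  def flat(files: Seq[String]): Plan = Union(files.map(f => FileScan(f)))
}
```

In this model, N files give a chained plan of depth N but a flat plan of depth 2, which matches the disappearance of the StackOverflowError. Both plans still contain N per-file scans, so the number of objects to track grows with the number of files either way.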

How this could be fixed

We could instead forget about this union strategy, and focus on what is used by other connectors (PartitionedFile).
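
The idea behind file-based partitioning can be sketched without Spark: instead of one partition per file, several small files are grouped into the same partition up to a size limit. The code below is a simplified illustration under stated assumptions, not Spark's actual algorithm. `InputFile`, `pack` and `maxPartitionBytes` are hypothetical names, and files are never split.

```scala
object FilePackingSketch {
  // Hypothetical description of one input file; only its size matters here.
  final case class InputFile(path: String, sizeBytes: Long)

  // Greedily pack files into partitions of at most `maxPartitionBytes`.
  // A file larger than the limit gets a partition to itself (no splitting).
  def pack(files: Seq[InputFile], maxPartitionBytes: Long): Vector[Vector[InputFile]] = {
    require(maxPartitionBytes > 0, "maxPartitionBytes must be positive")
    val partitions = Vector.newBuilder[Vector[InputFile]]
    var current = Vector.empty[InputFile]
    var currentBytes = 0L
    // Largest files first, so the small ones end up grouped together.
    for (f <- files.sortBy(-_.sizeBytes)) {
      // Close the current partition if adding this file would exceed the limit.
      if (current.nonEmpty && currentBytes + f.sizeBytes > maxPartitionBytes) {
        partitions += current
        current = Vector.empty
        currentBytes = 0L
      }
      current :+= f
      currentBytes += f.sizeBytes
    }
    if (current.nonEmpty) partitions += current
    partitions.result()
  }
}
```

For example, 1000 files of 1 MiB each with a 128 MiB limit end up in 8 partitions in this sketch, instead of the 1000 quasi-empty partitions produced by one RDD per file.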

What should really be done

Rewrite everything to comply with Datasource V2... ;-)

JulienPeloton, Oct 18 '18 06:10

Problem solved in #55! For the record, after the fix, the DAG is:

[Screenshot, 2018-10-18 14:49:06: DAG after the fix]

No more explicit union! Under the hood it uses PartitionedFile, but the only thing I had to do was to tell Spark directly that I have many files (instead of creating RDDs one by one and taking their union).

JulienPeloton, Oct 18 '18 12:10

Still need to understand the error when reading >> 10000 files at once... Keeping this issue open.

JulienPeloton, Oct 19 '18 11:10

Hi!

Is there any fix yet for reading > 10000 files?

Thanks!

ptallada, Jan 17 '21 22:01

Hi @ptallada,

Unfortunately, there has been no work on this recently, and I have little time these days. Eventually, this should be solved when spark-fits moves to Spark Datasource V2 (stalled work for the moment).

JulienPeloton, Jan 18 '21 20:01