Gaffer
ImportRDDOfElements on Accumulo fails when run on a cluster
For example:
val addRdd = new ImportRDDOfElements.Builder()
  .input(elementsRdd)
  .option("outputPath", "output")
  .option("failurePath", "failure")
  .build()
graph.execute(addRdd, user)
The exception is:
Exception in thread "main" java.io.NotSerializableException: uk.gov.gchq.gaffer.accumulostore.key.core.impl.byteEntity.ByteEntityAccumuloElementConverter
Serialization stack:
- object not serializable (class: uk.gov.gchq.gaffer.accumulostore.key.core.impl.byteEntity.ByteEntityAccumuloElementConverter, value: uk.gov.gchq.gaffer.accumulostore.key.core.impl.byteEntity.ByteEntityAccumuloElementConverter@333813e7)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
at uk.gov.gchq.gaffer.sparkaccumulo.operation.handler.scalardd.ImportRDDOfElementsHandler.doOperation(ImportRDDOfElementsHandler.java:56)
at uk.gov.gchq.gaffer.sparkaccumulo.operation.handler.scalardd.ImportRDDOfElementsHandler.doOperation(ImportRDDOfElementsHandler.java:43)
at uk.gov.gchq.gaffer.sparkaccumulo.operation.handler.scalardd.ImportRDDOfElementsHandler.doOperation(ImportRDDOfElementsHandler.java:35)
at uk.gov.gchq.gaffer.store.Store.handleOperation(Store.java:689)
at uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler.doOperation(OperationChainHandler.java:47)
at uk.gov.gchq.gaffer.store.operation.handler.OperationChainHandler.doOperation(OperationChainHandler.java:35)
at uk.gov.gchq.gaffer.store.Store.handleOperation(Store.java:689)
at uk.gov.gchq.gaffer.store.Store.execute(Store.java:299)
at uk.gov.gchq.gaffer.store.Store.execute(Store.java:282)
at uk.gov.gchq.gaffer.graph.Graph.execute(Graph.java:200)
at uk.gov.gchq.gaffer.graph.Graph.execute(Graph.java:124)
at uk.gov.gchq.gaffer.lanl.query.spark.PageRankQuery$.main(PageRankQuery.scala:123)
at uk.gov.gchq.gaffer.lanl.query.spark.PageRankQuery.main(PageRankQuery.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It does work, but you need to configure Spark to use Kryo serialisation:
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
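For anyone hitting the same trace: it shows SparkContext.broadcast going through JavaSerializer, which requires the broadcast object to implement java.io.Serializable. Kryo does not have that requirement, which is presumably why switching serialisers avoids the error. A minimal sketch of setting the property in code follows; the object name and app name are made up for illustration, and the master is assumed to come from spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver, only to show where the property is set.
object KryoConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("gaffer-import-sketch") // hypothetical app name
      // The property quoted above: replaces Spark's default Java serialiser with Kryo.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // The setting must be in place before the SparkContext is created.
    val sc = new SparkContext(conf)
    try {
      // Print the serialiser actually in use, as a sanity check.
      println(sc.getConf.get("spark.serializer"))
      // ... build the Graph and run ImportRDDOfElements here ...
    } finally {
      sc.stop()
    }
  }
}
```

The same property can be passed on the command line instead, with --conf spark.serializer=org.apache.spark.serializer.KryoSerializer on spark-submit.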
@p013570 What is the status of this one?