spark-dgraph-connector
WideNodeEncoder fails on multiple values per predicate
Hi,
When I run the following code:
```shell
./spark-shell --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12,uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.7.0-3.1
```
```scala
import uk.co.gresearch.spark.dgraph.graphframes._
import org.graphframes._

val graph: GraphFrame = spark.read.dgraph.graphframes("localhost:9080")
graph.vertices.show()
```
I get the following error:
```
21/10/18 12:12:18 ERROR WideNodeEncoder: failed to encode node: {"uid":"0x80e4d3","Entity.identifier":"6","Entity.label":"[email protected]","Entity.type":"email","Entity.updateTS":"2021-10-18T12:11:23+02:00","dgraph.type":["Entity","EmailEntity"]}
21/10/18 12:12:18 ERROR Executor: Exception in task 3.0 in stage 5.0 (TID 14)
java.lang.IllegalStateException
	at com.google.gson.JsonArray.getAsString(JsonArray.java:133)
	at uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder.getValue(JsonNodeInternalRowEncoder.scala:110)
	at uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder.getValue$(JsonNodeInternalRowEncoder.scala:104)
	at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.getValue(WideNodeEncoder.scala:35)
	at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$toNode$5(WideNodeEncoder.scala:118)
	at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$toNode$5$adapted(WideNodeEncoder.scala:116)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.toNode(WideNodeEncoder.scala:116)
	at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$fromJson$1(WideNodeEncoder.scala:92)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
	at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.get(TriplePartitionReader.scala:30)
	at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.get(TriplePartitionReader.scala:23)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:89)
	at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:121)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
```
My suspicion is that it fails because I have multiple entries in `dgraph.type`. Could you test this scenario? Or is there something else in the JSON above that fails to encode? Thanks!
You are right, list properties / array values / multiple values per predicate are not supported (#69). This is definitely a required feature. I am not sure when I will get back to this, so contributions are welcome.
I'm more than happy to give a PR a go. Any pointers on where in the code it is best to begin?
Excellent, here is what I would do:
First, let's simplify the example above and get rid of the GraphFrames dependency, since GraphFrames internally uses the wide node mode:
```scala
import uk.co.gresearch.spark.dgraph.connector._

val nodes = spark.read.option(NodesModeOption, NodesModeWideOption).dgraph.nodes("localhost:9080")
```
Then what we want is that `nodes.printSchema()` tells us that `dgraph.type` is an array of strings:

```
root
 |-- subject: long (nullable = false)
 |-- dgraph.type: array (nullable = false)
 |    |-- element: string (containsNull = true)
...
```

So we expect the DataFrame to contain string arrays as elements in the `dgraph.type` column.
Multiple values per predicate are indicated by the `list` attribute in the Dgraph schema. The `SchemaProvider` fetches the schema from Dgraph. Its current query `schema { predicate type lang }` excludes the list information, so it should read `schema { predicate type lang list }`. Then `Predicate` needs to store that list information.
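For illustration, here is a minimal sketch of carrying the `list` flag through from the schema response into a predicate model. The names `SchemaEntry`, `toPredicate`, and the field names are assumptions for this sketch, not the connector's actual API:

```scala
// Hypothetical model of one entry in Dgraph's schema response; with the
// extended query, entries may carry a "list": true field.
case class SchemaEntry(predicate: String, dgraphType: String, list: Option[Boolean])

// A predicate model extended with an isList flag, defaulting to false.
case class Predicate(name: String, dgraphType: String, isList: Boolean = false)

// Predicates without the "list" attribute are treated as single-valued.
def toPredicate(entry: SchemaEntry): Predicate =
  Predicate(entry.predicate, entry.dgraphType, entry.list.getOrElse(false))
```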
The `WideNodeEncoder` is central here. In `toStructField`, a given `Predicate` is translated into a Spark type. So when `list` is true, this needs to become an `ArrayType`.
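A sketch of that mapping, using toy stand-ins for Spark's type classes so it runs standalone (in the connector these would be the classes from `org.apache.spark.sql.types`, and the actual `toStructField` signature may differ):

```scala
// Toy stand-ins for Spark's DataType hierarchy, to keep the sketch self-contained.
sealed trait DataType
case object StringType extends DataType
case object LongType extends DataType
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case class StructField(name: String, dataType: DataType, nullable: Boolean)

// When the predicate is a list, wrap the scalar type in an ArrayType.
def toStructField(name: String, scalarType: DataType, isList: Boolean): StructField =
  if (isList) StructField(name, ArrayType(scalarType, containsNull = true), nullable = true)
  else StructField(name, scalarType, nullable = true)
```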
In `toNode` it turns a JSON object like

```json
{
  "uid": "0x3",
  "name": "Luke Skywalker"
}
```

into a `Seq[Any] = Seq(3, "Luke Skywalker")` and eventually into a Spark `InternalRow`. It uses `JsonNodeInternalRowEncoder` to parse the individual JSON values into their corresponding Scala types, here `Long` and `String`; those parsing methods need to be able to handle lists of values.
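To make the failure mode concrete: the stack trace above ends in `JsonArray.getAsString`, which Gson only permits on single-element arrays, hence the `IllegalStateException` for the two-element `dgraph.type`. A list-aware `getValue` would map over the array's elements instead. Here is a sketch on a minimal JSON stand-in so it runs without Gson on the classpath; the real code would operate on Gson's `JsonElement` types:

```scala
// Minimal JSON model standing in for Gson's JsonElement hierarchy.
sealed trait Json
case class JString(value: String) extends Json
case class JArray(values: Seq[Json]) extends Json

// Instead of the single-value JsonArray.getAsString, return all elements.
def getValue(json: Json): Any = json match {
  case JString(s) => s
  case JArray(vs) => vs.map(getValue)
}
```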
This should all be implemented test-driven through `TestWideNodeEncoder`. Maybe it is also time to refactor `SchemaProvider` so that the JSON-parsing part of `getSchema` is decoupled from the `DgraphClient` part and can be tested by feeding it test JSON strings. You could also introduce a test for `JsonNodeInternalRowEncoder`, as its methods are currently only tested through the derived classes, not directly. But do it as you prefer.
I hope these pointers help you to get started.