spark-dgraph-connector icon indicating copy to clipboard operation
spark-dgraph-connector copied to clipboard

WideNodeEncoder fails on multiple values per predicate

Open RJKeevil opened this issue 3 years ago • 3 comments

Hi,

When I run the following code:

./spark-shell --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12,uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.7.0-3.1

import uk.co.gresearch.spark.dgraph.graphframes._
import org.graphframes._
val graph: GraphFrame = spark.read.dgraph.graphframes("localhost:9080")

graph.vertices.show()

I get the following error:

21/10/18 12:12:18 ERROR WideNodeEncoder: failed to encode node: {"uid":"0x80e4d3","Entity.identifier":"6","Entity.label":"[email protected]","Entity.type":"email","Entity.updateTS":"2021-10-18T12:11:23+02:00","dgraph.type":["Entity","EmailEntity"]}
21/10/18 12:12:18 ERROR Executor: Exception in task 3.0 in stage 5.0 (TID 14)
java.lang.IllegalStateException
        at com.google.gson.JsonArray.getAsString(JsonArray.java:133)
        at uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder.getValue(JsonNodeInternalRowEncoder.scala:110)
        at uk.co.gresearch.spark.dgraph.connector.encoder.JsonNodeInternalRowEncoder.getValue$(JsonNodeInternalRowEncoder.scala:104)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.getValue(WideNodeEncoder.scala:35)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$toNode$5(WideNodeEncoder.scala:118)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$toNode$5$adapted(WideNodeEncoder.scala:116)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.toNode(WideNodeEncoder.scala:116)
        at uk.co.gresearch.spark.dgraph.connector.encoder.WideNodeEncoder.$anonfun$fromJson$1(WideNodeEncoder.scala:92)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
        at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.get(TriplePartitionReader.scala:30)
        at uk.co.gresearch.spark.dgraph.connector.TriplePartitionReader.get(TriplePartitionReader.scala:23)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:89)
        at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:124)
        at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:121)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)

My suspicion is that it fails because I have multiple entries in dgraph.type. Could you test this scenario? Or is there something else in the json above that fails to encode? Thanks!

RJKeevil avatar Oct 18 '21 10:10 RJKeevil

You are right, list properties / array values / multiple values per predicate are not supported (#69). This is definitively a required feature. Not sure when I will get back to this so contribution welcome.

EnricoMi avatar Oct 18 '21 13:10 EnricoMi

I'm more than happy to give a PR a go, any pointers you can give me on where in the code is best to begin?

RJKeevil avatar Oct 18 '21 13:10 RJKeevil

Excellent, here is what I would do:

First lets simplify the example above and get rid of the GraphFrames dependency as GraphFrames internally uses the wide node mode:

import uk.co.gresearch.spark.dgraph.connector._

val nodes = spark.read.option(NodesModeOption, NodesModeWideOption).dgraph.nodes("localhost:9080")

Then what we want is that nodes.printSchema() tells us that dgraph.type is an array of strings:

root
 |-- subject: long (nullable = false)
 |-- dgraph.type: array (nullable = false)
 |    |-- element: string (containsNull = true)
 ...

So we expect the DataFrame to contain string arrays as elements in the dgraph.type column.

Multiple values per predicate are indicated by the list attribute in the Dgraph schema. The SchemaProvider fetches the schema from Dgraph. Its current query "schema { predicate type lang }" excludes the list information, so this should read "schema { predicate type lang list }". Then Predicate need to store that list information.

The WideNodeEncoder is central here. In toStructField, a given Predicate is translated into a Spark type. So when list is true, this needs to be ArrayType.

In toNode it turns a JSON object like

{
  "uid": "0x3",
  "name": "Luke Skywalker"
}

into a Seq[Any] = Seq(3, "Luke Skywalker") and eventually into a Spark InternalRow. It uses JsonNodeInternalRowEncoder to parse the individual JSON values into their corresponding Scala types, here long and String, which need to be able to parse lists of values.

This all should be implemented test-driven through TestWideNodeEncoder. Maybe it is also time to refactor SchemaProvider so that the JSON parsing bit in getSchema is decoupled from the DgraphClient bit and can be tested by giving it some test JSON strings. You could also introduce a test for JsonNodeInternalRowEncoder as currently its methods are only tested through the derived classes, not directly. But do it as you prefer.

I hope these pointers help you to get started.

EnricoMi avatar Oct 18 '21 16:10 EnricoMi