hyperspace
Revisit logical plan serialization
Describe the issue
Hyperspace serializes the logical plan of a dataframe used for creating an index so that the plan can be reused for refreshing the index, etc. The current implementation uses the Kryo serializer to serialize a logical plan object: https://github.com/microsoft/hyperspace/blob/master/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala
However, this is not an ideal approach since the serialized bytes are not compatible across Scala versions (2.11 vs. 2.12) and Spark versions (2.4 vs. 3.0). In addition, since those logical plan classes are internal, they could theoretically be changed (or renamed) across patch versions. In other words, compatibility just doesn't work.
I think representing it as JSON instead should serve our purpose, and I am open to any other feedback.
Per @sezruby:
I think we can use or refer to TreeNode.toJSON to create a JSON string for a logical plan. A possible problem is that it serializes the relation of a LogicalRelation (and maybe other types?) as null (see TreeNode.scala). What do you think about this? I guess we need to define a new JSON serializer which also supports Relation and implement a deserializer for it.
Here's a sample output of toJSON:
[
  {
    "class": "org.apache.spark.sql.execution.datasources.LogicalRelation",
    "num-children": 0,
    "relation": null,
    "output": [
      [
        {
          "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
          "num-children": 0,
          "name": "id",
          "dataType": "integer",
          "nullable": true,
          "metadata": {},
          "exprId": {
            "product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
            "id": 11,
            "jvmId": "f02d5e15-b8cc-4626-9060-49a2949a8ba6"
          },
          "qualifier": []
        }
      ],
      [
        {
          "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
          "num-children": 0,
          "name": "name",
          "dataType": "string",
          "nullable": true,
          "metadata": {},
          "exprId": {
            "product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
            "id": 12,
            "jvmId": "f02d5e15-b8cc-4626-9060-49a2949a8ba6"
          },
          "qualifier": []
        }
      ]
    ],
    "isStreaming": false
  }
]
@imback82 After some investigation, I think it’s a little bit difficult to support the serialization of a complete LogicalPlan across Spark versions seamlessly.
Here’s my analysis based on the current code base:
- The deserialized logical plan is used for:
  - Refresh Action
  - IndexSummaryString
- It is not used for signature calculation:
  - At CreateActionBase, IndexLogEntry keeps both the serialized plan and the LogicalPlanFingerprint from LogicalPlanSignatureProvider.
  - The current LogicalPlanSignatureProvider is IndexSignatureProvider (FileBasedSignatureProvider + PlanSignatureProvider).
  - When validating the signature, only the plan's signature and the index's fingerprint signature are compared.
For IndexSummaryString, I commented out the summary string, put the placeholder "index plan summary" in its place, and was then able to read the indexes created with Scala 2.11/Spark 2.4.6 from a Scala 2.12/Spark 3 spark-shell:

The first index, named "index", was created with Scala 2.11/Spark 2.4.6 (I copied the "index" directory into ../SparkWarehouse/Indexes/.). So if we need the plan string for IndexSummary, we could just keep the string in IndexLogEntry.
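The idea of keeping the plan string in the log entry could look roughly like the sketch below. It is only an illustration: `LogEntry` is a trimmed-down, hypothetical stand-in for IndexLogEntry, and the `planSummary` field is an assumption, not an existing Hyperspace field.

```scala
// Hypothetical sketch: LogEntry and planSummary are illustrative names only.
object SummarySketch {

  /** Trimmed-down stand-in for an index log entry. */
  final case class LogEntry(
      name: String,
      serializedPlan: Array[Byte], // opaque bytes, possibly unreadable on another Spark version
      planSummary: String)         // plain text captured once, at index creation

  /** Build the summary from stored strings only; serializedPlan is never read. */
  def summary(entry: LogEntry): String = {
    val text = entry.planSummary
    s"${entry.name}: $text"
  }
}
```

Because the summary never deserializes the plan, listing indexes would not depend on the Spark or Scala version that wrote the entry.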
For Refresh Action, this is the error I hit when trying to deserialize an index created with Spark 2.4:
00:00 WARN: [kryo] Unable to load class org.apache.spark.sql.catalyst.trees.Origin with kryo's ClassLoader. Retrying with current..
com.esotericsoftware.kryo.KryoException: Unable to find class: ????????org.apache.spark.sql.catalyst.trees.Origin
Serialization trace:
catalogTable (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.microsoft.hyperspace.index.serde.KryoSerDeUtils$.deserialize(KryoSerDeUtils.scala:60)
at com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils$.deserialize(LogicalPlanSerDeUtils.scala:69)
at com.microsoft.hyperspace.index.IndexLogEntry.plan(IndexLogEntry.scala:100)
at com.microsoft.hyperspace.index.IndexSummary$.apply(IndexCollectionManager.scala:184)
at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$indexes$2(IndexCollectionManager.scala:90)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.microsoft.hyperspace.index.IndexCollectionManager.indexes(IndexCollectionManager.scala:90)
at com.microsoft.hyperspace.Hyperspace.indexes(Hyperspace.scala:32)
... 51 elided
Caused by: java.lang.ClassNotFoundException: ????????org.apache.spark.sql.catalyst.trees.Origin
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
... 70 more
I browsed Spark 2.4.6 and 3.0.0 and there are some differences in class structures. So, in order to support this we would need 1) a full understanding of the plan structure in 2.4.6 and 3.0.0, 2) a transformation map between them, and 3) updates to that map for every further Spark release. All in all, I think it’s better not to support “refresh” index across different versions of Spark, and I guess not so many users will try to reuse indexes in that way.
Otherwise, we can try to serialize/deserialize only LogicalRelation / HadoopFsRelation in JSON format, since we only support creating an index from a LogicalRelation node. It still doesn’t work with the Kryo serializer, as it is designed for a single Spark version only:
scala> hs.refreshIndex("testindex")
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
catalogTable (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper2)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.microsoft.hyperspace.index.serde.KryoSerDeUtils$.deserialize(KryoSerDeUtils.scala:60)
at com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils$.deserialize(LogicalPlanSerDeUtils.scala:71)
at com.microsoft.hyperspace.actions.RefreshAction.df$lzycompute(RefreshAction.scala:48)
at com.microsoft.hyperspace.actions.RefreshAction.df(RefreshAction.scala:46)
at com.microsoft.hyperspace.actions.RefreshAction.logEntry$lzycompute(RefreshAction.scala:59)
at com.microsoft.hyperspace.actions.RefreshAction.logEntry(RefreshAction.scala:58)
at com.microsoft.hyperspace.actions.RefreshAction.event(RefreshAction.scala:81)
at com.microsoft.hyperspace.actions.Action.run(Action.scala:98)
at com.microsoft.hyperspace.actions.Action.run$(Action.scala:83)
at com.microsoft.hyperspace.actions.RefreshAction.run(RefreshAction.scala:31)
at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1(IndexCollectionManager.scala:75)
at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1$adapted(IndexCollectionManager.scala:72)
at com.microsoft.hyperspace.index.IndexCollectionManager.withLogManager(IndexCollectionManager.scala:131)
at com.microsoft.hyperspace.index.IndexCollectionManager.refresh(IndexCollectionManager.scala:72)
at com.microsoft.hyperspace.index.CachingIndexCollectionManager.refresh(CachingIndexCollectionManager.scala:97)
at com.microsoft.hyperspace.Hyperspace.refreshIndex(Hyperspace.scala:77)
... 51 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException
And this could be a draft for serializing/deserializing HadoopFsRelation in JSON:
LogicalRelation(
  HadoopFsRelation(
    new InMemoryFileIndex(
      spark,
      location.rootPathStrings.map(path => new Path(path)), // Seq[String]
      Map(),
      None),
    partitionSchema, // StructType - support (json, fromJson)
    dataSchema, // StructType
    bucketSpec, // INT, SEQ[STRING]
    fileFormat, // csv, json, parquet, orc as string?
    options)(spark), // map<string,string>
  output, // name, type, nullable
  catalogTable, // TODO?
  isStreaming) // boolean
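As a rough illustration of the draft above, the fields that are plain strings (or have a string form) could be written out as JSON without touching any Spark class. This is a minimal sketch under assumptions: `RelationInfo`, `quote` and `toJson` are hypothetical names, the schemas are assumed to arrive already rendered as JSON strings, bucketSpec and catalogTable are left out, and the matching reader (which would rebuild the relation per Spark version) is not shown.

```scala
// Hypothetical sketch: none of these names are Hyperspace or Spark APIs.
object RelationJsonSketch {

  /** Version-neutral description of a file-based relation, using strings only. */
  final case class RelationInfo(
      rootPaths: Seq[String],      // e.g. taken from location.rootPathStrings
      dataSchemaJson: String,      // schema already rendered as a JSON string
      partitionSchemaJson: String, // schema already rendered as a JSON string
      fileFormat: String,          // "csv", "json", "parquet" or "orc"
      options: Map[String, String])

  /** Wrap a string in quotes, escaping what JSON does not allow raw. */
  def quote(s: String): String = {
    val sb = new StringBuilder("\"")
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c if c < ' ' => sb.append("\\u").append("%04x".format(c.toInt))
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }

  /** One "name":value pair; rawJson must already be valid JSON. */
  private def field(name: String, rawJson: String): String =
    quote(name) + ":" + rawJson

  def toJson(r: RelationInfo): String = {
    val paths = r.rootPaths.map(quote).mkString("[", ",", "]")
    // Sort option keys so the same relation always yields the same text.
    val opts = r.options.toSeq.sortBy(_._1)
      .map { case (k, v) => field(k, quote(v)) }
      .mkString("{", ",", "}")
    Seq(
      field("rootPaths", paths),
      field("dataSchema", quote(r.dataSchemaJson)),
      field("partitionSchema", quote(r.partitionSchemaJson)),
      field("fileFormat", quote(r.fileFormat)),
      field("options", opts)
    ).mkString("{", ",", "}")
  }
}
```

The schemas are embedded as escaped strings rather than nested objects, which keeps the writer free of any JSON library; a real design might prefer nested JSON.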
In summary,
- Option1) Not support refresh function across different versions of Spark
  - Keep the spark version at index creation
  - index summary string fix
- Option2) Try to serialize/deserialize HadoopFsRelation only
  - Json format design
  - Compatible serializer & deserializer for each spark version
- Any other ideas?
What do you think about this?
Option1) Not support refresh function across different versions of Spark
- Keep the spark version at index creation
- index summary string fix
If we do not support it, I think we should keep both the Spark and Scala versions, since Spark 2.4 supports both Scala 2.11 and 2.12.
Also, if a logical plan is moved (from a.b.c to x.y.c) between patch versions (Spark 3.0.0 and 3.0.1), does this mean we cannot support refresh?
All in all, I think it’s better not to support “refresh” index across different versions of spark and I guess not so many users will try to reuse indexes in that way.
@rapoth should have more insight into this since he has talked to a few customers. So a simple question here is whether a customer will ever move from 2.4 to 3.0.
Option2) Try to serialize/deserialize HadoopFsRelation only
- Json format design
- Compatible serializer & deserializer for each spark version
Currently, we look at only the LogicalRelation, but there is work going on to be able to index "any" dataframe, which is a reason for #77.
If we do not support, I think we should keep both Spark / Scala version since Spark 2.4 supports both Scala 2.11 and 2.12?
Yes, Scala version also should be helpful.
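Recording both versions at index creation and checking them before a refresh could be sketched as below. `BuildInfo` and `canRefresh` are hypothetical names, and requiring an exact match (including the patch version) is an assumption chosen because logical plan classes may change even between patch releases.

```scala
// Hypothetical sketch: BuildInfo and canRefresh are illustrative names only.
object VersionGuardSketch {

  /** Versions recorded in the index metadata when the index is created. */
  final case class BuildInfo(sparkVersion: String, scalaBinaryVersion: String)

  /**
   * Allow refresh only when both versions match exactly.
   * Returns Left(reason) when the refresh should be rejected.
   */
  def canRefresh(created: BuildInfo, current: BuildInfo): Either[String, Unit] =
    if (created.sparkVersion != current.sparkVersion) {
      Left("Index was created with Spark " + created.sparkVersion +
        " but the session runs Spark " + current.sparkVersion)
    } else if (created.scalaBinaryVersion != current.scalaBinaryVersion) {
      Left("Index was created with Scala " + created.scalaBinaryVersion +
        " but the session runs Scala " + current.scalaBinaryVersion)
    } else {
      Right(())
    }
}
```

A looser policy (for example, comparing only major.minor) would be a one-line change, at the cost of trusting that patch releases keep the classes stable.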
Also, if a logical plan is moved (from a.b.c to x.y.c) between patch versions (Spark 3.0.0 and 3.0.1), does this mean we cannot support refresh?
It might not just be moved, but could be removed. So I think class based deserialization is barely possible; we might have to reconstruct the plan with high level APIs somehow.
Yes, we need our own abstraction. Btw, we don't have to support every logical plan in the first iteration. We can start with a minimum (but with a reasonable design that makes it easy to add new logical plans).
As I see this, there are two routes we can take:
- Support index creation ONLY through SparkSQL - If this is the case, we can simply store the SQL and not worry about ser/de. This will limit the functionality in that users cannot take arbitrary Scala code and create indexes/views.
- Introduce our own intermediate representation (IR) - With Spark evolving so fast, I think it is unreasonable for us to tie the index to a specific Spark version due to the costs involved in creating the index - if we tell users to rebuild the index if they upgrade Spark version, it may not be a good user experience. Also, the bigger question is: how will they rebuild? Do we expect them to remember the index creation code?
We might want to consider doing this in phases:
- Move to our own ser/de framework
- Come up with simple IR that will work for the simplest use case we have today: covering indexes (which is constrained to HDFS-compliant data sources only)
- Constrained support for indexed dataframes (for instance, we only start by supporting indexed views with filters and then move onto aggregations and joins etc.)
Being incremental will also allow us to figure out a flexible IR in the process.
Thoughts?
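To make the phased IR idea a bit more concrete, here is a minimal sketch of what a first iteration might look like: one relation node for the covering-index case and one filter node for constrained indexed views. All names (`PlanIR`, `Relation`, `Filter`, `render`) are hypothetical, and storing the filter condition as a SQL expression string is an assumption.

```scala
// Hypothetical sketch of a tiny intermediate representation (IR).
object PlanIrSketch {

  /** Sealed so that adding a node type forces every match to be revisited. */
  sealed trait PlanIR

  /** Leaf node: a file-based source, described without any Spark class. */
  final case class Relation(rootPaths: Seq[String], fileFormat: String) extends PlanIR

  /** Filter over a child plan; the condition is kept as a SQL expression string. */
  final case class Filter(condition: String, child: PlanIR) extends PlanIR

  /** Render to a text form that does not depend on Spark class names. */
  def render(plan: PlanIR): String = plan match {
    case Relation(paths, format) =>
      val joined = paths.mkString(",")
      s"Relation($format; $joined)"
    case Filter(condition, child) =>
      val inner = render(child)
      s"Filter($condition; $inner)"
  }
}
```

New node types such as aggregations or joins would be added as further case classes, and the compiler's exhaustiveness check on the sealed trait would flag every place that needs updating.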
Is this still the case? I can't find where the Spark LogicalPlan is serialized and stored with indexes.
Seems to be a fixed issue since v0.2.
@clee704 No, this issue is about the source plan. Currently, we only allow a "Relation" type of logical plan, and only a single relation. However, similar to a "view" in a database, a query plan could be supported. To do so, we need a way to de/serialize the plan for refreshing the index, and we also need to match a query plan against the index source plan in the optimizer to make sure we can replace that plan with the index.
Previously this was done by class-level serialization, but that is not compatible between Spark versions, since the logical plan classes change: a class name or type changes, or a class is replaced with a new one. We could support the feature using a "sql string" (but this has some limitations) or devise some intermediate format for Hyperspace.
#305 has the same issue as #186 because of this.
I thought this was fixed in v0.2 because of the following lines in the release note:
In order to better support compatibility across Scala/Spark versions going forward, the team has decided to stop serializing logical plans with KyroSerializer and store the minimum info to reconstruct the original relation(#99). Thus, the indexes generated with v0.1.0 are not compatible with v0.2.0 and need to be reconstructed.
Shouldn't supporting a general query plan, for #186 and #305, be a separate issue? The main concern of this issue seems to be incompatibility due to the use of a binary format.
I see that the SerDe related code has been removed since https://github.com/microsoft/hyperspace/pull/325.
I would suggest closing this issue and creating a new one with a better description of the issue that we want to fix.
I see this more as a proposal than a bug, so maybe it is better to file it as a proposal. WDYT?
Yea, because of the compatibility issue, I replaced plan de/serialization with the "Relation" approach in v0.2.0. The unused utility code was removed by #325.
As there's no feasible solution for plan de/serialization problem yet, let's keep this issue open.
@sezruby Ok. Then let's write a better description for the issue. You know its history, but new contributors do not, and I would like to have it better explained. The description references a file that was removed two versions ago and now returns a 404. Let's update the issue description with the full history. Thanks!
We don't have any design yet about how we should support arbitrary plan indexing, and the issue (as @sezruby described, not the original description) seems to be a trivial part of that design.
The description is inaccurate or incomplete. I already gave an example: the link in the description (https://github.com/microsoft/hyperspace/blob/master/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala) ends up with a 404.
If you want this project to grow and have a bigger community, it is very helpful to put some more effort into providing context when creating and updating tickets.
For example, I don't understand - because I don't have the full context and don't know the history of this ticket - why this is still a problem, or what it is and why it needs to be fixed or improved. The code it refers to has been removed, there is no Kryo serialization anymore, and the Spark version incompatibility went away with the switch to the Relation approach.
From @sezruby's comment I guess that the Relation approach does not cover all the use cases. Am I right? What is the issue with the Relation approach?
@rapoth, @imback82 WDYT?
The Relation approach does not cover an arbitrary query plan (e.g. one with filter conditions), i.e. an index view. I'll update the description when I have the time.