[FEATURE REQUEST]: Hyperspace indexing on s3 based data.
Feature requested
As a developer at Airbnb, I want to be able to index over S3-based scan nodes in order to create Hyperspace indexes.
I am not entirely sure that S3 is the reason I see the following error:
Exception in thread "main" com.microsoft.hyperspace.HyperspaceException: Only creating index over HDFS file based scan nodes is supported.
at com.microsoft.hyperspace.actions.CreateAction.validate(CreateAction.scala:47)
at com.microsoft.hyperspace.actions.Action$class.run(Action.scala:88)
at com.microsoft.hyperspace.actions.CreateAction.run(CreateAction.scala:30)
at com.microsoft.hyperspace.index.IndexCollectionManager.create(IndexCollectionManager.scala:43)
at com.microsoft.hyperspace.index.CachingIndexCollectionManager.create(CachingIndexCollectionManager.scala:77)
at com.microsoft.hyperspace.Hyperspace.createIndex(Hyperspace.scala:42)
I am not sure, because looking at the code that throws the exception, I don't see anything specific to S3 in the `isLogicalRelation` function.
// CreateAction.scala:47
// We currently only support createIndex() over HDFS file based scan nodes.
if (!LogicalPlanUtils.isLogicalRelation(df.queryExecution.optimizedPlan)) {
  throw HyperspaceException(
    "Only creating index over HDFS file based scan nodes is supported.")
}

// LogicalPlanUtils.isLogicalRelation: returns true only when the plan is a
// bare LogicalRelation, i.e. nothing (such as a Filter) sits on top of it.
def isLogicalRelation(logicalPlan: LogicalPlan): Boolean = {
  logicalPlan match {
    case _: LogicalRelation => true
    case _ => false
  }
}
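For illustration, here is a minimal, self-contained sketch of how that check behaves; the table name `events` and the filter columns are hypothetical, and it assumes a data source table (e.g. Parquet), not a Hive-serde table. A plain `spark.table(...)` yields a bare `LogicalRelation` and passes, while a filtered DataFrame yields `Filter(LogicalRelation)` and fails:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation

object IsLogicalRelationDemo {
  // Same shape as the Hyperspace check above: only a bare LogicalRelation passes.
  def isLogicalRelation(plan: LogicalPlan): Boolean = plan match {
    case _: LogicalRelation => true
    case _                  => false
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("demo").getOrCreate()

    // Unfiltered scan of a data source table: the optimized plan is a bare
    // LogicalRelation, so the check passes.
    val plain = spark.table("events")
    println(isLogicalRelation(plain.queryExecution.optimizedPlan)) // true

    // Filtered scan: the optimized plan is Filter(LogicalRelation), so the
    // check fails and createIndex() throws the HyperspaceException above.
    val filtered = spark.table("events").where("ds > 'x' AND ds < 'y'")
    println(isLogicalRelation(filtered.queryExecution.optimizedPlan)) // false
  }
}
```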
Acceptance criteria
- [ ] CreateIndex is tested on S3 - I can help
- [ ] Spark 2.4 needs to be supported
@nikhilsimha Could you share how you constructed the DataFrame that you passed to `createIndex`?
Wow, thanks for the quick reply! Here are the relevant code snippets:
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._

val hyperspace = new Hyperspace(leftDf.sparkSession)
val (leftTagged, additionalCols) = if (leftDf.schema.names.contains(Constants.TimeColumn)) {
  leftDf.withTimestampBasedPartition(Constants.TimePartitionColumn) ->
    Seq(Constants.TimeColumn, Constants.TimePartitionColumn)
} else {
  leftDf -> Seq.empty[String]
}

if (enableHyperspace) {
  tableUtils.sparkSession.enableHyperspace()
  val leftKeys = joinConf.leftKeyCols ++ Seq(Constants.PartitionColumn) ++ additionalCols
  val leftIndexConf = IndexConfig(
    s"${joinConf.metaData.cleanName}_left_index",
    leftKeys,
    leftTagged.schema.names.filterNot(leftKeys.contains)
  )
  hyperspace.createIndex(leftTagged, leftIndexConf)
}
The `leftDf` comes from a simple scan query of a Hive table (T) backed by S3: `select * from table T where ds > 'x' and ds < 'y'`.
Oh, you cannot have a filter in the dataframe from which you are trying to create an index. You should be able to create an index if `leftDf` is simply `spark.table("table")`.
We are tracking support for materialized views here: https://github.com/microsoft/hyperspace/issues/186
I see. That makes sense. So this is not an S3 issue? If I restructure my code to save the relation to a table and then execute the join, it should work?
I don't think this is an S3-related issue. As long as you create an index from a relation (no filter, etc.), it should work. Please let me know if you encounter any other issues.
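For what it's worth, a rough sketch of that restructuring (the table name `my_table` and the columns `id` and `ds` are hypothetical): build the index over the bare relation, and apply the filter only in the query you want the index to accelerate.

```scala
import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._

val spark = SparkSession.builder().getOrCreate()
val hyperspace = new Hyperspace(spark)

// 1. Create the index over the unfiltered relation (no filter/projection on top).
hyperspace.createIndex(
  spark.table("my_table"),
  IndexConfig("my_table_index", Seq("id"), Seq("ds")))

// 2. Enable Hyperspace and filter only at query time, so the optimizer can
//    substitute the index into the filtered (or joined) plan.
spark.enableHyperspace()
val filtered = spark.table("my_table").where("ds > 'x' AND ds < 'y'")
```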
I was facing the same issue. I changed the df preparation to read the relation without any filter. The df preparation and index creation steps:
df = spark.table("table")
hs.createIndex(df, IndexConfig("index", ["id"], ["name"]))
Now I am getting the following error when I run `hs.createIndex`:
py4j.protocol.Py4JJavaError: An error occurred while calling o71.createIndex.
: java.lang.NullPointerException
at com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager.run(FileBasedSourceProviderManager.scala:156)
at com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager.signature(FileBasedSourceProviderManager.scala:91)
at com.microsoft.hyperspace.index.FileBasedSignatureProvider.$anonfun$fingerprintVisitor$1(FileBasedSignatureProvider.scala:53)
at com.microsoft.hyperspace.index.FileBasedSignatureProvider.$anonfun$fingerprintVisitor$1$adapted(FileBasedSignatureProvider.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
at com.microsoft.hyperspace.index.FileBasedSignatureProvider.fingerprintVisitor(FileBasedSignatureProvider.scala:51)
at com.microsoft.hyperspace.index.FileBasedSignatureProvider.signature(FileBasedSignatureProvider.scala:40)
at com.microsoft.hyperspace.index.IndexSignatureProvider.signature(IndexSignatureProvider.scala:45)
at com.microsoft.hyperspace.actions.CreateActionBase.getIndexLogEntry(CreateActionBase.scala:64)
at com.microsoft.hyperspace.actions.CreateAction.logEntry(CreateAction.scala:38)
at com.microsoft.hyperspace.actions.Action.begin(Action.scala:50)
at com.microsoft.hyperspace.actions.Action.run(Action.scala:90)
at com.microsoft.hyperspace.actions.Action.run$(Action.scala:83)
at com.microsoft.hyperspace.actions.CreateAction.run(CreateAction.scala:30)
at com.microsoft.hyperspace.index.IndexCollectionManager.create(IndexCollectionManager.scala:47)
at com.microsoft.hyperspace.index.CachingIndexCollectionManager.create(CachingIndexCollectionManager.scala:78)
at com.microsoft.hyperspace.Hyperspace.createIndex(Hyperspace.scala:43)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:832)
I am using: Python 3.8, Spark 3.0.1.
@ashutosh-hs Hyperspace doesn't officially support Spark 3 yet (#85). Do you see the same error in Spark 2.4?
In any case, could you run `df.explain` and copy/paste the output here? (I am thinking that, depending on the catalog, the relation may be an unsupported one.)
@imback82 I couldn't test it with Spark 2.4 yet. I will post the result here once I have tested that. The following is the output of df.explain(extended=True):
+-Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet
== Optimized Logical Plan ==
Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet
== Physical Plan ==
FileScan parquet default.patient[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] Batched: false, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex[s3a://demo1/warehouse/patient], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<resourceType:string,id:string,meta:struct<id:string,extension:array<string>,versionId:stri...
>>> df.explain('cost')
== Optimized Logical Plan ==
Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet, Statistics(sizeInBytes=76.2 MiB)
== Physical Plan ==
FileScan parquet default.patient[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] Batched: false, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex[s3a://demo1/warehouse/patient], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<resourceType:string,id:string,meta:struct<id:string,extension:array<string>,versionId:stri...
I am using: Python 3.8, Spark 3.0.1, Hyperspace 0.4.
@imback82 Did you figure out the solution? I'm having the same error even when the DataFrame is created from the table name directly. I'm using the latest code with Spark 3.1.2.
Yes, I am having the same issue using Spark 3.1.1. Just wondering if this is a Spark version problem and whether there is any solution to it?