
[FEATURE REQUEST]: Hyperspace indexing on S3-based data.

Open nikhilsimha opened this issue 4 years ago • 10 comments

Feature requested

As a developer at Airbnb, I want to be able to index over S3-based scan nodes, in order to create Hyperspace indexes.

I am not entirely sure that S3 is the reason I see the following error:

Exception in thread "main" com.microsoft.hyperspace.HyperspaceException: Only creating index over HDFS file based scan nodes is supported.
        at com.microsoft.hyperspace.actions.CreateAction.validate(CreateAction.scala:47)
        at com.microsoft.hyperspace.actions.Action$class.run(Action.scala:88)
        at com.microsoft.hyperspace.actions.CreateAction.run(CreateAction.scala:30)
        at com.microsoft.hyperspace.index.IndexCollectionManager.create(IndexCollectionManager.scala:43)
        at com.microsoft.hyperspace.index.CachingIndexCollectionManager.create(CachingIndexCollectionManager.scala:77)
        at com.microsoft.hyperspace.Hyperspace.createIndex(Hyperspace.scala:42) 

I am not sure because, looking at the code that throws the exception, I don't see anything specific to S3 in the isLogicalRelation function:

    // CreateAction.scala:47
    // We currently only support createIndex() over HDFS file based scan nodes.
    if (!LogicalPlanUtils.isLogicalRelation(df.queryExecution.optimizedPlan)) {
      throw HyperspaceException(
        "Only creating index over HDFS file based scan nodes is supported.")
    }

    // LogicalPlanUtils.isLogicalRelation, the check itself.
    def isLogicalRelation(logicalPlan: LogicalPlan): Boolean = {
      logicalPlan match {
        case _: LogicalRelation => true
        case _ => false
      }
    }
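
For reference, here is a minimal sketch of what this check accepts and rejects; the path, index name, and columns below are hypothetical. Since isLogicalRelation only matches a bare LogicalRelation at the root of the plan, any node sitting on top of the relation, e.g. a Filter, fails validation regardless of whether the files live on HDFS or S3:

    import org.apache.spark.sql.SparkSession
    import com.microsoft.hyperspace._
    import com.microsoft.hyperspace.index._

    val spark = SparkSession.builder.appName("repro").getOrCreate()
    val hs = new Hyperspace(spark)

    // A plain read produces a bare LogicalRelation, which passes the check.
    val plain = spark.read.parquet("s3a://my-bucket/events")

    // A filter wraps the relation in a Filter node, so the optimized plan is
    // no longer a bare LogicalRelation and createIndex() throws.
    val filtered = plain.filter("ds > 'x' AND ds < 'y'")

    hs.createIndex(plain, IndexConfig("events_index", Seq("id"), Seq("name")))    // passes
    // hs.createIndex(filtered, IndexConfig("f_index", Seq("id"), Seq("name")))   // HyperspaceException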

Acceptance criteria

  • [ ] createIndex is tested on S3 (I can help)
  • [ ] Spark 2.4 needs to be supported

nikhilsimha avatar Feb 16 '21 17:02 nikhilsimha

@nikhilsimha Could you share how you constructed the DataFrame that you passed to createIndex?

imback82 avatar Feb 16 '21 17:02 imback82

Wow, thanks for the quick reply! Here are the relevant code snippets:

    import com.microsoft.hyperspace._
    import com.microsoft.hyperspace.index._
    val hyperspace = new Hyperspace(leftDf.sparkSession)

    val (leftTagged, additionalCols) = if (leftDf.schema.names.contains(Constants.TimeColumn)) {
      leftDf.withTimestampBasedPartition(Constants.TimePartitionColumn) ->
        Seq(Constants.TimeColumn, Constants.TimePartitionColumn)
    } else {
      leftDf -> Seq.empty[String]
    }

    if (enableHyperspace) {
      tableUtils.sparkSession.enableHyperspace()
      val leftKeys = joinConf.leftKeyCols ++ Seq(Constants.PartitionColumn) ++ additionalCols
      val leftIndexConf = IndexConfig(
        s"${joinConf.metaData.cleanName}_left_index",
        leftKeys,
        leftTagged.schema.names.filterNot(leftKeys.contains)
      )
      hyperspace.createIndex(leftTagged, leftIndexConf)
    }

The leftDf comes from a simple scan query of a Hive table T backed by S3: select * from T where ds > 'x' and ds < 'y'.

nikhilsimha avatar Feb 16 '21 18:02 nikhilsimha

Oh, you cannot have a filter in the DataFrame from which you are trying to create an index. You should be able to create an index if leftDf is simply spark.table("table").

We are tracking materialized view support here: https://github.com/microsoft/hyperspace/issues/186

imback82 avatar Feb 16 '21 18:02 imback82

I see. That makes sense. This is not an S3 issue then? If I restructure my code to save the relation to a table and execute the join, it should work?

nikhilsimha avatar Feb 16 '21 18:02 nikhilsimha

I don't think this is an S3-related issue. As long as you create an index from a relation (no filter, etc.), it should work. Please let me know if you encounter any other issues.
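
For example (building on your snippet above; the staging path is hypothetical), something along these lines should pass validation:

    // Materialize the filtered slice first, so the index is created over a
    // bare relation rather than a Filter node.
    leftTagged.write.mode("overwrite").parquet("s3a://my-bucket/staging/left_tagged")

    // Re-reading the written files yields a plain LogicalRelation.
    val leftRelation = leftDf.sparkSession.read.parquet("s3a://my-bucket/staging/left_tagged")

    hyperspace.createIndex(
      leftRelation,
      IndexConfig(
        s"${joinConf.metaData.cleanName}_left_index",
        leftKeys,
        leftRelation.schema.names.filterNot(leftKeys.contains)))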

imback82 avatar Feb 16 '21 18:02 imback82

I was facing the same issue. I changed the DataFrame preparation to read the relation without any filter. DataFrame preparation and index creation steps:

df = spark.table("table")
hs.createIndex(df, IndexConfig("index", ["id"], ["name"]))

Now I am getting the following error when I try to run hs.createIndex:

py4j.protocol.Py4JJavaError: An error occurred while calling o71.createIndex.
: java.lang.NullPointerException
	at com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager.run(FileBasedSourceProviderManager.scala:156)
	at com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager.signature(FileBasedSourceProviderManager.scala:91)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.$anonfun$fingerprintVisitor$1(FileBasedSignatureProvider.scala:53)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.$anonfun$fingerprintVisitor$1$adapted(FileBasedSignatureProvider.scala:51)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.fingerprintVisitor(FileBasedSignatureProvider.scala:51)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.signature(FileBasedSignatureProvider.scala:40)
	at com.microsoft.hyperspace.index.IndexSignatureProvider.signature(IndexSignatureProvider.scala:45)
	at com.microsoft.hyperspace.actions.CreateActionBase.getIndexLogEntry(CreateActionBase.scala:64)
	at com.microsoft.hyperspace.actions.CreateAction.logEntry(CreateAction.scala:38)
	at com.microsoft.hyperspace.actions.Action.begin(Action.scala:50)
	at com.microsoft.hyperspace.actions.Action.run(Action.scala:90)
	at com.microsoft.hyperspace.actions.Action.run$(Action.scala:83)
	at com.microsoft.hyperspace.actions.CreateAction.run(CreateAction.scala:30)
	at com.microsoft.hyperspace.index.IndexCollectionManager.create(IndexCollectionManager.scala:47)
	at com.microsoft.hyperspace.index.CachingIndexCollectionManager.create(CachingIndexCollectionManager.scala:78)
	at com.microsoft.hyperspace.Hyperspace.createIndex(Hyperspace.scala:43)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)

I am using: Python 3.8, Spark 3.0.1.

ashutosh-hs avatar Feb 19 '21 10:02 ashutosh-hs

@ashutosh-hs Hyperspace doesn't officially support Spark 3 yet (#85). Do you see the same error in Spark 2.4?

In any case, could you run df.explain and copy/paste the output here? (I suspect that, depending on the catalog, the relation may be an unsupported one.)
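
If it helps, here is a rough sketch (the table name is a placeholder) for printing which relation and file-index classes back the DataFrame, which should tell us what the source provider is being handed:

    import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

    val df = spark.table("table")  // placeholder

    // Print the concrete relation and file-index classes in the optimized plan.
    df.queryExecution.optimizedPlan.foreach {
      case LogicalRelation(fs: HadoopFsRelation, _, _, _) =>
        println(s"relation: ${fs.getClass.getName}, fileIndex: ${fs.location.getClass.getName}")
      case other =>
        println(s"node: ${other.getClass.getName}")
    }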

imback82 avatar Feb 19 '21 16:02 imback82

@imback82 I couldn't test it with Spark 2.4 yet; I will post here once I have. The following is the output of df.explain(extended=True):

+-Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet

== Optimized Logical Plan ==
Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet

== Physical Plan ==
FileScan parquet default.patient[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] Batched: false, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex[s3a://demo1/warehouse/patient], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<resourceType:string,id:string,meta:struct<id:string,extension:array<string>,versionId:stri...
>>> df.explain('cost')
== Optimized Logical Plan ==
Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet, Statistics(sizeInBytes=76.2 MiB)

== Physical Plan ==
FileScan parquet default.patient[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] Batched: false, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex[s3a://demo1/warehouse/patient], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<resourceType:string,id:string,meta:struct<id:string,extension:array<string>,versionId:stri...

I am using: Python 3.8, Spark 3.0.1, Hyperspace 0.4.

ashutosh-hs avatar Feb 23 '21 13:02 ashutosh-hs

@imback82 Did you figure out the solution? I'm having same error even if the DataFrame is created from tableName directly. I'm using latest code with Spark 3.1.2.

dai-chen avatar Feb 10 '22 00:02 dai-chen

> @imback82 Did you figure out the solution? I'm having same error even if the DataFrame is created from tableName directly. I'm using latest code with Spark 3.1.2.

Yes, I am having the same issue using Spark 3.1.1. Just wondering if this is a Spark version problem and whether there is any solution to it?

edjones84 avatar Jul 21 '22 12:07 edjones84