Parquet predicate pushdown doesn't seem to be working with timestamps
Vanilla Spark:

```scala
val df: DataFrame = ???
// java.sql.Timestamp takes milliseconds since epoch
val filtered = df.filter(df("value") >= new Timestamp(1512558961000L))
filtered.explain(true)
```

Both `IsNotNull` and `GreaterThanOrEqual` get pushed down nicely.
Frameless:

```scala
case class Foo(value: SQLTimestamp)

val ds: TypedDataset[Foo] = ???
val filtered = ds.filter(ds('value) >= SQLTimestamp(1512558961000000L))
filtered.explain(true)
```

`IsNotNull` gets pushed down, while `GreaterThanOrEqual` doesn't.
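For anyone reproducing this: besides reading the `Filter` node in the logical plan, pushdown is easiest to confirm from the `PushedFilters` entry in the physical plan's `FileScan` line. A minimal sketch (the local session and parquet path are hypothetical, just to show where to look):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

// Hypothetical local session and parquet file, only to inspect the plan.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.read.parquet("/tmp/events.parquet")

val filtered = df.filter(df("value") >= new Timestamp(1512558961000L))
filtered.explain(true)
// When pushdown works, the FileScan metadata contains something like:
//   PushedFilters: [IsNotNull(value), GreaterThanOrEqual(value,...)]
// When it doesn't, GreaterThanOrEqual is missing from that list.
```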
@kujon thanks for opening the ticket!
Notes on comparing encoders of `Timestamp`s in Frameless and vanilla Spark:

```scala
TypedExpressionEncoder.apply[SQLTimestamp].schema
// org.apache.spark.sql.types.StructType = StructType(StructField(_1,TimestampType,false))
```

whereas

```scala
org.apache.spark.sql.Encoders.TIMESTAMP.schema
// org.apache.spark.sql.types.StructType = StructType(StructField(value,TimestampType,true))
```

One difference is that they generate schemas with different nullable flags. However, changing the encoder to return the same schema didn't change the behavior.
...
Might be a `FramelessLit` issue. Investigating.
Looking at the logical plan:

Vanilla has:

```
+- *(1) Filter (isnotnull(value#17) && (value#17 >= 1000))
```

Frameless has:

```
*(1) Filter (isnotnull(value#1) && (value#1 >= FramelessLit(SQLTimestamp(1512558961000000))))
```
If the literal is encoded as `FramelessLit`, Parquet has no idea how to compare it, and hence the predicate is never pushed down.
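This lines up with how Spark translates Catalyst predicates into data-source `Filter`s: the translation pattern-matches on `Literal` specifically, so any other expression wrapping the constant falls through and the predicate stays untranslated. A simplified sketch of the idea (not Spark's actual code, which lives in `DataSourceStrategy`):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GreaterThanOrEqual, Literal}
import org.apache.spark.sql.sources

// Simplified sketch: only a child that is literally a Literal
// produces a data-source Filter that Parquet can evaluate.
def translate(predicate: Expression): Option[sources.Filter] = predicate match {
  case GreaterThanOrEqual(a: Attribute, Literal(v, _)) =>
    Some(sources.GreaterThanOrEqual(a.name, v))
  // FramelessLit is not a Literal, so it falls through here
  // and the predicate is never pushed down.
  case _ => None
}
```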
Tried verifying that this is a `FramelessLit` issue by comparing two columns that don't involve literals. That doesn't make sense, though: predicate pushdown never kicks in when two columns are compared, for both Frameless and vanilla Spark.
Checked that predicate pushdown works for `Long` and other constants. This seems to be an issue only with `SQLTimestamp` for now.
Conclusion (as of 01/21/2019): Parquet predicate pushdown only works for specific literals. Better if we stick to a `java.sql.Timestamp` representation for now (?)
I tried replacing `SQLTimestamp` with an encoder that operates on `java.sql.Timestamp`, to make it closer to the encoding of native types. In that case the predicate was pushed down as expected.
As long as this code here: https://github.com/typelevel/frameless/blob/0d52c03c6a485147c8e5b6745be7dcaed0b1119d/dataset/src/main/scala/frameless/functions/package.scala#L27-L32 doesn't go into the `FramelessLit` path and instead generates a regular Spark `Literal`, the predicate works. The new encoder didn't work for serde at this point, so we can't really replace what we have with this temporary workaround, but it at least increased my confidence that predicate pushdown fails when it encounters a `FramelessLit`, so I will focus on understanding why.
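The direction of that experiment can be sketched like this. It assumes that `SQLTimestamp` stores microseconds since epoch, which happens to match Catalyst's internal representation for `TimestampType`, so the value can ride in a plain `Literal` (a sketch, not the actual frameless code):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.TimestampType

// Catalyst represents TimestampType internally as microseconds
// since epoch (a Long), so a plain Literal can carry the value
// and remain recognisable to the pushdown translation.
val us: Long = 1512558961000000L
val lit = Literal(us, TimestampType)
```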
The reason this doesn't work with `FramelessLit` is that Spark cannot recognise it as a `Literal`, so it never gets past the translation code, which only pushes down `Literal`s.
Possibly the only way to remediate this is to have a rule substitute the expression in the plan. I may attempt this, but it probably requires a full-blown `spark.sql.extensions` plugin, so it's not exactly transparent (e.g. it, and all of frameless with its dependencies, has to be on the Spark cluster classpath).
Yea, we could add an optimizer rule, it should not be hard.
Indeed it was not. It's so simple it even works with `session.sqlContext.experimental.extraOptimizations`. #721 raised for it, though it comes with extra baggage to enable Windows local dev without winutils.
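For readers, such an optimizer rule can be sketched roughly as below. The real implementation is in #721; `FramelessLit`'s field names here (`obj`, `dataType`) are assumptions for illustration:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch only: rewrite FramelessLit nodes into plain Literals so the
// data-source strategy can recognise them and push predicates down.
object RewriteFramelessLit extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan =
    plan.transformAllExpressions {
      // Field names assumed; see #721 for the actual rule.
      case fl: FramelessLit[_] => Literal.create(fl.obj, fl.dataType)
    }
}

// Registered without a full spark.sql.extensions plugin:
// session.sqlContext.experimental.extraOptimizations ++= Seq(RewriteFramelessLit)
```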
fyi / NB for searchers: Spark 3.2.4 cannot push down predicates for anything that uses `InvokeLike` (as `SQLTimestamp`, structs, etc. do); the fixes from SPARK-40380 and SPARK-39106 aren't present in it. Frameless built against Spark versions greater than 3.3.2 will push down the predicates.