Support the null value in bloom_filter_agg Spark aggregate function
Currently, the Velox BloomFilterAggregate checks the input rows and throws an exception if any value is null. To be consistent with Spark's behavior, null values should be ignored instead.
Spark's BloomFilterAggregate ignores null values: https://github.com/apache/spark/blob/6cdca10f148433664b3e2be6f655b0ddba817537/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L180-L188
```scala
override def update(buffer: BloomFilter, inputRow: InternalRow): BloomFilter = {
  val value = child.eval(inputRow)
  // Ignore null values.
  if (value == null) {
    return buffer
  }
  updater.update(buffer, value)
  buffer
}
```
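The null-skipping contract above can be sketched without any Spark dependencies. `ToyBloomFilter` and `update` below are illustrative stand-ins (a set-backed toy filter), not real Spark or Velox APIs; the point is only that a null input leaves the aggregation buffer unchanged instead of throwing:

```scala
// Minimal stand-in for Spark's BloomFilter, backed by a Set.
// This is a sketch for illustration, not Spark's actual API.
final class ToyBloomFilter {
  private val items = scala.collection.mutable.Set.empty[Long]
  def put(v: Long): Unit = items += v
  def mightContain(v: Long): Boolean = items.contains(v)
}

// Mirrors Spark's update(): a null input returns the buffer untouched,
// which is the behavior this PR brings to Velox's BloomFilterAggregate.
def update(buffer: ToyBloomFilter, value: Any): ToyBloomFilter = {
  if (value == null) {
    return buffer // Ignore null values.
  }
  buffer.put(value.asInstanceOf[Long])
  buffer
}
```

Folding a mixed column such as `Seq(17L, null, 63L, null)` through this `update` yields a filter that contains 17 and 63, with the nulls silently skipped rather than raising an error.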
The unit test:
```scala
class VeloxBloomFilterAggregateFunctionsSuite extends VeloxWholeStageTransformerSuite {
  val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg")
  override protected val backend: String = "velox"
  override protected val resourcePath: String = "/tpch-data-parquet-velox"
  override protected val fileFormat: String = "parquet"
  val table = "bloomTable"

  protected def registerFunAndCreateTable(): Unit = {
    // Register 'bloom_filter_agg'.
    spark.sessionState.functionRegistry.registerFunction(
      funcId_bloom_filter_agg,
      new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"),
      (children: Seq[Expression]) => children.size match {
        case 1 => new BloomFilterAggregate(children.head)
        case 2 => new BloomFilterAggregate(children.head, children(1))
        case 3 => new BloomFilterAggregate(children.head, children(1), children(2))
      })
    val schema2 = new StructType()
      .add("a2", IntegerType, nullable = true)
      .add("b2", LongType, nullable = true)
      .add("c2", IntegerType, nullable = true)
      .add("d2", IntegerType, nullable = true)
      .add("e2", IntegerType, nullable = true)
      .add("f2", IntegerType, nullable = true)
    val data2 = Seq(
      Seq(67, 17L, 45, 91, null, null),
      Seq(98, 63L, 0, 89, null, 40),
      Seq(null, null, 68, 75, 20, 19))
    val rdd2 = spark.sparkContext.parallelize(data2)
    val rddRow2 = rdd2.map(s => Row.fromSeq(s))
    spark.createDataFrame(rddRow2, schema2).write.saveAsTable(table)
  }

  protected def dropFunctionAndTable(): Unit = {
    spark.sessionState.functionRegistry.dropFunction(funcId_bloom_filter_agg)
    spark.sql(s"DROP TABLE IF EXISTS $table")
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    registerFunAndCreateTable()
  }

  override def afterAll(): Unit = {
    dropFunctionAndTable()
    super.afterAll()
  }

  test("Test bloom_filter_agg with null inputs") {
    spark.sql(
      s"""
         |SELECT bloom_filter_agg(b2) FROM $table
         |""".stripMargin).show
  }
}
```
ping @marin-ma @jinchengchenghh Kindly review, thanks.
bloom_filter_agg is an open-source Spark internal function, and its first argument is always wrapped by xxhash, so it won't be null. But bloom_filter_agg itself can accept null values, as this PR shows. This case may occur because they use an internal Spark version that changes how the function is used. We could accept this PR because it aligns with the Spark function's behavior. What do you think? @rui-mo
Please also update the document https://github.com/facebookincubator/velox/blob/main/velox/docs/functions/spark/aggregate.rst?plain=1#L27
Please update the title to Support the null value in bloom_filter_agg Spark aggregate function
@jinchengchenghh @rui-mo Kindly review, thanks.
@jinchengchenghh Thanks for your comment. It looks good to align with Spark's behavior.
@rui-mo @jinchengchenghh PTAL.
@rui-mo @jinchengchenghh PTAL.
@weixiuli Thanks for iterating. I have no further comment.
> Please also update the document https://github.com/facebookincubator/velox/blob/main/velox/docs/functions/spark/aggregate.rst?plain=1#L27

Please note this comment.
@mbasmanova Kindly review, thanks.
@weixiuli I googled for bloom_filter_agg Spark function, but nothing came up. Would you share a link to Spark documentation for this function? I'm trying to understand what has changed from the time this function was added to Velox and now. Was the original implementation incorrect? How can we ensure that this implementation is right?
Thanks @mbasmanova. I've already described the differences between Velox and Spark in BloomFilterAggregate at the beginning of the PR, please take a look. The link to Spark's code for this function is
https://github.com/apache/spark/blob/6cdca10f148433664b3e2be6f655b0ddba817537/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L180-L188
@mbasmanova PTAL.
@weixiuli
> Would you share a link to Spark documentation for this function? I'm trying to understand what has changed from the time this function was added to Velox and now. Was the original implementation incorrect? How can we ensure that this implementation is right?

Would you answer all of these questions? The link you provided points to some code in Spark. Code is not documentation.
For documentation on Spark's BloomFilterAggregate, see https://books.japila.pl/spark-sql-internals/expressions/BloomFilterAggregate/
@mbasmanova Kindly review, thanks.
@mbasmanova Could you help review this PR again? Thanks.
@weixiuli I was out sick last week. I'm back today, but still recovering. Would you rebase this PR? I'll take a look sometime this week.
@mbasmanova PTAL.