Support the null value in bloom_filter_agg Spark aggregate function

Open weixiuli opened this issue 2 years ago • 23 comments

Currently, Velox's BloomFilterAggregate checks each input row and throws an exception if the row contains a null value. To be consistent with Spark's behavior, it should ignore null values instead.

Spark's BloomFilterAggregate ignores null values: https://github.com/apache/spark/blob/6cdca10f148433664b3e2be6f655b0ddba817537/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L180-L188

override def update(buffer: BloomFilter, inputRow: InternalRow): BloomFilter = {
  val value = child.eval(inputRow)
  // Ignore null values.
  if (value == null) {
    return buffer
  }
  updater.update(buffer, value)
  buffer
}
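
To make the intended semantics concrete, here is a minimal, self-contained sketch of null-skipping aggregation. It is not Spark's or Velox's implementation: `NullSkippingSketch`, `ToyFilter`, and `aggregate` are hypothetical names introduced for illustration, an exact set stands in for a real Bloom filter, and `Option[Long]` models a nullable column value.

```scala
import scala.collection.mutable

object NullSkippingSketch {
  // Hypothetical stand-in for a Bloom filter; an exact set keeps the example short.
  final class ToyFilter {
    private val seen = mutable.Set.empty[Long]
    def put(value: Long): Unit = seen += value
    def mightContain(value: Long): Boolean = seen.contains(value)
    def size: Int = seen.size
  }

  // Same shape as the update() above: a null input (None here) leaves the buffer unchanged.
  def update(buffer: ToyFilter, value: Option[Long]): ToyFilter = {
    value.foreach(buffer.put)
    buffer
  }

  // Fold every input value into one buffer, as an aggregate would.
  def aggregate(values: Seq[Option[Long]]): ToyFilter =
    values.foldLeft(new ToyFilter)(update)

  def main(args: Array[String]): Unit = {
    // Values of column b2 in the test table below: 17, 63, NULL.
    val filter = aggregate(Seq(Some(17L), Some(63L), None))
    println(filter.size)              // 2: the null row is skipped, not rejected
    println(filter.mightContain(17L)) // true
  }
}
```

The point of the sketch is only that a null input is a no-op on the buffer rather than an error, which is the behavior this PR asks Velox to adopt.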

The unit test:

class VeloxBloomFilterAggregateFunctionsSuite extends VeloxWholeStageTransformerSuite {
  val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg")
  override protected val backend: String = "velox"
  override protected val resourcePath: String = "/tpch-data-parquet-velox"
  override protected val fileFormat: String = "parquet"
  val table = "bloomTable"

  protected def registerFunAndcCreatTable(): Unit = {
    // Register 'bloom_filter_agg' using the class-level identifier defined above.
    spark.sessionState.functionRegistry.registerFunction(funcId_bloom_filter_agg,
      new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"),
      (children: Seq[Expression]) => children.size match {
        case 1 => new BloomFilterAggregate(children.head)
        case 2 => new BloomFilterAggregate(children.head, children(1))
        case 3 => new BloomFilterAggregate(children.head, children(1), children(2))
      })
    val schema2 = new StructType().add("a2", IntegerType, nullable = true)
      .add("b2", LongType, nullable = true)
      .add("c2", IntegerType, nullable = true)
      .add("d2", IntegerType, nullable = true)
      .add("e2", IntegerType, nullable = true)
      .add("f2", IntegerType, nullable = true)
    val data2 = Seq(Seq(67, 17L, 45, 91, null, null),
      Seq(98, 63L, 0, 89, null, 40),
      Seq(null, null, 68, 75, 20, 19))
    val rdd2 = spark.sparkContext.parallelize(data2)
    val rddRow2 = rdd2.map(s => Row.fromSeq(s))
    spark.createDataFrame(rddRow2, schema2).write.saveAsTable(table)
  }

  protected def dropFunctionAndTable(): Unit ={
    spark.sessionState.functionRegistry.dropFunction(funcId_bloom_filter_agg)
    spark.sql(s"DROP TABLE IF EXISTS $table")
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    registerFunAndcCreatTable()
  }
  override def afterAll(): Unit = {
    dropFunctionAndTable()
    super.afterAll()
  }

  test("Test bloom_filter_agg with Nulls input") {
    // Column b2 contains a null; the query should succeed rather than throw.
    spark.sql(
      s"""
       SELECT bloom_filter_agg(b2) FROM $table
         """.stripMargin).show()
  }
}

weixiuli avatar Dec 05 '23 03:12 weixiuli

Deploy Preview for meta-velox ready!

Latest commit: 709c836a657925e9dd0ae7e635f9c7160ce73c81
Latest deploy log: https://app.netlify.com/sites/meta-velox/deploys/6638cb08f2c2d50008c3be11
Deploy Preview: https://deploy-preview-7872--meta-velox.netlify.app

netlify[bot] avatar Dec 05 '23 03:12 netlify[bot]

Hi @weixiuli!

Thank you for your pull request and welcome to our community.

Action Required

In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you.

Process

In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.

Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with CLA signed. The tagging process may take up to 1 hour after signing. Please give it that time before contacting us about it.

If you have received this in error or have any questions, please contact us at [email protected]. Thanks!

facebook-github-bot avatar Dec 05 '23 03:12 facebook-github-bot

Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Meta Open Source project. Thanks!

facebook-github-bot avatar Dec 05 '23 05:12 facebook-github-bot

Ping @marin-ma @jinchengchenghh. Kindly review, thanks.

weixiuli avatar Dec 12 '23 13:12 weixiuli

bloom_filter_agg is an internal function in open-source Spark, and its first argument is always wrapped in xxhash, so it is never null there. The function itself, however, can accept null values, as this PR shows. This case may arise because they use an internal Spark version that changes how the function is used. We could accept this PR because it aligns with the Spark function's behavior. What do you think? @rui-mo

jinchengchenghh avatar Dec 13 '23 02:12 jinchengchenghh

Please also update the document https://github.com/facebookincubator/velox/blob/main/velox/docs/functions/spark/aggregate.rst?plain=1#L27

jinchengchenghh avatar Dec 13 '23 02:12 jinchengchenghh

Please update the title to Support the null value in bloom_filter_agg Spark aggregate function

jinchengchenghh avatar Dec 13 '23 02:12 jinchengchenghh

@jinchengchenghh @rui-mo Kindly review, thanks.

weixiuli avatar Dec 14 '23 03:12 weixiuli

@jinchengchenghh Thanks for your comment. It looks good to align with Spark's behavior.

rui-mo avatar Dec 14 '23 03:12 rui-mo

@rui-mo @jinchengchenghh PTAL.

weixiuli avatar Dec 15 '23 02:12 weixiuli

@rui-mo @jinchengchenghh PTAL.

weixiuli avatar Jan 02 '24 10:01 weixiuli

@weixiuli Thanks for iterating. I have no further comment.

rui-mo avatar Jan 02 '24 10:01 rui-mo

> Please also update the document https://github.com/facebookincubator/velox/blob/main/velox/docs/functions/spark/aggregate.rst?plain=1#L27

Please note this comment.

jinchengchenghh avatar Jan 04 '24 08:01 jinchengchenghh

@mbasmanova Kindly review, thanks.

weixiuli avatar Apr 02 '24 06:04 weixiuli

@weixiuli I googled for bloom_filter_agg Spark function, but nothing came up. Would you share a link to Spark documentation for this function? I'm trying to understand what has changed from the time this function was added to Velox and now. Was the original implementation incorrect? How can we ensure that this implementation is right?

mbasmanova avatar Apr 02 '24 14:04 mbasmanova

> @weixiuli I googled for bloom_filter_agg Spark function, but nothing came up. Would you share a link to Spark documentation for this function? I'm trying to understand what has changed from the time this function was added to Velox and now. Was the original implementation incorrect? How can we ensure that this implementation is right?

Thanks @mbasmanova. I've already described the differences between Velox and Spark in BloomFilterAggregate at the beginning of the PR; please take a look. The link to the Spark documentation for this function is https://github.com/apache/spark/blob/6cdca10f148433664b3e2be6f655b0ddba817537/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L180-L188

weixiuli avatar Apr 03 '24 03:04 weixiuli

> > @weixiuli I googled for bloom_filter_agg Spark function, but nothing came up. Would you share a link to Spark documentation for this function? I'm trying to understand what has changed from the time this function was added to Velox and now. Was the original implementation incorrect? How can we ensure that this implementation is right?
>
> Thanks @mbasmanova. I've already described the differences between Velox and Spark in BloomFilterAggregate at the beginning of the PR; please take a look. The link to the Spark documentation for this function is https://github.com/apache/spark/blob/6cdca10f148433664b3e2be6f655b0ddba817537/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L180-L188

@mbasmanova PTAL.

weixiuli avatar Apr 07 '24 09:04 weixiuli

@weixiuli

> Would you share a link to Spark documentation for this function? I'm trying to understand what has changed from the time this function was added to Velox and now. Was the original implementation incorrect? How can we ensure that this implementation is right?

Would you answer all of these questions? The link you provided points to some code in Spark. Code is not documentation.

mbasmanova avatar Apr 08 '24 15:04 mbasmanova

> Would you answer all of these questions? The link you provided points to some code in Spark. Code is not documentation.

For documentation on Spark's BloomFilterAggregate, see https://books.japila.pl/spark-sql-internals/expressions/BloomFilterAggregate/

weixiuli avatar Apr 09 '24 15:04 weixiuli

@mbasmanova Kindly review, thanks.

weixiuli avatar Apr 12 '24 01:04 weixiuli

@mbasmanova Could you help review this PR again? Thanks.

weixiuli avatar Apr 22 '24 03:04 weixiuli

@weixiuli I was out sick last week. I'm back today, but still recovering. Would you rebase this PR? I'll take a look sometime this week.

mbasmanova avatar Apr 22 '24 15:04 mbasmanova

@mbasmanova PTAL.

weixiuli avatar May 07 '24 07:05 weixiuli