
Overhead of qbeast_hash filtering when doing a Sample

Open osopardo1 opened this issue 3 years ago • 5 comments

What went wrong? When a sample() is performed against a qbeast dataset, the qbeast SQL extension rewrites the Sample operation into a Filter (a minimal sketch of this rewrite follows the list below) in order to:

  • Pushdown the filter to the data source
  • Filter the cubes that satisfy the [lowerBound, upperBound]
  • In the end, read less data
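
A minimal sketch of the idea, using Spark's built-in hash as a stand-in for qbeast_hash (the column names, table contents, and bounds arithmetic are illustrative assumptions, not the project's actual implementation):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, hash}

    object SampleAsFilterSketch extends App {
      val spark = SparkSession.builder().master("local[*]").appName("sample-as-filter").getOrCreate()
      import spark.implicits._

      val df = Seq((1, 10, 5.0), (2, 20, 7.5), (3, 30, 2.0)).toDF("user_id", "product_id", "price")

      // sample(fraction) becomes a predicate over the weight space: keep records
      // whose hash of the indexed columns falls within [lowerBound, upperBound],
      // where the width of the range is proportional to the requested fraction.
      val fraction = 0.3
      val lowerBound = Int.MinValue.toLong
      val upperBound = lowerBound + (fraction * (Int.MaxValue.toLong - Int.MinValue.toLong)).toLong

      val sampled = df.filter(
        hash(col("user_id"), col("product_id")).cast("long").between(lowerBound, upperBound))
      sampled.show()
    }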

But in this process, we compute an in-memory hash of the indexed columns to check that each record falls within the Sample range. This hash has an extra cost when the query does not otherwise need those columns. For example, if a dataset is indexed by (user_id, product_id) and we only want to know the average price, we still have to read user_id and product_id, which prevents pruning those columns.
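
For instance, a query like the following (the table path is hypothetical) only needs the price column, yet the Sample-to-Filter rewrite forces user_id and product_id to be read so the hash can be recomputed:

    import org.apache.spark.sql.functions.avg

    // Only `price` is needed by the aggregation, but the indexed columns are
    // still read in order to recompute qbeast_hash for the sample predicate.
    val avgPrice = spark.read
      .format("qbeast")
      .load("/tmp/qbeast/ecommerce")   // hypothetical path
      .sample(0.1)
      .agg(avg("price"))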

We need to study how to overcome this problem. One solution is to avoid the in-memory filtering for those files that have a maxWeight < upperBound, because we know for sure that all the records in that file match the predicate.
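
A minimal sketch of that per-file decision, with hypothetical types and field names (the real metadata lives in the index/DeltaLog structures):

    // Hypothetical file metadata: maxWeight is the largest weight of any record in the file.
    final case class IndexedFile(path: String, maxWeight: Int)

    // The in-memory qbeast_hash filter is only needed when the file may contain
    // records whose weight exceeds the sample's upper bound. If maxWeight < upperBound,
    // every record already satisfies the predicate and hashing can be skipped.
    def needsInMemoryFilter(file: IndexedFile, upperBound: Int): Boolean =
      file.maxWeight >= upperBound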

osopardo1 avatar Jan 27 '22 10:01 osopardo1

UPDATE: a recent revision of the code suggests that the Weight (corresponding to the qbeast_hash of the indexed columns) should be computed differently in order to preserve the randomness of the algorithm:

  1. Either the qbeast_hash is computed over all of the columns, or
  2. we use the Random.nextInt() function instead.

In both cases, the reading process faces the issue of reproducing the value correctly. There is a theoretical solution for each case:

  1. With the first option, we can recompute the hash at read time as before, but this adds an extra performance penalty whenever the query does not need all the columns of the table.
  2. With the second option, there is a risk when distributing the indexing, because we have to make sure that Random is spread correctly across partitions and processes.
  3. In both cases, we can avoid the read-time overhead with the following proposal: since we know the min and max weight of a file from the metadata written in the DeltaLog, we should be able to re-generate a random value in [min, max] and filter by that (a minimal sketch follows this list). This removes the overhead directly, but we still need to figure out how to structure the code, which processes are involved, and whether it works as expected. (It is also possible that two identical samples output different results...)
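
A minimal sketch of that re-generation, assuming the per-file weight bounds (parameter names here are hypothetical) are available from the DeltaLog metadata:

    import scala.util.Random

    // Draw a weight uniformly in [minWeight, maxWeight] without re-hashing the
    // indexed columns. Long arithmetic avoids overflow when the bounds span the
    // full Int range.
    def regenerateWeight(minWeight: Int, maxWeight: Int, rng: Random): Int = {
      val range = maxWeight.toLong - minWeight.toLong + 1L
      val offset = ((rng.nextLong() % range) + range) % range
      (minWeight.toLong + offset).toInt
    }

    // A record read from a file with bounds [minWeight, maxWeight] would be kept
    // for a sample with upper bound `upperBound` iff
    //   regenerateWeight(minWeight, maxWeight, rng) <= upperBound
    // Note: because the value is random, two identical samples may return
    // different rows, as mentioned above.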

osopardo1 avatar Jun 30 '22 15:06 osopardo1

We could simply use Random.nextInt() to implement the qbeast_hash function and forget about the indexed columns (only for writing). The complex part comes when we read the data: we could either write a random weight per cube (invasive for the user) or create the random weight in memory and use it to filter the data per cube.
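
A hedged sketch of the write side, using Spark's rand() rescaled to the Int weight space (the "weight" column name is an assumption, not the project's actual schema):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{floor, rand}

    // Assign each record a uniformly random weight in [Int.MinValue, Int.MaxValue],
    // ignoring the indexed columns entirely.
    def withRandomWeight(df: DataFrame): DataFrame = {
      val intSpan = Int.MaxValue.toLong - Int.MinValue.toLong + 1L
      df.withColumn(
        "weight",
        (floor(rand() * intSpan) + Int.MinValue.toLong).cast("int"))
    }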

Adricu8 avatar Aug 29 '22 10:08 Adricu8

Update with a brief description of how I am trying to implement this:

  1. Write: We can change the qbeast_hash function and write a Random.nextInt() value as the weight column.

  2. Read: Either change Spark code (not sure we want to do this for this project) or try to add a new Spark extension that lets us make the necessary changes at the Physical Plan stage. From what I understand, these changes will modify the way we push down the filters when a sample/filter operator is performed. Right now, we add the SampleRule, which allows us to transform the sample operator into a filter that can be pushed down to the data source. What we want to do now is apply these changes a bit deeper in the tree; instead, we can try to do it during the execution of the Physical Plan. The idea is to read the data per file (cube) and filter only the data that we need; because we know the min/max Weight per cube, we should be able to implement this.

  • injectPlannerStrategy could be one entry point (a minimal sketch appears after the Challenges bullet below).

  • DataSourceScanExec: This file contains important classes and functions that we want to modify. It contains the class FileSourceScanExec ("Physical plan node for scanning data from HadoopFsRelations.") and the functions createBucketedReadRDD and createNonBucketedReadRDD, which call the method .getPartitionedFile for each file in the HadoopFsRelation given as input.

  • PartitionedFile: Takes the offset and length parameters to read a chunk from a file. The idea is to use it with logic like this:

    // Sketch: shrink the read range of the file in proportion to the sample.
    // `min` and `max` come from the per-file weight metadata; treating the drawn
    // value as a byte offset is an approximation for illustration purposes.
    def getPartitionedFileFiltered(
        file: FileStatus,
        filePath: Path,
        partitionValues: InternalRow,
        percentage: Double,
        min: Long,
        max: Long): PartitionedFile = {

      // Pick a random offset in [min, max] when sampling; read the whole file otherwise.
      val offset = if (percentage < 1.0) {
        min + Random.nextInt((max - min).toInt + 1)
      } else {
        0
      }
      // Read only the first (file.getLen - offset) bytes, i.e. drop the tail of the file.
      val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen - offset)
      PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen - offset, hosts)
    }

This way we directly read only the data that we need, without filtering. If this does not work, we can filter the data after reading it in createBucketedReadRDD.

  • Challenges: The main challenge I see right now is how to bind the LogicalPlan that we create with the SparkStrategy and connect it to the file containing the implemented changes.
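
As referenced above, a minimal sketch of the injectPlannerStrategy entry point (the extension class name and the empty strategy body are placeholders; the real strategy would emit the custom scan discussed above):

    import org.apache.spark.sql.{SparkSessionExtensions, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // Registered through the spark.sql.extensions configuration.
    class QbeastSamplePlannerExtension extends (SparkSessionExtensions => Unit) {
      override def apply(extensions: SparkSessionExtensions): Unit = {
        extensions.injectPlannerStrategy { _ =>
          new Strategy {
            override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
              // Match the Sample/Filter pattern here and produce a custom scan
              // that uses per-file offsets and lengths; returning Nil falls back
              // to Spark's built-in strategies.
              Nil
            }
          }
        }
      }
    }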

Adricu8 avatar Sep 06 '22 15:09 Adricu8

Issue on hold due to current developments on #175

osopardo1 avatar Mar 20 '23 15:03 osopardo1

Due to the complexity of adding these changes to the code, we will postpone the solution of this issue.

Keep in mind that the only case with a performance impact is reading a SAMPLE of the data.

osopardo1 avatar Oct 20 '23 09:10 osopardo1