
Auto Bucket Partitioner


A $sample-based partitioner that supports all collection types. Partitioning can span a single field or multiple fields, including nested fields.

The logic for the partitioner is as follows:

  • Calculates the number of documents per partition. Runs a $collStats aggregation to get the average document size.
  • Determines the total count of documents. Uses the $collStats count, or runs a countDocuments query if the user supplies their own aggregation.pipeline configuration.
  • Determines the number of partitions. Calculated as: count / number of documents per partition.
  • Determines the number of documents to $sample. Calculated as: samples per partition * number of partitions. (A sketch of this arithmetic follows the pipeline below.)
  • Creates the aggregation pipeline to generate the partitions.
    [{$match: <the $match stage of the user's aggregation pipeline - iff the first stage is a $match>},
     {$sample: <number of documents to $sample>},
     {$addFields: {<partition key projection field>: {<'i': '$fieldList[i]' ...>}}, // Only added iff fieldList.size() > 1
     {$bucketAuto: {
            groupBy: <partition key projection field>,
            buckets: <number of partitions>
        }
     }
    ]
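
A minimal Python sketch of the arithmetic in the steps above, assuming the average document size and total count come from $collStats; the example numbers and the ceiling rounding are assumptions for illustration:

    import math

    avg_obj_size_bytes = 512      # assumed $collStats storageStats.avgObjSize
    doc_count = 10_000_000        # assumed total document count
    chunk_size_mb = 64            # the chunkSize configuration below
    samples_per_partition = 10    # the samplesPerPartition configuration below

    # Number of documents per partition, from the target partition size.
    docs_per_partition = (chunk_size_mb * 1024 * 1024) // avg_obj_size_bytes

    # count / number of documents per partition  -> 77 partitions here
    num_partitions = math.ceil(doc_count / docs_per_partition)

    # samples per partition * number of partitions -> 770 samples here
    num_samples = samples_per_partition * num_partitions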
    

Configurations:

  • fieldList: The field list used for partitioning. Either a single field name or a comma-separated list of fields. Defaults to: "_id".
  • chunkSize: The average size (MB) for each partition. Note: Uses the average document size to determine the number of documents per partition, so partitions may not be even. Defaults to: 64.
  • samplesPerPartition: The number of samples to take per partition. Defaults to: 10.
  • partitionKeyProjectionField: The field name used for a projected field that contains all the fields used to partition the collection. Defaults to: "__idx". Only change this if the documents already contain a "__idx" field.
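
As a hedged sketch of how these options might be wired into a PySpark read, reusing the connector's existing spark.mongodb.read.partitionerOptions.* convention; the fully qualified partitioner class name, database, collection, and field names are hypothetical, since this issue only proposes the partitioner:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = (
        spark.read.format("mongodb")
        .option("spark.mongodb.read.connection.uri", "mongodb://localhost:27017")
        .option("spark.mongodb.read.database", "test")      # assumed database
        .option("spark.mongodb.read.collection", "events")  # assumed collection
        # Hypothetical class name for the proposed partitioner.
        .option("spark.mongodb.read.partitioner",
                "com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner")
        .option("spark.mongodb.read.partitionerOptions.fieldList", "userId,createdAt")
        .option("spark.mongodb.read.partitionerOptions.chunkSize", "64")
        .option("spark.mongodb.read.partitionerOptions.samplesPerPartition", "10")
        .load()
    )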

Partitions are calculated as logical ranges. On sharded clusters these map closely to ranged chunks; when used with hashed shard keys, however, the logical ranges require broadcast operations.

Similar to the SamplePartitioner, but uses the $bucketAuto aggregation stage to generate the partition bounds.
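
A small pymongo illustration of the $bucketAuto step, assuming a two-field fieldList ("userId", "createdAt") so the $addFields projection into "__idx" applies; the connection string, collection, and sample size are assumptions. Each bucket's _id.min/_id.max pair becomes one logical partition range:

    from pymongo import MongoClient

    coll = MongoClient("mongodb://localhost:27017").test.events

    pipeline = [
        {"$sample": {"size": 40}},  # samples per partition * number of partitions
        # Only added because fieldList has more than one field.
        {"$addFields": {"__idx": {"0": "$userId", "1": "$createdAt"}}},
        {"$bucketAuto": {"groupBy": "$__idx", "buckets": 4}},
    ]
    for bucket in coll.aggregate(pipeline, allowDiskUse=True):
        # e.g. {"_id": {"min": {...}, "max": {...}}, "count": 10}
        print(bucket["_id"]["min"], "->", bucket["_id"]["max"])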

SPARK-356

rozza · Jun 26 '24 17:06