
[QUERY] Throughput control failing - Cosmos Spark Connector on Azure Databricks / PySpark

Open bmckalla opened this issue 1 year ago • 5 comments

Query/Question The throughput control on the Cosmos Spark connector seems to fail to provide even a nominally close throttle on throughput usage. We currently have 40,000 RU/s provisioned globally across our DB, and we generally only interact with one collection at a time. We also confirmed that our partition keys are evenly distributed (since I am aware that each physical partition only receives an equal share of the global RU/s). However, with the throughput control threshold set to, say, 0.5 on a particular pipeline, it will almost always hit 100% usage. It depends on the query, but in some cases I have it set to 0.01 and it still uses over 50% when reading a subset of columns from a single collection.

  1. We first tried using the globalControl configuration and found that, even with the dedicated throughput control container in place, all RU/s were still being consumed. The container being queried holds around 6 million documents; I'm not sure of the average document size, but I can get that information. Something like 10,000 of our records costs about 140 RU, I believe.
(
        spark.read.format('cosmos.oltp')
        .option(
            'spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint',
            cosmos_endpoint,
        )
        .option('spark.cosmos.read.inferSchema.enabled', 'true')
        .option('spark.cosmos.accountEndpoint', cosmos_endpoint)
        .option('spark.cosmos.accountKey', cosmos_master_key)
        .option('spark.cosmos.database', db)
        .option('spark.cosmos.container', container)
        .option('spark.cosmos.read.partitioning.strategy', 'Aggressive')
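        # global throughput control: target ~50% of provisioned RU/s, coordinated via a dedicated control container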
        .option('spark.cosmos.throughputControl.enabled', 'true')
        .option('spark.cosmos.throughputControl.name', 'ThroughputControl')
        .option('spark.cosmos.throughputControl.targetThroughputThreshold', '0.5')
        .option('spark.cosmos.throughputControl.globalControl.container', throughput_container)
        .option('spark.cosmos.throughputControl.globalControl.database', db)
        .load()
        .select(
          'id', 
          'address', 'city', 'state', 'zip_code', 
          'address_2', 'city_2', 'state_2', 'zip_code_2', 
          'address_3', 'city_3', 'state_3', 'zip_code_3', 
          'old_residences'  # an array of objects
        )
        .write
        .option('overwriteSchema', 'true')
        .mode('overwrite')
        .saveAsTable('some_schema.address')
 )

This hit the full 40,000 RU/s and stayed there until the job completed.

  2. Next we tried using local control, i.e. .option('spark.cosmos.throughputControl.globalControl.useDedicatedContainer', 'false'); roughly the read config sketched after this list.

With targetThroughputThreshold = 0.5, this also maxed out RU usage almost immediately.

  3. Finally, with some experimentation we tuned the threshold down to 0.01 for that particular query; it hovered around 23k RU/s but still sometimes spiked to the full 40k for several minutes.
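
For reference, the read side of the local-control attempt in step 2 looked roughly like the following. This is a minimal sketch, assuming the same cosmos_endpoint, cosmos_master_key, db, and container variables as the snippet above; the only change is dropping the dedicated control-container options and disabling useDedicatedContainer.

(
    spark.read.format('cosmos.oltp')
    .option('spark.cosmos.accountEndpoint', cosmos_endpoint)
    .option('spark.cosmos.accountKey', cosmos_master_key)
    .option('spark.cosmos.database', db)
    .option('spark.cosmos.container', container)
    .option('spark.cosmos.read.inferSchema.enabled', 'true')
    .option('spark.cosmos.read.partitioning.strategy', 'Aggressive')
    # "local" throughput control: same threshold, but no dedicated global-control container
    .option('spark.cosmos.throughputControl.enabled', 'true')
    .option('spark.cosmos.throughputControl.name', 'ThroughputControl')
    .option('spark.cosmos.throughputControl.targetThroughputThreshold', '0.5')
    .option('spark.cosmos.throughputControl.globalControl.useDedicatedContainer', 'false')
    .load()
)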

While I'm aware the configured threshold is not a guarantee of how many RUs will actually be consumed, it seems pretty insane that setting it to 1% still uses over 50% of the global RU/s.

Why is this not a Bug or a feature Request? I am unsure if this is a bug, considering I don't see any other issues posted here that talk about this. So, I'm curious if I'm doing something incorrectly.

Setup (please complete the following information if applicable):

  • Azure Databricks 12.2 LTS ML - Apache Spark 3.3.2, Scala 2.12
  • Using PySpark
  • com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.24.1

Information Checklist Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • [x] Query Added
  • [x] Setup information Added

bmckalla avatar Feb 08 '24 19:02 bmckalla

@kushagraThapar could you help route this issue?

joshfree avatar Feb 09 '24 17:02 joshfree

@xinlian12 can you please take a look at this, thanks!

kushagraThapar avatar Feb 09 '24 17:02 kushagraThapar

@kushagraThapar @xinlian12 heartbeat check on this?

bmckalla avatar Feb 20 '24 20:02 bmckalla

@bmckalla for the RU usage, are you referring to the normalized RU usage or the average RU usage? Currently the throughput control is best-effort, so you might still see it hitting 100% normalized RU usage.

From the config you have shared, can you try the following two changes and see how it goes:

  1. Use the Restrictive read strategy: spark.cosmos.read.partitioning.strategy -> Restrictive
  2. Make the above change first and check whether the RU usage changes. If not, also tune down spark.cosmos.read.maxItemCount to 100 or 10 and see whether it makes a difference (sketch below).
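
Concretely, applying both suggestions to the read options from your snippet would look roughly like the sketch below; this is just an illustration, and 100 is only a starting value to experiment with.

(
    spark.read.format('cosmos.oltp')
    .option('spark.cosmos.accountEndpoint', cosmos_endpoint)
    .option('spark.cosmos.accountKey', cosmos_master_key)
    .option('spark.cosmos.database', db)
    .option('spark.cosmos.container', container)
    .option('spark.cosmos.read.inferSchema.enabled', 'true')
    # 1. Restrictive instead of Aggressive partitioning (lower read parallelism)
    .option('spark.cosmos.read.partitioning.strategy', 'Restrictive')
    # 2. only if RU usage is still too high, also reduce the page size per request
    .option('spark.cosmos.read.maxItemCount', '100')
    .option('spark.cosmos.throughputControl.enabled', 'true')
    .option('spark.cosmos.throughputControl.name', 'ThroughputControl')
    .option('spark.cosmos.throughputControl.targetThroughputThreshold', '0.5')
    .option('spark.cosmos.throughputControl.globalControl.database', db)
    .option('spark.cosmos.throughputControl.globalControl.container', throughput_container)
    .load()
)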

xinlian12 avatar Feb 21 '24 15:02 xinlian12

@bmckalla for the RU usage, are you referring to the normalized RU usage or the average RU usage? Currently the throughput control is best-effort, so you might still see it hitting 100% normalized RU usage.

Normalized. However, we are also viewing the provisioned throughput at the same time. For example, if it's at 100% normalized but the provisioned throughput is 32k, then it's really only using (or peaking at) that amount of RUs over that time range.

From the config you have shared, can you try the following two changes and see how it goes:

1. Use the `Restrictive` read strategy: spark.cosmos.read.partitioning.strategy -> Restrictive

2. Make the above change first and check whether the RU usage changes. If not, also tune down `spark.cosmos.read.maxItemCount` to 100 or 10 and see whether it makes a difference.

I can give it a shot.

bmckalla avatar Feb 22 '24 20:02 bmckalla