
[Task]: [RRIO] [Throttle] transform that slows down element transmission without an external resource

damondouglas opened this issue 2 years ago

Feature: Throttle a PCollection

As a Beam developer, I want to slow down the element throughput of an API request PCollection, so that I can minimize API overuse and reduce costs.

Scenario Outline: I apply the Throttle PTransform to a bounded PCollection in my pipeline

    Given Input PCollection is bounded
      And I configured Throttle with <configured_rate>
      And <size> elements in PCollection will be processed by the pipeline
     When pipeline runs
     Then Output PCollection emits elements <throughput_rate>
      And Output PCollection is assigned to the same WindowStrategy as the Input PCollection
      And Output PCollection elements' timestamps are preserved from their original Input

    Examples:
      | configured_rate | size | throughput_rate      |
      | n / second      | <= n | as soon as available |
      | n / second      | > n  | (0, n / second]      |
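The pacing arithmetic behind the examples above can be sketched in plain Java (the class and method names here are illustrative, not the eventual transform API): with a configured rate of n elements per interval, emitting `size` elements takes at least `ceil(size / n) - 1` full intervals.

```java
import java.time.Duration;

public class ThrottleMath {
  /** Minimum time to emit {@code size} elements at {@code numElements} per {@code interval}. */
  static Duration minimumEmissionTime(long size, long numElements, Duration interval) {
    if (size <= numElements) {
      return Duration.ZERO; // emitted "as soon as available"
    }
    // Number of full intervals we must wait through before the final batch.
    long fullIntervals = (size + numElements - 1) / numElements - 1;
    return interval.multipliedBy(fullIntervals);
  }

  public static void main(String[] args) {
    // size <= n: no delay
    System.out.println(minimumEmissionTime(5, 10, Duration.ofSeconds(1)));  // PT0S
    // size > n: 25 elements at 10/second -> batches at t=0s, 1s, 2s
    System.out.println(minimumEmissionTime(25, 10, Duration.ofSeconds(1))); // PT2S
  }
}
```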

Scenario: I apply the Throttle PTransform to an unbounded PCollection and I forget to set parameters with additional streaming configuration

    Given Input PCollection is unbounded
      And I configured Throttle with a transmission rate
     When pipeline runs
     Then I see an error that Throttle is missing additional streaming configuration

Scenario Outline: I apply the Throttle PTransform to an unbounded PCollection in my pipeline

    Given Input PCollection is unbounded
      And I configured Throttle with <configured_rate>
      And I configured Throttle with additional streaming configuration
      And PCollection that emits at <input_rate> will be processed by the pipeline
     When pipeline runs
     Then Throttle applies GroupIntoBatches to Input PCollection
      And Throttle forwards additional streaming configuration parameters to the GroupIntoBatches transform
      And Output PCollection emits elements <throughput_rate>
      And Output PCollection is assigned to the same WindowStrategy as the Input PCollection
      And Output PCollection elements' timestamps are preserved from their original GroupIntoBatches output

    Examples:
      | configured_rate | input_rate    | throughput_rate |
      | n / second      | <= n / second | <= n / second   |
      | n / second      | > n / second  | <= n / second   |
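The clamping behavior in the examples above (output never exceeds n per interval, no matter how fast input arrives) resembles a token bucket. A minimal plain-Java sketch, with hypothetical names that do not reflect the eventual Beam API:

```java
// Token-bucket sketch: each interval grants a quota of numElements emissions;
// arrivals beyond the quota wait for the next refill.
public class TokenBucket {
  private final long capacity;
  private long tokens;

  TokenBucket(long numElementsPerInterval) {
    this.capacity = numElementsPerInterval;
    this.tokens = numElementsPerInterval;
  }

  /** Called at the start of each interval to replenish the quota. */
  void refill() {
    tokens = capacity;
  }

  /** Returns true if an element may be emitted within the current interval. */
  boolean tryEmit() {
    if (tokens > 0) {
      tokens--;
      return true;
    }
    return false;
  }
}
```

For example, with a capacity of 10 and 15 arrivals in one interval, only 10 are emitted; the remaining 5 must wait for the next `refill()`.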

Transform configuration details

| Parameter name | Parameter type | Description | Optional/Required |
| --- | --- | --- | --- |
| maximumRate | Rate (see below) | The maximum goal rate at which to throttle elements in the PCollection | Required |
| collectMetrics | boolean | Configures the transform to count inputs and outputs | Optional |
| streamBufferingSize | long | The number of elements to buffer prior to throttling; forwarded to GroupIntoBatches for unbounded PCollections | Required (unbounded PCollection) |
| streamMaxBufferingDuration | Duration | The maximum duration to buffer prior to throttling; forwarded to GroupIntoBatches for unbounded PCollections | Optional |

Rate type

| Parameter name | Parameter type | Description | Optional/Required |
| --- | --- | --- | --- |
| numElements | Integer | The number of elements | Required |
| interval | Duration | The interval duration within which to throttle numElements | Required |
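A Rate configured this way implies an average spacing between consecutive emitted elements. A minimal sketch of such a value type (field and method names are illustrative, not the final API):

```java
import java.time.Duration;

// Hypothetical Rate value type matching the configuration table above.
public class Rate {
  final int numElements;
  final Duration interval;

  Rate(int numElements, Duration interval) {
    this.numElements = numElements;
    this.interval = interval;
  }

  /** Average spacing between consecutive emitted elements. */
  Duration perElement() {
    return interval.dividedBy(numElements);
  }
}
```

For example, a rate of 10 elements per second yields a per-element spacing of 100 ms.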

Alternatives considered

Stateful DoFns

One approach to this problem uses State and Timers, also known as "stateful DoFns". Briefly, the algorithm collects elements in a collection stored in state and emits them from an OnTimer callback. As of this writing (Beam 2.53.0), timers only fire when the DoFn processes elements of an unbounded (streaming) PCollection; batch pipelines fire no timers in either the event-time or processing-time domain. This limitation matters because batch is generally more prone to API overuse, as it typically processes a large number of elements within a short period of time.
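The buffer-and-timer idea can be simulated in plain Java without a Beam dependency (class and method names are hypothetical; the comments indicate which Beam concepts each piece stands in for):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simulation of the stateful-DoFn approach: elements accumulate in "state"
// and a periodic timer firing flushes at most numElementsPerFiring of them,
// approximating the configured rate.
public class BufferedThrottle {
  private final Queue<String> buffer = new ArrayDeque<>();
  private final int numElementsPerFiring;

  BufferedThrottle(int numElementsPerFiring) {
    this.numElementsPerFiring = numElementsPerFiring;
  }

  /** Analogous to a @ProcessElement method: stash the element in state. */
  void process(String element) {
    buffer.add(element);
  }

  /** Analogous to an @OnTimer callback: emit up to numElementsPerFiring elements. */
  List<String> onTimer() {
    List<String> out = new ArrayList<>();
    while (out.size() < numElementsPerFiring && !buffer.isEmpty()) {
      out.add(buffer.poll());
    }
    return out;
  }
}
```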

Redis

Another approach explored was the use of redis.io. Essentially, incoming elements were offloaded to a Redis queue. Using PeriodicImpulse, a separate downstream transform checked a shared quota value and, if quota was available, dequeued and emitted an element; another PeriodicImpulse-driven transform refreshed the quota value on a fixed interval. Although this worked well, the runner has no signal about the backlog of remaining elements and could add pressure to the Redis queue. Offloading also decouples the runner from the element-processing state, so retries could duplicate data.

Measures of Ready

No blockers

Measures of Done

  • [x] Tests validate that the interval between emitted elements is no greater than a configured value
  • [x] Tests validate upstream window assignments are preserved
  • [x] Tests validate no data loss within reasonable unittest compatible sizes

Issue Priority

Priority: 2 (default / most normal work should be filed as P2)

damondouglas · Oct 10 '23 22:10