[Task]: [RRIO] [Throttle] transform that slows down element transmission without an external resource
Feature: Throttle a PCollection
As a Beam developer, I want to slow down the element throughput of an API request PCollection, so that I can minimize API overuse and save costs.
Scenario Outline: I apply the Throttle PTransform to a bounded PCollection in my pipeline
Given Input PCollection is bounded
And I configured Throttle with <configured_rate>
And <size> elements in PCollection will be processed by the pipeline
When pipeline runs
Then Output PCollection emits elements <throughput_rate>
And Output PCollection is assigned the same WindowingStrategy as the Input PCollection
And Output PCollection elements' timestamps are preserved from their original Input
Examples:
| configured_rate | size | throughput_rate |
| n / second | <= n | as soon as available |
| n / second | > n | (0, n / second] |
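For illustration, here is a usage sketch of the bounded scenario above. This is a hypothetical API: the Throttle and Rate builder names are assumptions derived from the configuration tables later in this issue, not a finalized interface.

```java
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical usage: cap a bounded PCollection of API requests at
// 100 elements per second. Throttle, Rate, ApiRequest, and
// readBoundedRequests are assumed names for this sketch only.
PCollection<ApiRequest> requests = readBoundedRequests(pipeline); // any bounded source
PCollection<ApiRequest> throttled =
    requests.apply(
        "Throttle requests",
        Throttle.<ApiRequest>of(Rate.of(100, Duration.standardSeconds(1))));
```

Per the Examples table, inputs of at most n elements are emitted as soon as available; only larger inputs are actually slowed.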
Scenario: I apply the Throttle PTransform to an unbounded PCollection and forget to set the additional streaming configuration parameters
Given Input PCollection is unbounded
And I configured Throttle with a transmission rate
When pipeline runs
Then I see an error that Throttle is missing additional streaming configuration
Scenario Outline: I apply the Throttle PTransform to an unbounded PCollection in my pipeline
Given Input PCollection is unbounded
And I configured Throttle with <configured_rate>
And I configured Throttle with additional streaming configuration
And PCollection that emits at <input_rate> will be processed by the pipeline
When pipeline runs
Then Throttle applies GroupIntoBatches to Input PCollection
And Throttle forwards additional streaming configuration parameters to the GroupIntoBatches transform
And Output PCollection emits elements <throughput_rate>
And Output PCollection is assigned the same WindowingStrategy as the Input PCollection
And Output PCollection elements' timestamps are preserved from their original GroupIntoBatches output
Examples:
| configured_rate | input_rate | throughput_rate |
| n / second | <= n / second | <= n / second |
| n / second | > n / second | <= n / second |
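A corresponding sketch for the unbounded scenario, again with assumed setter names (withStreamBufferingSize, withStreamMaxBufferingDuration) mirroring the parameter table below; internally the transform would forward these values to GroupIntoBatches.

```java
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical usage for an unbounded PCollection: the additional streaming
// configuration is required and is forwarded to GroupIntoBatches.
PCollection<ApiRequest> throttled =
    unboundedRequests.apply( // unboundedRequests: any unbounded PCollection<ApiRequest>
        "Throttle requests",
        Throttle.<ApiRequest>of(Rate.of(100, Duration.standardSeconds(1)))
            // Assumed setter names; see the parameter table below.
            .withStreamBufferingSize(1000L)
            .withStreamMaxBufferingDuration(Duration.standardSeconds(5)));
```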
Transform configuration details
| Parameter name | Parameter type | Description | Optional/Required |
|---|---|---|---|
| maximumRate | Rate (see below) | The maximum target rate at which to emit elements of the PCollection. | Required |
| collectMetrics | boolean | Configures the transform to count input and output elements. | Optional |
| streamBufferingSize | long | The number of elements to buffer prior to throttling; forwarded to GroupIntoBatches for unbounded PCollections. | Required (unbounded PCollections) |
| streamMaxBufferingDuration | Duration | The maximum duration to buffer prior to throttling; forwarded to GroupIntoBatches for unbounded PCollections. | Optional |
Rate type
| Parameter name | Parameter type | Description | Optional/Required |
|---|---|---|---|
| numElements | Integer | The maximum number of elements to emit per interval | Required |
| interval | Duration | The interval duration within which to emit at most numElements | Required |
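As a sketch, Rate could be a simple immutable value class; the of factory and accessor names below are illustrative, not a committed design.

```java
import org.joda.time.Duration;

/** Hypothetical Rate value type mirroring the table above. */
public final class Rate {
  private final int numElements;
  private final Duration interval;

  private Rate(int numElements, Duration interval) {
    this.numElements = numElements;
    this.interval = interval;
  }

  /** For example, Rate.of(100, Duration.standardSeconds(1)) allows at most 100 elements per second. */
  public static Rate of(int numElements, Duration interval) {
    return new Rate(numElements, interval);
  }

  public int getNumElements() {
    return numElements;
  }

  public Duration getInterval() {
    return interval;
  }
}
```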
Alternatives considered
Stateful DoFns
One approach to this problem employs State and Timers, also known as "Stateful DoFns". Briefly described, the algorithm collects elements in a collection stored in state and emits them from an OnTimer callback. As of this writing (Beam version 2.53.0), the OnTimer callback only fires for unbounded (streaming) PCollection elements processed by the DoFn; batch does not fire OnTimer callbacks in either the event-time or processing-time domain. This limitation matters because batch is generally more prone to API overuse, as it typically processes a larger number of elements within a short period of time.
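The sketch below uses Beam's state and timer APIs to approximate the idea: it buffers values in a BagState and releases one element per processing-time timer firing. The one-second pacing, the single-element release, and the KV keying are illustrative assumptions, not the proposed design.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers elements in state and emits one per timer firing. Subject to the
// limitation above: the @OnTimer method is not invoked for bounded inputs
// as of Beam 2.53.0.
class StatefulThrottleFn extends DoFn<KV<Integer, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("emit")
  private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<Integer, String> element,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("emit") Timer emit) {
    // Stash the element and (re)schedule an emission one interval from now.
    buffer.add(element.getValue());
    emit.offset(Duration.standardSeconds(1)).setRelative();
  }

  @OnTimer("emit")
  public void onEmit(
      @StateId("buffer") BagState<String> buffer,
      @TimerId("emit") Timer emit,
      OutputReceiver<String> out) {
    // Drain state, emit a single element, and re-buffer the remainder so
    // output is paced at roughly one element per interval.
    List<String> pending = new ArrayList<>();
    buffer.read().forEach(pending::add);
    buffer.clear();
    if (!pending.isEmpty()) {
      out.output(pending.remove(0));
      pending.forEach(buffer::add);
      if (!pending.isEmpty()) {
        emit.offset(Duration.standardSeconds(1)).setRelative();
      }
    }
  }
}
```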
Redis
Another explored approach used Redis (redis.io). Essentially, incoming elements were offloaded to a Redis queue. Using PeriodicImpulse, a separate downstream transform checked a shared quota value and, when quota was available, dequeued and emitted an element. Additionally, a separate PeriodicImpulse-driven transform refreshed the quota value on a fixed interval. Although this approach worked well, the runner has no signal about the backlog of remaining elements and could add pressure to the Redis queue. Offloading also decouples element processing state from the runner, so retries could duplicate data.
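A rough sketch of that rejected design, assuming a Jedis client, hypothetical key names ("queue" and "quota"), and existing requests and pipeline values; it is included only to make the drawbacks concrete, since both the queue and the quota live entirely outside the runner's view.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import redis.clients.jedis.Jedis;

// 1. Offload incoming elements onto a Redis list; the runner loses
//    visibility into this backlog from here on.
requests.apply(
    "Enqueue",
    ParDo.of(
        new DoFn<String, Void>() {
          @ProcessElement
          public void process(@Element String element) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
              jedis.lpush("queue", element);
            }
          }
        }));

// 2. On a fixed interval, consume shared quota and dequeue one element
//    when quota remains. A similar PeriodicImpulse-driven transform
//    (not shown) resets the "quota" key on its own interval.
PCollection<String> emitted =
    pipeline
        .apply(PeriodicImpulse.create().withInterval(Duration.standardSeconds(1)))
        .apply(
            "Dequeue",
            ParDo.of(
                new DoFn<Instant, String>() {
                  @ProcessElement
                  public void process(OutputReceiver<String> out) {
                    try (Jedis jedis = new Jedis("localhost", 6379)) {
                      if (jedis.decr("quota") >= 0) {
                        String element = jedis.rpop("queue");
                        if (element != null) {
                          out.output(element);
                        }
                      }
                    }
                  }
                }));
```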
Measures of Ready
No blockers
Measures of Done
- [x] Tests validate that the interval between output elements is no greater than a configured value
- [x] Tests validate upstream window assignments are preserved
- [x] Tests validate no data loss within reasonable unittest compatible sizes
Issue Priority
Priority: 2 (default / most normal work should be filed as P2)