Add BigQuery Storage Read API Enrichment Handler
#35196
Description:
This pull request introduces a new EnrichmentSourceHandler for Apache Beam, BigQueryStorageEnrichmentHandler, designed to leverage the Google Cloud BigQuery Storage Read API for efficient data enrichment. This handler provides a high-performance alternative to traditional SQL-based BigQuery lookups within Beam pipelines.
Motivation and Context:
Enriching data by joining PCollection elements with data stored in BigQuery is a common use case. While existing methods often rely on executing SQL queries, the BigQuery Storage Read API offers a more direct and typically faster way to retrieve data, especially for large volumes or when fine-grained row-level access is needed. This handler aims to:
- Improve the performance of BigQuery enrichments.
- Reduce BigQuery query costs associated with SQL execution for enrichment tasks.
- Provide more flexible and programmatic control over data fetching and filtering.
Key Features and Improvements:
The BigQueryStorageEnrichmentHandler offers several enhancements:
- Efficient Data Retrieval: Utilizes the BigQuery Storage Read API for significantly faster data reads compared to SQL queries, especially for bulk lookups. Data is read in Apache Arrow format, minimizing serialization/deserialization overhead.
- Flexible Filtering:
  - Supports static filter templates via `row_restriction_template`.
  - Allows dynamic, per-element filter string generation using `row_restriction_template_fn`.
- Advanced Keying and Value Extraction:
  - `fields`: Specifies input `beam.Row` fields for generating join keys and for use in filter templates.
  - `additional_condition_fields`: Allows using input fields for filtering without including them in the join key.
  - `condition_value_fn`: Provides complete control over generating the dictionary of values used for both filtering and join key creation.
- Field Renaming/Aliasing: Supports aliasing of selected BigQuery columns (e.g., `original_col as alias_col` in `column_names`) to prevent naming conflicts in the enriched `beam.Row`.
- Batching Support: Groups multiple input elements to make fewer `CreateReadSession` calls, reducing API overhead. Batch size and duration are configurable (`min_batch_size`, `max_batch_size`, `max_batch_duration_secs`).
- Parallel Stream Reading (Experimental): Employs a `ThreadPoolExecutor` to read data from multiple streams of a BigQuery Read Session in parallel, potentially improving data fetching throughput. Concurrency is configurable via `max_parallel_streams`.
- Custom Row Selection: Includes a `latest_value_selector` callback that lets users define custom logic for selecting the desired row when multiple BigQuery rows match a single input request (e.g., picking the record with the most recent timestamp). `primary_keys` can be used by this selector.
- Automatic Client Management: Manages the lifecycle of the `BigQueryReadClient`.
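To make the filter-template parameters above concrete, here is a minimal sketch of how a per-element row restriction could be rendered from a static template and an element's key fields. The function name and format-string convention are illustrative assumptions, not the handler's actual internals.

```python
# Illustrative sketch only: renders a row restriction string from a
# static template plus the element's key/condition fields. The real
# handler's internals may differ.
def build_row_restriction(template, element, fields):
    # Pick out the configured fields from the input element and
    # substitute them into the filter template.
    values = {f: element[f] for f in fields}
    return template.format(**values)


element = {"id": 101, "category": "Electronics"}
restriction = build_row_restriction(
    "id = {id} AND category = '{category}'", element, ["id", "category"])
print(restriction)  # id = 101 AND category = 'Electronics'
```

A `row_restriction_template_fn`, by contrast, would compute the whole string per element instead of filling a fixed template.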
Advantages over Traditional SQL-based BigQuery Enrichment:
- Performance: Direct access to table storage via the Storage Read API typically bypasses the SQL query processing engine, leading to lower latency and higher throughput, especially for fetching many individual rows or large data segments.
- Cost Efficiency: Reading data via the Storage API can be more cost-effective than running many small SQL queries, as Storage API pricing is based on data scanned, while query pricing involves slots and scanned data.
- Scalability: The streaming nature of the Storage Read API is well-suited for scalable data processing in Beam.
- Reduced Query Complexity: For simple lookups, it avoids the need to construct and manage SQL query strings dynamically.
Documentation:
Comprehensive documentation for this handler, including usage examples, parameter descriptions, features, and limitations, has been added in docs/bigquery_storage_enrichment_handler.md.
Implementation Details:
The handler (`sdk/python/transforms/enrichment_handlers/bigquery_storage_read.py`) manages `BigQueryReadClient` instances, constructs `ReadSession` requests with appropriate row restrictions and selected fields, and processes the resulting Arrow record batches. It integrates with Beam's `Enrichment` transform, providing batching and caching key generation.
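For readers unfamiliar with the Storage Read API, the shape of the `ReadSession` request the handler assembles looks roughly like this. This is a plain-dict sketch whose field names mirror the API's protobuf fields; the handler itself would use `BigQueryReadClient` and the generated request types, and the project/dataset/table values here are examples.

```python
# Plain-dict sketch of a CreateReadSession request. Field names mirror
# the BigQuery Storage Read API; values are illustrative only.
def build_read_session_request(project, dataset, table, columns, restriction):
    table_path = f"projects/{project}/datasets/{dataset}/tables/{table}"
    return {
        "parent": f"projects/{project}",
        "read_session": {
            "table": table_path,
            "data_format": "ARROW",  # rows arrive as Arrow record batches
            "read_options": {
                "selected_fields": columns,      # column projection
                "row_restriction": restriction,  # server-side filter
            },
        },
        # The handler may request more streams when reading in parallel.
        "max_stream_count": 1,
    }


req = build_read_session_request(
    "apache-beam-testing", "beam-test", "products",
    ["id", "product_name"], "id = 101")
print(req["read_session"]["read_options"]["row_restriction"])  # id = 101
```

The `row_restriction` is applied server-side, so only matching rows are streamed back, which is what makes per-element lookups viable without a SQL query.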
Testing Considerations:
- Unit tests for key generation, filter construction, and data processing logic.
- Integration tests against a live BigQuery instance.
- Performance benchmarks comparing against SQL-based handlers.
This handler provides a powerful and efficient way to enrich data in Apache Beam pipelines using BigQuery.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.
See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.
Hi @sjvanrossum, 👋
Could you please take a look at this pull request when you have a moment?
It introduces a new BigQueryStorageEnrichmentHandler for Apache Beam, which leverages the BigQuery Storage Read API. The goal is to provide a more performant and potentially cost-effective way to enrich PCollections by fetching data from BigQuery, as an alternative to traditional SQL-based lookups.
The PR includes the handler implementation and accompanying documentation.
Please review the changes and let me know if anything needs an update or further clarification.
Thanks, @pandasanjay
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.
Please fix the failed workflows.
Thanks for the PR, @pandasanjay 🙏
You might want to take a look at the CI test results. Also, this guide could help address some of the linting issues: 🔗 https://cwiki.apache.org/confluence/display/BEAM/Python+Tips
Additionally, you can check out my PR for the CloudSQL Enrichment Handler (#34398). I believe some of the feedback there could be relevant and applicable here as well.
Thank you @mohamedawnallah for providing additional details! 👍
I’m planning to make a few more documentation updates similar to yours.
It looks like the pipeline failures aren’t related to my changes—I’m currently investigating the linting issues. Do these pipelines often fail and require multiple retries to pass?
> It looks like the pipeline failures aren’t related to my changes—I’m currently investigating the linting issues. Do these pipelines often fail and require multiple retries to pass?
Some CI tests can occasionally be flaky, but this doesn’t happen very often. To minimize issues, make sure you’re using the latest code from the Beam upstream master branch (`git merge`) and run all tests locally (`gradlew tasks` is helpful here). If any CI tests still fail, investigate the CI test logs and resolve the failures as you go. 👍
assign set of reviewers
Assigning reviewers:
R: @damccorm for label python.
R: @liferoad for label website.
Note: If you would like to opt out of this review, comment assign to next reviewer.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
Looks like a BQ table needs to be created for the examples pipeline to pass.
```python
project_id = "apache-beam-testing"
dataset = "beam-test"
table_name = "bigquery-enrichment-test-products"
```

```sql
-- BigQuery DDL for enrichment_with_bigquery_storage_basic
CREATE TABLE `apache-beam-testing.beam-test.bigquery-enrichment-test-products` (
  id INT64,
  product_name STRING,
  category STRING,
  unit_price FLOAT64
);

INSERT INTO `apache-beam-testing.beam-test.bigquery-enrichment-test-products` (id, product_name, category, unit_price) VALUES
  (101, 'Laptop', 'Electronics', 999.99),
  (102, 'Desk Chair', 'Furniture', 199.99),
  (103, 'Monitor', 'Electronics', 299.99);
```
If this can be configured in any config in code, please let me know.
@damccorm Could you please help here. Thanks :)
@pandasanjay – It looks like the example pipelines fail with this error:
```
> from google.api_core.exceptions import TooManyRequests
E ModuleNotFoundError: No module named 'google.api_core'
```
Given that this dependency needs to be installed, have you tested this against a BQ table in your own Google Cloud project, and do the tests pass?
> @pandasanjay – It looks like the example pipelines fail with this error: `ModuleNotFoundError: No module named 'google.api_core'`. Given this to be installed have you tested this with BQ table in your Google Cloud and the tests pass?
@mohamedawnallah, you’re right.
I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests. The pyproject.toml file does list the necessary dependency, so this might be an issue with the pipeline itself. I suspect that’s also why there aren’t any examples available for BQ enrichment.
@damccorm would you be able to guide us here? :)
> I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests
@pandasanjay I found it depends mostly on the GitHub runner/workflow those tests run on. I recently came across this while working on the Milvus Enrichment Handler (#35216).
Using try-except on the imports that need these dependencies and skipping the test when they are absent would solve the issue. That said, this may change from case to case, so someone should check whether it happens on specific workflows rather than all of them; otherwise, those test cases would never run since they would be entirely skipped.
An example of how this could look: https://github.com/apache/beam/blob/c786cd004b78c5690613623ab9658ff9febffbca/sdks/python/apache_beam/transforms/enrichment_handlers/bigtable_it_test.py#L31-L41
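For reference, the guarded-import pattern looks like the following minimal sketch. The class and flag names are illustrative; the linked Bigtable test shows Beam's actual convention.

```python
import unittest

# Skip the whole test case when the optional GCP dependency is missing,
# instead of failing at collection time with ModuleNotFoundError.
try:
    from google.api_core.exceptions import TooManyRequests  # noqa: F401
    GCP_DEPS_INSTALLED = True
except ImportError:
    GCP_DEPS_INSTALLED = False


@unittest.skipUnless(GCP_DEPS_INSTALLED, 'google-api-core is not installed')
class BigQueryStorageEnrichmentTest(unittest.TestCase):
    def test_lookup(self):
        # ... test body that uses the GCP client goes here ...
        pass
```

With this guard, the module always imports cleanly; the tests run only on workflows where the GCP extras are installed.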
Have you actually seen poor performance from the existing handler in practice? Since we're usually not doing bulk reads with enrichment, it's unclear to me that the storage read approach will actually be more performant.
@damccorm – For our use case of building an SCD Type 2 pipeline focused on slot consumption and cost optimization, we've observed that using the BigQuery Storage Read API offers significant advantages:
- Improved Performance: The Storage Read API enables concurrent data reads through streaming, eliminating the need to compete for slots as required by the traditional query engine.
- Cost Optimization: There are two main ways to save costs:
- No Need for Dedicated Slots: Since the Storage Read API doesn't require slot reservations, we avoid the ongoing costs associated with dedicated slots.
- Lower Data Scan Costs: As of June 2024, the cost for scanning data via the Storage Read API is $1.10 per TiB, compared to $5.00 per TiB when using the BigQuery Query Engine.
The only caveat is that we need to get the partitioning and clustering right; if we do, this will be a cheap and efficient solution.
References:
- BigQuery Storage Read API Pricing
- BigQuery On-Demand Pricing
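A quick back-of-envelope check of the price gap quoted above (per-TiB prices as stated in this thread for June 2024; verify against the current pricing pages before relying on them):

```python
# Per-TiB prices as quoted in this thread; treat as illustrative.
STORAGE_READ_PER_TIB = 1.10
ON_DEMAND_QUERY_PER_TIB = 5.00

tib_scanned = 10  # hypothetical monthly scan volume
storage_read_cost = round(tib_scanned * STORAGE_READ_PER_TIB, 2)   # 11.0
on_demand_cost = round(tib_scanned * ON_DEMAND_QUERY_PER_TIB, 2)   # 50.0
print(storage_read_cost, on_demand_cost)
```

At these list prices the Storage Read API scan is roughly 4.5x cheaper per TiB, before accounting for slot reservations avoided.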
Let me know your thoughts :)
> > I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests
>
> @pandasanjay I found it depends mostly on the GitHub runner/workflow those tests run on. I have recently come across this when I have been working on Milvus Enrichment Handler #35216.
>
> Using try-except on the related imports that need dependencies and skipping that test would solve the issue. That being said, I think it may change from case to case, so someone may check that this happens on specific workflows, not all. Otherwise, those test cases would never run since they are entirely skipped.
@mohamedawnallah I’ve noticed the same issue and even used that approach to fix my integration tests. However, it seems that all the example tests are failing at that point in the GitHub Actions workflow, which raises the question of whether this module has ever been successfully installed and run via GitHub Actions. This aligns with your last point.
I think we could use some guidance on how to proceed here.
> > I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests
>
> @pandasanjay I found it depends mostly on the GitHub runner/workflow those tests run on. I have recently come across this when I have been working on Milvus Enrichment Handler #35216. Using try-except on the related imports that need dependencies and skipping that test would solve the issue. That being said, I think it may change from case to case, so someone may check that this happens on specific workflows, not all. Otherwise, those test cases would never run since they are entirely skipped.
>
> @mohamedawnallah I’ve noticed the same issue and even used that approach to fix my integration tests. However, it seems that all the example tests are failing at that point in the GitHub Actions workflow, which raises the question of whether this module has ever been successfully installed and run via GitHub Actions. This aligns with your last point.
>
> I think we could use some guidance on how to proceed here.
Generally skipping if dependencies are not present is the recommended path. Some tests will only run as postcommits (e.g. see conversation in https://github.com/apache/beam/pull/35395)
In this case, since we're just blocked on GCP dependencies, which should get installed as part of the postcommit runs, you should be able to run the Python postcommit as part of this PR by following https://github.com/apache/beam/blob/master/.github/workflows/README.md#workflow-triggers (you would modify https://github.com/apache/beam/blob/master/.github/trigger_files/beam_PostCommit_Python.json). It's always a good idea to make sure tests actually run when they get added.
Reminder, please take a look at this pr: @damccorm @liferoad
Assigning a new set of reviewers because this PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @tvalentyn for label python.
R: @kennknowles for label website.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
waiting on author
hey @pandasanjay, is this PR still relevant?
> hey @pandasanjay, is this PR still relevant?
Hi @ahmedabu98, yes, it is. Thanks for taking a look.
I explored two approaches internally:
1. Enrichment: This implementation runs into Dataflow worker instability because it manages threads manually and doesn’t play well with streaming parallel processing. I could use help here: do you know of any recommended patterns or examples to safely handle concurrent enrichment in streaming DoFns (or alternatives that avoid manual thread management)?
2. Custom transformers: This approach works reliably. I’ll add that implementation.
I’m currently on annual leave and will push the changes when I return. If you have suggestions while I’m away, please share and I’ll incorporate them once I’m back. Thanks!
Reminder, please take a look at this pr: @tvalentyn @kennknowles
waiting on author