Add BigQuery Storage Read API Enrichment Handler

pandasanjay opened this pull request 6 months ago • 25 comments

#35196

Description:

This pull request introduces a new EnrichmentSourceHandler for Apache Beam, BigQueryStorageEnrichmentHandler, designed to leverage the Google Cloud BigQuery Storage Read API for efficient data enrichment. This handler provides a high-performance alternative to traditional SQL-based BigQuery lookups within Beam pipelines.

Motivation and Context:

Enriching data by joining PCollection elements with data stored in BigQuery is a common use case. While existing methods often rely on executing SQL queries, the BigQuery Storage Read API offers a more direct and typically faster way to retrieve data, especially for large volumes or when fine-grained row-level access is needed. This handler aims to:

  • Improve the performance of BigQuery enrichments.
  • Reduce BigQuery query costs associated with SQL execution for enrichment tasks.
  • Provide more flexible and programmatic control over data fetching and filtering.

Key Features and Improvements:

The BigQueryStorageEnrichmentHandler offers several enhancements (a usage sketch follows this list):

  • Efficient Data Retrieval: Utilizes the BigQuery Storage Read API for significantly faster data reads compared to SQL queries, especially for bulk lookups. Data is read in Apache Arrow format, minimizing serialization/deserialization overhead.
  • Flexible Filtering:
    • Supports static filter templates via row_restriction_template.
    • Allows dynamic, per-element filter string generation using row_restriction_template_fn.
  • Advanced Keying and Value Extraction:
    • fields: Specifies input beam.Row fields for generating join keys and for use in filter templates.
    • additional_condition_fields: Allows using input fields for filtering without including them in the join key.
    • condition_value_fn: Provides complete control over generating the dictionary of values used for both filtering and join key creation.
  • Field Renaming/Aliasing: Supports aliasing of selected BigQuery columns (e.g., original_col as alias_col in column_names) to prevent naming conflicts in the enriched beam.Row.
  • Batching Support: Groups multiple input elements to make fewer CreateReadSession calls, reducing API overhead. Batch size and duration are configurable (min_batch_size, max_batch_size, max_batch_duration_secs).
  • Parallel Stream Reading: (Experimental) Employs a ThreadPoolExecutor to read data from multiple streams of a BigQuery Read Session in parallel, potentially improving data fetching throughput. Concurrency is configurable via max_parallel_streams.
  • Custom Row Selection: Includes a latest_value_selector callback that allows users to define custom logic for selecting the desired row when multiple BigQuery rows match a single input request (e.g., picking the record with the most recent timestamp). primary_keys can be used by this selector.
  • Automatic Client Management: Manages the lifecycle of the BigQueryReadClient.
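Putting these options together, here is a minimal usage sketch assembled from the parameters described above. The constructor signature follows this PR's description and may differ from the final implementation; the project and table names, the `updated_at` column, and the selector signature are illustrative assumptions.

```python
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
# Proposed in this PR; module path per "Implementation Details" below.
from apache_beam.transforms.enrichment_handlers.bigquery_storage_read import (
    BigQueryStorageEnrichmentHandler)


def pick_latest(rows, primary_keys=None):
    # Hypothetical selector: among all BigQuery rows matching one input
    # element, keep the one with the newest timestamp.
    return max(rows, key=lambda row: row['updated_at'])


handler = BigQueryStorageEnrichmentHandler(
    project='my-project',  # placeholder
    table_name='my-project.my_dataset.products',  # placeholder
    row_restriction_template='id = {id}',  # static filter template
    fields=['id'],  # input fields for the join key and the template
    column_names=['id', 'product_name', 'unit_price AS price'],  # aliasing
    min_batch_size=10,
    max_batch_size=100,
    max_batch_duration_secs=5,
    latest_value_selector=pick_latest,  # custom row selection
)

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([beam.Row(id=101), beam.Row(id=102)])
        | Enrichment(handler))
```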

Advantages over Traditional SQL-based BigQuery Enrichment:

  • Performance: Direct access to table storage via the Storage Read API typically bypasses the SQL query processing engine, leading to lower latency and higher throughput, especially for fetching many individual rows or large data segments.
  • Cost Efficiency: Reading data via the Storage API can be more cost-effective than running many small SQL queries, as Storage API pricing is based on data scanned, while query pricing involves slots and scanned data.
  • Scalability: The streaming nature of the Storage Read API is well-suited for scalable data processing in Beam.
  • Reduced Query Complexity: For simple lookups, it avoids the need to construct and manage SQL query strings dynamically.

Documentation:

Comprehensive documentation for this handler, including usage examples, parameter descriptions, features, and limitations, has been added in docs/bigquery_storage_enrichment_handler.md.

Implementation Details:

The handler (sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_storage_read.py) manages BigQueryReadClient instances, constructs ReadSession requests with appropriate row restrictions and selected fields, and processes the resulting Arrow record batches. It integrates with Beam's Enrichment transform, providing batching and cache-key generation.
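
Independent of this PR, the sketch below shows how the public google-cloud-bigquery-storage client is typically driven, i.e. the calls such a handler wraps. Project, dataset, table, and column names are placeholders.

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

client = bigquery_storage_v1.BigQueryReadClient()

# Request Arrow-formatted data with a column projection and row restriction.
requested_session = types.ReadSession(
    table='projects/my-project/datasets/my_dataset/tables/products',
    data_format=types.DataFormat.ARROW,
    read_options=types.ReadSession.TableReadOptions(
        selected_fields=['id', 'product_name', 'unit_price'],
        row_restriction='id IN (101, 102, 103)',
    ),
)
session = client.create_read_session(
    parent='projects/my-project',
    read_session=requested_session,
    max_stream_count=1,
)

# Each session exposes one or more streams; read Arrow record batches.
reader = client.read_rows(session.streams[0].name)
for page in reader.rows(session).pages:
    record_batch = page.to_arrow()
    # ... convert rows to beam.Row and join with the input elements
```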

Testing Considerations:

  • Unit tests for key generation, filter construction, and data processing logic.
  • Integration tests against a live BigQuery instance.
  • Performance benchmarks comparing against SQL-based handlers.

This handler provides a powerful and efficient way to enrich data in Apache Beam pipelines using BigQuery.



Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [ ] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch): Build python source distribution and wheels · Python tests · Java tests · Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

pandasanjay avatar Jun 07 '25 15:06 pandasanjay

Hi @sjvanrossum, 👋

Could you please take a look at this pull request when you have a moment?

It introduces a new BigQueryStorageEnrichmentHandler for Apache Beam, which leverages the BigQuery Storage Read API. The goal is to provide a more performant and potentially cost-effective way to enrich PCollections by fetching data from BigQuery, as an alternative to traditional SQL-based lookups.

The PR includes the handler implementation and accompanying documentation.

Please review the changes and let me know if anything needs an update or further clarification.

Thanks, @pandasanjay

pandasanjay avatar Jun 07 '25 15:06 pandasanjay

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

github-actions[bot] avatar Jun 07 '25 16:06 github-actions[bot]

Please fix the failed workflows.

liferoad avatar Jun 08 '25 14:06 liferoad

Thanks for the PR, @pandasanjay 🙏

You might want to take a look at the CI test results. Also, this guide could help address some of the linting issues: 🔗 https://cwiki.apache.org/confluence/display/BEAM/Python+Tips

Additionally, you can check out my PR for the CloudSQL Enrichment Handler (#34398). I believe some of the feedback there could be relevant and applicable here as well.

mohamedawnallah avatar Jun 09 '25 11:06 mohamedawnallah

Thank you @mohamedawnallah for providing additional details! 👍

I’m planning to make a few more documentation updates similar to yours.

It looks like the pipeline failures aren’t related to my changes—I’m currently investigating the linting issues. Do these pipelines often fail and require multiple retries to pass?

pandasanjay avatar Jun 09 '25 12:06 pandasanjay

It looks like the pipeline failures aren’t related to my changes—I’m currently investigating the linting issues. Do these pipelines often fail and require multiple retries to pass?

Some CI tests can occasionally be flaky, but this doesn't happen very often. To minimize issues, make sure you're using the latest code from the Beam upstream master branch (git merge) and run all tests locally (gradlew tasks is helpful here). If any CI tests still fail, you can investigate the CI test logs and resolve issues as you go. 👍

mohamedawnallah avatar Jun 09 '25 13:06 mohamedawnallah

assign set of reviewers

pandasanjay avatar Jun 16 '25 09:06 pandasanjay

Assigning reviewers:

R: @damccorm for label python. R: @liferoad for label website.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] avatar Jun 16 '25 09:06 github-actions[bot]

Looks like a BQ table is needed for the example pipelines to pass:

project_id = "apache-beam-testing"
dataset = "beam-test"
table_name = "bigquery-enrichment-test-products"

-- BigQuery DDL for enrichment_with_bigquery_storage_basic

CREATE TABLE `apache-beam-testing.beam-test.bigquery-enrichment-test-products` (
    id INT64,
    product_name STRING,
    category STRING,
    unit_price FLOAT64
);

INSERT INTO `apache-beam-testing.beam-test.bigquery-enrichment-test-products` (id, product_name, category, unit_price) VALUES
    (101, 'Laptop', 'Electronics', 999.99),
    (102, 'Desk Chair', 'Furniture', 199.99),
    (103, 'Monitor', 'Electronics', 299.99);

If this can be configured somewhere in the code or test configuration, please let me know.

@damccorm Could you please help here? Thanks :)

pandasanjay avatar Jun 17 '25 06:06 pandasanjay

@pandasanjay – It looks like the example pipelines fail with this error:

> from google.api_core.exceptions import TooManyRequests
E   ModuleNotFoundError: No module named 'google.api_core'

Given that this module needs to be installed, have you tested this against a BQ table in your own Google Cloud project, and do the tests pass?

mohamedawnallah avatar Jun 17 '25 06:06 mohamedawnallah

@pandasanjay – It looks like the example pipelines fail with this error:

> from google.api_core.exceptions import TooManyRequests
E   ModuleNotFoundError: No module named 'google.api_core'

Given that this module needs to be installed, have you tested this against a BQ table in your own Google Cloud project, and do the tests pass?

@mohamedawnallah, you’re right.

I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests. The pyproject.toml file does list the necessary dependency, so this might be an issue with the pipeline itself. I suspect that’s also why there aren’t any examples available for BQ enrichment.

@damccorm, would you be able to guide us here? :)

pandasanjay avatar Jun 20 '25 19:06 pandasanjay

I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests

@pandasanjay I found it depends mostly on the GitHub runner/workflow those tests run on. I recently came across this while working on the Milvus Enrichment Handler (#35216).

Wrapping the related imports in try-except and skipping the test when the dependencies are missing would solve the issue. That said, it may vary from case to case, so someone should check that this happens only on specific workflows, not all of them; otherwise, those test cases would never run since they would be entirely skipped.

An example of how this could look: https://github.com/apache/beam/blob/c786cd004b78c5690613623ab9658ff9febffbca/sdks/python/apache_beam/transforms/enrichment_handlers/bigtable_it_test.py#L31-L41
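
For reference, a minimal sketch of that guarded-import pattern (the class and test names are illustrative):

```python
import unittest

# Guard the imports so the module still loads when GCP deps are absent.
try:
    from google.api_core.exceptions import TooManyRequests  # noqa: F401
    from apache_beam.transforms.enrichment import Enrichment
except ImportError:
    Enrichment = None


@unittest.skipIf(Enrichment is None, 'GCP dependencies are not installed.')
class BigQueryStorageEnrichmentIT(unittest.TestCase):
    def test_enrichment(self):
        ...


if __name__ == '__main__':
    unittest.main()
```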

mohamedawnallah avatar Jun 20 '25 19:06 mohamedawnallah

Have you actually seen poor performance from the existing handler in practice? Since we're usually not doing bulk reads with enrichment, it's unclear to me that the storage read approach will actually be more performant.

@damccorm – For our use case of building an SCD Type 2 pipeline focused on slot consumption and cost optimization, we've observed that using the BigQuery Storage Read API offers significant advantages:

  • Improved Performance: The Storage Read API enables concurrent data reads through streaming, eliminating the need to compete for slots as required by the traditional query engine.
  • Cost Optimization: There are two main ways to save costs:
    • No Need for Dedicated Slots: Since the Storage Read API doesn't require slot reservations, we avoid the ongoing costs associated with dedicated slots.
    • Lower Data Scan Costs: As of June 2024, the cost for scanning data via the Storage Read API is $1.10 per TiB, compared to $5.00 per TiB when using the BigQuery Query Engine.

The only caveat is that we need to get the partitioning and clustering right; with that in place, this is a cheap and efficient solution.

References:

  • BigQuery Storage Read API Pricing
  • BigQuery On-Demand Pricing

Let me know your thoughts :)

pandasanjay avatar Jun 20 '25 19:06 pandasanjay

I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests

@pandasanjay I found it depends mostly on the GitHub runner/workflow those tests run on. I have recently come across this when I have been working on Milvus Enrichment Handler #35216.

Wrapping the related imports in try-except and skipping the test when the dependencies are missing would solve the issue. That said, it may vary from case to case, so someone should check that this happens only on specific workflows, not all of them; otherwise, those test cases would never run since they would be entirely skipped.

@mohamedawnallah I’ve noticed the same issue and even used that approach to fix my integration tests. However, it seems that all the example tests are failing at that point in the GitHub Actions workflow, which raises the question of whether this module has ever been successfully installed and run via GitHub Actions. This aligns with your last point.

I think we could use some guidance on how to proceed here.

pandasanjay avatar Jun 20 '25 19:06 pandasanjay

I’ve noticed that wherever this module is used, the Beam pipeline is skipping those tests

@pandasanjay I found it depends mostly on the GitHub runner/workflow those tests run on. I recently came across this while working on the Milvus Enrichment Handler (#35216). Wrapping the related imports in try-except and skipping the test when the dependencies are missing would solve the issue. That said, it may vary from case to case, so someone should check that this happens only on specific workflows, not all of them; otherwise, those test cases would never run since they would be entirely skipped.

@mohamedawnallah I’ve noticed the same issue and even used that approach to fix my integration tests. However, it seems that all the example tests are failing at that point in the GitHub Actions workflow, which raises the question of whether this module has ever been successfully installed and run via GitHub Actions. This aligns with your last point.

I think we could use some guidance on how to proceed here.

Generally skipping if dependencies are not present is the recommended path. Some tests will only run as postcommits (e.g. see conversation in https://github.com/apache/beam/pull/35395)

In this case, since we're just blocking on GCP dependencies, which should get installed as part of the postcommit runs, you should be able to run the Python postcommit following https://github.com/apache/beam/blob/master/.github/workflows/README.md#workflow-triggers as part of this PR (you would modify https://github.com/apache/beam/blob/master/.github/trigger_files/beam_PostCommit_Python.json). It's always a good idea to make sure tests actually run when they get added.

damccorm avatar Jun 23 '25 18:06 damccorm

Reminder, please take a look at this pr: @damccorm @liferoad

github-actions[bot] avatar Jul 01 '25 12:07 github-actions[bot]

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @tvalentyn for label python. R: @kennknowles for label website.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

github-actions[bot] avatar Jul 04 '25 12:07 github-actions[bot]

waiting on author

tvalentyn avatar Jul 08 '25 17:07 tvalentyn

waiting on author

derrickaw avatar Aug 12 '25 15:08 derrickaw

hey @pandasanjay, is this PR still relevant?

ahmedabu98 avatar Sep 02 '25 15:09 ahmedabu98

hey @pandasanjay, is this PR still relevant?

Hi @ahmedabu98, yes. Thanks for taking a look!

I explored two approaches internally:

  1. Enrichment — This implementation runs into Dataflow worker instability because it manages threads manually and doesn’t play well with streaming parallel processing. I could use help here: do you know of any recommended patterns or examples to safely handle concurrent enrichment in streaming DoFns (or alternatives that avoid manual thread management)?

  2. Custom transformers — This approach works reliably. I’ll add that implementation.

I’m currently on annual leave and will push the changes when I return. If you have suggestions while I’m away, please share and I’ll incorporate them once I’m back. Thanks!

pandasanjay avatar Sep 03 '25 06:09 pandasanjay

Reminder, please take a look at this pr: @tvalentyn @kennknowles

github-actions[bot] avatar Sep 10 '25 12:09 github-actions[bot]

waiting on author

damccorm avatar Sep 12 '25 15:09 damccorm

waiting on author

damccorm avatar Oct 06 '25 17:10 damccorm