
WIP: Stream DAG

Open edgarrmondragon opened this issue 3 months ago • 4 comments

Summary by Sourcery

Update the RESTStream backoff_runtime signature and introduce comprehensive Stream DAG functionality for orchestrating complex inter-stream dependencies.

New Features:

  • Update RESTStream.backoff_runtime type annotation to support both int and float backoff values
  • Introduce StreamDAG engine with StreamDependency, SyncTrigger, and relation types for DAG-driven stream orchestration
  • Integrate DAG support into Stream and Tap classes with per-record processing and finalization hooks
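
As a rough illustration of the first bullet, the sketch below shows one way a backoff generator can be typed so that it accepts callables returning either int or float. It is written as a standalone function rather than a RESTStream method, and the constrained `TypeVar` and the generator body are assumptions for illustration, not the code from this PR.

```python
from __future__ import annotations

from typing import Any, Callable, Generator, TypeVar

# Assumption: a constrained type variable that resolves to int or float.
_TNum = TypeVar("_TNum", int, float)


def backoff_runtime(
    *, value: Callable[[Any], _TNum]
) -> Generator[_TNum, None, None]:
    """Yield wait times computed from the most recently sent exception."""
    # Priming step: the first next() call stops here and yields nothing useful.
    exception = yield  # type: ignore[misc]
    while True:
        # Each send(exc) resumes here and yields the wait time for that exception.
        exception = yield value(exception)


# Usage: prime the generator, then send an exception to get a wait time.
gen = backoff_runtime(value=lambda exc: 1.5)
next(gen)
wait_seconds = gen.send(RuntimeError("rate limited"))
```

With this typing, a callable returning `1.5` produces a `Generator[float, None, None]`, while one returning `3` produces a `Generator[int, None, None]`.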

Enhancements:

  • Add helper module with common DAG pattern builders (dag_examples.py)

Documentation:

  • Add Stream DAG user guide (docs/stream_dag.md) detailing core concepts, usage patterns, and API

Tests:

  • Add extensive tests for StreamDAG covering dependency management, cycle detection, sync triggers, and Tap integration
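
For readers unfamiliar with the cycle detection and ordering behavior these tests target, here is a minimal sketch using the standard library's `graphlib` module. The `sync_order` function and the stream names are hypothetical, and the PR's StreamDAG may implement ordering differently.

```python
from graphlib import CycleError, TopologicalSorter


def sync_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return stream names ordered so that parents come before children.

    `dependencies` maps each stream name to the set of streams it depends on.
    Raises ValueError if the dependencies contain a cycle.
    """
    sorter = TopologicalSorter(dependencies)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # exc.args[1] holds the list of nodes that form the detected cycle.
        raise ValueError(f"Cyclic stream dependency: {exc.args[1]}") from exc


# Hypothetical chain: order_items depends on orders, which depends on users.
order = sync_order({"orders": {"users"}, "order_items": {"orders"}, "users": set()})
# order == ["users", "orders", "order_items"]
```

Passing a mapping such as `{"a": {"b"}, "b": {"a"}}` raises `ValueError`, which is the kind of failure a cycle detection test would assert on.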

Sep 03 '25 19:09 edgarrmondragon

Reviewer's Guide

Integrates a StreamDAG system into the core SDK to orchestrate complex inter-stream dependencies, extends Tap to configure and expose the DAG, updates a RESTStream type annotation for numeric backoff, and adds comprehensive tests, examples, helper patterns, and documentation for the new DAG API.

File-Level Changes

Change Details Files
Integrate StreamDAG into base Stream class
  • Imported StreamDAG and added _dag field
  • Hooked into _sync_records to call _process_record_with_dag
  • Called _finalize_with_dag at end of sync
  • Implemented set_dag, _process_record_with_dag, and _finalize_with_dag methods
singer_sdk/streams/core.py
Add DAG setup and accessors in Tap class
  • Introduced _dag field in Tap
  • Defined setup_dag to initialize StreamDAG, register streams, and inject into each stream
  • Added dag property to retrieve the DAG instance
singer_sdk/tap_base.py
Generalize backoff_runtime to support any numeric type
  • Changed _TNum type variable to int | float
  • Updated backoff_runtime signature to return Generator[_TNum, None, None]
Add comprehensive StreamDAG tests
  • Created MockStream/MockTap test fixtures
  • Covered adding streams, dependencies, cycles, topological sort
  • Tested all SyncTrigger behaviors and multi-parent scenarios
tests/core/test_stream_dag.py
Provide a full usage example with DAG patterns
  • Added an example tap demonstrating multiple DAG patterns
  • Included sample Stream classes, setup_dag implementations, and main runner
examples/dag_usage_example.py
Supply helper functions for common DAG relationships
  • Added create_one_to_one, one_to_many, many_to_one, many_to_many, conditional dependency generators
  • Packaged higher-level DAGPatterns for fan-in/fan-out hierarchies
singer_sdk/helpers/dag_examples.py
Document StreamDAG functionality and patterns
  • Created docs/stream_dag.md with overview, API reference, examples
  • Outlined SyncTrigger and relation types, usage, best practices
docs/stream_dag.md
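
The Stream-level hooks named in the table (set_dag, _process_record_with_dag, _finalize_with_dag) can be pictured with the stand-in classes below. Only those three method names come from the table. `SketchDAG`, `SketchStream`, and everything about their behavior are hypothetical and do not reflect the actual StreamDAG API.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any


class SketchDAG:
    """Hypothetical stand-in for StreamDAG: maps a parent stream to its children."""

    def __init__(self) -> None:
        self.children: dict[str, list[str]] = defaultdict(list)
        self.triggered: list[tuple[str, str, dict[str, Any]]] = []
        self.finalized: list[str] = []

    def add_dependency(self, parent: str, child: str) -> None:
        self.children[parent].append(child)

    def on_record(self, parent: str, record: dict[str, Any]) -> None:
        # Note which child syncs this parent record would trigger.
        for child in self.children[parent]:
            self.triggered.append((parent, child, record))

    def on_finalize(self, parent: str) -> None:
        self.finalized.append(parent)


class SketchStream:
    """Hypothetical stand-in for Stream carrying the hooks named in the table."""

    def __init__(self, name: str, records: list[dict[str, Any]]) -> None:
        self.name = name
        self._records = records
        self._dag: SketchDAG | None = None

    def set_dag(self, dag: SketchDAG) -> None:
        self._dag = dag

    def _process_record_with_dag(self, record: dict[str, Any]) -> None:
        # Without a DAG attached, the stream behaves as it did before.
        if self._dag is not None:
            self._dag.on_record(self.name, record)

    def _finalize_with_dag(self) -> None:
        if self._dag is not None:
            self._dag.on_finalize(self.name)

    def sync(self) -> None:
        # Per-record hook during the sync, finalization hook once at the end.
        for record in self._records:
            self._process_record_with_dag(record)
        self._finalize_with_dag()


dag = SketchDAG()
dag.add_dependency("users", "orders")
users = SketchStream("users", [{"id": 1}, {"id": 2}])
users.set_dag(dag)
users.sync()
```

Making both hooks no-ops when no DAG is attached keeps streams that never call set_dag on their existing code path, which is one plausible reason to inject the DAG rather than require it.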

Sep 03 '25 19:09 sourcery-ai[bot]

Documentation build overview

📚 Meltano SDK | 🛠️ Build #29427370 | 📁 Comparing 7cb1fe5b1e14fa36ae0bd8cb48a5caf92ee487f1 against latest (f8b69b5020f141a7c86dc38879cf14e448e207ae)

Files changed (5 in total): 📝 4 modified | ➕ 1 added | ➖ 0 deleted

File                              Status
genindex.html                     📝 modified
stream_dag.html                   ➕ added
classes/singer_sdk.SQLTap.html    📝 modified
classes/singer_sdk.Stream.html    📝 modified
classes/singer_sdk.Tap.html       📝 modified

Codecov Report

:x: Patch coverage is 54.92424% with 119 lines in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 91.18%. Comparing base (f8b69b5) to head (7cb1fe5).

Files with missing lines              Patch %   Lines
singer_sdk/helpers/dag_examples.py    0.00%     82 Missing :warning:
singer_sdk/helpers/stream_dag.py      77.35%    23 Missing and 13 partials :warning:
singer_sdk/tap_base.py                91.66%    0 Missing and 1 partial :warning:

:x: Your patch check has failed because the patch coverage (54.92%) is below the target coverage (100.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3259      +/-   ##
==========================================
- Coverage   92.93%   91.18%   -1.76%
==========================================
  Files          63       65       +2
  Lines        5453     5717     +264
  Branches      682      728      +46
==========================================
+ Hits         5068     5213     +145
- Misses        280      385     +105
- Partials      105      119      +14

Flag                   Coverage Δ
core                   78.10% <54.92%> (-1.09%) :arrow_down:
end-to-end             74.91% <23.10%> (-2.51%) :arrow_down:
optional-components    42.18% <21.59%> (-1.00%) :arrow_down:

Flags with carried forward coverage won't be shown.

Sep 03 '25 19:09 codecov[bot]

CodSpeed Performance Report

Merging #3259 will not alter performance

Comparing feat/sdk-stream-dag (7cb1fe5) with main (b8e0d5b)[^unexpected-base]

[^unexpected-base]: No successful run was found on main (f8b69b5) during the generation of this report, so b8e0d5b was used instead as the comparison base. There might be some changes unrelated to this pull request in this report.

Summary

✅ 8 untouched benchmarks

Sep 03 '25 19:09 codspeed-hq[bot]