Add PeriodicStream in the new time series folder.

Open shunping opened this issue 6 months ago • 1 comments

shunping avatar Jun 16 '25 03:06 shunping

Assigning reviewers:

R: @claudevdm for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] avatar Jun 16 '25 07:06 github-actions[bot]

I just came across this code and I have a couple of concerns:

  1. Why are we creating a new transform instead of extending PeriodicImpulse? It seems like the same goals could be accomplished fairly easily by attaching data as an option to ImpulseSeqGenDoFn. There is not really anything ML-specific about this transform; it could probably be broadly useful for testing. We actually already see this in https://github.com/apache/beam/blob/4c1c2daa17d508afd2f94bffe83ed61ec9ccec8d/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L1267 - it is really odd to have fn_api_runner tests depending on an ML package.
  2. Using time like this in testing is quite fragile (https://github.com/apache/beam/blob/4c1c2daa17d508afd2f94bffe83ed61ec9ccec8d/sdks/python/apache_beam/ml/ts/util_test.py#L44): if a runner is slower or there is a lot of contention (for example, tests running in parallel), this will fail. It already fails on Prism. We should instead be inspecting element timestamps for correctness.
  3. There are actually some regressions in functionality from PeriodicImpulse which seem intentional (e.g. not exposing a start time - https://github.com/apache/beam/blob/4c1c2daa17d508afd2f94bffe83ed61ec9ccec8d/sdks/python/apache_beam/ml/ts/util.py#L181). That makes the two transforms diverge further and doesn't seem useful.
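
The timestamp-based check suggested in item 2 could look roughly like the sketch below. It is plain Python rather than Beam test code: `expected_stream` and `timestamps_match` are hypothetical helpers, and the `(timestamp, value)` pairs stand in for whatever a test would collect from the pipeline (in the Python SDK, a DoFn can read an element's timestamp through `beam.DoFn.TimestampParam`). The point is only that the assertion depends on event timestamps, not on how long the runner took.

```python
from typing import Any, List, Sequence, Tuple

Stamped = Tuple[float, Any]  # (event timestamp in seconds, element value)


def expected_stream(
    data: Sequence[Any], start: float, interval: float
) -> List[Stamped]:
    """Pair each value with the event timestamp it is expected to carry."""
    return [(start + i * interval, value) for i, value in enumerate(data)]


def timestamps_match(
    actual: Sequence[Stamped],
    expected: Sequence[Stamped],
    tolerance: float = 1e-6,
) -> bool:
    """Compare stamped elements without looking at wall-clock time.

    The actual elements are sorted by timestamp first, because a
    pipeline does not guarantee the order in which they are collected.
    """
    if len(actual) != len(expected):
        return False
    ordered = sorted(actual, key=lambda pair: pair[0])
    return all(
        abs(a_ts - e_ts) <= tolerance and a_val == e_val
        for (a_ts, a_val), (e_ts, e_val) in zip(ordered, expected)
    )
```

A slow or contended runner changes when elements arrive, but not the timestamps they carry, so a check of this shape should not become flaky under load.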

@shunping I think we should back this out and move the functionality to PeriodicImpulse
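
As a rough illustration of what "attaching data as an option" might mean, the sketch below models the idea in plain Python. It does not use Beam's actual PeriodicImpulse or ImpulseSeqGenDoFn API: `periodic_elements` and its `data` parameter are hypothetical, and the half-open `[start, stop)` range is an assumption made for the example.

```python
from typing import Any, Iterator, Optional, Sequence


def periodic_elements(
    start: float,
    stop: float,
    interval: float,
    data: Optional[Sequence[Any]] = None,
) -> Iterator[Any]:
    """Yield one element per interval in the half-open range [start, stop).

    Without data, each element is the timestamp itself (an "impulse").
    With data, each element is a (timestamp, value) pair, and generation
    ends once the data is exhausted or stop is reached, whichever is first.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    index = 0
    while True:
        timestamp = start + index * interval
        if timestamp >= stop:
            return
        if data is None:
            yield timestamp
        elif index < len(data):
            yield (timestamp, data[index])
        else:
            return
        index += 1
```

With `data` left as `None`, existing callers would see the same timestamp-only output as before, which is the property that matters for the IOs that already depend on the transform.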

damccorm avatar Jun 23 '25 18:06 damccorm

> I just came across this code and I have a couple of concerns:
>
>   1. Why are we creating a new transform instead of extending PeriodicImpulse? It seems like the same goals could be pretty easily accomplished by attaching data as an option to ImpulseSeqGenDoFn. There is not really anything ML-specific to this transform, it could probably be broadly useful for testing. We actually already see this in https://github.com/apache/beam/blob/4c1c2daa17d508afd2f94bffe83ed61ec9ccec8d/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py#L1267

Extending PeriodicImpulse was actually my first thought. However, I noticed that it is used extensively in IOs, so I hesitated to modify the code in a way that could change its behavior or affect its performance. Also, as its name suggests, it may be cleaner to keep PeriodicImpulse as it is, giving an impulse (timestamp) to the whole pipeline. These are the reasons I started a new PTransform while reusing as much of the SDF logic from ImpulseSeqGenDoFn as possible.

> - it is really odd to have fn_api_runner tests depending on an ML package

+1 on this.

>   2. Using time like this in testing is quite fragile - https://github.com/apache/beam/blob/4c1c2daa17d508afd2f94bffe83ed61ec9ccec8d/sdks/python/apache_beam/ml/ts/util_test.py#L44 - if a runner is slower or there is a lot of contention (running tests in parallel), this will fail. It already fails on prism. We should instead be inspecting element timestamps for correctness.

That's a good point. Will update the tests.

>   3. There are actually some regressions in functionality from PeriodicImpulse which seem intentional (e.g. not exposing a start time - https://github.com/apache/beam/blob/4c1c2daa17d508afd2f94bffe83ed61ec9ccec8d/sdks/python/apache_beam/ml/ts/util.py#L181 ). That further diverges us and doesn't seem useful.

@damccorm: Could you elaborate on this? Yes, the start time is not exposed in the new PTransform, because the current catch-up behavior of PeriodicImpulse is a bit unintuitive for testing.

> @shunping I think we should back this out and move the functionality to PeriodicImpulse

Yes, let me revert it and address the previous items.
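
One reading of the catch-up concern, assumed here rather than taken from the PeriodicImpulse source, is that when the start time lies in the past, every element whose timestamp is already overdue is emitted in a burst at startup, and only the remaining elements are paced by the interval. The sketch below models that split in plain Python; `split_catch_up` is a hypothetical helper, not a Beam function.

```python
from typing import List, Tuple


def split_catch_up(
    start: float, interval: float, count: int, now: float
) -> Tuple[List[float], List[float]]:
    """Split scheduled timestamps into overdue and still-pending ones.

    Overdue timestamps (<= now) model the elements that would be emitted
    immediately to catch up; pending ones would be emitted as time passes.
    """
    timestamps = [start + i * interval for i in range(count)]
    overdue = [ts for ts in timestamps if ts <= now]
    pending = [ts for ts in timestamps if ts > now]
    return overdue, pending
```

Under this model, a test that sets the start time even slightly in the past sees a different number of immediately available elements depending on when it happens to run, which may be what makes the behavior feel unintuitive for testing.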

shunping avatar Jun 23 '25 21:06 shunping