astronomer-providers
POC Big Query polling ping-pong
Instead of running a loop in the trigger, this essentially "uses" the triggerer process as the loop. The idea is:
1. The operator gets deferred after submitting the query.
2. The trigger waits and checks the result once and reports via an event.
3. The operator is resumed by the event to check the status.
   - 3a. If the job is reported as still running, defer to run the trigger again. Go back to 2.
   - 3b. If the job is done (success or error), return or raise.
This achieves the ping-pong effect and emits the relevant logs in the operator's worker process instead of in the triggerer process. The repeated triggering probably adds some overhead, but that should be OK since everything is async.
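The steps above can be sketched without Airflow as a toy simulation. Everything in it is a hypothetical stand-in rather than the code in this PR: `FakeJob` plays the BigQuery job, `Deferred` plays the deferral signal, `check_once` plays the trigger, and `run` plays the scheduler/triggerer that resumes the operator. The method name `execute_progress` is borrowed from the discussion below; its real signature is not settled.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeJob:
    """Hypothetical remote job: reports 'running' until polled enough times."""

    polls_until_done: int
    polls: int = 0

    def status(self) -> str:
        self.polls += 1
        return "success" if self.polls >= self.polls_until_done else "running"


class Deferred(Exception):
    """Hypothetical signal: suspend the operator and run this trigger."""

    def __init__(self, trigger):
        super().__init__("deferred")
        self.trigger = trigger


async def check_once(job: FakeJob, poll_interval: float) -> dict:
    """Trigger side (step 2): wait, check exactly once, report via an event."""
    await asyncio.sleep(poll_interval)
    return {"status": job.status()}


@dataclass
class PingPongOperator:
    job: FakeJob
    poll_interval: float = 0.01
    log: list = field(default_factory=list)

    def _defer(self):
        raise Deferred(check_once(self.job, self.poll_interval))

    def execute(self):
        # Step 1: submit the query (omitted here), then defer.
        self.log.append("submitted")
        self._defer()

    def execute_progress(self, event: dict) -> str:
        # Step 3: resumed by the event; logging happens on the operator side.
        self.log.append(f"job status: {event['status']}")
        if event["status"] == "running":
            self._defer()  # 3a: defer again, i.e. go back to step 2
        if event["status"] == "error":
            raise RuntimeError("job failed")  # 3b: raise on error
        return event["status"]  # 3b: return on success


def run(operator: PingPongOperator) -> str:
    """Toy stand-in for the scheduler/triggerer: it owns the polling loop."""
    step, args = operator.execute, ()
    while True:
        try:
            return step(*args)
        except Deferred as deferred:
            event = asyncio.run(deferred.trigger)
            step, args = operator.execute_progress, (event,)


operator = PingPongOperator(FakeJob(polls_until_done=3))
result = run(operator)
```

Note that neither the operator nor the trigger contains a loop in this sketch; the only loop is in `run`, which mirrors the idea of using the triggerer process as the loop.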
Test failures are expected for now; we need to change all the BQ operators after (if) we decide this is a good idea and rewrite the test.
@uranusjr Just thinking if we can come up with a solution that would work across most of the operators rather than specific ones?
Yes that’s what I plan (to try) if this is considered a good idea. We should be able to extract most of the logic to a common Trigger class and an Operator mixin, so any class can do
```python
class MyOperator(AsyncProgressMixin, ...):
    def execute(self, ...):
        ...
        self.defer(trigger=MyTrigger(...))

    def execute_progress(self, ...):
        # Not sure what is needed yet...


class MyTrigger(AsyncProgressTrigger):
    # Not sure what is needed yet...
```
We have a lot of async operators to try this structure out 🙂
@uranusjr excited to see what the template (methods, object interaction, etc.) for AsyncProgressTrigger and AsyncProgressMixin would look like. We will have to return/yield from the trigger to the operator's execute_complete method, display the log, and re-defer if needed.
I pushed an experimental implementation to extract the ping-pong logic into common base classes. The implementation is only used in BigQueryInsertJobOperatorAsync for the moment.
Codecov Report
Base: 98.48% // Head: 98.37% // Decreases project coverage by -0.10% :warning:
Coverage data is based on head (fa12553) compared to base (3525f20). Patch coverage: 92.13% of modified lines in pull request are covered.
:exclamation: Current head fa12553 differs from pull request most recent head 9c87453. Consider uploading reports for the commit 9c87453 to get more accurate results.
Additional details and impacted files
@@ Coverage Diff @@
## main #716 +/- ##
==========================================
- Coverage 98.48% 98.37% -0.11%
==========================================
Files 86 86
Lines 4626 4623 -3
==========================================
- Hits 4556 4548 -8
- Misses 70 75 +5
Impacted Files | Coverage Δ | |
---|---|---|
...nomer/providers/google/cloud/operators/bigquery.py | 94.16% <87.50%> (-5.84%) | :arrow_down: |
...onomer/providers/google/cloud/triggers/bigquery.py | 94.35% <95.91%> (+0.50%) | :arrow_up: |
Closing as this was a POC for the ping-pong functionality. Will use it as a reference for future implementations.