[Bug]: Dataflow worker retries pipeline if FinishBundle blocks for too long
What happened?
While performing load tests for the FhirIO Import transform I implemented in #22460 for the Go SDK, I noticed that after the resources were successfully imported, the pipeline would restart from the beginning. It would do this 4 times until the pipeline failed with the error: `The job failed because a work item has failed 4 times. Root cause: The worker lost contact with the service.`
My theory is that because the import request performed inside FinishBundle takes several minutes to complete (it is importing millions of resources), the runner assumes the worker has become unresponsive and retries the work item.
I'm filing this issue for two reasons. First, a blocking call that takes the same amount of time completes successfully in the Java SDK without triggering this retry loop, so there is a discrepancy between the SDKs. Second, performing long-running work in a FinishBundle method is a widespread Beam design pattern that is even considered a best practice, as described here.
The issue can be reproduced with a dummy pipeline that just sleeps for 20 minutes inside FinishBundle or any other DoFn lifecycle method. I made these dummy pipelines available so others can easily reproduce the issue themselves: https://github.com/apache/beam/compare/master...lnogueir:beam:dataflowWorkerLiveness
Instructions on how to execute them are included as comments in the files in the link above.
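For reference, the shape of the repro is roughly the following minimal sketch (the linked branch has the actual runnable pipelines; `sleepFn` and its `Dur` field here are made-up names for illustration, and the registration details assume a recent Go SDK):

```go
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// sleepFn does no per-element work; it just blocks in FinishBundle,
// standing in for a long-running import request.
type sleepFn struct {
	Dur time.Duration // how long FinishBundle blocks
}

func (f *sleepFn) ProcessElement(ctx context.Context, _ int) {}

func (f *sleepFn) FinishBundle(ctx context.Context) {
	time.Sleep(f.Dur) // stand-in for the blocking import call
}

func init() {
	register.DoFn2x0[context.Context, int](&sleepFn{})
}

func main() {
	flag.Parse()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	elems := beam.CreateList(s, []int{1, 2, 3})
	beam.ParDo0(s, &sleepFn{Dur: 20 * time.Minute}, elems)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("pipeline failed: %v", err)
	}
}
```

Run on Dataflow with the usual flags (`--runner=dataflow`, project, region, staging bucket) and the job enters the retry loop described above.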
I have validated that the Java SDK doesn't experience this issue on either Runner v1 or Runner v2.
CC: @lostluck @jrmccluskey @msbukal @youngoli
Issue Priority
Priority: 2
Issue Component
Component: io-go-gcp
I'll note that that advice is for streaming pipelines, not batch pipelines.
The code should still impose a cap on batch sizes (making the request when the cap is hit, then starting to fill another batch), since a single "bundle" could be billions of elements in batch contexts, while bundles tend to be about a dozen elements in streaming ones.
Essentially, waiting until FinishBundle means the actual batch sizes are dictated, unknowingly, by the runner. Better to set hard caps and self-batch a little first (either by accumulated RPC size, if the request is constructed as elements arrive, or by a simple count cap, if per-element sizes add up); see the sketch below. The goal is always to get the task done, not to simply rote-follow a pattern.
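A minimal sketch of the count-cap variant, assuming the Go SDK's DoFn lifecycle; `importResources`, `batchFn`, and `MaxBatch` are hypothetical stand-ins for the real import call and its tuning knob, and DoFn registration is omitted:

```go
package fhirbatch

import (
	"context"
	"fmt"
)

// importResources is a hypothetical stand-in for the blocking
// FhirIO import RPC.
func importResources(ctx context.Context, batch []string) error {
	fmt.Printf("importing %d resources\n", len(batch))
	return nil
}

// batchFn self-batches with a hard count cap, so each blocking call
// stays short no matter how large the runner makes the bundle.
type batchFn struct {
	MaxBatch int // hard cap on elements per import request

	batch []string // buffered elements, reset each bundle
}

func (f *batchFn) StartBundle(ctx context.Context) {
	f.batch = f.batch[:0]
}

func (f *batchFn) ProcessElement(ctx context.Context, elem string) error {
	f.batch = append(f.batch, elem)
	if len(f.batch) >= f.MaxBatch {
		return f.flush(ctx) // cap hit: issue the request mid-bundle
	}
	return nil
}

func (f *batchFn) FinishBundle(ctx context.Context) error {
	// Only the final partial batch remains, so this call is bounded
	// by MaxBatch rather than by the runner's bundle size.
	return f.flush(ctx)
}

func (f *batchFn) flush(ctx context.Context) error {
	if len(f.batch) == 0 {
		return nil
	}
	err := importResources(ctx, f.batch)
	f.batch = f.batch[:0]
	return err
}
```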
That said, it is odd if the Java side is taking exactly the same approach with FinishBundle and not failing. That's worth keeping the issue around for, even after implementing batching in the code.
The worst case would be that the Java SDK has different "keep alive" behavior than the Go SDK. This should be handled in the FnAPI with the bundle progress responses.
But that seems unlikely. Given that the issue can be worked around by adjusting the DoFn, this should remain at most a P2 until further investigation indicates otherwise.