[Bug]: windmillServiceCommitThreads option can lead to ConcurrentModificationException and stuck commits
What happened?
Part of the refactoring of the commit path for StreamingDataflowWorker introduced a bug that surfaces when the number of commit threads is increased via the windmillServiceCommitThreads option.
Previously we created a separate commit stream for each commit thread. Now StreamingEngineWorkCommitter starts multiple threads that share a single stream cache for commits. The batching and flushing of commits within the commit stream is not thread-safe, so if the cache vends the same stream to multiple threads, concurrent modifications [1] can occur on the map inside the stream's batcher. This is currently guaranteed because the cache size is 1, but it could also happen with a larger cache, since the cache vends streams randomly. As a result, commit requests are lost; the affected keys are later detected as stuck, logged, and recovered [2].
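The failure mode can be illustrated with a simplified, hypothetical batcher (`UnsafeCommitBatcher` and `CmeDemo` are illustrative names, not Beam classes). Like the batcher in the stack trace below, it keeps pending requests in a plain `HashMap` and flushes by iterating it. The `onEach` hook stands in for a second commit thread adding a request mid-flush, which makes the `ConcurrentModificationException` deterministic in this sketch; with real threads the race is timing-dependent, and `HashMap` is not guaranteed to throw at all, so updates may also be lost silently.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified batcher; NOT the real GrpcCommitWorkStream code.
class UnsafeCommitBatcher {
  // Plain HashMap: unsynchronized, with fail-fast iterators.
  private final Map<Long, String> pending = new HashMap<>();

  void add(long id, String request) {
    pending.put(id, request);
  }

  // Iterates the pending map to "send" a batch, then clears it.
  // onEach simulates another commit thread calling add() mid-flush.
  int flush(Runnable onEach) {
    int sent = 0;
    for (Map.Entry<Long, String> entry : pending.entrySet()) {
      sent++;
      onEach.run();
    }
    pending.clear();
    return sent;
  }
}

class CmeDemo {
  // Returns true if the interleaved add() makes flush() fail fast.
  static boolean reproduces() {
    UnsafeCommitBatcher batcher = new UnsafeCommitBatcher();
    batcher.add(1L, "commit-1");
    batcher.add(2L, "commit-2");
    long[] nextId = {100L};
    try {
      batcher.flush(() -> batcher.add(nextId[0]++, "late-commit"));
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }
}
```

In this sketch the exception escapes `flush()` before `pending.clear()` runs, so the batch is never completed, loosely mirroring the lost commit requests described above.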
Possible fixes:
- easiest is probably to just create a separate cache per commit thread started
- modify GrpcCommitWorkStream to be thread-safe (and increase the stream pool cache size)
- add support to the stream pool to allow exclusive access to streams
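The first and third fixes rest on the same idea: a stream whose batching is not thread-safe should be used by at most one commit thread at a time. Below is a minimal sketch of the third option, assuming a pool that hands out streams through a `BlockingQueue`, so a borrowed stream is unavailable to other threads until it is released. `FakeCommitStream`, `ExclusiveStreamPool`, and `ExclusivePoolDemo` are hypothetical names, not the actual Beam stream pool API. With a pool size equal to the number of commit threads, no thread ever waits for a stream, which behaves much like the first option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical stand-in for a commit stream whose batching is NOT thread-safe.
class FakeCommitStream {
  private final List<String> pending = new ArrayList<>();
  private int flushed = 0;

  void commit(String request) {
    pending.add(request);
  }

  void flush() {
    flushed += pending.size(); // pretend the batch was sent
    pending.clear();
  }

  int flushedCount() {
    return flushed;
  }
}

// Hypothetical pool with exclusive access: between borrow() and release(),
// a stream is held by exactly one thread.
class ExclusiveStreamPool<S> {
  private final BlockingQueue<S> idle;
  private final List<S> all = new ArrayList<>();

  ExclusiveStreamPool(int size, Supplier<S> factory) {
    idle = new ArrayBlockingQueue<>(size);
    for (int i = 0; i < size; i++) {
      S stream = factory.get();
      all.add(stream);
      idle.add(stream);
    }
  }

  // Blocks until some stream is free; take() removes it from the idle queue.
  S borrow() throws InterruptedException {
    return idle.take();
  }

  void release(S stream) {
    idle.add(stream);
  }

  List<S> streams() {
    return all;
  }
}

class ExclusivePoolDemo {
  // Starts `threads` commit loops sharing `poolSize` streams and returns
  // the total number of requests flushed across all streams.
  static int run(int threads, int poolSize, int commitsPerThread) {
    ExclusiveStreamPool<FakeCommitStream> pool =
        new ExclusiveStreamPool<>(poolSize, FakeCommitStream::new);
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      final int id = t;
      Thread worker = new Thread(() -> {
        for (int i = 0; i < commitsPerThread; i++) {
          FakeCommitStream stream;
          try {
            stream = pool.borrow();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
          try {
            stream.commit("thread" + id + "-commit" + i);
            stream.flush();
          } finally {
            pool.release(stream); // hand the stream to the next waiting thread
          }
        }
      });
      workers.add(worker);
      worker.start();
    }
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    int total = 0;
    for (FakeCommitStream stream : pool.streams()) {
      total += stream.flushedCount();
    }
    return total;
  }
}
```

For example, `ExclusivePoolDemo.run(4, 1, 1000)` has four threads sharing a single stream, yet all 4000 requests are flushed because `borrow()` blocks until the stream is free. The trade-off is that a pool smaller than the number of commit threads serializes commits on the shared streams.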
[1]
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1469)
at java.util.HashMap$EntryIterator.next(HashMap.java:1503)
at java.util.HashMap$EntryIterator.next(HashMap.java:1501)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.issueBatchedRequest(GrpcCommitWorkStream.java:233)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.flushInternal(GrpcCommitWorkStream.java:207)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.access$500(GrpcCommitWorkStream.java:46)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream$Batcher.flush(GrpcCommitWorkStream.java:331)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.flush(GrpcCommitWorkStream.java:191)
at org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter.streamingCommitLoop(StreamingEngineWorkCommitter.java:165)
[2] Detected key 82e0000000000001 stuck in COMMITTING state since 2024-05-09T20:40:41.538Z, completing it with error.
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner