[Bug]: windmillServiceCommitThreads option can lead to ConcurrentModificationException and stuck commits

Open scwhittle opened this issue 1 year ago • 0 comments

What happened?

Part of the refactoring of the commit path for StreamingDataflowWorker introduced a bug that appears when the number of commit threads is increased via the windmillServiceCommitThreads option. Previously, each commit thread created its own commit stream. Now StreamingEngineWorkCommitter starts multiple threads that share a single stream cache for commits. Batching and flushing of commits within a commit stream is not thread-safe, so if the cache vends the same stream to multiple threads, concurrent modifications [1] can occur on the map inside that stream's batcher. With the current cache size of 1 this sharing is guaranteed; with a larger cache it can still happen, because the cache vends streams randomly. As a result, commit requests are lost, and the affected commits are later detected as stuck, logged, and recovered [2].
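HashMap iterators are fail-fast: if the map is structurally modified while an iteration is in progress, the next call to next() throws ConcurrentModificationException, which matches the HashMap$HashIterator.nextNode frame in trace [1]. The sketch below reproduces that failure deterministically on a single thread. UnsafeBatcher is a hypothetical stand-in, not the real GrpcCommitWorkStream batcher, and the flush hook simulates a second commit thread calling add() while the first thread is flushing. With real threads the fail-fast check is only best-effort, so a race can also corrupt the map without any exception.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class SharedBatcherDemo {
  // Hypothetical stand-in for a per-stream batcher: pending commits keyed by id,
  // stored in a plain HashMap with no synchronization.
  static final class UnsafeBatcher {
    final Map<Long, String> pending = new HashMap<>();

    void add(long id, String request) {
      pending.put(id, request);
    }

    // Iterates the pending map to build a batch. The hook runs after the first entry
    // is read and stands in for another commit thread calling add() at that moment.
    int flush(Runnable concurrentAdd) {
      int sent = 0;
      boolean hookRan = false;
      for (Map.Entry<Long, String> e : pending.entrySet()) {
        sent++;
        if (!hookRan) {
          hookRan = true;
          concurrentAdd.run();
        }
      }
      pending.clear();
      return sent;
    }
  }

  public static String run() {
    UnsafeBatcher shared = new UnsafeBatcher();
    shared.add(1, "commit-1");
    shared.add(2, "commit-2");
    shared.add(3, "commit-3");
    try {
      shared.flush(() -> shared.add(4, "commit-4"));
      return "no exception";
    } catch (ConcurrentModificationException ex) {
      // The flush aborted mid-iteration: the map was never cleared and no batch was returned.
      return "CME, pending=" + shared.pending.size();
    }
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints "CME, pending=4"
  }
}
```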

Possible fixes:

  • The easiest is probably to create a separate cache for each commit thread that is started.
  • Make GrpcCommitWorkStream thread-safe (and increase the stream pool cache size).
  • Add support to the stream pool for exclusive access to streams.
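A minimal sketch of the third option, assuming a pool that hands each stream to at most one thread at a time. ExclusiveStreamPool and FakeCommitStream are hypothetical names for illustration, not Beam classes; FakeCommitStream keeps unsynchronized HashMap batching as a stand-in for the GrpcCommitWorkStream batcher, and the pool is backed by a java.util.concurrent BlockingQueue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ExclusiveStreamPoolDemo {
  // Hypothetical non-thread-safe stream: batches commits in a plain HashMap.
  static final class FakeCommitStream {
    private final Map<Long, String> pending = new HashMap<>();

    void commit(long id, String request) {
      pending.put(id, request);
    }

    int flush() {
      int n = 0;
      for (String ignored : pending.values()) {
        n++; // stand-in for adding each request to the outgoing batch
      }
      pending.clear();
      return n;
    }
  }

  // Hypothetical pool: a stream is held by at most one thread between take() and release().
  static final class ExclusiveStreamPool<S> {
    private final BlockingQueue<S> idle;

    ExclusiveStreamPool(int size, Supplier<S> factory) {
      idle = new ArrayBlockingQueue<>(size);
      for (int i = 0; i < size; i++) {
        idle.add(factory.get());
      }
    }

    S take() throws InterruptedException {
      return idle.take(); // blocks until some stream is free
    }

    void release(S stream) {
      idle.add(stream);
    }
  }

  public static int run(int threads, int commitsPerThread) throws Exception {
    // Fewer streams than threads, so streams are shared over time but never concurrently.
    ExclusiveStreamPool<FakeCommitStream> pool = new ExclusiveStreamPool<>(2, FakeCommitStream::new);
    AtomicInteger sent = new AtomicInteger();
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    List<Future<?>> futures = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      final long base = (long) t * commitsPerThread;
      futures.add(executor.submit(() -> {
        for (int i = 0; i < commitsPerThread; i++) {
          FakeCommitStream stream = pool.take();
          try {
            stream.commit(base + i, "commit");
            sent.addAndGet(stream.flush());
          } finally {
            pool.release(stream);
          }
        }
        return null;
      }));
    }
    for (Future<?> f : futures) {
      f.get(); // rethrows any exception raised in a worker thread
    }
    executor.shutdown();
    return sent.get();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run(4, 1000)); // prints 4000
  }
}
```

Because each stream is held by exactly one thread between take() and release(), its batcher is never iterated and modified at the same time, so every commit is flushed. The first option (a separate cache per commit thread) gives the same guarantee more simply at the cost of one stream per thread; a blocking pool lets fewer streams serve more threads, at the cost of threads possibly waiting for a free stream.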

[1]

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1469)
at java.util.HashMap$EntryIterator.next(HashMap.java:1503)
at java.util.HashMap$EntryIterator.next(HashMap.java:1501)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.issueBatchedRequest(GrpcCommitWorkStream.java:233)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.flushInternal(GrpcCommitWorkStream.java:207)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.access$500(GrpcCommitWorkStream.java:46)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream$Batcher.flush(GrpcCommitWorkStream.java:331)
at org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcCommitWorkStream.flush(GrpcCommitWorkStream.java:191)
at org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter.streamingCommitLoop(StreamingEngineWorkCommitter.java:165)

[2] Detected key 82e0000000000001 stuck in COMMITTING state since 2024-05-09T20:40:41.538Z, completing it with error.
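The recovery logged in [2] amounts to a timeout sweep over keys that are still in the COMMITTING state. The sketch below is a hypothetical illustration of that idea, not the worker's actual implementation: CommitTracker, sweepStuck, and the 5-minute timeout are assumptions. A key whose commit request was lost never reaches completeCommit, so a later sweep reports it so that it can be completed with an error.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class StuckCommitDetectorDemo {
  // Hypothetical tracker: records when each key entered the COMMITTING state.
  static final class CommitTracker {
    private final Map<String, Instant> committingSince = new HashMap<>();

    void startCommit(String key, Instant now) {
      committingSince.put(key, now);
    }

    void completeCommit(String key) {
      committingSince.remove(key);
    }

    // Returns keys that have been committing longer than the timeout and stops tracking
    // them, so the caller can complete them with an error instead of waiting forever.
    List<String> sweepStuck(Instant now, Duration timeout) {
      List<String> stuck = new ArrayList<>();
      Iterator<Map.Entry<String, Instant>> it = committingSince.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<String, Instant> e = it.next();
        if (e.getValue().plus(timeout).isBefore(now)) {
          stuck.add(e.getKey());
          it.remove(); // removing through the iterator itself does not trigger a CME
        }
      }
      return stuck;
    }
  }

  public static List<String> run() {
    CommitTracker tracker = new CommitTracker();
    Instant start = Instant.parse("2024-05-09T20:40:41.538Z");
    tracker.startCommit("82e0000000000001", start); // its commit request was lost
    tracker.startCommit("other-key", start);
    tracker.completeCommit("other-key"); // completes normally
    return tracker.sweepStuck(start.plus(Duration.ofMinutes(10)), Duration.ofMinutes(5));
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [82e0000000000001]
  }
}
```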

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [X] Component: Google Cloud Dataflow Runner

scwhittle, May 15 '24 08:05