[Bug]: Concurrent exception with ReadFromKafka YAML transform on PrismRunner
What happened?
pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: JSON
        schema: |
          {
            "type": "object",
            "properties": {
              "value": {"type": "string"}
            }
          }
        topic: test
        bootstrap_servers: kafka:9092
        auto_offset_reset_config: earliest
    - type: LogForTesting
Running this pipeline (Beam version 2.65.0) on the Prism runner:
python -m apache_beam.yaml.main \
--yaml_pipeline_file=sdks/python/apache_beam/yaml/examples/transforms/io/kafka_read.yaml \
--runner PrismRunner
... gives a java.util.ConcurrentModificationException:
RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst004 stage-002 failed:org.apache.beam.sdk.util.UserCodeException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-2-thread-6, id: 33) otherThread(id: 32)
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$Unbounded$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1123)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:658)
at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:653)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:185)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:543)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-2-thread-6, id: 33) otherThread(id: 32)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.resume(ClassicKafkaConsumer.java:965)
at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:593)
The exception is thrown regardless of whether there is any data to read from the topic.
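For anyone unfamiliar with the message in the trace: the Kafka consumer client refuses a call from a second thread while another thread is still inside one of its methods. That is the `acquire` frame above, reached here through `resume`. Below is a minimal Python sketch of that kind of ownership guard, so the failure mode is easier to picture. It is an illustration only, not Kafka's or Beam's actual code. `SingleThreadGuard` and `demo` are hypothetical names, and the sketch raises `RuntimeError` where the Java client throws `ConcurrentModificationException`.

```python
import threading


class SingleThreadGuard:
    """Toy stand-in for a client that only one thread may use at a time."""

    def __init__(self):
        self._lock = threading.Lock()  # protects _owner and _depth
        self._owner = None             # ident of the thread currently inside a call
        self._depth = 0                # nesting depth of calls made by the owner

    def acquire(self):
        me = threading.get_ident()
        with self._lock:
            # Fail fast instead of blocking when another thread is inside a call.
            if self._owner is not None and self._owner != me:
                raise RuntimeError(
                    "client is not safe for multi-threaded access. "
                    f"currentThread(id: {me}) otherThread(id: {self._owner})"
                )
            self._owner = me
            self._depth += 1

    def release(self):
        with self._lock:
            self._depth -= 1
            if self._depth == 0:
                self._owner = None


def demo():
    """Two threads touch the same guarded client; the second one is rejected."""
    guard = SingleThreadGuard()
    errors = []
    inside = threading.Event()  # set once the first thread holds the guard
    done = threading.Event()    # set once the second thread has made its attempt

    def first_caller():
        guard.acquire()
        inside.set()
        done.wait()             # stay "inside the call" until the other thread tried
        guard.release()

    def second_caller():
        inside.wait()
        try:
            guard.acquire()
            guard.release()
        except RuntimeError as exc:
            errors.append(str(exc))
        finally:
            done.set()

    threads = [
        threading.Thread(target=first_caller),
        threading.Thread(target=second_caller),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return errors
```

Calling `demo()` returns a single error message from the second thread. In the report above, the equivalent situation would be two harness threads (ids 32 and 33 in the trace) reaching the same consumer instance at the same time. Why that happens on Prism but not on Dataflow is not something this sketch explains.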
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [x] Component: IO connector
- [x] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
cc @chamikaramj
That's unfortunate and probably related to #34331, but I haven't experienced any ConcurrentModificationExceptions in tests or on Dataflow.
Yes, it works fine on Dataflow for me as well. I just wanted to raise this issue for the Prism runner specifically.
This issue has been marked as stale due to 150 days of inactivity. It will be closed in 30 days if no further activity occurs. If you think that’s incorrect or this issue still needs to be addressed, please simply write any comment. If closed, you can reopen the issue at any time. Thank you for your contributions.
Closing this as completed by https://github.com/apache/beam/pull/36512