[Bug]: Concurrent exception with ReadFromKafka YAML transform on PrismRunner

Open charlespnh opened this issue 6 months ago • 3 comments

What happened?

pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: JSON
        schema: |
          {
            "type": "object",
            "properties": {
              "value": {"type": "string"}
            }
          }
        topic: test
        bootstrap_servers: kafka:9092
        auto_offset_reset_config: earliest

    - type: LogForTesting

Running this pipeline (Beam version 2.65.0) on the Prism runner with

python -m apache_beam.yaml.main \
--yaml_pipeline_file=sdks/python/apache_beam/yaml/examples/transforms/io/kafka_read.yaml \
--runner PrismRunner

... fails with a java.util.ConcurrentModificationException:

RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst004 stage-002 failed:org.apache.beam.sdk.util.UserCodeException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-2-thread-6, id: 33) otherThread(id: 32)
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$Unbounded$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1123)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:658)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:653)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:185)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:543)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-2-thread-6, id: 33) otherThread(id: 32)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.resume(ClassicKafkaConsumer.java:965)
        at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
        at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:593)

The exception is thrown regardless of whether any data is read from the topic.
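
For context, the trace shows the exception being raised from the consumer's own `acquire` check rather than from a modified collection: a second thread called into the consumer while another thread still held it. Below is a minimal Python sketch of that kind of single-thread ownership guard. `SingleThreadGuard` and `demo` are hypothetical names used only for illustration; this is not Kafka's or Beam's actual code, and it does not reproduce the Prism-specific scheduling that triggers the bug.

```python
import threading


class SingleThreadGuard:
    """Hypothetical stand-in for a client that only one thread may use at a time."""

    _NO_OWNER = -1

    def __init__(self):
        self._owner = self._NO_OWNER
        self._refcount = 0
        self._lock = threading.Lock()  # protects _owner and _refcount

    def acquire(self):
        """Claim the guard for the calling thread, or fail if another thread holds it."""
        me = threading.get_ident()
        with self._lock:
            if self._owner not in (self._NO_OWNER, me):
                raise RuntimeError(
                    "not safe for multi-threaded access: "
                    f"currentThread(id: {me}) otherThread(id: {self._owner})"
                )
            self._owner = me
            self._refcount += 1  # re-entrant calls from the owner are allowed

    def release(self):
        """Drop one level of ownership; free the guard when the count reaches zero."""
        with self._lock:
            self._refcount -= 1
            if self._refcount == 0:
                self._owner = self._NO_OWNER


def demo():
    """Hold the guard on the main thread and try to use it from a second thread."""
    guard = SingleThreadGuard()
    errors = []

    def other_thread_call():
        try:
            guard.acquire()
            guard.release()
        except RuntimeError as exc:
            errors.append(str(exc))

    guard.acquire()  # main thread is "mid-call"
    worker = threading.Thread(target=other_thread_call)
    worker.start()
    worker.join()
    guard.release()  # main thread finishes its call

    # Once released, a different thread can use the guard without error.
    worker = threading.Thread(target=other_thread_call)
    worker.start()
    worker.join()
    return errors
```

In this sketch only the overlapping call fails; the second call, made after the first thread released the guard, succeeds. That matches the shape of the error above, where `pool-2-thread-6` calls `resume` while thread 32 still owns the consumer.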

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [x] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [x] Component: IO connector
  • [x] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

charlespnh, Jun 06 '25 15:06

cc @chamikaramj

charlespnh, Jun 06 '25 17:06

That's unfortunate and probably related to #34331, but I haven't experienced any ConcurrentModificationExceptions in tests or on Dataflow.

sjvanrossum, Jun 10 '25 21:06

> That's unfortunate and probably related to #34331, but I haven't experienced any ConcurrentModificationExceptions in tests or on Dataflow.

Yes, it works fine on Dataflow for me as well. I just wanted to raise this issue for the Prism runner specifically.

charlespnh, Jun 10 '25 22:06

This issue has been marked as stale due to 150 days of inactivity. It will be closed in 30 days if no further activity occurs. If you think that’s incorrect or this issue still needs to be addressed, please simply write any comment. If closed, you can reopen the issue at any time. Thank you for your contributions.

github-actions[bot], Nov 08 '25 12:11

Closing this as completed by https://github.com/apache/beam/pull/36512

charlespnh, Nov 14 '25 04:11