
[Bug]: ReadFromKafka YAML transform cannot specify JSON schema

Open charlespnh opened this issue 6 months ago • 1 comment

What happened?

pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: JSON
        schema:
          type: "object"
          properties:
            value: { type: "string" }
        topic: test
        bootstrap_servers: kafka:9092
        auto_offset_reset_config: earliest

    - type: LogForTesting

Running the pipeline (Beam version 2.65.0) gives the following error:

ValueError: Error applying transform "ReadFromMyTopic" at line 4: java.lang.IllegalArgumentException: Encountered an error when retrieving a configuration
        at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider$ManagedSchemaTransform.<init>(ManagedSchemaTransformProvider.java:168)
        at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.from(ManagedSchemaTransformProvider.java:152)
        at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.from(ManagedSchemaTransformProvider.java:57)
        at org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider.from(TypedSchemaTransformProvider.java:111)
        at org.apache.beam.sdk.expansion.service.ExpansionServiceSchemaTransformProvider.getTransform(ExpansionServiceSchemaTransformProvider.java:137)
        at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator.getTransform(ExpansionService.java:263)
        at org.apache.beam.sdk.expansion.service.TransformProvider.apply(TransformProvider.java:121)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:657)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.UnsupportedOperationException: Converting YAML type 'class java.util.LinkedHashMap' to 'STRING' is not supported
        at org.apache.beam.sdk.schemas.utils.YamlUtils.toBeamValue(YamlUtils.java:147)
        at org.apache.beam.sdk.schemas.utils.YamlUtils.lambda$toBeamRow$5(YamlUtils.java:171)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
        at org.apache.beam.sdk.schemas.utils.YamlUtils.toBeamRow(YamlUtils.java:173)
        at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.getRowConfig(ManagedSchemaTransformProvider.java:228)
        at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider$ManagedSchemaTransform.<init>(ManagedSchemaTransformProvider.java:166)
        ... 17 more

However, if I specify the schema like

schema: |
  {
    "type": "object",
    "properties": {
      "value": {"type": "string"}
    }
  }

instead, then it works.
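
For completeness, here is the full pipeline with the workaround applied (same topic, brokers, and schema as above):

pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: JSON
        schema: |
          {
            "type": "object",
            "properties": {
              "value": {"type": "string"}
            }
          }
        topic: test
        bootstrap_servers: kafka:9092
        auto_offset_reset_config: earliest

    - type: LogForTesting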

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [x] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

charlespnh avatar Jun 06 '25 15:06 charlespnh

cc @chamikaramj

charlespnh avatar Jun 06 '25 17:06 charlespnh

The schema parameter is expected to be a string in this context, and the workaround you found is the correct way to handle it. It's not ideal, and I hit the same issue myself. We should probably update the documentation to note this if it doesn't already.
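
For example, an equivalent single-line form of the same workaround (using the schema from the report above) would be:

      config:
        format: JSON
        schema: '{"type": "object", "properties": {"value": {"type": "string"}}}'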

derrickaw avatar Sep 25 '25 17:09 derrickaw

Thanks for following up on this. It's just that this is a bit inconsistent compared to e.g. ReadFromPubSub: https://beam.apache.org/documentation/sdks/yaml/#windowing

charlespnh avatar Sep 25 '25 21:09 charlespnh

Thanks for following up on this. It's just that this is a bit inconsistent compared to e.g. ReadFromPubSub: https://beam.apache.org/documentation/sdks/yaml/#windowing

I think this should be supported the same way it is for ReadFromPubSub.
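
For comparison, a rough sketch of the ReadFromPubSub style shown in the linked docs, where the schema is accepted as a nested mapping (the topic name here is illustrative, and the schema mirrors the one from this issue):

    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: JSON
        schema:
          type: object
          properties:
            value: {type: string}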

liferoad avatar Sep 26 '25 16:09 liferoad