[Bug]: ReadFromKafka YAML transform cannot specify JSON schema
What happened?
```yaml
pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: JSON
        schema:
          type: "object"
          properties:
            value: { type: "string" }
        topic: test
        bootstrap_servers: kafka:9092
        auto_offset_reset_config: earliest
    - type: LogForTesting
```
Running the pipeline (Beam version 2.65.0) gives the following error:
```
ValueError: Error applying transform "ReadFromMyTopic" at line 4: java.lang.IllegalArgumentException: Encountered an error when retrieving a configuration
	at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider$ManagedSchemaTransform.<init>(ManagedSchemaTransformProvider.java:168)
	at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.from(ManagedSchemaTransformProvider.java:152)
	at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.from(ManagedSchemaTransformProvider.java:57)
	at org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider.from(TypedSchemaTransformProvider.java:111)
	at org.apache.beam.sdk.expansion.service.ExpansionServiceSchemaTransformProvider.getTransform(ExpansionServiceSchemaTransformProvider.java:137)
	at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator.getTransform(ExpansionService.java:263)
	at org.apache.beam.sdk.expansion.service.TransformProvider.apply(TransformProvider.java:121)
	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:657)
	at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
	at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
	at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
	at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
	at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
	at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.UnsupportedOperationException: Converting YAML type 'class java.util.LinkedHashMap' to 'STRING' is not supported
	at org.apache.beam.sdk.schemas.utils.YamlUtils.toBeamValue(YamlUtils.java:147)
	at org.apache.beam.sdk.schemas.utils.YamlUtils.lambda$toBeamRow$5(YamlUtils.java:171)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at org.apache.beam.sdk.schemas.utils.YamlUtils.toBeamRow(YamlUtils.java:173)
	at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.getRowConfig(ManagedSchemaTransformProvider.java:228)
	at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider$ManagedSchemaTransform.<init>(ManagedSchemaTransformProvider.java:166)
	... 17 more
```
However, if I instead specify the schema as a JSON string:

```yaml
schema: |
  {
    "type": "object",
    "properties": {
      "value": {"type": "string"}
    }
  }
```

then it works.
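For reference, here is a sketch of the full pipeline with that workaround applied (same topic, broker, and transforms as in the failing pipeline above; only the `schema` field changes):

```yaml
pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: JSON
        # Workaround: pass the schema as a JSON string via a YAML block
        # scalar (|), rather than as a nested YAML mapping. The nested
        # mapping arrives as a LinkedHashMap, which the expansion service
        # cannot convert to the expected STRING field.
        schema: |
          {
            "type": "object",
            "properties": {
              "value": {"type": "string"}
            }
          }
        topic: test
        bootstrap_servers: kafka:9092
        auto_offset_reset_config: earliest
    - type: LogForTesting
```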
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [x] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
cc @chamikaramj
The schema parameter is expected to be a string in this context, and the workaround you found is the correct way to handle it. It's not ideal, and I hit the same issue. We should probably update the documentation to note this, if it doesn't already.
Thanks for following up on this. It's just a bit inconsistent compared to, e.g., ReadFromPubSub: https://beam.apache.org/documentation/sdks/yaml/#windowing
I think this should be supported in the same way as ReadFromPubSub.