[Bug]: Coder cannot be resolved after `WithKeys.of().withKeyType()` for @DefaultSchema(JavaBeanSchema.class) types
What happened?
When using WithKeys.of(<lambda>).withKeyType(TypeDescriptor.of(TypeWithSchema.class)) for types annotated with @DefaultSchema(JavaBeanSchema.class), Beam is not able to resolve the coder for the type, despite the schema being available.
See also https://stackoverflow.com/questions/77575574/unable-to-return-a-default-coder-for-withkeys-when-using-defaultschemajavabean
In particular, the docs say:
A PCollection with a schema does not need to have a Coder specified, as Beam knows how to encode and decode Schema rows; Beam uses a special coder to encode schema types.
However, when using such a type with WithKeys, I consistently get this exception:
java.lang.IllegalStateException: Unable to return a default Coder for WithKeys/AddKeys/Map/ParMultiDo(Anonymous).output [PCollection@621486957]. Correct one of the following root causes:
No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to provide a Coder for K.
Building a Coder using a registered CoderProvider failed.
See suppressed exceptions for detailed failures.
Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
Example POJO:
@DefaultSchema(JavaBeanSchema.class)
public class Pojo {
  private final long num;
  private final String str;
  @SchemaCreate
  public Pojo(long num, String str) {
    this.num = num;
    this.str = str;
  }
  public long getNum() {
    return this.num;
  }
  public String getStr() {
    return this.str;
  }
  @Override
  public boolean equals(final Object o) {
    ...
  }
  @Override
  public int hashCode() {
    ...
  }
}
Example pipeline which generates this error:
public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline p = Pipeline.create(options);
  PCollection<String> a =
      p.apply(Create.of(List.of("1", "2", "3")).withType(TypeDescriptors.strings()));
  PCollection<String> b =
      p.apply(Create.of(List.of("2", "3", "4")).withType(TypeDescriptors.strings()));
  PCollection<KV<Pojo, String>> a1 =
      a.apply(
          WithKeys.<Pojo, String>of(v -> new Pojo(1, v))
              .withKeyType(TypeDescriptor.of(Pojo.class)));
  PCollection<KV<Pojo, String>> b1 =
      b.apply(
          WithKeys.<Pojo, String>of(v -> new Pojo(1, v))
              .withKeyType(TypeDescriptor.of(Pojo.class)));
  var aTag = new TupleTag<String>();
  var bTag = new TupleTag<String>();
  KeyedPCollectionTuple.of(aTag, a1) // <== Exception here (complaining about a1)
      .and(bTag, b1)
      .apply(CoGroupByKey.create())
      .apply(
          ParDo.of(
              new DoFn<KV<Pojo, CoGbkResult>, Integer>() {
                @ProcessElement
                public void processElement(ProcessContext context) {
                  var element = context.element();
                  String a = element.getValue().getOnly(aTag, null);
                  String b = element.getValue().getOnly(bTag, null);
                  context.output(a != null && b != null ? 1 : 0);
                }
              }));
}
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [X] Component: Java SDK
I tried a few things since this appeared similar to an issue I was experiencing. It turned out my issue was not similar, but I'll share the mods that ended up working.
- Made Pojo implement Serializable => Got past initial exception (new exception is expected).
java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic
...
Caused by: org.apache.beam.sdk.coders.Coder$NonDeterministicException: SerializableCoder(GithubIssue29577$Pojo) is not deterministic because:
- Registered a Coder manually
CoderRegistry coderRegistry = p.getCoderRegistry();
SchemaRegistry schemaRegistry = p.getSchemaRegistry();
coderRegistry.registerCoderForClass(Pojo.class, schemaRegistry.getSchemaCoder(Pojo.class));
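For context on why the Serializable attempt above trades one exception for another: a GroupByKey key coder has to encode equal keys to identical bytes, and plain Java serialization does not promise that in general. The following is a minimal, Beam-free sketch of that effect using only the JDK. The class and helper names are made up for illustration, and it is not a claim about how this particular Pojo would serialize.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: shows that two equal objects can serialize differently.
class SerializationDeterminismDemo {

  /** Encodes a value with plain Java serialization and returns the raw bytes. */
  static byte[] serialize(Object value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  public static void main(String[] args) {
    String shared = new String("key");
    String copy = new String("key"); // equal to `shared`, but a distinct object

    // Both lists contain ["key", "key"], so List.equals treats them as equal.
    List<String> sameInstanceTwice = new ArrayList<>(Arrays.asList(shared, shared));
    List<String> twoEqualInstances = new ArrayList<>(Arrays.asList(shared, copy));

    // Java serialization writes a back-reference when it sees the same object
    // again, but writes a second full string for a distinct-but-equal object.
    boolean equal = sameInstanceTwice.equals(twoEqualInstances);
    boolean sameBytes =
        Arrays.equals(serialize(sameInstanceTwice), serialize(twoEqualInstances));

    System.out.println("equal: " + equal);          // equal: true
    System.out.println("same bytes: " + sameBytes); // same bytes: false
  }
}
```

If keys were grouped by their encoded bytes under such an encoding, two equal keys could land in different groups, which is presumably why the GroupByKey check refuses the coder outright rather than inspecting the class.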
The issue appears to be that the Coder for the corresponding Schema is only resolved at runtime (https://stackoverflow.com/a/70352977), but this is not sufficient for your code because the exception is thrown at build time, while the pipeline is still being constructed (there isn't even a Pipeline.run statement).
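Another option, and the one the exception message itself points at, is to call .setCoder() on the output of WithKeys instead of registering a coder for the whole pipeline. Below is a rough sketch of what that could look like. It assumes the Pojo class from the report is on the classpath, the class name ExplicitKeyCoderSketch is made up, and I have not run it against this exact pipeline, so treat it as a starting point rather than a confirmed fix.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Hypothetical class name; Pojo is the @DefaultSchema(JavaBeanSchema.class) class from the report.
public class ExplicitKeyCoderSketch {
  public static void main(String[] args) throws NoSuchSchemaException {
    Pipeline p = Pipeline.create();

    // Ask the schema registry for the schema-based coder of the key type.
    SchemaCoder<Pojo> pojoCoder = p.getSchemaRegistry().getSchemaCoder(Pojo.class);

    PCollection<KV<Pojo, String>> a1 =
        p.apply(Create.of("1", "2", "3"))
            .apply(
                WithKeys.<Pojo, String>of(v -> new Pojo(1, v))
                    .withKeyType(TypeDescriptor.of(Pojo.class)))
            // Set the KV coder explicitly so nothing has to be inferred later.
            .setCoder(KvCoder.of(pojoCoder, StringUtf8Coder.of()));

    // a1 can now be passed to KeyedPCollectionTuple.of(...) as in the report.
  }
}
```

Compared with registerCoderForClass, this only affects the one PCollection, so it has to be repeated for b1, but it avoids changing how Pojo is encoded elsewhere in the pipeline.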