
[Bug]: Coder cannot be resolved after `WithKeys.of().withKeyType()` for @DefaultSchema(JavaBeanSchema.class) types

Open RonBarkan opened this issue 2 years ago • 2 comments

What happened?

When using WithKeys.of(<lambda>).withKeyType(TypeDescriptor.of(TypeWithSchema.class)) for types annotated with @DefaultSchema(JavaBeanSchema.class), Beam cannot resolve the coder for the type, even though a schema is available.

See also https://stackoverflow.com/questions/77575574/unable-to-return-a-default-coder-for-withkeys-when-using-defaultschemajavabean

In particular, the docs say:

A PCollection with a schema does not need to have a Coder specified, as Beam knows how to encode and decode Schema rows; Beam uses a special coder to encode schema types.

However, when using such a type with WithKeys, I consistently get this exception:

java.lang.IllegalStateException: Unable to return a default Coder for WithKeys/AddKeys/Map/ParMultiDo(Anonymous).output [PCollection@621486957]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, V>: Unable to provide a Coder for K.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
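The failure can be narrowed down without CoGroupByKey. The sketch below is a minimal probe under stated assumptions: the Beam Java SDK core and the direct runner are on the classpath, `SchemaCoderCheck` and `Bean` are hypothetical names, and `Bean` is a conventional mutable JavaBean (no-arg constructor plus setters) rather than the `@SchemaCreate`-constructed `Pojo` below. The working assumption, consistent with the exception, is that with `withKeyType` set, `WithKeys` asks only the `CoderRegistry` for the key coder, while schema coders come from the separate `SchemaRegistry`. The sketch compares the two registries and, for contrast, checks the coder inferred for an ordinary `MapElements` whose output type is `Bean` itself.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class SchemaCoderCheck {

  // Hypothetical schema type: a plain JavaBean, so JavaBeanSchema can use its getters and setters.
  @DefaultSchema(JavaBeanSchema.class)
  public static class Bean {
    private long num;
    private String str;

    public Bean() {}

    public long getNum() { return num; }
    public void setNum(long num) { this.num = num; }
    public String getStr() { return str; }
    public void setStr(String str) { this.str = str; }
  }

  // True if the SchemaRegistry can build a SchemaCoder for Bean.
  static boolean schemaRegistryHasCoder(Pipeline p) {
    try {
      return p.getSchemaRegistry().getSchemaCoder(Bean.class) != null;
    } catch (NoSuchSchemaException e) {
      return false;
    }
  }

  // True if the CoderRegistry alone can provide a coder for Bean.
  static boolean coderRegistryHasCoder(Pipeline p) {
    try {
      return p.getCoderRegistry().getCoder(TypeDescriptor.of(Bean.class)) != null;
    } catch (CannotProvideCoderException e) {
      return false;
    }
  }

  // Coder inferred for a PCollection whose declared element type is Bean itself.
  static Coder<Bean> mappedCoder(Pipeline p) {
    PCollection<Bean> beans =
        p.apply(Create.of("1", "2"))
            .apply(
                MapElements.into(TypeDescriptor.of(Bean.class))
                    .via(
                        (String s) -> {
                          Bean b = new Bean();
                          b.setNum(1L);
                          b.setStr(s);
                          return b;
                        }));
    return beans.getCoder();
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    System.out.println("SchemaRegistry has coder: " + schemaRegistryHasCoder(p));
    System.out.println("CoderRegistry has coder: " + coderRegistryHasCoder(p));
    System.out.println("MapElements output coder: " + mappedCoder(p));
  }
}
```

If the assumption holds, the first and third checks succeed while the second fails, which matches the docs quoted above (a schema PCollection gets a schema coder) and still leaves `WithKeys` without a key coder.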

Example POJO:

@DefaultSchema(JavaBeanSchema.class)
public class Pojo {
  private final long num;
  private final String str;

  @SchemaCreate
  public Pojo(long num, String str) {
    this.num = num;
    this.str = str;
  }

  public long getNum() {
    return this.num;
  }

  public String getStr() {
    return this.str;
  }

  @Override
  public boolean equals(final Object o) {
     ...
  }

  @Override
  public int hashCode() {
    ...
  }
}

Example pipeline which generates this error:

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> a =
        p.apply(Create.of(List.of("1", "2", "3")).withType(TypeDescriptors.strings()));
    PCollection<String> b =
        p.apply(Create.of(List.of("2", "3", "4")).withType(TypeDescriptors.strings()));

    PCollection<KV<Pojo, String>> a1 =
        a.apply(
            WithKeys.<Pojo, String>of(v -> new Pojo(1, v))
                .withKeyType(TypeDescriptor.of(Pojo.class)));

    PCollection<KV<Pojo, String>> b1 =
        b.apply(
            WithKeys.<Pojo, String>of(v -> new Pojo(1, v))
                .withKeyType(TypeDescriptor.of(Pojo.class)));

    var aTag = new TupleTag<String>();
    var bTag = new TupleTag<String>();
    KeyedPCollectionTuple.of(aTag, a1)  // <== Exception here (complaining about a1)
        .and(bTag, b1)
        .apply(CoGroupByKey.create())
        .apply(
            ParDo.of(
                new DoFn<KV<Pojo, CoGbkResult>, Integer>() {
                  @ProcessElement
                  public void processElement(ProcessContext context) {
                    var element = context.element();
                    String a = element.getValue().getOnly(aTag, null);
                    String b = element.getValue().getOnly(bTag, null);
                    context.output(a != null && b != null ? 1 : 0);
                  }
                }));
  }
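One possible workaround is to skip inference for the WithKeys output altogether: build the key coder from the `SchemaRegistry` and set `KvCoder.of(keyCoder, StringUtf8Coder.of())` on the `WithKeys` output before it reaches `KeyedPCollectionTuple`. This is a minimal sketch, assuming the missing coder is the only problem; `ExplicitKvCoderWorkaround`, `Bean`, `bean`, `beanCoder`, `keyByBean` and `build` are hypothetical names, `Bean` is a mutable JavaBean standing in for `Pojo`, and the Beam Java SDK plus the direct runner are assumed to be on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;

public class ExplicitKvCoderWorkaround {

  // Hypothetical stand-in for Pojo: a mutable JavaBean, so JavaBeanSchema can use its setters.
  @DefaultSchema(JavaBeanSchema.class)
  public static class Bean {
    private long num;
    private String str;

    public Bean() {}

    public long getNum() { return num; }
    public void setNum(long num) { this.num = num; }
    public String getStr() { return str; }
    public void setStr(String str) { this.str = str; }
  }

  static Bean bean(long num, String str) {
    Bean b = new Bean();
    b.setNum(num);
    b.setStr(str);
    return b;
  }

  // Builds Bean's coder from the SchemaRegistry; getSchemaCoder throws a checked exception.
  static SchemaCoder<Bean> beanCoder(Pipeline p) {
    try {
      return p.getSchemaRegistry().getSchemaCoder(Bean.class);
    } catch (NoSuchSchemaException e) {
      throw new IllegalStateException("Bean has no schema", e);
    }
  }

  // Same WithKeys call as in the issue, followed by an explicitly set KV coder.
  static PCollection<KV<Bean, String>> keyByBean(
      String name, PCollection<String> input, SchemaCoder<Bean> keyCoder) {
    return input
        .apply(
            name,
            WithKeys.<Bean, String>of(v -> bean(1L, v))
                .withKeyType(TypeDescriptor.of(Bean.class)))
        .setCoder(KvCoder.of(keyCoder, StringUtf8Coder.of()));
  }

  static PCollection<KV<Bean, CoGbkResult>> build(Pipeline p) {
    SchemaCoder<Bean> keyCoder = beanCoder(p);
    PCollection<String> a = p.apply("CreateA", Create.of("1", "2", "3"));
    PCollection<String> b = p.apply("CreateB", Create.of("2", "3", "4"));
    TupleTag<String> aTag = new TupleTag<>();
    TupleTag<String> bTag = new TupleTag<>();
    // KeyedPCollectionTuple.of reads the key coder from the KvCoder set in keyByBean.
    return KeyedPCollectionTuple.of(aTag, keyByBean("KeyA", a, keyCoder))
        .and(bTag, keyByBean("KeyB", b, keyCoder))
        .apply(CoGroupByKey.create());
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    build(p);
    p.run().waitUntilFinish();
  }
}
```

Setting the coder explicitly means the result no longer depends on how WithKeys resolves K; the trade-off is that the value coder (StringUtf8Coder here) has to be spelled out by hand as well.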

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [X] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

RonBarkan, Nov 30 '23 19:11

I tried a few things since this looked similar to an issue I was experiencing. My issue turned out to be different, but I'll share the changes that ended up working.

  1. Made Pojo implement Serializable. This got past the initial exception; the new exception below is expected, because GroupByKey requires a deterministic key coder and SerializableCoder is not deterministic.
     java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic
     ...
     Caused by: org.apache.beam.sdk.coders.Coder$NonDeterministicException: SerializableCoder(GithubIssue29577$Pojo) is not deterministic because:
  2. Registered a Coder manually (getSchemaCoder throws the checked NoSuchSchemaException, so the call needs a try/catch or a throws clause):
       CoderRegistry coderRegistry = p.getCoderRegistry();
       SchemaRegistry schemaRegistry = p.getSchemaRegistry();
       coderRegistry.registerCoderForClass(Pojo.class, schemaRegistry.getSchemaCoder(Pojo.class));
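The manual-registration approach from step 2 can be written out in full. This is a minimal sketch, assuming the Beam Java SDK and the direct runner are on the classpath; `RegisterSchemaCoder`, `Bean`, `bean`, `registerBeanCoder`, `keyedCoder` and `isDeterministic` are hypothetical names, and `Bean` again stands in for `Pojo` as a mutable JavaBean. It registers the schema coder before `WithKeys` is applied, so the key-type lookup can find it, and then runs `verifyDeterministic()` on the resulting key coder, which is the check behind the GroupByKey error from step 1.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;

public class RegisterSchemaCoder {

  // Hypothetical stand-in for Pojo: a mutable JavaBean, so JavaBeanSchema can use its setters.
  @DefaultSchema(JavaBeanSchema.class)
  public static class Bean {
    private long num;
    private String str;

    public Bean() {}

    public long getNum() { return num; }
    public void setNum(long num) { this.num = num; }
    public String getStr() { return str; }
    public void setStr(String str) { this.str = str; }
  }

  static Bean bean(long num, String str) {
    Bean b = new Bean();
    b.setNum(num);
    b.setStr(str);
    return b;
  }

  // Step 2 from the comment, with the checked NoSuchSchemaException handled.
  static void registerBeanCoder(Pipeline p) {
    CoderRegistry coderRegistry = p.getCoderRegistry();
    SchemaRegistry schemaRegistry = p.getSchemaRegistry();
    try {
      coderRegistry.registerCoderForClass(Bean.class, schemaRegistry.getSchemaCoder(Bean.class));
    } catch (NoSuchSchemaException e) {
      throw new IllegalStateException("Bean has no schema", e);
    }
  }

  // Applies WithKeys as in the issue and returns the coder Beam resolves for its output.
  static Coder<KV<Bean, String>> keyedCoder(Pipeline p) {
    return p.apply(Create.of("1", "2"))
        .apply(
            WithKeys.<Bean, String>of(v -> bean(1L, v))
                .withKeyType(TypeDescriptor.of(Bean.class)))
        .getCoder();
  }

  // Same determinism check that GroupByKey applies to its key coder.
  static boolean isDeterministic(Coder<?> coder) {
    try {
      coder.verifyDeterministic();
      return true;
    } catch (Coder.NonDeterministicException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    registerBeanCoder(p);
    Coder<?> kv = keyedCoder(p);
    Coder<?> keyCoder = ((KvCoder<?, ?>) kv).getKeyCoder();
    System.out.println("key coder: " + keyCoder + ", deterministic: " + isDeterministic(keyCoder));
    // For comparison: a SerializableCoder, as in step 1, fails the same check.
    System.out.println(
        "SerializableCoder deterministic: " + isDeterministic(SerializableCoder.of(String.class)));
  }
}
```

Registration has to happen before WithKeys is applied, because the key coder is looked up while the pipeline is being constructed.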

The issue appears to be that the Coder for the corresponding Schema is only resolved at runtime (https://stackoverflow.com/a/70352977), which is not sufficient for your code because the exception is thrown at pipeline construction time (there isn't even a Pipeline.run call).

mouyang, Jan 05 '24 19:01

.take-issue

Amar3tto, Jun 28 '24 14:06