
[Bug]: Reject ill-formed GBK Coders

Open • pskevin opened this issue 2 years ago • 3 comments

What happened?

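import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

// p is an existing Pipeline; the GBK output coder is overridden with an opaque SerializableCoder.
p.apply(
        Create.of(KV.of("X", "Y"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
    .apply(GroupByKey.create())
    .setCoder(
        SerializableCoder.of(
            TypeDescriptors.kvs(
                TypeDescriptors.strings(),
                TypeDescriptors.iterables(TypeDescriptors.strings()))));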

The pipeline above shows that it is currently possible to author pipelines whose GBK input/output coders are opaque (here, a SerializableCoder in place of KvCoder<String, Iterable<String>>) and therefore violate the semantics of Beam's programming model. SDKs should reject pipelines of this kind at pipeline construction time.

Issue Priority

Priority: 1

Issue Component

Component: sdk-java-core

pskevin • Jun 17 '22 23:06
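A minimal sketch of the construction-time check the issue asks for. To stay self-contained it does not use Beam's classes: `CoderSpec` is a hypothetical stand-in for a coder (a name plus its component coders), and `isWellFormedGbk` is an illustrative name, not a Beam API. It accepts a GroupByKey only when the input coder is `KvCoder(K, V)` and the output coder is exactly `KvCoder(K, IterableCoder(V))`, so an override like the `SerializableCoder` in the repro above would be rejected.

```java
import java.util.List;

/** Hypothetical, Beam-free sketch of construction-time GroupByKey coder validation. */
class GbkCoderCheck {
  /** Stand-in for a coder: a name plus its component coders (NOT Beam's Coder API). */
  record CoderSpec(String name, List<CoderSpec> components) {
    static CoderSpec of(String name, CoderSpec... components) {
      return new CoderSpec(name, List.of(components));
    }
  }

  static CoderSpec kv(CoderSpec key, CoderSpec value) {
    return CoderSpec.of("KvCoder", key, value);
  }

  static CoderSpec iterable(CoderSpec element) {
    return CoderSpec.of("IterableCoder", element);
  }

  /** True only if input is KvCoder(K, V) and output is exactly KvCoder(K, IterableCoder(V)). */
  static boolean isWellFormedGbk(CoderSpec input, CoderSpec output) {
    if (!input.name().equals("KvCoder") || input.components().size() != 2) {
      return false; // an opaque input coder hides the key/value split from the runner
    }
    CoderSpec expected = kv(input.components().get(0), iterable(input.components().get(1)));
    // Record equality compares names and component lists recursively.
    return expected.equals(output);
  }

  public static void main(String[] args) {
    CoderSpec str = CoderSpec.of("StringUtf8Coder");
    CoderSpec input = kv(str, str);
    // Mirrors the issue: the GBK output coder is overridden with an opaque SerializableCoder.
    System.out.println(isWellFormedGbk(input, CoderSpec.of("SerializableCoder"))); // false
    System.out.println(isWellFormedGbk(input, kv(str, iterable(str)))); // true
  }
}
```

Deriving the expected output coder from the input, rather than checking each side in isolation, is what catches a mismatch between the two.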

I'd suggest initiating a discussion on [email protected] to reach a wider audience.

manuzhang • Jul 09 '22 07:07

I think this is probably only P2, because most users don't go in and manually override coders for primitive operations. I totally agree with the issue, though.

kennknowles • Jul 27 '22 16:07

This also affects the Python SDK, where it appears to be much easier to trigger via type hints.

apilloud • Aug 05 '22 19:08

Recent changes to coder validation in the Java SDK are affecting Scio: https://github.com/spotify/scio/pull/4549

In Scio, we wrap coders in a thin layer that can reference user code in the exception stack trace on failure. Because the check here uses strict equality, users MUST use Beam's KvCoder and IterableCoder, which makes customization difficult.

RustedBones • Oct 21 '22 13:10
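The strict-equality problem described in the comment above can be reproduced in a small hypothetical model. `Wrapper` stands in for a Scio-style wrapper coder; every name here is illustrative, not a Scio or Beam API. `strictCheck` compares the whole output coder tree against `KvCoder(K, IterableCoder(V))`, so a wrapper in the value position fails. `relaxedCheck` is one possible relaxation that strips wrappers before comparing; it is only sound under the assumption that the wrapper delegates encoding byte for byte and is replaced by its delegate before the pipeline reaches a runner, which the thread does not establish.

```java
import java.util.List;

/** Hypothetical model of why an equality-based GBK check rejects wrapped coders. */
class WrappedCoderEquality {
  /** Stand-in for a coder: a name plus its component coders. */
  record CoderSpec(String name, List<CoderSpec> components) {
    static CoderSpec of(String name, CoderSpec... components) {
      return new CoderSpec(name, List.of(components));
    }
  }

  /** Name of a hypothetical wrapper that delegates all encoding to its single component. */
  static final String WRAPPER = "Wrapper";

  static CoderSpec wrap(CoderSpec inner) { return CoderSpec.of(WRAPPER, inner); }
  static CoderSpec kv(CoderSpec k, CoderSpec v) { return CoderSpec.of("KvCoder", k, v); }
  static CoderSpec iterable(CoderSpec e) { return CoderSpec.of("IterableCoder", e); }

  /** Strict check: output must equal KvCoder(K, IterableCoder(V)) built from the input. */
  static boolean strictCheck(CoderSpec input, CoderSpec output) {
    if (!input.name().equals("KvCoder") || input.components().size() != 2) {
      return false;
    }
    return kv(input.components().get(0), iterable(input.components().get(1))).equals(output);
  }

  /** Recursively removes wrapper nodes (assumes wrappers never change the encoded bytes). */
  static CoderSpec unwrap(CoderSpec c) {
    if (c.name().equals(WRAPPER)) {
      return unwrap(c.components().get(0));
    }
    return new CoderSpec(
        c.name(), c.components().stream().map(WrappedCoderEquality::unwrap).toList());
  }

  /** Relaxed check: the same comparison after stripping encoding-transparent wrappers. */
  static boolean relaxedCheck(CoderSpec input, CoderSpec output) {
    return strictCheck(unwrap(input), unwrap(output));
  }

  public static void main(String[] args) {
    CoderSpec str = CoderSpec.of("StringUtf8Coder");
    CoderSpec input = kv(wrap(str), wrap(str));
    // The wrapper sits around the Iterable instead of around its element coder.
    CoderSpec output = kv(wrap(str), wrap(iterable(str)));
    System.out.println(strictCheck(input, output)); // false
    System.out.println(relaxedCheck(input, output)); // true
  }
}
```

The relaxation still rejects an output that lacks the Iterable layer entirely; it only forgives wrappers, and only under the stated transparency assumption.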

Ah, interesting. The reason for it is that a runner needs to understand the encoding format in order to handle elements for GBK and state operations, and it is expected to emit GBK output in a standard format. Is there perhaps a way to keep the wrapper in the Java layer but move the check to the portability layer?

kennknowles • Oct 21 '22 22:10
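The question of moving the check to the portability layer can be sketched as a check over URN-identified coder trees. `beam:coder:kv:v1` and `beam:coder:iterable:v1` are Beam's standard coder URNs; `PortableCoder`, `isValidGbk`, and the `example:coder:wrapper:v1` URN are hypothetical stand-ins, not the real Runner API protos. The sketch assumes a runner only needs to interpret the KV and ITERABLE layers: it requires those URNs at the top of the output and around the value, and requires the key and element coders to match the input, but lets those coders be opaque. It ignores other requirements, such as whether the key encoding is deterministic.

```java
import java.util.List;

/** Hypothetical shape check over a portable-style coder tree identified by URNs. */
class PortableGbkShapeCheck {
  // Standard Beam coder URNs for the layers a runner must interpret for GBK.
  static final String KV_URN = "beam:coder:kv:v1";
  static final String ITERABLE_URN = "beam:coder:iterable:v1";

  /** Stand-in for a portable coder: a URN plus component coders (not the real proto). */
  record PortableCoder(String urn, List<PortableCoder> components) {
    static PortableCoder of(String urn, PortableCoder... components) {
      return new PortableCoder(urn, List.of(components));
    }
  }

  static boolean isKv(PortableCoder c) {
    return c.urn().equals(KV_URN) && c.components().size() == 2;
  }

  /** Input must be KV(K, V); output must be KV(K, ITERABLE(V)); K and V may be opaque. */
  static boolean isValidGbk(PortableCoder input, PortableCoder output) {
    if (!isKv(input) || !isKv(output)) {
      return false;
    }
    PortableCoder outValue = output.components().get(1);
    return outValue.urn().equals(ITERABLE_URN)
        && output.components().get(0).equals(input.components().get(0))
        && outValue.components().equals(List.of(input.components().get(1)));
  }

  public static void main(String[] args) {
    // Hypothetical non-standard wrapper URN around a standard string coder.
    PortableCoder leaf =
        PortableCoder.of("example:coder:wrapper:v1", PortableCoder.of("beam:coder:string_utf8:v1"));
    PortableCoder input = PortableCoder.of(KV_URN, leaf, leaf);
    PortableCoder good = PortableCoder.of(KV_URN, leaf, PortableCoder.of(ITERABLE_URN, leaf));
    PortableCoder bad =
        PortableCoder.of(
            KV_URN,
            leaf,
            PortableCoder.of("example:coder:wrapper:v1", PortableCoder.of(ITERABLE_URN, leaf)));
    System.out.println(isValidGbk(input, good)); // true
    System.out.println(isValidGbk(input, bad)); // false
  }
}
```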

The error seen in Scio was:

input: KV(MaterializedCoder(StringUtf8Coder), MaterializedCoder(VarIntCoder))
expected output: KV(MaterializedCoder(StringUtf8Coder), Iterable(MaterializedCoder(VarIntCoder)))
actual output: KV(MaterializedCoder(StringUtf8Coder), MaterializedCoder(Iterable(VarIntCoder)))

It seems that Scio already has logic to handle the case where the input/output coder is a KvCoder that is not wrapped in a MaterializedCoder. I suggested on the Scio issue extending that logic to the value coder of the output PCollection, so that it also handles wrapping the Iterable.

lukecwik • Oct 24 '22 18:10
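The suggested Scio-side fix amounts to building the GBK output coder from the input's own key and value coders, placing the Iterable layer directly around the (already wrapped) value coder instead of wrapping the Iterable. Below is a sketch in a hypothetical model; `MaterializedCoder` is only a name here, and `CoderSpec` and `gbkOutputCoder` are illustrative, not Scio code. The printed form mimics the notation of the error message quoted above.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical model of deriving a GBK output coder from a wrapped input coder. */
class ScioGbkOutputCoder {
  /** Stand-in for a coder: a name plus component coders, printed as Name(a, b). */
  record CoderSpec(String name, List<CoderSpec> components) {
    static CoderSpec of(String name, CoderSpec... components) {
      return new CoderSpec(name, List.of(components));
    }

    @Override
    public String toString() {
      if (components.isEmpty()) {
        return name;
      }
      return name
          + components.stream().map(CoderSpec::toString).collect(Collectors.joining(", ", "(", ")"));
    }
  }

  static CoderSpec materialized(CoderSpec c) { return CoderSpec.of("MaterializedCoder", c); }
  static CoderSpec kv(CoderSpec k, CoderSpec v) { return CoderSpec.of("KV", k, v); }
  static CoderSpec iterable(CoderSpec e) { return CoderSpec.of("Iterable", e); }

  /** Reuses the input's key and value coders as-is; only the Iterable layer is added. */
  static CoderSpec gbkOutputCoder(CoderSpec input) {
    if (!input.name().equals("KV") || input.components().size() != 2) {
      throw new IllegalArgumentException("GBK input coder must be KV, found " + input);
    }
    return kv(input.components().get(0), iterable(input.components().get(1)));
  }

  public static void main(String[] args) {
    CoderSpec input =
        kv(materialized(CoderSpec.of("StringUtf8Coder")), materialized(CoderSpec.of("VarIntCoder")));
    // Shape reported in the Scio error: the wrapper sits around the whole Iterable.
    CoderSpec rejected =
        kv(
            materialized(CoderSpec.of("StringUtf8Coder")),
            materialized(iterable(CoderSpec.of("VarIntCoder"))));
    CoderSpec derived = gbkOutputCoder(input);
    // prints KV(MaterializedCoder(StringUtf8Coder), Iterable(MaterializedCoder(VarIntCoder)))
    System.out.println(derived);
    System.out.println(derived.equals(rejected)); // false
  }
}
```

Because the derived coder is built from exactly the input's components, it matches the "expected output" shape and would satisfy a strict equality check without dropping the wrappers.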