beam
[Bug]: Reject ill-formed GBK Coders
What happened?
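// The input is KV<String, String>, so GroupByKey produces KV<String, Iterable<String>>.
// The setCoder call below replaces the output coder with an opaque SerializableCoder,
// and the SDK currently accepts it.
p.apply(
        Create.of(KV.of("X", "Y"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
    .apply(GroupByKey.create())
    .setCoder(
        SerializableCoder.of(
            TypeDescriptors.kvs(
                TypeDescriptors.strings(),
                TypeDescriptors.iterables(TypeDescriptors.strings()))));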
The pipeline above shows that it is currently possible to author pipelines with opaque GBK input/output coders whose semantics clearly violate those of Beam's programming model. SDKs should reject pipelines of this kind at pipeline construction time.
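To make the requested check concrete, here is a minimal sketch of a construction-time validation. It assumes a toy coder model: `ToyCoder`, `Leaf`, `Kv`, `Iter`, `Opaque`, and `GbkCoderCheck` are hypothetical stand-ins invented for illustration, not Beam classes, and the real SDK check may be structured differently. It requires Java 16 or later for records.

```java
// Hypothetical stand-ins for coders; none of these are Beam classes.
interface ToyCoder {}
record Leaf(String name) implements ToyCoder {}            // e.g. a string coder
record Kv(ToyCoder key, ToyCoder value) implements ToyCoder {}
record Iter(ToyCoder element) implements ToyCoder {}
record Opaque(String description) implements ToyCoder {}   // e.g. a serialization-based coder

final class GbkCoderCheck {
  /** Returns the only output coder shape consistent with the given GBK input coder. */
  static ToyCoder expectedOutput(ToyCoder input) {
    if (!(input instanceof Kv kv)) {
      throw new IllegalArgumentException("GBK input coder must be a KV coder, got " + input);
    }
    // KV(K, V) in, KV(K, Iterable(V)) out.
    return new Kv(kv.key(), new Iter(kv.value()));
  }

  /** Throws if the declared output coder does not match the expected shape. */
  static void validate(ToyCoder input, ToyCoder output) {
    ToyCoder expected = expectedOutput(input);
    if (!expected.equals(output)) {
      throw new IllegalStateException(
          "Ill-formed GBK output coder: expected " + expected + " but got " + output);
    }
  }
}
```

In this model, `GbkCoderCheck.validate(new Kv(new Leaf("string"), new Leaf("string")), new Opaque("serializable"))` throws, which mirrors rejecting the pipeline above when it is built rather than when it runs.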
Issue Priority
Priority: 1
Issue Component
Component: sdk-java-core
I'd suggest starting a discussion on [email protected] to reach a wider audience.
I think this is probably only a P2, because most users are not going in and manually overriding coders for primitive operations. Totally agree with the issue though.
This also affects the Python SDK, where it appears to be much easier to do with type hints.
Recent changes to coder validation in the Java SDK are affecting scio: https://github.com/spotify/scio/pull/4549
In scio, we wrap coders into a thin layer able to reference user code in the exception stack trace upon failure.
Here, because you are checking strict equality, users MUST use Beam's KvCoder and IterableCoder, which makes customization difficult.
Ah, interesting. The reason for it is that a runner needs to be able to understand the format in order to handle the element for GBK and state operations, and is expected to output from GBK in a standard format. Is there a way to have the wrapper in the Java layer but move the check to the portability layer perhaps?
The error seen in SCIO was:
input: KV(MaterializedCoder(StringUtf8Coder), MaterializedCoder(VarIntCoder))
expected output KV(MaterializedCoder(StringUtf8Coder), Iterable(MaterializedCoder(VarIntCoder))
actual output KV(MaterializedCoder(StringUtf8Coder), MaterializedCoder(Iterable(VarIntCoder))
It seems as though there is logic already in SCIO to handle the case where the input/output is a KvCoder and is not wrapped with a MaterializedCoder. I suggested on the SCIO issue that the logic be extended for the value coder on the output PCollection to wrap the Iterable.
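The difference between the expected and actual output in the error above comes down to where the wrapper sits. The sketch below illustrates that with a toy model: `SketchCoder`, `Named`, `Wrapped`, `KvOf`, `IterableOf`, and `WrapperPlacement` are hypothetical names invented here, with `Wrapped` standing in for a wrapper such as MaterializedCoder. It is not scio's or Beam's actual code and requires Java 16 or later.

```java
// Hypothetical stand-ins; these are neither Beam nor scio classes.
interface SketchCoder {}
record Named(String name) implements SketchCoder {}
record Wrapped(SketchCoder inner) implements SketchCoder {}   // models a thin wrapper coder
record KvOf(SketchCoder key, SketchCoder value) implements SketchCoder {}
record IterableOf(SketchCoder element) implements SketchCoder {}

final class WrapperPlacement {
  /** Strict shape check: the output must equal KV(key, Iterable(value)). */
  static boolean isValidGbkOutput(KvOf input, SketchCoder output) {
    return new KvOf(input.key(), new IterableOf(input.value())).equals(output);
  }

  /** Wraps the whole iterable, which hides the Iterable shape from the check. */
  static SketchCoder wrapWholeIterable(KvOf input) {
    SketchCoder raw = input.value() instanceof Wrapped w ? w.inner() : input.value();
    return new KvOf(input.key(), new Wrapped(new IterableOf(raw)));
  }

  /** Keeps the Iterable at the top level and leaves the wrapper on each element. */
  static SketchCoder wrapEachElement(KvOf input) {
    return new KvOf(input.key(), new IterableOf(input.value()));
  }
}
```

With an input of `KvOf(Wrapped(Named("StringUtf8Coder")), Wrapped(Named("VarIntCoder")))`, `wrapWholeIterable` yields the shape reported as the actual output and fails the check, while `wrapEachElement` yields the expected shape and passes. The wrapper still surrounds every user-level coder, so the customization it provides is kept.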