beam
beam copied to clipboard
[Bug]: Schema inference is non-deterministic
What happened?
When using the inference of schema with a POJO, a Java Bean or an AutoValue class, the order of the fields in the schema may change during the inference process. This can cause issues with updates in the runners that support them (e.g. Cloud Dataflow).
Because the schemas are different (same fields, but in different order), the coders used for those PCollection
s will be different too, and reading the data from the previous job's checkpoint will fail. The job will process new elements fine, but the old checkpoint elements will throw decode errors.
Example of the same AutoValue class with two different runs, omitting the fields part for brevity. In the first run, this is the inferred schema:
[...]
Encoding positions:
{sequenceNumber=12, destination=11, messageId=7, receiveTimestamp=5, priority=1, senderTimestamp=6, redelivered=4, timeToLive=3, payload=2, replyTo=8, expiration=10, replicationGroupMessageId=9, properties=0}
[...]
In the second run, no changes in the code, but the code is recompiled again, this is the schema:
[...]
Encoding positions:
{sequenceNumber=12, destination=11, messageId=8, receiveTimestamp=6, priority=1, senderTimestamp=7, redelivered=5, timeToLive=3, payload=2, replyTo=9, expiration=10, replicationGroupMessageId=4, properties=0}
[...]
Notice how for instance messageId
is detected as the 7th field in the first schema, and the 8th in the second.
The problem seems to be in the usage of Class.getDeclaredMethods
in the reflect utils.
The documentation of getDeclaredMethods
(doc for Java 8, doc for Java 17) says that: "The elements in the returned array are not sorted and are not in any particular order.".
So in summary, the schema inference is non-deterministic.
As a workaround, you can use the annotation @SchemaFieldNumber
to override the order of the fields, but it is far from obvious that if you create a data class without changing the order of fields in the code, the order may not always be the same.
So I would suggest for instance ordering the getter methods lexicographically (which is deterministic across runs using the same code), and clearly adding that to the documentation. The main drawback would be that adding a field, even if it is nullable, would change the order if fields are sorted lexicographically. But if this clearly documented, users can always add the @SchemaFieldNumber
to the new fields, to ensure they are appended at the end of the fields positions.
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
I think @reuvenlax did some work to deal with this before.
Sorting doesn't help here as a common use case is to add a new field on update. Dataflow at least should be able to handle schema update, as long as the schema is on a PCollection.