[FLINK-37245] Prevent masking null values in BinaryRowData
RowData#createFieldGetter is the go-to method for creating field getters for a given field type and position. The name FieldGetter#getFieldOrNull suggests that null is returned if the field has been set to null, but that is not always the case.
When a BinaryRowData holds a non-nullable field that has nevertheless been set to null, a call to FieldGetter#getFieldOrNull returns a non-null value, interpreting whatever bytes back the field as an actual value instead of null.
Example:
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.IntType;

public class FieldGetterNullExample {

    public static void main(String[] args) {
        IntType nullableIntType = new IntType(true);
        IntType nonNullableIntType = new IntType(false);
        RowDataSerializer rowDataSerializer =
                new RowDataSerializer(nullableIntType, nonNullableIntType);
        // Both fields are set to null, including the non-nullable one.
        BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(GenericRowData.of(null, null));
        RowData.FieldGetter fieldGetter1 = RowData.createFieldGetter(nullableIntType, 0);
        RowData.FieldGetter fieldGetter2 = RowData.createFieldGetter(nonNullableIntType, 1);
        System.out.println(fieldGetter1.getFieldOrNull(binaryRow)); // null
        System.out.println(fieldGetter2.getFieldOrNull(binaryRow)); // 0 instead of null
    }
}
Output is:
null
0
The expected output would be that the second, non-nullable field also returns null, or that a NullPointerException is raised directly. That's not the case because RowData#createFieldGetter only checks for null values (via a call to RowData#isNullAt(pos)) when the type is nullable (see https://github.com/apache/flink/blob/b86fdf906c06af8fc2841bca3d98dd3944bb5031/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java#L289).
It seems fair to always check for null fields instead of deferring this easy-to-forget check to the caller.
We needed to work around this bug in https://github.com/apache/iceberg/pull/12049.
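To illustrate the kind of caller-side workaround this forces, a getter can be wrapped so that isNullAt is always consulted first. The NullSafeFieldGetter below is a minimal hypothetical sketch, not the actual code from the Iceberg PR:

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

// Hypothetical wrapper: always consult isNullAt before delegating, so a
// non-nullable field that was nevertheless set to null is not "resurrected"
// from whatever bytes happen to back it.
public final class NullSafeFieldGetter implements RowData.FieldGetter {

    private static final long serialVersionUID = 1L;

    private final RowData.FieldGetter delegate;
    private final int pos;

    public NullSafeFieldGetter(LogicalType type, int pos) {
        this.delegate = RowData.createFieldGetter(type, pos);
        this.pos = pos;
    }

    @Override
    public Object getFieldOrNull(RowData row) {
        return row.isNullAt(pos) ? null : delegate.getFieldOrNull(row);
    }
}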
CI report:
- 705558f6263fb0cee6b67b72a492a086b2585410 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
I think there are different concerns here: (a) data integrity and (b) data semantics.
(a) Reading a field should always return its correct value. That's why the Flink runtime always checks isNullAt before attempting to read. In Iceberg, we don't always do that because we rely on the FieldGetters, which this change is intended for. If a field is null, it should read as null. If a field is 42, it should read as 42, but it should not return 42 if its value is null. Unfortunately, this can happen without a null check when reading from BinaryRowData.
(b) Whether or not to allow null values for non-null types is a separate concern. This needs to be enforced by the layer which initializes the RowData. In Iceberg, there are actually writers which allow nulled "required" types.
Does that make sense?
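To make concern (a) concrete, a regression test would pin down the expectation roughly as follows. This is a hypothetical sketch (class and method names are made up here), not the actual test added by this PR:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.IntType;
import org.junit.jupiter.api.Test;

class FieldGetterNullTest {

    @Test
    void nulledNonNullableFieldIsNotResurrected() {
        IntType nonNullableIntType = new IntType(false);
        RowDataSerializer serializer = new RowDataSerializer(nonNullableIntType);
        // The writer violates the declared nullability on purpose.
        BinaryRowData row = serializer.toBinaryRow(GenericRowData.of((Object) null));
        RowData.FieldGetter getter = RowData.createFieldGetter(nonNullableIntType, 0);
        // Data integrity: a nulled field must read as null, never as stale bytes.
        assertThat(getter.getFieldOrNull(row)).isNull();
    }
}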
My intuitive understanding of the problem mentioned in Iceberg is: why not fix the nullability of the field?
RowData, as the basic structure of Flink SQL, follows SQL nullability semantics: a field declared NOT NULL should not be filled with null values. On the other hand, if a nullable field has its null bit set, its value can be treated as null directly and there is no need to read the value from the memory segment (which is reused as much as possible in the SQL runtime implementation, saving some operations).
Accurate nullability is important to the SQL system: not only does it ensure proper processing on the consumer side (e.g., the field may be used as the primary key of the target table, which requires NOT NULL), it also gives the planner more optimization possibilities (e.g., predicate derivation, and the generated runtime code can skip the null-check overhead for non-null fields).
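In code, that reader-side contract is simply to consult the null bit before touching the memory segment; for an int field it comes down to something like this minimal sketch (the readIntOrNull helper is made up for illustration):

import org.apache.flink.table.data.RowData;

public final class NullBitExample {

    // Check the null bit first; only read from the backing memory segment
    // when the field actually carries a value.
    static Integer readIntOrNull(RowData row, int pos) {
        return row.isNullAt(pos) ? null : row.getInt(pos);
    }
}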
My intuitive understanding of the problem mentioned in Iceberg is: why not fix the nullability of the field?
In Iceberg, users can directly provide a stream of RowData, for which the precise schema is only known when writing the stream to the table. Once the table schema is available, Iceberg relies on the Flink FieldGetters to read values from the provided data, but it turns out that the result of FieldGetter#getFieldOrNull cannot be trusted. When a null field returns an arbitrary value, depending on what random bytes are currently stored for that field while the null flag is set, the result is plainly wrong.
Accurate nullability is important to the SQL system: not only does it ensure proper processing on the consumer side (e.g., the field may be used as the primary key of the target table, which requires NOT NULL), it also gives the planner more optimization possibilities (e.g., predicate derivation, and the generated runtime code can skip the null-check overhead for non-null fields).
Thanks for highlighting the value of the non-null concept. I'm definitely not trying to get rid of non-null, but we need to be able to trust the values stored in RowData when using FieldGetters. If there are lower levels that don't want that check, that is fine too; they can operate directly on RowData and not use the FieldGetters.
What you say about skipping null checks is not in line with what Flink does. The Flink runtime performs null checks when reading from RowData, regardless of the field's nullability. I believe the externally facing API of RowData#createFieldGetter(..) should do that as well.
I think it would be interesting to check how Flink handles user-provided null values for non-null types. I've seen the table.exec.sink.not-null-enforcer option, but that only applies to sinks. It wouldn't help in this case because the null value would have already been resurrected in case of a missing isNullAt(..) check.
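For reference, that sink-side enforcement is configured roughly like this; a sketch based on my reading of the Flink docs (the DROP value and the exact behavior should be verified against the version in use):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NotNullEnforcerConfig {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Sink-side only: decides what happens when a NOT NULL column receives
        // null at the sink. It cannot help with the FieldGetter issue, because
        // the null has already been masked before the sink sees the row.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.sink.not-null-enforcer", "DROP");
    }
}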
For the first part, my question is: since the nullability info provided by users is not reliable, why not enforce the use of a nullable type when using RowData#createFieldGetter? Am I missing something here?
Regarding Flink's current handling, you're right. I checked the code, and during the codegen phase, null checks are always added when accessing fields of row types (rather than relying on FieldGetter).
For the first part, my question is: since the nullability info provided by users is not reliable, why not enforce the use of a nullable type when using RowData#createFieldGetter? Am I missing something here?
I understand where you are coming from, but not all Iceberg writers have the concept of "non-null" / required columns. A schema may have certain type definitions, but ultimately the writer decides what can be written.
This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
@twalthr Do you mind having a look?
@mxm could you rebase this PR? We also observed similar issues when implementing PTFs. The field getter in RowData got fixed already. Not sure about the other changes in this PR. But let's rebase first.
@twalthr Rebased. It may still be worthwhile to merge the tests, to prevent regressions regarding null values.
@twalthr nit: from my reading of the conversation, it looks like this issue has already been fixed in the code and we are now adding a test. Could we update the title of the PR to reflect the change we are now making, please?
@davidradl Thanks for chiming in! IMHO the title still reflects the change, but I can adjust the title to focus more on the tests, e.g.: "[FLINK-37245] Add tests for FieldGetters to prevent masking null values in BinaryRowData". Does that look better to you?
@davidradl @twalthr Could you have another look?
@davidradl @twalthr Can we merge this?
Ack, renamed the PR and updated the JIRA ticket. The root cause has been fixed with https://github.com/apache/flink/commit/0053b41d5d07252fef468e43c33f30b7fd1e27da.
@flinkbot run azure
Thanks for the review @ferenc-csaky @davidradl @lincoln-lil @twalthr!