flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-37245] Prevent masking null values in BinaryRowData

Open mxm opened this issue 9 months ago • 13 comments

RowData#createFieldGetter is the go-to method for creating field getters for a given field type and position. The method FieldGetter#getFieldOrNull suggests that null is returned if the field has been nulled. But that is not always the case.

When using BinaryRowData with a non-null field, which has been set to null, a call to FieldGetter#getFieldOrNull will return a non-null value, interpreting whatever bytes are backing the field as an actual value instead of null.

Example:

  public static void main(String[] args) {
    IntType nullableIntType = new IntType(true);
    IntType nonNullableIntType = new IntType(false);
    RowDataSerializer rowDataSerializer = new RowDataSerializer(
            nullableIntType, nonNullableIntType
    );
    BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(GenericRowData.of(null, null));
    RowData.FieldGetter fieldGetter1 = RowData.createFieldGetter(nullableIntType, 0);
    RowData.FieldGetter fieldGetter2 = RowData.createFieldGetter(nonNullableIntType, 1);
    System.out.println(fieldGetter1.getFieldOrNull(binaryRow));
    System.out.println(fieldGetter2.getFieldOrNull(binaryRow));
  }

Output is:

null
0

The expected output would be that the second non-null field also returns null, or raises a NullPointerException directly. That's not the case because RowData#createFieldGetter only checks for null values (via a call to Rowdata#isNullAt(pos)) when the type is nullable (see https://github.com/apache/flink/blob/b86fdf906c06af8fc2841bca3d98dd3944bb5031/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/RowData.java#L289).

It seems fair to always check for null fields, instead of deferring this easy to forget check to the caller.

We needed to work around this bug in https://github.com/apache/iceberg/pull/12049.

mxm avatar Feb 03 '25 20:02 mxm

CI report:

  • 705558f6263fb0cee6b67b72a492a086b2585410 Azure: SUCCESS
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Feb 03 '25 20:02 flinkbot

I think there are different concerns here: (a) data integrity and (b) data semantics.

(a) The value of a field should always return its correct value. That's why the Flink runtime always check isNullAt before attempting to read. In Iceberg, we don't always do that because we rely on the FieldGetters for which this change is intended. If a field is null, it should read null. If a field is 42, it should read 42, but it shouldn't return 42 if its value is null. Unfortunately, this can happen without a null check when reading from BinaryRowData.

(b) It is a separate concern, whether or not to allow null values for nonNull types. This needs to be enforced by the layer which initializes the RowData. In Iceberg there are actually writers which allow nulled "required" types.

Does that make sense?

mxm avatar Feb 06 '25 17:02 mxm

My intuitive understanding of the mentioned problem in Iceberg is why not fix the nullability of the field? RowData as the basic structure of flink sql, follows the semantics of sql nullability, one field declared not-null should not be filled with null values. On the other hand, if a nullable field has set null bit, then its value can be seen as null directly and no need to read the value in the memorysegment (because it is reused as much as possible at the sql runtime implementation, which can save some operations). Accurate nullability is important to the sql system, not only to ensure proper processing on the consumer side (e.g., the field may be used as the primary key of the target table, which requires not null), but also gives the planner more optimization possibilities (e.g., predicate derivation, and also the runtime code can skip the null check overhead for non-null fields, etc.).

lincoln-lil avatar Feb 07 '25 06:02 lincoln-lil

My intuitive understanding of the mentioned problem in Iceberg is why not fix the nullability of the field?

In Iceberg, users can directly provide a stream of RowData, for which the precise schema is only known when writing the stream to the table. Once the table schema is available, Iceberg relies on the Flink FieldGetters to read values from the provided data, but it turns out that the result of the FieldGetter#getFieldOrNull cannot be trusted. When a null field returns an arbitrary value, depending what random bytes are currently stored for that field with the null flag being set, then the result is plainly wrong.

Accurate nullability is important to the sql system, not only to ensure proper processing on the consumer side (e.g., the field may be used as the primary key of the target table, which requires not null), but also gives the planner more optimization possibilities (e.g., predicate derivation, and also the runtime code can skip the null check overhead for non-null fields, etc.).

Thanks for highlighting the value of the non-null concept. I'm definitely not trying to get rid of non-null, but we need to be able to trust the values stored in RowData when using FieldGetters. If there are lower levels that don't want that check, that is fine too, they can directly operate on RowData and not use the FieldGetters.

What you say about skipping null checks is not in line with what Flink does. The Flink runtime performs null checks when reading from RowData, regardless of the nullability type. I believe the externally facing API of RowData#createFieldGetter(..) should do that as well.

I think it would be interesting to check how Flink handles user-provided null values for non-null types. I've seen table.exec.sink.not-null-enforcer option, but that only applies to sinks. That wouldn't help in this case because the null value would have already been resurrected in case of a missing isNullAt(..) check.

mxm avatar Feb 07 '25 16:02 mxm

For the 1st part, my question is since the nullability info provided by users are not reliable, why not enforce the use of nullable type when use RowData#createFieldGetter? Am I missing something here?

Regarding Flink's current handling, you're right. I checked the code, and during the codegen phase, null checks are always added when accessing fields of Row types (not relying on fieldgetter).

lincoln-lil avatar Feb 10 '25 13:02 lincoln-lil

For the 1st part, my question is since the nullability info provided by users are not reliable, why not enforce the use of nullable type when use RowData#createFieldGetter? Am I missing something here?

I understand where you are coming from, but not all Iceberg writers have the concept of "non-null" / required columns. A schema may have certain type definitions, but ultimately the writer decides what can be written.

mxm avatar Feb 18 '25 11:02 mxm

This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot] avatar May 20 '25 06:05 github-actions[bot]

@twalthr Do you mind having a look?

mxm avatar May 30 '25 16:05 mxm

@mxm could you rebase this PR? We also observed similar issues when implementing PTFs. The field getter in RowData got fixed already. Not sure about the other changes in this PR. But let's rebase first.

twalthr avatar Jun 04 '25 12:06 twalthr

@twalthr Rebased. It may still be worthwhile to merge the tests, to prevent regressions regarding null values.

mxm avatar Jun 05 '25 08:06 mxm

@twalthr nit: from my reading of the conversation, it looks like this issue has been fixed in the code already, but we are adding a test, could we update the title of the PR to reflect the change we are now making please.

davidradl avatar Jun 05 '25 08:06 davidradl

@davidradl Thanks for chiming in! IMHO the title still reflects the change, but I can adjust the title to focus more on the tests, e.g.: "[FLINK-37245] Add tests for FieldGetters to prevent masking null values in BinaryRowData". Does that look better to you?

mxm avatar Jun 05 '25 08:06 mxm

@davidradl @twalthr Could you have another look?

mxm avatar Jun 12 '25 12:06 mxm

@davidradl @twalthr Can we merge this?

mxm avatar Jul 15 '25 08:07 mxm

Ack, renamed the PR and update the JIRA ticket. The root cause has been fixed with https://github.com/apache/flink/commit/0053b41d5d07252fef468e43c33f30b7fd1e27da.

mxm avatar Sep 10 '25 10:09 mxm

@flinkbot run azure

mxm avatar Sep 10 '25 10:09 mxm

Thanks for the review @ferenc-csaky @davidradl @lincoln-lil @twalthr!

mxm avatar Sep 10 '25 11:09 mxm