[FLINK-38444][table] Join unique keys should not contain null values
What is the purpose of the change
The planner currently considers the union of the unique and upsert keys from the left and from the right side to be a valid resulting upsert key. That's true for inner joins, but for left/right/full joins it leads to a resulting unique key that contains columns that can be null, which is not valid.
Brief change log
- Check for null-generating columns when creating the superset of unique keys
Verifying this change
- Adjusted existing tests
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
CI report:
- 1a80731c2b5216b1bfaf65d2160a2d4329d1c405 Azure: FAILURE
CI is failing on an unrelated e2e test issue that is happening across PRs
Hi, @gustavodemorais. I'm wondering why we can't have null columns as part of unique keys? From what I see in the SQL standard, it seems that unique keys can contain null columns. Here is the specific description from the SQL 2016 standard:
A unique constraint that does not include a <without overlap specification> on a table T is satisfied if and only if there do not exist two rows R1 and R2 of T such that R1 and R2 have the same non-null values in the unique columns. If a unique constraint UC on a table T includes a <without overlap specification> WOS, then let ATPN be the <application time period name> contained in WOS. UC is satisfied if and only if there do not exist two rows R1 and R2 of T such that R1 and R2 have the same non-null values in the unique columns and the ATPN period values of R1 and R2 overlap. In addition, if the unique constraint was defined with PRIMARY KEY, then it requires that none of the values in the specified column or columns be a null value.
Hey @xuyangzhong, thanks for the comment. I was at Flink Forward and am just reading your reply now.
That's a good catch. The issue is that we are using the unique keys as upsert keys, and in my understanding we can't allow upsert keys to contain null values; that's what I was trying to fix here. It leads to a runtime error, since a sink expects the upsert key not to contain a null value, which can happen in the case of a left join. https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala#L352
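Roughly, this is the kind of check I mean (just a sketch with illustrative names, not the actual diff; it only assumes Calcite's JoinRelType and ImmutableBitSet):

import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.util.ImmutableBitSet

// Sketch: when building the combined key sets for a join, drop any candidate
// key that references columns from a null-generating side, because those
// columns can be null in the join output and therefore can't act as an upsert key.
def retainNonNullableKeys(
    candidateKeys: Set[ImmutableBitSet],
    joinType: JoinRelType,
    leftFieldCount: Int,
    totalFieldCount: Int): Set[ImmutableBitSet] = {
  val leftColumns = ImmutableBitSet.range(0, leftFieldCount)
  val rightColumns = ImmutableBitSet.range(leftFieldCount, totalFieldCount)
  candidateKeys.filterNot { key =>
    (joinType.generatesNullsOnLeft && key.intersects(leftColumns)) ||
      (joinType.generatesNullsOnRight && key.intersects(rightColumns))
  }
}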
We could also have additional logic in getJoinUpsertKeys to remove such keys. Do you have other suggestions?
@gustavodemorais IIUC, currently there is no explicit handling of null values for the upsert key from the planning stage (where it originates from the unique key, which is not restricted from containing null values) to runtime (where the operators do not include any special treatment for columns that may contain null values in the upsert key). In other words, there are no restrictions preventing the upsert key from containing null values. I’m curious about the issue you mentioned. Could you explain it in more detail?
It leads to a runtime error since a sink expects the upsert key not to contain a null value which can happen in a case of a left join
I’m curious about the issue you mentioned. Could you explain it in more detail?
Sure. If we create two tables and apply a left join, it might happen that the upsert key contains nullable columns, like in the examples in the tests. We can then create a third table whose primary key matches this upsert key and submit an INSERT INTO it. The planner will accept the statement, and as soon as we produce one row with a null value, the job fails.
Here are simple repro steps
CREATE TABLE `users` (
`user_id` INT NOT NULL,
`other_data` STRING,
CONSTRAINT `PRIMARY` PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'key.format' = 'json',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'users',
'value.format' = 'json'
);
CREATE TABLE `orders_with_composite_key` (
`order_id` BIGINT NOT NULL,
`user_id` INT NOT NULL,
`item_name` STRING,
CONSTRAINT `PRIMARY` PRIMARY KEY (`order_id`, `user_id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'key.format' = 'json',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'orders',
'value.format' = 'json'
);
CREATE TABLE test_join_upsert (
user_id INT NOT NULL,
order_id BIGINT NOT NULL,
user_id0 INT NOT NULL,
other_data STRING,
PRIMARY KEY (user_id, order_id, user_id0) NOT ENFORCED -- this matches the nullable upsert key calculated by the planner
)
WITH (
'connector' = 'upsert-kafka',
'key.format' = 'json',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'users_orders_upsert',
'value.format' = 'json'
);
INSERT INTO test_join_upsert
SELECT
o.user_id,
o.order_id,
u.user_id AS user_id0,
u.other_data
FROM `users` AS u
LEFT JOIN `orders_with_composite_key` AS o
ON o.user_id = u.user_id;
-- this causes the job to fail
INSERT INTO users (VALUES (1, '1'));
Two things happen:
- Running EXPLAIN on the INSERT above doesn't show upsertMaterialize=[true], because the primary key matches the upsert key calculated by the planner (which I don't think is correct)
- It fails at runtime after inserting a user with:
Caused by: org.apache.flink.table.runtime.operators.sink.constraint.EnforcerException: Column 'user_id' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.
at org.apache.flink.table.runtime.operators.sink.constraint.NotNullConstraint.enforce(NotNullConstraint.java:55)
at org.apache.flink.table.runtime.operators.sink.constraint.ConstraintEnforcerExecutor.enforce(ConstraintEnforcerExecutor.java:445)
at org.apache.flink.table.runtime.operators.sink.constraint.ConstraintEnforcer.processElement(ConstraintEnforcer.java:66)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at StreamExecCalc$6.processElement(Unknown Source)
Ideally we would fail this during planning. Now, I still think that by definition upsert keys cannot contain null values - do you agree with that? At the same time, while reproducing this I realized the runtime issue might not be solved by addressing nullable upsert keys. What do you think?
@gustavodemorais I agree with your point that we should ideally find this unexpected behavior during the planning phase. In fact, I believe there's a fundamental issue with how this SQL is written. When using a LEFT JOIN, one must be aware that fields from the right table may be null. If the sink table defines some fields from the right table as part of the combined primary key (which must not contain null values), then it's reasonable to encounter an error stating "null attempted to be written to not null".
Moreover, this issue is not related to sink upsert materialization, which is intended to address out-of-order problems. Under this SQL, out-of-order issues aren't present, so upsertMaterialize=[true] won't be shown. In other words, even if upsert materialization were utilized, it wouldn't resolve the potential for null values being written to the primary key.
I ran the SQL on MySQL, and I received a similar error regarding null being written to the primary key.
CREATE TABLE T1 (
a1 int primary key,
b1 int
);
CREATE TABLE T2 (
a2 int primary key,
b2 int
);
CREATE TABLE T3 (
a1 int,
b1 int,
a2 int,
b2 int,
primary key (a1, a2)
);
insert into T1 values(1, 1);
insert into T1 values(2, 2);
insert into T2 values(1, 1);
insert into T3 select * from T1 left join T2 on a1 = a2;
ERROR 1048 (23000) at line 23: Column 'a2' cannot be null
Hey @xuyangzhong, thanks for the reply. I think we're aligned that it'd be good to have a validation error during the planning phase for this. We can have a different ticket for that. I agree this is not related to the upsertKeys 👍
Regarding upsertKeys, since it's an internal concept that doesn't come from a standard we follow, it would be good to have a place where we have our definition. Do we have something like this somewhere? Or else we could use this PR to add a detailed description to use as a reference in the future.
@gustavodemorais Sorry for the late reply. I found the description of the upsert key here https://github.com/apache/flink/blob/3fb23beb87266dfabc8c6c643a39944a8875a0fc/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java#L269
Thanks for the link and the discussion @xuyangzhong. One thing we could make a bit clearer in the description is whether:
1. upsertKeys can contain nullable attributes
2. they can consist of only nullable attributes
I think your point is that 1 is true. Now regarding 2, if we look purely at the current implementation for joins, and if I understand it correctly, they cannot consist only of nullable values. This is a bit confusing, since they can contain nullable attributes (1). Do you happen to know why? We have these checks for outer joins, see https://github.com/apache/flink/pull/27090/files#diff-8be3389bbe106e1e8b09a8d33b8d0b803e320384dec6fb486de2867bd923e46cR622
@gustavodemorais The upsert key is essentially the unique key based on the shuffle key, so when we're discussing whether the upsert key can contain or consist solely of null values, we're actually talking about the unique key.
Let me try to explain the code you pointed out. As indicated in the comment, it's a scenario involving a 1-on-1 join. Let's consider that each record on the right can join to either zero or one record on the left. In this case, if we have an inner join (where joinRelType.generatesNullsOnLeft is true), the unique key from the left side can serve as an independent set of unique keys after the join. However, if it's a right outer join, the unique key from the left side cannot independently form a set of unique keys after join.
For example, consider a pk-pk join between a left table with schema (pk_left, a) and a right table (pk_right, b), where the join key is defined as pk_left = pk_right. The output would include the two columns pk_left and pk_right.
If it's a right join, the result may include rows like [null, pk_right_1] and [null, pk_right_2], where the values in pk_left can be null. If this were a batch processing scenario and we were inserting the data directly into a database that has a unique constraint:
- If the unique constraint for the result table is defined on the pk_left column, the database will throw an error because it doesn't permit duplicate null values in a unique column.
- However, if the unique constraint is defined on the pk_right column or on both pk_left and pk_right, there will be no error.
Based on the reasoning above, the unique keys output from this right join will consist of two sets: {{pk_right}, {pk_left + pk_right}}. And the latter set, {pk_left + pk_right}, may include null values in the column pk_left.
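Expressed in code, the decision described above is roughly the following (a simplified sketch, not the actual FlinkRelMdUniqueKeys implementation; only Calcite's JoinRelType and ImmutableBitSet are assumed):

import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.util.ImmutableBitSet

// Sketch of the 1-on-1 join case above: a side's unique key only stands on its
// own if that side is never null-padded; the combined key {pk_left, pk_right}
// stays unique in any case, but may contain nulls after an outer join.
def oneOnOneJoinUniqueKeys(
    leftKey: ImmutableBitSet,   // e.g. {pk_left}, as output field indices
    rightKey: ImmutableBitSet,  // e.g. {pk_right}, as output field indices
    joinType: JoinRelType): Set[ImmutableBitSet] = {
  var keys = Set(leftKey.union(rightKey)) // may contain nullable columns
  if (!joinType.generatesNullsOnLeft) keys += leftKey
  if (!joinType.generatesNullsOnRight) keys += rightKey
  keys
}

For a right join, generatesNullsOnLeft is true, so only {pk_right} and {pk_left + pk_right} come out, matching the two sets above.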
@gustavodemorais The upsert key is essentially the unique key based on the shuffle key, so when we're discussing whether the upsert key can contain or consist solely of null values, we're actually talking about the unique key.
Hey Xuyang, that makes sense. Thanks for the very detailed and well written reply!
In this case, if we have an inner join (where joinRelType.generatesNullsOnLeft is true)
For an inner join, you probably mean joinRelType.generatesNullsOnLeft is false, right? Thus !joinRelType.generatesNullsOnLeft is true