Flink: Migrate Flink `TableSchema` to `Schema`/`ResolvedSchema`
Summary
This is part of #13054.
To deprecate FlinkSchemaUtil.convert(TableSchema), we also need to deprecate its dependent methods and remove the use of TableSchema. This PR contributes to that goal by removing TableSchema usage from Flink 2.0 and Flink 1.20.
Hi @ajantha-bhat @nastra. PTAL, thank you!
Thanks for working on this!
I remember that previous attempts to remove TableSchema from Flink failed. Could you please check whether these changes can be backported to Flink 1.19/1.20?
Hi @pvary, thank you for the review! I’m wondering what’s our usual approach for checking backward compatibility. Should I apply similar changes to 1.19 and 1.20 to see if all tests still pass?
That's a good idea. Just please keep this PR 1.20 only, so it is easier to address the comments and to review.
Thanks, Peter
Made the same change for Flink 1.20, tests passed locally. Please take another look, thank you @pvary @mxm!
@liamzwbao: What is the plan with the other uses of TableSchema? Do we plan to remove those as well?
@pvary Yep, my plan is to first remove TableSchema from IcebergSource. After that, I’ll deprecate the remaining methods in FlinkSchemaUtil, followed by deprecations in FlinkSink and IcebergSink. I’ll also clean up any remaining usages of TableSchema across the Flink codebase after that.
What are the obstacles for completely getting rid of TableSchema?
Hi @mxm, do you mean removing all usages of TableSchema in a single PR? I think that would result in a fairly large change, which might be difficult to review. So my current plan is to break it down into smaller PRs. However, if you believe it's better to handle it all in one PR, I can give that a try.
As for the obstacles, one current discrepancy in the migration is that the legacy TableSchema.builder().build() performs some validation in the build() method. These validations are missing when directly using ResolvedSchema.of(), as they have been moved to the SchemaResolver.
So I see two possible options:
- Add the missing validation logic into the current utility function.
- Build a Schema first and then resolve it to ResolvedSchema. However, this approach requires a SchemaResolver, which may involve API changes. I'm not sure how we can obtain a SchemaResolver in this context.
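To make the first option more concrete, here is a minimal sketch of what validation inside a utility function could look like. It deliberately avoids Flink types: ColumnSketch and SchemaValidationSketch are hypothetical names, and the two checks shown (unique column names, primary-key columns that exist and are non-nullable) are assumptions chosen for illustration, not a confirmed list of what the legacy build() validated.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SchemaValidationSketch {
  private SchemaValidationSketch() {}

  // Hypothetical stand-in for a schema column; this is not a Flink class.
  static final class ColumnSketch {
    final String name;
    final boolean nullable;

    ColumnSketch(String name, boolean nullable) {
      this.name = name;
      this.nullable = nullable;
    }
  }

  // Assumed checks: column names are unique, and every primary-key column
  // exists and is declared non-nullable.
  static void validate(List<ColumnSketch> columns, List<String> primaryKey) {
    Set<String> seen = new HashSet<>();
    for (ColumnSketch column : columns) {
      if (!seen.add(column.name)) {
        throw new IllegalArgumentException("Duplicate column name: " + column.name);
      }
    }

    for (String key : primaryKey) {
      ColumnSketch match = null;
      for (ColumnSketch column : columns) {
        if (column.name.equals(key)) {
          match = column;
        }
      }
      if (match == null) {
        throw new IllegalArgumentException("Primary key column not found: " + key);
      }
      if (match.nullable) {
        throw new IllegalArgumentException("Primary key column is nullable: " + key);
      }
    }
  }
}
```

A utility that builds a ResolvedSchema directly could run a check like this on its inputs first, so invalid schemas are still rejected without needing a SchemaResolver.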
For more discussion, please refer to this thread.
Thanks for elaborating! I've responded in the thread. If we can, it would be great to completely get rid of TableSchema in this PR. We can add the steps as separate commits. I leave it up to you though, if you think that is too much work.
Hi @mxm and @pvary, this PR is ready for review. I’ve fully removed/deprecated the use of TableSchema in Flink 2.0/1.20. The changes are split into four commits: IcebergSource, FlinkSchemaUtil, FlinkSink/IcebergSink, and other remaining items such as FlinkCatalog. Please take another look, thanks!
@liamzwbao: This is quite a big change, but seems like a good direction to me. If I can have 2 requests, I would like to ask you:
- It is good that we validated that the migration works for both Flink 1.20 and Flink 2.0, but please change only Flink 2.0 in this PR. This helps the reviewers, since we don't have to double-check everything, and it also helps the contributor, since each requested change only needs to be made once. Later, when the Flink 2.0 PR is merged, we backport the final changes to Flink 1.20/1.19 in a separate PR, and then we only need to highlight the differences from the original PR.
- Could you please check that we keep calling the old, deprecated methods in at least a few unit tests (maybe add some new, duplicated smoke tests)? This is needed to ensure that the old functionality still works.
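As a rough illustration of the second request, the sketch below shows a deprecated overload that delegates to its replacement, plus a smoke check that still exercises the deprecated path. All names here (DeprecatedPathSketch, describe) are hypothetical and do not correspond to actual Iceberg or Flink methods.

```java
import java.util.Arrays;
import java.util.List;

final class DeprecatedPathSketch {
  private DeprecatedPathSketch() {}

  // Hypothetical new entry point.
  static String describe(List<String> columnNames) {
    return String.join(",", columnNames);
  }

  /** @deprecated hypothetical legacy entry point, kept so existing callers keep working. */
  @Deprecated
  static String describe(String[] legacyColumnNames) {
    return describe(Arrays.asList(legacyColumnNames));
  }

  // Smoke check: the deprecated path must keep producing the same result
  // as the new path for the same input.
  @SuppressWarnings("deprecation")
  static void smokeTestDeprecatedPath() {
    String viaNew = describe(Arrays.asList("id", "data"));
    String viaOld = describe(new String[] {"id", "data"});
    if (!viaNew.equals(viaOld)) {
      throw new AssertionError("Deprecated path diverged: " + viaOld + " vs " + viaNew);
    }
  }
}
```

Keeping one such check per deprecated method is usually enough to catch accidental breakage until the old API is removed.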
Thanks for all your effort here! I would love to merge these changes after the 2 asks above are addressed.
Thanks, Peter
Merged to main. Thanks for the migration @liamzwbao and @mxm for the review!
@liamzwbao: Please prepare the backport PRs to Flink 1.19/1.20. The following command could help: git diff HEAD~1 HEAD flink/v2.0 | sed "s/v2.0/v1.19/g" > /tmp/patch. Please leave a note on the backport PR if you need to modify anything beyond cleanly applying the patch.