Flink: Migrate Flink `TableSchema` to `Schema`/`ResolvedSchema`

Open liamzwbao opened this issue 7 months ago • 12 comments

Summary

This is part of #13054.

To deprecate FlinkSchemaUtil.convert(TableSchema), we also need to deprecate its dependent methods and remove the use of TableSchema. This PR contributes to that goal by removing TableSchema usage from Flink 2.0 and Flink 1.20.

liamzwbao avatar May 15 '25 23:05 liamzwbao

Hi @ajantha-bhat @nastra. PTAL, thank you!

liamzwbao avatar May 18 '25 00:05 liamzwbao

I remember that previous attempts to remove TableSchema from Flink failed. Could you please check whether we can backport these changes to Flink 1.19/1.20?

Thanks for working on this!

pvary avatar May 19 '25 11:05 pvary

> I remember that previous attempts to remove TableSchema from Flink failed. Could you please check whether we can backport these changes to Flink 1.19/1.20?

Hi @pvary, thank you for the review! I’m wondering what our usual approach is for checking backward compatibility. Should I apply similar changes to 1.19 and 1.20 to see if all tests still pass?

liamzwbao avatar May 20 '25 23:05 liamzwbao

> > I remember that previous attempts to remove TableSchema from Flink failed. Could you please check whether we can backport these changes to Flink 1.19/1.20?
>
> Hi @pvary, thank you for the review! I’m wondering what our usual approach is for checking backward compatibility. Should I apply similar changes to 1.19 and 1.20 to see if all tests still pass?

That's a good idea. Just please keep this PR 1.20 only, so it is easier to review and to address the comments.

Thanks, Peter

pvary avatar May 21 '25 15:05 pvary

Made the same change for Flink 1.20, tests passed locally. Please take another look, thank you @pvary @mxm!

liamzwbao avatar May 22 '25 01:05 liamzwbao

@liamzwbao: What is the plan with the other uses of TableSchema? Do we plan to remove those as well?

pvary avatar May 23 '25 08:05 pvary

@pvary Yep, my plan is to first remove TableSchema from IcebergSource. After that, I’ll deprecate the remaining methods in FlinkSchemaUtil, followed by deprecations in FlinkSink and IcebergSink. I’ll also clean up any remaining usages of TableSchema across the Flink codebase after that.

liamzwbao avatar May 24 '25 13:05 liamzwbao

What are the obstacles to completely getting rid of TableSchema?

mxm avatar May 28 '25 10:05 mxm

> What are the obstacles to completely getting rid of TableSchema?

Hi @mxm, do you mean removing all usages of TableSchema in a single PR? I think that would result in a fairly large change, which might be difficult to review. So my current plan is to break it down into smaller PRs. However, if you believe it's better to handle it all in one PR, I can give that a try.

As for the obstacles, one current discrepancy in the migration is that the legacy TableSchema.builder().build() performs some validation in the build() method. These validations are missing when directly using ResolvedSchema.of(), as they have been moved to the SchemaResolver.

So I see two possible options:

  • Add the missing validation logic into the current utility function.
  • Build a Schema first and then resolve it to ResolvedSchema. However, this approach requires a SchemaResolver, which may involve API changes. I'm not sure how we can obtain a SchemaResolver in this context.
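
As a rough illustration of the first option, here is a minimal sketch of what re-adding the checks in a utility function could look like. It is not the Iceberg or Flink implementation: `ColumnSpec` and `SchemaChecks` are hypothetical stand-ins, and the specific checks (unique column names, primary key columns that exist and are NOT NULL) are an assumption about what the legacy `build()` validated.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a resolved column; this is not a Flink class.
final class ColumnSpec {
  final String name;
  final boolean nullable;

  ColumnSpec(String name, boolean nullable) {
    this.name = name;
    this.nullable = nullable;
  }
}

// Hypothetical utility; the checks below are assumed, not copied from Flink.
final class SchemaChecks {
  private SchemaChecks() {}

  // Rejects duplicate column names and primary key columns that are
  // missing from the schema or declared nullable.
  static void validate(List<ColumnSpec> columns, List<String> primaryKey) {
    Set<String> seen = new HashSet<>();
    Set<String> nullableColumns = new HashSet<>();
    for (ColumnSpec column : columns) {
      if (!seen.add(column.name)) {
        throw new IllegalArgumentException("Duplicate column name: " + column.name);
      }
      if (column.nullable) {
        nullableColumns.add(column.name);
      }
    }
    for (String key : primaryKey) {
      if (!seen.contains(key)) {
        throw new IllegalArgumentException("Primary key column does not exist: " + key);
      }
      if (nullableColumns.contains(key)) {
        throw new IllegalArgumentException("Primary key column is nullable: " + key);
      }
    }
  }

  public static void main(String[] args) {
    List<ColumnSpec> columns =
        Arrays.asList(new ColumnSpec("id", false), new ColumnSpec("data", true));

    // A NOT NULL primary key column passes the checks.
    validate(columns, Arrays.asList("id"));

    // A nullable primary key column is rejected.
    try {
      validate(columns, Arrays.asList("data"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In the real code, such checks would presumably run inside the existing conversion utility before the `ResolvedSchema` is created, so callers would not need a SchemaResolver.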

For more discussion, please refer to this thread.

liamzwbao avatar May 29 '25 22:05 liamzwbao

Thanks for elaborating! I've responded in the thread. If we can, it would be great to completely get rid of TableSchema in this PR. We can add the steps as separate commits. I'll leave it up to you, though, if you think that is too much work.

mxm avatar Jun 03 '25 09:06 mxm

Hi @mxm and @pvary, this PR is ready for review. I’ve fully removed/deprecated the use of TableSchema in Flink 2.0/1.20. The changes are split into four commits: IcebergSource, FlinkSchemaUtil, FlinkSink/IcebergSink, and other remaining items such as FlinkCatalog. Please take another look, thanks!

liamzwbao avatar Jun 04 '25 02:06 liamzwbao

@liamzwbao: This is quite a big change, but it seems like a good direction to me. I have two requests:

  • It is good that we validated that the migration works for both Flink 1.20 and Flink 2.0, but please keep only the Flink 2.0 changes in this PR. This helps the reviewers, who don't have to double-check everything, and it helps the contributor, who only has to apply each requested change once. Later, when the Flink 2.0 PR is merged, we backport the final changes to Flink 1.20/1.19 in a separate PR, where we only need to highlight the differences from the original PR.
  • Could you please check that we keep calling the old, deprecated methods in at least a few unit tests (maybe add some new, duplicated smoke tests)? This is needed to ensure that the old functionality still works.
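
To make the second request concrete, here is a minimal sketch of the pattern: a deprecated overload that delegates to the new one, plus a smoke test that still exercises the deprecated path. All names (`LegacySchema`, `NewSchema`, `SchemaConverter`, `DeprecatedPathSmokeTest`) are hypothetical stand-ins, not the actual Iceberg or Flink classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the legacy schema type; not a Flink class.
final class LegacySchema {
  final List<String> fieldNames;

  LegacySchema(List<String> fieldNames) {
    this.fieldNames = fieldNames;
  }
}

// Hypothetical stand-in for the new schema type; not a Flink class.
final class NewSchema {
  final List<String> columnNames;

  NewSchema(List<String> columnNames) {
    this.columnNames = columnNames;
  }
}

final class SchemaConverter {
  private SchemaConverter() {}

  // New entry point.
  static String convert(NewSchema schema) {
    return String.join(",", schema.columnNames);
  }

  /** @deprecated use {@link #convert(NewSchema)} instead. */
  @Deprecated
  static String convert(LegacySchema schema) {
    // The deprecated overload delegates so both paths share one implementation.
    return convert(new NewSchema(schema.fieldNames));
  }
}

final class DeprecatedPathSmokeTest {
  // Calls the deprecated overload on purpose to confirm it still works.
  @SuppressWarnings("deprecation")
  static void deprecatedAndNewPathsAgree() {
    List<String> names = Arrays.asList("id", "data");
    String viaNew = SchemaConverter.convert(new NewSchema(names));
    String viaOld = SchemaConverter.convert(new LegacySchema(names));
    if (!viaNew.equals(viaOld)) {
      throw new AssertionError("Deprecated path diverged: " + viaOld + " vs " + viaNew);
    }
  }

  public static void main(String[] args) {
    deprecatedAndNewPathsAgree();
    System.out.println("deprecated path still works");
  }
}
```

In the actual test suite this would be an ordinary unit test in whichever framework the module already uses; the plain `main` method here only keeps the sketch self-contained.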

Thanks for all your effort here! I would love to merge these changes after the 2 asks above are addressed.

Thanks, Peter

pvary avatar Jun 19 '25 17:06 pvary

Merged to main. Thanks @liamzwbao for the migration and @mxm for the review!

@liamzwbao: Please prepare the backport PRs to Flink 1.19/1.20. The following command could help: `git diff HEAD~1 HEAD flink/v2.0 | sed "s/v2.0/v1.19/g" > /tmp/patch`. Please leave a note on the backport PR if you need to modify anything beyond cleanly applying the patch.

pvary avatar Jun 25 '25 08:06 pvary