[Feature][Flink] Add flink version 1.20.1 support
https://github.com/apache/seatunnel/issues/9513
Purpose of this pull request
This pull request fixes the Flink 1.20 compatibility issue in the SeaTunnel Flink translation layer. The current implementation uses reflection to access internal StreamingRuntimeContext from SourceReaderContext, which is fragile and may break with Flink version updates.
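To illustrate why a reflective lookup of an internal field is fragile, here is a minimal, self-contained sketch. `ReaderContextV1` and `ReaderContextV2` are hypothetical stand-ins, not Flink classes; they only model an internal field being renamed between releases, which is an assumption made for illustration.

```java
import java.lang.reflect.Field;

public class ReflectionFragilityDemo {

    // Hypothetical stand-in for an older runtime class with a private field.
    static class ReaderContextV1 {
        private final String runtimeContext = "ctx-v1";
    }

    // Hypothetical stand-in for a newer release where the field was renamed.
    static class ReaderContextV2 {
        private final String context = "ctx-v2";
    }

    // Reads a private field by name; returns null when the field is absent
    // or cannot be accessed.
    static Object readPrivateField(Object target, String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // The lookup works while the field name matches.
        System.out.println(readPrivateField(new ReaderContextV1(), "runtimeContext"));
        // The same lookup finds nothing once the field has been renamed.
        System.out.println(readPrivateField(new ReaderContextV2(), "runtimeContext"));
    }
}
```

The compiler cannot catch the rename because the field name is only a string, so the failure shows up at runtime rather than at build time.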
Does this PR introduce any user-facing change?
No. This is an internal implementation fix that maintains the same public API behavior. Users will continue to use the same SeaTunnel connector APIs without any changes. The metrics functionality remains identical from the user perspective.
How was this patch tested?
Manual testing:
- Built SeaTunnel with Flink 1.20 dependencies
- Ran sample connector jobs to verify metrics collection
- Confirmed no reflection-related errors in logs
- Validated that metrics appear correctly in the Flink Web UI
Check list
- [ ] If your PR adds any new Jar binary package, please add a License Notice according to the New License Guide
- [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
- [ ] If you are contributing the connector code, please check that the following files are updated:
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- Add ci label in label-scope-conf
- Add e2e testcase in seatunnel-e2e
- Update connector plugin_config
cc @TyrantLucifer
Good pr
We need a new Flink 1.20.1 test container to verify it. https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink
The code has now been simplified and the Flink20Container has been added.
https://hub.docker.com/repository/docker/tyrantlucifer/flink/tags/1.20.2-scala_2.12_hadoop27/sha256-2a3f548cd690dd3718fbcd73208c9de7d28bbbd9385df3581ccf347128b390d0
I have pushed a test Docker image; the tag is tyrantlucifer/flink:1.20.2-scala_2.12_hadoop27
We've found that when using this Docker image, there seem to be some image-related issues during the CI run. Could you please check it out?
The test container for Flink 1.20 has been added.
It has been added.
LGTM
Waiting for the test cases to pass.
@yzeng1618 CI failed. Please check.
Currently, there seems to be an issue with the CI process on dev itself.
@Hisoka-X @zhangshenghang It seems there's a problem with the Docker image. Do you have any updated information?
It should be working normally now, and other users haven't reported this issue.
Please merge dev; it fixes the timeout issue.
Merged.
Please pull the latest changes and try again.
@Hisoka-X @TyrantLucifer @Carl-Zhou-CN @zhangshenghang After adapting to the new code and fixing the CI process, the modifications are now complete. Could you please help review this?
@Hisoka-X @TyrantLucifer Do you still have any questions about the previous issue?
Yes, it still has some problems. Please wait a moment.
Please take some time to elaborate on the specific issues in detail @TyrantLucifer
I tried implementing the metrics using the original logic, and it appears to work.
https://github.com/TyrantLucifer/incubator-seatunnel/tree/BDPL-support1.20.1-test-metrics
cc @CloverDew
The meter also worked.
Thanks a lot for your review and guidance. @TyrantLucifer My original counter implementation in the test branch was mainly to get things working, but it has some drawbacks: it maintains two states (metrics.Counter + accumulator), relies on special handling for a few “key” metrics, and diverges from the Flink 1.13 implementation. The current approach using runtimeContext.getLongCounter(name) and the shared FlinkCounter is cleaner and more consistent across Flink versions, so I’m happy to follow this design.
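As a rough sketch of the single-state idea described above: the counter keeps no value of its own and delegates every update to a named accumulator obtained from the runtime context. This is not the actual FlinkCounter code. `NamedLongAccumulator`, `AccumulatorRegistry`, and `AccumulatorBackedCounter` are hypothetical stand-ins so the example runs without Flink on the classpath; only the method name `getLongCounter(name)` is taken from the discussion.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedCounterDemo {

    // Hypothetical stand-in for a long-valued accumulator.
    static class NamedLongAccumulator {
        private long value;

        void add(long delta) {
            value += delta;
        }

        long get() {
            return value;
        }
    }

    // Hypothetical stand-in for the part of a runtime context that hands out
    // accumulators by name, creating one on first request.
    static class AccumulatorRegistry {
        private final Map<String, NamedLongAccumulator> accumulators = new HashMap<>();

        NamedLongAccumulator getLongCounter(String name) {
            return accumulators.computeIfAbsent(name, key -> new NamedLongAccumulator());
        }
    }

    // Counter facade with a single source of truth: the accumulator.
    static class AccumulatorBackedCounter {
        private final NamedLongAccumulator accumulator;

        AccumulatorBackedCounter(NamedLongAccumulator accumulator) {
            this.accumulator = accumulator;
        }

        void inc() {
            accumulator.add(1L);
        }

        void inc(long n) {
            accumulator.add(n);
        }

        long getCount() {
            return accumulator.get();
        }
    }

    public static void main(String[] args) {
        AccumulatorRegistry registry = new AccumulatorRegistry();
        AccumulatorBackedCounter first =
                new AccumulatorBackedCounter(registry.getLongCounter("rowsRead"));
        AccumulatorBackedCounter second =
                new AccumulatorBackedCounter(registry.getLongCounter("rowsRead"));

        first.inc();
        second.inc(4L);

        // Both facades observe the same total because they share one accumulator.
        System.out.println(first.getCount());
        System.out.println(second.getCount());
    }
}
```

Because there is only one stored value per metric name, there is nothing to keep in sync and no need to special-case particular metrics.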
LGTM, waiting for CI/CD. Thanks for your contribution! @yzeng1618
Thanks a lot for your guidance