inlong
[INLONG-5133][Sort] Support InfluxDB extract node
Prepare a Pull Request
- Fixes https://github.com/apache/inlong/issues/5133
Motivation
Add Apache InfluxDB Extract for Sort
Modifications
The details can be found at https://github.com/apache/inlong/issues/5133.
Verifying this change
(Please pick one of the following options)
- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: (please describe tests)
- [ ] This change added tests and can be verified as follows:
(example:)
- Added integration tests for end-to-end deployment with large payloads (10MB)
- Extended integration test for recovery after broker failure
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
@jun0315 I have finished writing the main framework of the InfluxDB source, and there are now three points that need to be improved. I've listed them below; please take a look when you have time.
- Parameters
  - The parameters written in the builder function in `InfluxDBSource` may not be complete. So if you need to add parameters here, you also need to add them in `getScanRuntimeProvider` and `InfluxDBTableSourceFactory`.
  - Divide the required and optional parameters into the methods `requiredOptions` and `optionalOptions`.
- Serialization
  - `InfluxDBTableSource` needs to pass in a `DebeziumDeserializationSchema<RowData>` deserializer when calling the function; flink-connector-influxdb2 can help with this.
- Connector CanonicalName: there is a configuration item in `InfluxDBSource` that I don't know how to deal with:
  `props.setProperty("connector.class", InfluxDbConnector.class.getCanonicalName());`
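On the required/optional split mentioned above: Flink's table factories separate the two sets via `requiredOptions()` and `optionalOptions()`. Below is a minimal, self-contained sketch of that split using plain-Java stand-ins (the `ConfigOption` record and the option names here are illustrative, not Flink's real `ConfigOption` API or the actual InfluxDB option keys):

```java
import java.util.HashSet;
import java.util.Set;

public class OptionsSketch {
    // Stand-in for Flink's ConfigOption<T>; the real one is built with
    // ConfigOptions.key("...").stringType().noDefaultValue() etc.
    record ConfigOption(String key, String defaultValue) {}

    // Hypothetical options for an InfluxDB source factory.
    static final ConfigOption HOSTNAME = new ConfigOption("hostname", null);
    static final ConfigOption DATABASE = new ConfigOption("database-name", null);
    static final ConfigOption PORT = new ConfigOption("port", "8086");

    // Mirrors DynamicTableFactory#requiredOptions(): options the user must set.
    static Set<ConfigOption> requiredOptions() {
        Set<ConfigOption> required = new HashSet<>();
        required.add(HOSTNAME);
        required.add(DATABASE);
        return required;
    }

    // Mirrors DynamicTableFactory#optionalOptions(): options with sane defaults.
    static Set<ConfigOption> optionalOptions() {
        Set<ConfigOption> optional = new HashSet<>();
        optional.add(PORT);
        return optional;
    }
}
```

Flink validates the user's `WITH (...)` clause against exactly these two sets, so any parameter added to the `InfluxDBSource` builder also has to appear in one of them.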
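On the serialization point: the deserializer's job is to turn one Debezium change record into zero or more downstream rows. A self-contained sketch of that shape, using stand-in types (the real interface lives in flink-connector-debezium and takes Kafka Connect's `SourceRecord` plus Flink's `Collector<RowData>`; the types below are simplified placeholders):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeserializerSketch {
    // Stand-in for the payload of a Kafka Connect SourceRecord.
    record SourceRecord(Map<String, Object> value) {}

    // Stand-in for Flink's Collector<T>.
    interface Collector<T> {
        void collect(T element);
    }

    // Shape of DebeziumDeserializationSchema<T>: one change record in,
    // rows out through the collector.
    interface DebeziumDeserializationSchema<T> {
        void deserialize(SourceRecord rec, Collector<T> out);
    }

    // Minimal deserializer: emit the record's field values as one "row".
    static final DebeziumDeserializationSchema<List<Object>> SIMPLE =
        (rec, out) -> out.collect(new ArrayList<>(rec.value().values()));
}
```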
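For context on the `connector.class` item: Debezium's embedded engine reads that property as a fully qualified class name and instantiates the connector reflectively, which is why a `getCanonicalName()` string is passed rather than an object. A minimal illustration of that mechanism with a stand-in connector class (not the real Debezium engine code):

```java
public class ConnectorClassSketch {
    // Stand-in for the connector class that "connector.class" would name.
    public static class InfluxDbConnector {}

    // Debezium-style: resolve the configured class name and instantiate it.
    public static Object instantiate(java.util.Properties props) throws Exception {
        String className = props.getProperty("connector.class");
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }
}
```

Note that for a top-level connector class, `getCanonicalName()` and `getName()` coincide, so the reflective lookup works; for nested classes `Class.forName` needs the binary name (`getName()`, with `$`).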
In order to better understand the workflow of the Flink InfluxDB source, I wrote a simple description of the process below.

Workflow of the Flink source connector (simplified version): `InfluxDBTableSourceFactory` creates an `InfluxDBTableSource`, and the `getScanRuntimeProvider` method of `InfluxDBTableSource` creates a `DebeziumSourceFunction`, which is responsible for fetching captured change data from the database into Flink. `InfluxDBSource` is essentially a creator used to build a `DebeziumSourceFunction`. More details can be found here.
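The chain above can be sketched with minimal stand-in classes (the class and method names match the PR, but the bodies are placeholders for the real Flink `DynamicTableSourceFactory` / `ScanTableSource` machinery):

```java
public class WorkflowSketch {
    // Stand-in for DebeziumSourceFunction: fetches captured change data into Flink.
    record SourceFunction(String connector) {}

    // Stand-in for InfluxDBSource: a creator/builder for the source function.
    static class InfluxDBSource {
        static SourceFunction build() {
            return new SourceFunction("influxdb");
        }
    }

    // Stand-in for InfluxDBTableSource: getScanRuntimeProvider wires in the
    // source function that the runtime will execute.
    static class InfluxDBTableSource {
        SourceFunction getScanRuntimeProvider() {
            return InfluxDBSource.build();
        }
    }

    // Stand-in for InfluxDBTableSourceFactory: the SPI entry point that the
    // planner calls to create the table source.
    static class InfluxDBTableSourceFactory {
        InfluxDBTableSource createDynamicTableSource() {
            return new InfluxDBTableSource();
        }
    }
}
```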
@dockerzhang Please help check this PR when you have time, THX. In addition, I noticed that except for MySQL and JDBC, other CDCs only implement source and not sink. Influx DB also only needs to implement source, right?
I guess this issue means you need to implement both sink and source
Also, the sink for these connectors is implemented via the JDBC driver; you can check the JDBC connector for details.
This PR is stale because it has been open for 60 days with no activity.