[INLONG-5133][Sort] Support InfluxDB extract node

Open liangyepianzhou opened this issue 2 years ago • 4 comments

Prepare a Pull Request

  • Fixes https://github.com/apache/inlong/issues/5133

Motivation

Add an InfluxDB extract node for Sort

Modifications

The details can be found at https://github.com/apache/inlong/issues/5133.

Verifying this change

(Please pick either of the following options)

  • [ ] This change is a trivial rework/code cleanup without any test coverage.

  • [ ] This change is already covered by existing tests, such as: (please describe tests)

  • [ ] This change added tests and can be verified as follows:

    (example:)

    • Added integration tests for end-to-end deployment with large payloads (10MB)
    • Extended integration test for recovery after broker failure

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

liangyepianzhou avatar Sep 03 '22 11:09 liangyepianzhou

@jun0315 I have finished writing the main framework of the InfluxDB source, and there are three points that still need to be improved. I've listed them below; please take a look when you have time.

  1. Parameters
    • The parameters set in the builder in InfluxDBSource may not be complete. If you add a parameter there, you also need to add it in getScanRuntimeProvider and in InfluxDBTableSourceFactory.
    • Divide the required and optional parameters between the methods requiredOptions and optionalOptions.
  2. Serialization
    • InfluxDB Table Source needs to pass in a deserializer (DebeziumDeserializationSchema<RowData> deserializer) when calling the function. flink-connector-influxdb2 may be a helpful reference for this.
  3. Connector CanonicalName: There is a configuration item in InfluxDBSource that I don't know how to deal with: props.setProperty("connector.class", InfluxDbConnector.class.getCanonicalName());
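
As a rough illustration of point 1, the split between required and optional parameters could look like the sketch below. It is plain Java rather than Flink code: in Flink the two methods return Set<ConfigOption<?>>, while this stand-in uses strings so it runs on its own. The class name InfluxOptionsSketch, the validate helper, and all four option keys are hypothetical placeholders, not the options this PR actually defines.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java stand-in for the factory's option handling. It does not use
// Flink's ConfigOption type, and every option key below is hypothetical.
class InfluxOptionsSketch {

    // Options the connector cannot run without (hypothetical keys).
    static Set<String> requiredOptions() {
        Set<String> options = new LinkedHashSet<>();
        options.add("url");
        options.add("database");
        return options;
    }

    // Options that may be omitted (hypothetical keys).
    static Set<String> optionalOptions() {
        Set<String> options = new LinkedHashSet<>();
        options.add("username");
        options.add("password");
        return options;
    }

    // Rejects a table definition that misses a required key or uses an unknown one.
    static void validate(Map<String, String> tableOptions) {
        for (String key : requiredOptions()) {
            if (!tableOptions.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        Set<String> known = new HashSet<>(requiredOptions());
        known.addAll(optionalOptions());
        for (String key : tableOptions.keySet()) {
            if (!known.contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }
}
```

Keeping the two sets separate means a missing required parameter fails when the table is created instead of when the job starts reading.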

To better understand the workflow of the Flink InfluxDB source, I wrote a simplified description of the process below.

Workflow of the Flink source connector (simplified version): InfluxDBTableSourceFactory creates an InfluxDBTableSource, and the getScanRuntimeProvider method of InfluxDBTableSource creates a DebeziumSourceFunction, which is responsible for fetching captured change data from the database into Flink. InfluxDBSource acts as a builder that creates the DebeziumSourceFunction. More details can be found here.
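
The chain described above can be sketched with stand-in types to show why a new parameter has to be threaded through all three places. None of the classes below are Flink or Debezium classes: SourceFunctionStub, SourceBuilder, TableSource, TableSourceFactory, and runOnce are hypothetical names that only mirror the roles of DebeziumSourceFunction, InfluxDBSource, InfluxDBTableSource, and InfluxDBTableSourceFactory, and the "url" and "database" keys are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in types only: none of these are Flink or Debezium classes.
class InfluxWorkflowSketch {

    // Plays the role of the runtime source function that pushes records into Flink.
    interface SourceFunctionStub {
        void run(Consumer<String> collector);
    }

    // Plays the role of InfluxDBSource: collects parameters, then creates the function.
    static class SourceBuilder {
        private String url;
        private String database;

        SourceBuilder url(String url) { this.url = url; return this; }
        SourceBuilder database(String database) { this.database = database; return this; }

        SourceFunctionStub build() {
            final String description = database + "@" + url;
            // A real function would read from the database; this one emits one marker record.
            return collector -> collector.accept("record from " + description);
        }
    }

    // Plays the role of InfluxDBTableSource.
    static class TableSource {
        private final Map<String, String> options;

        TableSource(Map<String, String> options) { this.options = options; }

        // Every builder parameter has to be forwarded here as well.
        SourceFunctionStub getScanRuntimeProvider() {
            return new SourceBuilder()
                    .url(options.get("url"))
                    .database(options.get("database"))
                    .build();
        }
    }

    // Plays the role of InfluxDBTableSourceFactory.
    static class TableSourceFactory {
        TableSource createDynamicTableSource(Map<String, String> options) {
            return new TableSource(options);
        }
    }

    // Walks the whole chain once and returns what the source function emitted.
    static List<String> runOnce(Map<String, String> options) {
        List<String> out = new ArrayList<>();
        new TableSourceFactory()
                .createDynamicTableSource(options)
                .getScanRuntimeProvider()
                .run(out::add);
        return out;
    }
}
```

In this sketch a parameter added to SourceBuilder has no effect until TableSource forwards it and the factory supplies it in the options map, which is the coupling noted in point 1.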

liangyepianzhou avatar Sep 03 '22 11:09 liangyepianzhou

@dockerzhang Please help check this PR when you have time, THX. In addition, I noticed that except for MySQL and JDBC, other CDCs only implement source and not sink. Influx DB also only needs to implement source, right?

liangyepianzhou avatar Sep 03 '22 11:09 liangyepianzhou

@dockerzhang Please help check this PR when you have time, THX. In addition, I noticed that except for MySQL and JDBC, other CDCs only implement source and not sink. Influx DB also only needs to implement source, right?

I guess this issue means you need to implement both sink and source

EMsnap avatar Sep 05 '22 02:09 EMsnap

Also, the sinks of these connectors are implemented with the JDBC driver; you can check the JDBC connector for details.

EMsnap avatar Sep 05 '22 02:09 EMsnap

This PR is stale because it has been open for 60 days with no activity.

github-actions[bot] avatar Jan 07 '23 01:01 github-actions[bot]

This PR is stale because it has been open for 60 days with no activity.

github-actions[bot] avatar Apr 05 '23 01:04 github-actions[bot]