
[FLINK-28284][Connectors/Jdbc] Add JdbcSink with new format

Open eskabetxe opened this issue 2 years ago • 3 comments

What is the purpose of the change

Add a JdbcSink with new format (sink2)

Brief change log

  • JdbcSink: the new sink
  • JdbcSinkWriter: the writer used by the new sink
  • JdbcQueryStatement: the query and prepared statement that will be used
  • JdbcWriterStatement: the writer statement that writes to JDBC; this allows implementing a database-specific connection provider
  • SimpleJdbcWriterStatement: a simple implementation of JdbcWriterStatement

Verifying this change

This change added tests and can be verified as follows:

  • JdbcITCase.testInsertWithSinkTo
  • JdbcITCase.testObjectReuseWithSinkTo

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs / not documented)

eskabetxe avatar Jun 28 '22 19:06 eskabetxe

@leonardBang @MartijnVisser could you check this?

eskabetxe avatar Jun 28 '22 19:06 eskabetxe

CI report:

  • b5734d14242f5f018a85f266cdad98ad8af94c3a Azure: SUCCESS

Bot commands: the @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jun 28 '22 19:06 flinkbot

@flinkbot run azure

eskabetxe avatar Jun 29 '22 09:06 eskabetxe

I will review this in the next few weeks :)

wanglijie95 avatar Oct 21 '22 08:10 wanglijie95

@wanglijie95 if you need anything from me, feel free to contact me via Slack

eskabetxe avatar Nov 09 '22 16:11 eskabetxe

@wanglijie95 Do you have any idea when you can look into this ticket?

MartijnVisser avatar Nov 16 '22 21:11 MartijnVisser

@MartijnVisser I've started. I still need one more week to complete the first round of review.

wanglijie95 avatar Nov 17 '22 14:11 wanglijie95

Thanks @eskabetxe for creating this PR. I have 3 questions about the changes:

  1. Do we need to introduce JdbcProducer/NonXaJdbcProducer? I think the existing JdbcOutputFormat covers all the functionality of the non-xa sink; can we reuse it?
  2. Does the non-xa sink need to implement StatefulSink and TwoPhaseCommittingSink? I think the non-xa sink neither has state nor supports 2PC.
  3. I don't see support for an xa sink; is it not in the current plan?

Some of my thoughts: I think we can introduce 2 implementations, JdbcXaSink and JdbcNonXaSink. JdbcXaSink implements StatefulSink and TwoPhaseCommittingSink, while JdbcNonXaSink implements the base org.apache.flink.api.connector.sink2.Sink. These two implementations can then be exposed to the user via .sink and .exactlyOnceSink, just like the legacy sink in org.apache.flink.connector.jdbc.JdbcSink.

WDYT?

wanglijie95 avatar Nov 21 '22 02:11 wanglijie95
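
The two-sink proposal above can be sketched roughly as follows. This is a minimal illustration, not code from the PR: `Sink`, `Writer`, and `TwoPhaseSink` are simplified, hypothetical stand-ins for the real Flink sink2 interfaces, and a `List` stands in for the database table. Only the class names `JdbcNonXaSink`/`JdbcXaSink` and the `sink`/`exactlyOnceSink` entry points follow the comment.

```java
import java.util.ArrayList;
import java.util.List;

// All types below are hypothetical stand-ins, NOT the real Flink sink2 API.
final class TwoSinksSketch {

    /** Stand-in for the base sink interface: it only creates a writer. */
    interface Sink<T> {
        Writer<T> createWriter();
    }

    /** Stand-in for a sink writer. */
    interface Writer<T> {
        void write(T element);
        void flush();
    }

    /** Stand-in marker for a sink that additionally supports two-phase commit. */
    interface TwoPhaseSink<T> extends Sink<T> {}

    /** Non-XA sink: implements only the base interface, buffers and writes on flush. */
    static final class JdbcNonXaSink<T> implements Sink<T> {
        private final List<T> target; // stands in for the database table

        JdbcNonXaSink(List<T> target) {
            this.target = target;
        }

        @Override
        public Writer<T> createWriter() {
            List<T> buffer = new ArrayList<>();
            return new Writer<T>() {
                @Override public void write(T element) { buffer.add(element); }
                @Override public void flush() { target.addAll(buffer); buffer.clear(); }
            };
        }
    }

    /** XA sink placeholder: the XA logic is intentionally left out of this sketch. */
    static final class JdbcXaSink<T> implements TwoPhaseSink<T> {
        @Override
        public Writer<T> createWriter() {
            throw new UnsupportedOperationException("XA sink is not part of this sketch");
        }
    }

    /** Entry points mirroring .sink and .exactlyOnceSink. */
    static <T> Sink<T> sink(List<T> target) {
        return new JdbcNonXaSink<>(target);
    }

    static <T> TwoPhaseSink<T> exactlyOnceSink() {
        return new JdbcXaSink<>();
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>();
        Writer<String> writer = sink(table).createWriter();
        writer.write("a");
        writer.write("b");
        writer.flush();
        System.out.println(table); // prints [a, b]
    }
}
```

The point of the split is that the non-XA path never has to mention state or commit hooks, while the XA path can opt into them later without touching the simpler sink.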

Hi @wanglijie95, thanks for reviewing this.

  1. The idea behind JdbcProducer is to let us plug in a new producer when needed, for example a ClickHouse producer based on BalancedClickhouseDataSource. By default we would have JdbcProducer/NonXaJdbcProducer and JdbcProducer/XaJdbcProducer, while allowing other implementations without rewriting the common code. I will check whether JdbcOutputFormat can be used for the non-XA case.

  2. That's correct; they are there by design but are not used. It should be the JdbcProducer implementation that provides the delivery guarantee. So for non-XA it will either ignore TwoPhaseCommittingSink or use it to commit whatever is pending, as it should give an AtLeastOnce guarantee, and for XA it will use it to give the ExactlyOnce guarantee.

  3. Yes, it is in the current plan, but the current XA implementation used by SinkFunction relies on the RuntimeContext, which is not available in Sink2, so we will need to reimplement it.
    I talked to @MartijnVisser about this and he asked me whether we could use the pre/proCommit introduced in Sink2, and as I'm not an expert on XA or on the changes introduced with the pre/proCommits, here we are.

If this approach is wrong, we could go with the 2 sinks approach you suggest. But I believe that allowing the logic closest to the JDBC client to be extended or modified could be beneficial in some cases.

eskabetxe avatar Nov 21 '22 12:11 eskabetxe

Hi @eskabetxe, thanks for your reply.

  1. I think the purpose of providing three sink interfaces (Sink, StatefulSink, TwoPhaseCommittingSink) is to let developers/users implement the one that fits their needs, so I prefer the 2 sinks approach, especially since we are likely to introduce only the non-xa sink in this PR.
  2. I don't see an actual need to introduce the JdbcProducer abstraction at present; maybe it's better to do it later, when it is really needed (we can decide which interfaces/methods need to be abstracted at that time). Even if JdbcProducer is introduced, I think NonXaJdbcProducer should reuse JdbcOutputFormat internally, which will make it easier to fix bugs or add feature options later (we won't need to modify two places).
  3. Given that the xa sink migration is currently blocked (by the RuntimeContext), I think the jdbc sink migration can be split into 2 subtasks (xa and non-xa), and we only do the non-xa sink in this ticket/PR.

wanglijie95 avatar Nov 27 '22 02:11 wanglijie95

Hi @wanglijie95,

1- I'm OK with introducing only the non-xa sink in this PR; I will clean up the code.

2- From what I'm seeing, JdbcOutputFormat has the same problem with RuntimeContext: the open method (line 138) calls createAndOpenStatementExecutor at line 144, which internally does "JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());" (line 170). So I'm afraid we have the same problem. We could extract the common code into another class and use that in the two implementations. What do you think?

Updated the code to be non-xa only.

eskabetxe avatar Nov 28 '22 09:11 eskabetxe
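
The idea of extracting the common code into a class shared by both implementations could look roughly like this. It is a minimal sketch under stated assumptions: `BatchWriterCore`, `StatementExecutor`, `InMemoryExecutor`, and `open` are hypothetical names that appear neither in the PR nor in Flink. The shared core holds only the batching logic, and each caller builds the executor from whatever context type it has, so the core itself never needs a RuntimeContext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical names throughout; nothing here is taken from the PR or from Flink.
final class SharedJdbcCoreSketch {

    /** Stand-in for a batch statement executor. */
    interface StatementExecutor<T> {
        void addToBatch(T row);
        void executeBatch();
    }

    /** Shared batching logic with no dependency on any runtime context. */
    static final class BatchWriterCore<T> {
        private final StatementExecutor<T> executor;
        private final int batchSize;
        private int pending;

        BatchWriterCore(StatementExecutor<T> executor, int batchSize) {
            this.executor = executor;
            this.batchSize = batchSize;
        }

        void write(T row) {
            executor.addToBatch(row);
            if (++pending >= batchSize) {
                flush();
            }
        }

        void flush() {
            if (pending > 0) {
                executor.executeBatch();
                pending = 0;
            }
        }
    }

    /** Each caller builds the executor from its own context type C, then shares the core. */
    static <C, T> BatchWriterCore<T> open(
            C context, Function<C, StatementExecutor<T>> factory, int batchSize) {
        return new BatchWriterCore<>(factory.apply(context), batchSize);
    }

    /** In-memory executor used only to make the sketch runnable. */
    static final class InMemoryExecutor<T> implements StatementExecutor<T> {
        private final List<T> batch = new ArrayList<>();
        final List<T> written = new ArrayList<>();

        @Override public void addToBatch(T row) { batch.add(row); }
        @Override public void executeBatch() { written.addAll(batch); batch.clear(); }
    }

    public static void main(String[] args) {
        InMemoryExecutor<Integer> exec = new InMemoryExecutor<>();
        BatchWriterCore<Integer> core = open("any-context", ctx -> exec, 2);
        core.write(1);
        core.write(2); // reaches batchSize, triggers executeBatch
        core.write(3); // stays pending until the explicit flush
        core.flush();
        System.out.println(exec.written); // prints [1, 2, 3]
    }
}
```

With this shape, the legacy output format and the new sink writer would each pass their own context and factory to `open`, and bug fixes to the batching logic would land in one place.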

FYI: we're currently in the process of moving the JDBC code from the Flink repository to the dedicated Flink JDBC repository, see https://github.com/apache/flink-connector-jdbc/pull/1 which should move the current code as is.

When that's done, let's move this PR to that repository.

MartijnVisser avatar Nov 28 '22 20:11 MartijnVisser

We've now moved the code from the JDBC connector; please re-route this PR to https://github.com/apache/flink-connector-jdbc

MartijnVisser avatar Nov 29 '22 14:11 MartijnVisser

@MartijnVisser, @wanglijie95 I re-routed the PR to https://github.com/apache/flink-connector-jdbc/pull/2

Closing this.

eskabetxe avatar Nov 29 '22 15:11 eskabetxe