[FLINK-28284][Connectors/Jdbc] Add JdbcSink with new format
What is the purpose of the change
Add a JdbcSink that uses the new Sink API (sink2).
Brief change log
- JdbcSink: the new sink
- JdbcSinkWriter: the writer used by the new sink
- JdbcQueryStatement: the query and the PreparedStatement logic that will be used
- JdbcWriterStatement: the statement that writes to JDBC; this allows implementing database-specific connection providers
- SimpleJdbcWriterStatement: a simple implementation of JdbcWriterStatement
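The layering in the change log (sink, writer, writer statement) can be sketched in plain Java. This is an illustration under stated assumptions, not the PR's code: SimpleSink, SimpleSinkWriter, WriterStatement and RecordingStatement are hypothetical stand-ins for JdbcSink, JdbcSinkWriter, JdbcWriterStatement and SimpleJdbcWriterStatement. They do not implement Flink's org.apache.flink.api.connector.sink2 interfaces, and an in-memory recorder replaces the JDBC connection so the example runs without a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical counterpart of JdbcWriterStatement: buffers records and writes them on demand.
interface WriterStatement<T> {
    void add(T record);
    void executeBatch();
    void close();
}

// Hypothetical counterpart of SimpleJdbcWriterStatement. Instead of a java.sql.PreparedStatement
// it records each executed batch in memory, so the sketch runs without a database.
class RecordingStatement<T> implements WriterStatement<T> {
    private final String query;                // the SQL a JdbcQueryStatement would carry
    private final Function<T, String> binder;  // stands in for setting PreparedStatement parameters
    private final List<String> pending = new ArrayList<>();
    final List<List<String>> executedBatches = new ArrayList<>();

    RecordingStatement(String query, Function<T, String> binder) {
        this.query = query;
        this.binder = binder;
    }

    String query() { return query; }

    @Override public void add(T record) { pending.add(binder.apply(record)); }

    @Override public void executeBatch() {
        if (!pending.isEmpty()) {
            executedBatches.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    @Override public void close() { executeBatch(); }
}

// Hypothetical counterpart of JdbcSinkWriter: batches records and flushes when the batch is
// full or when asked to. In Flink's Sink V2, SinkWriter#flush(boolean endOfInput) fills that role.
class SimpleSinkWriter<T> {
    private final WriterStatement<T> statement;
    private final int batchSize;
    private int buffered;

    SimpleSinkWriter(WriterStatement<T> statement, int batchSize) {
        this.statement = statement;
        this.batchSize = batchSize;
    }

    void write(T record) {
        statement.add(record);
        if (++buffered >= batchSize) {
            flush();
        }
    }

    void flush() {
        statement.executeBatch();
        buffered = 0;
    }

    void close() {
        flush();
        statement.close();
    }
}

// Hypothetical counterpart of JdbcSink: holds configuration and creates writers.
// In Flink, createWriter is called once per parallel subtask.
class SimpleSink<T> {
    private final Supplier<WriterStatement<T>> statementFactory;
    private final int batchSize;

    SimpleSink(Supplier<WriterStatement<T>> statementFactory, int batchSize) {
        this.statementFactory = statementFactory;
        this.batchSize = batchSize;
    }

    SimpleSinkWriter<T> createWriter() {
        return new SimpleSinkWriter<>(statementFactory.get(), batchSize);
    }
}
```

With a batch size of 2, writing three records executes one batch of two right away; the third record is executed when the writer is flushed or closed. Keeping the database access behind the statement interface is what would let a database-specific implementation be swapped in without touching the writer.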
Verifying this change
This change added tests and can be verified as follows:
- JdbcITCase.testInsertWithSinkTo
- JdbcITCase.testObjectReuseWithSinkTo
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs / not documented)
@leonardBang @MartijnVisser could you check this?
CI report:
- b5734d14242f5f018a85f266cdad98ad8af94c3a Azure: SUCCESS
@flinkbot run azure
I will review this in the next few weeks :)
@wanglijie95 if you need anything from me, feel free to contact me via Slack.
@wanglijie95 Do you have any idea when you can look into this ticket?
@MartijnVisser I've started. I still need one more week to complete the first round of review.
Thanks @eskabetxe for creating this PR. I have 3 questions about the changes:
- Do we need to introduce JdbcProducer/NonXaJdbcProducer? I think the existing JdbcOutputFormat can cover all the functionality of a non-XA sink; can we reuse it?
- Does the non-XA sink need to implement StatefulSink and TwoPhaseCommittingSink? I think the non-XA sink neither has state nor supports 2PC.
- I don't see support for an XA sink; is it not in the current plan?
Some of my thoughts:
I think we can introduce 2 implementations: JdbcXaSink and JdbcNonXaSink. JdbcXaSink implements StatefulSink and TwoPhaseCommittingSink, while JdbcNonXaSink implements the base org.apache.flink.api.connector.sink2.Sink. These two implementations can then be exposed to users via .sink and .exactlyOnceSink, just like the legacy sink in org.apache.flink.connector.jdbc.JdbcSink.
WDYT?
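The two-sink proposal relies on each Sink V2 capability being a separate interface, so a sink implements only what it needs. A minimal sketch of that shape, assuming hypothetical marker interfaces (BasicSink, StatefulSinkLike, TwoPhaseCommittingSinkLike) in place of Flink's Sink, StatefulSink and TwoPhaseCommittingSink, and hypothetical class names for the two sinks and the factory. The XA side only records transaction ids; it does not talk to a database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the three Sink V2 capabilities (Sink, StatefulSink,
// TwoPhaseCommittingSink). They only mirror the idea that each capability is a
// separate interface; they are not Flink's real types.
interface BasicSink<T> {
    String name();
}

interface StatefulSinkLike<T> extends BasicSink<T> {
    // Returns the state to store in a checkpoint, e.g. pending XA transaction ids.
    List<String> snapshotState(long checkpointId);
}

interface TwoPhaseCommittingSinkLike<T> extends BasicSink<T> {
    // Commits what was pre-committed for a completed checkpoint.
    void commit(List<String> committables);
}

// Non-XA sink: implements only the base interface, since it keeps no state
// and has nothing to commit in a second phase.
final class NonXaJdbcSinkSketch<T> implements BasicSink<T> {
    @Override public String name() { return "jdbc-non-xa"; }
}

// XA sink: needs both state (open transactions) and two-phase commit.
final class XaJdbcSinkSketch<T> implements StatefulSinkLike<T>, TwoPhaseCommittingSinkLike<T> {
    final List<String> committed = new ArrayList<>();

    @Override public String name() { return "jdbc-xa"; }

    @Override public List<String> snapshotState(long checkpointId) {
        return List.of("xid-" + checkpointId);
    }

    @Override public void commit(List<String> committables) {
        committed.addAll(committables);
    }
}

// Hypothetical entry point mirroring the legacy JdbcSink.sink / exactlyOnceSink split.
final class JdbcSinksSketch {
    private JdbcSinksSketch() {}

    static <T> BasicSink<T> sink() { return new NonXaJdbcSinkSketch<>(); }

    static <T> XaJdbcSinkSketch<T> exactlyOnceSink() { return new XaJdbcSinkSketch<>(); }
}
```

Here sink() exposes only the base interface, while exactlyOnceSink() returns a type that also carries state and commit capabilities, so neither sink has to stub out methods it does not use.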
Hi @wanglijie95, thanks for reviewing this.
- The idea behind JdbcProducer is to allow creating a new producer when needed, for example a ClickHouse producer based on BalancedClickhouseDataSource. With this, we would have JdbcProducer/NonXaJdbcProducer and JdbcProducer/XaJdbcProducer by default, while allowing other implementations without rewriting the common code. I will check using JdbcOutputFormat for the non-XA case.
- That's correct: it's there by design, but it isn't used. It should be the JdbcProducer implementation that provides the delivery guarantee. So the non-XA producer would either ignore the TwoPhaseCommittingSink methods or use them to commit what is pending, since it should give an at-least-once guarantee, while the XA producer would use them to give an exactly-once guarantee.
- Yes, it is in the current plan, but the current XA implementation used by SinkFunction relies on RuntimeContext, which is not available in Sink2, so we will need to reimplement it.
I talked to @MartijnVisser about this and he asked me whether we could use the pre/post-commit hooks introduced in Sink2, and since I'm not an expert on XA or on those pre/post-commit changes, here we are.
If this approach is wrong, we could go with the two-sink approach you suggested. But I believe that allowing the logic closer to the JDBC client to be extended or modified could be beneficial in some cases.
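The idea of letting the producer decide the delivery guarantee can be illustrated with two producers behind one contract. This is a hypothetical sketch: JdbcProducerSketch, NonXaProducerSketch and TransactionalProducerSketch are invented names, an in-memory list stands in for the target table, and the transactional producer only simulates the prepare/commit split rather than using XA.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical producer contract: the sink delegates the delivery guarantee
// to whichever producer it is given. Not a Flink or PR API.
interface JdbcProducerSketch<T> {
    void write(T record);
    // Called before a checkpoint; returns committables for the commit phase.
    List<String> prepareCommit(long checkpointId);
    // Called once the checkpoint is complete.
    void commit(List<String> committables);
}

// At-least-once: rows are written in prepareCommit and nothing is left to commit.
// After a failure, records since the last checkpoint are replayed, so duplicates are possible.
class NonXaProducerSketch<T> implements JdbcProducerSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    final List<T> table; // stands in for the target table

    NonXaProducerSketch(List<T> table) { this.table = table; }

    @Override public void write(T record) { buffer.add(record); }

    @Override public List<String> prepareCommit(long checkpointId) {
        table.addAll(buffer);
        buffer.clear();
        return Collections.emptyList();
    }

    @Override public void commit(List<String> committables) {
        // Nothing to do: data was already written in prepareCommit.
    }
}

// Simulated transactional producer (NOT real XA): rows are staged under a
// transaction id in prepareCommit and only become visible on commit.
class TransactionalProducerSketch<T> implements JdbcProducerSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final Map<String, List<T>> prepared = new HashMap<>();
    final List<T> table;

    TransactionalProducerSketch(List<T> table) { this.table = table; }

    @Override public void write(T record) { buffer.add(record); }

    @Override public List<String> prepareCommit(long checkpointId) {
        String txId = "tx-" + checkpointId;
        prepared.put(txId, new ArrayList<>(buffer));
        buffer.clear();
        return List.of(txId);
    }

    @Override public void commit(List<String> committables) {
        for (String txId : committables) {
            List<T> rows = prepared.remove(txId);
            if (rows != null) { // a repeated commit is a no-op, keeping commit idempotent
                table.addAll(rows);
            }
        }
    }
}
```

The non-XA producer makes rows visible before the checkpoint completes, which is why it can only promise at-least-once. The transactional one defers visibility to an idempotent commit, which is the property an exactly-once sink depends on.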
Hi @eskabetxe, thanks for your reply.
- I think the purpose of providing three sink interfaces (Sink, StatefulSink, TwoPhaseCommittingSink) is to let developers/users implement whichever ones they need, so I prefer the two-sink approach, especially since we are likely to introduce only the non-XA sink in this PR.
- I don't see an actual need to introduce the JdbcProducer abstraction at present; maybe it's better to do it later when it is really needed (we can decide which interfaces/methods need to be abstracted at that time). Even if JdbcProducer is introduced, I think NonXaJdbcProducer should reuse JdbcOutputFormat internally, which will make it easier to fix bugs or add feature options later (we won't need to modify two places).
- Given the current blocker for the XA sink migration (the RuntimeContext), I think the JDBC sink migration can be split into 2 subtasks (XA and non-XA), and we only do the non-XA sink in this ticket/PR.
Hi @wanglijie95,
1- I'm OK with introducing only the non-XA sink in this PR; I will clean up the code.
2- From what I'm seeing, we have the same RuntimeContext problem with JdbcOutputFormat: in the open method (line 138), line 144 calls createAndOpenStatementExecutor, which internally does "JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());" (line 170), so I'm afraid we have the same problem. We could extract the common code into another class and use it in both implementations. What do you think?
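The proposal to extract the common code can be sketched so that the shared part never sees a RuntimeContext: each caller builds the executor its own way and hands it over. All names here are hypothetical (BatchExecutor stands in for JdbcExec, FakeRuntimeContext for Flink's RuntimeContext); this is not the actual JdbcOutputFormat code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical statement executor, standing in for JdbcExec.
interface BatchExecutor<T> {
    void addToBatch(T record);
    void executeBatch();
}

// In-memory executor used instead of a real JDBC statement.
class RecordingExecutor<T> implements BatchExecutor<T> {
    private final List<T> pending = new ArrayList<>();
    final List<List<T>> batches = new ArrayList<>();

    @Override public void addToBatch(T record) { pending.add(record); }

    @Override public void executeBatch() {
        batches.add(new ArrayList<>(pending));
        pending.clear();
    }
}

// Hypothetical shared class holding the batching logic. It receives an
// already-created executor, so it never needs a RuntimeContext itself.
class SharedBatcher<T> {
    private final BatchExecutor<T> executor;
    private final int batchSize;
    private int buffered;

    SharedBatcher(BatchExecutor<T> executor, int batchSize) {
        this.executor = executor;
        this.batchSize = batchSize;
    }

    void add(T record) {
        executor.addToBatch(record);
        if (++buffered >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (buffered > 0) {
            executor.executeBatch();
            buffered = 0;
        }
    }
}

// Stand-in for the RuntimeContext the legacy output format has available in open().
class FakeRuntimeContext {
    final int subtaskIndex;
    FakeRuntimeContext(int subtaskIndex) { this.subtaskIndex = subtaskIndex; }
}

// Legacy path: the factory still takes the context, but only to build the executor.
class LegacyFormatSketch<T> {
    private final Function<FakeRuntimeContext, BatchExecutor<T>> executorFactory;
    private final int batchSize;
    SharedBatcher<T> batcher;

    LegacyFormatSketch(Function<FakeRuntimeContext, BatchExecutor<T>> executorFactory, int batchSize) {
        this.executorFactory = executorFactory;
        this.batchSize = batchSize;
    }

    void open(FakeRuntimeContext ctx) {
        batcher = new SharedBatcher<>(executorFactory.apply(ctx), batchSize);
    }
}

// Sink V2 path: no RuntimeContext is available, so the factory takes no argument.
class NewWriterSketch<T> {
    final SharedBatcher<T> batcher;

    NewWriterSketch(Supplier<BatchExecutor<T>> executorFactory, int batchSize) {
        this.batcher = new SharedBatcher<>(executorFactory.get(), batchSize);
    }
}
```

Only the factory signature differs between the two paths, so a fix to the batching logic in the shared class would apply to both sinks at once.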
Updated the code to cover only the non-XA sink.
FYI: we're currently in the process of moving the JDBC code from the Flink repository to the dedicated Flink JDBC repository, see https://github.com/apache/flink-connector-jdbc/pull/1 which should move the current code as is.
When that's done, let's move this PR to that repository.
We've now moved the JDBC connector code; please re-route this PR to https://github.com/apache/flink-connector-jdbc
@MartijnVisser, @wanglijie95 I've re-routed the PR to https://github.com/apache/flink-connector-jdbc/pull/2
Closing this.