
[FLINK-36051] Re-prepare invalid statement even if connection is valid

Open morozov opened this issue 1 year ago • 1 comments

An SQL Server sink connector may fail with the following exception:

java.io.IOException: Writing records to JDBC failed.
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
	at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$ContextImpl.output(LegacyKeyedProcessOperator.java:134)
	at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:123)
	at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:118)
	at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195)
	... 22 more
Caused by: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
	at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
	at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
	at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
	... 23 more

While the connector can handle connection-level errors (see FLINK-16681), it doesn't handle statement-level ones.

Unfortunately, I don't know how to reproduce this issue on SQL Server. In theory, a statement can get un-prepared on the server side as a result of statement cache eviction due to memory pressure, but this is not deterministic.

AFAIK, MySQL can also invalidate a prepared statement under certain circumstances. This is reported as error 1615, "Prepared statement needs to be re-prepared".
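
To illustrate the idea of recovering from a statement-level failure, here is a minimal sketch, not the connector's actual code. `BatchStatement`, `StatementFactory`, `looksLikeInvalidStatement` and `executeWithRePrepare` are hypothetical names introduced only for this example. The detection heuristic is an assumption built from the two symptoms mentioned above (the SQL Server message from the stack trace and the MySQL vendor code 1615).

```java
import java.sql.SQLException;

// Illustrative sketch only; all names below are hypothetical and not part of flink-connector-jdbc.
final class RePrepareSketch {

    /** Hypothetical stand-in for a prepared statement that holds a buffered batch. */
    interface BatchStatement extends AutoCloseable {
        int[] executeBatch() throws SQLException;

        // No-op by default so that simple fakes can be written as lambdas.
        @Override
        default void close() throws SQLException {}
    }

    /** Hypothetical factory: prepares a fresh statement and re-adds the buffered records to it. */
    interface StatementFactory {
        BatchStatement prepare() throws SQLException;
    }

    // Vendor code MySQL uses for "Prepared statement needs to be re-prepared".
    static final int MYSQL_NEED_REPREPARE = 1615;

    /** Assumed heuristic: match the MySQL vendor code or the SQL Server message text. */
    static boolean looksLikeInvalidStatement(SQLException e) {
        String message = e.getMessage();
        return e.getErrorCode() == MYSQL_NEED_REPREPARE
                || (message != null && message.contains("Could not find prepared statement"));
    }

    /** Executes the batch; on an invalid-statement error, re-prepares and retries up to maxRetries times. */
    static int[] executeWithRePrepare(StatementFactory factory, int maxRetries) throws SQLException {
        BatchStatement statement = factory.prepare();
        try {
            for (int attempt = 0; ; attempt++) {
                try {
                    return statement.executeBatch();
                } catch (SQLException e) {
                    if (attempt >= maxRetries || !looksLikeInvalidStatement(e)) {
                        throw e; // out of retries, or not a statement-level error we recognize
                    }
                    try {
                        statement.close(); // best effort: the server-side handle is already gone
                    } catch (SQLException ignored) {
                        // nothing useful to do if closing the stale statement fails
                    }
                    statement = factory.prepare(); // the connection is still valid, only the statement is not
                }
            }
        } finally {
            statement.close();
        }
    }
}
```

In a real sink, the factory would presumably wrap `Connection.prepareStatement` and replay the records that were buffered for the failed batch. Matching on message text is fragile, so re-preparing on every retry regardless of the error is a reasonable alternative.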

morozov avatar Aug 14 '24 00:08 morozov

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] avatar Aug 14 '24 00:08 boring-cyborg[bot]

Given the absence of additional feedback, how do we proceed?

morozov avatar Sep 02 '24 17:09 morozov

Applied, thanks.

Jiabao-Sun avatar Sep 03 '24 01:09 Jiabao-Sun

Awesome work, congrats on your first merged pull request!

boring-cyborg[bot] avatar Sep 03 '24 01:09 boring-cyborg[bot]