flink-connector-jdbc

[FLINK-36051] Re-prepare invalid statement even if connection is valid

Open morozov opened this issue 6 months ago • 1 comments

An SQL Server sink connector may fail with the following exception:

java.io.IOException: Writing records to JDBC failed.
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
	at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator$ContextImpl.output(LegacyKeyedProcessOperator.java:134)
	at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:123)
	at com.acme.flink.job.AcmeFlinkJob$1.processElement(AcmeFlinkJob.java:118)
	at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.io.IOException: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195)
	... 22 more
Caused by: java.sql.BatchUpdateException: Could not find prepared statement with handle 1.
	at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2231)
	at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
	at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
	at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
	... 23 more

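While the connector can handle connection-level errors (see FLINK-16681), it doesn't handle statement-level ones: if the connection is still valid but the prepared statement is no longer known to the server, the flush fails as shown above.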

Unfortunately, I don't know how to reproduce this issue on SQL Server. In theory, a statement can get un-prepared on the server side as a result of statement cache eviction due to memory pressure, but this is not deterministic.

AFAIK, MySQL can also invalidate a prepared statement under certain circumstances. This is reported as error 1615, "Prepared statement needs to be re-prepared".
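
To illustrate the requested behavior, here is a minimal sketch of a "re-prepare and retry" loop. It is not the connector's actual code: `RePrepareSketch`, `StatementSupplier`, `BatchAction`, and `executeWithRePrepare` are hypothetical names, and the statement type is left generic so the sketch runs without a database. MySQL error code 1615 comes from the description above; 8179 is assumed to be the SQL Server error number behind "Could not find prepared statement with handle" and should be verified against the driver before relying on it.

```java
import java.sql.SQLException;

// Hypothetical sketch; none of these names exist in flink-connector-jdbc.
class RePrepareSketch {

    /** Prepares a fresh statement on the current (still valid) connection. */
    public interface StatementSupplier<S> {
        S prepare() throws SQLException;
    }

    /** Re-adds the buffered records to the statement and executes the batch. */
    public interface BatchAction<S> {
        int[] run(S statement) throws SQLException;
    }

    /** Returns true if the exception or one of its causes reports an invalidated statement. */
    static boolean isStatementInvalid(SQLException e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof SQLException) {
                int code = ((SQLException) t).getErrorCode();
                // 1615: MySQL "Prepared statement needs to be re-prepared".
                // 8179: ASSUMED SQL Server code for "Could not find prepared statement with handle".
                if (code == 1615 || code == 8179) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Runs the action, re-preparing the statement up to maxRetries times on statement-level errors. */
    static <S> int[] executeWithRePrepare(
            StatementSupplier<S> supplier, BatchAction<S> action, int maxRetries)
            throws SQLException {
        S statement = supplier.prepare();
        for (int attempt = 0; ; attempt++) {
            try {
                return action.run(statement);
            } catch (SQLException e) {
                if (attempt >= maxRetries || !isStatementInvalid(e)) {
                    throw e; // unrelated error, or retries exhausted
                }
                statement = supplier.prepare(); // the connection is fine; only the statement is stale
            }
        }
    }
}
```

With real JDBC, `S` would be a `PreparedStatement`, and the action has to re-add the buffered rows before calling `executeBatch()`, because a freshly prepared statement starts with an empty batch. A real implementation would presumably also close the stale statement before replacing it.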

morozov Aug 14 '24 00:08