[Bug] [flinksql] Exception in a single-stream tumbling window job
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
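Right after the job started, I inserted data into Kafka; the results were computed and written to MySQL correctly. After leaving the job running overnight, I inserted data into Kafka again the next morning, and the sink reported this error: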
java.lang.RuntimeException: Connection maybe closed
    at com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormat.processWriteException(JdbcOutputFormat.java:343)
    at com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:193)
    at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:427)
    at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:448)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:448)
    at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:404)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.RuntimeException: java.lang.RuntimeException: Connection maybe closed
        at com.dtstack.flinkx.sink.format.BaseRichOutputFormat.close(BaseRichOutputFormat.java:321)
        at com.dtstack.flinkx.sink.DtOutputFormatSinkFunction.close(DtOutputFormatSinkFunction.java:127)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:797)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:776)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:691)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:595)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
        ... 1 more
    [CIRCULAR REFERENCE:java.lang.RuntimeException: Connection maybe closed]
Caused by: java.sql.SQLException: No operations allowed after statement closed.
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
    at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:442)
    at com.mysql.jdbc.ServerPreparedStatement.checkClosed(ServerPreparedStatement.java:510)
    at com.mysql.jdbc.ServerPreparedStatement.setTimestamp(ServerPreparedStatement.java:2070)
    at com.dtstack.flinkx.connector.jdbc.statement.FieldNamedPreparedStatementImpl.setTimestamp(FieldNamedPreparedStatementImpl.java:234)
    at com.dtstack.flinkx.connector.jdbc.converter.JdbcRowConverter.lambda$createExternalConverter$596bb4b9$1(JdbcRowConverter.java:219)
    at com.dtstack.flinkx.connector.jdbc.converter.JdbcRowConverter.lambda$wrapIntoNullableExternalConverter$579ce5d$1(JdbcRowConverter.java:84)
    at com.dtstack.flinkx.connector.jdbc.converter.JdbcRowConverter.toExternal(JdbcRowConverter.java:113)
    at com.dtstack.flinkx.connector.jdbc.converter.JdbcRowConverter.toExternal(JdbcRowConverter.java:51)
    at com.dtstack.flinkx.connector.jdbc.sink.PreparedStmtProxy.writeSingleRecordInternal(PreparedStmtProxy.java:194)
    at com.dtstack.flinkx.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:190)
    ... 12 more
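The innermost cause in the trace is `No operations allowed after statement closed`, and it only showed up after the job had been idle overnight. One possible explanation, which is an assumption and not something the trace confirms, is that MySQL dropped the idle connection (for example through its `wait_timeout` setting), so the cached prepared statement was no longer usable. The sketch below shows the general idea of validating a JDBC connection before each write and reopening it when it has gone stale. `ConnectionGuard` and its `factory` parameter are hypothetical names for illustration and are not part of FlinkX; only `Connection.isValid`, `Connection.isClosed`, and `Connection.close` are standard JDBC calls.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a FlinkX class): hands out a JDBC connection and
 * transparently replaces it when the current one is no longer usable.
 */
final class ConnectionGuard {
    private final Supplier<Connection> factory;   // assumed: opens a fresh connection
    private final int validationTimeoutSeconds;   // passed to Connection.isValid
    private Connection current;

    ConnectionGuard(Supplier<Connection> factory, int validationTimeoutSeconds) {
        this.factory = factory;
        this.validationTimeoutSeconds = validationTimeoutSeconds;
    }

    /** Returns the cached connection, or a new one if the cached one is stale. */
    synchronized Connection get() {
        if (!isUsable(current)) {
            closeQuietly(current);
            current = factory.get();
        }
        return current;
    }

    private boolean isUsable(Connection c) {
        if (c == null) {
            return false;
        }
        try {
            // isValid asks the driver to check that the connection still works.
            return !c.isClosed() && c.isValid(validationTimeoutSeconds);
        } catch (SQLException e) {
            return false; // treat any failure during validation as "stale"
        }
    }

    private static void closeQuietly(Connection c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (SQLException ignored) {
            // the connection is being discarded anyway
        }
    }
}
```

A writer using this pattern would call `guard.get()` before each flush and would also need to re-create any `PreparedStatement` whenever the returned connection differs from the previous one, since statements are bound to the connection that prepared them.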
What you expected to happen
1
How to reproduce
1
Anything else
1
Version
v1.2
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct