flink-cdc
flink-cdc copied to clipboard
oracle cdc must create and write tables and cannot synchronize slave libraries
Describe the bug(Please use English) oracle cdc must create and write tables and cannot synchronize slave libraries. I need to sync oracle cdc data from oracle slave repository. However, the slave library cannot create and write tables, so the create table and write table permissions cannot be given. But the program reports an error. The oracle slave library can not give permissions:
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
Environment :
- Flink version : 1.14.5
- Flink CDC version: 2.2.1
- Database and version: oracle 11g slave node
To Reproduce Steps to reproduce the behavior:
- The test data : oracle cdc
- The test code :
create table test_json
(
cdc_json string
)WITH (
'hostname' = '10.2.5.1',
'connector' = 'cdc-json',
'port'='66',
'username'='flinkuser5',
'password'='xx',
'database-list'='helowin',
'schema-list'='FLINKUSER5',
'table-list'='FLINKUSER5.CDC_TEST',
'source-type'='oracle',
'extra-prop.debezium.database.tablename.case.insensitive' = 'false',
'extra-prop.debezium.log.mining.strategy' = 'online_catalog',
'extra-prop.debezium.log.mining.continuous.mine' = 'true',
'extra-prop.debezium.decimal.handling.mode' = 'double',
'extra-prop.debezium.snapshot.mode' = 'schema_only'
);
- The error :
rg.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:208)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLSyntaxErrorException: ORA-01031: 权限不足
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:509)
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:461)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1104)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:550)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:268)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:655)
at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:265)
at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:86)
at oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:965)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1205)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3666)
at oracle.jdbc.driver.T4CCallableStatement.executeInternal(T4CCallableStatement.java:1358)
at oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:3778)
at oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4251)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1081)
at io.debezium.connector.oracle.logminer.LogMinerHelper.executeCallableStatement(LogMinerHelper.java:693)
at io.debezium.connector.oracle.logminer.LogMinerHelper.createFlushTable(LogMinerHelper.java:122)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:124)
... 7 more
Caused by: Error : 1031, Position : 0, Sql = CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0)), OriginalSql = CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0)), Error Msg = ORA-01031: 权限不足
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:513)
... 24 more
Additional Description If applicable, add screenshots to help explain your problem.