Describe the bug(Please use English)
A clear and concise description of what the bug is.
I ran a select sql by sql-client,the source database is MySQL ,target is sql-client print;
There is the Exception log below:
`Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/source/SourceRecord
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1644)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:457)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:67)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:795)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.source.SourceRecord
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 51 more
Shutting down the session...
done.`
Environment :
- Flink version : flink -1.13.3
- Flink CDC version: flink-connector-mysql-cdc-2.1.1.jar, flink-sql-connector-mysql-cdc-2.1.1.jar
- Database and version:
- MySQL 8
To Reproduce
Steps to reproduce the behavior:
- Thes test data
create table if not exists test ( id int primary key auto_increment, name varchar(32) ); CREATE TABLE test ( id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = ' user name ', 'password' = ' password ', 'database-name' = ' Database name ', 'table-name' = ' Table name ' );
- The test code :select * from test;
- The error :
Additional Description
If applicable, add screenshots to help explain your problem.