[Bug] [KafkaStreamTable] Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
SeaTunnel Version
2.1.2
SeaTunnel Config
env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin, used **only to test and demonstrate the source plugin feature**
  #"name":"jack";"age":12
  #"name":"Mike";"age":14
  KafkaTableStream {
    consumer.bootstrap.servers = "127.0.0.1:9092"
    consumer.group.id = "seatunnel1"
    offset.reset = earliest
    topics = "source"
    result_table_name = "test"
    format.type = csv
    schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
    format.field-delimiter = ","
  }
  # If you would like to get more information about how to configure seatunnel and see the full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}

transform {
  sql {
    sql = "select name, age from test"
  }
  # If you would like to get more information about how to configure seatunnel and see the full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}

sink {
  kafka {
    topics = "sink"
    producer.bootstrap.servers = "127.0.0.1:9092"
  }
  # If you would like to get more information about how to configure seatunnel and see the full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}
Running Command
TestContainer
Error Exception
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
What is your sample data?
Is the org.apache.flink.table.factories.TableSourceFactory class missing? If so, what you need should be Flink's JDBC connector jar. Download the JDBC connector jar matching your Flink version from the official site.
I'm using the Kafka connector, and the data format is CSV.
It seems this problem appeared when the connector jars were split out of the core jar: Flink can no longer find the right Kafka factory implementation class through SPI. We should investigate why Flink can't locate the Kafka source factory after the split. If you want to use Kafka, you can stay on 2.1.1 for now. You're also welcome to help us fix it. 😁
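For context, Flink discovers table factories the Java SPI way: it scans each jar on the classpath for a META-INF/services entry naming the factory classes. A rough Python sketch of that lookup (the jar contents and the org.example class name below are illustrative, not Flink's real internals):

```python
# Sketch of SPI-style factory discovery: a factory is only found if some
# jar on the classpath registers it under META-INF/services.
import io
import zipfile

SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.TableFactory"

def registered_factories(jar_bytes: bytes) -> list[str]:
    """Return the factory class names a jar registers for SPI discovery."""
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        if SERVICE_FILE not in jar.namelist():
            return []  # jar is on the classpath, but registers no factories
        text = jar.read(SERVICE_FILE).decode("utf-8")
        return [line.strip() for line in text.splitlines() if line.strip()]

# Build a fake connector jar in memory that registers a Kafka factory.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as jar:
    jar.writestr(SERVICE_FILE, "org.example.KafkaTableSourceFactory\n")

print(registered_factories(buf.getvalue()))
# → ['org.example.KafkaTableSourceFactory']
```

If the split connector jar lost this service entry, or the jar never makes it onto the classpath, discovery comes back empty and you get exactly the NoMatchingTableFactoryException above.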
My approach was as follows:
1. "Could not find a suitable table factory" — check whether the JDBC-related jars are present under flink-1.xx.x/lib.
2. "No factory supports all properties" — check whether the connector properties are correct.
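A quick way to run the first check is to list the JDBC-related jars under Flink's lib directory. This sketch assumes a local Flink install; the jar names in the demo are the ones mentioned in this thread:

```python
# Flag JDBC-related jars in a Flink lib directory (step 1 of the checklist).
import tempfile
from pathlib import Path

def jdbc_jars(lib_dir: str) -> list[str]:
    """Return jar names in lib_dir that look JDBC/MySQL-related."""
    return sorted(
        p.name
        for p in Path(lib_dir).glob("*.jar")
        if "jdbc" in p.name.lower() or "mysql" in p.name.lower()
    )

# Demo against a temporary stand-in for $FLINK_HOME/lib.
with tempfile.TemporaryDirectory() as lib:
    for name in ["flink-connector-jdbc-1.16.2.jar",
                 "flink-dist-1.16.2.jar",
                 "mysql-connector-java-8.0.29.jar"]:
        (Path(lib) / name).touch()
    print(jdbc_jars(lib))
# → ['flink-connector-jdbc-1.16.2.jar', 'mysql-connector-java-8.0.29.jar']
```

An empty result here means the driver or connector jar never made it into lib, which is the most common cause of the first error message.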
Taking flink-1.16.2 connecting to MySQL 5.6.43 over JDBC as an example, I dropped these jars into the lib directory one by one:
mysql-connector-java-8.0.29.jar, flink-connector-jdbc-1.16.2.jar, flink-table-common-1.16.2.jar
That didn't help — same error — so I checked my code:

CREATE TABLE products (
  id INT,
  name string,
  description string,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
  'connector.table' = 'products',
  'connector.driver' = 'com.mysql.cj.jdbc.Driver',
  'connector.port' = '3306',
  'connector.username' = 'root',
  'connector.password' = 'root'
);

After reading the Table API Connectors documentation on the official site, I rewrote the DDL with the current option keys, and it ran fine:

CREATE TABLE products (
  id INT,
  name string,
  description string,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
  'table-name' = 'products',
  'username' = 'root',
  'password' = 'root'
);

Hope this helps.