seatunnel icon indicating copy to clipboard operation
seatunnel copied to clipboard

[Bug] [KafkaStreamTable] Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Open zhangyuge1 opened this issue 2 years ago • 4 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

image image

SeaTunnel Version

2.1.2

SeaTunnel Config

env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  #"name":"jack";"age":12
  #"name":"Mike";"age":14
    KafkaTableStream {
        consumer.bootstrap.servers = "127.0.0.1:9092"
        consumer.group.id = "seatunnel1"
        offset.reset = earliest
        topics = "source"
        result_table_name = "test"
        format.type = csv
        schema = "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]"
        format.field-delimiter = ","
    }

  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}

transform {
    sql {
      sql = "select name,age from test"
    }

  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}

sink {
    kafka {
      topics = "sink"
      producer.bootstrap.servers = "127.0.0.1:9092"
    }
  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}

Running Command

TestContainer

Error Exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

zhangyuge1 avatar Jul 07 '22 10:07 zhangyuge1

What is your sample data?

adudu137 avatar Jul 08 '22 08:07 adudu137

是不是缺少了org.apache.flink.table.factories.TableSourceFactory类,所以对应的应该是flink连接JDBC的包。去官网上下载你对应flink版本的连接JDBC的包

adudu137 avatar Jul 08 '22 09:07 adudu137

是不是缺少了org.apache.flink.table.factories.TableSourceFactory类,所以对应的应该是flink连接JDBC的包。去官网上下载你对应flink版本的连接JDBC的包

我用的是Kakfa Connector,数据类型是csv

zhangyuge1 avatar Jul 08 '22 09:07 zhangyuge1

Seem like this problem happened when spilt connector jar from core jar. Then flink can't find right kafka factory implement class when use SPI. We should checkout why flink can't find kafka source factory after connector jar splited. If you want use kafka you can use 2.1.1. Also can help us to fix it.😁

Hisoka-X avatar Jul 14 '22 08:07 Hisoka-X

解决思路如下: 1.Could not find a suitable table factory,需要检查flink-1.xx.x/lib目录下是否存在jdbc相关包 2.No factory supports all properties,检查参数是否正确。 以flink-1.16.2的jdbc连接mysql 5.6.43为例,我依次丢了这些包到lib目录下 mysql-connector-java-8.0.29.jar, flink-connector-jdbc-1.16.2.jar, flink-table-common-1.16.2.jar 但是并不管用,还是一样的报错,我检查我的代码: CREATE TABLE products ( id INT, name string, description string, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector.type'='jdbc', 'connector.url'='jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC', 'connector.table'='products', 'connector.driver'='com.mysql.cj.jdbc.Driver', 'connector.port' = '3306', 'connector.username'='root', 'connector.password'='root' ); 去官网上查看Table API Connectors的说明文档,我修改了代码,代码如下,可以正常运行了 CREATE TABLE products ( id INT, name string, description string, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC', 'table-name' = 'products', 'username'='root', 'password'='root' ); 希望对你有帮助

ZhouYuling avatar Jun 27 '23 05:06 ZhouYuling