
Flink SQL execution fails with an error

Open gbguanbo opened this issue 7 months ago • 0 comments

from pyalink.alink import *

# useRemoteEnv returns the ExecutionEnvironment, BatchTableEnvironment,
# StreamExecutionEnvironment and StreamTableEnvironment.
benv, btenv, senv, stenv = useRemoteEnv("localhost", 8081, 1)

jars = "file:///home/guanbo/flink-1.13.3/lib/flink-csv-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-dist_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-json-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-shaded-zookeeper-3.4.14.jar"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-blink_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/alink_core_flink-1.13_2.11-1.6.2.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/alink_python_flink-1.13_2.11-1.6.2.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-1.2-api-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-api-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-core-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/log4j-slf4j-impl-2.12.1.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-connector-base-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-streaming-java_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-api-java-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-api-java-bridge_2.11-1.13.3.jar;"
+"file:///home/guanbo/flink-1.13.3/lib/flink-table-common-1.13.3.jar" stenv.get_config().get_configuration().set_string("pipeline.jars", jars) source_ddl = """ CREATE TABLE source_table( id STRING, price DOUBLE, ts BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'test-topic', 'properties.bootstrap.servers' = '172.20.8.230:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """

sink_ddl = """ CREATE TABLE sink_table( id STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = '172.20.8.230:9092', 'format' = 'json' ) """

stenv.execute_sql(source_ddl)
stenv.execute_sql(sink_ddl)

stenv.sql_query("SELECT id FROM source_table")
.execute_insert("sink_table") StreamOperator.execute()

Py4JJavaError: An error occurred while calling o4729.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:147)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:151)
    at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:92)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:45)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:176)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:114)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:95)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:83)
    at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
    at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
    at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
    at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
    at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
    at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
    at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:142)
    ... 15 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=kafka
format=json
properties.bootstrap.servers=172.20.8.230:9092
properties.group.id=test_3
scan.startup.mode=latest-offset
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=id
schema.1.data-type=DOUBLE
schema.1.name=price
schema.2.data-type=BIGINT
schema.2.name=ts
topic=test-topic

The following factories have been considered:
com.alibaba.flink.ml.operator.ops.table.MLTableSourceFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:300)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:178)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:139)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:93)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:41)
    ... 41 more
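
Note that the list of considered factories above contains only the Alink ML source and the legacy CSV sources, with no Kafka factory, which usually means no Kafka connector jar is visible to the planner. One possible direction, sketched under the assumption that a flink-sql-connector-kafka_2.11-1.13.3.jar has been placed in the same lib directory (this jar and version are an assumption, not something confirmed in this issue):

# Assumption: the fat Kafka SQL connector jar exists at this path; it does not
# appear in the pipeline.jars list shown above.
kafka_jar = "file:///home/guanbo/flink-1.13.3/lib/flink-sql-connector-kafka_2.11-1.13.3.jar"
stenv.get_config().get_configuration().set_string("pipeline.jars", jars + ";" + kafka_jar)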

gbguanbo · Jul 29 '24 08:07