flink-sql-lineage
flink-sql-lineage copied to clipboard
Error when parsing a UDAF where the number of inputs and the number of UDF arguments are equal
/**
 * Aggregate function used to reproduce the parsing error: its {@code accumulate}
 * method takes three string arguments in addition to the accumulator.
 *
 * <p>The result type is {@link String}; the accumulator type is {@link TestAggregateAcc}.
 */
public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {

    /** Accumulator holding the single string value produced by {@code accumulate}. */
    public static class TestAggregateAcc {
        public String test;
    }

    /**
     * Creates a fresh accumulator.
     *
     * @return a new {@link TestAggregateAcc} whose {@code test} field is unset
     */
    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    /**
     * Overwrites the accumulator's value with the concatenation of the three arguments.
     * Earlier content of {@code acc.test} is discarded, not appended to.
     *
     * @param acc    accumulator to update
     * @param param1 first part of the stored value
     * @param param2 second part of the stored value
     * @param param3 third part of the stored value
     */
    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        // StringBuilder.append(String) renders a null argument as "null",
        // exactly like the '+' operator does.
        StringBuilder joined = new StringBuilder();
        joined.append(param1);
        joined.append(param2);
        joined.append(param3);
        acc.test = joined.toString();
    }

    /**
     * Returns the value currently stored in the accumulator.
     *
     * @param accumulator accumulator to read
     * @return the accumulator's {@code test} field
     */
    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }
}
-- Register the TestAggregateFunction class under the SQL function name test_aggregate.
create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'
// INSERT statement that triggers the error: test_aggregate is called with three
// arguments (a concat_ws expression, a column, and a string literal) while the
// query groups by id, name, birthday and ts.
String sql = String.join("",
        "INSERT INTO dwd_hudi_users ",
        "SELECT ",
        " id ,",
        " name ,",
        " test_aggregate(concat_ws('_', name, email), address, 'test'),",
        " birthday ,",
        " ts ,",
        " DATE_FORMAT(birthday, 'yyyyMMdd') ",
        "FROM",
        " ods_mysql_user_detail group by id, name, birthday, ts ");
// Create the ods_mysql_user_detail source table (mysql-cdc connector) that the
// INSERT statement reads from; proc_time is a computed processing-time column.
context.execute(String.join("",
        "CREATE TABLE IF NOT EXISTS ods_mysql_user_detail (",
        " id BIGINT PRIMARY KEY NOT ENFORCED ,",
        " name STRING ,",
        " birthday TIMESTAMP(3) ,",
        " ts TIMESTAMP(3) ,",
        " email STRING ,",
        " address STRING ,",
        " proc_time as proctime() ",
        ") WITH ( ",
        " 'connector' = 'mysql-cdc' ,",
        " 'hostname' = '127.0.0.1' ,",
        " 'port' = '3306' ,",
        " 'username' = 'root' ,",
        " 'password' = 'xxx' ,",
        " 'server-time-zone' = 'Asia/Shanghai' ,",
        " 'database-name' = 'demo' ,",
        " 'table-name' = 'users' ",
        ")"));