flink-sql-lineage icon indicating copy to clipboard operation
flink-sql-lineage copied to clipboard

Error when parsing UDAF which the number of input and udf argument is equal

Open jeff-zou opened this issue 1 year ago • 0 comments

image

public class TestAggregateFunction extends AggregateFunction<String, TestAggregateFunction.TestAggregateAcc> {

    public void accumulate(TestAggregateAcc acc, String param1, String param2, String param3) {
        acc.test = param1 + param2 + param3;
    }

    @Override
    public String getValue(TestAggregateAcc accumulator) {
        return accumulator.test;
    }

    @Override
    public TestAggregateAcc createAccumulator() {
        return new TestAggregateAcc();
    }

    public static class TestAggregateAcc {
        public String test;
    }
}

create function test_aggregate as 'com.hw.lineage.flink.aggregatefunction.TestAggregateFunction'

  String sql = "INSERT INTO dwd_hudi_users " +
                "SELECT " +
                "   id ," +
                "   name ," +
                "   test_aggregate(concat_ws('_', name, email), address, 'test')," +
                "   birthday ," +
                "   ts ," +
                "   DATE_FORMAT(birthday, 'yyyyMMdd') " +
                "FROM" +
                "   ods_mysql_user_detail group by id, name, birthday, ts ";
context.execute("CREATE TABLE IF NOT EXISTS ods_mysql_user_detail (" +
                "       id                  BIGINT PRIMARY KEY NOT ENFORCED ," +
                "       name                STRING                          ," +
                "       birthday            TIMESTAMP(3)                    ," +
                "       ts                  TIMESTAMP(3)                    ," +
                "       email               STRING                          ," +
                "       address             STRING                          ," +
                "       proc_time as proctime()                              " +
                ") WITH ( " +
                "       'connector' = 'mysql-cdc'            ," +
                "       'hostname'  = '127.0.0.1'       ," +
                "       'port'      = '3306'                 ," +
                "       'username'  = 'root'                 ," +
                "       'password'  = 'xxx'          ," +
                "       'server-time-zone' = 'Asia/Shanghai' ," +
                "       'database-name' = 'demo'             ," +
                "       'table-name'    = 'users' " +
                ")");

jeff-zou avatar Apr 19 '24 09:04 jeff-zou