Support all Aggregate Functions as parameter for columns of type AggregateFunction
Currently, only groupBitmap is supported as a parameter for a table column of type AggregateFunction.
It would be good if all aggregate functions were supported.
It's on the roadmap, but we haven't completed the refactoring yet. In v0.4.0, serialization/deserialization was refactored with some performance improvements, but it's still not easy to extend. Moreover, since there's no documentation of the data structures used for reading and writing each AggregateFunction type, we have to dig into the ClickHouse code to figure them out one by one, which is going to take a while. It would be great if someone from the server team could document all the details, so that not only the Java client but all other clients would benefit.
I just ran into this issue as well. I have an AggregatingMergeTree table with a column of type AggregateFunction(argMin, Decimal(18, 4), DateTime).
The workaround for now seems to be:
Create a new table with Engine = MergeTree, and create a materialized view on top of it that performs the aggregation, so that inserts go into the plain MergeTree table rather than directly into the AggregateFunction columns.
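A minimal sketch of that pattern in ClickHouse SQL, using hypothetical table and column names adapted from the AggregateFunction(argMin, Decimal(18, 4), DateTime) example above:

```sql
-- Plain MergeTree table that receives the raw inserts (names are illustrative)
CREATE TABLE quotes_raw
(
    symbol String,
    price  Decimal(18, 4),
    ts     DateTime
) ENGINE = MergeTree ORDER BY symbol;

-- Aggregating table holding the argMin state
CREATE TABLE quotes_agg
(
    symbol      String,
    first_price AggregateFunction(argMin, Decimal(18, 4), DateTime)
) ENGINE = AggregatingMergeTree ORDER BY symbol;

-- Materialized view that converts raw rows into aggregate states on insert
CREATE MATERIALIZED VIEW quotes_agg_mv TO quotes_agg AS
SELECT symbol, argMinState(price, ts) AS first_price
FROM quotes_raw
GROUP BY symbol;

-- Clients insert ordinary values into quotes_raw; read back with the -Merge combinator
SELECT symbol, argMinMerge(first_price) FROM quotes_agg GROUP BY symbol;
```

With this layout the client never has to serialize an AggregateFunction value itself; the server builds the state via argMinState inside the materialized view.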
@Zarathustra2 see https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/
@den-crane Thanks for the neat trick, that is cool, but unfortunately it does not work for me because I use the JDBC driver in a Flink pipeline. I am getting the error:
Caused by: java.sql.SQLException: External table, input function, and query parameter cannot be used together in PreparedStatement.
at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73)
at com.clickhouse.jdbc.internal.ClickHouseConnectionImpl.prepareStatement(ClickHouseConnectionImpl.java:743)
at com.clickhouse.jdbc.ClickHouseConnection.prepareStatement(ClickHouseConnection.java:121)
at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.prepareStatements(SimpleBatchStatementExecutor.java:58)
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.createAndOpenStatementExecutor(JdbcBatchingOutputFormat.java:144)
... 12 more
So I assume I have to go with two tables for now, or write my own sink function.
Hi @Zarathustra2, you don't have to specify parameters when using the input function. I just added the test case below for demonstration:
public void testInsertAggregateFunction() throws SQLException {
    // https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/
    Properties props = new Properties();
    try (ClickHouseConnection conn = newConnection(props);
            Statement s = conn.createStatement();
            PreparedStatement ps = conn.prepareStatement(
                    "insert into test_insert_aggregate_function SELECT uid, updated, arrayReduce('argMaxState', [name], [updated]) "
                            + "FROM input('uid Int16, updated DateTime, name String')")) {
        // prepare the target table
        s.execute("drop table if exists test_insert_aggregate_function;"
                + "CREATE TABLE test_insert_aggregate_function (uid Int16, updated SimpleAggregateFunction(max, DateTime), "
                + "name AggregateFunction(argMax, String, DateTime)) ENGINE=AggregatingMergeTree order by uid");

        // insert two rows through the input() function - the aggregate state is
        // built server-side by arrayReduce('argMaxState', ...), so no extra
        // parameters are needed for the AggregateFunction column
        ps.setInt(1, 1);
        ps.setString(2, "2020-01-02 00:00:00");
        ps.setString(3, "b");
        ps.addBatch();
        ps.setInt(1, 1);
        ps.setString(2, "2020-01-01 00:00:00");
        ps.setString(3, "a");
        ps.addBatch();
        ps.executeBatch();

        // verify that merging the state returns the value with the latest timestamp
        try (ResultSet rs = s.executeQuery(
                "select uid, max(updated) AS updated, argMaxMerge(name) from test_insert_aggregate_function group by uid")) {
            Assert.assertTrue(rs.next());
            Assert.assertEquals(rs.getInt(1), 1);
            Assert.assertEquals(rs.getString(2), "2020-01-02 00:00:00");
            Assert.assertEquals(rs.getString(3), "b");
            Assert.assertFalse(rs.next());
        }
    }
}
Oh wow, @zhicwu, I thought I had to specify the parameters as well. Now it works!!! :)
Thanks so much!
This issue has been automatically marked as stale because it has not had activity in the last year. It will be closed in 30 days if no further activity occurs. Please feel free to leave a comment if you believe the issue is still relevant. Thank you for your contributions!