Issues with min/max over a big_decimal field when using the multi-stage engine
A simple min/max over a big_decimal fails when using the multi-stage engine.
Query:
select min(block_number) from org_2dYiMRMfas142XRKQ3bJIqmN3V6_erc20_balance_changes limit 10
Error:
Error Code: 200
QueryExecutionError:
Received error query execution result block: {1000=Unable to create DataBlock
java.lang.RuntimeException: Unable to create DataBlock
at org.apache.pinot.query.runtime.blocks.TransferableBlock.getDataBlock(TransferableBlock.java:140)
at org.apache.pinot.query.mailbox.GrpcSendingMailbox.toMailboxContent(GrpcSendingMailbox.java:121)
at org.apache.pinot.query.mailbox.GrpcSendingMailbox.send(GrpcSendingMailbox.java:72)
at org.apache.pinot.query.runtime.operator.exchange.BlockExchange.sendBlock(BlockExchange.java:126)
...
Caused by: java.lang.ClassCastException: class java.lang.Double cannot be cast to class java.math.BigDecimal (java.lang.Double and java.math.BigDecimal are in module java.base of loader 'bootstrap')
at org.apache.pinot.core.common.datablock.DataBlockBuilder.buildFromRows(DataBlockBuilder.java:148)
at org.apache.pinot.query.runtime.blocks.TransferableBlock.getDataBlock(TransferableBlock.java:133)
... 16 more}
org.apache.pinot.query.service.dispatch.QueryDispatcher.getResultTable(QueryDispatcher.java:332)
org.apache.pinot.query.service.dispatch.QueryDispatcher.runReducer(QueryDispatcher.java:276)
org.apache.pinot.query.service.dispatch.QueryDispatcher.submitAndReduce(QueryDispatcher.java:100)
org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler.handleRequest(MultiStageBrokerRequestHandler.java:199)
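The ClassCastException at the root of this trace can be reproduced in isolation: in Java, a boxed Double can never be cast to BigDecimal; an explicit conversion such as `BigDecimal.valueOf(double)` is required. A minimal standalone sketch (not Pinot code) illustrating both the failure and the working conversion:

```java
import java.math.BigDecimal;

public class CastDemo {
    public static void main(String[] args) {
        // What the leaf stage actually produces: a boxed Double.
        Object value = Double.valueOf(42.0);

        // This is effectively what DataBlockBuilder attempts, and it throws:
        try {
            BigDecimal bad = (BigDecimal) value;
            System.out.println("cast succeeded: " + bad);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }

        // An explicit conversion works where the cast does not:
        BigDecimal ok = BigDecimal.valueOf(((Number) value).doubleValue());
        System.out.println("converted: " + ok);
    }
}
```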
The same query works on the v1 query engine.
Could be related to #12670
Can you try select min(cast(block_number as double)) from org_2dYiMRMfas142XRKQ3bJIqmN3V6_erc20_balance_changes limit 10 and see if it works? Even in v1 this query is not executed on big_decimal but on double, with potential precision loss.
The original query select min(block_number) from org_2dYiMRMfas142XRKQ3bJIqmN3V6_erc20_balance_changes limit 10 works well on v1 for us. We tried your version with the cast to double and it works, but the cast shouldn't be necessary here.
v2 is strongly typed, and currently min() takes double as input and returns double; that is why it fails in v2. Ideally min() should be able to directly take big_decimal and return big_decimal.
Ideally it would support all the types that other Calcite-compatible DBs accept:
- All numeric data types
- All date/time types
- String data types
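What a truly polymorphic min would look like conceptually: any Comparable type (numeric, date/time, string) can be reduced with the same logic, and the output preserves the input type. A hypothetical sketch, not Pinot's actual aggregation API:

```java
import java.math.BigDecimal;
import java.util.List;

public class PolymorphicMin {
    // Hypothetical: a min that preserves the input type for any
    // Comparable. Assumes a non-empty input list.
    static <T extends Comparable<T>> T min(List<T> values) {
        T result = values.get(0);
        for (T v : values) {
            if (v.compareTo(result) < 0) {
                result = v;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Works on BigDecimal without routing through double...
        BigDecimal m = min(List.of(new BigDecimal("10.5"), new BigDecimal("2.25")));
        System.out.println(m); // prints 2.25

        // ...and equally on strings and other Comparable types.
        System.out.println(min(List.of("banana", "apple"))); // prints apple
    }
}
```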
The problem here is that the engine is using two different operators.
I was able to replicate the issue with the following quickstart query:
select min(cast(AirTime as DECIMAL)) from airlineStats limit 10
Which produces the following plan:
Execution Plan
LogicalSort(fetch=[10])
  PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])
    PinotLogicalAggregate(group=[{}], agg#0=[MIN($0)], aggType=[FINAL]) <-- whose return type is DECIMAL
      PinotLogicalExchange(distribution=[hash]) <-- whose return type is DOUBLE
        PinotLogicalAggregate(group=[{}], agg#0=[MIN($0)], aggType=[LEAF]) <-- whose return type is DOUBLE
          LogicalProject($f0=[CAST($4):DECIMAL(1000, 0) NOT NULL])
            LogicalTableScan(table=[[default, airlineStats]])
Therefore the leaf stage sends doubles while the intermediate stage expects big decimals. The issue here is not SSE (aka V1) but the way we resolve functions. I still don't know where, but it looks like at some point one of our rules is breaking the type matching. Ideally we should always use the correct aggregate function (AFAIK we should be able to do that now that the registry has been unified), but in case we are not, we should at least add a transformation node that automatically converts (aka casts) values.
The issue here is that we're registering the standard operator for aggregation functions like MIN / MAX / SUM from Calcite's SqlStdOperatorTable. These operators infer the return type as DECIMAL if the input operand type is DECIMAL. However, the actual aggregation function implementation in Pinot returns a double value regardless of the input operand type. This causes an issue during data exchange because we cast the output value based on the expected output type from the query plan. This works fine for the other numeric types, since double values can be cast to int, long, or float, but not to BigDecimal. There are a few ways to fix this:
1. Update all these aggregation function implementations in Pinot to use the appropriate intermediate and final return type based on the input operand type (possibly a big change, with backward compatibility implications even in the v1 engine).
2. ~~Register a custom operator for each of these aggregation functions that defines the return type as DOUBLE instead of using the standard SQL operators. This is not ideal, but it reflects the state of the current aggregation functions in Pinot accurately and will resolve the error described in this issue.~~ This doesn't work because we don't want to change the actual user-facing return type for these functions (backward incompatible, and we want to evolve the function implementations to be truly polymorphic in the future, which would change these back).
3. Fix the type conversion logic for single-stage -> multi-stage to also handle BigDecimal appropriately.
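The last option amounts to routing the conversion through Number instead of a blind cast at the stage boundary. A hedged sketch of that idea (the ColumnType enum and convert() helper are hypothetical simplifications; Pinot's real type lives in DataSchema.ColumnDataType and has more members):

```java
import java.math.BigDecimal;

public class TypeConversion {
    // Hypothetical subset of Pinot's column data types.
    enum ColumnType { INT, LONG, FLOAT, DOUBLE, BIG_DECIMAL }

    // Sketch of the conversion the single-stage -> multi-stage boundary
    // would need: go through Number so that a Double produced by the
    // leaf stage can become the BigDecimal the intermediate stage expects.
    static Object convert(Object value, ColumnType expected) {
        Number n = (Number) value;
        switch (expected) {
            case INT:    return n.intValue();
            case LONG:   return n.longValue();
            case FLOAT:  return n.floatValue();
            case DOUBLE: return n.doubleValue();
            case BIG_DECIMAL:
                // Keep an existing BigDecimal as-is; otherwise convert.
                return value instanceof BigDecimal
                        ? value
                        : BigDecimal.valueOf(n.doubleValue());
            default:
                throw new IllegalArgumentException("unsupported: " + expected);
        }
    }

    public static void main(String[] args) {
        Object leafOutput = Double.valueOf(123.0); // what the LEAF stage sends
        Object converted = convert(leafOutput, ColumnType.BIG_DECIMAL);
        System.out.println(converted.getClass().getSimpleName()); // prints BigDecimal
    }
}
```

Note that going through doubleValue() still loses precision for wide decimals, which is why option 1 (keeping BigDecimal end to end in the aggregation itself) remains the ideal fix.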