[RFC] OLAP: An OS SQL Query Solution Plugin
Overview
Our team has implemented a SQL plugin named OLAP based on our internal versions of OS. This plugin has been widely used in analytical scenarios and has achieved excellent results in our internal business for two years.
To support modern SQL query functionalities, OLAP adopts a groundbreaking design, rewriting almost the entire query process except for data reading from Lucene. This design enables OLAP to execute data queries in MPP mode. In simple terms, OLAP can be divided into three parts: plan optimization, task scheduling, and execution.
In the query plan optimization phase, OLAP utilizes Apache Calcite for SQL query parsing and plan optimization. It employs Calcite's HepPlanner and VolcanoPlanner for rule-based optimization (RBO) and top-down cost-based optimization (CBO). The layout of the optimizer draws inspiration from Flink's chained optimization, which facilitates the insertion of new optimization rules at any stage. Users can also control the query optimization path based on query parameters. This optimization approach lays the foundation for a pluggable native engine, allowing for different optimization rules to be applied based on different native engines, and the native engine can be selected during the query process.
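For a concrete picture of the chained layout, here is a minimal sketch built on Calcite's HepPlanner. The chain composition and rule choices are illustrative assumptions rather than OLAP's actual code; per the description above, the real CBO pass uses Calcite's VolcanoPlanner rather than a Hep pass.

```java
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

/** Minimal sketch of Flink-style chained optimization built on Calcite. */
public final class ChainedOptimizer {

    /** One link in the chain: takes a plan, returns a rewritten plan. */
    interface Optimizer {
        RelNode optimize(RelNode root);
    }

    /** An RBO pass backed by Calcite's HepPlanner. */
    static Optimizer hepPass(RelOptRule... rules) {
        return root -> {
            HepProgramBuilder builder = new HepProgramBuilder();
            for (RelOptRule rule : rules) {
                builder.addRuleInstance(rule);
            }
            HepPlanner planner = new HepPlanner(builder.build());
            planner.setRoot(root);
            return planner.findBestExp();
        };
    }

    /** Runs the chain in order; new passes can be inserted at any stage. */
    static RelNode run(RelNode root, List<Optimizer> chain) {
        for (Optimizer pass : chain) {
            root = pass.optimize(root);
        }
        return root;
    }

    // Example composition; the rule choices are assumptions for demonstration.
    static final List<Optimizer> CHAIN = List.of(
        hepPass(CoreRules.FILTER_INTO_JOIN, CoreRules.PROJECT_MERGE), // logical RBO
        hepPass(CoreRules.JOIN_COMMUTE)                               // stand-in for a CBO pass
    );
}
```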
In the scheduling phase, the query plan is converted into a directed acyclic graph (DAG) before scheduling begins. The scheduler further transforms this DAG into a series of tasks that can be executed on the cluster and links them based on upstream and downstream dependencies. The scheduling process supports task retries and monitors the health of cluster nodes to avoid dispatching tasks to unavailable nodes.
The execution layer manages the entire lifecycle of query tasks, ensuring they run according to plan, and carries out fault recovery. It is also responsible for data transmission and exchange. The execution layer effectively manages computing resources and reports error information. Additionally, it collects and reports performance metrics of task execution to provide data support for performance tuning. In the computation phase, OLAP has used three different native engines for acceleration, namely Calcite, the community edition of Velox, and an in-house native engine, achieving significant performance gains.
Plan Optimization
OLAP can map Index metadata to SQL metadata and treat regular expressions of time-series indexes as table partitions.
```
POST /_olap/database/test_database

POST /_olap/table
{
  "database": "test_database",
  "table": "table",
  "type": "external",
  "connector": {
    "es": {
      "index_pattern": "index\\-(\\d{4}.\\d{2}.\\d{2})",
      "index_pattern_match": ["partition_0"],
      "mapping_merge_policy": "created_as_order",
      "partitions": [
        {
          "field": "_pdate_",
          "type": "varchar"
        }
      ]
    }
  }
}
```
```
POST /_olap/sql
{
  "sql": "select * from `test_database`.`table` where _pdate_ = '2025.11.14'"
}
```
```
POST /_olap/sql
{
  "sql": "select * from `index-2025.11.14`"
}
```
The query plan can be optimized for both scatter-gather and MPP models. During the planning phase, partition pruning and predicate pushdown are performed based on query conditions. It also supports optimization operations such as RuntimeFilter and TopN.
As shown in the figure, the process first enters the LogicalOptimizer for some conventional RBO optimizations. These rules are common optimization methods in many databases and are purely logical, independent of data distribution. Subsequently, some CBO optimizations are performed, mainly JoinReorder, which relies on collected metadata information.
The physical optimization phase mainly incorporates data distribution factors into the query plan, such as when to insert an Exchange in the data flow. This phase involves more complex optimizations like two-stage sorting and two-stage aggregation. At the end of the physical optimization phase, OLAP will perform RuntimeFilter-related optimizations.
Pluggable Native Engine
The choice of the native engine is actually determined during the query planning phase. The main job of the EngineOptimizer is to extract the parts of the query plan related to the native engine and convert them into a corresponding native engine plan. This step is a prerequisite for a pluggable native engine. The remaining parts of the plan are responsible for data reading and data transmission, which are common parts independent of the native engine.
The subsequent ExecOptimizer part generates the corresponding native engine computation objects from the plan generated in the previous stage. During execution, these generated computation objects are called directly.
The native engine-related logic here is all interface-based. If you want to support any other native engine, such as Velox or DataFusion, you just need to implement the corresponding interfaces.
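As a rough sketch of what such an interface layer might look like (every name and signature here is an assumption for illustration; a Velox or DataFusion integration would supply its own implementations):

```java
import java.util.List;

/**
 * Illustrative sketch of a pluggable native engine contract; the actual
 * interfaces in the plugin may differ.
 */
interface NativeEngine {

    /** EngineOptimizer step: translate the engine-related subtree into an engine plan. */
    EngineFragment convert(PlanFragment engineSubtree);

    /** ExecOptimizer step: build the computation object invoked during execution. */
    EngineBridge compile(EngineFragment fragment);
}

/** Engine-specific serialized plan, like the JSON fragments in the EXPLAIN output below. */
interface EngineFragment {
    String serialize();
}

/** Computation object called at execution time: batches in, a batch out. */
interface EngineBridge extends AutoCloseable {
    RecordBatch process(List<RecordBatch> inputs);
}

/** Stand-ins for the plugin's plan and data types (assumptions for this sketch). */
interface PlanFragment {}
interface RecordBatch {}
```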
Engine Interface Diagram
Explain
OLAP also supports displaying the query plan.
--------------------------------------------------
SET runtime_filter_enabled=true;
SELECT a.long_field
FROM (select id,long_field/2 as long_field from `index-v1` where id > 13 and long_field/2 > 767) a
JOIN `index-v2` b on a.id = b.id and a.long_field = b.long_field
EXPLAIN RESULT
RecordType(DOUBLE long_field)
==================== APPLY [SQL TO PLAN CONVERTER]====================
0:LogicalProject(long_field=[$1])
1:LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
4:LogicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
5:LogicalFilter(condition=[AND(>($0, 13), >(divide_as_fraction($1, 2), 767))])
6:LogicalProject(id=[$10], long_field=[$14])
7:LogicalTableScan(table=[appendix$1])
2:LogicalProject(id=[$10], long_field0=[CAST($14):DOUBLE])
3:LogicalTableScan(table=[appendix$0])
==================== APPLY [LOGICAL OPTIMIZER]====================
0:LogicalProject(long_field=[$1])
1:LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
4:LogicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
5:LogicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
6:LogicalTableScan(table=[appendix$3])
2:LogicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
3:LogicalTableScan(table=[appendix$2])
==================== APPLY [CBO OPTIMIZER]====================
0:LogicalProject(long_field=[$1])
1:LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
4:LogicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
5:LogicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
6:LogicalTableScan(table=[appendix$3])
2:LogicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
3:LogicalTableScan(table=[appendix$2])
==================== APPLY [SCATTER GATHER PHYSICAL OPTIMIZER]====================
0:PhysicalProject(long_field=[$1])
1:PhysicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner], hints=[[]])
5:PhysicalExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
6:PhysicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
7:PhysicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
8:PhysicalTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}])
2:PhysicalExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
3:PhysicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
4:PhysicalTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
==================== APPLY [PHYSICAL RUNTIME FILTER OPTIMIZER]====================
0:PhysicalProject(long_field=[$1])
1:PhysicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner], hints=[[]])
5:PhysicalExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
6:PhysicalProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
7:PhysicalFilter(condition=[>(divide_as_fraction($1, 2), 767)])
8:PhysicalRfTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}], runtimeFilterToField=[appendix$4])
9:PhysicalExchange(exchangeId=[6], distribution=[{"tableId":0,"type":"SOURCE_BROADCAST"}], exchangeInfo=[appendix$5])
10:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[FINAL])
11:PhysicalExchange(exchangeId=[5], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
12:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[PARTIAL])
ReuseRel(reuseId=3, relType=PhysicalProject)
2:PhysicalExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
3:PhysicalProject(id=[$0], long_field0=[CAST($1):DOUBLE])
4:PhysicalTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
==================== APPLY [ENGINE OPTIMIZER]====================
0:PhysicalEngine(engineFragment=[appendix$6])
4:PhysicalExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
5:PhysicalEngine(engineFragment=[appendix$8])
6:PhysicalRfTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}], runtimeFilterToField=[appendix$4])
7:PhysicalExchange(exchangeId=[6], distribution=[{"tableId":0,"type":"SOURCE_BROADCAST"}], exchangeInfo=[appendix$5])
8:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[FINAL])
9:PhysicalExchange(exchangeId=[5], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
10:PhysicalRuntimeFilterBuilder(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[PARTIAL])
ReuseRel(reuseId=2, relType=PhysicalEngine)
1:PhysicalExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
2:PhysicalEngine(engineFragment=[appendix$7])
3:PhysicalTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
==================== APPLY [EXEC OPTIMIZER]====================
0:ExecExchange(exchangeId=[7], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
1:ExecEngine(engineBridge=[appendix$9])
5:ExecExchange(exchangeId=[0], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
6:ExecEngine(engineBridge=[appendix$11])
7:ExecRfTableScan(table=[appendix$3], hints=[[]], distribution=[{"tableId":0,"type":"SOURCE_RANDOM"}], runtimeFilterToField=[appendix$4])
8:ExecExchange(exchangeId=[6], distribution=[{"tableId":0,"type":"SOURCE_BROADCAST"}], exchangeInfo=[appendix$5])
9:ExecRuntimeFilter(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[FINAL])
10:ExecExchange(exchangeId=[5], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
11:ExecRuntimeFilter(select=[0], runtimeFilterType=[TERMS], runtimeFilterStage=[PARTIAL])
ReuseRel(reuseId=3, relType=ExecEngine)
2:ExecExchange(exchangeId=[3], distribution=[{"type":"EXECUTION_SINGLETON"}], exchangeInfo=[{"partitionActionInfos":[{"partitionActionType":"GATHER"}]}])
3:ExecEngine(engineBridge=[appendix$10])
4:ExecTableScan(table=[appendix$2], hints=[[]], distribution=[{"tableId":1,"type":"SOURCE_RANDOM"}])
APPENDICES:
appendix$0: [ table: [default, index-v2], tableId: 1, indexExpression: index-v2]
appendix$1: [ table: [default, index-v1], tableId: 0, indexExpression: index-v1]
appendix$2: [ table: [default, index-v2], tableId: 1, projects: [id, long_field], indexExpression: index-v2]
appendix$3: [ table: [default, index-v1], tableId: 0, projects: [id, long_field], indexExpression: index-v1, predicates: {"bool":{"filter":[{"range":{"id":{"from":13,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},{"match_all":{"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}}]
appendix$4: {0={"runtimeFilterType":"TERMS","field":"id","forceDependency":false}}
appendix$5: {"partitionActionInfos":[{"partitionActionType":"BROADCAST"}]}
appendix$6: EngineBridgeProject(long_field=[$1])
EngineBridgeHashJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[inner])
EngineBridgeProject(id=[$0], long_field=[$1])
EngineBridgeTableScan(table=[[engine_data_source_table_0]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field","type":"DOUBLE"}]])
EngineBridgeProject(expr_0=[$0], long_field0=[$1])
EngineBridgeTableScan(table=[[engine_data_source_table_1]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field0","type":"DOUBLE"}]])
appendix$7: EngineBridgeProject(id=[$0], long_field0=[CAST($1):DOUBLE])
EngineBridgeTableScan(table=[[engine_data_source_table_0]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field","type":"BIGINT"}]])
appendix$8: EngineBridgeProject(id=[$0], long_field=[divide_as_fraction($1, 2)])
EngineBridgeFilter(condition=[>(divide_as_fraction($1, 2), 767)])
EngineBridgeTableScan(table=[[engine_data_source_table_0]], fields=[[{"name":"id","type":"INTEGER"}, {"name":"long_field","type":"BIGINT"}]])
appendix$9: plan: {"nodes":[{"id":0,"type":"project","projections":[""long_field" AS "long_field""]},{"id":1,"type":"hashJoin","joinType":"inner","leftKeys":[0,1],"rightKeys":[0,1],"filter":"CAST(true as BOOLEAN)"},{"id":2,"type":"project","projections":[""id" AS "id"",""long_field" AS "long_field""]},{"id":3,"type":"tableScan"},{"id":4,"type":"project","projections":[""id" AS "expr_0"",""long_field0" AS "long_field0""]},{"id":5,"type":"tableScan"}],"edges":[{"src":0,"dest":1},{"src":1,"dest":2},{"src":2,"dest":3},{"src":1,"dest":4},{"src":4,"dest":5}]}, tableScan: {"3": {"names": ["id","long_field"], "types": ["INTEGER","DOUBLE"]}, "5": {"names": ["id","long_field0"], "types": ["INTEGER","DOUBLE"]}}
appendix$10: plan: {"nodes":[{"id":0,"type":"project","projections":[""id" AS "id"","CAST("long_field" as DOUBLE) AS "long_field0""]},{"id":1,"type":"tableScan"}],"edges":[{"src":0,"dest":1}]}, tableScan: {"1": {"names": ["id","long_field"], "types": ["INTEGER","BIGINT"]}}
appendix$11: plan: {"nodes":[{"id":0,"type":"project","projections":[""id" AS "id"","(CAST("long_field" AS DOUBLE) / CAST(CAST(2 as INTEGER) AS DOUBLE)) AS "long_field""]},{"id":1,"type":"filter","filter":"((CAST("long_field" AS DOUBLE) / CAST(CAST(2 as INTEGER) AS DOUBLE)) > CAST(CAST(767 as INTEGER) as DOUBLE))"},{"id":2,"type":"tableScan"}],"edges":[{"src":0,"dest":1},{"src":1,"dest":2}]}, tableScan: {"2": {"names": ["id","long_field"], "types": ["INTEGER","BIGINT"]}}
Task Scheduling
OLAP draws on the design concepts of Flink, Presto, and StarRocks, introducing Stages and Operators to represent the plan topology.
Operators are converted one-to-one from RelNodes and represent the execution of a function. For example, a TableScanOperator reads the necessary data, while an EngineOperator represents logic that needs to be executed by the native engine. Operators are uniformly abstracted: you only need to define the input, the output, and the logic the operator itself needs to perform. This is also why different native engines can be supported in a pluggable manner. An EngineOperator only needs to pass the data to the corresponding native engine processing logic and return the processed result; the specific native engine execution logic has already been determined in the planning phase.
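A minimal sketch of this uniform abstraction, reusing the `EngineBridge`/`RecordBatch` stand-ins from the earlier engine sketch (all names and signatures are assumptions, not the plugin's actual classes):

```java
import java.util.List;

/** Illustrative sketch of the uniform Operator abstraction. */
interface Operator {

    /** Upstream operators this operator consumes from. */
    List<Operator> inputs();

    /** Process one batch of input and produce output for downstream. */
    RecordBatch process(RecordBatch input);
}

/** An EngineOperator only hands batches to the engine chosen at planning time. */
final class EngineOperator implements Operator {
    private final List<Operator> inputs;
    private final EngineBridge bridge; // computation object built by the ExecOptimizer

    EngineOperator(List<Operator> inputs, EngineBridge bridge) {
        this.inputs = inputs;
        this.bridge = bridge;
    }

    @Override public List<Operator> inputs() { return inputs; }

    @Override public RecordBatch process(RecordBatch input) {
        // The operator itself is engine-agnostic: delegate and return the result.
        return bridge.process(List.of(input));
    }
}
```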
The Operators, once the RelNode transformation is complete, form the DAG of a complete query. An Operator can have multiple upstream and downstream operators. For physical execution, this DAG needs to be partitioned so that tasks can be deployed in a distributed cluster. A Stage, as the abstract unit of distributed scheduling, splits and merges the DAG at data Exchange boundaries to avoid frequent network data transmission. A Stage can contain multiple Operators, and a Stage can have multiple upstream or downstream Stages.
A QueryTask is the basic unit of distributed scheduling and execution. The scheduling layer constructs one or more QueryTasks from a Stage; a QueryTask can be understood as the distributed expansion of a Stage. During the expansion, upstream/downstream information and physical scheduling information are added. QueryTasks are aggregated by node and then sent for execution; the execution request sent to each node contains the task information of one or more Stages.
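The following sketch illustrates both ideas: cutting the DAG at Exchange boundaries into Stages, then expanding a Stage into per-node QueryTasks. It simplifies the DAG to a topologically ordered operator list, and every name here is an assumption, not the plugin's actual scheduler.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch of Stage construction and QueryTask expansion. */
final class Scheduler {

    /** A Stage groups the Operators between Exchange boundaries. */
    record Stage(int id, List<Operator> operators) {}

    /** A QueryTask is a Stage placed on a concrete node with physical info. */
    record QueryTask(int stageId, int taskIndex, String nodeId) {}

    /** Cut the operator list at Exchange operators to form Stages. */
    static List<Stage> cutIntoStages(List<Operator> topologicalOrder) {
        List<Stage> stages = new ArrayList<>();
        List<Operator> current = new ArrayList<>();
        int nextId = 0;
        for (Operator op : topologicalOrder) {
            if (op instanceof ExchangeOperator && !current.isEmpty()) {
                stages.add(new Stage(nextId++, List.copyOf(current)));
                current.clear(); // the Exchange starts the next stage
            }
            current.add(op);
        }
        if (!current.isEmpty()) {
            stages.add(new Stage(nextId, List.copyOf(current)));
        }
        return stages;
    }

    /** Expand a Stage into one QueryTask per node; unhealthy nodes are filtered out upstream. */
    static List<QueryTask> expand(Stage stage, List<String> healthyNodes) {
        List<QueryTask> tasks = new ArrayList<>();
        for (int i = 0; i < healthyNodes.size(); i++) {
            tasks.add(new QueryTask(stage.id(), i, healthyNodes.get(i)));
        }
        return tasks;
    }
}

/** Stand-in marker for exchange operators (assumption). */
interface ExchangeOperator extends Operator {}
```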
Diagram of the relationship between Stage, Operator, and QueryTask
MPP
Stages and Operators separate the specific data computation and data transmission, ensuring scheduling flexibility and making MPP possible. As long as an MPP plan can be generated in the query plan, the scheduling layer can schedule query tasks across the entire OS cluster, regardless of whether the scheduled node acts as a client processing the request or contains the shard corresponding to the query data.
Task Execution
The execution layer adopts a push model, in which upstream operators push data downstream. When a QueryTask is initialized on a deployment node, the Operators are initialized starting from the most downstream one. Each Operator is instantiated as a TableConsumer and holds a reference to its downstream Consumer. Multiple TableConsumers form a Task pipeline. After receiving and processing data, a TableConsumer pushes it downstream. Note that when a node has multiple Tasks, their deployment and execution are asynchronous.
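A minimal sketch of this pipeline, under the same naming assumptions as the earlier operator sketch:

```java
/**
 * Illustrative sketch of the push model: each operator is wrapped as a
 * consumer that processes a batch and pushes it downstream.
 */
interface TableConsumer {

    /** Receive a batch from upstream, process it, and push the result downstream. */
    void consume(RecordBatch batch);

    /** Upstream signals that no more data will arrive. */
    void finish();
}

final class OperatorConsumer implements TableConsumer {
    private final Operator operator;
    private final TableConsumer downstream; // next link in the Task pipeline

    OperatorConsumer(Operator operator, TableConsumer downstream) {
        this.operator = operator;
        this.downstream = downstream;
    }

    @Override public void consume(RecordBatch batch) {
        downstream.consume(operator.process(batch));
    }

    @Override public void finish() {
        downstream.finish();
    }
}
```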
In terms of execution, OLAP has made many acceleration optimizations. OLAP extensively uses Apache Arrow as a data carrier to reduce the overhead caused by frequent data copying. To achieve faster and more efficient data reading, OLAP supports concurrent data reading at the LeafReaderContext granularity when reading shard data.
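As a sketch of what per-segment concurrency can look like: `IndexReader.leaves()` is the real Lucene API, while the `RecordBatch` stand-in and the per-leaf read logic are placeholders, not the plugin's actual reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;

/** Minimal sketch of concurrent reads at LeafReaderContext granularity. */
final class ConcurrentShardReader {

    /** Submit one read task per LeafReaderContext (i.e. per Lucene segment). */
    static List<Future<RecordBatch>> readLeaves(IndexReader reader, ExecutorService pool) {
        List<Future<RecordBatch>> futures = new ArrayList<>();
        for (LeafReaderContext leaf : reader.leaves()) {
            Callable<RecordBatch> task = () -> readLeaf(leaf); // runs concurrently
            futures.add(pool.submit(task));
        }
        return futures;
    }

    /** Placeholder: read the matching docs of one segment into an Arrow-backed batch. */
    private static RecordBatch readLeaf(LeafReaderContext leaf) {
        throw new UnsupportedOperationException("sketch only");
    }
}
```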
In addition, it supports custom data transmission strategies, such as hash shuffle.
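A hash shuffle can be sketched as routing each row to a downstream channel by hashing its key columns; this is a minimal illustration, not the plugin's actual strategy interface.

```java
/** Minimal sketch of a hash-shuffle partitioning strategy. */
final class HashShufflePartitioner {
    private final int channels;

    HashShufflePartitioner(int channels) {
        this.channels = channels;
    }

    /** Pick the downstream channel for a row's join/aggregation key. */
    int channelFor(Object... keyValues) {
        int h = java.util.Arrays.hashCode(keyValues);
        return Math.floorMod(h, channels); // floorMod avoids negative indexes
    }
}
```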
Execution Flow Diagram
Performance
Currently, our OLAP clusters have been deployed on thousands of nodes internally. While preserving the low latency and high QPS capabilities of Elasticsearch, we have increased the supported computation scale by over an order of magnitude. The system can handle over 10,000 QPS for JOIN queries, with typical single-query latency in the hundreds of milliseconds.
In terms of scale, a single machine can support aggregations or JOINs on up to 100 million rows of data. In a distributed environment, when optimizations like hash distribution are properly applied, the query scale is theoretically unlimited.
To further enhance performance and stability, the system utilizes off-heap memory end-to-end, which alleviates GC pressure on the heap and overcomes its memory limitations. For instance, on a machine with 1 TB of physical memory and a 32 GB JVM configuration, by fully leveraging system memory, the available memory for computation can be increased from approximately 20 GB to around 500 GB.
- In a scenario involving a JOIN between a 200-million-row dataset (after filtering) and a 20,000-row dataset (after filtering), the combination of Runtime Filter and JOIN Reorder optimizations can drastically reduce query time from about 2 minutes to approximately 100 milliseconds.
- Process 2 million detailed rows:
  - Aggregate into 100,000 rows (fewer dimensions), sort and fetch the last page; end-to-end latency is about 0.5 seconds.
  - Aggregate into 1 million rows (multiple dimensions), sort and fetch the last page; end-to-end latency is about 1.5 seconds.
- Process 1 billion detailed rows, aggregate into 100 million buckets, perform a Shuffle JOIN, and return the top 100 results; end-to-end latency is about 3 seconds.
Partial Features and Diagrams
Cross Cluster Query
Join
Proposal
Although there are some differences between the OLAP plugin and opensearch-sql, we believe that the ultimate goal is the same. We hope to contribute this part of the code to the community so that more people can participate.
Coworkers: @maosuhan @ViggoC
This is a contribution from ByteDance. They are now donating the OLAP engine built on top of OpenSearch to the community.
Thanks for contributing.
A couple of questions:
The query plan can be optimized for both scatter-gather and MPP models
Does scatter-gather mean the existing OpenSearch execution model (no shuffle)? How does the optimizer decide between the scatter-gather and MPP models? Based on rules?
Task Scheduling / Task Execution
What is the transport layer? What is the data format in flight?
Performance
Are there any benchmark numbers that can be shared, e.g. ClickBench?
Really excited to see this, the perf results are impressive. We've been thinking of bringing additional extension points into OS core while we work on a DataFusion-based implementation to support alternative file formats, but we don't currently have the planning/scheduling pieces, so this will be a great addition. Some initial questions:
What is the entry point at the data node? Are you reconstructing a Lucene query and using Lucene to push down and apply filters and collect fields on matching docs into arrow? Or is the data read effectively a scan of doc values where your engine applies the filter?
At the coordinator / final worker, what is doing the final reduce: is this your native engine, or Calcite Java reading vectors into the JVM?
Is this entirely in a plugin with existing extension points? I'm curious how much of this we should think about bringing to core itself as libs, where the exec engines themselves are plugins.
This is really exciting, thank you @GrandFisher!
Tagging a few other folks who are working on potentially related issues:
- @Bukhtawar @mgodwan (https://github.com/opensearch-project/OpenSearch/issues/18416)
- @bharath-techie (https://github.com/opensearch-project/OpenSearch/issues/18847)
@GrandFisher This is really cool. Thanks for sharing the proposal with the community. We are very much interested in how you approached the Velox engine integration (community version) and extended this to OpenSearch, as this is something we are also starting to look at. If you have velox based benchmark data, it would be great to see the benchmark numbers if you can share.
@mch2 I hope we can come up with a better solution through our discussions.
What is the entry point at the data node?
We created a new QueryTaskService, which is different from SearchService.
Are you reconstructing a Lucene query and using Lucene to push down and apply filters and collect fields on matching docs into arrow?
We push down the filters into Lucene as much as possible. For reading data, we record matched doc ids in a buffer and read doc values in batches. We also support a source reader that can read the source as a string field.
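For illustration, a sketch of that two-phase read using Lucene's collector and doc-values APIs; the buffering scheme and the single-field numeric output are assumptions, and the plugin's actual readers (including the source reader) are richer.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.SimpleCollector;

/** Sketch: buffer matching doc ids, then read doc values for them in a batch. */
final class BufferedDocValuesCollector extends SimpleCollector {
    private final String field;
    private final List<Integer> docBuffer = new ArrayList<>();
    private LeafReaderContext context;

    BufferedDocValuesCollector(String field) {
        this.field = field;
    }

    @Override protected void doSetNextReader(LeafReaderContext context) {
        this.context = context;
        docBuffer.clear(); // one buffer per segment; read before moving on
    }

    @Override public void collect(int doc) {
        docBuffer.add(doc); // phase 1: just record the match
    }

    /** Phase 2: read doc values for the buffered ids in docID order. */
    List<Long> readBatch() throws IOException {
        SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), field);
        List<Long> out = new ArrayList<>(docBuffer.size());
        for (int doc : docBuffer) { // docBuffer is already in increasing docID order
            if (values.advanceExact(doc)) {
                out.add(values.nextValue());
            }
        }
        return out;
    }

    @Override public ScoreMode scoreMode() {
        return ScoreMode.COMPLETE_NO_SCORES; // scores are not needed for value reads
    }
}
```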
At the coordinator / final worker, what is doing the final reduce: is this your native engine, or Calcite Java reading vectors into the JVM?
It depends on the query setting. We support both the native engine and the Calcite engine.
Is this entirely in a plugin with existing extension points?
Since we have implemented a framework independent of the core, we do not rely on the core's extension points. However, to make our framework more extensible, we have added the concept of an "addOn" (we needed a new term because OLAP is a plugin itself) to extend data source connectors and different compute engines. But it is still rudimentary and remains tightly coupled with the overall framework.
@penghuo
Does scatter-gather mean the existing OpenSearch execution model (no shuffle)? How does the optimizer decide between the scatter-gather and MPP models? Based on rules?
OLAP has a boolean setting called MPP_ENABLED that selects between optimizers with different rule sets. It can be configured in the cluster settings, applying to SQL queries across the entire cluster, or via a URL parameter, applying only to the current SQL query. The MPP mode also includes the rules from the scatter-gather mode, and the most suitable query plan is selected using the cost. In other words, even if mpp_enabled=true, a query may still execute in scatter-gather mode.
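For illustration only, such a flag could be registered as a dynamic cluster setting roughly like this; the setting key is an assumption, and the per-query URL override is not shown.

```java
import org.opensearch.common.settings.Setting;

/** Hedged sketch of registering the flag; OLAP's actual key and wiring may differ. */
final class OlapSettings {
    static final Setting<Boolean> MPP_ENABLED = Setting.boolSetting(
        "olap.mpp_enabled",       // assumed key
        false,                    // default: scatter-gather optimizer chain
        Setting.Property.Dynamic, // updatable via the cluster settings API
        Setting.Property.NodeScope
    );
}
```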
What is the transport layer? What is the data format in flight?
We still use the OS/ES transport framework, but we customized some TransportActions, TransportRequests, and RequestHandlers. The data format used is Apache Arrow.
Are there any benchmark numbers that can be shared, e.g. ClickBench?
We do not have a requirement for benchmarking; our primary objective is to meet internal business requirements.
@GrandFisher This is really cool. Thanks for sharing the proposal with the community. We are very much interested in how you approached the Velox engine integration (community version) and extended this to OpenSearch, as this is something we are also starting to look at. If you have velox based benchmark data, it would be great to see the benchmark numbers if you can share.
@akash-shankaran Thanks for your comment. We didn't conduct comprehensive performance testing because our initial focus was on aggregation scenarios, so we created a single-machine performance POC for Velox two years ago; theoretically, Velox's performance should only be better now. The workload can be thought of as a query like select d1, d2, sum(m1), sum(m2) .... group by d1, d2. We generated two types of fields, high (cardinality = 100000) and low (cardinality < 10), and tested various possible combinations of dimensions, because Velox itself can take different optimization paths depending on data distribution and cardinality. We also tested the impact of the number of metrics on overall performance.
The unit is milliseconds, and all tests were run in a single thread. The time includes aggregation computation time, Arrow data construction time, and JNI call time.
| dim(cardinality) | metric_count | 1k rows | 10k rows | 100k rows | 1M rows | 10M rows |
|---|---|---|---|---|---|---|
| [high] | 10 | 2.478 | 4.736 | 29.447 | 140.261 | 935.408 |
| [high+high] | 10 | 2.751 | 5.856 | 50.329 | 338.22 | 3268.067 |
| [high+high] | 20 | 2.967 | 6.724 | 75.334 | 582.851 | 5989.774 |
| [high+high] | 30 | 4.513 | 13.003 | 103.107 | 894.136 | 8343.265 |
| [low] | 10 | 2.569 | 2.156 | 5.093 | 21.797 | 179.668 |
| [low+high] | 10 | 2.958 | 5.084 | 43.346 | 328.055 | 3256.31 |
| [low+low] | 10 | 3.392 | 3.096 | 5.627 | 22.732 | 205.039 |
| [low+low] | 20 | 4.206 | 3.41 | 7.565 | 39.005 | 358.522 |
| [low+low] | 30 | 4.694 | 4.355 | 8.339 | 53.293 | 502.818 |
Feel free to ask if you have any other questions.
This is super exciting, really like the pluggable interfaces for different engines part.
Can you shed more light on how you decide between Calcite / Velox / the in-house engine?
- Are there specific use cases for each engine?
- Do you see specific pros/cons for each engine?
I'm assuming these are all low-level execution engines that will perform aggregations on doc values once we push down the filter to Lucene and get back the ids. Correct me if they're not and are used for different purposes.
@bharath-techie Thanks for your comment.
Are there specific use cases for each engine?
Currently, we use the Calcite engine to run integration tests internally and compare its results with Velox or other native engines. We never run the Calcite engine in our production environment, because it is considered much slower and more limited than mature native engines. We run Velox and other native engines in production.
Also, Calcite and Velox have many differences in behavior. The biggest one is that the functions supported by Calcite and Velox are different. I think this issue also exists when comparing Velox and DataFusion. It seems difficult to directly migrate from one engine to another without any SQL modification, especially if the SQL uses many custom functions. So we do not guarantee transparent switching between different engines.
Do you see specific pros/cons for each engine?
| Engine | Data Format | Performance of big query | Performance of tiny query | Max Supported Data Volume | Stability in production | Maintainability | Feature and Function | Other |
|---|---|---|---|---|---|---|---|---|
| calcite engine | Object[], bad performance for transmission and construction | Slow, Java and only single-threaded | Slow, needs codegen, which often takes 10ms+. QPS is lower. | Low. Limited by the JVM | High | Good, all is Java | Can support complete SQL. Rich function list. | Not friendly to distributed computation. Does not support partial agg |
| velox/other native engines | Arrow, good performance for transmission and construction | Very fast, C++ and multiple drivers | Fast, does not need codegen. QPS is higher. | High. Limited by machine memory | Risky, it may crash if the data or plan is incorrect | Bad, we need C++ development and a JNI wrapper | Can support complete SQL. Rich function list. | Friendly to distributed computation. Supports partial agg |
I'm assuming these are all low-level execution engines that will perform aggregations on doc values once we push down the filter to Lucene and get back the ids. Correct me if they're not and are used for different purposes.
Yes, we execute the query just as you said. And we use `ValuesSourceConfig.resolve(context, type, fieldName, null, null, null, null).toValuesSource(context)` as much as possible when reading from OpenSearch Lucene fields.
We still use the OS/ES transport framework, but we customized some TransportActions, TransportRequests, and RequestHandlers. The data format used is Apache Arrow.
OLAP extensively uses Apache Arrow as a data carrier to reduce the overhead caused by frequent data copying
Do we stream record batches, or flush them all into one big byte buffer? Are the transport-netty4 default allocators being used at the transport layer?
Subsequently, some CBO optimizations are performed, mainly JoinReorder, which relies on collected metadata information.
Is the collected metadata around field cardinality? Is it something that is pulled asynchronously to all the nodes?
I should probably wait for the code to be uploaded and find these answers there, to avoid running into a chain of follow-up questions.
@rishabhmaurya Our distributed tasks use a push model, where the sink operator sends data to the next task via ExchangeTableRequest. Initially, we also encountered the issue of being unable to transmit data exceeding 2GB. Later, we implemented streaming transmission by splitting the data into multiple requests and finally sending an EOS signal.
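A minimal sketch of that chunked protocol: `ExchangeTableRequest` is the name mentioned above, but its fields and the sender are assumptions for illustration.

```java
import java.util.List;

/** Sketch: split serialized batches across requests and end with an EOS marker. */
final class ExchangeSender {

    record ExchangeTableRequest(byte[] payload, boolean eos) {}

    interface Transport {
        void send(ExchangeTableRequest request);
    }

    /** Stream serialized batches one request at a time, then signal EOS. */
    static void stream(List<byte[]> serializedBatches, Transport transport) {
        for (byte[] batch : serializedBatches) {
            // Each request stays well under the 2 GB transport limit.
            transport.send(new ExchangeTableRequest(batch, false));
        }
        transport.send(new ExchangeTableRequest(new byte[0], true)); // EOS signal
    }
}
```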
Currently, we have not implemented dynamic statistics of cardinality and need to set the cardinality of fields in the mapping. Another key focus in CBO is determining the selectivity of filter conditions. We achieve this by sending a separate request to obtain the actual total_hits. Leveraging Lucene's indexing capabilities, this query is relatively efficient.
@ViggoC
Our distributed tasks use a push model, where the sink operator sends data to the next task via ExchangeTableRequest. Initially, we also encountered the issue of being unable to transmit data exceeding 2GB. Later, we implemented streaming transmission by splitting the data into multiple requests and finally sending an EOS signal.
Interesting! Can't wait to hack on this code to use Flight transport to benefit from both the streaming Arrow format and zero copy at the network layer. Currently, it is used for streaming aggregation results for high-cardinality fields between data and coordinator nodes.
Currently, we have not implemented dynamic statistics of cardinality and need to set the cardinality of fields in the mapping. Another key focus in CBO is determining the selectivity of filter conditions. We achieve this by sending a separate request to obtain the actual total_hits. Leveraging Lucene's indexing capabilities, this query is relatively efficient.
Totally! We have a bunch of statistics already in Lucene that we never use at the coordinator for efficient planning.
@rishabhmaurya Improving the efficiency of data exchange using Arrow Flight has always been on our to-do list, but we lack sufficient manpower. We look forward to collaborating with the community, as it will be a better solution.
Feel free to ask if you have any other questions.
Hi @maosuhan - Thanks for sharing the data. A few questions:
- Would you have comparison data between Elasticsearch vs Velox? (The Velox part you already shared above; I just wanted to see the improvements compared to a baseline.)
- Based on the comments and the RFC here, it looks like you used JNI to call Velox? Did you evaluate any other approach, such as a sidecar standalone Velox process called from Elastic/OpenSearch, before arriving at JNI? If yes, any pros/cons you can share?
- Is your JNI code open-sourced, or do you plan to make it available as part of the proposed plugin?
- Would you have comparison data between Elasticsearch vs Velox? (The Velox part you already shared above; I just wanted to see the improvements compared to a baseline.)
It seems more reasonable to compare the E2E performance between SQL and DSL, maybe SQL group by vs composite agg or terms agg. It is a little complicated to benchmark only part of the execution path. What do you think?
- Based on the comments and the RFC here, it looks like you used JNI to call Velox? Did you evaluate any other approach, such as a sidecar standalone Velox process called from Elastic/OpenSearch, before arriving at JNI? If yes, any pros/cons you can share?
We have not considered or tried a sidecar implementation. I think it is also doable because it can save the network and serialization overhead. But we think a JNI call may be a more general way to use Velox, because Flink and Spark also use JNI to call Velox.
- Is your JNI code open-sourced, or do you plan to make it available as part of the proposed plugin?
Currently, all the JNI code is in-house, and we plan to open-source it in the future along with this plugin. Another open-source option is velox4j, which I think could also work for this plugin.
It seems more reasonable to compare the E2E performance between SQL and DSL, maybe SQL group by vs composite agg or terms agg. It is a little complicated to benchmark only part of the execution path. What do you think?
Either is fine; I see value in both. The partial execution path shows specific improvements and their benefits, and the whole stage is better for comparing full query execution (like you said, group by vs composite agg). It looks like some query rewriting will also be necessary (from DSL -> SQL). We're also evaluating the Velox engine, and I'm hoping to see your results to estimate what kind of improvements we can expect for a particular query type. :)
Currently, all the JNI code is in-house, and we plan to open-source it in the future along with this plugin. Another open-source option is velox4j, which I think could also work for this plugin.
Thanks for the pointer. I'll start with velox4j, and do some experiments.
In the first step of open-sourcing, we will release the Calcite engine solution. Then we will release the Bolt engine, an in-house engine from ByteDance that is also going to be open-sourced on 12.7.
Bolt is very well supported within ByteDance compared to Velox and is widely adopted in Presto, Flink, and Spark there. Velox is also an alternative solution; we will look into how to integrate it in the future, or perhaps people from the community who are interested can help with it together.
Bolt is open sourced: https://github.com/bytedance/bolt