[RFC] OpenSearch Execution Engine
This RFC proposes extending OpenSearch to support new storage formats, along with corresponding indexing and/or query engines, in addition to Lucene. This enhancement will give users the flexibility to choose data formats/engines based on their specific use cases, performance requirements, and data characteristics. This RFC focuses on a high-level overview of the proposal; detailed approaches will follow.
Motivation
Users leverage OpenSearch for a wide variety of use cases. These use cases span multiple data sources (catalog, transactions, etc.) and data types (logs, metrics, traces, etc.) from different components in their pipelines. OpenSearch users face various pain points, categorized along the following themes:
Storage Cost
- Different data types and access patterns require different storage optimizations
- Need for more cost-effective storage solutions for specific workloads
Data Tiering
- Limited flexibility in how data is stored across different tiers
- Need for better control over data placement and storage formats
New Use Cases
- First-class support for features like cross-index joins
- Aggregation push down to the data storage layer
Current Data Storage Limitations
Today, OpenSearch uses Lucene’s codec interface to store data. This allows OpenSearch to leverage the different formats exposed through Lucene (e.g. doc values, points, postings, vectors) and extend them as necessary to build implementations suited to the use case. But this approach has certain drawbacks:
- It limits the addition of new fields / field types which don’t fit the existing types supported by Lucene codecs (e.g. multi-field data structures).
- It requires constructs like roll-ups, aggregations, etc. to be built at the application layer, rather than directly on the existing data.
There have been similar discussions in the OpenSearch community for which this RFC aims to provide a high-level solution:
- https://github.com/opensearch-project/OpenSearch/issues/13668
- https://github.com/opensearch-project/OpenSearch/issues/17501
Goals
- Enable pluggable storage formats and corresponding indexing and query engines.
- Maintain backward compatibility with data stored using Lucene engine.
- Consistency of API interface.
- Usage of mixed engines within the same index (provided the data format is compatible with the engines).
- Data migration from existing sources.
Concepts
Data Storage Format
In order to extend OpenSearch to support more use cases and increase the efficiency of data storage and retrieval (e.g. better compression), OpenSearch can make the data storage format configurable. This will allow OpenSearch to expose multiple other index types with different field combinations, encodings, etc.
Projections
A projection is a view on top of the user’s data which provides an optimized operation based on certain fields. For example, when a number field is added to a document, OpenSearch creates a numeric index in Lucene (points format) to support fast range/term queries on that field. With different data formats, a projection can live within any such format, and the corresponding engine is solely responsible for managing the operations related to that projection. A projection can have multiple sub-types, and additional data formats are allowed to register more (e.g. document source, inverted index, materialized views such as star-tree, vector indexes, etc.). These projections will be built as part of the refresh flow on index shards.
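To make the idea concrete, here is a small, purely illustrative sketch of a projection built at refresh time that supports a fast range check, in the spirit of Lucene's points format. `NumericRangeProjection` is a made-up name for this example, not a proposed class:

```java
// Illustrative sketch only: a "projection" built during the refresh flow
// from buffered documents. Not part of any proposed OpenSearch API.
public class NumericRangeProjection {
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    // Called during refresh for each buffered document's field value.
    void add(long value) {
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    // The optimized operation this projection provides: a cheap range check
    // that lets a query engine skip data that cannot match a range query.
    boolean mayMatch(long lower, long upper) {
        return lower <= max && upper >= min;
    }

    public static void main(String[] args) {
        NumericRangeProjection p = new NumericRangeProjection();
        for (long v : new long[] {42, 7, 19}) p.add(v);
        System.out.println(p.mayMatch(1, 5));   // false: range ends below min
        System.out.println(p.mayMatch(10, 20)); // true: overlaps [7, 42]
    }
}
```

The key point is that the engine owning the data format maintains such structures itself; the rest of the system only consumes the optimized operation.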
Engine
An engine is the entity responsible for managing operations on the data structures associated with OpenSearch documents, based on the data storage format. An engine can be decomposed into two types:
Indexing Engine
The indexing engine is responsible for creating persistent data structures from incoming documents. Based on the data storage format, the engine will create its own representation of the incoming stream of documents. Once the data is ingested, the same engine is responsible for making the data available for search (refresh) and providing durability-guarantee signals (flush) to OpenSearch’s logical indexing layer, which in turn ensures a consistent view of data across the different engines.
There will be a common coordinating engine layer on top of the multiple indexing engines, which can support shard lifecycle operations like refreshes, commits, merges, recovery, etc. A detailed RFC on the indexing engine will follow, highlighting synchronization of operations across multiple engines.
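As a loose sketch of the coordination concern (all names here are illustrative stand-ins, not proposed interfaces), the coordinating layer could fan a refresh out to each engine and surface partial failures rather than exposing an inconsistent view:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a coordinating layer fanning a refresh out to
// several indexing engines. These are not the interfaces OpenSearch
// would ship; they only illustrate the failure-handling concern.
public class CoordinatingEngineSketch {
    interface IndexingEngine {
        String name();
        boolean refresh(); // true on success
    }

    // Refresh every engine and report which ones failed, so the caller
    // can avoid publishing a partially refreshed view of the shard.
    static List<String> refreshAll(List<IndexingEngine> engines) {
        List<String> failed = new ArrayList<>();
        for (IndexingEngine engine : engines) {
            if (!engine.refresh()) {
                failed.add(engine.name());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        IndexingEngine lucene = new IndexingEngine() {
            public String name() { return "lucene"; }
            public boolean refresh() { return true; }
        };
        IndexingEngine vector = new IndexingEngine() {
            public String name() { return "vector"; }
            public boolean refresh() { return false; } // simulate a failure
        };
        System.out.println(refreshAll(List.of(lucene, vector))); // [vector]
    }
}
```

A real implementation would need stronger guarantees (ordering, rollback of the succeeded engines, retries), which the follow-up RFC is expected to cover.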
Query Engine
The query engine will provide efficient query constructs on top of the data format. In line with this, it will support query execution, planning, and optimization. A query engine can map the data provided by multiple indexing engines across data storage formats to build queries that optimize the system’s resource usage. Similarly, an indexing engine can publish the data it prepares to different query engines so that they can decide how to query it.
Primary Indexing/Query Engine
Today, OpenSearch uses the translog as the immediate-term source storage layer and purges it once data is persisted into Lucene segments. Lucene segments then use the _source field to store the incoming document bytes. With the introduction of multiple storage formats, OpenSearch can be configured with a storage format that serves as the source of truth for the user document and is used in OpenSearch’s fetch phase to retrieve the document’s source.
Lucene will continue to be the default OpenSearch data format and engine, while other data formats and engines can exist alongside it to support new fields/operations.
Core Architecture
Engine Abstraction Layer
The engine abstraction layer provides pluggable interfaces that allow users to bring their own indexing/query engines. At a high level, the engine interfaces would look something like the following (this needs refinement but conveys the idea):
interface IndexingEngine {
    // Properties
    EngineConfig config();
    List&lt;DataFormat&gt; supportedDataFormats();

    // Core indexing operations
    List&lt;ValidationError&gt; validateDocument(Document doc);
    void index(Document doc);
    void delete(Term term);
    void updateDocument(Term term, Document doc);

    // Lifecycle management operations
    void refresh();
    CommitPoint commit();
    void merge(MergeContext mergeContext);
    Releasable preservePointInTimeData(QueryContext context);
    void close();
    void rollback();

    // Recovery and replication operations
    void recover(RecoveryContext context);
    void tierData();

    IndexingStats stats();
}
interface QueryEngine {
    // Properties
    EngineConfig config();

    // Query validation
    List&lt;ValidationError&gt; canSupportQuery(Query query);

    // Core query execution
    QueryResult execute(Query query, QueryContext context);

    // Query planning and optimization
    QueryPlan createQueryPlan(Query query, QueryContext context);

    // Aggregation handling
    AggregationResult aggregate(AggregationBuilder aggregationBuilder, SearchContext context);

    // Cost estimation
    QueryCost estimateQueryCost(QueryPlan plan);

    QueryStats stats();
}
There can be more methods/APIs to expose through the engines, but the above would be the bare-minimum set we need from them.
Index Configuration
Index configuration in a pluggable environment will provide a mechanism for users to define how their data should be indexed across different data formats. When creating an index, users can specify engine preferences through index templates or explicit mappings, with sensible defaults for the engine → data format mapping. The mappings will support both single-engine and hybrid approaches, allowing different fields to be stored with different data formats within the same index and indexed/queried by different engines based on their specific requirements.
Index settings can include data-format-specific parameters such as compression methods, encoding formats, and optimization flags, which are validated and applied during index creation. For example, an index might be configured to use a specialized vector storage format/engine for embedding fields while utilizing Lucene for text search. This flexibility enables users to optimize their storage and query patterns for their specific use cases while maintaining OpenSearch’s operational simplicity.
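As an illustration, a hypothetical index creation request could look like the following. The `engine` and `data_format` setting/mapping parameters shown here are made-up names for the sake of the example, not an agreed-upon API:

```json
PUT /products
{
  "settings": {
    "index.engine.default": "lucene"
  },
  "mappings": {
    "properties": {
      "description": { "type": "text" },
      "embedding": {
        "type": "knn_vector",
        "dimension": 128,
        "engine": "vector-store",
        "data_format": "columnar"
      }
    }
  }
}
```

Here the `description` field stays on the default Lucene engine, while `embedding` is routed to a hypothetical vector-store engine within the same index.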
Joins
When working with multiple data formats, efficient join operations become crucial for query performance and data consistency. To facilitate joins across different data formats/engines, a common row ID field will be added to serve as the primary identifier linking related data across the various projections and data formats. The query planning layer can then optimize join operations by considering the capabilities and performance characteristics of each engine involved, along with data distribution, size, and engine-specific optimizations, to come up with the most efficient join strategy. For example, when joining data between a Lucene-based index and a vector store, the system can leverage the strengths of each engine: Lucene’s inverted index for filtering, and the vector store’s similarity calculations. With a pluggable query engine, OpenSearch should also be able to support joins across data from multiple indexes, querying data for multiple indexes together.
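As a rough illustration of the row-ID idea (the names and shapes here are hypothetical, not a proposed API), a hash join keyed on the shared row ID could merge per-engine results like this:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: joining per-engine results on a shared row ID.
// RowIdJoinSketch and joinOnRowId are made-up names for this example.
public class RowIdJoinSketch {
    // Each engine returns a map from row ID to the fields it owns.
    static Map<Long, Map<String, Object>> joinOnRowId(
            Map<Long, Map<String, Object>> left,
            Map<Long, Map<String, Object>> right) {
        // Probe the smaller side to keep the join cost low; a real planner
        // would pick the strategy from engine statistics and cost estimates.
        Map<Long, Map<String, Object>> probe = left.size() <= right.size() ? left : right;
        Map<Long, Map<String, Object>> build = (probe == left) ? right : left;
        Map<Long, Map<String, Object>> joined = new HashMap<>();
        for (Map.Entry<Long, Map<String, Object>> e : probe.entrySet()) {
            Map<String, Object> other = build.get(e.getKey());
            if (other != null) {
                Map<String, Object> merged = new HashMap<>(e.getValue());
                merged.putAll(other);
                joined.put(e.getKey(), merged);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Lucene-side filter results: row IDs that matched a term query.
        Map<Long, Map<String, Object>> luceneHits = Map.of(
            1L, Map.of("title", "doc-1"),
            2L, Map.of("title", "doc-2"));
        // Vector-store results: row IDs with similarity scores.
        Map<Long, Map<String, Object>> vectorHits = Map.of(
            2L, Map.of("score", 0.91),
            3L, Map.of("score", 0.40));
        // Only row ID 2 appears in both engines' results.
        System.out.println(joinOnRowId(luceneHits, vectorHits));
    }
}
```

In the actual system the planner, not the caller, would decide between strategies (e.g. pushing the filter down first versus intersecting row-ID sets), based on each engine's cost estimates.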
Related component: Indexing
Additional context
Contributors: @backslasht @bharath-techie
@Bukhtawar @shwetathareja @mch2 @getsaurabh02 Looking forward to your thoughts on this.
Thanks for the RFC. Allowing storage format extension is indeed valuable, and we have seen such demands in several scenarios too. For example, as AI/ML becomes more popular these days, new emerging formats like Lance can better serve such workloads.
However, I wonder if we have to extend an engine to support a new format. Given Java does not allow multiple class extensions, how will we use multiple new formats at the same time? Ideally we want composability at the storage format layer, and I have seen in the past we have tried to leverage the binary data in Lucene for new formats such as the KNN plugin. Perhaps we can think along this line to introduce some plugin interface at the storage format layer?
This is a really nice proposal! Thanks @mgodwan.
However, I wonder if we have to extend an engine to support a new format. Given Java does not allow multiple class extensions, how will we use multiple new formats at the same time? Ideally we want composability at the storage format layer, and I have seen in the past we have tried to leverage the binary data in Lucene for new formats such as the KNN plugin. Perhaps we can think along this line to introduce some plugin interface at the storage format layer?
This is a good question. Based on this part:
To facilitate joins across different data formats/engines, a common row ID field will be added to serve as the primary identifier that links related data across various projections and data formats.
And the section "Current Data Storage Limitations", I'm working on the assumption that this is operating at a higher layer than KNN does. While KNN embeds vectors in Lucene segments (and did so before Lucene had its own vector concept), it sounds like this is inserting a logical branch between the Engine and the Lucene index, so the Engine can write to Lucene and something else, and then provides a higher-level query abstraction to handle reading both (and picking the best option for those reads). While the Engine currently manages a Lucene index and a translog, I think this proposal would allow it to manage other stuff. If we wanted to embed some other specialized data format (like Lance), we would be able to do that via this mechanism.
@mgodwan, at the risk of getting a little into the implementation weeds, would this proposal end up opening up the ShardPath (ref) class? Currently, it has a fixed translog and index directory (called "folders" in a very Windowsy nomenclature). Would other data stores be able to register their own paths there? (I don't think we can store additional stuff in the index directory, since I think Lucene deletes files that it doesn't recognize on startup.)
Thanks @mgodwan, it would be pretty useful to plug other engines and storage formats into OpenSearch to get the best performance across use cases. It would also open up integration with remote engines, which need not always be local to the cluster.
There will be a common coordinating engine layer on top of multiple indexing engines, which can support shard lifecycle operations like refreshes, commits, merges, recovery, etc.
We should also call out (probably in the detailed RFC) clear tenets and guarantees from other engines and data formats to ensure ordering and consistency during ingestion, so that failure cases, like a refresh succeeding in one engine while failing in another, are handled gracefully.
Thanks for this RFC. I can see a benefit of other storage formats for observability use cases as well. As an example, OpenTelemetry data contains a lot of resource attributes that are not supposed to change during the application runtime. A dedicated store can avoid a lot of data duplication.
Thanks Mohit for the proposal. I guess the multi-data-format engine abstraction is multi-fold, as it might need to support cases 1/ where the source of truth sits entirely on one system and needs a join across data formats (also similar to #12948), and 2/ where the source of truth is ingested across disparate systems, like vector fields on specialized computes requiring custom segment creation/merge lifecycles, where a logical data replicator might be a better fit. The current translog should evolve into a persistent stream of logical records, to ensure updates and deletes are handled correctly.
Another orthogonal thought is whether we could have a more native integration with Data Prepper that directly writes the stream of records as source (in an open format) on remote storage, and with pull-based ingestion we create Lucene indices/vector graphs on demand across independent compute nodes.
Thanks folks for reviewing the proposal.
However, I wonder if we have to extend an engine to support a new format. Given Java does not allow multiple class extensions, how will we use multiple new formats at the same time? Ideally we want composability at the storage format layer
There are multiple ways to use an engine that go beyond the storage format, involving in-memory buffering, custom metadata, etc., which the engine can govern while preserving core properties of the storage format; hence the proposal to have an engine as well. I'm working on more low-level details that can help discuss how we can achieve this.
If we wanted to embed some other specialized data format (like Lance) we would be able to do that via this mechanism.
Yes, the abstractions proposed should allow us to achieve this.
Would other data stores be able to register their own paths there? (I don't think we can store additional stuff in the index directory, since I think Lucene deletes files that it doesn't recognize on startup.)
Yes, I'd prefer to leverage the ShardPath class with a registry hooked into it via the engine, providing possible paths for different storage format combinations. This should keep the formats' storage loosely coupled. Let me know if you think there are other alternative approaches we can dive deep into as well.
ensure ordering and consistency during ingestion to ensure failure cases like refresh succeeded in one engine, while failed in other are handled gracefully.
Yes, I was thinking of a chaining mechanism which allows each engine to create its own files, but with a consistent format equivalent to segment info, which can help the query layers provide a consistent view of data across different engines/storage formats. With proper handling of failure scenarios, it should help achieve consistency of data across storage formats. I'll cover more on this; I'm working on a POC and can share details on the issue itself.
Another orthogonal thought is if we could have a more native integration with data prepper that directly writes the stream of record as source(in an open-format) on a remote storage and with pull-based ingestion we create Lucene indices/vector graphs on-demand across independent compute nodes
+1, this should be a good value add to the overall proposal.