
[RFC] OpenSearch Execution Engine

mgodwan opened this issue 6 months ago · 6 comments


This RFC proposes extending OpenSearch to support new storage formats, along with corresponding indexing and/or query engines, in addition to Lucene. This enhancement will give users the flexibility to choose data formats/engines based on their specific use cases, performance requirements, and data characteristics. This RFC currently focuses on a high-level overview of the proposal; detailed approaches will follow.

Motivation

Users leverage OpenSearch for a wide variety of use cases. These use cases span multiple data sources (catalogs, transactions, etc.) and data types (logs, metrics, traces, etc.) from different components of their pipelines. OpenSearch users face pain points along the following themes:

Storage Cost

  • Different data types and access patterns require different storage optimizations
  • Need for more cost-effective storage solutions for specific workloads

Data Tiering

  • Limited flexibility in how data is stored across different tiers
  • Need for better control over data placement and storage formats

New Use Cases

  • First-class support for features like cross-index joins
  • Aggregation push down to the data storage layer

Current Data Storage Limitations

Today, OpenSearch uses Lucene's codec interface to store data. This allows OpenSearch to leverage the different formats exposed through Lucene and extend them as necessary (e.g. doc values, points, postings, vectors) to build implementations suited to the use case. But this has certain drawbacks:

  1. It limits the addition of new fields/field types that do not fit the existing types supported by Lucene codecs (e.g. multi-field data structures).
  2. It requires constructs like roll-ups and aggregations to be built at the application layer rather than directly on the existing data.

There have been similar discussions in the OpenSearch community, for which this RFC aims to provide a high-level solution:

  1. https://github.com/opensearch-project/OpenSearch/issues/13668
  2. https://github.com/opensearch-project/OpenSearch/issues/17501

Goals

  • Enable pluggable storage formats and corresponding indexing and query engines.
  • Maintain backward compatibility with data stored using the Lucene engine.
  • Keep the API interface consistent.
  • Allow mixed engines within the same index (provided the data format is compatible with the engines).
  • Support data migration from existing sources.

Concepts

Data Storage Format

In order to extend OpenSearch to support more use cases and to make data storage and retrieval more efficient (e.g. through better compression), OpenSearch can make the data storage format configurable. This will allow OpenSearch to expose multiple other index types with different field combinations, encodings, etc.

Projections

A projection is a view on top of the user's data that provides an optimized operation over certain fields. For example, when a numeric field is added to a document, OpenSearch creates a numeric index in Lucene (points format) to support fast range/term queries on that field. With different data formats, a projection can live within any such format, and the corresponding engine is solely responsible for managing the operations related to that projection. A projection can have multiple sub-types (and more can be registered), with additional data formats being allowed to define their own (e.g. document source, inverted index, materialized views such as star-tree, vector indexes). These projections will be built as part of the refresh flow on index shards.
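To make the concept concrete, here is a rough sketch of what a projection abstraction could look like; the names (Projection, ProjectionType, build) are illustrative assumptions, not part of the proposed API.

import java.util.Set;

// Illustrative sketch only; all names are assumptions, not a committed API.
interface Projection {

    // Registered sub-type, e.g. DOCUMENT_SOURCE, INVERTED_INDEX, STAR_TREE,
    // VECTOR_INDEX; data formats may register additional types.
    ProjectionType type();

    // Fields of the user's document this projection is derived from.
    Set<String> fields();

    // Invoked by the owning engine as part of the refresh flow to make the
    // projection consistent with newly ingested documents.
    void build(Iterable<Document> newDocuments);
}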

Engine

An engine is the entity responsible for managing operations on the data structures associated with OpenSearch documents, based on the data storage format. Engines can be decomposed into two types:

Indexing Engine

The indexing engine is responsible for creating persistent data structures from incoming documents. Based on the data storage format, the engine creates its own representation of the incoming stream of documents. Once data is ingested, the same engine is responsible for making it available for search (refresh) and for providing durability-guarantee signals (flush) to OpenSearch's logical indexing layer, which in turn ensures a consistent view of data across the different engines.

There will be a common coordinating engine layer on top of the multiple indexing engines, which can support shard lifecycle operations like refreshes, commits, merges, and recovery. A detailed RFC on the indexing engine will follow, highlighting synchronization of operations across multiple engines.
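As a rough sketch of the coordination idea (the actual synchronization protocol, including partial-failure handling, belongs to that RFC), a coordinating layer could fan lifecycle operations out to its child engines, using the IndexingEngine interface sketched later in this document:

import java.util.List;

// Hypothetical coordinating layer; fan-out order and failure handling are
// simplified here and will be covered in the detailed RFC.
class CoordinatingEngine {

    private final List<IndexingEngine> engines;

    CoordinatingEngine(List<IndexingEngine> engines) {
        this.engines = engines;
    }

    // A refresh is acknowledged only once every engine has refreshed, so the
    // query layer sees a consistent view across data formats.
    void refresh() {
        for (IndexingEngine engine : engines) {
            engine.refresh();
        }
    }

    // Likewise, a commit is acknowledged only after all engines have durably
    // committed; rollback on partial failure is elided here.
    void commit() {
        for (IndexingEngine engine : engines) {
            engine.commit();
        }
    }
}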

Query Engine

The query engine will provide efficient query constructs on top of the data format. In line with this, it will support query execution, planning, and optimization. A query engine can map the data provided by multiple indexing engines across data storage formats to build queries that optimize the system's resource usage. Similarly, an indexing engine can publish the data it prepares to different query engines so that they can decide how to query it.

Primary Indexing/Query Engine

Today, OpenSearch uses the translog as the immediate-term source storage layer and purges it once data is persisted into Lucene segments; Lucene segments then use the _source field to store the incoming document bytes. With the introduction of multiple storage formats, OpenSearch can be configured with the storage format that acts as the source of truth for the user's document and is used in OpenSearch's fetch phase to retrieve the document's source.
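As a sketch of how the fetch phase could resolve the configured source of truth (all names here are illustrative assumptions):

import java.util.Map;

// Hypothetical: the index configuration nominates one data format as the
// source of truth, and the fetch phase reads the document source from the
// engine that owns it.
class SourceResolver {

    private final Map<DataFormat, IndexingEngine> enginesByFormat;
    private final DataFormat primaryFormat; // configured source of truth

    SourceResolver(Map<DataFormat, IndexingEngine> enginesByFormat, DataFormat primaryFormat) {
        this.enginesByFormat = enginesByFormat;
        this.primaryFormat = primaryFormat;
    }

    // Returns the engine holding the authoritative copy of the document source.
    IndexingEngine sourceOfTruth() {
        return enginesByFormat.get(primaryFormat);
    }
}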

Lucene will continue to be the default OpenSearch data format and engine, while other data formats and engines can exist alongside it to support new fields/operations.

Core Architecture

[Image: core architecture diagram]

Engine Abstraction Layer

The engine abstraction layer provides pluggable interfaces that allow users to bring their own indexing/query engines. At a high level, the engine interfaces would look something like the following (this needs refining, but it conveys the idea):

interface IndexingEngine {

    // Properties
    EngineConfig config();
    List<DataFormat> supportedFormats();

    // Core indexing operations
    List<ValidationError> validateDocument(Document doc);
    void index(Document doc);
    void delete(Term term);
    void updateDocument(Term term, Document doc);

    // Lifecycle management operations
    void refresh();
    CommitPoint commit();
    void merge(MergeContext mergeContext);
    Releasable preservePointInTimeData(QueryContext context);
    void close();
    void rollback();

    // Recovery and replication operations
    void recover(RecoveryContext context);
    void tierData();

    IndexingStats stats();
}

interface QueryEngine {

    EngineConfig config();

    // Query validation
    List<ValidationError> canSupportQuery(Query query);

    // Core query execution
    QueryResult execute(QueryRequest request);

    // Query planning and optimization
    QueryPlan createQueryPlan(Query query, QueryContext context);

    // Aggregation handling
    AggregationResult aggregate(AggregationBuilder aggregationBuilder, SearchContext context);

    // Cost estimation
    QueryCost estimateQueryCost(QueryPlan plan);

    QueryStats stats();
}

There can be more methods/APIs which we can expose through the engines, but the above is the bare-minimum set of methods we need from them.
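For context, OpenSearch already exposes an EnginePlugin extension point that supplies a single EngineFactory per index; purely as an assumption, a pluggable variant could extend that idea along these lines (all names below are illustrative):

import java.util.List;
import java.util.Map;

// Hypothetical extension point; today's EnginePlugin returns one
// EngineFactory per index via getEngineFactory(IndexSettings).
interface ExecutionEnginePlugin {

    // Indexing engines contributed by this plugin, keyed by data format.
    Map<DataFormat, IndexingEngine> getIndexingEngines(IndexSettings indexSettings);

    // Query engines able to execute over those data formats.
    List<QueryEngine> getQueryEngines(IndexSettings indexSettings);
}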

Index Configuration

Index configuration in a pluggable environment will provide a mechanism for users to define how their data should be indexed across different data formats. When creating an index, users can specify engine preferences through index templates or explicit mappings, with sensible defaults for the engine → data format mapping. Mappings will support both single-engine and hybrid approaches, allowing different fields within the same index to be stored in different data formats and indexed/queried by different engines based on their specific requirements.

Index settings can include format-specific parameters such as compression methods, encoding formats, and optimization flags, which are validated and applied during index creation. For example, an index might be configured to use a specialized vector storage format/engine for embedding fields while utilizing Lucene for text search. This flexibility lets users optimize their storage and query patterns for their specific use cases while maintaining OpenSearch's operational simplicity.
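For illustration, a hybrid configuration could be expressed through index settings along the following lines; the setting keys below are assumptions, not settings that exist today:

import org.opensearch.common.settings.Settings;

// Hypothetical setting keys, shown only to illustrate per-field engine/format
// selection with format-specific parameters.
Settings settings = Settings.builder()
    .put("index.engine.default", "lucene")                // text search stays on Lucene
    .put("index.engine.field.embedding", "vector_store")  // embeddings use a vector format
    .put("index.format.vector_store.compression", "pq")   // format-specific parameter
    .build();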

Joins

When working with multiple data formats, efficient join operations become crucial for query performance and data consistency. To facilitate joins across different data formats/engines, a common row ID field will be added to serve as the primary identifier linking related data across the various projections and data formats. The query planning layer can then optimize join operations by considering the capabilities and performance characteristics of each engine involved, along with data distribution, size, and engine-specific optimizations, to arrive at the most efficient join strategy. For example, when joining data between a Lucene-based index and a vector store, the system can leverage the strengths of each engine, using Lucene's inverted index for filtering and the vector store's similarity calculations to produce an efficient query plan. With a pluggable query engine, OpenSearch should also be able to support joins across data from multiple indexes, with the engine able to query data from multiple indexes together.
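As a simplified sketch of the row-ID-based join (Row and rowId are illustrative assumptions; a real query plan would stream results and push filters down rather than materialize lists):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory hash join keyed on the proposed common row ID.
class RowIdJoin {

    static List<Row[]> join(List<Row> filtered, List<Row> similar) {
        // Build side: e.g. row IDs surviving a Lucene filter.
        Map<Long, Row> byRowId = new HashMap<>();
        for (Row row : filtered) {
            byRowId.put(row.rowId(), row);
        }
        // Probe side: e.g. vector-store similarity results.
        List<Row[]> joined = new ArrayList<>();
        for (Row row : similar) {
            Row match = byRowId.get(row.rowId());
            if (match != null) {
                joined.add(new Row[] { match, row });
            }
        }
        return joined;
    }
}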

Related component

Indexing

Additional context

Contributors: @backslasht @bharath-techie

mgodwan · Jun 02 '25

@Bukhtawar @shwetathareja @mch2 @getsaurabh02 Looking forward to your thoughts on this.

mgodwan · Jun 06 '25

Thanks for the RFC. Allowing storage format extension is indeed valuable, and we have seen such demands in several scenarios too. For example, as AI/ML becomes more popular these days, new emerging formats like Lance can better serve such workloads.

However, I wonder if we have to extend an engine to support a new format. Given that Java does not allow multiple inheritance of classes, how will we use multiple new formats at the same time? Ideally we want composability at the storage format layer, and I have seen in the past that we have tried to leverage the binary data in Lucene for new formats, such as in the KNN plugin. Perhaps we can think along this line and introduce a plugin interface at the storage format layer?

yupeng9 · Jun 06 '25

This is a really nice proposal! Thanks @mgodwan.

However, I wonder if we have to extend an engine to support a new format. Given that Java does not allow multiple inheritance of classes, how will we use multiple new formats at the same time? Ideally we want composability at the storage format layer, and I have seen in the past that we have tried to leverage the binary data in Lucene for new formats, such as in the KNN plugin. Perhaps we can think along this line and introduce a plugin interface at the storage format layer?

This is a good question. Based on this part:

To facilitate joins across different data formats/engines, a common row ID field will be added to serve as the primary identifier linking related data across the various projections and data formats.

And the section "Current Data Storage Limitations", I'm working on the assumption that this operates at a higher layer than KNN does. While KNN embeds vectors in Lucene segments (and did so before Lucene had its own vector concept), it sounds like this inserts a logical branch between the Engine and the Lucene index, so the Engine can write to Lucene and something else, and then provides a higher-level query abstraction to handle reading both (and picking the best option for those reads). While the Engine currently manages a Lucene index and a translog, I think this proposal would allow it to manage other stuff. If we wanted to embed some other specialized data format (like Lance), we would be able to do that via this mechanism.

@mgodwan, at the risk of getting a little into the implementation weeds, would this proposal end up opening up the ShardPath (ref) class? Currently, it has a fixed translog and index directory (called "folders" in a very Windowsy nomenclature). Would other data stores be able to register their own paths there? (I don't think we can store additional stuff in the index directory, since I think Lucene deletes files that it doesn't recognize on startup.)

msfroh · Jun 06 '25

Thanks @mgodwan, it would be pretty useful to plug other engines and storage formats into OpenSearch to get the best performance across use cases. It would also open up integration with remote engines, which need not always be local to the cluster.

There will be a common coordinating engine layer on top of multiple indexing engines, which can support shard lifecycle operations like refreshes, commits, merges, recovery, etc.

We should also call out (probably in the detailed RFC) clear tenets and guarantees from other engines and data formats to ensure ordering and consistency during ingestion, so that failure cases like a refresh succeeding in one engine while failing in another are handled gracefully.

shwetathareja · Jun 10 '25

Thanks for this RFC. I can see a benefit of other storage formats for observability use cases as well. As an example, OpenTelemetry data contains a lot of resource attributes that are not supposed to change during the application's runtime. A dedicated store can avoid a lot of data duplication.

KarstenSchnitter · Jun 13 '25

Thanks Mohit for the proposal. I guess the multi-data-format engine abstraction is multi-fold, as it might need to support cases 1/ where the source of truth sits entirely on one system and needs a join across data formats (similar to #12948), and 2/ where the source of truth is ingested across disparate systems, like vector fields on specialized compute requiring custom segment creation/merge lifecycles, where a logical data replicator might be a better fit. The current translog should evolve into a persistent stream of logical records, to ensure updates and deletes are handled correctly.

Another orthogonal thought is whether we could have a more native integration with Data Prepper that directly writes the stream of records as source (in an open format) to remote storage, and with pull-based ingestion creates Lucene indices/vector graphs on demand across independent compute nodes.

Bukhtawar · Jun 14 '25

Thanks folks for reviewing the proposal.

However, I wonder if we have to extend an engine to support a new format. Given that Java does not allow multiple inheritance of classes, how will we use multiple new formats at the same time? Ideally we want composability at the storage format layer

There are multiple ways to use an engine that go beyond the storage format, involving in-memory buffering, custom metadata, etc., which the engine can govern while preserving the core properties of the storage format; hence the proposal to have an engine as well. I'm working on more low-level details that can help us discuss how to achieve this.

If we wanted to embed some other specialized data format (like Lance), we would be able to do that via this mechanism.

Yes, the proposed abstractions should allow us to achieve this.

Would other data stores be able to register their own paths there? (I don't think we can store additional stuff in the index directory, since I think Lucene deletes files that it doesn't recognize on startup.)

Yes, I'd prefer to leverage the ShardPath class with a registry hooked into it via the engine, mapping possible paths for different storage format combinations. This should keep the formats' storage loosely coupled. Let me know if you think there are alternative approaches we can dive deep into as well.

mgodwan · Jun 18 '25

ensure ordering and consistency during ingestion, so that failure cases like a refresh succeeding in one engine while failing in another are handled gracefully.

Yes, I was thinking of a chaining mechanism that allows each engine to create its own files but maintains a consistent format, equivalent to segment info, which can help the query layers provide a consistent view of data across the different engine/storage formats. With proper handling of failure scenarios, it should help achieve consistency of data across storage formats. I'll cover more on this; I'm working on a POC and can share details on the issue itself.
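To illustrate one possible shape of such a segment-info equivalent (a sketch under assumed names, not a committed design):

import java.util.Map;

// Illustrative: a composite commit point recording, per data format, the
// engine-local generation that belongs to one consistent cross-engine view.
record CompositeCommitPoint(
    long generation,                         // monotonically increasing, shard-wide
    Map<DataFormat, Long> engineGenerations  // each engine's committed generation
) {}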

mgodwan · Jun 18 '25

Another orthogonal thought is whether we could have a more native integration with Data Prepper that directly writes the stream of records as source (in an open format) to remote storage, and with pull-based ingestion creates Lucene indices/vector graphs on demand across independent compute nodes.

+1, this should be a good value-add to the overall proposal.

mgodwan · Jun 18 '25