OpenSearch icon indicating copy to clipboard operation
OpenSearch copied to clipboard

[META] Protobuf for Search API

Open VachaShah opened this issue 1 year ago • 21 comments

Proposal

With the experiment done for using protobuf in _cat/nodes API (see https://github.com/opensearch-project/OpenSearch/issues/6844#issuecomment-1742250229) and a 15-30% improvement depending on the size of the cluster, we can assess the benefits of protobuf for Search API in terms of serialization and de-serialization of the requests and responses in between nodes and at the REST level.

Next Steps

Do a similar experiment for the Search API to understand the performance improvements using protobuf. (This is a work in progress)

Details

In order to experiment and see incremental benefits, some of the classes I am targeting to convert to protobuf: SearchRequest, SearchResponse, SearchPhaseResult, FetchSearchResult, ShardFetchRequest, ShardSearchRequest, QueryFetchSearchResult, QuerySearchRequest, QuerySearchResult. Also, classes related to TransportSearchAction are required to support protobuf requests and responses.

Code changes

The code changes for the experiment are being added in a branch on my fork: https://github.com/VachaShah/OpenSearch/compare/poc-cat-nodes-protobuf...VachaShah:OpenSearch:poc-search-protobuf?expand=1

Sub tasks/Milestones

Next steps as discussed with @msfroh and @getsaurabh02. Let me know if I missed something here.

  • [ ] Convert QuerySearchResult, FetchSearchResult and QueryFetchSearchResult into a proto message and add support to communicate this response between nodes using protobuf bytes. This includes adding support for protobuf in OutboundHandler and InboundHandler. Deliver this as an experimental feature. There will be a feature flag which if enabled will allow node-to-node communication using protobuf.
  • [ ] Add support for aggregations (in QuerySearchResult), NestedIdentity (in SearchHit) and other optional fields in SearchHit as protobuf messages.
  • [ ] Repeat the same for other search results and requests.

Note: All of the above changes go behind an experimental feature flag. Once the incremental changes are in for the Search API:

  • [ ] Add backwards compatibility for the node-to-node communication protocols with version checks during node-to-node communication and remove the experimental feature flag.

Related

  • RFC: https://github.com/opensearch-project/OpenSearch/issues/6844
  • _cat/nodes POC: https://github.com/opensearch-project/OpenSearch/pull/9097

VachaShah avatar Oct 18 '23 06:10 VachaShah

would like to do +1 on this feature. This will be great win specially for vector search where search payload increases with increase in dimension of the vectors.

navneet1v avatar Oct 18 '23 06:10 navneet1v

would like to do +1 on this feature. This will be great win specially for vector search where search payload increases with increase in dimension of the vectors.

@navneet1v is there data we could look where the bottleneck is for vector search?

saratvemulapalli avatar Oct 18 '23 21:10 saratvemulapalli

@saratvemulapalli I don't have data available, because while running the benchmarks we remove the _source field. But what I can do is I can run a small benchmarks and provide you that info. Will that work?

navneet1v avatar Oct 18 '23 23:10 navneet1v

@saratvemulapalli I don't have data available, because while running the benchmarks we remove the _source field. But what I can do is I can run a small benchmarks and provide you that info. Will that work?

Yeah what we'd like to know is serialization/de-serialization causing performance latency, and is that during query phase or fetch phase etc. This will help us narrow down which area to work on first. The idea so far (theoretically) is we will work on fetch phase where the responses are serialized on data nodes and de-serialized on co-ordinator which will have the best benefits for protobuf.

saratvemulapalli avatar Oct 18 '23 23:10 saratvemulapalli

For vector search it will be fetch phase only, because that is where a vector having like lets say 100 dimension is getting serialized and de-serialized per document. Consider like every float is represented as 4bytes it becomes like 400 bytes just alone for vectors.

and the same gets transported over the wire to customers too. Hence once we start to make the change it will help a lot.

The idea so far (theoretically) is we will work on fetch phase where the responses are serialized on data nodes and de-serialized on co-ordinator which will have the best benefits for protobuf.

+1 on this.

From benchmarks I will see what I can provide, but I can surely help setup the benchmark code so that you guys can run of different custom OpenSearch to get more numbers if you want.

navneet1v avatar Oct 18 '23 23:10 navneet1v

That would be super helpful @navneet1v!

VachaShah avatar Oct 18 '23 23:10 VachaShah

@saratvemulapalli , @VachaShah Here is one of the benchmarking notebook which you can use, to test k-NN with any OpenSearch cluster with Security Enabled.

https://github.com/navneet1v/OpenSearchVectorDB/blob/main/benchmarking/sift-128/sift-128-benchmarking.ipynb

This is something I was working on. This replicates the behavior on how we do perf testing in K-NN. Good thing about this is its is easy to run.

import numpy as np
from tqdm.notebook import tqdm

# search in the index
def searchQueryGen(input_array=X_TEST):
    for i, vec in enumerate(input_array):
        yield {
            "_source": False, # Don't get the source as this impacts latency
            "size": 100,
            "query": {
                "knn": {
                    "vec": {
                        "vector": vec.tolist(),
                        "k": 100
                    }
                }
            }
        }


neighbors_lists = []
search_latency = []
took_time = []
for query in tqdm(searchQueryGen(), total=len(X_TEST)):
    start = time.time()
    search_response = client.search(body=query, index=vector_index_name, _source=False, docvalue_fields=["_id"], stored_fields="_none_")
    end = time.time()
    search_latency.append(end - start)
    took_time.append(search_response["took"])
    search_hits = search_response['hits']['hits']
    search_neighbors = [int(hit["fields"]["_id"][0]) for hit in search_hits]
    neighbors_lists.append(search_neighbors)

You can remove all the optimization that we have added:

  1. not getting _source for documents.
  2. not getting stored fields.
  3. Getting _id as doc value rather than from stored field.

Please let me know if you need any more details happy to help.

navneet1v avatar Oct 19 '23 06:10 navneet1v

This is super exciting @VachaShah!! Would you like to share insights on what Transport classes are good candidates for trying out and experimenting the improvements first. Do you think instrumenting the node-to-node interaction for Query and Fetch phases, especially targeting implementations for SearchPhaseResult could be a good idea?

This could include heavily exercised code paths during query executions such as QuerySearchResult, FetchSearchResult, ScrollQuerySearchResult and more.

getsaurabh02 avatar Oct 30 '23 23:10 getsaurabh02

Thank you @getsaurabh02! I have added the classes that I am targeting first in the issue description. They include the Query and Fetch phase implementations of SearchPhaseResult and other related request, response and transport action classes.

VachaShah avatar Nov 01 '23 00:11 VachaShah

The code for end-to-end working POC with _search_protobuf API which is a version of _search API with requests, responses and node-to-node communication using protobuf is built on top of _cat/nodes API POC. The code can be found in this diff: https://github.com/VachaShah/OpenSearch/compare/poc-cat-nodes-protobuf...VachaShah:OpenSearch:poc-search-protobuf?expand=1.

The current POC is for QUERY_THEN_FETCH search types with some embedded objects in the Response as bytes. The next step is to convert those embedded objects into proto messages as well.

I am going to micro benchmarking the protobuf integrated API to compare with the original search API.

Next steps

  • Convert the embedded objects in responses to proto messages.
  • Micro benchmarking for numbers.

VachaShah avatar Nov 18 '23 08:11 VachaShah

@VachaShah - Will this require documentation? If so, can you please create a doc issue, let me know who will be submitting the doc PR, and add it to the unified tracker project? Thank you!

hdhalter avatar Dec 11 '23 23:12 hdhalter

@VachaShah - Will this require documentation? If so, can you please create a doc issue, let me know who will be submitting the doc PR, and add it to the unified tracker project? Thank you!

@hdhalter This does not require documentation as of now since it is a performance improvement. I am going to divide this meta issue into sub tasks and if any of them have a need for documentation, I will make sure to create a doc issue for those.

VachaShah avatar Dec 14 '23 00:12 VachaShah

In order to divide this issue into deliverables, converting this issue into a meta issue. Sub tasks will be listed in the issue description.

VachaShah avatar Dec 14 '23 03:12 VachaShah

Thanks @VachaShah for breaking it down. Tagging @dbwiddis who is interested in contributing.

saratvemulapalli avatar Dec 18 '23 19:12 saratvemulapalli

Benchmarks

The benchmarks are taken using opensearch-benchmark for both the original search API and protobuf version of the API for default searches in benchmarks. The workload used is nyc_taxis.

5 nodes cluster

Seeing a 19.1% decrease in latency for default search with protobuf integration.

Original API

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00248333 min
Min cumulative indexing time across primary shards 0.00248333 min
Median cumulative indexing time across primary shards 0.00248333 min
Max cumulative indexing time across primary shards 0.00248333 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.00153333 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.00153333 min
Median cumulative refresh time across primary shards 0.00153333 min
Max cumulative refresh time across primary shards 0.00153333 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0.008 s
Total Young Gen GC count 1
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000252848 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 8
Min Throughput index 26349.6 docs/s
Mean Throughput index 26349.6 docs/s
Median Throughput index 26349.6 docs/s
Max Throughput index 26349.6 docs/s
50th percentile latency index 31.8154 ms
100th percentile latency index 38.6998 ms
50th percentile service time index 31.8154 ms
100th percentile service time index 38.6998 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 163.69 ops/s
Mean Throughput wait-until-merges-finish 163.69 ops/s
Median Throughput wait-until-merges-finish 163.69 ops/s
Max Throughput wait-until-merges-finish 163.69 ops/s
100th percentile latency wait-until-merges-finish 5.62406 ms
100th percentile service time wait-until-merges-finish 5.62406 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 83.27 ops/s
Mean Throughput default 83.27 ops/s
Median Throughput default 83.27 ops/s
Max Throughput default 83.27 ops/s
100th percentile latency default 17.6042 ms
100th percentile service time default 5.37617 ms
error rate default 0 %
Min Throughput range 112.38 ops/s
Mean Throughput range 112.38 ops/s
Median Throughput range 112.38 ops/s
Max Throughput range 112.38 ops/s
100th percentile latency range 14.9054 ms
100th percentile service time range 5.7996 ms
error rate range 0 %

API with Protobuf

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00171667 min
Min cumulative indexing time across primary shards 0.00171667 min
Median cumulative indexing time across primary shards 0.00171667 min
Max cumulative indexing time across primary shards 0.00171667 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.001 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.001 min
Median cumulative refresh time across primary shards 0.001 min
Max cumulative refresh time across primary shards 0.001 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0.004 s
Total Young Gen GC count 1
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000253422 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 8
Min Throughput index 31452.4 docs/s
Mean Throughput index 31452.4 docs/s
Median Throughput index 31452.4 docs/s
Max Throughput index 31452.4 docs/s
50th percentile latency index 25.6299 ms
100th percentile latency index 26.2214 ms
50th percentile service time index 25.6299 ms
100th percentile service time index 26.2214 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 292.51 ops/s
Mean Throughput wait-until-merges-finish 292.51 ops/s
Median Throughput wait-until-merges-finish 292.51 ops/s
Max Throughput wait-until-merges-finish 292.51 ops/s
100th percentile latency wait-until-merges-finish 3.01014 ms
100th percentile service time wait-until-merges-finish 3.01014 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 110.58 ops/s
Mean Throughput default 110.58 ops/s
Median Throughput default 110.58 ops/s
Max Throughput default 110.58 ops/s
100th percentile latency default 14.2405 ms
100th percentile service time default 3.97145 ms
error rate default 0 %
Min Throughput range 142.97 ops/s
Mean Throughput range 142.97 ops/s
Median Throughput range 142.97 ops/s
Max Throughput range 142.97 ops/s
100th percentile latency range 11.715 ms
100th percentile service time range 4.50792 ms
error rate range 0 %

10 nodes cluster

Seeing a 23.03% decrease in latency for default search with protobuf integration.

Original API

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00198333 min
Min cumulative indexing time across primary shards 0.00198333 min
Median cumulative indexing time across primary shards 0.00198333 min
Max cumulative indexing time across primary shards 0.00198333 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.00126667 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.00126667 min
Median cumulative refresh time across primary shards 0.00126667 min
Max cumulative refresh time across primary shards 0.00126667 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0.007 s
Total Young Gen GC count 1
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000223138 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 5
Min Throughput index 24834.7 docs/s
Mean Throughput index 24834.7 docs/s
Median Throughput index 24834.7 docs/s
Max Throughput index 24834.7 docs/s
50th percentile latency index 41.3685 ms
100th percentile latency index 45.1308 ms
50th percentile service time index 41.3685 ms
100th percentile service time index 45.1308 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 167.27 ops/s
Mean Throughput wait-until-merges-finish 167.27 ops/s
Median Throughput wait-until-merges-finish 167.27 ops/s
Max Throughput wait-until-merges-finish 167.27 ops/s
100th percentile latency wait-until-merges-finish 5.51836 ms
100th percentile service time wait-until-merges-finish 5.51836 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 90.29 ops/s
Mean Throughput default 90.29 ops/s
Median Throughput default 90.29 ops/s
Max Throughput default 90.29 ops/s
100th percentile latency default 18.8456 ms
100th percentile service time default 6.55805 ms
error rate default 0 %
Min Throughput range 100.27 ops/s
Mean Throughput range 100.27 ops/s
Median Throughput range 100.27 ops/s
Max Throughput range 100.27 ops/s
100th percentile latency range 19.1147 ms
100th percentile service time range 8.94176 ms
error rate range 0 %

API with protobuf

Metric Task Value Unit
Cumulative indexing time of primary shards 0.00223333 min
Min cumulative indexing time across primary shards 0.00223333 min
Median cumulative indexing time across primary shards 0.00223333 min
Max cumulative indexing time across primary shards 0.00223333 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 0 min
Cumulative merge count of primary shards 0
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 0 min
Max cumulative merge time across primary shards 0 min
Cumulative merge throttle time of primary shards 0 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 0 min
Max cumulative merge throttle time across primary shards 0 min
Cumulative refresh time of primary shards 0.00111667 min
Cumulative refresh count of primary shards 5
Min cumulative refresh time across primary shards 0.00111667 min
Median cumulative refresh time across primary shards 0.00111667 min
Max cumulative refresh time across primary shards 0.00111667 min
Cumulative flush time of primary shards 0 min
Cumulative flush count of primary shards 0
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 0 min
Max cumulative flush time across primary shards 0 min
Total Young Gen GC time 0 s
Total Young Gen GC count 0
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 0.000251911 GB
Translog size 5.12227e-08 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 8
Min Throughput index 27541 docs/s
Mean Throughput index 27541 docs/s
Median Throughput index 27541 docs/s
Max Throughput index 27541 docs/s
50th percentile latency index 30.1443 ms
100th percentile latency index 31.1228 ms
50th percentile service time index 30.1443 ms
100th percentile service time index 31.1228 ms
error rate index 0 %
Min Throughput wait-until-merges-finish 221.57 ops/s
Mean Throughput wait-until-merges-finish 221.57 ops/s
Median Throughput wait-until-merges-finish 221.57 ops/s
Max Throughput wait-until-merges-finish 221.57 ops/s
100th percentile latency wait-until-merges-finish 3.96129 ms
100th percentile service time wait-until-merges-finish 3.96129 ms
error rate wait-until-merges-finish 0 %
Min Throughput default 138.62 ops/s
Mean Throughput default 138.62 ops/s
Median Throughput default 138.62 ops/s
Max Throughput default 138.62 ops/s
100th percentile latency default 15.7351 ms
100th percentile service time default 5.31692 ms
error rate default 0 %
Min Throughput range 117.22 ops/s
Mean Throughput range 117.22 ops/s
Median Throughput range 117.22 ops/s
Max Throughput range 117.22 ops/s
100th percentile latency range 15.5039 ms
100th percentile service time range 6.77485 ms
error rate range 0 %

VachaShah avatar Jan 03 '24 22:01 VachaShah

Just a quick note, please also benchmark/profile for CPU and JVM overhead reduction with the change during ser/de.

Bukhtawar avatar Jan 04 '24 11:01 Bukhtawar

@VachaShah - 100th percentile is generally skewed, 90th percentile is more reliable, can you please share the numbers for 90th and 99th percentile. Also, are these single runs or average of 'N' runs?

backslasht avatar Jan 09 '24 17:01 backslasht

@VachaShah - 100th percentile is generally skewed, 90th percentile is more reliable, can you please share the numbers for 90th and 99th percentile. Also, are these single runs or average of 'N' runs?

OSB runs 100 iterations for search, so this is average for those 100 runs. OSB publishes the 100th percentile for the operations, I think the OSB code needs to be modified to get the other percentiles. @gkamat Is there a way to customize this from command line?

VachaShah avatar Jan 09 '24 20:01 VachaShah

CPU Utilization

I analyzed the CPU used when running benchmarks and with async-profiler.

CPU used during search

  • CPU spike during original search API runs with OSB: 15.8%
  • CPU spike during protobuf search API runs with OSB: 12.7%

async-profiler CPU profile

  • executeFetchPhase for original search API - CPU 67.05%
  • executeFetchPhase for search API with protobuf - CPU 29.77%

Default Search Screenshot 2024-01-10 at 10 57 35 PM

Protobuf Search Screenshot 2024-01-10 at 10 58 40 PM

VachaShah avatar Jan 11 '24 07:01 VachaShah

Hi, are we on track for this to be released in 2.12 ?

kiranprakash154 avatar Jan 19 '24 00:01 kiranprakash154

Given we are trying to build some cleaner abstractions around detecting the protocol and making it pluggable through some refactoring, this will need some more time and I believe the code will be in a good state by 2.13

getsaurabh02 avatar Feb 01 '24 00:02 getsaurabh02

Update

With the discussion on using protobuf for API request/response and node-to-node communication in the transport layer, we have first taken up making the transport layer abstract to support multiple protocols for serialization and deserialization. This will decouple the node-to-node communication in the transport layer from the current serialization mechanism (which is now referred to as native protocol in the codebase).

After these changes, we will take up adding protobuf into the codebase for search API in a way that the API request/response layer is not tightly coupled with the serialization mechanisms (which is the case currently for example how Writeable is implemented by model classes and request/response classes).

Transport layer abstractions and decoupling

  • Abstracting the inbound side of transport to support multiple serialization protocols - https://github.com/opensearch-project/OpenSearch/pull/12967, https://github.com/opensearch-project/OpenSearch/pull/13126
  • Abstracting the outbound side of transport to support multiple serialization protocols - https://github.com/opensearch-project/OpenSearch/pull/13293

Introduction of protobuf for search API (WIP - might be divided into more PRs)

  • Separating out native serialization from Search classes - https://github.com/opensearch-project/OpenSearch/pull/13697 (this came up from the discussion in #13178 since the current serialization is tightly tied to request/response classes not allowing protobuf to be added in an extensible manner)
  • Adding proto models for basic search related classes - https://github.com/opensearch-project/OpenSearch/pull/13178
  • Integrating protobuf structures into search along with node-to-node communication at the transport layer using protobuf - https://github.com/opensearch-project/OpenSearch/pull/11910

VachaShah avatar May 13 '24 02:05 VachaShah