sql
sql copied to clipboard
[FEATURE] Stabilize preferred SQL implicit sorting method
Is your feature request related to a problem?
Coming from #3061, there seems to be a few places in-code where we implicitly sort records when ORDER BY clauses aren't supplied by a user. Currently we use _doc, a per-shard incrementing counter for each indexed document. This works fine for single-shard indices, but causes unstable sorting in some cases when there are multiple shards involved. As a workaround, I'm adding a secondary sort by _id in cases where it's known that this causes issues, but this has a large performance impact (~4x latency in local testing).
What solution would you like? There should be a single source of truth for what the "implicit sorting" key is, and that implicit sort should be guaranteed unique per-document. This can probably go in a helper library somewhere, I'm just not quite acquainted enough with the plugin to know where that would be.
What alternatives have you considered?
- Sort just by ID, throw out
_doc: this might be faster than trying to sort by two keys, but throws out all ordering. - Sort by
@timestamp: Not guaranteed to be present on all indices anymore. - Some other field that bundles the rough chronology of
_docwith information on shards: doesn't yet exist to my knowledge.
Do you have any additional context? See the comments on #3061
Example reproducer script for the issue (before 3061 fix):
import pytest
import opensearchpy
import datetime
import os
import json
def nginx_bulk_req(idx):
# prereq: put ~50 random lines in nginx50.log (doesn't actually need to be nginx logs)
bulk_req = []
with open("test/nginx50.log", "r") as f:
for i, line in enumerate(f):
bulk_req.append(
{"create": {"_index": idx, "_id": f"L{i}"}}
)
bulk_req.append(
{
"@timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
"observedTimestamp": datetime.datetime.now(datetime.UTC).isoformat(),
"traceId": os.urandom(16).hex(),
"spanId": os.urandom(8).hex(),
"body": line,
}
)
return bulk_req
@pytest.fixture(scope="session")
def os_client():
client = opensearchpy.OpenSearch(["localhost:9200"])
with open("test/ss4o_logs.json", "r") as tmpl_file:
tmpl = json.load(tmpl_file)
client.http.post("/_index_template/ss4o_logs", body=tmpl)
return client
@pytest.fixture(scope="module")
def nginx_index(os_client):
idx = f"ss4o_logs-nginx-test-" + os.urandom(4).hex()
bulk_req = nginx_bulk_req(idx)
os_client.bulk("\n".join(map(json.dumps, bulk_req)))
os_client.indices.refresh(idx) # Or else indexed data might not be available in tests
yield idx
os_client.indices.delete_data_stream(idx)
def test_sql_pagination_equals_larger_pages(nginx_index, os_client):
query = f"SELECT _id, spanId FROM `{nginx_index}`"
expected_records = os_client.http.post(
url="/_plugins/_sql",
body={
"query": query,
"fetch_size": 12, # 3x shard count
}
)
expected = list(d[0] for d in expected_records["datarows"])
actual_page_1 = os_client.http.post(
url="/_plugins/_sql",
body={
"query": query,
"fetch_size": 6,
}
)
actual_page_2 = os_client.http.post(
url="/_plugins/_sql",
body={
"cursor": actual_page_1["cursor"],
"fetch_size": 6,
}
)
actual = list(d[0] for d in actual_page_1["datarows"] + actual_page_2["datarows"])
assert expected == actual
ss4o_logs.json
{
"index_patterns": ["ss4o_logs-*-*"],
"data_stream": {},
"template": {
"mappings": {
"_meta": {
"version": "1.0.0",
"catalog": "observability",
"type": "logs",
"component": "log",
"correlations": [
{
"field": "spanId",
"foreign-schema": "traces",
"foreign-field": "spanId"
},
{
"field": "traceId",
"foreign-schema": "traces",
"foreign-field": "traceId"
}
]
},
"_source": {
"enabled": true
},
"dynamic_templates": [
{
"resources_map": {
"mapping": {
"type": "keyword"
},
"path_match": "resource.*"
}
},
{
"attributes_map": {
"mapping": {
"type": "keyword"
},
"path_match": "attributes.*"
}
},
{
"instrumentation_scope_attributes_map": {
"mapping": {
"type": "keyword"
},
"path_match": "instrumentationScope.attributes.*"
}
}
],
"properties": {
"severity": {
"properties": {
"number": {
"type": "long"
},
"text": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"attributes": {
"type": "object",
"properties": {
"data_stream": {
"properties": {
"dataset": {
"ignore_above": 128,
"type": "keyword"
},
"namespace": {
"ignore_above": 128,
"type": "keyword"
},
"type": {
"ignore_above": 56,
"type": "keyword"
}
}
}
}
},
"body": {
"type": "text"
},
"@timestamp": {
"type": "date"
},
"observedTimestamp": {
"type": "date"
},
"observerTime": {
"type": "alias",
"path": "observedTimestamp"
},
"traceId": {
"ignore_above": 256,
"type": "keyword"
},
"spanId": {
"ignore_above": 256,
"type": "keyword"
},
"schemaUrl": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"instrumentationScope": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 128
}
}
},
"version": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"dropped_attributes_count": {
"type": "integer"
},
"schemaUrl": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"event": {
"properties": {
"domain": {
"ignore_above": 256,
"type": "keyword"
},
"name": {
"ignore_above": 256,
"type": "keyword"
},
"source": {
"ignore_above": 256,
"type": "keyword"
},
"category": {
"ignore_above": 256,
"type": "keyword"
},
"type": {
"ignore_above": 256,
"type": "keyword"
},
"kind": {
"ignore_above": 256,
"type": "keyword"
},
"result": {
"ignore_above": 256,
"type": "keyword"
},
"exception": {
"properties": {
"message": {
"ignore_above": 1024,
"type": "keyword"
},
"type": {
"ignore_above": 256,
"type": "keyword"
},
"stacktrace": {
"type": "text"
}
}
}
}
}
}
},
"settings": {
"index": {
"mapping": {
"total_fields": {
"limit": 10000
}
},
"refresh_interval": "5s",
"number_of_shards": 4
}
}
},
"version": 1,
"_meta": {
"description": "Simple Schema For Observability",
"catalog": "observability",
"type": "logs",
"correlations": [
{
"field": "spanId",
"foreign-schema": "traces",
"foreign-field": "spanId"
},
{
"field": "traceId",
"foreign-schema": "traces",
"foreign-field": "traceId"
}
]
}
}
~~Actually, it looks like _id already is chronological by default (-> time based UUID). It probably isn't a bad idea to use that directly instead of trying to hack _doc.~~
Tested it, using just _id is actually significantly slower than using both _doc and _id.
I think our problem could be resolved after https://github.com/opensearch-project/OpenSearch/pull/18924 merged.