sql icon indicating copy to clipboard operation
sql copied to clipboard

[FEATURE] Stabilize preferred SQL implicit sorting method

Open Swiddis opened this issue 1 year ago • 2 comments

Is your feature request related to a problem? Coming from #3061, there seems to be a few places in-code where we implicitly sort records when ORDER BY clauses aren't supplied by a user. Currently we use _doc, a per-shard incrementing counter for each indexed document. This works fine for single-shard indices, but causes unstable sorting in some cases when there are multiple shards involved. As a workaround, I'm adding a secondary sort by _id in cases where it's known that this causes issues, but this has a large performance impact (~4x latency in local testing).

What solution would you like? There should be a single source of truth for what the "implicit sorting" key is, and that implicit sort should be guaranteed unique per-document. This can probably go in a helper library somewhere, I'm just not quite acquainted enough with the plugin to know where that would be.

What alternatives have you considered?

  • Sort just by ID, throw out _doc: this might be faster than trying to sort by two keys, but throws out all ordering.
  • Sort by @timestamp: Not guaranteed to be present on all indices anymore.
  • Some other field that bundles the rough chronology of _doc with information on shards: doesn't yet exist to my knowledge.

Do you have any additional context? See the comments on #3061

Swiddis avatar Oct 09 '24 18:10 Swiddis

Example reproducer script for the issue (before 3061 fix):

import pytest
import opensearchpy
import datetime
import os
import json


def nginx_bulk_req(idx):
    # prereq: put ~50 random lines in nginx50.log (doesn't actually need to be nginx logs)
    bulk_req = []
    with open("test/nginx50.log", "r") as f:
        for i, line in enumerate(f):
            bulk_req.append(
                {"create": {"_index": idx, "_id": f"L{i}"}}
            )
            bulk_req.append(
                {
                    "@timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
                    "observedTimestamp": datetime.datetime.now(datetime.UTC).isoformat(),
                    "traceId": os.urandom(16).hex(),
                    "spanId": os.urandom(8).hex(),
                    "body": line,
                }
            )
    return bulk_req


@pytest.fixture(scope="session")
def os_client():
    client = opensearchpy.OpenSearch(["localhost:9200"])
    with open("test/ss4o_logs.json", "r") as tmpl_file:
        tmpl = json.load(tmpl_file)
        client.http.post("/_index_template/ss4o_logs", body=tmpl)
    return client


@pytest.fixture(scope="module")
def nginx_index(os_client):
    idx = f"ss4o_logs-nginx-test-" + os.urandom(4).hex()
    bulk_req = nginx_bulk_req(idx)
    os_client.bulk("\n".join(map(json.dumps, bulk_req)))
    os_client.indices.refresh(idx) # Or else indexed data might not be available in tests

    yield idx

    os_client.indices.delete_data_stream(idx)


def test_sql_pagination_equals_larger_pages(nginx_index, os_client):
    query = f"SELECT _id, spanId FROM `{nginx_index}`"

    expected_records = os_client.http.post(
        url="/_plugins/_sql",
        body={
            "query": query,
            "fetch_size": 12, # 3x shard count
        }
    )
    expected = list(d[0] for d in expected_records["datarows"])

    actual_page_1 = os_client.http.post(
        url="/_plugins/_sql",
        body={
            "query": query,
            "fetch_size": 6,
        }
    )
    actual_page_2 = os_client.http.post(
        url="/_plugins/_sql",
        body={
            "cursor": actual_page_1["cursor"],
            "fetch_size": 6,
        }
    )
    actual = list(d[0] for d in actual_page_1["datarows"] + actual_page_2["datarows"])

    assert expected == actual
ss4o_logs.json
{
  "index_patterns": ["ss4o_logs-*-*"],
  "data_stream": {},
  "template": {
    "mappings": {
      "_meta": {
        "version": "1.0.0",
        "catalog": "observability",
        "type": "logs",
        "component": "log",
        "correlations": [
          {
            "field": "spanId",
            "foreign-schema": "traces",
            "foreign-field": "spanId"
          },
          {
            "field": "traceId",
            "foreign-schema": "traces",
            "foreign-field": "traceId"
          }
        ]
      },
      "_source": {
        "enabled": true
      },
      "dynamic_templates": [
        {
          "resources_map": {
            "mapping": {
              "type": "keyword"
            },
            "path_match": "resource.*"
          }
        },
        {
          "attributes_map": {
            "mapping": {
              "type": "keyword"
            },
            "path_match": "attributes.*"
          }
        },
        {
          "instrumentation_scope_attributes_map": {
            "mapping": {
              "type": "keyword"
            },
            "path_match": "instrumentationScope.attributes.*"
          }
        }
      ],
      "properties": {
        "severity": {
          "properties": {
            "number": {
              "type": "long"
            },
            "text": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        },
        "attributes": {
          "type": "object",
          "properties": {
            "data_stream": {
              "properties": {
                "dataset": {
                  "ignore_above": 128,
                  "type": "keyword"
                },
                "namespace": {
                  "ignore_above": 128,
                  "type": "keyword"
                },
                "type": {
                  "ignore_above": 56,
                  "type": "keyword"
                }
              }
            }
          }
        },
        "body": {
          "type": "text"
        },
        "@timestamp": {
          "type": "date"
        },
        "observedTimestamp": {
          "type": "date"
        },
        "observerTime": {
          "type": "alias",
          "path": "observedTimestamp"
        },
        "traceId": {
          "ignore_above": 256,
          "type": "keyword"
        },
        "spanId": {
          "ignore_above": 256,
          "type": "keyword"
        },
        "schemaUrl": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "instrumentationScope": {
          "properties": {
            "name": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 128
                }
              }
            },
            "version": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "dropped_attributes_count": {
              "type": "integer"
            },
            "schemaUrl": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        },
        "event": {
          "properties": {
            "domain": {
              "ignore_above": 256,
              "type": "keyword"
            },
            "name": {
              "ignore_above": 256,
              "type": "keyword"
            },
            "source": {
              "ignore_above": 256,
              "type": "keyword"
            },
            "category": {
              "ignore_above": 256,
              "type": "keyword"
            },
            "type": {
              "ignore_above": 256,
              "type": "keyword"
            },
            "kind": {
              "ignore_above": 256,
              "type": "keyword"
            },
            "result": {
              "ignore_above": 256,
              "type": "keyword"
            },
            "exception": {
              "properties": {
                "message": {
                  "ignore_above": 1024,
                  "type": "keyword"
                },
                "type": {
                  "ignore_above": 256,
                  "type": "keyword"
                },
                "stacktrace": {
                  "type": "text"
                }
              }
            }
          }
        }
      }
    },
    "settings": {
      "index": {
        "mapping": {
          "total_fields": {
            "limit": 10000
          }
        },
        "refresh_interval": "5s",
        "number_of_shards": 4
      }
    }
  },
  "version": 1,
  "_meta": {
    "description": "Simple Schema For Observability",
    "catalog": "observability",
    "type": "logs",
    "correlations": [
      {
        "field": "spanId",
        "foreign-schema": "traces",
        "foreign-field": "spanId"
      },
      {
        "field": "traceId",
        "foreign-schema": "traces",
        "foreign-field": "traceId"
      }
    ]
  }
}

Swiddis avatar Oct 09 '24 18:10 Swiddis

~~Actually, it looks like _id already is chronological by default (-> time based UUID). It probably isn't a bad idea to use that directly instead of trying to hack _doc.~~

Tested it, using just _id is actually significantly slower than using both _doc and _id.

Swiddis avatar Oct 09 '24 19:10 Swiddis

I think our problem could be resolved after https://github.com/opensearch-project/OpenSearch/pull/18924 merged.

LantaoJin avatar Aug 11 '25 08:08 LantaoJin