
Unable to ingest Elasticsearch data streams

Open lksnyder0 opened this issue 1 year ago • 3 comments

Describe the bug
I cannot ingest any data streams when targeting an Elasticsearch cluster running version 8.11.4.

To Reproduce
Steps to reproduce the behavior:

  1. Configure a data stream in an Elasticsearch cluster running version 8.11 or later.
  2. Configure the Elasticsearch cluster as a new source, and add the data stream name to source.config.index_pattern.allow[].
  3. Run the recipe.
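Step 2 puts the data stream name into an allow list of regular expressions. As a rough illustration, assuming the list is applied with Python's `re.match` and deny patterns are checked first (believed to mirror DataHub's `AllowDenyPattern`, but treat that as an assumption), a pattern written for the data stream name does not match the stream's hidden `.ds-` backing indices. The source therefore has to discover the data stream names themselves. `is_allowed` is a hypothetical stand-in, not DataHub's API.

```python
import re
from typing import Iterable


def is_allowed(name: str, allow: Iterable[str], deny: Iterable[str] = ()) -> bool:
    """Hypothetical allow/deny check (assumption: re.match, deny checked first)."""
    if any(re.match(pattern, name) for pattern in deny):
        return False
    return any(re.match(pattern, name) for pattern in allow)


# A data stream named "ds1" and one of its backing indices.
allow = ["^ds1$"]
print(is_allowed("ds1", allow))                        # True
print(is_allowed(".ds-ds1-2024.04.09-000001", allow))  # False
```

If the source only ever sees backing index names, an allow entry such as `^ds1$` filters everything out, which would be consistent with nothing being ingested.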

Expected behavior
I expect the configured data stream to be ingested as a single dataset.

Screenshots
(Images not preserved: an example recipe, the run results, and the data streams on the targeted cluster.)


Additional context

  • This was tested on a new install following the quick-start guide.
  • The aliases API no longer includes data stream indices.
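Since backing indices do not show up through the aliases API, one option is to attribute them using the data stream API response instead. The sketch below assumes the response shape that the snippet later in this thread relies on (`data_streams[*].name` and `indices[*].index_name`) and builds a lookup from backing index to owning data stream. `backing_index_map` is a hypothetical helper name, and the sample data is illustrative.

```python
from typing import Any, Dict


def backing_index_map(response: Dict[str, Any]) -> Dict[str, str]:
    """Map each backing index name to the data stream that owns it.

    `response` is assumed to look like the body of GET _data_stream:
    {"data_streams": [{"name": ..., "indices": [{"index_name": ...}, ...]}, ...]}
    """
    owners: Dict[str, str] = {}
    for stream in response.get("data_streams", []):
        for backing in stream.get("indices", []):
            owners[backing["index_name"]] = stream["name"]
    return owners


sample = {
    "data_streams": [
        {"name": "ds1", "indices": [{"index_name": ".ds-ds1-2024.04.09-000001"}]},
        {"name": "ds2", "indices": [{"index_name": ".ds-ds2-2024.04.09-000001"}]},
    ]
}
print(backing_index_map(sample))
```

A source could then report `owners.get(index, index)` as the dataset name, so every backing index folds into one dataset per data stream while plain indices keep their own names.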

lksnyder0 · Mar 06 '24 17:03

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

github-actions[bot] · Apr 06 '24 01:04

The aliases API no longer includes data stream indices.

I'm not too familiar with the Elasticsearch source. What API should we be calling instead?

hsheth2 · Apr 08 '24 17:04

This snippet prints all available indices and data streams (excluding system and hidden ones) together with their mappings.

import logging
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import BadRequestError

logging.basicConfig(level=logging.ERROR)

ES_USER = "elastic"
ES_PASS = "changeme"

es = Elasticsearch(hosts="https://localhost:9200",
                   basic_auth=(ES_USER, ES_PASS),
                   verify_certs=False)

def get_mappings():
    # Internal indices, including data stream backing indices, start with ".". These
    # should be removed unless specifically requested. The _cat API returns exactly
    # the info we need. The _all below could be replaced with a wildcard expression,
    # or we could use the regex from the recipe to filter instead.
    indices = [
        i for i in es.cat.indices(index="_all", h="index", format="json")
        if not i.get("index").startswith(".")
    ]
    # Now go get the mapping for the indices we care about
    for each in indices:
        index = each.get("index")
        each["name"] = index
        del each["index"]
        each["mapping"] = es.indices.get_mapping(
            index=index).body[index]["mappings"]

    # The data streams API returns all the info we need
    data_streams = es.indices.get_data_stream(name="_all")
    for each in data_streams["data_streams"]:
        # The most recent index will always be last
        latest_index = each["indices"][-1]["index_name"]
        mapping = es.indices.get_mapping(
            index=latest_index).body[latest_index]["mappings"]
        # Don't need this key; pop() avoids a KeyError if it is absent
        mapping.pop("_data_stream_timestamp", None)
        description = {
            "name": each["name"],
            "latest_index": latest_index,
            "mapping": mapping
        }
        indices.append(description)

    print("Discovered:")
    for each in indices:
        print(f"    Name: {each['name']}")
        if each.get("latest_index"):
            print(f"    Latest Index: {each['latest_index']}")
        print(f"    Mapping: {each['mapping']}")

get_mappings()

Example output:

Discovered:
    Name: index1
    Mapping: {'properties': {'@timestamp': {'type': 'date'}}}
    Name: index2
    Mapping: {'properties': {'@timestamp': {'type': 'date'}, 'number_field': {'type': 'long'}}}
    Name: ds1
    Latest Index: .ds-ds1-2024.04.09-000001
    Mapping: {'properties': {'@timestamp': {'type': 'date'}}}
    Name: ds2
    Latest Index: .ds-ds2-2024.04.09-000001
    Mapping: {'properties': {'@timestamp': {'type': 'date'}, 'number_field': {'type': 'long'}}}
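To test the selection logic without a running cluster, it can be split from the network calls. This sketch assumes the same JSON shapes the snippet above uses (rows from `_cat/indices` with `format="json"` and `h="index"`, and the `get_data_stream` body). `plan_datasets` and its `allow` parameter are hypothetical; `allow` stands in for the recipe regexes mentioned in the snippet's comments, applied here with `re.match`.

```python
import re
from typing import Any, Dict, List, Optional, Sequence


def plan_datasets(
    cat_rows: Sequence[Dict[str, str]],
    data_streams: Dict[str, Any],
    allow: Optional[Sequence[str]] = None,
) -> List[Dict[str, str]]:
    """Return one entry per dataset: plain indices plus one per data stream.

    "mapping_source" is the index whose mapping should be fetched; for a data
    stream it is the last listed backing index, as in the snippet above.
    """
    plan: List[Dict[str, str]] = []
    for row in cat_rows:
        name = row["index"]
        # Skip internal indices, which also drops data stream backing indices.
        if not name.startswith("."):
            plan.append({"name": name, "mapping_source": name})
    for stream in data_streams.get("data_streams", []):
        plan.append({
            "name": stream["name"],
            "mapping_source": stream["indices"][-1]["index_name"],
        })
    if allow is not None:
        # Keep only names matching at least one (hypothetical) recipe regex.
        plan = [p for p in plan if any(re.match(rx, p["name"]) for rx in allow)]
    return plan


rows = [{"index": "index1"}, {"index": ".ds-ds1-2024.04.09-000001"}]
streams = {"data_streams": [
    {"name": "ds1", "indices": [{"index_name": ".ds-ds1-2024.04.09-000001"}]},
]}
for entry in plan_datasets(rows, streams):
    print(entry)
```

Each `mapping_source` can then be passed to `es.indices.get_mapping(index=...)` exactly as the snippet does, so only the I/O remains untested.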

lksnyder0 · Apr 09 '24 16:04