Unable to ingest Elasticsearch data streams
Describe the bug
I cannot ingest any data streams when targeting an Elasticsearch cluster running version 8.11.4.
To Reproduce
Steps to reproduce the behavior:
- Configure a data stream in an Elasticsearch cluster version >= 8.11 (a minimal setup sketch follows this list)
- Configure the Elasticsearch cluster as a new source and add the data stream name to source.config.index_pattern.allow[]
- Run the recipe
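For the first step, a data stream can be created from the Python client roughly as follows; the template and stream names are placeholders, and the host/credentials mirror the snippet later in this thread:

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="https://localhost:9200",
                   basic_auth=("elastic", "changeme"),
                   verify_certs=False)  # dev-only: skip TLS verification

# A data stream requires a matching index template that declares it.
es.indices.put_index_template(
    name="ds1-template",          # placeholder template name
    index_patterns=["ds1*"],
    data_stream={},
    template={"mappings": {"properties": {"@timestamp": {"type": "date"}}}},
)

# Creating the stream also creates its first backing index (.ds-ds1-...).
es.indices.create_data_stream(name="ds1")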
Expected behavior
I expect the configured data stream to be ingested as a single dataset.
Screenshots
- An example recipe (a text sketch of an equivalent recipe follows this list)
- Run Results
- Data Streams On Targeted Cluster
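Since the recipe is only attached as a screenshot, here is a rough text equivalent expressed as a programmatic DataHub pipeline (a sketch: hosts, credentials, and the "ds1" pattern are placeholders, and the source config field names should be checked against the elasticsearch source docs):

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "elasticsearch",
            "config": {
                "host": "localhost:9200",
                "username": "elastic",
                "password": "changeme",
                # Allow only the data stream under test.
                "index_pattern": {"allow": ["ds1"]},
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.pretty_print_summary()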
Additional context
- This was tested on a new install following the quick-start guide.
- The aliases API no longer includes data stream indices (a quick way to compare the two APIs is sketched below).
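Which endpoint the connector relies on is an assumption on my part, but the difference is easy to see from the Python client by comparing the alias listing with the data stream API:

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="https://localhost:9200",
                   basic_auth=("elastic", "changeme"),
                   verify_certs=False)

# Indices visible through the aliases endpoint (per the note above, data streams
# and their hidden backing indices do not show up here).
print(sorted(es.indices.get_alias(index="*").body.keys()))

# Data streams and their backing indices have to come from the data stream API.
for ds in es.indices.get_data_stream(name="*")["data_streams"]:
    print(ds["name"], [i["index_name"] for i in ds["indices"]])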
> The aliases API no longer includes data stream indices.
I'm not too familiar with the elastic source - what API should we be calling instead?
The snippet below returns all available indices and data streams (excluding system and hidden ones) along with their mappings.
import logging

from elasticsearch import Elasticsearch

logging.basicConfig(level=logging.ERROR)

ES_USER = "elastic"
ES_PASS = "changeme"

es = Elasticsearch(hosts="https://localhost:9200",
                   basic_auth=(ES_USER, ES_PASS),
                   verify_certs=False)


def get_mappings():
    # All internal and hidden indices start with ".". These should be excluded unless
    # specifically requested. By using the _cat API we can get exactly the info we need.
    # The _all below could be replaced with a wildcard expression, or we could use the
    # regex from the recipe to filter instead.
    indices = [
        i for i in es.cat.indices(index="_all", h="index", format="json")
        if not i.get("index").startswith(".")
    ]

    # Now go get the mapping for the indices we care about
    for each in indices:
        index = each.get("index")
        each["name"] = index
        del each["index"]
        each["mapping"] = es.indices.get_mapping(
            index=index).body[index]["mappings"]

    # The data streams API returns all the info we need
    data_streams = es.indices.get_data_stream(name="_all")
    for each in data_streams["data_streams"]:
        # The most recent backing index will always be last
        latest_index = each["indices"][-1]["index_name"]
        mapping = es.indices.get_mapping(
            index=latest_index).body[latest_index]["mappings"]
        # This key is internal to data streams; drop it
        del mapping["_data_stream_timestamp"]
        description = {
            "name": each["name"],
            "latest_index": latest_index,
            "mapping": mapping
        }
        indices.append(description)

    print("Discovered:")
    for each in indices:
        print(f"    Name: {each['name']}")
        if each.get("latest_index"):
            print(f"    Latest Index: {each['latest_index']}")
        print(f"    Mapping: {each['mapping']}")


get_mappings()
Example output:
Discovered:
    Name: index1
    Mapping: {'properties': {'@timestamp': {'type': 'date'}}}
    Name: index2
    Mapping: {'properties': {'@timestamp': {'type': 'date'}, 'number_field': {'type': 'long'}}}
    Name: ds1
    Latest Index: .ds-ds1-2024.04.09-000001
    Mapping: {'properties': {'@timestamp': {'type': 'date'}}}
    Name: ds2
    Latest Index: .ds-ds2-2024.04.09-000001
    Mapping: {'properties': {'@timestamp': {'type': 'date'}, 'number_field': {'type': 'long'}}}