deduplicate-elasticsearch
Incorrect doc_type value
Line in the script:
matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
should change to:
matching_docs = es.mget(index="stocks", doc_type="_doc", body={"ids": array_of_ids})
doc_type should be _doc instead of doc (indices created on Elasticsearch 7.x use the single default _doc mapping type); otherwise every entry in matching_docs comes back with found: false.
JC
Based on the latest version of elasticsearch-py, the doc_type and body parameters of mget are no longer available, so you can update this line like this:
matching_docs = es.mget(index=index, ids=array_of_ids)
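For example, a quick standalone check with the 8.x client to confirm the documents now come back with found: true (index and array_of_ids as in the script above):

matching_docs = es.mget(index=index, ids=array_of_ids)
for doc in matching_docs['docs']:
    # each entry is a dict with '_id', 'found', and '_source' when found
    print(doc['_id'], doc['found'])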
You can change the es.search call in the same way (this example uses a time range to limit the scope of the query):
query = {"range": {"@timestamp": {"gte": "2023-01-30T09:48:00.000Z","lte": "2023-01-30T09:59:00.000Z"}}}
index = "the_index"
es.search(index=index, scroll='1m', query=query)
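Optionally, once you are done scrolling you can release the scroll context; a small sketch of my own using the client's clear_scroll call:

data = es.search(index=index, scroll='1m', query=query)
sid = data['_scroll_id']
# ... iterate data['hits']['hits'] and keep calling es.scroll(scroll_id=sid, scroll='2m') ...
es.clear_scroll(scroll_id=sid)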
This version works for me:
import hashlib
from elasticsearch import Elasticsearch

index = "INDEX"
query = {"range": {"@timestamp": {"gte": "2023-01-30T09:48:00.000Z", "lte": "2023-01-30T09:59:00.000Z"}}}
elastic = "https://HOST:PORT"
user = "USER"
password = "PASSWORD"

# Fields whose combined values define a duplicate
keys_to_include_in_hash = ["field1", "field2", "field3", "field4"]

es = Elasticsearch(
    hosts=elastic,
    basic_auth=(user, password)
)

dict_of_duplicate_docs = {}

def populate_dict_of_duplicate_docs(hits):
    # Hash the combined key fields of each hit and group document IDs by hash value
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)

def scroll_over_all_docs():
    # Page through every document matched by the query using the scroll API
    data = es.search(index=index, scroll='1m', query=query)
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        sid = data['_scroll_id']
        scroll_size = len(data['hits']['hits'])

def loop_over_hashes_and_remove_duplicates():
    # Any hash with more than one ID marks a group of duplicate documents
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            print("********** Duplicate docs hash=%s **********" % hashval)
            matching_docs = es.mget(index=index, ids=array_of_ids)
            for doc in matching_docs['docs']:
                print("doc=%s\n" % doc)

scroll_over_all_docs()
loop_over_hashes_and_remove_duplicates()
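Note that, like the original script, this version only prints the duplicate groups; it does not delete anything. If you want to actually remove the duplicates, a minimal sketch of my own (keeping the first document of each group and deleting the rest, one request per document):

for hashval, array_of_ids in dict_of_duplicate_docs.items():
    if len(array_of_ids) > 1:
        # keep the first ID, delete every other copy
        for doc_id in array_of_ids[1:]:
            es.delete(index=index, id=doc_id)

For large numbers of duplicates you could batch these into bulk delete actions via elasticsearch.helpers.bulk instead of issuing one delete per document.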