
Incorrect doc_type value

jcferrer opened this issue 3 years ago · 1 comment

Line in the script:

matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})

should change to:

matching_docs = es.mget(index="stocks", doc_type="_doc", body={"ids": array_of_ids})

doc_type should be _doc instead of doc; otherwise, the documents in matching_docs come back with found: false.
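A quick way to see the symptom is to check the found flag on each entry of the mget response (a minimal sketch; "stocks" and array_of_ids are the values from the script): with doc_type="doc" every entry prints as missing, while with "_doc" the documents come back.

matching_docs = es.mget(index="stocks", doc_type="_doc", body={"ids": array_of_ids})
for doc in matching_docs["docs"]:
    if not doc["found"]:
        print("document %s was not found" % doc["_id"])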

JC

jcferrer avatar Sep 22 '21 23:09 jcferrer

As of the latest version of elasticsearch-py, the doc_type and body parameters are no longer available on mget; you can update this line like this:

matching_docs = es.mget(index=index, ids=array_of_ids)
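As a quick sanity check (a sketch assuming a recent 8.x client; the response shape is unchanged), you can still read the found flag on each entry:

resp = es.mget(index=index, ids=array_of_ids)
for doc in resp["docs"]:
    print(doc["_id"], doc["found"])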

You can change the es.search call in the same way (this example uses a time range to limit the scope of the query):

query = {"range": {"@timestamp": {"gte": "2023-01-30T09:48:00.000Z","lte": "2023-01-30T09:59:00.000Z"}}}
index = "the_index"

es.search(index=index, scroll='1m', query=query)
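One optional addition (my own habit, not part of the script below): when you are done iterating, you can release the scroll context explicitly instead of letting it expire on its own:

data = es.search(index=index, scroll='1m', query=query)
sid = data['_scroll_id']
# ... iterate with es.scroll(scroll_id=sid, scroll='2m') as in the script below ...
es.clear_scroll(scroll_id=sid)  # free the server-side scroll context when finished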

This version works for me:

import hashlib
from elasticsearch import Elasticsearch

index="INDEX"
query={"range": {"@timestamp": {"gte": "2023-01-30T09:48:00.000Z","lte": "2023-01-30T09:59:00.000Z"}}}
elastic = "https://HOST:PORT"
user = "USER"
password = "PASSWORD"


keys_to_include_in_hash = ["filed1", "filed2", "filed3", "filed4"]

es = Elasticsearch(
    hosts=elastic,
    basic_auth=(user, password)
)

# Maps hash of the combined key fields -> list of document ids with that hash
dict_of_duplicate_docs = {}

# Group document ids by the md5 hash of their combined key field values
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)

# Walk over every document matched by the query using the scroll API
def scroll_over_all_docs():
    data = es.search(index=index, scroll='1m', query=query)
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    # Keep scrolling until a page comes back empty
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        sid = data['_scroll_id']
        scroll_size = len(data['hits']['hits'])

# Print every group of documents that share the same hash (the duplicates)
def loop_over_hashes_and_remove_duplicates():
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            print("********** Duplicate docs hash=%s **********" % hashval)
            matching_docs = es.mget(index=index, ids=array_of_ids)
            for doc in matching_docs['docs']:
                print("doc=%s\n" % doc)

scroll_over_all_docs()

loop_over_hashes_and_remove_duplicates()
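
Note that loop_over_hashes_and_remove_duplicates only prints the duplicate groups. If you also want to delete them, a minimal sketch could look like this (an assumption about the cleanup step, keeping the first id of each group; try it on a test index first):

def delete_duplicates():
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        # Keep the first document of each duplicate group, delete the rest
        for _id in array_of_ids[1:]:
            es.delete(index=index, id=_id)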

ocuil avatar Jan 31 '23 07:01 ocuil