首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Elasticsearch重复数据删除

Elasticsearch重复数据删除
EN

Stack Overflow用户
提问于 2020-03-06 06:50:43
回答 2查看 249关注 0票数 0

我有一个文档集合,其中每个文档如下所示

代码语言:javascript
复制
    {
        "_id": ... ,
        "Author": ...,
        "Content": ....,
        "DateTime": ...
    }

我想向集合发出一个查询,这样我就可以从每个作者那里得到最早的文档作为响应。我正在考虑使用术语聚合,但当我这样做时,我得到了一个存储桶列表,作为唯一的作者值,它没有告诉我哪个文档是最旧的。此外,该方法需要对ES进行后续调用,这是不可取的。

如果您能提供任何建议,我们将不胜感激。谢谢。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-03-06 12:54:38

您可以在弹性搜索中使用collapse

它将返回在DateTime上排序的每个作者的前1条记录

代码语言:javascript
复制
{
  "size": 10,
  "collapse": {
    "field": "Author.keyword"
  },
  "sort": [
    {
      "DateTime": {
        "order": "desc"
      }
    }
  ]
}

结果

代码语言:javascript
复制
    "hits" : [
      {
        "_index" : "index83",
        "_type" : "_doc",
        "_id" : "e1QwrnABAWOsYG7tvNrB",
        "_score" : null,
        "_source" : {
          "Author" : "b",
          "Content" : "ADSAD",
          "DateTime" : "2019-03-11"
        },
        "fields" : {
          "Author.keyword" : [
            "b"
          ]
        },
        "sort" : [
          1552262400000
        ]
      },
      {
        "_index" : "index83",
        "_type" : "_doc",
        "_id" : "elQwrnABAWOsYG7to9oS",
        "_score" : null,
        "_source" : {
          "Author" : "a",
          "Content" : "ADSAD",
          "DateTime" : "2019-03-10"
        },
        "fields" : {
          "Author.keyword" : [
            "a"
          ]
        },
        "sort" : [
          1552176000000
        ]
      }
    ]
  }

编辑1:

代码语言:javascript
复制
{
  "size": 10,
  "collapse": {
    "field": "Author.keyword"
  },
  "sort": [
    {
      "DateTime": {
        "order": "desc"
      }
    }
  ],
  "aggs": 
         {
           "authors": {
                       "terms": {
                                "field": "Author.keyword", "size": 10 }, 
                       "aggs": {
                                "doc_count": { "value_count": { "field": 
                                                "Author.keyword"
                                             }
                                 }
                           }
                     }
             }
}
票数 2
EN

Stack Overflow用户

发布于 2020-03-06 08:48:07

没有简单的方法可以直接调用Elasticsearch来完成这项工作。幸运的是,有一个nice article on Elastic Blog展示了一些这样做的方法。

其中一种方法是using logstash,用于删除重复项。其他方法包括使用可在this github repository上找到的Python脚本

代码语言:javascript
复制
#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # If the hashval is new, then we will create a new key
        # in the dict_of_duplicate_docs, which will be
        # assigned a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we will just push the new _id onto the existing array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Get the documents that have mapped to the current hashval
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In this example, we just print the duplicate docs.
            # This code could be easily modified to delete duplicates
            # here instead of printing them
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60555128

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档