
Indexing MongoDB data into Elasticsearch

Stack Overflow user
Asked 2019-09-24 05:46:08
3 answers, 2.8K views, 0 followers, score 1

I'm stuck at the point of indexing a collection's data into Elasticsearch.

Below is the code I'm using to try to index the data from Mongo.

Code language: javascript
const elasticsearch = require('elasticsearch');
// instantiate an Elas

var bulk = [];


var MongoClient = require('mongodb').MongoClient;
var ObjectID = require('mongodb').ObjectID;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here

var esIndexName = 'new-collection'; // Elasticsearch index name will go here
var bulk = [];
const client = new elasticsearch.Client({
   hosts: [ 'http://localhost:9200']
});
// ping the client to be sure Elasticsearch is up
client.ping({
     requestTimeout: 30000,
 }, function(error) {
 // At this point, eastic search is down, please check your Elasticsearch service
     if (error) {
         console.error('Elasticsearch cluster is down!');
     } else {
         console.log('Everything is ok');
     }
 });


MongoClient.connect(connectionString+mongoDBName, function(err, db) {
    if(err) throw err;

   // for each object in a collection
   var collection = db.collection(mongoCollectionName);
   var counter = 0;
   collection.find().each(function(err, item, response, status) {
       console.log(item)
    Array.from(item).forEach(itemdata => {
        bulk.push({index:{ 
                        _index: esIndexName, 
                        _type: mongoCollectionName,
                    }          
                })
        bulk.push(itemdata)
        })
        //perform bulk indexing of the data passed
        client.bulk({body:bulk}, function( err, response  ){ 
            if( err ){ 
                console.log("Failed Bulk operation".red, err) 
            } else { 
                console.log("Successfully imported %s".green, mongoCollectionName.length); 
            } 
            console.log(response);
        });

    if(item != null) {    
        if(counter % 100 == 0) console.log( "Syncing object id: "+ item['_id'] + " #: " + counter);
        client.indices.create(
         { index: esIndexName },
         function(error, response) {
            if (error) {
                     console.log(error);
                 } else {
               console.log("created a new index", response);
              }
         }
       );
   }
     counter += 1;
   });
});

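Here I am trying to index the data into Elasticsearch. I am able to create the collection index, but the data never gets inserted into it. Can anyone help me here? Where am I going wrong, and what mistake am I making? I am using Node.js for this, as a simple function to test with; later I will add a Lambda function to update/delete whatever changes.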


3 Answers

Stack Overflow user

Posted 2019-09-24 07:41:09

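First of all, I would suggest tidying up your code; it is very hard to see how the blocks are nested.

Now, there are several issues with your code: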

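  1. Why are you doing Array.from(item).forEach(itemdata => {? item is a document object from Mongo, so calling Array.from on it does nothing useful: for a plain object with no numeric length it returns an empty array, and the forEach body never runs.
  2. You are calling the bulk API inside the .each callback, which means you make one API call per document. I don't think this is what you want.
  3. You are creating the index after the bulk operation. This is wrong. You should create your ES index once and for all, before inserting any documents. This matters because in the future you may want a more advanced configuration for how ES handles your documents.
  4. Your ping call is fine, but it does not stop the rest of the code from running when the cluster is down.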
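
The Array.from(item) point is worth seeing in isolation, since it likely explains why nothing reached the index. This is a minimal sketch, assuming a Mongo document behaves like a plain object; the sample fields and the fixedBulk name are made up for illustration.

```javascript
// Hypothetical document, shaped like something a Mongo cursor would return.
const item = { _id: 'abc123', name: 'Alice', age: 30 };

// A plain object is neither iterable nor array-like (no numeric `length`),
// so Array.from returns an empty array.
const asArray = Array.from(item);

const bulk = [];
asArray.forEach((itemdata) => {
  // Never reached: there is nothing to iterate over.
  bulk.push({ index: { _index: 'new-collection' } });
  bulk.push(itemdata);
});

console.log(asArray.length); // 0
console.log(bulk.length);    // 0, so client.bulk would receive an empty body

// Pushing the document itself is presumably what the loop was meant to do.
// _id is a metadata field in Elasticsearch, so it is kept out of the source.
const { _id, ...source } = item;
const fixedBulk = [{ index: { _index: 'new-collection' } }, source];
console.log(fixedBulk.length); // 2
```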

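So what you should do is:

  • Create your ES index before iterating over the documents.
  • Iterate over the MongoDB documents and accumulate them in your body object.
  • When you have a batch of n documents, call the bulk API and reset your body.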
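
The steps above can be sketched as a small buffer. This is only an illustration under stated assumptions, not the answer author's code: BulkBuffer and batchSize are hypothetical names, reusing Mongo's _id as the Elasticsearch _id is a design choice of this sketch, and the esClient and cursor in the trailing comment are assumed to exist already.

```javascript
// Hypothetical helper: collects documents and hands back a bulk body
// once `batchSize` documents have accumulated.
class BulkBuffer {
  constructor(indexName, batchSize) {
    this.indexName = indexName;
    this.batchSize = batchSize;
    this.body = [];
    this.count = 0;
  }

  // Adds one document. Returns a full bulk body when the batch is ready, else null.
  add(doc) {
    const { _id, ...source } = doc;
    // Assumption of this sketch: reuse Mongo's _id so re-runs overwrite
    // documents instead of duplicating them.
    this.body.push({ index: { _index: this.indexName, _id: String(_id) } });
    this.body.push(source);
    this.count += 1;
    return this.count >= this.batchSize ? this.flush() : null;
  }

  // Returns whatever is buffered (or null if empty) and resets the buffer.
  flush() {
    if (this.count === 0) return null;
    const body = this.body;
    this.body = [];
    this.count = 0;
    return body;
  }
}

// Small in-memory demonstration with a batch size of 2.
const buffer = new BulkBuffer('new-collection', 2);
const first = buffer.add({ _id: 1, name: 'a' });  // batch not full yet
const second = buffer.add({ _id: 2, name: 'b' }); // batch full, body returned
console.log(first === null, second.length);

// Usage sketch against real clients (not executed here):
//
//   // create the index once, before the loop
//   while (await cursor.hasNext()) {
//     const body = buffer.add(await cursor.next());
//     if (body) await esClient.bulk({ body });
//   }
//   const rest = buffer.flush();
//   if (rest) await esClient.bulk({ body: rest });
```

Keeping the buffer free of any client calls means the batching logic can be checked without a running cluster; the final flush matters because the last batch is usually smaller than n.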
Score: 0

Stack Overflow user

Posted 2019-09-24 19:55:50

This is the solution you are looking for.

index.js

Code language: javascript
//MongoDB client config
var MongoClient = require('mongodb').MongoClient;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here

//Elasticsearch client config
const { Client } = require('@elastic/elasticsearch')
const esClient = new Client({ node: 'http://localhost:9200' });
var esIndexName = 'new-collection'; // Elasticsearch index name will go here

let bulk = [];

async function indexData() {

  const client = await MongoClient.connect(connectionString, { useNewUrlParser: true })
    .catch(err => { console.log(err); });

  if (!client) {
    return;
  }

  try {

    const db = client.db(mongoDBName);

    let collection = db.collection(mongoCollectionName);
    await collection.find().forEach((doc) => {
      bulk.push({
        index: {
          _index: esIndexName,
        }
      })

      let { _id, ...data } = doc;
      bulk.push(data);
    })
    console.log(bulk);

    await esClient.indices.create({
      index: esIndexName,
    }, { ignore: [400] })

    const { body: bulkResponse } = await esClient.bulk({ refresh: true, body: bulk })

    if (bulkResponse.errors) {
      const erroredDocuments = []
      // The items array has the same order of the dataset we just indexed.
      // The presence of the `error` key indicates that the operation
      // that we did for the document has failed.
      bulkResponse.items.forEach((action, i) => {
        const operation = Object.keys(action)[0]
        if (action[operation].error) {
          erroredDocuments.push({
            // A status of 429 means the document can be retried;
            // otherwise it is very likely a mapping error, and you should
            // fix the document before trying again.
            status: action[operation].status,
            error: action[operation].error,
            operation: bulk[i * 2],
            document: bulk[i * 2 + 1]
          })
        }
      })
      console.log(erroredDocuments)
    }

    const { body: count } = await esClient.count({ index: esIndexName })
    console.log(count)

  } catch (err) {

    console.log(err);
  } finally {
    client.close();
  }
}

indexData();

package.json

Code language: javascript
{
  "name": "elastic-node-mongo",
  "version": "1.0.0",
  "description": "Simple example to connect ElasticSearch, MongoDB and NodeJS",
  "main": "index.js",
  "dependencies": {
    "@elastic/elasticsearch": "^7.3.0",
    "mongodb": "^3.3.2",
    "nodemon": "1.18.3"
  },
  "scripts": {
    "dev": "nodemon",
    "start": "node index.js"
  },
  "keywords": [
    "nodejs",
    "node",
    "mongodb",
    "elasticsearch",
    "docker"
  ],
  "author": "Sathishkumar Rakkiasmy",
  "license": "ISC"
}

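Clarification

"I am able to create the collection index, but the data never gets inserted into it."

The sentence above makes sense, because the bulk variable is unaltered.

See the links below for why the bulk variable is unaltered.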

Why is my variable unaltered after I modify it inside of a function? - Asynchronous code reference

How do I return the response from an asynchronous call?

Learn more about asynchronous programming:

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous/Async_await
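
As a rough illustration of the pitfall those links describe (this is not the asker's code; loadLater and loadAsPromise are made-up stand-ins for any callback-based call, such as iterating a cursor), a variable read right after starting an asynchronous operation still looks untouched:

```javascript
const collected = [];

// Hypothetical async source: delivers its items on a later tick.
function loadLater(callback) {
  setTimeout(() => callback(['doc1', 'doc2']), 0);
}

loadLater((items) => {
  items.forEach((item) => collected.push(item));
});

// This line runs before the callback above, so the array is still empty.
const lengthRightAway = collected.length;
console.log(lengthRightAway); // 0

// Wrapping the source in a Promise lets the caller wait for the data.
function loadAsPromise() {
  return new Promise((resolve) => loadLater(resolve));
}

async function main() {
  const items = await loadAsPromise();
  console.log(items.length); // 2
}

main();
```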

Score: 0

Stack Overflow user

Posted 2019-09-24 09:03:52

You can use Logstash to import data from Mongo into Elasticsearch. Please find the attached configuration for reference.

Code language: javascript
input {
  mongodb {
    codec => "json"
    uri => 'mongodb://localhost:27017/NewDb'
    placeholder_db_dir => '/home/devbrt.shukla/Desktop/scalaoutput/ELK/logstash-6.4.1/db_dir'
    placeholder_db_name => 'Employee_sqlite.db'
    collection => 'Employee'
    batch_size => 5000
    generateId => 'true'
    parse_method => "simple"
  }
}
filter {
  mutate {
    remove_field => [ "_id" ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "employee-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

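A Logstash configuration has three sections: input, filter, and output.

Input: fetches the data from a source such as SQL, MongoDB, or MySQL.

Filter: in this section we can shape the custom JSON structure that gets indexed into Elasticsearch.

Output: in this section we put the index name, doc type, and IP address of the output destination, i.e. Elasticsearch.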

Score: -1
Original page content provided by Stack Overflow; translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/58073767
