首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用elasticsearch将元组从storm索引到elasticsearch-hadoop库不起作用

使用elasticsearch将元组从storm索引到elasticsearch-hadoop库不起作用
EN

Stack Overflow用户
提问于 2016-04-16 17:03:51
回答 2查看 1.2K关注 0票数 3

我想从Storm中将文档索引到Elasticsearch中,但我无法将任何文档索引到Elasticsearch中。

在我的拓扑中,我有一个KafkaSpout,它向一个EsBolt发出类似于{“tweetId”:1,“text”:“hello”}的json,这是一个来自elasticsearch-hadoop库的原生螺栓,它将Storm Tuples写入到Elasticsearch (文档在这里:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html)。以下是我的EsBolt的配置:

代码语言:javascript
复制
Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);

前两个配置默认情况下具有这些值,但我选择显式设置它们。我也尝试过不使用它们,也得到了同样的结果。

这就是我构建拓扑的方式:

代码语言:javascript
复制
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
        .setNumTasks(kafkaSpoutNumberOfTasks);


builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
        .setNumTasks(elasticsearchBoltNumberOfTasks)
        .shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);

return builder.createTopology();

在本地运行拓扑之前,我在Elasticsearch中创建了"twitter“索引,并为该索引创建了一个映射"tweet”。这是我检索新创建的类型(curl -XGET‘http://localhost:9200/twitter/_mapping/tweet’)的映射时得到的结果:

代码语言:javascript
复制
{
   "twitter": {
      "mappings": {
         "tweet": {
            "properties": {
               "text": {
                  "type": "string"
               },
               "tweetId": {
                  "type": "string"
               }
            }
         }
      }
   }
}

我在本地运行拓扑,这是我在处理元组时在控制台中得到的:

代码语言:javascript
复制
Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]

TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]

BOLT ack TASK: 6 TIME:  TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:

所以这些元组似乎已经被处理过了。但是,我在Elasticsearch中没有索引任何文档。

我想我在为EsBolt设置配置时做错了什么,可能是遗漏了一个配置或其他什么。

EN

回答 2

Stack Overflow用户

发布于 2016-09-10 02:51:21

只有在达到es.storm.bolt.flush.entries.size指定的刷新大小时,才会为文档编制索引

或者,您可以设置触发队列刷新的节拍频率。

代码语言:javascript
复制
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);

默认情况下,根据es.storm.bolt.tick.tuple.flush参数,es-hadoop在节拍时刷新。

票数 1
EN

Stack Overflow用户

发布于 2018-08-20 16:38:59

我也有同样的问题,但当我查找es-Hadoop文档时,我发现因为我没有设置触发队列的频率,所以我在我的存储拓扑(es.storm.bolt.flush.entries.size )中添加了一个配置,当我们设置fine.but的值时,它是Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,它在Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS execute function中抛出了一个异常。然后我们使用调试模式来测试我的拓扑结构,我发现bolt execute中的输入元组不包含任何条目,但是这个空元组是被触发的。这就是我感到困惑的地方。不要根据设置时间发出元组,即使在我们设置了Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.i think之后这个元组是空的,这是一个错误。enter image description here enter image description here

更多信息,请访问:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36662213

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档