我使用Spark处理复杂的工作流(解析、清理、机器学习.)。在工作流的末尾,我希望将聚合的结果发送到elasticsearch,这样我的门户就可以查询数据。将有两种类型的处理:流处理和对所有可用数据重新启动工作流的可能性。
现在,我使用elasticsearch-hadoop,特别是使用saveJsonToEs(myindex/mytype)方法将文档发送到elasticsearch。目标是使用我们构建的适当模板在一天内建立一个索引。AFAIK您不能在文档中添加对特性的考虑,以便将其发送到elasticsearch-hadoop中的适当索引。
实现此功能的正确方法是什么?有一个特殊的步骤,使用火花和散装,以便每个执行者发送文件到适当的索引考虑到每一行的特点?我在elasticsearch-hadoop有什么遗漏吗?
我尝试使用saveJsonToEs(" _bulk ")将JSON发送到_bulk,但模式必须遵循索引/类型
发布于 2015-01-19 16:07:18
多亏了Costin Leau,我找到了解决办法。只需使用像saveJsonToEs这样的动态索引(“my-index-{date}/my-type”)。“日期”必须是必须发送的文档中的一个功能。
关于elasticsearch:https://groups.google.com/forum/#!topic/elasticsearch/5-LwjQxVlhk的讨论
文档:http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-dyn
发布于 2016-09-15 06:59:55
可以使用:(“自定义索引-{date}/customtype”)创建动态索引。这可能是给定rdd中的任何字段。
如果您想格式化日期:("custom-index-{date:{YYYY.mm.dd}}/customtype")在评论中回答了Amit_Hora的问题,因为我没有足够的权限发表评论,我在这里添加以下内容
https://stackoverflow.com/questions/27967323
复制相似问题