我试图使用alpakka从elasticsearch读取文档(存储在ES中的json数据)。我得到了这个alpakka-Elasticsearch。这里说你可以使用ElasticsearchSource, ElasticsearchFlow or the ElasticsearchSink从Elasticsearch流传送消息或将消息流到Elasticsearch。我试图实现ElasticsearchSource方法。所以我的代码看起来像这样
val url = "http://localhost:9200"
val connectionSettings = ElasticsearchConnectionSettings(url)
val sourceSettings = ElasticsearchSourceSettings(connectionSettings)
val elasticsearchParamsV7 = ElasticsearchParams.V7("category_index")
val copy = ElasticsearchSource
.typed[CategoryData](
elasticsearchParamsV7,
query = query,
sourceSettings
).map { message: ReadResult[CategoryData] =>
println("Inside message==================> "+message)
WriteMessage.createIndexMessage(message.id, message.source)
} .runWith(
ElasticsearchSink.create[CategoryData](
elasticsearchParamsV7,ElasticsearchWriteSettings(connectionSettings)
)
)
println("Final data==============>. "+copy)最后,复制返回Future[Done]的值。但我无法从ES中读取数据。
我是不是漏掉了什么?
还有,有没有其他方式使用akka http客户端api来做同样的事情呢?
在akka中使用ES的首选方式是什么?
发布于 2021-07-20 19:34:24
要从Elasticsearch读取数据,这样的内容应该足够了:
val matchAllQuery = """{"match_all": {}}"""
val result = ElasticsearchSource
.typed[CategoryData](
elasticsearchParamsV7,
query = matchAllQuery,
sourceSettings
).map { message: ReadResult[CategoryData] =>
println("Read message==================> "+message)
}.runWith(Sink.seq)
result.onComplete(res => res.foreach(col => println(s"Read: ${col.size} records")))如果类型CategoryData与索引中存储的内容不正确匹配,则查询可能不会返回结果。
如果有疑问,可以读取原始JSON:
val elasticsearchSourceRaw = ElasticsearchSource
.create(
elasticsearchParamsV7,
query = matchAllQuery,
settings = sourceSettings
)https://stackoverflow.com/questions/68220653
复制相似问题