首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Alpakka在弹性中插入文档

使用Alpakka在弹性中插入文档
EN

Stack Overflow用户
提问于 2022-10-23 17:01:51
回答 1查看 38关注 0票数 0

我正在努力学习如何使用Alpakka,并设置了一个测试,以编写一个文档给弹性。通过阅读文档,包括https://doc.akka.io/docs/alpakka/current/elasticsearch.html,编写了以下内容:

代码语言:javascript
复制
import akka.actor.ActorSystem
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import akka.stream.alpakka.elasticsearch._
import akka.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol._
import spray.json.{JsonFormat, _}

object AlpakkaWrite extends App{

  case class VolResult(symbol : String, vol : Double, timestamp : Long)

  implicit val actorSystem = ActorSystem()

  val connectionString = "****";
  val userName = "****"
  val password = "****"

  def constructElasticsearchParams(indexName: String, typeName: String, apiVersion: ApiVersion) =
    if (apiVersion eq ApiVersion.V5)
      ElasticsearchParams.V5(indexName, typeName)
    else if (apiVersion eq ApiVersion.V7)
      ElasticsearchParams.V7(indexName)
    else
      throw new IllegalArgumentException("API version " + apiVersion + " is not supported")

  val connectionSettings = ElasticsearchConnectionSettings
    .create(connectionString).withCredentials(userName, password)

  val sinkSettings =
   ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V7);

  implicit val formatVersionTestDoc: JsonFormat[VolResult] = jsonFormat3(VolResult)

  Source(List(VolResult("test" , 1 , System.currentTimeMillis())))
    .map { message: VolResult =>
      WriteMessage.createIndexMessage("00002", message )
    }
    .log(("Error"))
   .runWith(
     ElasticsearchSink.create[VolResult](
        constructElasticsearchParams("ccy_vol_normalized", "_doc", ApiVersion.V7),
        settings = sinkSettings
      )
    )

}

产出:

代码语言:javascript
复制
19:15:51.815 [default-akka.actor.default-dispatcher-5] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
19:15:52.547 [default-akka.actor.default-dispatcher-5] ERROR akka.stream.alpakka.elasticsearch.impl.ElasticsearchSimpleFlowStage$StageLogic - Received error from elastic after having already processed 0 documents. Error: java.lang.RuntimeException: Request failed for POST /_bulk

我是否正确地定义了case类DataPayload?它是否匹配在索引映射中定义的预期有效载荷?

代码语言:javascript
复制
"properties": {
"timestamp": { "type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
},
"vol": { "type": "float" },
"symbol": { "type": "text" }
}

使用Elastic工具,以下命令将成功插入文档:

代码语言:javascript
复制
POST ccy_vol_normalized/_doc/
{
"timestamp": "2022-10-21T00:00:00.000Z",
"vol": 1.221,
"symbol" : "SYM"
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-24 21:53:23

这样做是可行的:

代码语言:javascript
复制
import akka.actor.ActorSystem
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import akka.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

import java.text.SimpleDateFormat
import java.util.Date

object AlpakkaWrite extends App {
  val connectionString = "";

  implicit val actorSystem = ActorSystem()
  val userName = ""
  val password = ""
  val connectionSettings = ElasticsearchConnectionSettings
    .create(connectionString).withCredentials(userName, password)
  val sinkSettings =
    ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V7);

  val HOUR = 1000 * 60 * 60
  val utcDate = new Date(System.currentTimeMillis() - HOUR)
  val ts = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(utcDate) + "Z"

  implicit val formatVersionTestDoc: JsonFormat[VolResult] = jsonFormat3(VolResult)

  def constructElasticsearchParams(indexName: String, typeName: String, apiVersion: ApiVersion) =
    if (apiVersion eq ApiVersion.V5)
      ElasticsearchParams.V5(indexName, typeName)
    else if (apiVersion eq ApiVersion.V7)
      ElasticsearchParams.V7(indexName)
    else
      throw new IllegalArgumentException("API version " + apiVersion + " is not supported")

  case class VolResult(symbol: String, vol: Double, timestamp: String)
  println("ts : " + ts)

  Source(List(VolResult("test1", 1, ts)))
    .map { message: VolResult =>
      WriteMessage.createIndexMessage(System.currentTimeMillis().toString, message)
    }
    .log(("Error"))
    .runWith(
      ElasticsearchSink.create[VolResult](
        constructElasticsearchParams("ccy_vol_normalized", "_doc", ApiVersion.V7),
        settings = sinkSettings
      )
    )

}

我的日期格式不正确,使用:

代码语言:javascript
复制
  val HOUR = 1000 * 60 * 60
  val utcDate = new Date(System.currentTimeMillis() - HOUR)
  val ts = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS").format(utcDate) + "Z"

解决了这个问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74173097

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档