我目前正在用akka和它的持久化堆栈做一些实验,用akka-http堆栈包装。
注意:为了持久性,我使用非官方插件将Akka FSM持久化到mongodb。
但我的问题是使用JsonEntityStreamingSupport,recommended by akka to serve Source as json。
implicit val jsonEntityStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
val readJournal = PersistenceQuery(system).readJournalFor[ScalaDslMongoReadJournal](MongoReadJournal.Identifier)
val route =
path("workflows") {
get {
complete(readJournal.currentPersistenceIds())
}
}
Http().bindAndHandle(route, "localhost", 8081)但不幸的是,我带来了这个错误:
$ curl localhost:8081/workflows
curl: (56) Recv failure: Connection reset by peer我没有看到任何错误或日志,这些错误或日志可能会导致服务器关闭连接的原因。
有人已经做过这样的实验了吗?
发布于 2017-03-28 20:48:37
好了,我想通了。
readJournal.currentPersistenceIds()给了我一个Source[String, NotUsed]。
但是,正如在akka-http specs中指定的那样,
这是错误的,因为我们试图呈现JSON,但是String不是一个有效的顶级元素,如果我们真的想呈现一个字符串列表,我们需要提供一个显式的MarshallerString,ByteString。
所以我必须为它提供一个Marshaller。例如,通过这些相同的测试:
implicit val stringFormat = Marshaller[String, ByteString] { ec ⇒ s ⇒
Future.successful {
List(Marshalling.WithFixedContentType(ContentTypes.`application/json`, () ⇒
ByteString("\"" + s + "\"")) // "raw string" to be rendered as json element in our stream must be enclosed by ""
)
}
}https://stackoverflow.com/questions/43069671
复制相似问题