首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PersistenceQuery和Akka-Http僵尸流

PersistenceQuery和Akka-Http僵尸流
EN

Stack Overflow用户
提问于 2018-06-18 17:04:22
回答 1查看 38关注 0票数 0

我正在尝试使用akka-http作为SSE来流式传输PersistenceQuery结果,但似乎当http连接从客户端关闭时,PersistenceQuery流仍然周期性地命中事件后端。

代码语言:javascript
复制
// Http part
complete {
   source(id)
        .map(e => e.event) // other transformations
        .map(e => ServerSentEvent(m.toString))
        .keepAlive(4 seconds, () => ServerSentEvent.heartbeat)
}

// source
def source(id: UUID)(implicit system: ActorSystem, materializer: ActorMaterializer)= {
    import system.dispatcher

    val journalQuery = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    val futureSrcGraph:  RunnableGraph[Future[Source[EventEnvelope, NotUsed]]] =
      journalQuery.currentEventsByPersistenceId(id.toString, 0, Long.MaxValue)
        .map(_.sequenceNr)
        .toMat(Sink.last)(Keep.right)
        .mapMaterializedValue(fs => fs.recoverWith {
          case _ => Future { 0L } // assume we start at 1
        }.map(s => journalQuery.eventsByPersistenceId(id.toString, s + 1, Long.MaxValue)))

    Source.fromFutureSource(futureSrcGraph.run())

因此,这基本上是有效的,唯一的问题是流永远不会结束,至少看起来是这样的。我在CassandraReadJournal和LevelDb上都试过

日志输出示例:

代码语言:javascript
复制
[DEBUG] [06/18/2018 10:52:16.774] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [0]
[DEBUG] [06/18/2018 10:52:16.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [15] ms (empty)
[DEBUG] [06/18/2018 10:52:16.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [1]
[DEBUG] [06/18/2018 10:52:16.796] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [5] ms (empty)
[DEBUG] [06/18/2018 10:52:19.768] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [0]
[DEBUG] [06/18/2018 10:52:19.784] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [15] ms (empty)
[DEBUG] [06/18/2018 10:52:19.784] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [1]
[DEBUG] [06/18/2018 10:52:19.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [6] ms (empty)
[DEBUG] [06/18/2018 10:52:22.765] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [0]
[DEBUG] [06/18/2018 10:52:22.772] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [6] ms (empty)
[DEBUG] [06/18/2018 10:52:22.772] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query from seqNr [6] in partition [1]
[DEBUG] [06/18/2018 10:52:22.790] [sys-cassandra-plugin-default-dispatcher-17] [EventsByPersistenceIdStage(akka://sys)] EventsByPersistenceId [c6031a8a-db71-4dcb-9d4f-f140faa2f4c4] Query took [17] ms (empty)

而且它会一直持续下去。

我还尝试省略Source.fromFutureSource,只运行journalQuery.eventsByPersistenceId,结果相同。

我做错了什么?

EN

回答 1

Stack Overflow用户

发布于 2018-06-18 20:25:34

这就是我的公司代理永远不会断开到服务器的连接,即使客户端关闭连接。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50905978

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档