我目前有一个由Akka和Cassandra支持的事件源服务。这是一个名为AuctionService的竞价系统,在某种程度上,我需要检索最后一个名为BiddenOnLot的竞价事件。为此,我使用akka-persistence-query。
下面是我当前的代码:
// obtain read journal by plugin id
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](
"cassandra-query-journal")
// issue query to journal
val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(self.path.name.toString, 0, Long.MaxValue)
// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.runForeach(envelope ⇒ {
if (envelope.event.isInstanceOf[BiddenOnLot]) {
val biddenOnLot = envelope.event.asInstanceOf[BiddenOnLot]
if (biddenOnLot.paddleId == paddleId) {
// TODO: get last bid event by paddle id
}
}
})到目前为止,我遍历了所有事件,并可以确定其类型。但我真的很难隔离最后一个竞标事件,并能够异步使用它。有什么想法吗?
发布于 2017-05-05 03:43:23
您可以通过使用模式匹配来解构发出的EventEnvelope并将其映射到您想要的事件:
source.map {
case EventEnvelope(_, _, _, bidenOnLot: BidenOnLot) if bidenOnLot.paddleId == paddleId => bidenOnLot
}.runForeach(println)https://stackoverflow.com/questions/43788241
复制相似问题