我一直在尝试用函数式方法实现persistent actor --我的意思是根本没有变量。但是我遇到了麻烦:)下面的代码示例不能正常工作,因为处理程序参数没有在处理程序之间共享(receiveCommand/receiveRecover). 都从零开始,然后相互覆盖-在重播一些事件后,命令处理程序仍将处于起始点。
此实现的另一个问题是在同一位置处理命令和事件
用函数式的方式做这件事是不是很好的实践呢?
class Item(sku: String) extends PersistentActor with ActorLogging {
import Item._
override def persistenceId: String = sku
override def receiveCommand: Receive = handler(0, 0)
override def receiveRecover: Receive = handler(0, 0)
def handler(quantity: Int, booked: Int): Receive = {
case Increase(q) =>
val event = StockChanged(sku, q)
persist(event)(e => context.become(handler(quantity + e.quantity, booked)))
case Decrease(q) =>
val event = StockChanged(sku, -q)
persist(event)(e => context.become(handler(quantity + e.quantity, booked)))
case StockChanged(_, q) => {
context.become(handler(quantity + q, booked))
}
}
}发布于 2019-11-25 21:43:24
我已经找到了答案。新的akka persistance 2.6仅以函数方式工作:)
https://doc.akka.io/docs/akka/current/typed/persistence.html#event-sourcing
发布于 2021-01-27 04:09:30
对于那些需要使用Classic Persistence的人来说,我能想到的最好的方法不需要整个actor范围的状态,但是您仍然需要一个本地var。然后,您可以将本地var封装在恢复处理程序的闭包中。
展开并清理问题中不完整的示例:
package item_example
import akka.actor.ActorLogging
import akka.persistence.PersistentActor
object ItemExample {
case class Increase(quantity: Int)
case class Decrease(quantity: Int)
case class StockChanged(id: String, quantity: Int)
class Item(sku: String) extends PersistentActor with ActorLogging {
override def persistenceId: String = sku
override def receiveCommand: Receive = listening()
private def listening(quantity: Int = 0): Receive = {
case Increase(q) =>
persist(StockChanged(sku, q)) { e: StockChanged =>
context.become(listening(quantity + e.quantity))
}
case Decrease(q) =>
persist(StockChanged(sku, -q)) { e: StockChanged =>
context.become(listening(quantity + e.quantity))
}
}
override def receiveRecover: Receive = recovering()
private def recovering(): Receive = {
/*
This is the local var that will be part of the recovery handler's closure.
The semicolon is needed only so that the below code within parentheses is interpreted as
a partial function and not a code block parameter for the integer 0.
You can also just return a new PartialFunction[Any, Unit] but I find this to be nicer.
*/
var totalQuantity: Int = 0;
{
case StockChanged(_, quantity) =>
totalQuantity += quantity
context.become(listening(totalQuantity))
}
}
}
}恢复处理程序将只覆盖每个恢复的事件之后的状态。这没问题,因为recovery is guaranteed to finish before the actor starts consuming events from the mailbox。
上面的方法之所以有效,是因为在恢复处理程序中调用context::become不会修改恢复处理程序!它只修改邮箱处理程序。当我们遍历重放的事件时,我们只需在每个事件之后替换邮箱处理程序。对于所有重放的事件,将原封不动地调用恢复处理程序。
附注:重要的是要区分发送到参与者的命令和从持久化存储中重放的事件。问题定义了两个命令:Increase和Decrease。它还定义了一个事件:StockChanged。但是,receiveCommand方法可以处理所有这三种情况。这是一种代码气味。receiveCommand应该处理命令,而receiveRecover应该处理事件。我已经在上面的解决方案中解决了这个问题。
https://stackoverflow.com/questions/59022422
复制相似问题