我正在尝试用PersistentActor实现Saga (进程管理器),它更新多个实体/聚合根以在没有ACID事务的情况下实现(最终)一致性。
比方说,在流程的中间,流程管理器收到消息并开始处理它,jvm崩溃了。
或者应用程序正在这一时刻重新部署。
在jvm (以及执行元系统)重新启动后,进程管理器的邮箱是否会恢复?
发布于 2018-01-13 23:26:40
在JVM崩溃的情况下,邮箱不会恢复。发送者有责任确保其消息被接收/持久化。在持久化参与者之间,使用At-Least-Once Delivery。
发布于 2018-01-14 16:57:10
既然您已经在使用PersistentActor,那么利用它的AtLeastOnceDelivery功能来满足您的需求是合理的。AtLeastOnceDelivery本质上确保持久发送者参与者将以可配置的频率持续向接收者发送消息,直到它收到来自接收者(或来自用于取消正在进行的传递尝试的某个终止例程)的确认为止。
与一般的PersistentActor (有或没有AtLeastOnceDelivery)一样,如果JVM崩溃,持久化的消息将被重新发送。对于AtLeastOnceDelivery,持久化消息将在系统恢复时重新发送,直到收到确认为止。Akka提供了方法deliver来发送用连续单调递增的deliveryId标记的每个消息,并且在从接收方接收到具有相应deliveryId的确认时,使用方法confirmDelivery来发信号通知消息的成功传递。
下面的代码片段是来自相关Akka doc的示例代码的一部分,它突出显示了发送者参与者类中的关键AtLeastOnceDelivery逻辑(哪个extends PersistentActor with AtLeastOnceDelivery)。请注意,相同的持久化处理程序updateState用于持久化deliver和confirmDelivery对应的事件,以实现一致的状态恢复:
override def receiveCommand: Receive = {
case s: String =>
persist(MsgSent(s))(updateState)
case Confirm(deliveryId) =>
persist(MsgConfirmed(deliveryId))(updateState)
}
override def receiveRecover: Receive = {
case evt: Evt => updateState(evt)
}
def updateState(evt: Evt): Unit = evt match {
case MsgSent(s) =>
deliver(destination)(deliveryId => Msg(deliveryId, s))
case MsgConfirmed(deliveryId) =>
confirmDelivery(deliveryId)
}https://stackoverflow.com/questions/48232955
复制相似问题