首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Akka persistent actor - functional方法

Akka persistent actor - functional方法
EN

Stack Overflow用户
提问于 2019-11-25 05:08:42
回答 2查看 131关注 0票数 2

我一直在尝试用函数式方法实现persistent actor --我的意思是根本没有变量。但是我遇到了麻烦:)下面的代码示例不能正常工作,因为处理程序参数没有在处理程序之间共享(receiveCommand/receiveRecover). 都从零开始,然后相互覆盖-在重播一些事件后,命令处理程序仍将处于起始点。

此实现的另一个问题是在同一位置处理命令和事件

用函数式的方式做这件事是不是很好的实践呢?

代码语言:javascript
复制
class Item(sku: String) extends PersistentActor with ActorLogging {
    import Item._
    override def persistenceId: String = sku
    override def receiveCommand: Receive = handler(0, 0)
    override def receiveRecover: Receive = handler(0, 0)

    def handler(quantity: Int, booked: Int): Receive = {
      case Increase(q) =>
        val event = StockChanged(sku, q)
        persist(event)(e => context.become(handler(quantity + e.quantity, booked)))
      case Decrease(q) =>
        val event = StockChanged(sku, -q)
        persist(event)(e => context.become(handler(quantity + e.quantity, booked)))
      case StockChanged(_, q) => {
        context.become(handler(quantity + q, booked))
      }
    }
}
EN

回答 2

Stack Overflow用户

发布于 2019-11-25 21:43:24

我已经找到了答案。新的akka persistance 2.6仅以函数方式工作:)

https://doc.akka.io/docs/akka/current/typed/persistence.html#event-sourcing

票数 2
EN

Stack Overflow用户

发布于 2021-01-27 04:09:30

对于那些需要使用Classic Persistence的人来说,我能想到的最好的方法不需要整个actor范围的状态,但是您仍然需要一个本地var。然后,您可以将本地var封装在恢复处理程序的闭包中。

展开并清理问题中不完整的示例:

代码语言:javascript
复制
package item_example

import akka.actor.ActorLogging
import akka.persistence.PersistentActor

object ItemExample {

  case class Increase(quantity: Int)

  case class Decrease(quantity: Int)

  case class StockChanged(id: String, quantity: Int)

  class Item(sku: String) extends PersistentActor with ActorLogging {
    override def persistenceId: String = sku

    override def receiveCommand: Receive = listening()

    private def listening(quantity: Int = 0): Receive = {
      case Increase(q) =>
        persist(StockChanged(sku, q)) { e: StockChanged =>
          context.become(listening(quantity + e.quantity))
        }
      case Decrease(q) =>
        persist(StockChanged(sku, -q)) { e: StockChanged =>
          context.become(listening(quantity + e.quantity))
        }
    }

    override def receiveRecover: Receive = recovering()

    private def recovering(): Receive = {
      /*
      This is the local var that will be part of the recovery handler's closure.
      The semicolon is needed only so that the below code within parentheses is interpreted as
      a partial function and not a code block parameter for the integer 0.
      You can also just return a new PartialFunction[Any, Unit] but I find this to be nicer.
       */
      var totalQuantity: Int = 0;
      {
        case StockChanged(_, quantity) =>
          totalQuantity += quantity
          context.become(listening(totalQuantity))
      }
    }
  }
}

恢复处理程序将只覆盖每个恢复的事件之后的状态。这没问题,因为recovery is guaranteed to finish before the actor starts consuming events from the mailbox

上面的方法之所以有效,是因为在恢复处理程序中调用context::become不会修改恢复处理程序!它只修改邮箱处理程序。当我们遍历重放的事件时,我们只需在每个事件之后替换邮箱处理程序。对于所有重放的事件,将原封不动地调用恢复处理程序。

附注:重要的是要区分发送到参与者的命令和从持久化存储中重放的事件。问题定义了两个命令:IncreaseDecrease。它还定义了一个事件:StockChanged。但是,receiveCommand方法可以处理所有这三种情况。这是一种代码气味。receiveCommand应该处理命令,而receiveRecover应该处理事件。我已经在上面的解决方案中解决了这个问题。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59022422

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档