在事件源/CQRS框架中使用Commanded.ProcessManagers.ProcessManager模块实现saga模式时,我遇到了一个问题。
在发票的上下文中,我需要实现发票的大量创建机制。这种大规模的创造是作为一个集合和一个传奇来实现的。聚合允许开始和完成大规模的创造。通过发出创建发票的命令并将它们的in保存在佐贺状态,佐贺对“大规模创建开始”事件做出了反应。之后,佐贺通过监听发票实例的成功或失败事件来跟踪发票创建的状态。一旦每个发票实例都报告了成功或失败,佐贺就应该发出命令,停止大规模的创造。
为此,跟踪每个发票实例及其当前状态将很有帮助:in progress、created或failed。我尝试在apply回调中实现这一点,原则上这很好。
现在的问题是,apply回调总是在handle回调之后调用。因此,在佐贺应该作出反应之后更新佐贺状态。这似乎有违直觉,正因为如此,handle回调中可用的状态无法正确地作出反应。
在我看来,传奇模式在很多方面都是聚合模式的倒置。虽然首先将命令处理为域事件,然后在聚合的情况下将此域事件应用到状态中是有用的,但我认为,如果发生了某个事件,域事件(它是已经发生的事情的文档)应该应用于状态,然后尝试对其作出反应。
现在,我的问题是:是否有一种方法将命令配置为第一个apply,然后配置为Commanded.ProcessManagers.ProcessManager模块的handle?或者,这实际上是一个bug,一般需要修复吗?
发布于 2018-06-21 11:54:26
在apply/2经过设计之后调用handle/2回调,不可能将命令配置为不同的行为。
我同意您的推理,即在尝试处理事件以生成任何命令之前,将事件应用于流程管理器的状态是更有意义的。这似乎是一个值得修改的命令,可以通过您已经提出的问题(#176)来跟踪。
同时,您可以按以下方式实现流程管理器(saga):
defmodule InvoicingProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: __MODULE__,
router: InvoicingRouter
defstruct [
:batch_uuid,
pending_invoice_ids: MapSet.new()
]
def interested?(%InvoiceBatchStarted{batch_uuid: batch_uuid}), do: {:start, batch_uuid}
def interested?(%InvoiceCreated{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
def interested?(%InvoiceFailed{batch_uuid: batch_uuid}), do: {:continue, batch_uuid}
def interested?(%InvoiceBatchStopped{batch_uuid: batch_uuid}), do: {:stop, batch_uuid}
def interested?(_event), do: false
# Event handlers
def handle(%InvoicingSaga{}, %InvoiceBatchStarted{} = started) do
%InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started
Enum.map(invoice_ids, fn invoice_id ->
%CreateInvoice{
invoice_id: invoice_id,
batch_uuid: batch_uuid
}
end)
end
def handle(%InvoicingSaga{}, %InvoiceCreated{invoice_id: invoice_id}),
do: attempt_stop_batch(pm, invoice_id)
def handle(%InvoicingSaga{}, %InvoiceFailed{invoice_id: invoice_id}),
do: attempt_stop_batch(pm, invoice_id)
## State mutators
def apply(%InvoicingSaga{} = pm, %InvoiceBatchStarted{} = started) do
%InvoiceBatchStarted{batch_uuid: batch_uuid, invoice_ids: invoice_ids} = started
%InvoicingSaga{
transfer
| batch_uuid: batch_uuid,
pending_invoice_ids: MapSet.new(invoice_ids)
}
end
def apply(%InvoicingSaga{} = pm, %InvoiceCreated{invoice_id: invoice_id}) do
%InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
end
def apply(%InvoicingSaga{} = pm, %InvoiceFailed{invoice_id: invoice_id}) do
%InvoicingSaga{pm | pending_invoice_ids: invoice_completed(pm, invoice_id)}
end
## Private helpers
def attempt_stop_batch(%InvoicingSaga{batch_uuid: batch_uuid} = pm, invoice_id) do
pending_invoices = invoice_completed(pm, invoice_id)
case empty?(pending_invoices) do
true -> %StopInvoiceBatch{batch_uuid: batch_uuid}
false -> []
end
end
defp invoice_completed(%InvoicingSaga{pending_invoice_ids: pending_invoice_ids}, invoice_id) do
MapSet.delete(pending_invoice_ids, invoice_id)
end
defp empty?(map_set, empty \\ MapSet.new())
defp empty?(%MapSet{} = empty, %MapSet{} = empty), do: true
defp empty?(%MapSet{}, %MapSet{}), do: false
endhttps://stackoverflow.com/questions/50951135
复制相似问题