我正在尝试配置一个以下列方式工作的Saga:
但我面临两个问题:
有关守则:
saga总线配置
Bus = Configure.With(Activator)
.Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
.Logging(l => l.ColoredConsole())
.Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
.Sagas(s => {
s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
if (enforceExclusiveAccess)
{
s.EnforceExclusiveAccess();
}
})
.Options(o =>
{
if (maxDegreeOfParallelism > 0)
{
o.SetMaxParallelism(maxDegreeOfParallelism);
}
if (maxNumberOfWorkers > 0)
{
o.SetNumberOfWorkers(maxNumberOfWorkers);
}
})
.Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
.Start();SagaData类:
public class RouteListSagaData : ISagaData
{
public Guid Id { get; set; }
public int Revision { get; set; }
private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();
public long RoutePlanId { get; set; }
public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
public bool SentToLisa { get; set; }
public void AddShippingActivity(LisaShippingActivity shippingActivity)
{
if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
{
_shippingActivities.Add(shippingActivity);
}
}
public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}CorrelateMessages法
protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}消息的句柄,以便启动佐贺并发送DefferedMessage if Saga IsNew
public async Task Handle(ShippingOrder message)
{
try
{
var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);
if (Data.ShippingActivities.Contains(lisaActivity))
return;
Data.RoutePlanId = message.RoutePlanId;
Data.AddShippingActivity(lisaActivity);
var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);
if (IsNew)
{
await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
}
}
catch (Exception err)
{
Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
throw;
}
}最后,被撤销消息的处理程序:
public Task Handle(VerifyRouteListIsComplete message)
{
try
{
if (!Data.SentToLisa)
{
var lisaData = Data.GroupShippingActivitiesToLisaActivities();
_lisaService.SyncRouteList(lisaData).Wait();
Data.SentToLisa = true;
}
MarkAsComplete();
return Task.CompletedTask;
}
catch (Exception err)
{
Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
_serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
MarkAsUnchanged();
return Task.CompletedTask;
}
}任何帮助都是非常感谢的!
发布于 2018-05-21 14:45:13
我不确定我是否理解这些症状,你正在经历,正确的。
如果我“同时”向第一个处理程序发送两条消息,则每条消息都会出现,即使有与该消息相关的属性,IsNew属性在第一条消息处理后也不会更改。
如果调用了EnforceExclusiveAccess,我希望以串行方式处理消息,第一条消息使用IsNew == true,第二条消息使用IsNew == false。
如果不是,我预计这两条消息都将与IsNew == true并行处理,但是当sage数据被插入时,我希望其中一条消息成功,而另一条消息在ConcurrencyException中失败。
在ConcurrencyException之后,将再次处理消息,这一次是使用IsNew == false。
这不是你正在经历的吗?
在第二个处理程序中,我希望访问与那些Saga相关的所有数据,但我不能访问,因为数据似乎是数据,就像那些消息的修订被推迟一样。
您是说佐贺数据中的数据似乎处于延迟VerifyRouteListIsComplete消息时的状态吗?
这听起来很奇怪,而且也不太可能,你可能会再试一次,看看是否真的是这样?
更新:我已经发现了为什么您会遇到这种奇怪的行为:您不小心设置了您的saga处理程序实例,以便跨消息重用。
您是这样注册的(警告:不要这样做!):
_sagaHandler = new ShippingOrderSagaHandler(_subscriber);
_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);然后,Subscribe方法对BuiltinHandlerActivator进行此调用(警告:不要这样做!):
activator.Register(() => handlerInstance);这是不好的(特别是对于saga处理程序),因为处理程序实例本身是有状态的-它有一个包含进程当前状态的Data属性,并且还包括IsNew属性。
您应该始终做的是,确保每次消息传入时都会创建一个新的处理程序实例--您的代码应该更改为如下所示:
_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();如果将Subscribe的实现更改为以下内容,则可以执行以下操作:
public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
_activator.Register((bus, context) => getHandler());
await _activator.Bus.Subscribe<T>();
}这将解决您的独占访问问题:)
代码还有另一个问题:在注册处理程序和启动订阅服务器总线实例之间存在一个潜在的争用条件,因为理论上您可能会很不幸,在总线启动和注册处理程序之间开始接收消息。
您应该更改代码,以确保在启动总线之前注册所有处理程序(从而开始接收消息)。
https://stackoverflow.com/questions/50449034
复制相似问题