首页
学习
活动
专区
圈层
工具
发布

Rebus,
EN

Stack Overflow用户
提问于 2018-05-21 12:33:56
回答 1查看 1.1K关注 0票数 4

我正在尝试配置一个以下列方式工作的Saga:

  1. Saga接收到一条发货订单消息。该发货订单有一个RouteId属性,我可以使用它来关联同一辆“卡车”的发货订单。
  2. 这些发货订单是由另一个系统创建的,该系统可以使用批处理来发送此订单。但是,这个系统不能对同一地址的发货订单进行分组。
  3. 几秒钟后,我只用这个RouteId发送了另一条消息。我需要抓取接收到的RouteId的所有发货订单,按地址对它们进行分组,并将它们转换为另一个对象并发送到另一个web服务。

但我面临两个问题:

  1. 如果我“同时”向第一个处理程序发送两条消息,则每条消息都会出现,即使有与该消息相关的属性,IsNew属性在第一条消息处理后也不会更改。
  2. 在第二个处理程序中,我希望访问与那些Saga相关的所有数据,但我不能访问,因为数据似乎是数据,就像那些消息的修订被推迟一样。

有关守则:

saga总线配置

代码语言:javascript
复制
Bus = Configure.With(Activator)
   .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
   .Logging(l => l.ColoredConsole())
   .Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
   .Sagas(s => {
       s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
          if (enforceExclusiveAccess)
          {
              s.EnforceExclusiveAccess();
          }
       })
   .Options(o =>
       {
         if (maxDegreeOfParallelism > 0)
         {
            o.SetMaxParallelism(maxDegreeOfParallelism);
         }
         if (maxNumberOfWorkers > 0)
         {
            o.SetNumberOfWorkers(maxNumberOfWorkers);
         }
      })
   .Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
   .Start();

SagaData类:

代码语言:javascript
复制
public class RouteListSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();

    public long RoutePlanId { get; set; }

    public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
    public bool SentToLisa { get; set; }

    public void AddShippingActivity(LisaShippingActivity shippingActivity)
    {
        if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
        {
            _shippingActivities.Add(shippingActivity);
        }
    }

    public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}

CorrelateMessages法

代码语言:javascript
复制
protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
    config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
    config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}

消息的句柄,以便启动佐贺并发送DefferedMessage if Saga IsNew

代码语言:javascript
复制
public async Task Handle(ShippingOrder message)
{
  try
  {
    var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);

    if (Data.ShippingActivities.Contains(lisaActivity))
      return;

    Data.RoutePlanId = message.RoutePlanId;
    Data.AddShippingActivity(lisaActivity);
    var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);

    if (IsNew)
    {
      await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
    }
 }
 catch (Exception err)
 {
   Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
   throw;
 }
}

最后,被撤销消息的处理程序:

代码语言:javascript
复制
public Task Handle(VerifyRouteListIsComplete message)
{
  try
  {
    if (!Data.SentToLisa)
    {
      var lisaData = Data.GroupShippingActivitiesToLisaActivities();

      _lisaService.SyncRouteList(lisaData).Wait();

      Data.SentToLisa = true;
    }
    MarkAsComplete();
    return Task.CompletedTask;
  }
  catch (Exception err)
  {
    Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
    _serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
    MarkAsUnchanged();
    return Task.CompletedTask;
  }
}

任何帮助都是非常感谢的!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-05-21 14:45:13

我不确定我是否理解这些症状,你正在经历,正确的。

如果我“同时”向第一个处理程序发送两条消息,则每条消息都会出现,即使有与该消息相关的属性,IsNew属性在第一条消息处理后也不会更改。

如果调用了EnforceExclusiveAccess,我希望以串行方式处理消息,第一条消息使用IsNew == true,第二条消息使用IsNew == false

如果不是,我预计这两条消息都将与IsNew == true并行处理,但是当sage数据被插入时,我希望其中一条消息成功,而另一条消息在ConcurrencyException中失败。

ConcurrencyException之后,将再次处理消息,这一次是使用IsNew == false

这不是你正在经历的吗?

在第二个处理程序中,我希望访问与那些Saga相关的所有数据,但我不能访问,因为数据似乎是数据,就像那些消息的修订被推迟一样。

您是说佐贺数据中的数据似乎处于延迟VerifyRouteListIsComplete消息时的状态吗?

这听起来很奇怪,而且也不太可能,你可能会再试一次,看看是否真的是这样?

更新:我已经发现了为什么您会遇到这种奇怪的行为:您不小心设置了您的saga处理程序实例,以便跨消息重用。

您是这样注册的(警告:不要这样做!):

代码语言:javascript
复制
_sagaHandler = new ShippingOrderSagaHandler(_subscriber);

_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);

然后,Subscribe方法对BuiltinHandlerActivator进行此调用(警告:不要这样做!):

代码语言:javascript
复制
activator.Register(() => handlerInstance);

这是不好的(特别是对于saga处理程序),因为处理程序实例本身是有状态的-它有一个包含进程当前状态的Data属性,并且还包括IsNew属性。

您应该始终做的是,确保每次消息传入时都会创建一个新的处理程序实例--您的代码应该更改为如下所示:

代码语言:javascript
复制
_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();

如果将Subscribe的实现更改为以下内容,则可以执行以下操作:

代码语言:javascript
复制
public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
    _activator.Register((bus, context) => getHandler());
    await _activator.Bus.Subscribe<T>();
}

这将解决您的独占访问问题:)

代码还有另一个问题:在注册处理程序和启动订阅服务器总线实例之间存在一个潜在的争用条件,因为理论上您可能会很不幸,在总线启动和注册处理程序之间开始接收消息。

您应该更改代码,以确保在启动总线之前注册所有处理程序(从而开始接收消息)。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50449034

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档