首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Masstransit consumer death - Peek()

Masstransit consumer death - Peek()
EN

Stack Overflow用户
提问于 2012-12-21 11:55:12
回答 1查看 1.9K关注 0票数 5

我很好奇,在实际检索消息之前,MassTransit使用者是否可以窥视() MSMQ队列。

步骤/过程是什么:

1)消息发送到队列

2)消费者得到它,并且必须进行数据库更新--大约需要5秒

3)如果第一轮更新有效,消费者必须进行第二轮更新。

我的问题是,如果第一次数据库更新失败,消息留在队列中(即网络问题,无法访问数据库),我该如何处理这种情况。

目前,一旦它从队列中读取msg,它就会删除它,然后如果DB更新失败,它就会消失。

此外,我如何处理电源故障--我的意思是,如果用户的“作业”进行到一半,不管是什么(数据库更新或其他东西),电源中断等等,我如何在队列中的msg上重新运行进程?假设作业(至少在我当前的实例中)是将一个新行推送到一个表中。我的意思是,我可以编写代码,首先检查行是否存在,如果行存在,则删除消息,如果不存在,则运行任务,但如何才能让它首先重新运行整个过程?

我读到我可以Peek()队列,然后运行任务,然后真正读取队列msg并删除它,但我终生无法弄清楚这是否适用于公共交通……有点迷失..。

此外,我知道Masstransit有.RetryLater,但我是否可以在过程中使用它?是传奇中的Initially --> When --> Then --> .RetryLater吗?

任何指针都将被指定

最亲切的问候罗宾

编辑

附言:我正在使用一个传奇故事……

代码语言:javascript
复制
Define(() =>
            {
                RemoveWhen(saga => saga.CurrentState == Completed);

                Initially(
                    When(NewAC)
                        .Then((saga, message) => saga.ProcessPSM(message),
                            InCaseOf<Exception>()
                               .TransitionTo(Problem)                                  
                                )
                        .Then((saga, message) => saga.PostProcessPSM())
                        .Complete()
                    );
                During(Problem,
                    When(Waiting)
                               // NOTE: THIS DOES NOT WORK!!!!
                        .RetryLater()
                    );
                });

RetryLater抛出一个错误:“消息不能被现有的saga接受”

我不确定我还能怎么访问‘RetryLater’。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2012-12-21 20:21:52

MassTransit抽象了底层队列的概念。因此,Peek不是解决方案,但它确实有其他方法来重试消息。如果您只对处理错误和失败条件感兴趣,则以下机制就足够了。

默认情况下,如果消费者抛出异常,消息将被重试N次:

重试队列,其中N在总线上配置,默认为5。可以在总线初始化中使用ServiceBusConfigurator

  • Where

  • 上的SetDefaultRetryLimit进行更改,这意味着消息将被添加到队列

的末尾

如果你想要一个更细粒度的错误处理方法,你可以实现一个上下文消费者,捕获可恢复的或瞬时的异常,并手动调用RetryLater。据我所知,这样做的次数是没有限制的。

代码语言:javascript
复制
public class RetryConsumer : Consumes<AwesomeMessage>.Context
{

    public void Consume(IConsumeContext<AwesomeMessage> message)
    {
        try
        {
            Console.WriteLine("This is Attempt " + message.RetryCount);
            // Do Something
        }
        catch (SomeTransientException e)
        {
            message.RetryLater();
        }
    }
}
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13984042

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档