
MassTransit job processing fails

Stack Overflow user
Asked 2021-09-21 07:56:09
1 answer, 324 views, 0 followers, 1 vote

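I am trying to connect two services through a RabbitMQ queue. The first service pushes a value onto the queue, and the second service retrieves and processes it. Everything works until the second service tries to process the job, at which point it throws an exception. The queue item stays in the JobAttempt queue without any information, and the consumer service retries the job but throws the same exception every time.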

Exception:

fail: MassTransit.ReceiveTransport[0]
      S-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-edb0-08d97cd26cf9 MassTransit.Contracts.JobService.JobStatusCheckRequested
fail: MassTransit.ReceiveTransport[0]
      T-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-8129-08d97cca994f
      System.Threading.Tasks.TaskCanceledException: A task was canceled.
         at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContext`2.Delete(SagaConsumeContext`1 context)
         at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
         at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
         at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
         at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
         at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
         at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
         at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
         at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
         at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
         at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
         at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext()
      --- End of stack trace from previous location ---
         at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
         at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
         at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()
      --- End of stack trace from previous location ---
         at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
         at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
         at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
         at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass24_0.<<HandleBasicDeliver>b__0>d.MoveNext()

Producer startup:

services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
services.AddMassTransitHostedService();

Consumer startup:

        services.AddMassTransit(x =>
        {
            x.AddDelayedMessageScheduler();

            x.AddConsumer<LoanRequestJobConsumer>(cfg =>
            {
                cfg.Options<JobOptions<LoanRequestBroker>>(options =>
                {
                    options.SetJobTimeout(TimeSpan.FromMinutes(5));
                    options.SetConcurrentJobLimit(10);
                });
            });

            x.SetKebabCaseEndpointNameFormatter();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UseDelayedMessageScheduler();

                cfg.ServiceInstance(instance =>
                {
                    instance.ConfigureJobServiceEndpoints(js =>
                    {
                        js.SagaPartitionCount = 1;
                        js.FinalizeCompleted = true;
                    });

                    cfg.ReceiveEndpoint("loan-request-processing", e =>
                    {
                        e.ConfigureConsumer<LoanRequestJobConsumer>(context);
                    });

                    instance.ConfigureEndpoints(context);
                });
            });
        });
        services.AddMassTransitHostedService();

Job consumer:

public class LoanRequestJobConsumer : IJobConsumer<LoanRequestBroker>
{
    private readonly ILogger<LoanRequestJobConsumer> _logger;
    private readonly ILoanProcessingService _processingService;

    public LoanRequestJobConsumer(
        ILogger<LoanRequestJobConsumer> logger,
        ILoanProcessingService processingService)
    {
        _logger = logger;
        _processingService = processingService;
    }

    public async Task Run(JobContext<LoanRequestBroker> context)
    {
        _logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: start processing loan request id = {context.Job.Id}");

        var processingInfo = new LoanProcessingInfo
        {
            Status = TaskStatus.InProgress,
            LoanRequest = context.Job.Adapt<LoanRequest>()
        };
        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);

        processingInfo = await _processingService.ProcessAsync(processingInfo);

        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);

        _logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: end processing loan request id = {context.Job.Id}" +
                               $"\nResult: {JsonConvert.SerializeObject(processingInfo)}");
    }
}

How the item is pushed to the queue:

var endpoint = await _sendEndpointProvider.GetSendEndpoint(_brokerEndpoints.LoanProcessingQueue);
await endpoint.Send(loanRequest.Adapt<LoanRequestBroker>());

1 Answer

Stack Overflow user

Accepted answer

Posted 2021-09-21 11:21:08

If I had to guess, without any other error log details, I would say the delayed exchange plugin is not installed/enabled on RabbitMQ.
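
One way to check this guess on the broker host is sketched below. It assumes the `rabbitmq-plugins` CLI is on the PATH and that the plugin in question is the community `rabbitmq_delayed_message_exchange` plugin, which the consumer's `AddDelayedMessageScheduler()` / `UseDelayedMessageScheduler()` configuration relies on. The function name `delayed_plugin_status` is made up for illustration.

```shell
#!/bin/sh
# Community plugin that provides the delayed exchange (assumed to be the one meant above).
PLUGIN="rabbitmq_delayed_message_exchange"

# Prints "enabled", "not-enabled", or "cli-missing" for the local broker node.
delayed_plugin_status() {
    # Bail out gracefully when the RabbitMQ CLI tools are not installed on this machine.
    if ! command -v rabbitmq-plugins >/dev/null 2>&1; then
        echo "cli-missing"
        return 0
    fi
    # -m prints bare plugin names; -e limits the list to enabled plugins.
    # grep -x requires a whole-line match, so similarly named plugins do not count.
    if rabbitmq-plugins list -m -e 2>/dev/null | grep -qx "$PLUGIN"; then
        echo "enabled"
    else
        echo "not-enabled"
    fi
}

status=$(delayed_plugin_status)
echo "$PLUGIN: $status"
```

If this reports `not-enabled`, running `rabbitmq-plugins enable rabbitmq_delayed_message_exchange` should turn it on, provided the plugin's `.ez` file has already been placed in the broker's plugins directory (it does not ship with RabbitMQ by default).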

Votes: 1

Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/69265302
