首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多个批处理使用者在接收消息时抛出MessageLockLostException

多个批处理使用者在接收消息时抛出MessageLockLostException
EN

Stack Overflow用户
提问于 2021-11-11 14:48:35
回答 1查看 283关注 0票数 0

我有一个问题,我发布消息到Azure服务总线主题。我有几个批处理用户订阅了这些主题(没有转发到队列)。

问题是,有一些关于MessageLockLostException的随机警告,感觉好像出了什么问题,但我不知道是什么。

我设定锁的持续时间是5分钟。而且错误几乎是立即抛出的(所以我想不可能是这样)。

以下是抛出的错误的示例:

代码语言:javascript
复制
warn: MassTransit[0]
      Message Lock Lost: 5d5400005de20015b8d008d9a521105f
      Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.CompleteAsync(IEnumerable`1 lockTokens)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
warn: MassTransit[0]
      Message Lock Lost: 5d5400005de20015a5cd08d9a521105f
      Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
         at Microsoft.Azure.ServiceBus.Core.MessageReceiver.CompleteAsync(IEnumerable`1 lockTokens)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)

以下是这个问题的最低限度复制。它将设置所有内容并发布50k条消息。

csproj:

代码语言:javascript
复制
<Project Sdk="Microsoft.NET.Sdk.Worker">

    <PropertyGroup>
        <TargetFramework>net6.0</TargetFramework>
        <Nullable>enable</Nullable>
        <ImplicitUsings>enable</ImplicitUsings>
        <UserSecretsId>dotnet-WorkerService-C6197FFA-DCA6-4867-8576-A51ADAE04FD3</UserSecretsId>
    </PropertyGroup>

    <ItemGroup>
        <PackageReference Include="MassTransit" Version="7.2.3" />
        <PackageReference Include="MassTransit.AspNetCore" Version="7.2.3" />
        <PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="7.2.3" />
        <PackageReference Include="MassTransit.EntityFrameworkCore" Version="7.2.3" />
        <PackageReference Include="MassTransit.Prometheus" Version="7.2.3" />
        <PackageReference Include="MassTransit.RabbitMQ" Version="7.2.3" />
        <PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
    </ItemGroup>
</Project>

守则:

代码语言:javascript
复制
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Azure.ServiceBus.Core;
using MassTransit.Topology;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using WorkerService;
using IHost = Microsoft.Extensions.Hosting.IHost;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        const string connectionString = "<ASB ConnectionString here>";
        Configure(services, connectionString);
        services.AddHostedService<Worker>();
    })
    .Build();
await host.RunAsync();

void Configure(IServiceCollection services, string connectionString)
{
    services.AddMassTransit(busConfigurator =>
    {
        busConfigurator.AddConsumer<TestConsumer1>();
        busConfigurator.AddConsumer<TestConsumer2>();
        busConfigurator.AddConsumer<TestConsumer3>();
        busConfigurator.AddConsumer<TestConsumer4>();
        busConfigurator.AddConsumer<TestConsumer5>();

        busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
        {
            serviceBusBusFactoryConfigurator.Host(connectionString);

            ConfigureSubsriptionEndpoint<TestConsumer1>(serviceBusBusFactoryConfigurator, context, "subscriber-1");
            ConfigureSubsriptionEndpoint<TestConsumer2>(serviceBusBusFactoryConfigurator, context, "subscriber-2");
            ConfigureSubsriptionEndpoint<TestConsumer3>(serviceBusBusFactoryConfigurator, context, "subscriber-3");
            ConfigureSubsriptionEndpoint<TestConsumer4>(serviceBusBusFactoryConfigurator, context, "subscriber-4");
            ConfigureSubsriptionEndpoint<TestConsumer5>(serviceBusBusFactoryConfigurator, context, "subscriber-5");
        });
    });
    services.AddMassTransitHostedService(true);
}

void ConfigureSubsriptionEndpoint<TConsumer>(IServiceBusBusFactoryConfigurator serviceBusBusFactoryConfigurator, IBusRegistrationContext context, string subscriptionName)
    where TConsumer : class, IConsumer<Batch<IMyEvent>>
{
    serviceBusBusFactoryConfigurator.SubscriptionEndpoint<IMyEvent>(
        subscriptionName,
        receiveEndpointConfigurator =>
        {
            receiveEndpointConfigurator.LockDuration = TimeSpan.FromMinutes(5);
            receiveEndpointConfigurator.PublishFaults = false;
            receiveEndpointConfigurator.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
            receiveEndpointConfigurator.UseMessageRetry(r => r.Intervals(500, 2000));
            receiveEndpointConfigurator.PrefetchCount = 1100;

            receiveEndpointConfigurator.ConfigureConsumer<TConsumer>(
                context,
                consumerConfigurator =>
                {
                    consumerConfigurator.Options<BatchOptions>(batchOptions =>
                    {
                        batchOptions.MessageLimit = 100;
                        batchOptions.TimeLimit = TimeSpan.FromSeconds(5);
                        batchOptions.ConcurrencyLimit = 10;
                    });
                });
        });
}

namespace WorkerService
{
    public class TestConsumer1 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer1> _logger;

        public TestConsumer1(ILogger<TestConsumer1> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer1), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer2 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer2> _logger;

        public TestConsumer2(ILogger<TestConsumer2> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer2), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer3 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer3> _logger;

        public TestConsumer3(ILogger<TestConsumer3> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer3), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer4 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer4> _logger;

        public TestConsumer4(ILogger<TestConsumer4> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer4), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    public class TestConsumer5 : IConsumer<Batch<IMyEvent>>
    {
        private readonly Random _random;
        private readonly ILogger<TestConsumer5> _logger;

        public TestConsumer5(ILogger<TestConsumer5> logger)
        {
            _logger = logger;
            _random = new Random();
        }

        public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
        {
            _logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer5), context.Message.Length);
            await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
        }
    }

    [EntityName("my-event")]
    public interface IMyEvent
    {
    }

    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly IBus _bus;

        public Worker(
            ILogger<Worker> logger,
            IBus bus)
        {
            _logger = logger;
            _bus = bus;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);

            var tasks = new List<Task>();
            var count = 50000;
            for (int i = 0; i < count; i++)
            {
                tasks.Add(_bus.Publish<IMyEvent>(new { }));
            }

            await Task.WhenAll(tasks);
        }
    }
}

更新

我已经确认了这些错误与Azure服务总线实例的节流相关。第一个图像显示错误的发生,第二个图像显示ASB中节流请求的数量。它们似乎关系很好。这也解释了为什么我不能可靠地再现这些错误。

EN

回答 1

Stack Overflow用户

发布于 2021-11-12 08:24:19

首先,我们需要确保我们是最晚的版本,如果没有,请升级。

确保在function.json中添加重试策略,如下所示:

代码语言:javascript
复制
{
    "disabled": false,
    "bindings": [
        {
            ....
        }
    ],
    "retry": {
        "strategy": "fixedDelay",
        "maxRetryCount": 4,
        "delayInterval": "00:00:10"
    }
}

同时检查死信并在Docs女士的帮助下配置它。

我们需要通过在JSON结构"maxAutoLockRenewalDuration":“00:05:00:00”中包含参数,说明在JSON中更新锁的期限。

下面是host.json的示例

代码语言:javascript
复制
{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "clientRetryOptions":{
                "mode": "exponential",
                "tryTimeout": "00:01:00",
                "delay": "00:00:00.80",
                "maxDelay": "00:01:00",
                "maxRetries": 3
            },
            "prefetchCount": 0,
            "autoCompleteMessages": true,
            "maxAutoLockRenewalDuration": "00:05:00",
            "maxConcurrentCalls": 16,
            "maxConcurrentSessions": 8,
            "maxMessages": 1000,
            "sessionIdleTimeout": "00:01:00"
        }
    }
}

在上面的JSON中,将autoCompleteMessages的参数从true改为false,作为"autoCompleteMessages":false。

当设置为false时,您负责调用MessageReceiver方法来完成、放弃或死信消息。如果抛出异常(且没有调用任何MessageReceiver方法),则锁将保持不变。锁过期后,消息将与DeliveryCount一起重新排队,锁将自动更新。

使用ReceiveandLock解决了这个问题问题。

请参阅以下SO线程:SO1SO2 (感谢已回答的作者获得详细解释)

票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69930309

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档