我有一个问题,我发布消息到Azure服务总线主题。我有几个批处理用户订阅了这些主题(没有转发到队列)。
问题是,有一些关于MessageLockLostException的随机警告,感觉好像出了什么问题,但我不知道是什么。
我设定锁的持续时间是5分钟。而且错误几乎是立即抛出的(所以我想不可能是这样)。
以下是抛出的错误的示例:
warn: MassTransit[0]
Message Lock Lost: 5d5400005de20015b8d008d9a521105f
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.CompleteAsync(IEnumerable`1 lockTokens)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)
warn: MassTransit[0]
Message Lock Lost: 5d5400005de20015a5cd08d9a521105f
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.CompleteAsync(IEnumerable`1 lockTokens)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Azure.ServiceBus.Core.Transport.BrokeredMessageReceiver.MassTransit.Azure.ServiceBus.Core.Transport.IBrokeredMessageReceiver.Handle(Message message, CancellationToken cancellationToken, Action`1 contextCallback)以下是这个问题的最低限度复制。它将设置所有内容并发布50k条消息。
csproj:
<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<UserSecretsId>dotnet-WorkerService-C6197FFA-DCA6-4867-8576-A51ADAE04FD3</UserSecretsId>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="7.2.3" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.3" />
<PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="7.2.3" />
<PackageReference Include="MassTransit.EntityFrameworkCore" Version="7.2.3" />
<PackageReference Include="MassTransit.Prometheus" Version="7.2.3" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.3" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
</ItemGroup>
</Project>守则:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Azure.ServiceBus.Core;
using MassTransit.Topology;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using WorkerService;
using IHost = Microsoft.Extensions.Hosting.IHost;
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
const string connectionString = "<ASB ConnectionString here>";
Configure(services, connectionString);
services.AddHostedService<Worker>();
})
.Build();
await host.RunAsync();
void Configure(IServiceCollection services, string connectionString)
{
services.AddMassTransit(busConfigurator =>
{
busConfigurator.AddConsumer<TestConsumer1>();
busConfigurator.AddConsumer<TestConsumer2>();
busConfigurator.AddConsumer<TestConsumer3>();
busConfigurator.AddConsumer<TestConsumer4>();
busConfigurator.AddConsumer<TestConsumer5>();
busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
{
serviceBusBusFactoryConfigurator.Host(connectionString);
ConfigureSubsriptionEndpoint<TestConsumer1>(serviceBusBusFactoryConfigurator, context, "subscriber-1");
ConfigureSubsriptionEndpoint<TestConsumer2>(serviceBusBusFactoryConfigurator, context, "subscriber-2");
ConfigureSubsriptionEndpoint<TestConsumer3>(serviceBusBusFactoryConfigurator, context, "subscriber-3");
ConfigureSubsriptionEndpoint<TestConsumer4>(serviceBusBusFactoryConfigurator, context, "subscriber-4");
ConfigureSubsriptionEndpoint<TestConsumer5>(serviceBusBusFactoryConfigurator, context, "subscriber-5");
});
});
services.AddMassTransitHostedService(true);
}
void ConfigureSubsriptionEndpoint<TConsumer>(IServiceBusBusFactoryConfigurator serviceBusBusFactoryConfigurator, IBusRegistrationContext context, string subscriptionName)
where TConsumer : class, IConsumer<Batch<IMyEvent>>
{
serviceBusBusFactoryConfigurator.SubscriptionEndpoint<IMyEvent>(
subscriptionName,
receiveEndpointConfigurator =>
{
receiveEndpointConfigurator.LockDuration = TimeSpan.FromMinutes(5);
receiveEndpointConfigurator.PublishFaults = false;
receiveEndpointConfigurator.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
receiveEndpointConfigurator.UseMessageRetry(r => r.Intervals(500, 2000));
receiveEndpointConfigurator.PrefetchCount = 1100;
receiveEndpointConfigurator.ConfigureConsumer<TConsumer>(
context,
consumerConfigurator =>
{
consumerConfigurator.Options<BatchOptions>(batchOptions =>
{
batchOptions.MessageLimit = 100;
batchOptions.TimeLimit = TimeSpan.FromSeconds(5);
batchOptions.ConcurrencyLimit = 10;
});
});
});
}
namespace WorkerService
{
public class TestConsumer1 : IConsumer<Batch<IMyEvent>>
{
private readonly Random _random;
private readonly ILogger<TestConsumer1> _logger;
public TestConsumer1(ILogger<TestConsumer1> logger)
{
_logger = logger;
_random = new Random();
}
public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
{
_logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer1), context.Message.Length);
await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
}
}
public class TestConsumer2 : IConsumer<Batch<IMyEvent>>
{
private readonly Random _random;
private readonly ILogger<TestConsumer2> _logger;
public TestConsumer2(ILogger<TestConsumer2> logger)
{
_logger = logger;
_random = new Random();
}
public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
{
_logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer2), context.Message.Length);
await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
}
}
public class TestConsumer3 : IConsumer<Batch<IMyEvent>>
{
private readonly Random _random;
private readonly ILogger<TestConsumer3> _logger;
public TestConsumer3(ILogger<TestConsumer3> logger)
{
_logger = logger;
_random = new Random();
}
public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
{
_logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer3), context.Message.Length);
await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
}
}
public class TestConsumer4 : IConsumer<Batch<IMyEvent>>
{
private readonly Random _random;
private readonly ILogger<TestConsumer4> _logger;
public TestConsumer4(ILogger<TestConsumer4> logger)
{
_logger = logger;
_random = new Random();
}
public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
{
_logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer4), context.Message.Length);
await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
}
}
public class TestConsumer5 : IConsumer<Batch<IMyEvent>>
{
private readonly Random _random;
private readonly ILogger<TestConsumer5> _logger;
public TestConsumer5(ILogger<TestConsumer5> logger)
{
_logger = logger;
_random = new Random();
}
public async Task Consume(ConsumeContext<Batch<IMyEvent>> context)
{
_logger.LogInformation("{name} - Consuming {count}", nameof(TestConsumer5), context.Message.Length);
await Task.Delay(TimeSpan.FromSeconds(_random.Next(4, 8)));
}
}
[EntityName("my-event")]
public interface IMyEvent
{
}
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IBus _bus;
public Worker(
ILogger<Worker> logger,
IBus bus)
{
_logger = logger;
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
var tasks = new List<Task>();
var count = 50000;
for (int i = 0; i < count; i++)
{
tasks.Add(_bus.Publish<IMyEvent>(new { }));
}
await Task.WhenAll(tasks);
}
}
}更新
我已经确认了这些错误与Azure服务总线实例的节流相关。第一个图像显示错误的发生,第二个图像显示ASB中节流请求的数量。它们似乎关系很好。这也解释了为什么我不能可靠地再现这些错误。


发布于 2021-11-12 08:24:19
首先,我们需要确保我们是最晚的版本,如果没有,请升级。
确保在function.json中添加重试策略,如下所示:
{
"disabled": false,
"bindings": [
{
....
}
],
"retry": {
"strategy": "fixedDelay",
"maxRetryCount": 4,
"delayInterval": "00:00:10"
}
}同时检查死信并在Docs女士的帮助下配置它。
我们需要通过在JSON结构"maxAutoLockRenewalDuration":“00:05:00:00”中包含参数,说明在JSON中更新锁的期限。
下面是host.json的示例
{
"version": "2.0",
"extensions": {
"serviceBus": {
"clientRetryOptions":{
"mode": "exponential",
"tryTimeout": "00:01:00",
"delay": "00:00:00.80",
"maxDelay": "00:01:00",
"maxRetries": 3
},
"prefetchCount": 0,
"autoCompleteMessages": true,
"maxAutoLockRenewalDuration": "00:05:00",
"maxConcurrentCalls": 16,
"maxConcurrentSessions": 8,
"maxMessages": 1000,
"sessionIdleTimeout": "00:01:00"
}
}
}在上面的JSON中,将autoCompleteMessages的参数从true改为false,作为"autoCompleteMessages":false。
当设置为false时,您负责调用MessageReceiver方法来完成、放弃或死信消息。如果抛出异常(且没有调用任何MessageReceiver方法),则锁将保持不变。锁过期后,消息将与DeliveryCount一起重新排队,锁将自动更新。
使用ReceiveandLock解决了这个问题问题。
https://stackoverflow.com/questions/69930309
复制相似问题