我正在尝试通过masstransit/rabbitmq实现调度机制。
我添加了如docs中所述的配置:
Uri schedulerEndpoint = new (Constants.MassTransit.SchedulerEndpoint);
services.AddMassTransit(mtConfiguration =>
{
mtConfiguration.AddMessageScheduler(schedulerEndpoint);
mtConfiguration.AddSagaStateMachine<ArcStateMachine, ArcProcess>(typeof(ArcSagaDefinition))
.Endpoint(e => e.Name = massTransitConfiguration.SagaQueueName)
.MongoDbRepository(mongoDbConfiguration.ConnectionString, r =>
{
r.DatabaseName = mongoDbConfiguration.DbName;
r.CollectionName = mongoDbConfiguration.CollectionName;
});
mtConfiguration.UsingRabbitMq((context, cfg) =>
{
cfg.UseMessageScheduler(schedulerEndpoint);
cfg.Host(new Uri(rabbitMqConfiguration.Host), hst =>
{
hst.Username(rabbitMqConfiguration.Username);
hst.Password(rabbitMqConfiguration.Password);
});
cfg.ConfigureEndpoints(context);
});
});然后我使用Bus发送一条预定的消息
DateTime messageScheduleTime = DateTime.UtcNow + TimeSpan.FromMinutes(1);
await _MessageScheduler.SchedulePublish<ScheduledMessage>(messageScheduleTime, new
{
ActivationId = context.Data.ActivationId
});_MessageCheduler是IMessageScheduler实例。
我确实看到Scheduler队列接收预定的消息,并在其中看到了正确的scheduledTime属性,但是无论何时它的调度应该触发,消息都不会到达状态机。似乎我在配置中遗漏了一些东西,或者一些尚未启动的MassTransit服务。
拜托,帮忙。
发布于 2022-01-25 13:03:40
如果您实际阅读了https://masstransit-project.com/advanced/scheduling/rabbitmq-delayed.html,您将看到UseDelayedMessageScheduler是使用RabbitMQ进行调度的适当配置。和AddDelayedMessageScheduler用于基于容器的IMessageScheduler注册。
https://stackoverflow.com/questions/70848658
复制相似问题