当我试图在我的项目中将MassTransit包从3.2.4升级到version 6.2.3时,一些现有代码停止工作。在总线已经创建和配置之后,我使用ConnectHandler扩展将消息处理程序附加到总线上。它以前工作得很好,但现在似乎不再起作用了。使用这个分机注册的处理程序就是不触发。在总线配置期间向ReceiveEndpoint注册处理程序的替代方法在这两个MassTransit版本中似乎仍然很好。以下是一些简化的代码:
[TestFixture]
public class When_MassTransit_handler_should_consume_published_message
{
[Test]
public void This_works_fine_with_MassTransit_3_2_4_but_never_enters_handler_with_MassTransit_6_2_3()
{
var someEvent = new SomeEvent { Data = 123 };
var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();
var bus = Bus.Factory.CreateUsingInMemory(config => { });
bus.Start();
bus.ConnectHandler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();
if (!tcs.Task.Wait(5000))
Assert.Fail("Event consuming takes too long (> 5000 ms)"); // fails here for 6.2.3 because the handler never fires
Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));
bus.Stop();
}
[Test]
public void This_works_fine_with_both_MassTransit_versions()
{
var someEvent = new SomeEvent { Data = 123 };
var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();
var bus = Bus.Factory.CreateUsingInMemory(config =>
{
config.ReceiveEndpoint("input_queue", endpoint =>
{
endpoint.Handler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
});
});
bus.Start();
bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();
if (!tcs.Task.Wait(5000))
Assert.Fail("Event consuming takes too long (> 5000 ms)");
Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));
bus.Stop();
}
class SomeEvent
{
public int Data;
}
}MassTransit中是否有任何导致这种行为的重大变化?这真的是MassTransit扩展方法的问题吗?还是我的代码过时了,应该进行调整以与最新版本相匹配?需要进行哪些调整?在总线已经创建和配置之后,是否有其他方法将\detach处理程序附加到总线上?
发布于 2020-03-31 22:53:25
简短的回答?是的,内存中的传输被改变了,所以它的行为更像RabbitMQ.实际上,它是完全重写的,其行为方式与RabbitMQ交换和队列相同。因此,在ConnectHandler和任何ConnectXxx方法中,它不再为处理程序消息类型创建到消息交换的绑定。内存中的传输也被更改,因此不能被多个总线实例共享,使用内存传输创建的每个总线都有自己的消息结构。
若要匹配前面的行为,请使用以下代码:
await bus.ConnectReceiveEndpoint("queue-name", x =>
{
x.Handler<SomeEvent>(...);
});对于直接发送到总线地址的消息,ConnectHandler工作正常。它只是不会到达的发布消息,因为发布交换不再绑定到总线接收端点队列。
https://stackoverflow.com/questions/60957571
复制相似问题