首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ConnectHandler 6.2.3中可疑的MassTransit扩展方法问题

ConnectHandler 6.2.3中可疑的MassTransit扩展方法问题
EN

Stack Overflow用户
提问于 2020-03-31 18:35:32
回答 1查看 93关注 0票数 2

当我试图在我的项目中将MassTransit包从3.2.4升级到version 6.2.3时,一些现有代码停止工作。在总线已经创建和配置之后,我使用ConnectHandler扩展将消息处理程序附加到总线上。它以前工作得很好,但现在似乎不再起作用了。使用这个分机注册的处理程序就是不触发。在总线配置期间向ReceiveEndpoint注册处理程序的替代方法在这两个MassTransit版本中似乎仍然很好。以下是一些简化的代码:

代码语言:javascript
复制
    [TestFixture]
    public class When_MassTransit_handler_should_consume_published_message
    {
        [Test]
        public void This_works_fine_with_MassTransit_3_2_4_but_never_enters_handler_with_MassTransit_6_2_3()
        {
            var someEvent = new SomeEvent { Data = 123 };

            var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();

            var bus = Bus.Factory.CreateUsingInMemory(config => { });

            bus.Start();

            bus.ConnectHandler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));

            bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();

            if (!tcs.Task.Wait(5000))
                Assert.Fail("Event consuming takes too long (> 5000 ms)"); // fails here for 6.2.3 because the handler never fires

            Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));

            bus.Stop();
        }

        [Test]
        public void This_works_fine_with_both_MassTransit_versions()
        {
            var someEvent = new SomeEvent { Data = 123 };

            var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();

            var bus = Bus.Factory.CreateUsingInMemory(config => 
            {
                config.ReceiveEndpoint("input_queue", endpoint =>
                {
                    endpoint.Handler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
                });
            });

            bus.Start();

            bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();

            if (!tcs.Task.Wait(5000))
                Assert.Fail("Event consuming takes too long (> 5000 ms)");

            Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));

            bus.Stop();
        }

        class SomeEvent
        {
            public int Data;
        }
    }

MassTransit中是否有任何导致这种行为的重大变化?这真的是MassTransit扩展方法的问题吗?还是我的代码过时了,应该进行调整以与最新版本相匹配?需要进行哪些调整?在总线已经创建和配置之后,是否有其他方法将\detach处理程序附加到总线上?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-31 22:53:25

简短的回答?是的,内存中的传输被改变了,所以它的行为更像RabbitMQ.实际上,它是完全重写的,其行为方式与RabbitMQ交换和队列相同。因此,在ConnectHandler和任何ConnectXxx方法中,它不再为处理程序消息类型创建到消息交换的绑定。内存中的传输也被更改,因此不能被多个总线实例共享,使用内存传输创建的每个总线都有自己的消息结构。

若要匹配前面的行为,请使用以下代码:

代码语言:javascript
复制
await bus.ConnectReceiveEndpoint("queue-name", x =>
{
    x.Handler<SomeEvent>(...);
});

对于直接发送到总线地址的消息,ConnectHandler工作正常。它只是不会到达的发布消息,因为发布交换不再绑定到总线接收端点队列。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60957571

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档