首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring死信队列不适用于事务。

spring死信队列不适用于事务。
EN

Stack Overflow用户
提问于 2018-02-21 20:18:48
回答 1查看 1.1K关注 0票数 0

当channelTransacted(true)出现问题而不是请求时,如何设置侦听器容器将消息抛到死信队列中?当我不使用channelTransacted时,一切正常,我可以在死信队列中看到消息。

代码语言:javascript
复制
@Bean(name = "amqpInboundEsignRequest")
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
    return IntegrationFlows.from(
            Amqp.inboundAdapter(connectionFactory, esignIAutoRequestQueue())
                    .acknowledgeMode(AcknowledgeMode.AUTO)
                    .messageConverter(new Jackson2JsonMessageConverter())
                    .autoStartup(false)
                    .defaultRequeueRejected(false)
                    //.channelTransacted(true) // dead letter does not work
                    //.transactionManager(transactionManager) // dead letter does not work
    )
            .log("amqpInbound.start-process")
            .handle(p -> {
                throw new RuntimeException("Something wrong!");
            })
            .get();
}

编辑

这些是依赖关系的版本。

代码语言:javascript
复制
[INFO] +- org.springframework.boot:spring-boot-starter-amqp:jar:1.5.9.RELEASE:compile
[INFO] |  +- org.springframework:spring-messaging:jar:4.3.13.RELEASE:compile
[INFO] |  \- org.springframework.amqp:spring-rabbit:jar:1.7.4.RELEASE:compile
[INFO] |     +- com.rabbitmq:http-client:jar:1.1.1.RELEASE:compile
[INFO] |     +- com.rabbitmq:amqp-client:jar:4.0.3:compile
[INFO] |     +- org.springframework.retry:spring-retry:jar:1.2.1.RELEASE:compile
[INFO] |     \- org.springframework.amqp:spring-amqp:jar:1.7.4.RELEASE:compile

[INFO] +- org.springframework.boot:spring-boot-starter-integration:jar:1.5.9.RELEASE:compile
[INFO] |  +- org.springframework.integration:spring-integration-core:jar:4.3.12.RELEASE:compile
[INFO] |  \- org.springframework.integration:spring-integration-java-dsl:jar:1.2.3.RELEASE:compile
[INFO] |     \- org.reactivestreams:reactive-streams:jar:1.0.0:compile

我希望事务与外部事务数据库(PlatformTransactionManager)同步。当我在侦听器容器上设置transactionManager(transactionManager)时,它总是执行请求。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-02-21 21:44:42

你用的是什么版本?我刚刚用SpringIntegration5.0.2和SpringAMQP2.0.2以及SpringIntegration4.3.14和SpringAMQP1.7.6测试了channelTransacted=true,一切都如期而至,失败的消息传递给DLQ。

代码语言:javascript
复制
@SpringBootApplication
public class So48914749Application {

    public static void main(String[] args) {
        SpringApplication.run(So48914749Application.class, args);
    }

    @Bean
    public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(connectionFactory, "foo")
                        .acknowledgeMode(AcknowledgeMode.AUTO)
                        .messageConverter(new Jackson2JsonMessageConverter())
                        .autoStartup(true)
                        .defaultRequeueRejected(false)
                        .channelTransacted(true) // dead letter does not work
        // .transactionManager(transactionManager) // dead letter does not work
        )
                .log("amqpInbound.start-process")
                .handle(p -> {
                    throw new RuntimeException("Something wrong!");
                })
                .get();
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable("foo")
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", "dlq")
                .build();
    }

    @Bean
    public Queue dlq() {
        return new Queue("dlq");
    }

    @RabbitListener(queues = "dlq")
    public void dlq(Message in) {
        System.out.println(in);
    }

}

结果:

(Body:‘B@17793549(Byte8)’MessageProperties [headers={x-first-death-exchange=,x-death=[{reason=rejected,count=1,exchange=,time=Wed Feb 21 16:48 EST 2018,路由-密钥=foo,queue=foo}],x-先死亡-原因=拒绝,x-第一死亡-队列=foo},timestamp=null,messageId=null,userId=null,receivedUserId=null,appId=null,clusterId=null,type=null,correlationId=null,correlationIdString=null,type=null,en19#,en31 20#,#en21,#en25,#en26,#en29,#en30,Taenqam.cypdcypdcdtag-Xen5kkkkz2wgIcnkkkmg9wg2wgmg9wkmg9wkmg2wmg9wmg2wg2wg2wxng2wg2wxnbmg9wgmgmgmg.xnbmg2wg2wg2wgmtn.n.nt31)(headers={x-first-death-exchange=,x-death=[{reason=rejected,count=1,exchange=,time=Wed Fe2 1 16:48 EST,路由密钥=foo,queue=foo}]

编辑

请在未来更加精确;您的问题暗示同样的问题发生了,不管是否提供了事务管理器。

关于接收到的消息回滚的说明。这在2.0中得到了修正。

在1.7.x中,为了保持向后兼容性,我们必须默认保留旧的行为。1.7.x文件解释说,您必须将alwaysRequeueWithTxManagerRollback设置为获取新(一致)行为的false

这是一个侦听器容器属性,因此您需要连接一个容器并将其注入入站通道适配器。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48914749

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档