Spring AMQP Java configuration

Stack Overflow user
Asked 2018-02-20 22:42:39

I have a Maven Spring MVC project in which I implemented internal messaging between services using RabbitMQ and the spring-amqp library.

Here is my Spring AMQP Java configuration:

@PropertySource({"classpath:hello.properties"})
@Configuration
@ComponentScan("com.example.hello")
public class MessageConfig {

    @Value("${amqp.host}")
    private String host;

    @Value("${amqp.port}")
    private int port;

    @Value("${amqp.usr}")
    private String username;

    @Value("${amqp.pwd}")
    private String password;

    @Value("${amqp.vhost}")
    private String virtual_host;

    @Value("${amqp.ex}")
    private String exchange;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtual_host);
        return connectionFactory;
    }

    @Bean
    public TopicExchange emsExchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean    
    public Queue systemQueque() {
        return new Queue("system");
    }

    @Bean 
    public Binding systemBinding() {
        return BindingBuilder.bind(systemQueque()).to(emsExchange()).with(systemQueque().getName());
    }

    @Bean
    public SimpleMessageListenerContainer listenerSystemQueque() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(systemQueque().getName());
        container.setMessageListener(listenerAdapterSystem());
        return container;
    }

    @Bean
    public MessageConsumerSystem listenerAdapterSystem() {
        return new MessageConsumerSystem();
    }
}

Here is my "consumer":

public class MessageConsumerSystem implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumerSystem.class);

    @Override
    public void onMessage(Message message) {
        try {
            final MessageProperties messageProperties = message.getMessageProperties();
            final String body = new String(message.getBody());

            LOGGER.debug("*********** AMQP Message **********");
            LOGGER.debug(" Id          : " + messageProperties.getMessageId());
            LOGGER.debug(" CorrelId    : " + messageProperties.getCorrelationId());
            LOGGER.debug(" Timestamp   : " + messageProperties.getTimestamp());
            LOGGER.debug(" Service     : " + messageProperties.getHeaders().get("service"));
            LOGGER.debug(" Content-Type: " + messageProperties.getContentType());
            LOGGER.debug(" Encoding    : " + messageProperties.getContentEncoding());
            LOGGER.debug(" Message     : " + body);
            LOGGER.debug("*************** End ***************");

            JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class);
            Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
            Object objectJAXB = unmarshaller.unmarshal(new StringReader(body));

            if (objectJAXB instanceof ServiceStart) {   
            }
            else if (objectJAXB instanceof ServiceStop) {
            }
        } catch (JAXBException ex) {
            LOGGER.error("AMQP Message unmarshalling error: " + ex.getMessage());
        }
    }
}

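I have also implemented a "producer". When I run the application, the queues and bindings are not created automatically; I have to create them manually from the RabbitMQ web management console.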

Here are some logs:

DEBUG [AMQP Connection 192.168.0.11:5672] org.springframework.amqp.rabbit.connection.CachingConnectionFactory | Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'system' in vhost 'ems', class-id=50, method-id=10)

DEBUG [listenerSnmpQueque-1] org.springframework.amqp.rabbit.connection.CachingConnectionFactory | Detected closed channel on exception.  Re-initializing: AMQChannel(amqp://admin@192.168.0.11:5672/ems,3)

WARN [listenerSnmpQueque-1] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer | Failed to declare queue: system

WARN [listenerSnmpQueque-1] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer | Queue declaration failed; retries left=1
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[system]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:587) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:964) [spring-rabbit-2.0.1.

I used the same code in a Spring Boot project for another service, and there it works fine: the queue and binding are created automatically.

Can you help me?


1 Answer

Stack Overflow user

Answered 2018-02-20 23:47:45

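You have to add a RabbitAdmin @Bean to handle the declarations automatically; it detects the Queue, Exchange and Binding beans in the application context. Spring Boot does this for you when it auto-configures RabbitMQ, which is why the same code works in your Boot project. See the Spring AMQP documentation.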
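
As a minimal sketch of what that could look like in a plain Spring (non-Boot) application, assuming the ConnectionFactory bean from the question's MessageConfig is available for injection. The class name AdminConfig is hypothetical; the bean method could just as well be added directly to MessageConfig.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, shown only for illustration.
@Configuration
public class AdminConfig {

    // RabbitAdmin picks up the Queue, Exchange and Binding beans in the
    // application context and declares them on the broker when a
    // connection is opened.
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}
```

With such a bean in place, the "system" queue, the topic exchange and the binding defined in the question should no longer need to be created by hand in the management console.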

Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/48887805
