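In my project I added two different JmsListeners, but when I run the project, the ActiveMQ panel shows a consumer on only one of the topics!
So, should I add a separate jmsListenerContainerFactory configuration for each JmsListener?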

    @JmsListener(destination = "foo1")
    public void foo1(final Message jsonMessage) throws JMSException {
        ...
    }

    @JmsListener(destination = "foo2")
    public void foo12(final Message jsonMessage) throws JMSException {
        ...
    }

Edit: this is from the JMS config file:

    @Configuration
    @EnableJms
    public class FooJmsConfig {

        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(BROKER_URL);
            connectionFactory.setUserName(BROKER_USERNAME);
            connectionFactory.setPassword(BROKER_PASSWORD);
            connectionFactory.setUseCompression(true);
            connectionFactory.setClientID("FPP_API");
            connectionFactory.setConnectionIDPrefix("DRR");
            connectionFactory.setUseAsyncSend(true);
            return connectionFactory;
        }

        @Bean
        public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setConcurrency("1-1");
            factory.setPubSubDomain(true);
            factory.setSubscriptionDurable(true);
            return factory;
        }
    }

Answered on 2018-02-12 08:16:08
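I found that by moving setClientID() from the ActiveMQConnectionFactory provider method to the DefaultJmsListenerContainerFactory provider methods, I can keep a single global ActiveMQConnectionFactory provider method and define a separate DefaultJmsListenerContainerFactory provider method for each JmsListener.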
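
A likely explanation, assuming the broker enforces the JMS rule that a client ID may be used by only one active connection at a time: each listener gets its own connection, so two listeners that inherit the same client ID from one connection factory collide, and only the first one connects. The toy model below illustrates that idea in plain Java. `ToyBroker` and `connect` are hypothetical names invented for this sketch; this is not ActiveMQ code.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model (not ActiveMQ code) of a broker that enforces unique client IDs. */
class ClientIdDemo {

    /** Hypothetical in-memory stand-in for a broker's registry of connected clients. */
    static class ToyBroker {
        private final Set<String> connectedClientIds = new HashSet<>();

        /** Returns true if the connection is accepted, false if the client ID is already in use. */
        boolean connect(String clientId) {
            return connectedClientIds.add(clientId);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();

        // Both listeners inherit "FPP_API" from the shared connection factory.
        System.out.println(broker.connect("FPP_API")); // prints true
        System.out.println(broker.connect("FPP_API")); // prints false: ID already in use

        // A distinct client ID per container factory avoids the clash.
        System.out.println(broker.connect("FOO_1"));   // prints true
        System.out.println(broker.connect("FOO_2"));   // prints true
    }
}
```

In this model, the second listener is the one that never becomes a consumer, which matches the symptom of only one topic showing a consumer.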
So the final working code is as follows.
JMSConfig file:

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

    @Configuration
    @EnableJms
    public class FooJmsConfig {

        // Single shared connection factory; the client ID is no longer set here.
        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(BROKER_URL);
            connectionFactory.setUserName(BROKER_USERNAME);
            connectionFactory.setPassword(BROKER_PASSWORD);
            connectionFactory.setUseCompression(true);
            connectionFactory.setConnectionIDPrefix("DRR");
            connectionFactory.setUseAsyncSend(true);
            return connectionFactory;
        }

        // Container factory for the "foo1" listener, with its own client ID.
        @Bean(name = "foo1")
        public DefaultJmsListenerContainerFactory foo1() {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setConcurrency("1-1");
            factory.setPubSubDomain(true);
            factory.setSubscriptionDurable(true);
            factory.setClientId("FOO_1");
            return factory;
        }

        // Container factory for the "foo2" listener; its client ID must differ from FOO_1.
        @Bean(name = "foo2")
        public DefaultJmsListenerContainerFactory foo2() {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setConcurrency("1-1");
            factory.setPubSubDomain(true);
            factory.setSubscriptionDurable(true);
            factory.setClientId("FOO_2");
            return factory;
        }
    }

And the JMS listeners will be:

    @JmsListener(destination = "foo1", containerFactory = "foo1")
    public void foo1(final Message jsonMessage) throws JMSException {
        ...
    }

    @JmsListener(destination = "foo2", containerFactory = "foo2")
    public void foo12(final Message jsonMessage) throws JMSException {
        ...
    }

Answered on 2018-02-09 10:55:14
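I think the problem is that you have set the concurrency to 1 on the JmsListenerContainerFactory that both listeners use.
If needed, you can create multiple JmsListenerContainerFactory beans and assign one to each listener through the JmsListener#containerFactory attribute.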
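
For reference, Spring documents the concurrency setting as either a "lower-upper" string such as "5-10" or a single upper limit such as "10", in which case the lower limit is 1. The sketch below is a plain-Java approximation of that documented format, not Spring's actual parser; `parseConcurrency` is a hypothetical helper written for this illustration.

```java
/** Plain-Java approximation of the documented "lower-upper" concurrency format. */
class ConcurrencyDemo {

    /** Parses "lower-upper" or "upper" into {lower, upper}; a lone value implies lower = 1. */
    static int[] parseConcurrency(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator != -1) {
            int lower = Integer.parseInt(concurrency.substring(0, separator).trim());
            int upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
            return new int[] {lower, upper};
        }
        return new int[] {1, Integer.parseInt(concurrency.trim())};
    }

    public static void main(String[] args) {
        int[] limits = parseConcurrency("1-1");
        // "1-1" means at least one and at most one concurrent consumer.
        System.out.println(limits[0] + ".." + limits[1]); // prints 1..1
    }
}
```

These limits apply to each listener container separately, and Spring creates one container per @JmsListener endpoint, so "1-1" means one consumer per listener rather than one consumer in total.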

Answered on 2022-05-13 10:36:15
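You can set up multiple JMS connection factories. The default JMS listener setup accepts only a single (singleton) connection factory object, so you need to define two JmsListenerContainerFactory beans yourself.
The code for connecting to a single topic looks like this: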

    @Configuration
    @EnableJms
    @EnableTransactionManagement
    public class JMSConnectionConfig {

        private static final String AMQP_URI_FORMAT = "amqps://%s?amqp.idleTimeout=%d";

        private int idleTimeout;
        private String hostURL;
        // The three fields below are used in the original snippet but were not declared there;
        // they are assumed to be populated from configuration like the fields above.
        private String clientId;
        private String topic1SASName;
        private String topic1SASKey;

        @Bean(name = "cachingConnectionFactory1")
        @Primary
        public ConnectionFactory myConnectionFactory1() {
            // Set up the connection details for the topic
            String remoteUri = String.format(AMQP_URI_FORMAT, hostURL, idleTimeout);
            JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
            jmsConnectionFactory.setRemoteURI(remoteUri);
            jmsConnectionFactory.setClientID(clientId);
            jmsConnectionFactory.setUsername(topic1SASName);
            jmsConnectionFactory.setPassword(topic1SASKey);

            // Wrap it in a caching factory and return it as a ConnectionFactory
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
            cachingConnectionFactory.setTargetConnectionFactory(jmsConnectionFactory);
            cachingConnectionFactory.setReconnectOnException(true);
            return cachingConnectionFactory;
        }

        @Bean(name = "factory1")
        public JmsListenerContainerFactory<?> factory1(
                @Qualifier("cachingConnectionFactory1") ConnectionFactory connectionFactory,
                DefaultJmsListenerContainerFactoryConfigurer configurer) {
            DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
            topicFactory.setConnectionFactory(connectionFactory);
            topicFactory.setSubscriptionDurable(Boolean.TRUE);
            // Apply the configurer's settings to the listener factory, using the caching connection factory
            configurer.configure(topicFactory, connectionFactory);
            return topicFactory;
        }

        // For another topic, add a second pair of methods like the two above
    }

The code for the JMS listener class that receives the messages is shown below. Set destination to your topic name, and set containerFactory to the name of the listener container factory bean defined earlier:

    @JmsListener(destination = "${topic.name}", containerFactory = "factory1",
            subscription = "${topic.subscription.name}")
    public void receiveMessage(JmsTextMessage jmsTextMessage) throws JMSException, IOException, InterruptedException {
        // listener code goes here
    }

https://stackoverflow.com/questions/48600321