我有一个代码作为我的订阅者。我已经创建了持久订阅者。因此,由于这个原因,我得到了异常,因为
Exception in thread "main" javax.jms.JMSException: Error registering consumer: org.wso2.andes.AMQTimeoutException: Server did not respond in a timely fashion [error code 408: Request Timeout]
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:2054)
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:1997)
at org.wso2.andes.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:305)
at org.wso2.andes.client.AMQConnection.executeRetrySupport(AMQConnection.java:621)
at org.wso2.andes.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:102)
at org.wso2.andes.client.AMQSession.createConsumerImpl(AMQSession.java:1995)
at org.wso2.andes.client.AMQSession.createConsumer(AMQSession.java:993)
at org.wso2.andes.client.AMQSession.createDurableSubscriber(AMQSession.java:1142)
at org.wso2.andes.client.AMQSession.createDurableSubscriber(AMQSession.java:1042)
at org.wso2.andes.client.AMQTopicSessionAdaptor.createDurableSubscriber(AMQTopicSessionAdaptor.java:73)
at xml.parser.Parser.subscribe(Parser.java:62)
at xml.parser.Parser.main(Parser.java:34)但是当我创建普通订阅者时,我的代码运行良好,没有错误,而不是持久。为什么我会得到这个错误?还有一个问题--我如何才能取消订阅该主题?
我的订阅者代码是:
package xml.parser;
import org.w3c.dom.*;
import javax.xml.xpath.*;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.*;
import java.io.IOException;
import org.xml.sax.SAXException;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class Parser {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_BROKER_PORT = "5673";
String topicName = "myTopic";
public static void main(String[] args) throws NamingException,
JMSException, XPathExpressionException,
ParserConfigurationException, SAXException, IOException {
Parser queueReceiver = new Parser();
String message = queueReceiver.subscribe();
System.out.println("Got message from Queue ==> " + message);
}
public String subscribe() throws NamingException, JMSException {
String messageContent = "";
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
properties.put(CF_NAME_PREFIX + CF_NAME,
getTCPConnectionURL(userName, password));
properties.put("topic." + topicName, topicName);
System.out.println("getTCPConnectionURL(userName,password) = "
+ getTCPConnectionURL(userName, password));
InitialContext ctx = new InitialContext(properties);
// Lookup connection factory
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
.lookup(CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
topicConnection.start();
TopicSession topicSession = topicConnection.createTopicSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
// Send message
// Topic topic = topicSession.createTopic(topicName);
Topic topic = (Topic) ctx.lookup(topicName);
javax.jms.TopicSubscriber topicSubscriber = topicSession
.createDurableSubscriber(topic,"topicQueue");
Message message = topicSubscriber.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("textMessage.getText() = "
+ textMessage.getText());
messageContent = textMessage.getText();
}
topicSubscriber.close();
topicSession.close();
topicConnection.stop();
topicConnection.close();
return messageContent;
}
public String getTCPConnectionURL(String username, String password) {
return new StringBuffer().append("amqp://").append(username)
.append(":").append(password).append("@")
.append(CARBON_CLIENT_ID).append("/")
.append(CARBON_VIRTUAL_HOST_NAME).append("?brokerlist='tcp://")
.append(CARBON_DEFAULT_HOSTNAME).append(":")
.append(CARBON_BROKER_PORT).append("'").toString();
}
}发布于 2013-04-22 16:28:36
这是MB 2.0.1发行版中持久订阅者的一个问题。这样做的原因是当解析器类第一次运行时,收到一条消息,订阅者被停止,然后当你第二次启动解析器时,它无法启动订阅,因为之前的‘订阅者’条目仍然在那里,你会在terminal.The客户端中看到以下内容将在几次尝试后超时,这就是为什么你会得到错误日志。
[2013-04-22 12:12:52,617] INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} - Closing channel due to: Cannot subscribe to queue carbon:topicQueue as it already has an existing exclusive consumer
[2013-04-22 12:12:52,621] INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} - Channel[1] awaiting closure - processing close-ok
[2013-04-22 12:12:52,621] INFO {org.wso2.andes.server.handler.ChannelCloseOkHandler} - Received channel-close-ok for channel-id 1这个问题已经在MB 2.1.0版本中修复,预计将在未来几周内发布。如果需要,请使用here的MB 2.1.0 - Alpha版本试用您的示例订阅者。这个应该可以和那个包很好地工作。
关于取消订阅主题,将以下代码行添加到解析器代码中,并在需要取消订阅时返回。
topicSubscriber.close();
**topicSession.unsubscribe("topicQueue"); // add the name used to identify the subscription in the place of "topicQueue"**
topicSession.close();
topicConnection.stop();
topicConnection.close();https://stackoverflow.com/questions/16107685
复制相似问题