首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ActiveMQ实现异步确认JAVA 8

ActiveMQ实现异步确认JAVA 8
EN

Stack Overflow用户
提问于 2017-05-23 09:18:21
回答 1查看 1K关注 0票数 0

背景:I有一个标准的生产者消费者队列,消费者是慢的,而生产者是快速的。期望是每当生产者完成所请求的消息时,它就会确认消息,并且生产者将承担与消息相关的任务完成。由于生产者是快速的,我不希望线程产生等待,相反,每当消息被确认时,就应该调用回调。由于JMS在这方面是有限的,而且我已经直接使用了ActiveMQ类,比如ActiveMQMessageProducer --尽可能多地使用。

问题:消息得到了自动确认,注册的异步回调正在被调用,即使消费者还没有启动。public void send(Destination destination, Message message, AsyncCallback onComplete)

生产者

代码语言:javascript
复制
public static boolean setup() {     
        Producer.connectionFactory = new 
        ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 
        // Create a Connection
        Producer.connection = 
            (ActiveMQConnection)connectionFactory.createConnection();
        connection.setAlwaysSessionAsync(true);
        connection.start();         
    }   

public Producer() {
        session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
        producer = (ActiveMQMessageProducer)session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
...

public void run() {
        long id  = messageID.getAndIncrement();         
        String text = "Hello world!"
        Message message = session.createTextMessage(text);
        producer.send(message, new MessageCompletion(id, this.messageRundown));
    }

Consumer

代码语言:javascript
复制
public static boolean setup() {     
    Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");    
    Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection();
    connection.setAlwaysSessionAsync(true);         
    return true;
}

public  Consumer() {
    session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
    consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
    consumer.setMessageListener(this);
    connection.start();
}

// implements MessageListener
@Override
public void onMessage(Message message) {    
    messageQueue.add(message);
}
public void run() {
    while(true) {
        Message message = messageQueue.poll();
        while(message != null) {
            // do some work             
            message.acknowledge();
            message = messageQueue.poll();              
        }
        Thread.sleep(10000);            
    }
}

虽然不需要消费者(我已经添加了它作为参考),但为了确保简洁性,已经删除了一些内容,这是工作代码的一部分。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-05-23 13:01:02

你对确认工作方式的理解是错误的。发送方的异步回调只告诉您代理已收到消息。如果是持久发送,回调将指示消息也被写入磁盘。

JMS或大多数其他消息代理中没有生产者和使用者之间的耦合。生产者将消息放置在队列中,然后使用者可以在任何时候出现并从该队列中消费。两者之间没有耦合,生产者不能等待消费者才能继续生成下一条消息。

如果您想知道什么时候处理特定的消息,这样就可以控制工作,那么就需要查看JMS请求/响应样式的消息传递模式。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44130672

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档