我正在遵循来自RabbitMQ的指南:https://www.rabbitmq.com/tutorials/tutorial-two-java.html。我想用一个队列上的多个线程来模拟这个功能。
如果我在启动发送器之前启动接收器,它会按预期工作,如下所示:
[*] Rcvr1 Waiting for messages...
[*] Rcvr2 Waiting for messages...
[x] Rcvr1 Received 'Hello 0'
[x] Rcvr2 Received 'Hello 1'
[x] Rcvr1 Received 'Hello 2'
[x] Rcvr2 Received 'Hello 3'
[x] Rcvr1 Received 'Hello 4'
[x] Rcvr2 Received 'Hello 5'
[x] Rcvr1 Received 'Hello 6'
[x] Rcvr2 Received 'Hello 7'
[x] Rcvr1 Received 'Hello 8'
...但是,首先启动我的接收器会导致只有一个线程接收消息(启动的最后一个线程):
[*] Rcvr2 Waiting for messages...
[*] Rcvr1 Waiting for messages...
[x] Rcvr1 Received 'Hello 9'
[x] Rcvr1 Received 'Hello 10'
[x] Rcvr1 Received 'Hello 11'
[x] Rcvr1 Received 'Hello 12'
[x] Rcvr1 Received 'Hello 13'
[x] Rcvr1 Received 'Hello 14'
[x] Rcvr1 Received 'Hello 15'
...有趣的是,如果我启动发送器,然后启动接收器,如上所述,然后再次启动发送器(而接收器正在处理第一批)。发送的第一个消息是串行处理的,而第二批消息是并行处理的,或者至少与其余线程一起处理。
[*] Rcvr1 Waiting for messages...
[*] Rcvr2 Waiting for messages...
[x] Rcvr1 Received '[Batch 1] Hello 0'
[x] Rcvr1 Received '[Batch 1] Hello 1'
[x] Rcvr1 Received '[Batch 1] Hello 2'
[x] Rcvr1 Received '[Batch 1] Hello 3'
[x] Rcvr1 Received '[Batch 1] Hello 4'
[x] Rcvr1 Received '[Batch 1] Hello 5'
[x] Rcvr1 Received '[Batch 1] Hello 6'
[x] Rcvr1 Received '[Batch 1] Hello 7'
[x] Rcvr1 Received '[Batch 1] Hello 8'
[x] Rcvr2 Received '[Batch 2] Hello 1'
[x] Rcvr1 Received '[Batch 1] Hello 9'
[x] Rcvr2 Received '[Batch 2] Hello 3'
[x] Rcvr1 Received '[Batch 1] Hello 10'
[x] Rcvr2 Received '[Batch 2] Hello 5'
[x] Rcvr1 Received '[Batch 1] Hello 11'
[x] Rcvr2 Received '[Batch 2] Hello 7'
[x] Rcvr1 Received '[Batch 1] Hello 12'
[x] Rcvr2 Received '[Batch 2] Hello 9'
[x] Rcvr1 Received '[Batch 1] Hello 13'
[x] Rcvr2 Received '[Batch 2] Hello 11'对于RabbitMQ,这显然是可能的,我不确定我做错了什么。我的简单代码如下:
发件人
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for(int x=0; x<100; x++) {
String message = "Hello "+x;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}接收器
package com.mawv.ingest.rabbitmq;
import com.rabbitmq.client.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ThreadPoolExecutor rcvrPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Runnable rcvr1 = () -> {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Rcvr1 Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
Envelope envelope = delivery.getEnvelope();
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Rcvr1 Received '" + message + "'");
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, true);
try {
Thread.sleep(1000);
} catch (Exception ex) { }
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
} catch(Exception ex){
ex.printStackTrace();
}
};
Runnable rcvr2 = () -> {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Rcvr2 Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
Envelope envelope = delivery.getEnvelope();
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Rcvr2 Received '" + message + "'");
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, true);
try {
Thread.sleep(1000);
} catch (Exception ex) {
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
} catch(Exception ex){
ex.printStackTrace();
}
};
rcvrPool.execute(rcvr1);
rcvrPool.execute(rcvr2);
}
}我还将这个示例与他们描述的完全相同,并看到了相同的结果。https://self-learning-java-tutorial.blogspot.com/2015/09/rabbitmq-one-producer-and-multiple.html
我假设我的设置有些地方不对劲。
发布于 2018-12-31 03:38:27
根据RabbitMQ接口:
“虽然通道可以由多个线程使用,但确保一次只有一个线程执行命令很重要。并发执行命令可能会导致抛出UnexpectedFrameError。”
首先,我认为你应该对不同的线程使用不同的通道。
最后,我认为第一个线程被终止了,因为它是空闲的,所以只有第二个线程是活动的,并且完成了所有的工作。在这种情况下,一个线程就足够了。
看看Java8的ThreadPoolExecutor应用编程接口:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html
例如,您可以找到:
“默认情况下,即使是核心线程也只在新任务到达时初始创建和启动,但这可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态覆盖。如果使用非空队列构造池,则可能需要预先启动线程。”
和
如果池当前具有超过corePoolSize的线程数,则超过的线程在空闲时间超过keepAliveTime时将被终止(请参见getKeepAliveTime(TimeUnit))。
即使在空闲时,也应该使用prestartAllCoreThreads()或prestartCoreThreads()来启动核心线程,或者使用getKeepAliveTime(TimeUnit)来使核心线程在空闲时保持活动状态。
发布于 2018-12-31 04:03:06
看起来我错过了一个关键的通道配置。这解决了我的问题:
channel.basicQos(1);
这就是RabbitMQ对此的看法。
公平调度
您可能已经注意到,分派仍然不能完全按照我们的要求工作。例如,在有两个工作者的情况下,当所有的奇数消息都很重,而偶数消息都很轻时,一个工作者将经常忙碌,而另一个工作者几乎不会做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地分发消息。
这是因为当消息进入队列时,RabbitMQ只调度消息。它不会为消费者查看未确认消息的数量。它只是盲目地将每第n条消息分派到第n条消费者。
为了克服这一点,我们可以使用prefetchCount =1设置的basicQos方法。这告诉RabbitMQ不要一次向一个worker发送多条消息。或者,换句话说,在worker处理并确认前一条消息之前,不要向worker发送新消息。相反,它会将其分派给下一个不忙的worker。
https://stackoverflow.com/questions/53980620
复制相似问题