首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rabbitmq -新消费者没有收到任何东西

Rabbitmq -新消费者没有收到任何东西
EN

Stack Overflow用户
提问于 2015-12-29 12:20:44
回答 2查看 9K关注 0票数 2

我使用rabbitMQ将任务发送给运行时创建的工作人员(消费者)。当前,每次创建新任务时,都会创建一个新的工作人员。问题是这样的:-A用户创建一个任务

创建-A工作者,然后将任务发送到队列中,供工作人员处理。

-The工作人员开始处理队列(该工作人员基本上睡了一段时间)

-Another用户创建一个任务

创建-New工作人员并在队列上发送任务。

-The新工作人员不处理新任务,同时什么也不做,第一个工作人员完成第一个任务后,新任务将由他处理。

我已经检查了rabbitmq的管理部分,并且有两个被绑定到队列的消费者,但是其中一个似乎完成了所有的工作,而另一个只在等待。

以下是工作人员的代码:公共类工作人员扩展Thread {

代码语言:javascript
复制
private final static String QUEUE_NAME = "Tasks";
private final static String QUEUE_COMPL = "Completed";
public static int id = 0;
private static final String EXCHANGE_NAME = "logs";
public int compteur;
String identifier;

public Worker() {
    Worker.id++;
    compteur = id;
}
public void run() {
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WORKER " + compteur + " : Message received :" + message);

                String taskName = message.split(" ")[0];
                String length = message.split(" ")[1];

                try {
                    System.out.println("WORKER " + compteur + " : Commencing job :" + message);
                    doWork(length);
                    System.out.println("WORKER " + compteur + " : Job's finished :" + message);
                    taskName += " done by " + compteur;
                   // confirm(taskName);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } finally {
                    System.out.println("WORKER " + compteur + " : Waiting for a new task...");
                }

            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);
    } catch (IOException ex) {
            Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
    } catch (TimeoutException ex) {
            Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
    }
}

private static void doWork(String taskLength) throws InterruptedException {
    int temps = Integer.parseInt(taskLength);
    Thread.sleep(temps);
}
}

以及将消息放入队列的部分的代码: public class serveurSD {

代码语言:javascript
复制
private final static String QUEUE_NAME = "Tasks";
private  Manager MANAGER = new Manager();

@WebMethod(operationName = "processTask")
public String processTask(@WebParam(name = "message") String txt, @WebParam(name = "duree") int duree) throws IOException, TimeoutException {
    if (MANAGER == null){
        MANAGER= new Manager();
        MANAGER.listen();
    }
    System.out.println("SERVER : Message received : " + txt + " " + duree);
    MANAGER.giveTask();
    ConnectionFactory factory = new ConnectionFactory();
    String message = txt + " " + duree;
    System.out.println("SERVER : Sending message to workers : " + message);
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    channel.close();
    connection.close();       
    return "Your task is being processed";
}
}

(Manager是创建工作人员的类。)

如果已经有人问过类似的问题,我很抱歉,但我找不到。(谢谢任何可能的帮助:)

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-12-31 04:05:23

basicConsume方法的第二个参数是“自动确认”。将此参数设置为true意味着一旦收到消息,使用者将告诉RabbitMQ消息已被确认。

当使用者被设置为autoAck true时,它很可能会立即从队列接收下一条可用消息,即使basicQos设置为1。这是发生的,因为消费者会立即减少1限制,也就是说它不再有任何消息,它可以接受下一条消息。

将auto参数更改为false可以防止这个问题,当它与基本的QoS设置1相结合时,因为它迫使您的消费者说:“嗨,我收到了一条消息,我正在处理。在我做完之前不要给我发任何其他东西。”

这让另一位消费者说:“嘿,我有个空位,请给我发个口信。”

票数 4
EN

Stack Overflow用户

发布于 2015-12-29 12:37:53

好吧,我好像找到了办法让它起作用,问题是我不知道它为什么会起作用,所以如果有人知道,我很乐意得到一个解释。

我将channel.basicConsume(QUEUE_NAME, true, consumer);的第二个参数改为false,但我不确定它为什么会解决我的问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34510963

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档