首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache Camel concurrentConsumers vs线程

Apache Camel concurrentConsumers vs线程
EN

Stack Overflow用户
提问于 2013-11-15 02:45:46
回答 2查看 11.1K关注 0票数 7

我花了将近两天的时间来理解Apache Camel中concurrentConsumers与线程的概念。但我真的不明白这个概念。有人能帮我理解这个概念吗?我使用的是camel 2.12.0。

代码语言:javascript
复制
   from("jms:queue:start?concurrentConsumers=5")
   .threads(3, 3, "replyThread")
   .bean(new SomeBean());

   pom.xml
   ==========================================  
   <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <camel.version>2.12.0</camel.version>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.8.1</version>
        <scope>test</scope>
    </dependency>

    <!-- required by both client and server -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>${camel.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring</artifactId>
        <version>${camel.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-jms</artifactId>
        <version>${camel.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-test</artifactId>
        <version>${camel.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-activemq</artifactId>
        <version>1.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-test-spring</artifactId>
        <version>2.12.1</version>
    </dependency>
</dependencies>


   //posting to queue
   for (int i = 0; i < 10; i++) {
      System.out.println(Thread.currentThread() + " sending request..." + i);
      template.asyncCallbackRequestBody("jms:queue:start", (body + " " + i), callback);
   } 

   //my call back
   public class MyCallback extends SynchronizationAdapter {

   @Override
   public void onComplete(Exchange exchange) {
      String body = exchange.getOut().getBody(String.class);
      System.out.println(Thread.currentThread() + " Callback Resposne..." +body);
 }
}

  public static class SomeBean {
    public void someMethod(Exchange body) {
        System.out.println(Thread.currentThread() + " Received: " + body.getIn().getBody());
        body.getOut().setBody(body.getIn().getBody());
    }
}

我的理解(来自apache camel文档)是在jms上有5个相互竞争的消费者:启动队列来消费消息。以及3个用于处理来自jms的异步回复的"replyThreads“:启动队列。但实际上所有的输出都是不同的。

代码语言:javascript
复制
     Thread[main,5,main] sending request...0
     Thread[main,5,main] sending request...1
     Thread[Camel (camel-1) thread #9 - replyThread,5,main] Received: Hello Camel 0
     Thread[Camel (camel-1) thread #10 - replyThread,5,main] Received: Hello Camel 1
     Thread[Camel (camel-1) thread #5 - ProducerTemplate,5,main] Callback  Resposne...Hello Camel 0
     Thread[Camel (camel-1) thread #6 - ProducerTemplate,5,main] Callback Resposne...Hello Camel 1
EN

回答 2

Stack Overflow用户

发布于 2013-11-15 18:44:18

JMS组件具有内置的线程池,它可以根据消息队列的数量进行很好的上下缩放。

所以就用它吧

代码语言:javascript
复制
 from("jms:queue:start?concurrentConsumers=5")
   .bean(new SomeBean());

您还可以指定一个最大值,以便存在一个范围

代码语言:javascript
复制
 from("jms:queue:start?concurrentConsumers=5&maxConcurrentConsumers=10")
   .bean(new SomeBean());
票数 7
EN

Stack Overflow用户

发布于 2013-11-15 16:55:17

如果您想要检查JMS线程池,则需要像这样更改路由

代码语言:javascript
复制
from("jms:queue:start?concurrentConsumers=5")
   .bean(new SomeBean())
   .threads(3, 3, "replyThread")
   .bean(new SomeBean());

SomeBean可以向您展示在camel路由中使用了不同的线程池。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/19985792

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档