首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我正确地使用了Apache聚合器吗?

我正确地使用了Apache聚合器吗?
EN

Stack Overflow用户
提问于 2013-12-13 18:34:25
回答 1查看 1.8K关注 0票数 1

我对路由( Apache语句)的理解是,它代表了从一个端点到另一个端点的数据流,并且在对数据执行EIP类型操作的过程中,它会停在不同的处理器上。

如果这是对一个路由的正确/公平的评估,那么我正在建模一个问题,我认为这个问题需要在同一个CamelContext中使用几个路由(我使用的是一个Spring):

  1. 路由1:从Source-1提取数据,对其进行处理,将其转换为List<SomePOJO>,然后将其发送给聚合器
  2. 路由2:从源-2中提取数据,对其进行处理,并将其转换为List<SomePOJO>,然后将其发送给聚合器
  3. 路由3:包含一个聚合器,直到它从List<SomePOJO>接收到来自路由1和路由2,然后继续处理聚合列表

事情是这样的:两个List<SomePOJO>**s都需要同时到达聚合器**,或者更确切地说,聚合器bean必须等到从两个路由接收到数据后才能将两个列表聚合到一个List<SomePOJO>中,然后将聚合列表发送到路由3的其余部分。

到目前为止,我有以下伪编码的<camelContext>

代码语言:javascript
复制
<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
    <!-- Route 1 -->
    <route id="route-1">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor1?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 2 -->
    <route id="route-2">
        <from uri="time://runOnce?repeatCount=1&amp;delay=10" />

        <!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor2?method=process" />

        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>

    <!-- Route 3 -->
    <route id="route-3">
        <from uri="direct:aggregator" />

        <aggregate strategyRef="listAggregatorStrategy">
            <correlationExpression>
                <!-- Haven't figured this part out yet. -->
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
</camelContext>

<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />

然后在Java中:

代码语言:javascript
复制
public class ListAggregatorStrategy implements AggregatoryStrategy {
    public Exchange aggregate(Exchange exchange) {
        List<SomePOJO> route1POJOs = extractRoute1POJOs(exchange);
        List<SomePOJO> route2POJOs = extractRoute2POJOs(exchange);

        List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(route1POJOs);
        aggregateList.addAll(route2POJOs);

        return aggregateList;
    }
}

我的问题

  1. 我的基本设置正确吗?换句话说,我是否正确地使用direct:aggregator端点将数据从route-1route-2发送到route-3的聚合器中?
  2. 我的聚合器会像我期望的那样工作吗?假设extractor1 bean在route-1中只需5秒即可运行,而route-2中的extractor2 bean只需2分钟即可运行。在t=5,聚合器应该接收来自extractor1的数据,并开始等待(2分钟),直到extractor2完成,并将其余的数据提供给它进行聚合。是?
EN

回答 1

Stack Overflow用户

发布于 2013-12-13 19:54:51

听起来你是在正确的轨道上,集料器页面有很多关于这方面的好信息。

completionSize是匹配每个路由的<correlationExpression>的关键,并且<correlationExpression>可以指定等待的数量。在您的示例中,似乎每个路由只设计为运行一次,在这种情况下,表达式可能使用来自每个Exchange的固定头值,否则您将需要为每个路由设置一个计数器类。

下面是对您的示例的更新:

代码语言:javascript
复制
<route id="route-1">
    <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor1?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-2">
    <from uri="time://runOnce?repeatCount=1&amp;delay=10" />
    <to uri="bean:extractor2?method=process" />
    <setHeader headerName="id">
        <constant>myHeaderValue</constant>
    </setHeader>
    <to uri="direct:aggregator" />
</route>

<route id="route-3">
    <from uri="direct:aggregator" />

    <aggregate strategyRef="listAggregatorStrategy" completionSize="2">
        <correlationExpression>
            <simple>header.id</simple>
        </correlationExpression>
        <to uri="bean:lastProcessor?method=process" />
    </aggregate>
</route>
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/20573588

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档