我对路由( Apache语句)的理解是,它代表了从一个端点到另一个端点的数据流,并且在对数据执行EIP类型操作的过程中,它会停在不同的处理器上。
如果这是对一个路由的正确/公平的评估,那么我正在建模一个问题,我认为这个问题需要在同一个CamelContext中使用几个路由(我使用的是一个Spring):
List<SomePOJO>,然后将其发送给聚合器List<SomePOJO>,然后将其发送给聚合器List<SomePOJO>接收到来自的路由1和路由2,然后继续处理聚合列表事情是这样的:两个List<SomePOJO>**s都需要同时到达聚合器**,或者更确切地说,聚合器bean必须等到从两个路由接收到数据后才能将两个列表聚合到一个List<SomePOJO>中,然后将聚合列表发送到路由3的其余部分。
到目前为止,我有以下伪编码的<camelContext>
<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
<!-- Route 1 -->
<route id="route-1">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
<to uri="bean:extractor1?method=process" />
<!-- Send to aggregator. -->
<to uri="direct:aggregator" />
</route>
<!-- Route 2 -->
<route id="route-2">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
<to uri="bean:extractor2?method=process" />
<!-- Send to aggregator. -->
<to uri="direct:aggregator" />
</route>
<!-- Route 3 -->
<route id="route-3">
<from uri="direct:aggregator" />
<aggregate strategyRef="listAggregatorStrategy">
<correlationExpression>
<!-- Haven't figured this part out yet. -->
</correlationExpression>
<to uri="bean:lastProcessor?method=process" />
</aggregate>
</route>
</camelContext>
<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />然后在Java中:
public class ListAggregatorStrategy implements AggregatoryStrategy {
public Exchange aggregate(Exchange exchange) {
List<SomePOJO> route1POJOs = extractRoute1POJOs(exchange);
List<SomePOJO> route2POJOs = extractRoute2POJOs(exchange);
List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(route1POJOs);
aggregateList.addAll(route2POJOs);
return aggregateList;
}
}我的问题
direct:aggregator端点将数据从route-1和route-2发送到route-3的聚合器中?extractor1 bean在route-1中只需5秒即可运行,而route-2中的extractor2 bean只需2分钟即可运行。在t=5,聚合器应该接收来自extractor1的数据,并开始等待(2分钟),直到extractor2完成,并将其余的数据提供给它进行聚合。是?发布于 2013-12-13 19:54:51
听起来你是在正确的轨道上,集料器页面有很多关于这方面的好信息。
completionSize是匹配每个路由的<correlationExpression>的关键,并且<correlationExpression>可以指定等待的数量。在您的示例中,似乎每个路由只设计为运行一次,在这种情况下,表达式可能使用来自每个Exchange的固定头值,否则您将需要为每个路由设置一个计数器类。
下面是对您的示例的更新:
<route id="route-1">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<to uri="bean:extractor1?method=process" />
<setHeader headerName="id">
<constant>myHeaderValue</constant>
</setHeader>
<to uri="direct:aggregator" />
</route>
<route id="route-2">
<from uri="time://runOnce?repeatCount=1&delay=10" />
<to uri="bean:extractor2?method=process" />
<setHeader headerName="id">
<constant>myHeaderValue</constant>
</setHeader>
<to uri="direct:aggregator" />
</route>
<route id="route-3">
<from uri="direct:aggregator" />
<aggregate strategyRef="listAggregatorStrategy" completionSize="2">
<correlationExpression>
<simple>header.id</simple>
</correlationExpression>
<to uri="bean:lastProcessor?method=process" />
</aggregate>
</route>https://stackoverflow.com/questions/20573588
复制相似问题