我现在已经在骆驼上做了一些小项目,但有一件事我很难理解,那就是当我在骆驼路线上消费时,如何处理大数据(不适合记忆)。
我有一个数据库,包含几个GBs值的数据,我想使用camel路由。显然,将所有数据读入内存并不是一种选择。
如果我是作为一个独立的应用程序这样做的话,我就会有通过数据分页并将块发送到我的JMS端点的代码。我想用骆驼,因为它提供了一个很好的图案。如果我使用的是一个文件,我可以使用streaming()调用。
另外,我应该使用camel-sql/camel-jdbc/camel-jpa还是使用bean从我的数据库中读取。
希望大家都和我在一起。我更熟悉Java,但希望人们能提供任何帮助/建议。
更新:2012年5月2日-2012年5月2日
所以我有一些时间玩这个,我认为我实际上是在滥用制片人的概念,这样我就可以在一条路线上使用它。
public class MyCustomRouteBuilder extends RouteBuilder {
public void configure(){
from("timer:foo?period=60s").to("mycustomcomponent:TEST");
from("direct:msg").process(new Processor() {
public void process(Exchange ex) throws Exception{
System.out.println("Receiving value" : + ex.getIn().getBody() );
}
}
}
}我的制片人看上去如下所示。为了清楚起见,我没有包含CustomEndpoint或CustomComponent,因为它看起来只是一个薄包装。
public class MyCustomProducer extends DefaultProducer{
Endpoint e;
CamelContext c;
public MyCustomProducer(Endpoint epoint){
super(endpoint)
this.e = epoint;
this.c = e.getCamelContext();
}
public void process(Exchange ex) throws Exceptions{
Endpoint directEndpoint = c.getEndpoint("direct:msg");
ProducerTemplate t = new DefaultProducerTemplate(c);
// Simulate streaming operation / chunking of BIG data.
for (int i=0; i <20 ; i++){
t.start();
String s ="Value " + i ;
t.sendBody(directEndpoint, value)
t.stop();
}
}
} 首先,上面的东西看起来不太干净。执行这一任务的最干净的方法似乎是通过调度的石英作业填充一个jms队列(而不是直接:msg),然后我的骆驼路线会消耗这个任务,这样我就可以比在我的骆驼管道中接收到的消息更灵活了。但是,我非常喜欢将基于时间的活动设置为路由的一部分的语义。
有没有人对做这件事的最佳方式有任何想法。
发布于 2012-04-24 12:21:23
据我理解,你所需要做的就是:
from("jpa:SomeEntity" +
"?consumer.query=select e from SomeEntity e where e.processed = false" +
"&maximumResults=150" +
"&consumeDelete=false")
.to("jms:queue:entities");maximumResults定义了每个查询得到多少个实体的限制。
当您完成实体实例的处理时,需要设置e.processed = true;和persist(),这样实体就不会再被处理了。
这样做的一种方法是使用@Consumed注释:
class SomeEntity {
@Consumed
public void markAsProcessed() {
setProcessed(true);
}
}另一件事是,在将实体发送到队列之前,您需要注意如何序列化该实体。您可能需要在from和to之间使用丰富她的。
https://stackoverflow.com/questions/10278621
复制相似问题