首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache :从数据库路由到JMS端点的GBs

Apache :从数据库路由到JMS端点的GBs
EN

Stack Overflow用户
提问于 2012-04-23 10:14:40
回答 1查看 2.7K关注 0票数 1

我现在已经在骆驼上做了一些小项目,但有一件事我很难理解,那就是当我在骆驼路线上消费时,如何处理大数据(不适合记忆)。

我有一个数据库,包含几个GBs值的数据,我想使用camel路由。显然,将所有数据读入内存并不是一种选择。

如果我是作为一个独立的应用程序这样做的话,我就会有通过数据分页并将块发送到我的JMS端点的代码。我想用骆驼,因为它提供了一个很好的图案。如果我使用的是一个文件,我可以使用streaming()调用。

另外,我应该使用camel-sql/camel-jdbc/camel-jpa还是使用bean从我的数据库中读取。

希望大家都和我在一起。我更熟悉Java,但希望人们能提供任何帮助/建议。

更新:2012年5月2日-2012年5月2日

所以我有一些时间玩这个,我认为我实际上是在滥用制片人的概念,这样我就可以在一条路线上使用它。

代码语言:javascript
复制
public class MyCustomRouteBuilder extends RouteBuilder {

    public void configure(){
         from("timer:foo?period=60s").to("mycustomcomponent:TEST");

         from("direct:msg").process(new Processor() {
               public void process(Exchange ex) throws Exception{
                   System.out.println("Receiving value" : + ex.getIn().getBody() );
               }
         }
    }

}

我的制片人看上去如下所示。为了清楚起见,我没有包含CustomEndpoint或CustomComponent,因为它看起来只是一个薄包装。

代码语言:javascript
复制
public class MyCustomProducer extends DefaultProducer{ 

    Endpoint e;
    CamelContext c;

    public MyCustomProducer(Endpoint epoint){
          super(endpoint)   
          this.e = epoint;
          this.c = e.getCamelContext();
    }

    public void process(Exchange ex) throws Exceptions{

        Endpoint directEndpoint = c.getEndpoint("direct:msg");
        ProducerTemplate t = new DefaultProducerTemplate(c);

        // Simulate streaming operation / chunking of BIG data.
        for (int i=0; i <20 ; i++){
           t.start();
           String s ="Value " + i ;                  
           t.sendBody(directEndpoint, value)
           t.stop();         
        }
    }
} 

首先,上面的东西看起来不太干净。执行这一任务的最干净的方法似乎是通过调度的石英作业填充一个jms队列(而不是直接:msg),然后我的骆驼路线会消耗这个任务,这样我就可以比在我的骆驼管道中接收到的消息更灵活了。但是,我非常喜欢将基于时间的活动设置为路由的一部分的语义。

有没有人对做这件事的最佳方式有任何想法。

EN

回答 1

Stack Overflow用户

发布于 2012-04-24 12:21:23

据我理解,你所需要做的就是:

代码语言:javascript
复制
from("jpa:SomeEntity" + 
    "?consumer.query=select e from SomeEntity e where e.processed = false" +
    "&maximumResults=150" +
    "&consumeDelete=false")
.to("jms:queue:entities");

maximumResults定义了每个查询得到多少个实体的限制。

当您完成实体实例的处理时,需要设置e.processed = true;persist(),这样实体就不会再被处理了。

这样做的一种方法是使用@Consumed注释:

代码语言:javascript
复制
class SomeEntity {
    @Consumed
    public void markAsProcessed() {
        setProcessed(true);
    }
}

另一件事是,在将实体发送到队列之前,您需要注意如何序列化该实体。您可能需要在from和to之间使用丰富她的

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/10278621

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档