首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >缓存流处理大文件

缓存流处理大文件
EN

Stack Overflow用户
提问于 2016-04-11 15:48:14
回答 1查看 1.5K关注 0票数 4

我正在研究一个主题“在Apache Camel中使用缓存以及如何处理大文件”。

其目的是使用camel处理大文件,而无需将文件加载到内存中,因为这是一个超过5 GO的大文件。

我们发现了几个磁道,第一个磁道是使用拆分器组件,以允许我们逐行或逐块读取文件,但是,如果我们使用拆分器,我们无法从头开始再次读取文件,功能需求是即使拆分完成也能够读取文件的某些部分。

因此,我们必须使用缓存系统,将数据块放入缓存中以重用它们。

所以我们认为在拆分器之后使用CachedOutputStream类在磁盘上写入文件的某些部分是强制性的,这个类还提供了在磁盘上加密数据的能力。

示例如下:

代码语言:javascript
复制
<camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" streamCache="true">

    <streamCaching id="myCacheConfig"  spoolDirectory="target/cachedir" spoolThreshold="16"/>

    <route id="SPLIT-FLOW" streamCache="true">
        <from uri="file:src/data/forSplitCaching\SimpleRecord?noop=true"/>
        <split streaming="true">
            <tokenize token="\n"/>
            <to uri="direct:PROCESS-BUSINESS"/>
        </split>
    </route>

    <route id="PROCESS-BUSINESS" streamCache="true">
        <from uri="direct:PROCESS-BUSINESS"/>
        <bean ref="ProcessBusiness" method="dealRecord"/>
        <choice>
            <when>
                <simple>${in.header.CamelSplitComplete} == "true"</simple>
                <to uri="direct:STREAM-CACHING"/>
            </when>
        </choice>
    </route>

    <route id="STREAM-CACHING">
        <from uri="direct:STREAM-CACHING"/>
        <bean ref="ProcessStreamCaching" method="usingStream"/>
        <setHeader headerName="CamelFileName">
            <simple>${header.CamelFileName}.${header.CamelSplitIndex}</simple>
        </setHeader>
        <to uri="file:src/out"/>
    </route>

</camelContext>

方法dealRecord将拆分的每一行放入缓存中:

代码语言:javascript
复制
public void dealRecord(Exchange exchange) throws Exception { 

   String body; 
   File file; 
   String[] files; 
   boolean isSplitComplete; 

   body = (String) exchange.getIn().getBody(); 
   isSplitComplete = (boolean) exchange.getProperties().get("CamelSplitComplete"); 

   CachedOutputStream cos = new CachedOutputStream(exchange, false); 
   cos.write(body.getBytes("UTF-8")); 

   file = new File("target/cachedir"); 
   files = file.list(); 
   for (String nameTmpfile : files) { 
      LOG.info("Genered File [" + nameTmpfile + "]"); 
   } 

   lstCache.add(cos); 

   if(isSplitComplete){ 
      exchange.getIn().setHeader("Cached",lstCache); 
   } 
} 

方法usingStream可以使用报头中存在的每个缓存

代码语言:javascript
复制
public byte[] usingStream(Exchange exchange) throws InputStreamException { 

   final ArrayList<CachedOutputStream> lstcache; 
   byte[] bytesMessage; 
   StringBuilder messageCompleteOut = new StringBuilder(); 
   InputStream is = null; 

   lstcache = (ArrayList<CachedOutputStream>) exchange.getIn().getHeader("Cached"); 
   for (CachedOutputStream oneCache : lstcache) { 
      try { 
         is = oneCache.getWrappedInputStream(); 
         String messageInputstream = toString(is); 
         LOG.info("Message of Cache ["+ messageInputstream +"]"); 
         messageCompleteOut.append(messageInputstream); 
         messageCompleteOut.append(System.lineSeparator()); 
      } catch (IOException e) { 
         LOG.error(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL); 
         throw new InputStreamException(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL,e); 
      } 
      // On ferme le flux 
      IOHelper.close(is); 
   } 
   bytesMessage = messageCompleteOut.toString().getBytes(Charset.forName("UTF-8")); 
   return bytesMessage; 
} 

这个解决方案看起来还行吗?或者也许有更好的方法?

thxs

EN

回答 1

Stack Overflow用户

发布于 2016-04-20 16:23:55

GenericFileMessage (文件组件使用的消息实现)不会将文件内容加载到内存中。因此,事实上,您只需要确保您不会以强制它转换的方式访问body。您还可以编写自己的消息(继承自GenericFileMessage)并防止这种转换,或者返回一些不同的内容(某种“摘要”)。

在此过程中,处理器可以获取文件系统中文件的位置(从消息头),并直接打开它,可能会用其他消息替换文件消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36542766

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档