首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在火花流中缓存DStream

在火花流中缓存DStream
EN

Stack Overflow用户
提问于 2016-06-07 16:20:19
回答 1查看 5.2K关注 0票数 2

我有一个火花流处理程序,从卡夫卡读取数据,进入一个DStream。

在我的管道中,我做了两次(一个接一个):

DStream.foreachRDD(对RDD的转换并插入目标)。

(每次我做不同的处理并将数据插入到不同的目的地)。

我在想,在我刚读完卡夫卡的数据之后,​DStream.cache​会怎么做呢?有可能做到吗?

这个过程实际上是从卡夫卡那里读取了两次数据吗?

请记住,不可能将两个foreachRDDs放在一个中(因为两条路径非常不同,那里有状态转换--需要在DStream上应用.)

谢谢你的帮忙

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-06-08 11:54:53

有两种选择:

  • 使用Dstream.cache()将底层RDDs标记为缓存。火花流将处理在超时后,由spark.cleaner.ttl配置控制的RDDs的未持久化。
  • 使用附加的foreachRDDcache()unpersist(false)副作用操作应用于DStream中的RDDs:

例如:

代码语言:javascript
复制
val kafkaDStream = ???
val targetRDD = kafkaRDD
                       .transformation(...)
                       .transformation(...)
                       ...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}

请注意,如果可以选择缓存,则可以将缓存合并为do stuff 1的第一个语句。

我更喜欢这个选项,因为它给了我对缓存生命周期的细粒度控制,并允许我在需要时立即清理东西,而不是依赖于ttl。

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37684506

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档