我有一个火花流处理程序,从卡夫卡读取数据,进入一个DStream。
在我的管道中,我做了两次(一个接一个):
DStream.foreachRDD(对RDD的转换并插入目标)。
(每次我做不同的处理并将数据插入到不同的目的地)。
我在想,在我刚读完卡夫卡的数据之后,DStream.cache会怎么做呢?有可能做到吗?
这个过程实际上是从卡夫卡那里读取了两次数据吗?
请记住,不可能将两个foreachRDDs放在一个中(因为两条路径非常不同,那里有状态转换--需要在DStream上应用.)
谢谢你的帮忙
发布于 2016-06-08 11:54:53
有两种选择:
Dstream.cache()将底层RDDs标记为缓存。火花流将处理在超时后,由spark.cleaner.ttl配置控制的RDDs的未持久化。foreachRDD将cache()和unpersist(false)副作用操作应用于DStream中的RDDs:例如:
val kafkaDStream = ???
val targetRDD = kafkaRDD
.transformation(...)
.transformation(...)
...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}请注意,如果可以选择缓存,则可以将缓存合并为do stuff 1的第一个语句。
我更喜欢这个选项,因为它给了我对缓存生命周期的细粒度控制,并允许我在需要时立即清理东西,而不是依赖于ttl。
https://stackoverflow.com/questions/37684506
复制相似问题