可以在运行时创建具有新名称的Dstream并销毁旧的dstream吗?
//Read the Dstream
inputDstream = ssc.textFileStream("./myPath/")示例:我正在读取一个名为cvd_filter.txt的文件,其中的每一行都包含一个字符串,该字符串应该是dstream的筛选条件。此文件将使用新值更新(也可以附加):
示例: At time 10:00;cat cvd_filter.txt
"1001" "1002" "1003"
// Read cvd_filter.txt every 5 mins and do creation/destruction of dstreams.
with open(cvd_filter.txt) as f:
content = f.readlines()
dstream_content[0] = inputDstream.filter(lambda a: content[0] in a)
// At this point (dstream_1001 , dstream_1002, dstream_1003) should get created.
// NOW, DO SOME OPERATION ON INDIVIDUAL dstreams. 时间10:05;cat cvd_filter.txt
"1004" "1002" "1003"
//为新筛选器字符串创建dstream_1004,只销毁dstream_1001 //,但保留dstream_1002和dstream_1003。此时(dstream_1004、dstream_1002、dstream_1003)应在场。//现在,对单个dstream执行一些操作。
发布于 2016-06-14 07:19:04
不是的。DStreams上的任何新流或操作都不能添加到正在运行的上下文中。我建议将您的使用程序建模为foreachRDD,这样您就可以自由地对底层RDD进行任意操作。例如:
val dstream = ??? /// original dstream
dstream.foreachRDD{rdd =>
val filters = // read file
val filteredRDDs = filters.map(f => rdd.filter(elem => elem.contains(f))
...
}然后进一步表示对不同筛选的RDD所需的操作。DStreams将所有转换操作委托给底层RDD,因此您应该能够以这种方式表达业务逻辑。
https://stackoverflow.com/questions/37799842
复制相似问题