首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Dstream运行时创建/销毁

Dstream运行时创建/销毁
EN

Stack Overflow用户
提问于 2016-06-13 22:01:20
回答 1查看 93关注 0票数 2

可以在运行时创建具有新名称的Dstream并销毁旧的dstream吗?

代码语言:javascript
复制
//Read the Dstream 
inputDstream = ssc.textFileStream("./myPath/")

示例:我正在读取一个名为cvd_filter.txt的文件,其中的每一行都包含一个字符串,该字符串应该是dstream的筛选条件。此文件将使用新值更新(也可以附加):

示例: At time 10:00;cat cvd_filter.txt

"1001" "1002" "1003"

代码语言:javascript
复制
// Read cvd_filter.txt every 5 mins and do creation/destruction of dstreams.

with open(cvd_filter.txt) as f:
    content = f.readlines()
    dstream_content[0] = inputDstream.filter(lambda a: content[0] in a)

// At this point (dstream_1001 , dstream_1002, dstream_1003) should get created. 
// NOW, DO SOME OPERATION ON INDIVIDUAL dstreams. 

时间10:05;cat cvd_filter.txt

"1004" "1002" "1003"

//为新筛选器字符串创建dstream_1004,只销毁dstream_1001 //,但保留dstream_1002和dstream_1003。此时(dstream_1004、dstream_1002、dstream_1003)应在场。//现在,对单个dstream执行一些操作。

EN

回答 1

Stack Overflow用户

发布于 2016-06-14 07:19:04

不是的。DStreams上的任何新流或操作都不能添加到正在运行的上下文中。我建议将您的使用程序建模为foreachRDD,这样您就可以自由地对底层RDD进行任意操作。例如:

代码语言:javascript
复制
val dstream = ??? /// original dstream
dstream.foreachRDD{rdd =>
  val filters =  // read file
  val filteredRDDs = filters.map(f => rdd.filter(elem => elem.contains(f))
  ...
}

然后进一步表示对不同筛选的RDD所需的操作。DStreams将所有转换操作委托给底层RDD,因此您应该能够以这种方式表达业务逻辑。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37799842

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档