首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Spark-Streaming的DStream中使用'for‘循环进行转换和输出?

如何在Spark-Streaming的DStream中使用'for‘循环进行转换和输出?
EN

Stack Overflow用户
提问于 2016-11-21 00:56:50
回答 1查看 211关注 0票数 0

我是Spark中的菜鸟,我使用我定义的类生成1000个不同的实例(这些实例中的函数是相同的,但详细函数的参数不同)。sampler=generateClass()然后我需要将这些实例的函数映射到我的Stream。(为了测试,只使用10和2个实例)。

代码语言:javascript
复制
s=[]
for i in range(10):        
    s.append(mappedStream.map(lambda x: sampler[i].insert(x)).reduce(min))

uStream=ssc.union(s[0],s[1],s[2],s[3],s[4],s[5],s[6],s[7],s[8],s[9])
uStream.pprint()

但它的输出只有10个相同的键值对,看起来这些代码只是将我的数据映射到第一个实例,然后重复10次。

代码语言:javascript
复制
(85829323L, [2, 1])
(85829323L, [2, 1])
(85829323L, [2, 1])
(85829323L, [2, 1])
....

然后,我试着

代码语言:javascript
复制
myStream1=mappedStream.map(lambda x: sampler[0].insert(x)).reduce(min)
myStream2=mappedStream.map(lambda x: sampler[1].insert(x)).reduce(min)
ssc.union(myStream1,myStream2).pprint()

输出是正确的:

代码语言:javascript
复制
(85829323L, [2, 1])
(99580454L, [4, 1])

为什么会发生这种情况?我该怎么处理呢?非常感谢。

EN

回答 1

Stack Overflow用户

发布于 2016-11-21 03:08:13

这是因为python lambda是延迟计算的,并且当您在s[0]上调用一个操作时,它使用最后一个i参数来计算(在您的例子中,9是最后一个循环值)。

您可以使用函数生成器模式来“强制”使用适当的i,例如:

代码语言:javascript
复制
def call_sampler(i):
    return lambda x: sampler[i].insert(x)

s=[]
for i in range(10):        
    s.append(mappedStream.map(call_sampler(i)).reduce(min))

uStream=ssc.union(s[0],s[1],s[2],s[3],s[4],s[5],s[6],s[7],s[8],s[9])
uStream.pprint()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40706752

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档