我有一个非常简单的代码:
def fun(x, n):
return (x, n)
rdds = []
for i in range(2):
rdd = sc.parallelize(range(5*i, 5*(i+1)))
rdd = rdd.map(lambda x: fun(x, i))
rdds.append(rdd)
a = sc.union(rdds)
print a.collect()我曾预期产出如下:
[(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]但是,输出如下:
[(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]至少可以说,这是令人困惑的。
由于对RDDs的延迟评估,用于创建RDDs的i的值似乎是它在调用collect()时所承载的值,即1(来自for循环的最后一次运行)。
现在,元组的两个元素都是从i派生的。
但是,对于元组的第一个元素,i的值是0和1,而对于元组的第二个元素,i的值是2。
有人能解释一下发生了什么吗?
谢谢。
发布于 2016-12-21 02:34:05
只要改变
rdd = rdd.map(lambda x: fun(x, i))至
rdd = rdd.map(lambda x, i=i: (x, i))这只是关于Python的,看看这个
https://docs.python.org/2.7/tutorial/controlflow.html#default-argument-values
发布于 2016-12-21 02:29:31
sc.parallelize()是一种可以立即执行的动作。因此,i,即0和1的值都将被使用。
但是,在rdd.map()的情况下,稍后调用collect()时只会使用i的最后一个值。
rdd = sc.parallelize(range(5*i, 5*(i+1)))
rdd = rdd.map(lambda x: fun(x, i))在这里,rdd.map不会转换rdd,它只会创建DAG(有向无环图),也就是说lambda函数将不应用于rdd的元素。
调用but ()时,将调用lambda函数,但到那时i的值为1。如果在调用i=10之前重新分配i=10,则将使用i的值。
https://stackoverflow.com/questions/41254045
复制相似问题