我已经用python编写了一个使用spark的示例函数。函数如下所示
#!/usr/bin/env python
from __future__ import print_function
from pyspark.sql import SparkSession
import os
import sys
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.4"
spark = SparkSession \
.builder \
.appName("testpython") \
.getOrCreate()
rdd1 = spark.sparkContext.parallelize([1,6,5,2,99,1000,100009,10000,139,44,45343,23234,34])
**rdd3=rdd1.reduce(lambda x,y:x+1)**
print(rdd3)在reduce函数中,我们已经给出了(λx,y:x+1),对于上面的函数,理想的结果必须是13,但结果是6
谁能解释一下为什么结果是6而不是13?是不是因为spark中跨分区的数据dvision?
控制台输出:
/usr/bin/python3.4 /home/PycharmProjects/sampleproject/ttestexmple.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/20 17:45:14 WARN NativeCodeLoader: Unable to load native-hadoop
17/07/20 17:45:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
6进程已完成,退出代码为0
发布于 2017-07-20 20:58:33
是的,你说的完全正确。你可以通过使用下面的命令来确保你的rdd只在1个分区上
rdd1 = rdd1.coalesce(1)
rdd2 = rdd1.reduce(lambda x,y: x+1)现在,您将获得预期的答案。
原因是当您的rdd有多个分区,并且您正在尝试使用一个reduce,而这个reduce根本没有使用y。假设rdd有两个分区,因此reduce得到如下(reduce on partition 1, reduce on partition 2),最后它会给出reduce result on partion 1 + 1
发布于 2018-08-27 23:38:45
更具体地说,函数期望(或更确切地说,要求)具有相联和可交换的二进制函数,即函数f(x, y) = x op y,以便
x op (y op z) = (x op y) op z和
x op y = y op x如果不满足这些要求,就无法保证在组合来自不同分区的中间结果时最终结果是相同的。结合性确保计算可以完全并行化;交换性确保并行计算可以减少到相同的最终结果(因为您不知道中间结果最终组合的顺序)。
您可以很容易地看到,您的函数不满足这两个条件,因此不能可靠地与reduce一起使用
x op (y op z) = x op (y + 1) = x + 1鉴于
(x op y) op z = (x + 1) op z = x + 2对于x的任何值,它们都不相等。类似地,
x op y = x + 1它等于y op x当且仅当x= y。
https://stackoverflow.com/questions/45214445
复制相似问题