我知道我们可以使用Window function in pyspark来计算累积和。但Window仅在HiveContext中受支持,在SQLContext中不受支持。我需要使用SQLContext,因为HiveContext不能在多进程中运行。
是否有使用SQLContext计算累计和的有效方法?一种简单的方法是将数据加载到驱动程序的内存中并使用numpy.cumsum,但缺点是数据需要能够放入内存中
发布于 2016-01-13 06:46:58
我不确定这是否是您想要的结果,但这里有两个使用sqlContext计算累计和的示例:
首先,当您想按某些类别对其进行划分时:
from pyspark.sql.types import StructType, StringType, LongType
from pyspark.sql import SQLContext
rdd = sc.parallelize([
("Tablet", 6500),
("Tablet", 5500),
("Cell Phone", 6000),
("Cell Phone", 6500),
("Cell Phone", 5500)
])
schema = StructType([
StructField("category", StringType(), False),
StructField("revenue", LongType(), False)
])
df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("test_table")
df2 = sqlContext.sql("""
SELECT
category,
revenue,
sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")输出:
[Row(category='Tablet', revenue=5500, cumsum=5500),
Row(category='Tablet', revenue=6500, cumsum=12000),
Row(category='Cell Phone', revenue=5500, cumsum=5500),
Row(category='Cell Phone', revenue=6000, cumsum=11500),
Row(category='Cell Phone', revenue=6500, cumsum=18000)]第二,当你只想求一个变量的累加和时。将df2更改为:
df2 = sqlContext.sql("""
SELECT
category,
revenue,
sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")输出:
[Row(category='Cell Phone', revenue=5500, cumsum=5500),
Row(category='Tablet', revenue=5500, cumsum=11000),
Row(category='Cell Phone', revenue=6000, cumsum=17000),
Row(category='Cell Phone', revenue=6500, cumsum=23500),
Row(category='Tablet', revenue=6500, cumsum=30000)]希望这能有所帮助。在收集数据后使用np.cumsum效率不是很高,尤其是在数据集很大的情况下。您可以探索的另一种方法是使用简单的RDD转换,如groupByKey(),然后使用map通过某个键计算每个组的累积和,然后在最后减少它。
发布于 2017-03-02 00:20:53
下面是一个简单的例子:
import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf
sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)
data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
("Cam", "F", "Cambridge", 1, 25),
("Lin", "F", "Cambridge", 1, 25),
("Cat", "M", "Boston", 1, 20),
("Sara", "F", "Cambridge", 1, 15),
("Jeff", "M", "Cambridge", 1, 25),
("Bean", "M", "Cambridge", 1, 26),
("Dave", "M", "Cambridge", 1, 21),],
["name", 'gender', "city", 'donation', "age"])
data.show()给出输出
+----+------+---------+--------+---+
|name|gender| city|donation|age|
+----+------+---------+--------+---+
| Bob| M| Boston| 1| 20|
| Cam| F|Cambridge| 1| 25|
| Lin| F|Cambridge| 1| 25|
| Cat| M| Boston| 1| 20|
|Sara| F|Cambridge| 1| 15|
|Jeff| M|Cambridge| 1| 25|
|Bean| M|Cambridge| 1| 26|
|Dave| M|Cambridge| 1| 21|
+----+------+---------+--------+---+定义窗口
win_spec = (window.Window
.partitionBy(['gender', 'city'])
.rowsBetween(window.Window.unboundedPreceding, 0))window.Window.unboundedPreceding --组的第一行# .rowsBetween(...,0) -- 0指的是当前行,如果指定了-2,则最多在当前行之前2行
现在,这是一个陷阱:
temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))有错误:
TypeErrorTraceback (most recent call last)
<ipython-input-9-b467d24b05cd> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))
/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
238
239 def __iter__(self):
--> 240 raise TypeError("Column is not iterable")
241
242 # string methods
TypeError: Column is not iterable这是因为使用了python的sum函数而不是pyspark's。修复此问题的方法是使用pyspark.sql.functions.sum中的sum函数
temp = data.withColumn('AgeSum',sf.sum(data.donation).over(win_spec))
temp.show()将给予:
+----+------+---------+--------+---+--------------+
|name|gender| city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara| F|Cambridge| 1| 15| 1|
| Cam| F|Cambridge| 1| 25| 2|
| Lin| F|Cambridge| 1| 25| 3|
| Bob| M| Boston| 1| 20| 1|
| Cat| M| Boston| 1| 20| 2|
|Dave| M|Cambridge| 1| 21| 1|
|Jeff| M|Cambridge| 1| 25| 2|
|Bean| M|Cambridge| 1| 26| 3|
+----+------+---------+--------+---+--------------+发布于 2017-04-07 05:32:05
在这个线程上尝试解决类似的问题后,我已经使用此代码解决了我的问题。不确定我是否遗漏了操作的一部分,但这是对SQLContext列求和的一种方法:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
sc = SparkContext()
sc.setLogLevel("ERROR")
conf = SparkConf()
conf.setAppName('Sum SQLContext Column')
conf.set("spark.executor.memory", "2g")
sqlContext = SQLContext(sc)
def sum_column(table, column):
sc_table = sqlContext.table(table)
return sc_table.agg({column: "sum"})
sum_column("db.tablename", "column").show()https://stackoverflow.com/questions/34726268
复制相似问题