首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么orderBy()用PySpark修改DataFrame上的聚合结果?

为什么orderBy()用PySpark修改DataFrame上的聚合结果?
EN

Stack Overflow用户
提问于 2022-06-03 17:26:09
回答 2查看 110关注 0票数 1

我必须使用PySpark来查找,这是最常一起销售的产品,来自存储在一个名为sales_NY的数据中的数据,其示例是:

代码语言:javascript
复制
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+-----+----+
|OrderID|             Product|Quantity| Price|          OrderDate|        StoreAddress|         City|State|Month|Hour|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+-----+----+
| 295665|  Macbook Pro Laptop|       1|1700.0|2019-12-30 00:01:00|136 Church St, Ne...|New York City|   NY|   12|   0|
| 295666|  LG Washing Machine|       1| 600.0|2019-12-29 07:03:00|562 2nd St, New Y...|New York City|   NY|   12|   7|
| 295667|USB-C Charging Cable|       1| 11.95|2019-12-12 18:21:00|277 Main St, New ...|New York City|   NY|   12|  18|
| 295670|AA Batteries (4-p...|       1|  3.84|2019-12-31 22:58:00|200 Jefferson St,...|New York City|   NY|   12|  22|
| 295698|     Vareebadd Phone|       1| 400.0|2019-12-13 14:32:00|175 1st St, New Y...|New York City|   NY|   12|  14|
| 295698|USB-C Charging Cable|       2| 11.95|2019-12-13 14:32:00|175 1st St, New Y...|New York City|   NY|   12|  14|
| 295700|Bose SoundSport H...|       1| 99.99|2019-12-25 19:02:00|363 Hickory St, N...|New York City|   NY|   12|  19|
| 295704|    Wired Headphones|       1| 11.99|2019-12-12 00:20:00|457 8th St, New Y...|New York City|   NY|   12|   0|
| 295705|    Wired Headphones|       1| 11.99|2019-12-25 10:41:00|133 Jackson St, N...|New York City|   NY|   12|  10|
| 295712|  Macbook Pro Laptop|       1|1700.0|2019-12-10 20:02:00|331 Madison St, N...|New York City|   NY|   12|  20|
| 295713|Bose SoundSport H...|       1| 99.99|2019-12-24 07:55:00|490 Spruce St, Ne...|New York City|   NY|   12|   7|
| 295720|AA Batteries (4-p...|       1|  3.84|2019-12-17 22:52:00|298 Ridge St, New...|New York City|   NY|   12|  22|
| 295728|    27in FHD Monitor|       1|149.99|2019-12-21 19:21:00|366 Washington St...|New York City|   NY|   12|  19|
| 295735|              iPhone|       1| 700.0|2019-12-22 18:25:00|374 Lincoln St, N...|New York City|   NY|   12|  18|
| 295735|Apple Airpods Hea...|       1| 150.0|2019-12-22 18:25:00|374 Lincoln St, N...|New York City|   NY|   12|  18|
| 295735|    Wired Headphones|       1| 11.99|2019-12-22 18:25:00|374 Lincoln St, N...|New York City|   NY|   12|  18|
| 295740|USB-C Charging Cable|       1| 11.95|2019-12-01 20:36:00|102 Cedar St, New...|New York City|   NY|   12|  20|
| 295742|Apple Airpods Hea...|       1| 150.0|2019-12-09 23:45:00|368 Sunset St, Ne...|New York City|   NY|   12|  23|
| 295743|USB-C Charging Cable|       1| 11.95|2019-12-03 11:52:00|346 South St, New...|New York City|   NY|   12|  11|
| 295745|       Flatscreen TV|       1| 300.0|2019-12-24 10:38:00|124 Lakeview St, ...|New York City|   NY|   12|  10|
+-------+--------------------+--------+------+-------------------+--------------------+-------------+-----+-----+----+

要找到--最常一起销售的产品--,我使用代码的第一部分将数据加载到内存中(很抱歉缺乏可再现性,但在这里包含数据的整个数据的大小相当大):

公共部分:

代码语言:javascript
复制
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, size, hour

spark = (SparkSession.builder.appName('SalesAnalytics').getOrCreate())

### This is the local path to my data:
file_path = './data/output/sales/ReportYear=2019'
sales_raw_df = (spark.read.format('parquet')
                 .option('header', 'True')
                 .option('inferSchema', 'True')
                 .load(file_path))
sales_raw_df = sales_raw_df.withColumn('Hour', hour(sales_raw_df.OrderDate))

sales_NY = (sales_raw_df.where(col('State') == 'NY'))

现在,我可能会遵循两个不同版本的解决方案,我认为这是完全等价的,但输出略有不同。版本的不同之处在于,第二个版本使用orderBy('OrderID', 'State')添加了一个中间步骤。

版本1:

代码语言:javascript
复制
sales_q4_df = (sales_NY.groupBy('OrderID', 'State')
                       .agg(collect_list('Product').alias('ProductList')))
sales_q4_df = (sales_q4_df.withColumn('ProductListSize', size('ProductList')))

### discards the orders with a single product (the OrderID just appears once)
sales_q4_df = sales_q4_df.filter(col('ProductListSize') > 1).orderBy('ProductList', ascending=True)
most_prods_together = sales_q4_df.groupBy('ProductList').count().orderBy('count', ascending=False).show(10, False)

输出1:

代码语言:javascript
复制
+------------------------------------------------------+-----+
|ProductList                                           |count|
+------------------------------------------------------+-----+
|[iPhone, Lightning Charging Cable]                    |126  |
|[Google Phone, USB-C Charging Cable]                  |124  |
|[Google Phone, Wired Headphones]                      |52   |
|[Vareebadd Phone, USB-C Charging Cable]               |49   |
|[iPhone, Wired Headphones]                            |46   |
|[iPhone, Apple Airpods Headphones]                    |43   |
|[Google Phone, Bose SoundSport Headphones]            |23   |
|[Vareebadd Phone, Wired Headphones]                   |17   |
|[Apple Airpods Headphones, Wired Headphones]          |12   |
|[Google Phone, USB-C Charging Cable, Wired Headphones]|11   |
+------------------------------------------------------+-----+

版本2:

代码语言:javascript
复制
sales_q4_df = (sales_NY.orderBy('OrderID', 'Product')
                       .groupBy('OrderID', 'State')
                       .agg(collect_list('Product').alias('ProductList')))
sales_q4_df = (sales_q4_df.withColumn('ProductListSize', size('ProductList')))

### discards the orders with a single product (the OrderID just appears once)
sales_q4_df = sales_q4_df.filter(col('ProductListSize') > 1).orderBy('ProductList', ascending=True)
most_prods_together = sales_q4_df.groupBy('ProductList').count().orderBy('count', ascending=False).show(10, False)

输出2:

代码语言:javascript
复制
+-------------------------------------------------+-----+
|ProductList                                      |count|
+-------------------------------------------------+-----+
|[Google Phone, USB-C Charging Cable]             |127  |
|[Lightning Charging Cable, iPhone]               |126  |
|[Google Phone, Wired Headphones]                 |53   |
|[USB-C Charging Cable, Vareebadd Phone]          |50   |
|[Wired Headphones, iPhone]                       |46   |
|[Apple Airpods Headphones, iPhone]               |45   |
|[Bose SoundSport Headphones, Google Phone]       |24   |
|[Apple Airpods Headphones, Wired Headphones]     |19   |
|[Vareebadd Phone, Wired Headphones]              |17   |
|[AA Batteries (4-pack), Lightning Charging Cable]|16   |
+-------------------------------------------------+-----+

有人能解释一下为什么结果不同吗?这是PySpark中的一个bug吗?

我正在使用jupyterlab v.3.4.2、PySpark v.3.0.1和Javav.15开发笔记本。

PD:我必须补充说,我也尝试了sort()方法(由于使用了几个分区而比orderBy()更有效),但是结果是一样的。

EN

回答 2

Stack Overflow用户

发布于 2022-06-03 23:35:55

正如艾玛所提到的,在具有随机顺序的列表列上使用group_by可能会返回奇怪的结果。‘、'b’和'b‘、'a’是两个不同的值。

您可以在array_sort上运行collect_list,然后group_by每次都应该返回相同的结果。

也就是说,清单上的group_by并不是回答以下问题的最准确的方法:什么是最常一起销售的产品?“iphone”、“iphone充电器”、“口香糖”和“iphone”、“iphone充电器”在你的结果中是两行不同的数据,但iphone和iphone充电器之间的关联可能就是你想要的。

您可以使用pyspark.ml.fpm FP-生长,它可以返回通常一起返回的关联规则和对项(或其他大小)。

代码语言:javascript
复制
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import size, col

df = spark.createDataFrame(
    [
     [1, ["tomato", "cucumber", "onion"]],
     [2, ["cucumber", "avocado", "tomato", "olive oil"]],
     [3, ["cucumber", "tomato", "lettuce", "onion"]],
     [4, ["lettuce", "onion"]],
     [5, ["olive oil", "bread"]],
     [6, ["onion", "olive oil", "lettuce"]]
    ], ["order_id", "products"]
)

fpGrowth = FPGrowth(itemsCol="products", minSupport=0.2, minConfidence=0.75)
model = fpGrowth.fit(df)

model.freqItemsets.filter(size("items") > 1).orderBy(col("freq").desc()).show()

+--------------------+----+
|               items|freq|
+--------------------+----+
|    [lettuce, onion]|   3|
|  [tomato, cucumber]|   3|
|   [cucumber, onion]|   2|
|[tomato, cucumber...|   2|
|     [tomato, onion]|   2|
+--------------------+----+

另外,model.associationRules.show()可能会让您感兴趣。

票数 3
EN

Stack Overflow用户

发布于 2022-06-03 19:42:05

这是因为星火的数据是无序的。

在版本1中,您没有orderBy,因此collect_list可以按任何顺序收集产品。

代码语言:javascript
复制
sales_q4_df = (sales_NY.groupBy('OrderID', 'State')
               .agg(collect_list('Product').alias('ProductList')))

请检查版本1 [Google Phone, Bose SoundSport Headphones]中的列表中的这个项目,"G"oogle出现在"B"ose之前,它的计数为23。

如果您移除> 1筛选器,我猜您有一个带有计数1的[Bose SoundSport Headphones, Google Phone]条目。

在第2版中,添加了.orderBy('OrderID', 'Product'),这使得查询具有确定性,并且使用24计算正确的[Bose SoundSport Headphones, Google Phone]集。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72493145

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档