首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花2.2.2 -连接多个RDDs发出的记忆体除外。结果RDD有124列。最佳的连接方法应该是什么?

火花2.2.2 -连接多个RDDs发出的记忆体除外。结果RDD有124列。最佳的连接方法应该是什么?
EN

Stack Overflow用户
提问于 2018-11-02 14:21:33
回答 1查看 126关注 0票数 0

我有一个文件,每个电话号码都有多个值。例如:

代码语言:javascript
复制
phone_no circle operator priority1 attribute1 attribute2 attribute3 priority2 attribute1 attribute2 attribute3 
123445   delhi  airtel   1.0        info1      info2      info3      1.1        info4      info5      info6
987654   bhopal idea     1.1        info1      info2      info3      1.4        info4      info5      info6
123445   delhi  airtel   1.3        info1      info2      info3      1.0        info4      info5      info6

我的预期输出是:对于每个电话号码,选择最小的P1及其对应的属性值。

正如我上面的示例所述,对于电话号码123445,第1行中的P1小于第3行中的P1 (1.0 < 1.3),因此我希望从第1行中选择属性1、2和3,而由于第3行中的P2值较小( 1.0 < 1.1),因此我希望从第3行中选择属性值。

下面是我想要的表格格式:

代码语言:javascript
复制
phone_no circle operator priority1 attribute1 attribute2 attribute3 priority2 attribute1 attribute2 attribute3 
123445   delhi  airtel   1.0        info1      info2      info3      1.0        info4      info5      info6
987654   bhopal idea     1.1        info1      info2      info3      1.4        info4      info5      info6

我有25个不同的优先级值,每个优先级值有4个不同的属性,所以我的总列在125左右。

到目前为止,我已经尝试过:

  1. 创建一个Dataframe,其中有一个电话号码作为键和每个优先级值的最小值。
  2. 创建另一个Dataframe,其值为min(Priority1),并为每个电话号码创建相应的属性。
  3. 创建另一个Dataframe,其值为min(Priority2),并为每个电话号码创建相应的属性。
  4. 将电话号码上的这两个Dataframes连接起来,以获得完整的信息,并将此数据存储到磁盘。

我的方法的问题是,考虑到我拥有的列数,这不是一个很好的方法。请给我一些解决这个问题的好方法。

编辑1:下面是我所做工作的pastebin链接:https://pastebin.com/ps4f1KSh

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-11-02 22:07:21

我可能会使用窗口函数:

代码语言:javascript
复制
from pyspark.sql.window import Window
import pyspark.sql.functions as spf

df = spark.createDataFrame([
    (123, 1, 'a', 2, 'c'),
    (123, 2, 'b', 1, 'd'),
    (456, 3, 'e', 4, 'f')
], ['phone', 'priority1', 'attribute1', 'priority2', 'attribute2'])

w = Window.partitionBy('phone')
df2 = (
    df
    .select(
        'phone',
        spf.first('attribute1').over(w.orderBy('priority1')).alias('attribute1'),
        spf.first('attribute2').over(w.orderBy('priority2')).alias('attribute2'),
    )
)

(
    df2
    .groupby('phone')
    .agg(*[spf.first(c).alias(c) for c in df2.columns if c != 'phone'])
    .toPandas()
)

给予:

代码语言:javascript
复制
   phone attribute1 attribute2
0    123          a          d
1    456          e          f

对读者来说,这是一个模板化的练习(例如,使用列表理解)来概括到所有属性和优先级。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53120335

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档