
Collect pyspark data into a list grouped by value

Stack Overflow user
Asked 2022-07-25 20:24:26
2 answers · 175 views · 0 followers · 0 votes

I have the following data:
+-------------------+-------------------+---------+--------------+--------+
|            fs_date|            ss_date|fs_origin|fs_destination|   price|
+-------------------+-------------------+---------+--------------+--------+
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           AUH|681.0715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           AUH|  406.46|
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           BOM|545.7715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           BOM| 372.435|

I want to collect the whole dataset into lists of JSON strings, partitioned by 'fs_destination', like this:
{ "AUH":
  ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'],
"BOM":
  ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}', 
   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}']
}

Thanks!


2 Answers

Stack Overflow user

Accepted answer

Answered 2022-07-25 21:37:59

Be careful when collecting data, and make sure the cluster has the capacity for it.

import pyspark.sql.functions as f


# Serialize each row to a JSON string, then collect the strings
# into one list per fs_destination.
df_output = (df
             .groupBy("fs_destination")
             .agg(f.collect_list(f.to_json(f.struct(*df.columns))).alias("JSON")))

# toLocalIterator() streams results to the driver one partition at a time,
# which is gentler on driver memory than collect().
output = {row["fs_destination"]: row["JSON"] for row in df_output.toLocalIterator()}

Output:
{
  'AUH': [
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'
  ],
  'BOM': [
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
    '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}'
  ]
}
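For readers without a Spark session handy, the grouping that `collect_list(to_json(struct(*df.columns)))` performs can be mimicked in plain Python. This is a local sketch with hypothetical stand-in rows (a subset of the question's data), not the pyspark API itself; `json.dumps` with compact separators mirrors `to_json`'s output format, and field order follows the column order.

```python
import json
from collections import defaultdict

# Hypothetical local stand-ins for the DataFrame rows (no Spark required);
# key order matches the column order of the original DataFrame.
rows = [
    {"fs_date": "2022-06-01T00:00:00", "ss_date": "2022-06-02T00:00:00",
     "fs_origin": "TLV", "fs_destination": "AUH", "price": 681.0715},
    {"fs_date": "2022-06-01T00:00:00", "ss_date": "2022-06-03T00:00:00",
     "fs_origin": "TLV", "fs_destination": "AUH", "price": 406.46},
]

# Group the JSON strings by fs_destination, analogous to
# groupBy("fs_destination").agg(collect_list(to_json(struct(*df.columns)))).
output = defaultdict(list)
for row in rows:
    output[row["fs_destination"]].append(json.dumps(row, separators=(",", ":")))

print(dict(output))
```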
1 vote

Stack Overflow user

Answered 2022-07-25 21:27:13

Try this:

import pyspark.sql.functions as f

# Add a column holding each row serialized as a JSON string,
# then gather those strings into one list per fs_destination.
df = (
    df
    .withColumn('jsonValue', f.to_json(f.struct(*df.columns)))
    .groupBy('fs_destination')
    .agg(f.collect_list('jsonValue').alias('jsonValues'))
)

# collect() pulls every grouped row to the driver at once.
df_collected = df.collect()

output = dict(zip(
    [element['fs_destination'] for element in df_collected],
    [element['jsonValues'] for element in df_collected]
))
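The two list comprehensions plus `dict(zip(...))` above can be collapsed into a single dict comprehension that iterates the collected rows once. A minimal sketch, with plain dicts standing in for the collected pyspark `Row` objects (both support `row["field"]` access):

```python
# Hypothetical stand-ins for the rows returned by df.collect().
df_collected = [
    {"fs_destination": "AUH", "jsonValues": ["json-a1", "json-a2"]},
    {"fs_destination": "BOM", "jsonValues": ["json-b1"]},
]

# Equivalent to dict(zip([...fs_destination...], [...jsonValues...])).
output = {row["fs_destination"]: row["jsonValues"] for row in df_collected}
print(output)
```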
0 votes
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/73114965