
Collect PySpark data into a list of dictionaries by value

Stack Overflow user
Asked 2022-08-02 20:46:57
Answers 2 | Views 148 | Following 0 | Votes 0

I have the following data:

+-------------------+-------------------+---------+--------------+--------+
|            fs_date|            ss_date|fs_origin|fs_destination|   price|
+-------------------+-------------------+---------+--------------+--------+
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           AUH|681.0715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           AUH|  406.46|
|2022-06-01T00:00:00|2022-06-02T00:00:00|      TLV|           BOM|545.7715|
|2022-06-01T00:00:00|2022-06-03T00:00:00|      TLV|           BOM| 372.435|
+-------------------+-------------------+---------+--------------+--------+

I want to collect the whole dataset into a dictionary of lists of dictionaries, partitioned by 'fs_destination', as follows:

{ "AUH":
  [{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715},
   {"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}],
"BOM":
  [{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}, 
   {"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}]
}

I don't want the result as JSON, just a plain Python dictionary. Thanks!

2 Answers

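Stack Overflow user

Accepted answer

Posted 2022-08-03 11:39:39

Thanks for the answers. I found that this solution (mentioned in the comments) worked best for me:

Replace the last line in this answer: Collect pyspark dataframe into list by value

with this line: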

{row["fs_destination"]: list(map(lambda x: json.loads(x), row["JSON"])) for row in df_output.toLocalIterator()}
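As a minimal sketch of what this line does, the snippet below runs the same kind of comprehension over plain Python dicts standing in for the rows of `df_output`. The linked answer is not reproduced here, so the row shape is an assumption: each row is taken to have an `fs_destination` value and a `JSON` column holding a list of JSON strings. `sample_rows` is a hypothetical stand-in, and the records are shortened for readability. Note that `json` has to be imported for the line above to work.

```python
import json

# Hypothetical stand-in for df_output.toLocalIterator(): one entry per
# destination, with a "JSON" field holding a list of JSON strings.
sample_rows = [
    {"fs_destination": "AUH",
     "JSON": ['{"fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
              '{"fs_origin":"TLV","fs_destination":"AUH","price":406.46}']},
    {"fs_destination": "BOM",
     "JSON": ['{"fs_origin":"TLV","fs_destination":"BOM","price":545.7715}']},
]

# Same shape as the comprehension above: json.loads turns each string into a dict.
result = {row["fs_destination"]: [json.loads(s) for s in row["JSON"]]
          for row in sample_rows}

print(result["AUH"][1]["price"])
```

The list comprehension is equivalent to `list(map(lambda x: json.loads(x), ...))`; either form yields plain Python dictionaries rather than JSON strings.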
Votes: 0

Stack Overflow user

Posted 2022-08-03 10:32:22

We can create JSON strings in PySpark and then convert them to dictionaries with json.loads().

import json
from pyspark.sql import functions as func

# create a list of destinations available in the data
fs_dest_list = data_sdf.select('fs_destination').distinct().rdd. \
    map(lambda x: x.fs_destination). \
    collect()

# create a dictionary style string -- `to_json()` returns a string and so will the `collect()`
fs_dict_str = data_sdf. \
    withColumn('all_col_json_str', func.to_json(func.struct(*[func.col(k) for k in data_sdf.columns]))). \
    groupBy(func.lit(1).alias('gk')). \
    pivot('fs_destination', values=fs_dest_list). \
    agg(func.collect_list('all_col_json_str').alias('json_str')). \
    withColumn('dest_json_str', func.to_json(func.struct(*[func.col(k) for k in fs_dest_list]))). \
    select('dest_json_str'). \
    collect()[0][0]

# '{"AUH":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":681.0715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":406.46}"],
#  "BOM":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":545.7715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":372.435}"]}'

Use json.loads() to create a dictionary from the JSON string:

fs_dict = json.loads(fs_dict_str)

# result
# {'AUH': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
#   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'],
#  'BOM': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
#   '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}']}

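This is not enough: because of to_json, the dictionaries inside the lists are still JSON strings. Loop over the dictionary keys and use json.loads to turn each string into a dictionary.
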
for k in fs_dict.keys():
    fs_dict[k] = [json.loads(s) for s in fs_dict[k]]

# {'AUH': [{'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'AUH',
#    'fs_origin': 'TLV',
#    'price': 681.0715,
#    'ss_date': '2022-06-02T00:00:00'},
#   {'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'AUH',
#    'fs_origin': 'TLV',
#    'price': 406.46,
#    'ss_date': '2022-06-03T00:00:00'}],
#  'BOM': [{'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'BOM',
#    'fs_origin': 'TLV',
#    'price': 545.7715,
#    'ss_date': '2022-06-02T00:00:00'},
#   {'fs_date': '2022-06-01T00:00:00',
#    'fs_destination': 'BOM',
#    'fs_origin': 'TLV',
#    'price': 372.435,
#    'ss_date': '2022-06-03T00:00:00'}]}
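The second pass is needed because the rows were encoded twice: once per row, and once more when the lists of strings were wrapped in the outer structure. The sketch below reproduces that effect with the standard `json` module only, with no Spark involved; the record is shortened and the variable names are illustrative.

```python
import json

# One record encoded to a JSON string, as to_json() would produce per row.
inner = json.dumps({"fs_destination": "AUH", "price": 406.46})

# Encoding the list of strings again escapes the inner quotes (double encoding).
outer = json.dumps({"AUH": [inner]})

once = json.loads(outer)          # decodes only the outer layer
element = once["AUH"][0]
print(type(element).__name__)     # str: still a JSON string

twice = json.loads(element)       # a second pass yields the dict
print(type(twice).__name__)       # dict
```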

type(fs_dict)
# dict

for k in fs_dict.keys():
    print(type(fs_dict[k]))
# <class 'list'>
# <class 'list'>

for k in fs_dict.keys():
    print([type(ele) for ele in fs_dict[k]])
# [<class 'dict'>, <class 'dict'>]
# [<class 'dict'>, <class 'dict'>]
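Since the question asks for plain dictionaries rather than JSON, another option is to skip the JSON round trip and group the collected rows on the driver. This is a sketch of a different technique, not the method used in either answer. It uses plain dicts as a hypothetical stand-in for what `row.asDict()` would return for each row of `data_sdf.toLocalIterator()`; the helper `group_rows` is an illustrative name. Like any collect-based approach, it assumes the data fits in driver memory, and column values keep whatever Python types Spark returns for them.

```python
from collections import defaultdict

# Hypothetical stand-in for (row.asDict() for row in data_sdf.toLocalIterator()).
rows = [
    {"fs_date": "2022-06-01T00:00:00", "ss_date": "2022-06-02T00:00:00",
     "fs_origin": "TLV", "fs_destination": "AUH", "price": 681.0715},
    {"fs_date": "2022-06-01T00:00:00", "ss_date": "2022-06-03T00:00:00",
     "fs_origin": "TLV", "fs_destination": "AUH", "price": 406.46},
    {"fs_date": "2022-06-01T00:00:00", "ss_date": "2022-06-02T00:00:00",
     "fs_origin": "TLV", "fs_destination": "BOM", "price": 545.7715},
    {"fs_date": "2022-06-01T00:00:00", "ss_date": "2022-06-03T00:00:00",
     "fs_origin": "TLV", "fs_destination": "BOM", "price": 372.435},
]


def group_rows(row_dicts, key):
    """Group row dictionaries into {key value: [row dict, ...]}."""
    grouped = defaultdict(list)
    for row in row_dicts:
        grouped[row[key]].append(row)
    return dict(grouped)  # plain dict, as the question asks


fs_dict_direct = group_rows(rows, "fs_destination")
print(sorted(fs_dict_direct))
print(len(fs_dict_direct["BOM"]))
```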
Votes: 1
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73213608
