我有以下数据:
+-------------------+-------------------+---------+--------------+--------+
| fs_date| ss_date|fs_origin|fs_destination| price|
+-------------------+-------------------+---------+--------------+--------+
|2022-06-01T00:00:00|2022-06-02T00:00:00| TLV| AUH|681.0715|
|2022-06-01T00:00:00|2022-06-03T00:00:00| TLV| AUH| 406.46|
|2022-06-01T00:00:00|2022-06-02T00:00:00| TLV| BOM|545.7715|
|2022-06-01T00:00:00|2022-06-03T00:00:00| TLV| BOM| 372.435|我想将整个数据收集到一个字典列表中,按'fs_destination‘进行partitiond如下:
{ "AUH":
[{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715},
{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}],
"BOM":
[{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715},
{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}]
}我不希望它出现在JSON中,只有普通的python字典。谢谢!
发布于 2022-08-03 11:39:39
谢谢你的回答,我发现这个解决方案(在评论中提到)对我来说是最好的:
替换此答案中的最后一行:Collect pyspark dataframe into list by value
这一行:
{row["fs_destination"]: list(map(lambda x: json.loads(x), row["JSON"])) for row in df_output.toLocalIterator()}发布于 2022-08-03 10:32:22
我们可以在pyspark中创建json字符串,然后使用json.loads()将它们转换为字典。
# create a list of destinations available in the data
fs_dest_list = data_sdf.select('fs_destination').distinct().rdd. \
map(lambda x: x.fs_destination). \
collect()
# create a dictionary style string -- `to_json()` returns a string and so will the `collect()`
fs_dict_str = data_sdf. \
withColumn('all_col_json_str', func.to_json(func.struct(*[func.col(k) for k in data_sdf.columns]))). \
groupBy(func.lit(1).alias('gk')). \
pivot('fs_destination', values=fs_dest_list). \
agg(func.collect_list('all_col_json_str').alias('json_str')). \
withColumn('dest_json_str', func.to_json(func.struct(*[func.col(k) for k in fs_dest_list]))). \
select('dest_json_str'). \
collect()[0][0]
# '{"AUH":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":681.0715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"AUH\",\"price\":406.46}"],
# "BOM":["{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-02T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":545.7715}","{\"fs_date\":\"2022-06-01T00:00:00\",\"ss_date\":\"2022-06-03T00:00:00\",\"fs_origin\":\"TLV\",\"fs_destination\":\"BOM\",\"price\":372.435}"]}'使用json.loads()从json字符串创建字典
fs_dict = json.loads(fs_dict_str)
# result
# {'AUH': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":681.0715}',
# '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"AUH","price":406.46}'],
# 'BOM': ['{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-02T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":545.7715}',
# '{"fs_date":"2022-06-01T00:00:00","ss_date":"2022-06-03T00:00:00","fs_origin":"TLV","fs_destination":"BOM","price":372.435}']}由于to_json的结果,列表中的字典仍然是json字符串,这还不够。使用字典键上的for循环使用json.loads创建字典。
for k in fs_dict.keys():
fs_dict[k] = [json.loads(s) for s in fs_dict[k]]
# {'AUH': [{'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'AUH',
# 'fs_origin': 'TLV',
# 'price': 681.0715,
# 'ss_date': '2022-06-02T00:00:00'},
# {'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'AUH',
# 'fs_origin': 'TLV',
# 'price': 406.46,
# 'ss_date': '2022-06-03T00:00:00'}],
# 'BOM': [{'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'BOM',
# 'fs_origin': 'TLV',
# 'price': 545.7715,
# 'ss_date': '2022-06-02T00:00:00'},
# {'fs_date': '2022-06-01T00:00:00',
# 'fs_destination': 'BOM',
# 'fs_origin': 'TLV',
# 'price': 372.435,
# 'ss_date': '2022-06-03T00:00:00'}]}type(fs_dict)
# dict
for k in fs_dict.keys():
print(type(fs_dict[k]))
# <class 'list'>
# <class 'list'>
for k in fs_dict.keys():
print([type(ele) for ele in fs_dict[k]])
# [<class 'dict'>, <class 'dict'>]
# [<class 'dict'>, <class 'dict'>]https://stackoverflow.com/questions/73213608
复制相似问题