I have a Spark DataFrame with multiple columns, one of which is a measure column whose data type is string. The column data looks like the sample below, and I need to convert it into multiple columns. I tried using the Spark RDD map function, but it raised a type/value error.
Schema: column1, column2, measure (string)
Sample data:
[{"name": "ABC","kt":
[{"name": "AB-1",
"values":
[{"date": "2021-05-21 08:04:56.000", "value":0.05520880298702948},
{"date": "2021-05-21 08:05:56.000", "value": 0.6873692705340528},
{"date": "2021-05-21 08:06:56.000", "value": 1.0036619131928861},
{"date": "2021-05-21 08:07:56.000", "value": 0.7431644238409444},
{"date": "2021-05-21 08:08:56.000", "value": 0.9845464929057735},
{"date": "2021-05-21 08:09:56.000", "value": 1.0010811066472702},
{"date": "2021-05-21 08:10:56.000", "value": 1.0009814513959714},
{"date": "2021-05-21 08:11:56.000", "value": 1.001614167307074},
{"date": "2021-05-21 08:12:56.000", "value": 1.001766291527917},
{"date": "2021-05-21 08:13:56.000", "value": 0.5865639742905218},
{"date": "2021-05-21 08:14:56.000", "value": 1.0015836161251768},
{"date": "2021-05-21 08:15:56.000", "value": 0.3571215446400451}]},
{"name": "BC-2",
"values":
[{"date": "2021-05-21 08:04:56.000", "value": 0.14044187962813096},
{"date": "2021-05-21 08:05:56.000", "value": 0.7565445799225486},
{"date": "2021-05-21 08:06:56.000", "value": 1.0017136900856412},
{"date": "2021-05-21 08:07:56.000", "value": 1.001730692743276},
{"date": "2021-05-21 08:08:56.000", "value": 1.0010340874676533},
{"date": "2021-05-21 08:09:56.000", "value": 1.0007168399510786},
{"date": "2021-05-21 08:10:56.000", "value": 1.0017091878186537},
{"date": "2021-05-21 08:11:56.000", "value": 1.0004370714489406},
{"date": "2021-05-21 08:12:56.000", "value": 1.0015819812456357},
{"date": "2021-05-21 08:13:56.000", "value": 0.7370171481823211},
{"date": "2021-05-21 08:14:56.000", "value": 1.001703540193026},
{"date": "2021-05-21 08:15:56.000", "value": 0.5519119341514123}]}]发布于 2022-06-18 08:20:13
First, you may want to check that your JSON is actually formatted correctly, because the sample data provided is not valid JSON.
Assuming your intent is to flatten the nested JSON structure into rows of names, dates, and values, you could try something like this:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
spark = SparkSession.builder.getOrCreate()
# example valid JSON
json = """
[
{
"name": "ABC",
"kt": [
{
"name": "AB-1",
"values": [
{
"date": "2021-05-21 08:04:56.000",
"value": 0.05520880298702948
}
]
},
{
"name": "BC-2",
"values": [
{
"date": "2021-05-21 08:04:56.000",
"value": 0.14044187962813096
}
]
}
]
}
]
"""
json_df = spark.createDataFrame([(1, json)],['id', 'json'])
# define the schema for the dataframe
schema = t.ArrayType(
t.StructType([
t.StructField('name', t.StringType()),
t.StructField('kt', t.ArrayType(
t.StructType([
t.StructField('name', t.StringType()),
t.StructField('values', t.ArrayType(
t.StructType([
t.StructField('date', t.TimestampType()),
t.StructField('value', t.DoubleType())
])
))
])
))
])
)
# select the string value in the 'json' column using the defined schema
df = json_df.select(
f.from_json(f.col('json'), schema).alias('json')
)
# flatten the nested structure to expose date and properties as columns
df.select(
f.explode('json')
).select(
f.col('col.name').alias('name_0'),
f.explode('col.kt')
).select(
'name_0',
'col.*'
).select(
'name_0',
f.col('name').alias('name_1'),
f.explode('values')
).select(
'name_0',
'name_1',
'col.*'
).show()

Expected output:
+------+------+-------------------+-------------------+
|name_0|name_1| date| value|
+------+------+-------------------+-------------------+
| ABC| AB-1|2021-05-21 08:04:56|0.05520880298702948|
| ABC| BC-2|2021-05-21 08:04:56|0.14044187962813096|
+------+------+-------------------+-------------------+

Posted on 2022-06-18 08:50:48
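For reference, the flattening logic that the explode chain above performs can be sketched in plain Python with only the standard json module (no Spark required). This is a minimal illustration under the assumption that the sample has first been corrected to valid JSON; the flatten helper is hypothetical, not part of any library:

```python
import json

# Corrected, minimal version of the sample data (valid JSON).
sample = """
[{"name": "ABC", "kt": [
  {"name": "AB-1", "values": [
    {"date": "2021-05-21 08:04:56.000", "value": 0.05520880298702948}]},
  {"name": "BC-2", "values": [
    {"date": "2021-05-21 08:04:56.000", "value": 0.14044187962813096}]}]}]
"""

def flatten(doc):
    """Yield (name_0, name_1, date, value) rows from the nested structure."""
    for outer in doc:                      # corresponds to explode('json')
        for inner in outer["kt"]:          # corresponds to explode('col.kt')
            for point in inner["values"]:  # corresponds to explode('values')
                yield (outer["name"], inner["name"], point["date"], point["value"])

rows = list(flatten(json.loads(sample)))
for row in rows:
    print(row)
```

Each nested explode in the Spark version plays the role of one loop level here, which is why the final DataFrame has one row per innermost date/value pair.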
df = spark.read.json("<Your JSON file path>")
df.write.option("header","true").csv("<Your CSV file path>")

https://stackoverflow.com/questions/72665815
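Note that this two-liner only works when the parsed JSON is flat: Spark's CSV writer cannot serialize array or struct columns, so nested data like the sample in the question must be flattened first (for example with the explode approach in the other answer). The required pre-flattening can be sketched with the standard library alone; the column names and data below are illustrative, not taken from any real file:

```python
import csv
import io
import json

# A trimmed, valid version of the question's nested structure.
nested = json.loads(
    '[{"name": "ABC", "kt": [{"name": "AB-1", "values": '
    '[{"date": "2021-05-21 08:04:56.000", "value": 0.0552}]}]}]'
)

# Flatten nested records into plain rows before writing CSV;
# a CSV cell can only hold a scalar, never an array or struct.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["name_0", "name_1", "date", "value"])
for outer in nested:
    for inner in outer["kt"]:
        for point in inner["values"]:
            writer.writerow([outer["name"], inner["name"],
                             point["date"], point["value"]])

print(buf.getvalue())
```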