I am using Spark together with Azure Synapse. I am reading multiple nested JSON files that share the same structure, like the example below, into a dataframe:
{
    "AmountOfOrders": 2,
    "TotalEarnings": 1800,
    "OrderDetails": [
        {
            "OrderNumber": 1,
            "OrderDate": "2022-7-06",
            "OrderLine": [
                {
                    "LineNumber": 1,
                    "Product": "Laptop",
                    "Price": 1000
                },
                {
                    "LineNumber": 2,
                    "Product": "Tablet",
                    "Price": 500
                },
                {
                    "LineNumber": 3,
                    "Product": "Mobilephone",
                    "Price": 300
                }
            ]
        },
        {
            "OrderNumber": 2,
            "OrderDate": "2022-7-06",
            "OrderLine": [
                {
                    "LineNumber": 1,
                    "Product": "Printer",
                    "Price": 100,
                    "Discount": 0
                },
                {
                    "LineNumber": 2,
                    "Product": "Paper",
                    "Price": 50,
                    "Discount": 0
                },
                {
                    "LineNumber": 3,
                    "Product": "Toner",
                    "Price": 30,
                    "Discount": 0
                }
            ]
        }
    ]
}

I am trying to get the LineNumbers of OrderNumber 1 with a generic function that unpacks the arrays and structs in the data, using the following code:
from pyspark.sql.functions import col, explode
from pyspark.sql.types import ArrayType, StructType

def read_nested_structure(df, excludeList, messageType, coll):
    display(df.limit(10))
    print('read_nested_structure')
    cols = []
    match = 0
    match_field = ""
    print(df.schema[coll].dataType.fields)
    for field in df.schema[coll].dataType.fields:
        for c in excludeList:
            if c == field.name:
                print('Match = ' + field.name)
                match = 1
        if match == 0:
            # cols.append(coll)
            cols.append(col(coll + "." + field.name).alias(field.name))
        match = 0
        # cols.append(coll)
    print(cols)
    df = df.select(cols)
    return df

def read_nested_structure_2(df, excludeList, messageType):
    cols = []
    for coll in df.schema.names:
        if isinstance(df.schema[coll].dataType, ArrayType):
            # One output row per array element
            print(coll + "-- : Array")
            df = df.withColumn(coll, explode(coll))
            cols.append(coll)
        elif isinstance(df.schema[coll].dataType, StructType):
            if messageType == 'Header':
                # Prefix struct fields with the parent column name
                for field in df.schema[coll].dataType.fields:
                    cols.append(col(coll + "." + field.name).alias(coll + "_" + field.name))
            elif messageType == 'Content':
                print('Struct - Content')
                # Promote struct fields to top-level columns under their own names
                for field in df.schema[coll].dataType.fields:
                    cols.append(col(coll + "." + field.name).alias(field.name))
        else:
            # Keep plain (non-nested) columns unless they are in excludeList
            if coll not in excludeList:
                cols.append(coll)
    df = df.select(cols)
    return df
df = spark.read.load(datalakelocation + '/test.json', format='json')
df = unpack_to_content_dataframe_simple_2(df,exclude)
df = df.filter(df.OrderNumber == 1)
df = unpack_to_content_dataframe_simple_2(df,exclude)
display(df.limit(10))

This produces the following data:

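As you can see, the attribute marked in yellow (Discount) is added to the dataframe even though it does not belong to OrderNumber 1. How can I filter rows in the dataframe so that the schema is updated as well (in this case, without the Discount attribute)?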
Answered on 2022-07-11 18:43:22
I used the read_nested_structure_2() function as shown below and got the same result as you. The code I used with read_nested_structure_2() to get this result is:

x = read_nested_structure_2(df, [], 'Header')
y = read_nested_structure_2(x, [], 'Content')
y = y.filter(y.OrderNumber == 1)
z = read_nested_structure_2(y, [], 'Header')
final = read_nested_structure_2(z, [], 'Content')
display(final)

The output after running this code is as follows:

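Because at least one Product has the Discount column specified, and Spark infers a single schema for all records, the Discount column is created as well. Filtering rows does not change that schema. To remove the column, we would have to handle it separately and derive another dataframe without Discount (only when all of its values are null), which complicates the code. Instead, we can write another function that does this for us: it removes the columns whose values are all null. Below is a function that can be used for this:

def exclude_fields_that_dont_exist(filtered_df):
    cols = []
    # Total number of rows, computed once instead of once per column
    total_count = filtered_df.count()
    # Iterate through the columns
    for column in filtered_df.columns:
        # null_count is the number of null values in this column
        null_count = filtered_df.filter(filtered_df[column].isNull()).count()
        # If every value is null, the column is not required (like Discount)
        if null_count != total_count:
            cols.append(column)
    # Return a dataframe with only the required columns
    return filtered_df.select(*cols)

When you use this function on the dataframe (final), you get the following result:

mydf = exclude_fields_that_dont_exist(final)
# removes columns from a dataframe that have all null values.
display(mydf)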

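Note:
- For example, if for OrderNumber=1 the product Laptop had a 10% discount and the remaining products (for the same order number) had no discount value in the JSON, the function would keep the Discount column, because it then carries required information.
- To avoid more loops in the function, you can also consider replacing all null values with 0, since a Product with no Discount specified (a null value) is the same as a product with a Discount of 0 (if that is acceptable for your data). In that case you can use the fillna() (or na.fill()) function to fill the null values with any desired value.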
https://stackoverflow.com/questions/72939824