首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >PySpark:爆炸性模式列与底层嵌套模式不匹配

PySpark:爆炸性模式列与底层嵌套模式不匹配
EN

Stack Overflow用户
提问于 2022-07-11 14:02:22
回答 1查看 100关注 0票数 0

我使用火花放电与Azure-Synapse相结合。我正在使用下面的示例在dataframe中读取具有相同结构的多个嵌套JSON:

代码语言:javascript
复制
{
    "AmountOfOrders": 2,
    "TotalEarnings": 1800,
    "OrderDetails": [
        {
            "OrderNumber": 1,
            "OrderDate": "2022-7-06",
            "OrderLine": [
                {
                    "LineNumber": 1,
                    "Product": "Laptop",
                    "Price": 1000
                },
                {
                    "LineNumber": 2,
                    "Product": "Tablet",
                    "Price": 500
                },
                {
                    "LineNumber": 3,
                    "Product": "Mobilephone",
                    "Price": 300
                }
            ]
        },
        {
            "OrderNumber": 2,
            "OrderDate": "2022-7-06",
            "OrderLine": [
                {
                    "LineNumber": 1,
                    "Product": "Printer",
                    "Price": 100,
                    "Discount": 0
                },
                {
                    "LineNumber": 2,
                    "Product": "Paper",
                    "Price": 50,
                    "Discount": 0
                },
                {
                    "LineNumber": 3,
                    "Product": "Toner",
                    "Price": 30,
                    "Discount": 0
                }
            ]
        }
    ]
}

我正在尝试使用一个通用函数来获取generic 1的LineNumbers,该函数提取数据的数组和结构。使用以下代码:

代码语言:javascript
复制
def read_nested_structure(df,excludeList,messageType,coll):
    display(df.limit(10))
    print('read_nested_structure')
    cols =[]
    match = 0
    match_field = ""
    print(df.schema[coll].dataType.fields)
    for field in df.schema[coll].dataType.fields:

        for c in excludeList:
            if c == field.name:
                print('Match = ' + field.name)
                match = 1
        if match == 0:   
            # cols.append(coll)
            cols.append(col(coll + "." + field.name).alias(field.name))
        match = 0 
        #         cols.append(coll)
    print(cols)  
    df = df.select(cols)
    return df

def read_nested_structure_2(df,excludeList,messageType):
    cols =[]
    match = 0
    for coll in df.schema.names:
        if isinstance(df.schema[coll].dataType, ArrayType):
            print(  coll +  "-- : Array")
            df = df.withColumn(coll, explode(coll).alias(coll))
            cols.append(coll)

        elif isinstance(df.schema[coll].dataType, StructType):
            if messageType == 'Header':
                for field in df.schema[coll].dataType.fields:
                    cols.append(col(coll + "." + field.name).alias(coll + "_" + field.name))
            
            elif messageType == 'Content':
                print('Struct - Content')
                for field in df.schema[coll].dataType.fields:
                    cols.append(col(coll + "." + field.name).alias(field.name))

        else:
            for c in excludeList:
                if c == coll:
                    match = 1
            if match == 0:
                cols.append(coll)
                        
    df = df.select(cols)
    return df

df = spark.read.load(datalakelocation + '/test.json', format='json')
df =  unpack_to_content_dataframe_simple_2(df,exclude)
df = df.filter(df.OrderNumber == 1)
df =  unpack_to_content_dataframe_simple_2(df,exclude)
display(df.limit(10))

这将产生以下数据:

如您所见,黄色标记的属性被添加到不属于OrderNumber 1的dataframe中。我如何在dataframe中筛选行,这将导致更新模式(在这种情况下,没有折扣属性)?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-11 18:43:22

  • ,我使用read_nested_structure_2()函数的方式如下所示,以获得与您相同的结果。我使用read_nested_structure_2()获取此结果的代码如下所示:

代码语言:javascript
复制
x = read_nested_structure_2(df,[],'Header')
y = read_nested_structure_2(x,[],'Content')

y = y.filter(y.OrderNumber == 1)

z = read_nested_structure_2(y,[],'Header')
final = read_nested_structure_2(z,[],'Content')

display(final)

使用此代码后的输出如下:

  • 即使在整个输入JSON中为一个Product指定了列Discount,也将创建它。为了删除该列,我们必须单独进行,以获得另一个没有Discount的数据格式(只有在它无效的情况下)。

  • 您将使用相同的函数从StructTypeArrayType中提取数据,因此不建议编写代码来删除所有空值的字段(例如Discount)。这样做会使代码复杂化。

相反,我们可以编写另一个为我们工作的函数

  • 。此函数应移除其所有值均为null的列。下面是可用于执行此操作的函数。

代码语言:javascript
复制
def exclude_fields_that_dont_exist(filtered_df):
    cols=[]

    #iterate through columns
    for column in filtered_df.columns:
        
        #null_count is the count of null values in a column
        null_count = filtered_df.filter(filtered_df[column].isNull()).count() 

        #check if null_count equals the total column value count
        #if they are equal, those columns are not required (Like Discount)
        if(filtered_df.select(column).count() != null_count):
            cols.append(column)

    #return dataframe with required columns
    return filtered_df.select(*cols)

  • 当您在过滤的dataframe (在我的例子中是final)上使用这个函数时,您将得到一个结果数据,如下所示:

代码语言:javascript
复制
mydf = exclude_fields_that_dont_exist(final) 
# removes columns from a dataframe that have all null values.

display(mydf)

注:

例如,对于

  • ,对于OrderNumber=1,产品Laptop有10%的折扣,其余的产品(对于相同的订单号)没有折扣值(在JSON中)。

函数需要包含information.列,因为它是必需的

为了避免在函数中使用更多的循环,您还可以考虑用0替换所有空值,因为没有指定值的Product ( null 值)与带有Discount值的产品相同(如果这是可行的,则可以使用fill() or fillna()函数E 162来用任何所需的Discount填充空值。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72939824

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档