首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何从JSON键/值对中提取数据,如果密钥也具有实际值

如何从JSON键/值对中提取数据,如果密钥也具有实际值
EN

Stack Overflow用户
提问于 2022-11-16 05:15:01
回答 1查看 88关注 0票数 1

我正在尝试使用PySpark DataFrame API解析/扁平JSON数据。挑战是,我需要从实际的键/属性中提取一个数据元素('Id'),并且只筛选具有'statC‘属性的行。首先,我试图引爆JSON对象,但得到了一个错误。是否有更好的方法来提取这些数据?

JSON输入文件:

代码语言:javascript
复制
{
  "changes": {
    "1": [
      {
        "Name": "ABC-1",
        "statC": {
          "newValue": 10
        },
        "column": {
          "notDone": true,
          "newStatus": "10071"
        }
      }
    ],
    "2": [
      {
        "Name": "ABC-2",
        "added": true
      }
    ],
    "3": [
      {
        "Name": "ABC-3",
        "column": {
          "notDone": true,
          "newStatus": "10071"
        }
      }
    ],
    "4": [
      {
        "Name": "ABC-4",
        "statC": {
          "newValue": 40
        }
      }
    ],
    "5": [
      {
        "Name": "ABC-5",
        "statC": {
          "newValue": 50
        },
        "column": {
          "notDone": false,
          "done": true,
          "newStatus": "13685"
        }
      }
    ],
    "6": [
      {
        "Name": "ABC-61",
        "added": true
      },
      {
        "Name": "ABC-62",
        "statC": {
          "oldValue": 60
        }
      }
    ],
    "7": [
      {
        "Name": "ABC-70",
        "added": true
      },
      {
        "Name": "ABC-71",
        "statC": {
          "newValue": 70
        }
      }    
      {
        "Name": "ABC-72",
        "statC": {
          "newValue": 75
        }
      }
    ]
  },
  "startTime": 1666188060000,
  "endTime": 1667347140000,
  "activatedTime": 1666188126953,
  "now": 1667294686212
}

期望产出:

代码语言:javascript
复制
Id  Name   statC_NewValue 
1   ABC-1  10      
4   ABC-4  40     
5   ABC-5  50 
7   ABC-71 70
7   ABC-72 75

我的PySpark代码:

代码语言:javascript
复制
from pyspark.sql.functions import * 
rawDF = spark.read.json([f"abfss://{pADLSContainer}@{pADLSGen2}.dfs.core.windows.net/{pADLSDirectory}/InputFile.json"], multiLine = "true")

idDF = rawDF.select(explode("changes").alias("changes_json"))

错误:

AnalysisException:无法解决数据类型不匹配导致的“爆炸(changes)”:函数爆炸的输入应该是数组或映射类型,而不是struct.

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-17 05:56:24

它有很多东西。首先,您可以创建一个列列表( 123等),这些列满足您的条件,选择它们作为列,修改数组,取消枢轴并展开为列。

代码语言:javascript
复制
from pyspark.sql import functions as F

df_raw = spark.read.json(["file.json"], multiLine =True)
df = df_raw.select('changes.*')

df0 = df.select([F.col(c)[0].alias(c) for c in df.columns])
cols = [c for c in df0.columns
          if 'statC' in df0.schema[c].dataType.names and
             'newValue' in df0.schema[c].dataType['statC'].dataType.names
]
df = df.select(
    [F.transform(c, lambda x: F.struct(
        x.Name.alias('Name'),
        x.statC.newValue.alias('statC_NewValue'))).alias(c)
     for c in cols]
)
to_melt = [f"'{c}', `{c}`" for c in df.columns]
df = df.selectExpr(f"stack({len(to_melt)}, {','.join(to_melt)}) (Id, val)")
df = df.selectExpr("Id", "inline(val)")
df = df.filter('statC_NewValue is not null')

df.show()
# +---+------+--------------+
# | Id|  Name|statC_NewValue|
# +---+------+--------------+
# |  1| ABC-1|            10|
# |  4| ABC-4|            40|
# |  5| ABC-5|            50|
# |  7|ABC-71|            70|
# |  7|ABC-72|            75|
# +---+------+--------------+
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74455493

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档