首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法替换Pyspark中数组列中的特定值。

无法替换Pyspark中数组列中的特定值。
EN

Stack Overflow用户
提问于 2022-10-25 15:24:24
回答 2查看 62关注 0票数 0

我的DF中有一个列,其中的数据类型是:

代码语言:javascript
复制
testcolumn:array  
--element: struct
-----id:integer   
-----configName: string 
-----desc:string  
-----configparam:array
--------element:map  
-------------key:string
-------------value:string 

测试柱

Row1:

代码语言:javascript
复制
[{"id":1,"configName":"test1","desc":"Ram1","configparam":[{"removeit":"[]"}]},
{"id":2,"configName":"test2","desc":"Ram2","configparam":[{"removeit":"[]"}]},
{"id":3,"configName":"test3","desc":"Ram1","configparam":[{"paramId":"4","paramvalue":"200"}]}]    

Row2:

代码语言:javascript
复制
[{"id":11,"configName":"test11","desc":"Ram11","configparam":[{"removeit":"[]"}]},
{"id":33,"configName":"test33","desc":"Ram33","configparam":[{"paramId":"43","paramvalue":"300"}]},
{"id":6,"configName":"test26","desc":"Ram26","configparam":[{"removeit":"[]"}]},
{"id":93,"configName":"test93","desc":"Ram93","configparam":[{"paramId":"93","paramvalue":"3009"}]}
]    

我想移除configparam是"configparam":[{"removeit":"[]"}]"configparam":[]的位置。

期望输出:

输出柱

Row1:

代码语言:javascript
复制
[{"id":1,"configName":"test1","desc":"Ram1","configparam":[]},
{"id":2,"configName":"test2","desc":"Ram2","configparam":[]},
{"id":3,"configName":"test3","desc":"Ram1","configparam":[{"paramId":"4","paramvalue":"200"}]}]    

Row2:

代码语言:javascript
复制
[{"id":11,"configName":"test11","desc":"Ram11","configparam":[]},
{"id":33,"configName":"test33","desc":"Ram33","configparam":[{"paramId":"43","paramvalue":"300"}]},
{"id":6,"configName":"test26","desc":"Ram26","configparam":[]},
{"id":93,"configName":"test93","desc":"Ram93","configparam":[{"paramId":"93","paramvalue":"3009"}]}
]   

我尝试过这段代码,但它并没有给我输出:

代码语言:javascript
复制
test=df.withColumn('outputcolumn',F.expr("translate"(testcolumn,x-> replace(x,':[{"removeit":"[]"}]','[]'))) 

如果有人能帮我,那就太好了。

EN

回答 2

Stack Overflow用户

发布于 2022-10-25 18:02:48

您的testcolumn是一个结构数组,因此您不能按原样执行字符串操作。

你可以做这样的事。当configparam包含一个"removeit“键时,它将完全清空它。

示例:

代码语言:javascript
复制
"configparam":[{"removeit":[], "otherparam": "value"}] -> "configparam": []

火花3.1.0+

代码语言:javascript
复制
array_has_remove = lambda y: ~F.array_contains(F.map_keys(y), 'removeit')

df = (df.withColumn('outputcolumn', 
          F.transform('testcolumn', 
              lambda x: x.withField('configparam', 
                  F.filter(x['configparam'], array_has_remove)
              )
          )
     ))

参考文献:withField过滤器包含关键字

我尝试了没有explode,但这是复杂的。如果您不喜欢这个复杂,可以尝试使用explode和聚合。

代码语言:javascript
复制
df = (# extract configparam to a column for easier access.
      df.withColumn('configparam', F.expr('transform(testcolumn, x -> x.configparam)'))
        # Return empty array if there is a "removeit" otherwise return the original object.
        .withColumn('configparam', 
            F.expr('transform(configparam, x -> 
                        case when array_contains(map_keys(x[0]), "removeit") then array() 
                        else x end)'))
        # Patch the transformed configparam with the rest of testcolumn
        .withColumn('outputcolumn', 
            F.expr('transform(testcolumn, (x, i) -> struct(x.id, x.configName, x.desc, configparam[i] as configparam))'))
        .drop('configparam'))

结果

代码语言:javascript
复制
Row(testcolumn=[Row(id=1, configName='test1', desc='Ram1', configparam=[{'removeit': '[]'}]), Row(id=2, configName='test2', desc='Ram2', configparam=[{'removeit': '[]'}]), Row(id=3, configName='test3', desc='Ram1', configparam=[{'paramId': '4', 'paramvalue': '200'}])], 
  outputcolumn=[Row(id=1, configName='test1', desc='Ram1', configparam=[]), Row(id=2, configName='test2', desc='Ram2', configparam=[]), Row(id=3, configName='test3', desc='Ram1', configparam=[{'paramId': '4', 'paramvalue': '200'}])])
票数 0
EN

Stack Overflow用户

发布于 2022-10-26 06:17:46

你必须执行一连串的爆炸,过滤和groupBy操作来实现这一点。

首先,爆炸数组/struct/map列以到达嵌套列:

代码语言:javascript
复制
df = df.withColumn("id", F.col("testcolumn")["id"])
df = df.withColumn("configName", F.col("testcolumn")["configName"])
df = df.withColumn("desc", F.col("testcolumn")["desc"])
df = df.withColumn("configparam_exploded", F.explode(F.col("testcolumn")["configparam"]))
df = df.select(df.columns + [F.explode(F.col("configparam_exploded"))])

+-----------------------------------------------------+---+----------+----+---------------------------------+----------+-----+
|testcolumn                                           |id |configName|desc|configparam_exploded             |key       |value|
+-----------------------------------------------------+---+----------+----+---------------------------------+----------+-----+
|{1, test1, Ram1, [{removeit -> []}]}                 |1  |test1     |Ram1|{removeit -> []}                 |removeit  |[]   |
|{2, test2, Ram2, [{removeit -> []}]}                 |2  |test2     |Ram2|{removeit -> []}                 |removeit  |[]   |
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramId   |4    |
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramvalue|200  |
+-----------------------------------------------------+---+----------+----+---------------------------------+----------+-----+

然后,根据需要过滤数据:

代码语言:javascript
复制
df = df.filter((F.col("key") != "removeit") | (F.col("value") != "[]"))

+-----------------------------------------------------+---+----------+----+---------------------------------+----------+-----+
|testcolumn                                           |id |configName|desc|configparam_exploded             |key       |value|
+-----------------------------------------------------+---+----------+----+---------------------------------+----------+-----+
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramId   |4    |
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramvalue|200  |
+-----------------------------------------------------+---+----------+----+---------------------------------+----------+-----+

最后,将所有单独的列返回到原始包装:

代码语言:javascript
复制
df = df.withColumn("configparam_map", F.map_from_entries(F.array(F.struct("key", "value"))))
df = df.groupBy(["id", "configName", "desc"]).agg(F.collect_list("configparam_map").alias("configparam"))
df = df.withColumn("testcolumn", F.struct("id", "configName", "desc", "configparam"))
df = df.drop("id", "configName", "desc", "configparam")

+-------------------------------------------------------+
|testcolumn                                             |
+-------------------------------------------------------+
|{3, test3, Ram1, [{paramId -> 4}, {paramvalue -> 200}]}|
+-------------------------------------------------------+

用于重现问题的样本数据集:

代码语言:javascript
复制
schema = StructType([StructField('testcolumn', StructType([StructField('id', IntegerType(), True), StructField('configName', StringType(), True), StructField('desc', StringType(), True), StructField('configparam', ArrayType(MapType(StringType(), StringType(), True), True), True)]), True)])

data = [
  Row(Row(1, "test1", "Ram1", [{"removeit":"[]"}])),
  Row(Row(2, "test2", "Ram2", [{"removeit":"[]"}])),
  Row(Row(3, "test3", "Ram1", [{"paramId":"4","paramvalue":"200"}]))    
]

df = spark.createDataFrame(data = data, schema = schema)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74196479

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档