我正在创建一个加速器,它将数据从源迁移到目的地。例如,我将从API中选择数据并将数据迁移到csv。当数据转换为csv时,我遇到了处理arraytype的问题。我使用了withColumn和concat_ws方法(即df1=df.withColumn(‘df1=df.withColumn’,F.concat_ws(':',F.col(“薄膜”),薄膜是阵列式柱)。现在我希望这件事能动态发生。我的意思是,在不指定列名的情况下,我是否可以从具有arraytype的结构中选择列名,然后调用udf?
谢谢您抽时间见我!
发布于 2021-04-29 17:00:11
可以使用df.schema获取列的类型。根据列的类型,可以应用ws或不应用:
data = [["test1", "test2", [1,2,3], ["a","b","c"]]]
schema= ["col1", "col2", "arr1", "arr2"]
df = spark.createDataFrame(data, schema)
array_cols = [F.concat_ws(":", c.name).alias(c.name) \
for c in df.schema if isinstance(c.dataType, T.ArrayType) ]
other_cols = [F.col(c.name) \
for c in df.schema if not isinstance(c.dataType, T.ArrayType) ]
df = df.select(other_cols + array_cols)结果:
+-----+-----+-----+-----+
| col1| col2| arr1| arr2|
+-----+-----+-----+-----+
|test1|test2|1:2:3|a:b:c|
+-----+-----+-----+-----+https://stackoverflow.com/questions/67302267
复制相似问题