I have a table with a map column. I want to split that map column into two separate columns: one for the keys and one for the values.
input.show();
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
|addedSkuWithTimestamp| fbaSKUAdditions|fbaSKURemovals| merchantId|mfnSKUAdditions|mfnSKURemovals|removedSkuWithTimestamp|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
| [Test1 -> 1234567...|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []|ANOTHER_MERCHANT| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []|ANOTHER_MERCHANT| []| null| null|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+

But I want the output to be:
test1 123456789
Test2 123456780

How do I get two separate columns (a key column and a value column) out of the map? Here is what I have so far:
Dataset<Row> removed_skus = input
.withColumn("sku", functions.explode(input.col("removedSkuWithTimestamp")))
.withColumn("skuType", functions.lit("MFN"))
.select(input.col("merchantId").alias("merchant_id"), new Column("sku"),
new Column("skuType"))
.distinct()
.groupBy("merchant_id")
.agg(functions.collect_list("sku").alias("removedSkus"));

Posted on 2019-04-07 05:19:00
First, let's create some data:
val df = Seq(
(Map("sku1"->"timestamp1"), "AFN"),
(Map("sku2"->"timestamp2"), "AFN"),
(null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")
df.show(false)
+---------------------+-------+
|addedSkuWithTimestamp|skuType|
+---------------------+-------+
| [sku1 -> timestamp1]| AFN|
| [sku2 -> timestamp2]| AFN|
| null| AFN|
+---------------------+-------+

It will have the following schema:
scala> df.printSchema()
root
|-- addedSkuWithTimestamp: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- skuType: string (nullable = true)

Spark < 2.3
The following code uses the mapToTupleUDF udf function to extract the key and value of the addedSkuWithTimestamp column into the Sku and Timestamp columns:
val mapToTupleUDF = udf((sku: Map[String, String]) => if(sku != null) sku.toSeq(0) else null)
df.withColumn("addedSkuWithTimestamp", mapToTupleUDF($"addedSkuWithTimestamp"))
.withColumn("Sku", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._1"))
.withColumn("Timestamp", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._2"))
.show(false)
+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1, timestamp1] |AFN |sku1|timestamp1|
|[sku2, timestamp2] |AFN |sku2|timestamp2|
|null |AFN |null|null |
+---------------------+-------+----+----------+

Note that we can only access the tuple fields ._1 and ._2 when addedSkuWithTimestamp is not null.
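The null-guarded "take the first entry of the map" logic that the UDF above performs can be sketched outside Spark in plain Java (a minimal illustration; class and method names here are made up for the example):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FirstEntry {
    // Mirrors the Scala UDF: return the first (key, value) pair of the map,
    // or null when the map itself is null (or empty).
    static Map.Entry<String, String> firstEntry(Map<String, String> m) {
        if (m == null || m.isEmpty()) {
            return null;
        }
        return m.entrySet().iterator().next();
    }

    public static void main(String[] args) {
        Map<String, String> sku = new LinkedHashMap<>();
        sku.put("sku1", "timestamp1");
        Map.Entry<String, String> e = firstEntry(sku);
        System.out.println(e.getKey() + " " + e.getValue()); // sku1 timestamp1
        System.out.println(firstEntry(null));                // null
    }
}
```

This is why the `when(... .isNotNull, ...)` guard matters: without it, dereferencing the tuple fields of a null row would fail.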
Spark >= 2.3
Starting with Spark 2.3.0, you can use the built-in map_keys and map_values functions:
df.withColumn("Sku", map_keys($"addedSkuWithTimestamp").getItem(0))
.withColumn("Timestamp", map_values($"addedSkuWithTimestamp").getItem(0))
.show(false)

Output:
+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1 -> timestamp1] |AFN |sku1|timestamp1|
|[sku2 -> timestamp2] |AFN |sku2|timestamp2|
|null |AFN |null|null |
+---------------------+-------+----+----------+

Posted on 2020-03-27 01:23:57
Using the same input as the other answer:
val df = Seq(
(Map("timestamp1"->1585008000, "timestamp3"-> 1584921600), "AFN"),
(Map("timestamp2"-> 1584835200), "AFN"),
(null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")

Try using explode; I tested this on Spark 2.2.1 and 2.3.1:
df.select(explode($"addedSkuWithTimestamp")).show(false)
+----------+----------+
|key |value |
+----------+----------+
|timestamp1|1585008000|
|timestamp3|1584921600|
|timestamp2|1584835200|
+----------+----------+

https://stackoverflow.com/questions/55543394
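The row-per-entry expansion that explode performs on a map column can be sketched outside Spark in plain Java (an illustration of the semantics only, not Spark's implementation; names are made up for the example):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ExplodeSketch {
    // One (key, value) output row per map entry; null maps contribute no rows,
    // matching how explode drops rows whose map column is null.
    static List<Map.Entry<String, Integer>> explode(List<Map<String, Integer>> column) {
        return column.stream()
                .filter(Objects::nonNull)
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> col = Arrays.asList(
                Map.of("timestamp2", 1584835200), null);
        for (Map.Entry<String, Integer> e : explode(col)) {
            System.out.println(e.getKey() + " " + e.getValue()); // timestamp2 1584835200
        }
    }
}
```

Note that, like explode in the output above, the null row simply disappears; use explode_outer in Spark if you need to keep a row with null key/value instead.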