DF1 is what I have now; I want to make DF1 look like DF2.
Expected output:
DF1 DF2
+---------+----------------------------------------+ +---------+-------------------------------------------------------------------+
| ID | Category | | ID | category_name |
+---------+----------------------------------------+ +---------+-------------------------------------------------------------------+
| 31898 | CP Bill Payment | | 31898 | CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee) |
| 31898 | CP e-Transfer + CP IMT | | 32614 | CP Bill Payment + CP e-Transfer + CP Other Transfer (CPS Blocked)|
| 31898 | CPS Limit + CPS Payee | | 35431 | CP Bill Payment + CP e-Transfer |
| 32614 | CP e-Transfer + CP Other Transfer | | 33987 | CP IMT (CPS Limit) |
| 32614 | CP Bill Payment | =====> | 35672 | CPS Blocked |
| 32614 | CPS Blocked | =====> | 37612 | CPS Blocked + CPS Stop/Cancel/Reverse |
| 35431 | CP e-Transfer | +---------+-------------------------------------------------------------------+
| 35431 | CP Bill Payment |
| 33987 | CP IMT |
| 33987 | CPS Limit |
| 35672 | CPS Blocked |
| 37612 | CPS Blocked + CPS Stop/Cancel/Reverse|
+---------+----------------------------------------+
My code is as follows:
val DF2 = DF1
  .groupBy("ID")
  .agg(collect_set("Category").as("CategorySet"))
  .withColumn("category_name",
    when(array_contains($"CategorySet", "CP Bill Payment") &&
         array_contains($"CategorySet", "CP e-Transfer + CP IMT") &&
         array_contains($"CategorySet", "CPS Limit + CPS Payee"),
      "CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee)")
    .otherwise("---other conditions---"))
  .select("ID", "category_name")
The logic is: for rows sharing the same ID, 31898/32614/33987: if the set contains both CP* and CPS* entries, the result should be CP* (CPS*) or CP* + CP* (CPS*); 35431: if there is no CPS* in the array, just join all the array elements with +; 35672/37612: otherwise, just the elements of the array. By the way, the categories should be sorted in ascending order.
The code works, but there are far too many possible combinations to enumerate. How can I do the same thing with a UDF? Or is there some built-in function that can do this? Thanks in advance.
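For reference, the rule I'm describing can be sketched in plain Scala (a hypothetical helper, independent of Spark, assuming the per-ID categories are already deduplicated and sorted ascending):

```scala
// Hypothetical plain-Scala sketch of the desired rule (not Spark code).
// Assumes the categories for one ID are already deduplicated and sorted ascending.
def categoryName(categories: Seq[String]): String = {
  // "CPS ..." entries go into parentheses; "CP ..." entries stay in front
  val (cps, cp) = categories.partition(_.startsWith("CPS"))
  if (cps.isEmpty) cp.mkString(" + ")                          // e.g. 35431: only CP*
  else if (cp.isEmpty) cps.mkString(" + ")                     // e.g. 35672/37612: only CPS*
  else cp.mkString(" + ") + " (" + cps.mkString(" + ") + ")"   // e.g. 31898: both
}

val name31898 = categoryName(Seq("CP Bill Payment", "CP e-Transfer + CP IMT", "CPS Limit + CPS Payee"))
// "CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee)"
```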
Posted on 2018-09-05 18:43:24
The best I can come up with right now is:
import org.apache.spark.sql.functions._

// UDF: keep only the entries containing the given prefix followed by a space
// (so "CP" does not also match "CPS ..."), then join them with " + "
def mapColumn(col: String) = udf { (xs: Seq[String]) =>
  xs.filter(_.contains(col + " ")).mkString(" + ")
}
val df1 = df.groupBy("Id").agg(
    mapColumn("CP")(sort_array(collect_set("Category"))).as("CategorySetCP"),
    mapColumn("CPS")(sort_array(collect_set("Category"))).as("CategorySetCPS")
  )
  .withColumn("CategorySetCPS_New", concat(lit(" ("), 'CategorySetCPS, lit(")")))
  .withColumn("category_name",
    // both CP* and CPS* present -> "CP... (CPS...)"
    when(length($"CategorySetCP") > 0 and length($"CategorySetCPS") > 0,
      concat($"CategorySetCP", $"CategorySetCPS_New"))
    // only CP* present
    .when(length($"CategorySetCP") > 0 and length($"CategorySetCPS") === 0,
      $"CategorySetCP")
    // only CPS* present
    .otherwise($"CategorySetCPS"))
  .select('Id, 'category_name)
df1.show(false)
Output:
+-----+-----------------------------------------------------------------+
|Id |category_name |
+-----+-----------------------------------------------------------------+
|33987|CP IMT (CPS Limit) |
|32614|CP Bill Payment + CP e-Transfer + CP Other Transfer (CPS Blocked)|
|35672|CPS Blocked |
|35431|CP Bill Payment + CP e-Transfer |
|31898|CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee) |
|37612|CPS Blocked + CPS Stop/Cancel/Reverse                            |
+-----+-----------------------------------------------------------------+
Hope this helps!
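A note on the `col + " "` trick inside the UDF: appending the space is what keeps the "CP" pass from also matching "CPS ..." entries. The filtering can be checked in plain Scala (hypothetical names, no Spark needed):

```scala
// Plain-Scala equivalent of the UDF body: keep entries that contain the prefix
// plus a trailing space, then join with " + ". Note "CPS Limit" contains no "CP ".
def keepWithPrefix(prefix: String)(xs: Seq[String]): String =
  xs.filter(_.contains(prefix + " ")).mkString(" + ")

val cats = Seq("CP Bill Payment", "CP e-Transfer + CP IMT", "CPS Limit + CPS Payee")
val cpPart  = keepWithPrefix("CP")(cats)   // "CP Bill Payment + CP e-Transfer + CP IMT"
val cpsPart = keepWithPrefix("CPS")(cats)  // "CPS Limit + CPS Payee"
```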
Posted on 2018-09-05 16:57:12
Here is an example of how to use UDAFs. You obviously don't need a UDAF just to concatenate column values by id, but it lets you add more logic later. For example, to concatenate the values by the ID field you could create a UDAF like this:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

class ConcatenateStrings extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)
override def bufferSchema: StructType = StructType(StructField("pair", StringType) :: Nil)
override def dataType: DataType = StringType
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val b = buffer.getAs[String](0)
val i = input.getAs[String](0)
buffer(0) = { if(b.isEmpty) b + i else b + " + " + i }
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val b1 = buffer1.getAs[String](0)
val b2 = buffer2.getAs[String](0)
if(!b1.isEmpty)
buffer1(0) = (b1) ++ " + " ++ (b2)
else
buffer1(0) = b2
}
override def evaluate(buffer: Row): Any = {
val yourString = buffer.getAs[String](0)
// Compute your logic and return another String
yourString + "@procesed"
}
}
Then you can include it in the aggregation call:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object testAppl0 {
def main(args: Array[String]) : Unit = {
val agg0 = new ConcatenateStrings()
implicit val spark: SparkSession =
SparkSession
.builder()
.appName("Test")
.master("local[1]")
.getOrCreate()
import spark.implicits._
val rows = Seq(Row(31898,"CP Bill Payment"), Row(31898,"CP e-Transfer + CP IMT"), Row(31898,"CPS Limit + CPS Payee "))
val schema = List(
StructField("ID", IntegerType, true),
StructField("Category", StringType, true))
val df = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
df.groupBy("ID").agg(agg0($"Category")).show(false)
}
}
It will return a new column, "concatenatestrings(Category)":
+-----+--------------------------------------------------------------------------+
|ID |concatenatestrings(Category) |
+-----+--------------------------------------------------------------------------+
|31898|CP Bill Payment + CP e-Transfer + CP IMT + CPS Limit + CPS Payee @procesed|
+-----+--------------------------------------------------------------------------+
Take a look at this as well; it might help:
https://stackoverflow.com/questions/52188707
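To see what the UDAF above actually computes, its buffer lifecycle can be simulated in plain Scala: `update` folds rows within a partition, `merge` combines partition buffers, and `evaluate` produces the final string (hypothetical stand-ins, no Spark needed):

```scala
// Plain-Scala simulation of the ConcatenateStrings buffer lifecycle (not Spark code).
def update(buffer: String, input: String): String =
  if (buffer.isEmpty) input else buffer + " + " + input   // rows within one partition

def merge(b1: String, b2: String): String =
  if (b1.isEmpty) b2 else b1 + " + " + b2                 // combine partition buffers

def evaluate(buffer: String): String = buffer + "@procesed"

// Two partitions, each buffer initialized to "" as in initialize()
val part1 = Seq("CP Bill Payment", "CP e-Transfer + CP IMT").foldLeft("")(update)
val part2 = Seq("CPS Limit + CPS Payee").foldLeft("")(update)
val result = evaluate(merge(part1, part2))
// "CP Bill Payment + CP e-Transfer + CP IMT + CPS Limit + CPS Payee@procesed"
```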