首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala -以特定格式组合列值

Scala -以特定格式组合列值
EN

Stack Overflow用户
提问于 2018-09-05 15:24:32
回答 2查看 453关注 0票数 0

DF1是我现在拥有的,我想让DF1看起来像DF2。

期望产出:

代码语言:javascript
复制
 DF1                                                           DF2
+---------+----------------------------------------+          +---------+-------------------------------------------------------------------+
|   ID    |         Category                       |          |   ID    |                  category_name                                    |
+---------+----------------------------------------+          +---------+-------------------------------------------------------------------+  
|  31898  |   CP Bill Payment                      |          |  31898  |  CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee) |  
|  31898  |   CP e-Transfer + CP IMT               |          |  32614  |  CP Bill Payment + CP e-Transfer + CP Other Transfer (CPS Blocked)|
|  31898  |   CPS Limit + CPS Payee                |          |  35431  |  CP Bill Payment + CP e-Transfer                                  |
|  32614  |   CP e-Transfer + CP Other Transfer    |          |  33987  |  CP IMT (CPS Limit)                                               |
|  32614  |   CP Bill Payment                      |  =====>  |  35672  |  CPS Blocked                                                      |
|  32614  |   CPS Blocked                          |  =====>  |  37612  |  CPS Blocked + CPS Stop/Cancel/Reverse                            |
|  35431  |   CP e-Transfer                        |          +---------+-------------------------------------------------------------------+
|  35431  |   CP Bill Payment                      |
|  33987  |   CP IMT                               |
|  33987  |   CPS Limit                            |
|  35672  |   CPS Blocked                          |
|  37612  |   CPS Blocked + CPS Stop/Cancel/Reverse|
+---------+----------------------------------------+

我的代码如下:

代码语言:javascript
复制
val DF2 = DF1.groupBy("ID").agg(collect_set("Category").as("CategorySet"))
.groupBy("ID")
.agg(collect_set("Category").as("CategorySet"))
.withColumn( "category_name",
  when(array_contains($"CategorySet", "CP Bill Payment") && array_contains($"CategorySet", "CP e-Transfer + CP IMT") && array_contains($"CategorySet", "CPS Limit + CPS Payee"), "CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee)").otherwise("---other conditions---"))
.select("ID","category_name")

逻辑是,对于相同的ID,31898/32614/33987:如果包含CP*和CPS*,则应为CP* ( CPS* )或CP* + CP* (CPS*);35431:如果数组中没有CPS*,只需使用+连接数组中的所有元素;35672/37612:否则,仅为数组中的元素。顺便说一句,类别应该按升序排序。

代码起作用了,只是有太多可能的组合。如何使用UDF来做同样的事情?或者有什么内置功能可以做到这一点?提前谢谢你

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-09-05 18:43:24

我现在能想到的是:

代码语言:javascript
复制
//UDF
def mapColumn(col: String) = udf { (xs: Seq[String]) => 
                        xs.map { x => 
                          if (x.contains(col+" ")) x else null
                        }.filter(_ != null).mkString(" + ")
                     }

import org.apache.spark.sql.functions._
val df1 = df.groupBy("Id").agg(
                               mapColumn("CP")(sort_array(collect_set("Category"))).as("CategorySetCP"),
                               mapColumn("CPS")(sort_array(collect_set("Category"))).as("CategorySetCPS")
                               ).withColumn("CategorySetCPS_New",concat(lit(" ("),'CategorySetCPS,lit(")")))
                               .withColumn("category_name",
                                           when(length($"CategorySetCP") > 0 and length($"CategorySetCPS") > 0,concat($"CategorySetCP",$"CategorySetCPS_New")).
                                           otherwise(when(length($"CategorySetCP") >0 and length($"CategorySetCPS") === 0,$"CategorySetCP").
                                           otherwise($"CategorySetCPS"))
                                           )
           .select('Id,'category_name)

df1.show(false)

输出:

代码语言:javascript
复制
+-----+-----------------------------------------------------------------+
|Id   |category_name                                                    |
+-----+-----------------------------------------------------------------+
|33987|CP IMT (CPS Limit)                                               |
|32614|CP Bill Payment + CP e-Transfer + CP Other Transfer (CPS Blocked)|
|35672|CPS Blocked                                                      |
|35431|CP Bill Payment + CP e-Transfer                                  |
|31898|CP Bill Payment + CP e-Transfer + CP IMT (CPS Limit + CPS Payee) |
|35612|CPS Blocked + CPS Stop/Cancel/Reverse                            |
+-----+-----------------------------------------------------------------+       

希望这能有所帮助!

票数 1
EN

Stack Overflow用户

发布于 2018-09-05 16:57:12

这是如何使用UDAFs的一个示例。显然,您不需要通过id连接列值,但它允许添加更多的逻辑。例如,要通过ID字段连接这些值,您可以创建一个如下所示的联新议程:

代码语言:javascript
复制
class ConcatenateStrings extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(StructField("pair", StringType) :: Nil)

  override def dataType: DataType = StringType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val b = buffer.getAs[String](0)
      val i = input.getAs[String](0)
      buffer(0) = { if(b.isEmpty) b + i else b + " + " + i }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val b1 = buffer1.getAs[String](0)
    val b2 = buffer2.getAs[String](0)
    if(!b1.isEmpty)
      buffer1(0) = (b1) ++ " + " ++ (b2)
    else
      buffer1(0) = b2
  }

  override def evaluate(buffer: Row): Any = {
    val yourString = buffer.getAs[String](0)
    // Compute your logic and return another String
    yourString + "@procesed"
  }
}

然后,您可以在聚合调用中包括:

代码语言:javascript
复制
object testAppl0 {

  def main(args: Array[String]) : Unit = {

    val agg0 = new ConcatenateStrings()

    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .appName("Test")
        .master("local[1]")
        .getOrCreate()

    import spark.implicits._

    val rows = Seq(Row(31898,"CP Bill Payment"), Row(31898,"CP e-Transfer + CP IMT"), Row(31898,"CPS Limit + CPS Payee "))

    val schema = List(
      StructField("ID", IntegerType, true),
      StructField("Category", StringType, true))

    val df =  spark.createDataFrame(
      spark.sparkContext.parallelize(rows),
      StructType(schema)
    )

    df.groupBy("ID").agg(agg0($"Category")).show(false)

  }
}

它将返回一个新列“串联字符串(类别)”:

代码语言:javascript
复制
+-----+--------------------------------------------------------------------------+
|ID   |concatenatestrings(Category)                                              |
+-----+--------------------------------------------------------------------------+
|31898|CP Bill Payment + CP e-Transfer + CP IMT + CPS Limit + CPS Payee @procesed|
+-----+--------------------------------------------------------------------------+

看看这个,也许能帮上忙

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52188707

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档