首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用于序列发生器的火花UDF

用于序列发生器的火花UDF
EN

Stack Overflow用户
提问于 2020-11-12 12:41:39
回答 1查看 524关注 0票数 0

全,

我正在尝试为spark创建UDF,它将用于生成每行唯一的ID。为了确保唯一性,我所依赖的是: ID生成器将采用“时间戳( bigint )的时代值+”作为参数传递的唯一源ID + randomNumber 5位数。

我有两个问题:

在id生成函数"idGenerator"

  • while中如何使用UDF包含monotonically_increasing_id(),以下错误失败:

代码语言:javascript
复制
    error:Type mismatch;
    found : String(SRC_")
    required : org.apache.spark.sql.Column
            df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") ) 

请提供任何指针..。

代码语言:javascript
复制
Object SequenceGeneratorUtil extends Serializable {

    val random = new scala.util.Random
    val start = 10000
    val end = 99999

    //CustomEpochGenerator - this is custom function to generate the epoch value for current timestamp in milliseconds
    // ID Generator will take "epoch value of timestamp ( bigint ) + "unique Source ID passed as argument + randomNumber 5 digit
    def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )

    val GenID = udf[String, String](idGenerator __)

}

val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") ) 
EN

回答 1

Stack Overflow用户

发布于 2020-11-12 14:48:42

职能以下的变化

代码语言:javascript
复制
def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )

在下面的函数中,在mId中添加idGenerator附加参数以保存monotonically_increasing_id值。

代码语言:javascript
复制
def idGenerator(SrcIdentifier: String,mId: Long): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString + mId

udf以下的变化

代码语言:javascript
复制
val GenID = udf[String, String](idGenerator __)

代码语言:javascript
复制
val GenID = udf(idGenerator _)

失败:错误:类型错配;查找:String(SRC_)必需: org.apache.spark.sql.Column df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )

因为SequenceGeneratorUtil.GenID需要org.apache.spark.sql.Column类型的值,但是传递值时,SRC_String类型的。

若要解决此问题,请使用lit函数。

代码语言:javascript
复制
df.withColumn("rowkey",SequenceGeneratorUtil.GenID(lit("SRC_")) )

withColumn以下的变化

代码语言:javascript
复制
val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") ) 

代码语言:javascript
复制
val df2 = df
            .withColumn(
                "rowkey",
                SequenceGeneratorUtil.GenID(
                    lit("SRC_"), // using lit function to pass static string.
                    monotonically_increasing_id
                )
            ) 
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64804129

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档