首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Scala和Spark UDF函数

Scala和Spark UDF函数
EN

Stack Overflow用户
提问于 2016-07-28 18:04:22
回答 2查看 30.2K关注 0票数 11

我创建了一个简单的UDF来转换或提取spark中temptabl中的时间字段的一些值。我注册了该函数,但是当我使用sql调用该函数时,它抛出了一个NullPointerException。下面是我的函数和执行它的过程。我在用齐柏林飞艇。奇怪的是,昨天它还在工作,但今天早上它停止了工作。

函数

代码语言:javascript
复制
def convert( time:String ) : String = {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  return sdf.format(time1)
}

注册函数

代码语言:javascript
复制
sqlContext.udf.register("convert",convert _)

在没有SQL的情况下测试函数--这是可行的

代码语言:javascript
复制
convert(12:12:12) -> returns 12:12

在Zeppelin中使用SQL测试函数失败。

代码语言:javascript
复制
%sql
select convert(time) from temptable limit 10

temptable的结构

代码语言:javascript
复制
root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: string (nullable = true)

我得到的堆栈跟踪的一部分。

代码语言:javascript
复制
java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-07-28 22:11:41

使用udf而不是直接定义函数

代码语言:javascript
复制
import org.apache.spark.sql.functions._

val convert = udf[String, String](time => {
        val sdf = new java.text.SimpleDateFormat("HH:mm")
        val time1 = sdf.parse(time)
        sdf.format(time1)
    }
)

udf的输入参数是列(或多个列)。返回类型为Column。

代码语言:javascript
复制
case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  def apply(exprs: Column*): Column = {
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  }
}
票数 16
EN

Stack Overflow用户

发布于 2019-04-11 01:56:01

您必须将您的函数定义为UDF。

代码语言:javascript
复制
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val convertUDF: UserDefinedFunction = udf((time:String) => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
})

接下来,您将在DataFrame上应用您的自定义定义文件。

代码语言:javascript
复制
// assuming your DataFrame is already defined
dataFrame.withColumn("time", convertUDF(col("time"))) // using the same name replaces existing

现在,对于您的实际问题,您收到此错误的一个原因可能是因为您的DataFrame包含为空的行。如果在应用UDF之前将它们过滤掉,则应该可以继续,不会出现任何问题。

代码语言:javascript
复制
dataFrame.filter(col("time").isNotNull)

我很好奇除了遇到空值之外,还有什么原因会导致运行UDF时出现NullPointerException,如果你找到了与我的建议不同的原因,我很乐意知道。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38633216

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档