Spark version 1.6.0 on AWS EMR with Zeppelin Notebook
我用以下代码定义了一个UDAF:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import java.text.SimpleDateFormat
import java.util.Date
class AggregateTS extends UserDefinedAggregateFunction{
def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)
def bufferSchema: StructType = StructType(StructField("intermediate", StringType)::Nil)
def dataType: DataType = StringType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = "Init"
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.getAs[String](0) == "Init"){
buffer(0) = input.getAs[String](0)
}
else{
// add two string
buffer(0) = average_ts(input.getAs[String](0), buffer.getAs[String](0))
}
}
def merge(buffer1: MutableAggregationBuffer, buffer2:Row):Unit = {
buffer1(0) = average_ts(buffer1.getAs[String](0), buffer2.getAs[String](0))
}
def evaluate(buffer: Row): Any = {
buffer.getAs[String](0)
}
}从中我得到了一个编译错误:
error: not found: type DataType
def dataType: DataType = StringType这是什么意思?
发布于 2016-03-08 21:42:03
我自己解决的。这似乎是一些导入冲突错误。我将导入语句更改为显式
import org.apache.spark.sql.types.{DataType}然后它就能工作了
https://stackoverflow.com/questions/35832624
复制相似问题