首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将spark sql中的日期列与最大日期进行比较

将spark sql中的日期列与最大日期进行比较
EN

Stack Overflow用户
提问于 2019-07-08 10:07:52
回答 1查看 987关注 0票数 0

使用Spark2.3.0和Scala

有一张下面这样的桌子:

代码语言:javascript
复制
created_date mth    ColA    
2019-01-01  2019-01 a
2019-01-01  2019-01 b
2019-01-02  2019-01 a
2019-01-02  2019-01 b
.
.
2019-06-26  2019-01 a

模式看起来像:

代码语言:javascript
复制
root
 |-- transaction_created_date: string (nullable = true)
 |-- txn_mth: string (nullable = true)
 |-- ColA: string (nullable = true)

想要比较created_date列和max_date,并创建一个新列

试过如下:

代码语言:javascript
复制
var max_date = sparkVal.sql(s"""SELECT cast(max(created_date)                 
        as DATE) from BASE_TABLE""").first()
val maxDateValue = max_date.get(0)
var day_counter=10
val data =spark.sql(s"""SELECT
       created_date,
       mth,
       sum(if(date_add(created_date+$day_counter) > cast($maxDateValue as DATE) ),1,0)) 
       as Total_arrival from BASE_TALE a""")

lets say max_date = 2019-06-29希望输出类似于

代码语言:javascript
复制
created_date mth    Total_arrival
2019-01-01  2019-01 1
2019-01-01  2019-01 1
2019-01-01  2019-01 1
2019-01-02  2019-01 1
.
.
2019-06-26  2019-01 0
2019-06-27  2019-01 0
2019-06-28  2019-01 0
2019-06-29  2019-01 0
2019-06-30  2019-01 0

getting below error :

由于数据类型不匹配,org.apache.spark.sql.AnalysisException:无法解析“强制转换(2019-6)- 26)为日期”:无法转换到目前为止的int;第43行pos 106;

有人可以帮助转换maxdate,以便它可以用于与日期列进行比较吗?

EN

回答 1

Stack Overflow用户

发布于 2019-07-09 14:41:51

一项执行可以是以下内容:

代码语言:javascript
复制
object TestSO {

  def main(args: Array[String]) : Unit = {
    // dataset
    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .master("local[1]")
        .appName("Test")
        .getOrCreate()

    import org.apache.spark.sql.functions.{to_date, col, max, when, date_add, lit}

    val data = Seq(Row("2019-01-01", "2019-01", "a"),
                   Row("2019-01-01", "2019-01", "b"),
                   Row("2019-01-02", "2019-01", "a"),
                   Row("2019-01-02", "2019-01", "b"))

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(List(StructField("transaction_created_date", StringType, false),
      StructField("txn_mth", StringType, false),
      StructField("ColA", StringType, false))))

    // Add a column with a new column as date. It could be done all in one line
    val df_withdate = df.withColumn("transaction_created_date",
      to_date(col("transaction_created_date")))

    var day_counter=10

    // Getting the max
    val max_date = df_withdate
      .select(max(col("transaction_created_date")))
      .collect()(0)(0)

    // Put 1, in rows where creation_date + day_counter > max_date
    val result_df = df_withdate.withColumn("Total_arrival",
      when(date_add(col("transaction_created_date"), day_counter) > to_date(lit(max_date)), 1)
     .otherwise(0))

    result_df.show()
  }
}

它规定:

代码语言:javascript
复制
+------------------------+-------+----+-------------+
|transaction_created_date|txn_mth|ColA|Total_arrival|
+------------------------+-------+----+-------------+
|              2019-01-01|2019-01|   a|            1|
|              2019-01-01|2019-01|   b|            1|
|              2019-01-02|2019-01|   a|            1|
|              2019-01-02|2019-01|   b|            1|
+------------------------+-------+----+-------------+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56932657

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档