首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花持续30天过滤,提高性能的最佳方法

火花持续30天过滤,提高性能的最佳方法
EN

Stack Overflow用户
提问于 2016-06-15 04:17:01
回答 3查看 3.4K关注 0票数 3

我有一个记录的RDD,转换为DataFrame,我希望按日时间戳进行过滤,并计算最后30个每日统计数据,按列过滤并计算结果。

Spark在进入for循环之前是非常快的,所以我想知道这是否是一种反模式的方法,我如何才能做到它有良好的性能,我应该使用火花笛卡尔,如何?

代码语言:javascript
复制
//FILTER PROJECT RECORDS
val clientRecordsDF = recordsDF.filter($"rowkey".contains(""+client_id))
client_records_total = clientRecordsDF.count().toLong

这是clientRecordsDF内容

代码语言:javascript
复制
root
 |-- rowkey: string (nullable = true) //CLIENT_ID-RECORD_ID
 |-- record_type: string (nullable = true)
 |-- device: string (nullable = true)
 |-- timestamp: long (nullable = false) // MILLISECOND
 |-- datestring: string (nullable = true) // yyyyMMdd

[1-575e7f80673a0,login,desktop,1465810816424,20160613]
[1-575e95fc34568,login,desktop,1465816572216,20160613]
[1-575ef88324eb7,registration,desktop,1465841795153,20160613]
[1-575efe444d2be,registration,desktop,1465843268317,20160613]
[1-575e6b6f46e26,login,desktop,1465805679292,20160613]
[1-575e960ee340f,login,desktop,1465816590932,20160613]
[1-575f1128670e7,action,mobile-phone,1465848104423,20160613]
[1-575c9a01b67fb,registration,mobile-phone,1465686529750,20160612]
[1-575dcfbb109d2,registration,mobile-phone,1465765819069,20160612]
[1-575dcbcb9021c,registration,desktop,1465764811593,20160612] 
...


the for loop with bad performances

var dayCounter = 0;
for( dayCounter <- 1 to 30){ 
    //LAST 30 DAYS

    // CREATE DAY TIMESTAMP
    var cal = Calendar.getInstance(gmt);

    cal.add(Calendar.DATE, -dayCounter);
    cal.set(Calendar.HOUR_OF_DAY, 0);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    val calTime=cal.getTime()
    val dayTime = cal.getTimeInMillis()

    cal.set(Calendar.HOUR_OF_DAY, 23);
    cal.set(Calendar.MINUTE, 59);
    cal.set(Calendar.SECOND, 59);
    cal.set(Calendar.MILLISECOND, 999);
    val dayTimeEnd = cal.getTimeInMillis()

    //FILTER PROJECT RECORDS
    val dailyClientRecordsDF = clientRecordsDF.filter(
      $"timestamp" >= dayTime && $"timestamp" <= dayTimeEnd
    )
    val daily_client_records = dailyClientRecordsDF.count().toLong

    println("dayCounter "+dayCounter+" records = "+daily_project_records);

    // perform other filter on dailyClientRecordsDF
    // save daily statistics to hbase

  }
}
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2016-06-15 04:48:28

这种方法遵循SQL。首先,我注册了一个要查询的表。然后,我需要定义一个UDF (用户定义函数)来转换时间戳到日期。最后,您需要像在sql中所做的那样对所需的日期范围进行筛选和分组。

代码语言:javascript
复制
    def mk(timestamp: Long): Long = {
            val blockTime: Int = 3600 * 24 // daily
          //  val blockTime: Int = 3600 // hourly
            (timestamp - timestamp % blockTime)
          }

    recordsDF.registerTempTable("client") // define your table
    sqlContext.udf.register("makeDaily", (timestamp: Long) => mk(timestamp)) // register your function

    val res = sqlContext.sql("""select makeDaily(timestamp) as date, count(*) as count 
                                from client 
                                where timestamp between 111111 and 222222 
                                group by makeDaily(timestamp)""").collect()

补充:例如,计数所有的record_type是在30天内注册。

代码语言:javascript
复制
sqlContext.sql("select count(*) 
                from client 
                where record_type='registration' and timestamp between 1111 and 2222")
票数 1
EN

Stack Overflow用户

发布于 2020-07-22 17:53:24

几乎在每一种情况下,都应该避免创建UDF。这样做可以防止催化剂优化器正确地处理查询。

相反,使用内置的SQL函数:

代码语言:javascript
复制
(
  spark.read.table("table_1")
  .join(
    spark.read.table("table_2"), 
    "user_id"
  )
  .where("p_eventdate > current_date() - 30")
)
票数 2
EN

Stack Overflow用户

发布于 2021-02-20 23:04:16

date_sub(current_date(), 30)在1.5.0之后可用。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37826054

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档