首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从主RDD创建RDD

从主RDD创建RDD
EN

Stack Overflow用户
提问于 2019-03-20 22:37:52
回答 1查看 66关注 0票数 0

我有一个RDD (RDD[(String,IterableEvent)],它有一个键,表示一年中的一个月,值是该月发生的数百万个事件。

我想遍历每个键,并创建键事件的RDD。然后,我想为当月事件的每一天创建一个event RDD,这样我就可以将它们发送到相关的s3位置(“目录”结构是bucketName/year/month/ day )。

问题是,似乎你不能在另一个RDD的foreach中创建RDD。因此,我不确定如何在不将整个主RDD加载到内存中的情况下实现我想要的东西(这肯定会耗尽驱动程序的内存,并在一开始就失去使用Spark的意义)。

也许有一种方法可以使用Spark来实现我想要的东西,我只是对此并不了解,希望这里有人能帮上忙。

下面是我目前掌握的代码:

代码语言:javascript
复制
 private def store(
    eventsByMonth: RDD[(String, Iterable[Event])]
  )(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      eventsByMonth
        .foreach {
          case (_, events: Iterable[Event]) =>
            writeToS3Files(sqlContext.sparkContext.parallelize(events.toSeq))
        }
    )

  private def writeToS3Files(events: RDD[Event])(
    implicit sqlContext: SQLContext
  ): Try[Unit] =
    Try(
      // outputFilePath will contain the day that these events are related to.
      events.groupBy(_.outputFilePath).foreach {
        case (filePath: String, eventsForFile: Iterable[Event]) =>
          writeToS3File(filePath, sqlContext.sparkContext.parallelize(eventsForFile.toSeq))
      }
    )

  private def writeToS3File(filePath: String, events: RDD[Event]): Try[Unit] = {
    val fileNameWithPath = s"${filePath}${UUID.randomUUID().toString}.gz"

    Try(events.saveAsTextFile(fileNameWithPath, classOf[GzipCodec]))
  }
EN

回答 1

Stack Overflow用户

发布于 2019-03-21 11:33:11

我假设有一些方法可以确定事件发生的日期(例如,day (类型为Int)是事件的属性)。

您可以将RDD[(String,IterableEvent] )转换为一个PairRDD(K,V),其中键(K)是事件发生的月份和日期,值(V)是在该月的某一天发生的所有事件。之后,您可以轻松地将数据转储到数据库中。

代码语言:javascript
复制
val eventsByMonthAndDate = eventsByMonth.flatMap { case (month, events) => events.map(e => ((month, e.day), e)) }
eventsByMonthAndDate.groupby(_._1).foreach(writeToDB)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55263371

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档