首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >缓存和持久化数据集

缓存和持久化数据集
EN

Stack Overflow用户
提问于 2017-09-15 15:38:45
回答 1查看 593关注 0票数 2

我想多次使用org.apache.flink.api.scala.DataSet对象:

  • 使用count()打印行数,
  • 写入neo4j数据库,
  • 转换成Gelly图形对象,
  • 等。

对于这些操作,Flink完全重新计算DataSet的值,而不是缓存它。我找不到像在Spark中那样的缓存()或持久化()函数。

这确实对我的应用程序产生了巨大的影响,我的应用程序有~1.000.000个数据,有许多联接/ coGroup用法等等:运行时似乎增加了3倍,也就是几个小时!那么,如何缓存或持久化数据集并显着减少运行时呢?

我正在使用最新的Flink版本1.3.2和Scala2.11。

示例:

代码语言:javascript
复制
package dummy

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.logging.log4j.scala.Logging

object Trials extends Logging {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // some dataset which could be huge in reality
    val dataSet = env.fromElements((1, 436), (2, 235), (3, 67), (4, 51), (5, 15), (6, 62), (7, 155))

    // some complex joins, coGroup functions etc.
    val joined = dataSet.cross(dataSet).filter(tuple => (tuple._1._2 + tuple._2._2) % 7 == 0)

    // log the number of rows --> performs the join above
    logger.info(f"results contains ${joined.count()} rows")

    // convert to Gelly graph format
    val graph = Graph.fromDataSet(
      dataSet.map(nodeTuple => new Vertex[Long, Long](nodeTuple._1, nodeTuple._2)),
      joined.map(edgeTuple => new Edge[Long, String](edgeTuple._1._1, edgeTuple._2._1, "someValue")),
      env
    )

    // do something with the graph
    logger.info("get number of vertices")
    val numberOfVertices = graph.numberOfVertices()
    logger.info("get number of edges")
    val numberOfEdges = graph.numberOfEdges() // --> performs the join again!

    logger.info(f"the graph has ${numberOfVertices} vertices and ${numberOfEdges} edges")
  }

}

必需的libs: log4j-core,log4j-api-scala_2.11,flink-core,flink-scala_2.11,flink-gelly-scala_2.10

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-06 08:34:23

我认为,如果您需要在同一个流上执行多个操作,那么使用侧输出- output.html是值得的。

一旦您执行了一些复杂的联接、coGroup函数等,并获得了一个joined数据集,您就可以将值收集到不同的侧输出--稍后将进行计算计数的输出,另一个将完成另一项工作。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46243181

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档