首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法重新分区DStream

无法重新分区DStream
EN

Stack Overflow用户
提问于 2017-07-02 13:10:53
回答 1查看 847关注 0票数 2
代码语言:javascript
复制
val sparkConf = new SparkConf().setMaster("yarn-cluster")
                               .setAppName("SparkJob")
                               .set("spark.executor.memory","2G")
                               .set("spark.dynamicAllocation.executorIdleTimeout","5")

val streamingContext = new StreamingContext(sparkConf, Minutes(1))

var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD

var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD

val stream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams ,  Set(inputTopic_1))
val dstream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams ,  Set(inputTopic_2))

val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) => 
{
    //some mapping
}

dstream_1.foreachRDD(r => r.repartition(500))
val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd))
dstream_2.foreachRDD(r => r.repartition(500))
val historyDStream_2 = dstream_2.transform(rdd => rdd.union(historyRdd_2))
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2)

val filtered = fullJoinResult.filter(r => r._2._1.isEmpty)


filtered.foreachRDD{rdd =>
    val formatted = rdd.map(r  => (r._1 , r._2._2.get)) 
    historyRdd_2.unpersist(false) // unpersist the 'old' history RDD
    historyRdd_2 = formatted // assign the new history
    historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
}

val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty)

filteredStream.foreachRDD{rdd =>
    val formatted = rdd.map(r => (r._1 , r._2._1.get)) 
    historyRdd.unpersist(false) // unpersist the 'old' history RDD
    historyRdd = formatted // assign the new history
    historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
}

streamingContext.start()
streamingContext.awaitTermination()
}
}

我无法使用上面的代码重新划分DStream,我得到了128个分区作为输入,这就是no。对于Kafka分区,由于Join,我需要对读写数据进行洗牌,所以我想通过增加无分区来增加并行性。但是分区仍然是same.Why,是吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-07-02 13:31:38

就像mapfilter一样,repartition是星火中的一种转变,意味着三件事:

  • 它返回另一个不可变的RDD。
  • 太懒了
  • 它需要通过一些行动来实现

考虑到这一守则:

代码语言:javascript
复制
dstream_1.foreachRDD(r => r.repartition(500))

repartition中使用foreachRDD作为副作用什么也不做.生成的RDD从未被使用过,因此重新分区永远不会发生。

我们应该把这一转变与工作中的其他行动联系起来。在这种情况下,实现这一目标的一个简单方法是使用transform

代码语言:javascript
复制
val repartitionedDStream = dstream_1.transform(rdd => rdd.repartition(500))
... use repartitionedDStream further on ...
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44871173

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档