我正试图在我的4核心机器上,在本地模式下运行一些关于星火流应用程序的处理时间的测试。
这是我的代码:
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
streamData1.print();我每秒收到一个JSON消息。因此,我对4种不同的场景进行了测试:
1) setMaster(...local[2])和1分区
2) setMaster(...local[*])和1分区
3)setMaster(...local[2])和4个分区(使用streamData1.repartition(4))
4) setMaster(...local[*])和4个分区(使用streamData1.repartition(4))
当我检查UI中的平均处理时间时,这是我在每个场景中得到的:
1) 30毫秒
2) 28毫秒
( 3) 72毫秒
4) 75毫秒
我的问题是:为什么1、2、3和4的处理时间几乎相同?例如,我意识到从2增加到4是正常的,因为重新分区是一种洗牌操作。我不明白的是,例如,为什么处理过程与3如此相似?它不应该小得多,因为我正在增加并行化的水平,而且我有更多的核心来分配任务?
希望我没弄糊涂,谢谢你。
发布于 2016-06-16 13:15:13
这在一定程度上取决于您的JSON消息是什么样子的,我假设每条消息都是一个没有换行的字符串。在这种情况下,每秒有1条消息,批处理间隔为1秒,在每个批处理中,您将得到一个只有一个项的RDD。您不能将其分割成多个分区,因此当您重新分区时,您仍然有相同的情况数据,但是使用重分区步骤的开销。
即使有更多的数据,当您对数据所做的所有操作都是print()时,我也不会期望有太大的差异:这将占用您的数据的前10项,如果它们可以来自一个分区,那么我希望Spark会对其进行优化,只计算一个分区。在任何情况下,如果您显著增加每批的数据量,并在整个集合上进行一些实际处理,那么您将得到更有代表性的数字,至少类似于streamData1.count().print()。
为了更好地了解发生了什么,深入了解Spark的UI的其他部分也是有用的,比如Stages选项卡,它可以告诉您执行时间中有多少是洗牌、序列化等,而不是实际执行,以及影响性能的事情,比如告诉您缓存哪些位的DAGs,以及Spark可以跳过的任务。
https://stackoverflow.com/questions/37857359
复制相似问题