Dstream 也视同数据集。 我的简单理解,Dstream是在RDD上面又封了一层的数据结构。下面是官网对Dstream描述的图。 ? 在这里先看一下Dstream的转换。 ? Dstream依赖关系 Dstream的一些依赖关系还是要先弄明白的,不然不太容易理解。Dstream依赖图很大,我们只列几个这次关注的。 源码分析:Dstream.flatMap方法(以及Dstream如何生成RDD) Dstream前面已经做过了一些介绍,不再赘述,这里开始按照例子的顺序向下讲。 看我们的第一个转换flatMap。 源码分析:DStream.print方法 最后的打印函数也有点意思,它调用的时Dstream的print函数。
Spark Streaming里的DStream可以看成是Spark Core里的RDD的模板,DStreamGraph是RDD DAG的模板。 跟着例子看流程 DStream 也和 RDD 一样有着转换(transformation)和 输出(output)操作,通过 transformation 操作会产生新的DStream,典型的transformation 来和parent DStream形成依赖链,通过outputStreams 向前追溯遍历就可以得到所有上游的DStream,另外,DStreamGraph 还会记录所有的inputStreams ,避免每次为查找 另外其dependencies是直接指向了其构造参数parent,也就是刚才的ReceiverInputDStream,每个新建的DStream的dependencies都是指向了其父DStream,这样就构成了一个依赖链 ,也就是形成了DStream DAG。
输入DStream代表了来自数据源的输入数据流。 在之前的wordcount例子中,lines就是一个输入DStream(JavaReceiverInputDStream),代表了从netcat(nc)服务接收到的数据流。 除了文件数据流之外,所有的输入DStream都会绑定一个Receiver对象,该对象是一个关键的组件,用来从数据源接收数据,并将其存储在Spark的内存中,以供后续处理。 要注意的是,如果你想要在实时计算应用中并行接收多条数据流,可以创建多个输入DStream。这样就会创建多个Receiver,从而并行地接收多个数据流。 使用本地模式,运行程序时,绝对不能用local或者local[1],因为那样的话,只会给执行输入DStream的executor分配一个线程。
HDFS文件 基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。 streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory) streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core。
DStream 无状态转换操作 map:每个元素采用操作,返回的列表形式 flatmap:操作之后拍平,变成单个元素 filter:过滤元素 repartition:通过改变分区的多少,来改变DStream 的并行度 reduce:对函数的每个进行操作,返回的是一个包含单元素RDD的DStream count:统计总数 union:合并两个DStream reduceByKey:通过key分组再通过func updateFunc,initialRDD=initialStateRDD) running_counts.pprint() ssc.start() ssc.awaitTermination() DStream streaming/stateful/output") # 保存到该路径下 running_counts.pprint() ssc.start() ssc.awaitTermination() DStream
要开发Spark Streaming应用程序,核心是通过StreamingContext创建DStream。因此DStream对象就是Spark Streaming中最核心的对象。 DStream的核心是通过时间的采用间隔将连续的数据流转换成是一系列不连续的RDD,在由Transformation进行转换,从而达到处理流式数据的目的。 因此从表现形式上看,DStream是由一系列连续的RDD组成,因此DStream也就具备了RDD的特性。 通过上图中可以看出DStream的表现形式其实就是RDD,因此操作DStream和操作RDD的本质其实是一样的。 由于DStream是由一系列离散的RDD组成,因此Spark Streaming的其实是一个小批的处理模型,本质上依然还是一个批处理的离线计算。
DStream 的依赖链 每个 DStream 的子类都会继承 def dependencies: List[DStream[_]] = List()方法,该方法用来返回自己的依赖的父 DStream : T => U ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent DStream,及 lines,到这里,依赖链就变成了下图: ? 与 DStream transform 操作返回一个新的 DStream 不同,output 操作不会返回任何东西,只会创建一个ForEachDStream作为依赖链的终结。 本文以一个简单的例子说明 DStream DAG 的生成过程,之后将再写两篇文章说明如何根据这个 DStream DAG 得到 RDD DAG 及如何定时生成 job。 ----
与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。
'join' between RDDs of `this` DStream and `other` DStream `this` DStream and `other` DStream * * 通过join this和other Dstream的rdd构建出一个新的DStream. by applying 'join' between RDDs of `this` DStream and `other` DStream * 通过join this和other Dstream的rdd构建出一个新的DStream.
{DStream, ReceiverInputDStream} import org.apache.spark.streaming. 案例解析 Discretized Stream(DStream) 是 Spark Streaming 提供的基本抽象, 表示持续性的数据流, 可以来自输入数据, 也可以是其他的 DStream 转换得到 在 DStream 中的每个 RDD 包含一个确定时间段的数据. ? 对 DStream 的任何操作都会转换成对他里面的 RDD 的操作. 比如前面的 wordcount 案例, flatMap是应用在 line DStream 的每个 RDD 上, 然后生成了 words SStream 中的 RDD. 如下图所示: ? DStream 的操作隐藏的大多数的细节, 然后给开发者提供了方便使用的高级 API. ? ? 本次的分享就到这里了
---- SparkStreaming数据抽象-DStream DStream 是什么 Spark Streaming的核心是DStream,DStream类似于RDD,它实质上一系列的RDD的集合 DStream = Seq[RDD] DStream相当于一个序列(集合),里面存储的数据类型为RDD(Streaming按照时间间隔划分流式数据) 对DStream的数据进行操作也是按照RDD为单位进行的 DStream中每批次数据RDD在处理时,各个RDD之间存在依赖关系,DStream直接也有依赖关系,RDD具有容错性,那么DStream也具有容错性。 DStream Operations DStream#Output Operations:将DStream中每批次RDD处理结果resultRDD输出 DStream类似RDD,里面包含很多函数,进行数据处理和输出操作 返回值为true的DStream元素并返回一个新的DStream union(otherStream) 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
DStream抽象:核心数据结构与操作 DStream(Discretized Stream,离散化流)是Spark Streaming中最核心的数据抽象,它代表了一个连续的数据流,但在内部被划分为一系列小的 每个RDD包含该时间窗口内到达的所有数据记录,而DStream则由这一系列按时间排序的RDD序列构成。 创建DStream有多种方式,最常见的是通过 StreamingContext 对象连接外部数据源。 转换操作允许对DStream中的每个RDD进行处理,生成新的DStream。 首先,我们创建一个DStream,每5秒接收一批日志数据。
用法及说明 测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。 2. 通过 Spark Streaming创建 Dstream,计算 WordCount package com.buwenbuhuo.spark.streaming.day01 import org.apache.spark {SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream. {DStream, InputDStream} import org.apache.spark.streaming. 包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。
@5a69b104 has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala :321) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342 ) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute (DStream.scala:339) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala
上图为官网的解释,我们可以翻译为: 与RDD相似,转换允许修改来自输入DStream的数据。DStream支持普通Spark RDD上可用的许多转换。 无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。 我们还可以像在常规的 Spark 中一样使用 DStream的union() 操作将它和另一个DStream 的内容合并起来,也可以使用StreamingContext.union()来合并多个流。 其实也就是对DStream中的RDD应用转换。 1. 上图所示, 窗口在 DStream 上每滑动一次, 落在窗口内的那些 RDD会结合在一起, 然后在上面操作产生新的 RDD, 组成了 window DStream.
对DStream实施map操作,会转换成另外一个DStream 2. DStream是一组连续的RDD序列,这些RDD中的元素的类型是一样的。 对DStream实施windows或者reduceByKeyAndWindow操作,也是转换成另外一个DStream(window操作是stateful DStream Transformation) DStream内部有如下三个特性: -DStream也有依赖关系,一个DStream可能依赖于其它的DStream(依赖关系的产生,同RDD是一样的) -DStream创建RDD的时间间隔,这个时间间隔是不是就是构造 -在时间间隔到达后,DStream创建RDD的方法 在DStream内部,DStream表现为一系列的RDD的序列,针对DStream的操作(比如map,filter)会转换到它底层的RDD的操 作 也就是说,在 Spark Streaming中,DStream中的每个RDD的数据是一个时间窗口的累计。 下图展示了对DStream实施转换算子flatMap操作。
(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项; repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度 ; union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素; count():统计源DStream中每个RDD的元素数量; reduce(func ):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream; countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新 创建一个新DStream。 与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。
DStream也是有依赖关系的 flatMap 操作也是直接作用在DStream上的,就和作用于RDD一样 这样很好理解 ? filter(func) 返回一个新的 DStream,它仅仅包含原 DStream 中函数 func 返回值为 true 的项。 countByValue() 在元素类型为 K 的 DStream上,返回一个(K,long)pair 的新的 DStream,每个 key 的值是在原 DStream 的每个 RDD 中的次数。 transform(func) 通过对源 DStream 的每个 RDD 应用 RDD-to-RDD 函数,创建一个新的 DStream。这个可以在 DStream 中的任何 RDD 操作中使用。 返回一个新的 DStream,它是基于 source DStream 的窗口 batch 进行计算的。
对DStream应用的算子,比如map,其实在底层会被翻译为对DStream中每个RDD的操作。比如对一个DStream执行一个map操作,会产生一个新的DStream。 但是,在底层,其实其原理为,对输入DStream中每个时间段的RDD,都应用一遍map操作,然后生成的新的RDD,即作为新的DStream中的那个时间段的一个RDD。 重要概念 Dstream Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream DStream是Spark Streaming Dstream可以看做一组RDDs,即RDD的一个序列 Spark的RDD可以理解为空间维度,Dstream的RDD理解为在空间维度上又加了个时间维度。 创建DStream的两种方式 1. 由Kafka,Flume取得的数据作为输入数据流。 2. 在其他DStream进行的高层操作。 6.
DStream 和 RDD 是包含的关系,你可以理解为Java里的装饰模式,也就是DStream 是对RDD的增强,但是行为表现和RDD是基本上差不多的。 的,这也就是说,DStream有机会和义务去负责RDD的生命周期。 ,一种是调用DStream.cache,第二种是RDD.cache。 dependencies 你可以简单理解为父DStream,通过dependencies 我们可以获得已完整DStream链。 每个DStream 都会被扫描,不同的DStream根据情况不同,保留的RDD数量也是不一致的,但都是根据rememberDuration变量决定,而该变量会被下游的DStream所影响,所以不同的DStream