首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >火花流预写日志重启后不重放数据

火花流预写日志重启后不重放数据
EN

Stack Overflow用户
提问于 2016-01-21 11:20:25
回答 1查看 1.5K关注 0票数 2

为了有一种简单的方法来测试Streaming,Log,我创建了一个非常简单的自定义输入接收器,它将生成字符串并存储这些字符串:

代码语言:javascript
复制
class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart() {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while(true) {
          //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
          //To implement a reliable receiver, you have to use store(multiple-records) to store data.
          store(ArrayBuffer(s"$batchID-$i"))
          println(s"Stored => [$batchID-$i)]")
          Thread.sleep(1000L)
          i = i + 1
        }
      }
    }.start()
  }

  def onStop() {}
}

然后,我创建了一个简单的应用程序,它将使用自定义接收器来流数据并处理它:

代码语言:javascript
复制
object DStreamResilienceTest extends App {

  val conf = new SparkConf().setMaster("local[*]").setAppName("DStreamResilienceTest").set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
  val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
  customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
    println(s"processed => [${rdd.collect().toList}]")
    Thread.sleep(2000L)
  }
  ssc.start()
  ssc.awaitTermination()

}

如您所见,每个接收到的RDD的处理有2秒的睡眠,而String则每秒钟存储一次。这会创建一个待办事项,新的字符串堆积起来,并且应该存储在WAL中。实际上,我可以看到检查点文件中的文件正在更新。运行该应用程序时,我会得到如下输出:

代码语言:javascript
复制
[info] Stored => [1453374654941-0)]
[info] processed => [List(1453374654941-0)]
[info] Stored => [1453374654941-1)]
[info] Stored => [1453374654941-2)]
[info] processed => [List(1453374654941-1)]
[info] Stored => [1453374654941-3)]
[info] Stored => [1453374654941-4)]
[info] processed => [List(1453374654941-2)]
[info] Stored => [1453374654941-5)]
[info] Stored => [1453374654941-6)]
[info] processed => [List(1453374654941-3)]
[info] Stored => [1453374654941-7)]
[info] Stored => [1453374654941-8)]
[info] processed => [List(1453374654941-4)]
[info] Stored => [1453374654941-9)]
[info] Stored => [1453374654941-10)]

正如您所预期的,存储已经超出了处理的步调。因此,我关闭应用程序并重新启动它。这一次我注释掉了foreachRDD中的睡眠,以便处理可以清除任何待办事项:

代码语言:javascript
复制
[info] Stored => [1453374753946-0)]
[info] processed => [List(1453374753946-0)]
[info] Stored => [1453374753946-1)]
[info] processed => [List(1453374753946-1)]
[info] Stored => [1453374753946-2)]
[info] processed => [List(1453374753946-2)]
[info] Stored => [1453374753946-3)]
[info] processed => [List(1453374753946-3)]
[info] Stored => [1453374753946-4)]
[info] processed => [List(1453374753946-4)]

如您所见,新事件将被处理,但前一批中没有任何事件。旧的WAL日志被清除,我看到这样的日志消息,但是旧的数据没有被处理。

代码语言:javascript
复制
INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0

我做错了什么?我用的是星火1.5.2。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-01-26 08:28:53

这是由星火用户邮件列表上的朱世雄(瑞安)回答的。

按照他的建议使用StreamingContext.getOrCreate

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/34922410

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档