
SparkStreaming: avoiding the checkpointLocation check

Stack Overflow user
Asked on 2018-06-19 21:07:58
1 answer · 13K views · 0 following · score 8

I am writing a library that integrates Apache Spark with a custom environment. I am implementing a custom streaming source and a custom streaming writer.

Some of the sources I am developing are not recoverable, at least not after an application crash. If the application restarts, all the data needs to be reloaded. We would therefore like to spare users from having to set the 'checkpointLocation' option explicitly. However, when that option is not provided, I see the following error:

org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);

However, if I use the console sink as the streaming output, everything works fine.

Is there a way to get the same behavior?

Note: for the stream reader/writer we are using the Spark DataSource v2 interfaces.

Spark log:

18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
    at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
    ...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook

This is how I start the streaming job:

spark.readStream().format("mysource").load()
  .writeStream().format("mywriter").outputMode(OutputMode.Append()).start();

Instead, everything works fine if I run:

spark.readStream().format("mysource").load()
  .writeStream().format("console").outputMode(OutputMode.Append()).start();

I cannot share the full code of the data writer. In short, I do something like this:

class MySourceProvider extends DataSourceRegister with StreamWriteSupport {
  def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = {
    new MyStreamWriter(...)
  }
  def shortName(): String = {
    "mywriter"
  }
}

class MyStreamWriter(...) extends StreamWriter { 
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
  def createWriterFactory(): DataWriterFactory[Row] = {
    new MyDataWriterFactory()
  }
}

1 answer

Stack Overflow user

Accepted answer

Posted on 2018-06-29 16:00:00

You need to add a checkpointLocation in your code:

option("checkpointLocation", "/tmp/vaquarkhan/checkpoint") // <-- checkpoint directory

Example:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

You have the following three choices:

.option("startingOffsets", "latest") // read data from the end of the stream

  • earliest -- start reading at the beginning of the stream. This excludes data that has already been deleted from Kafka because it was older than the retention period ("aged out" data).
  • latest -- start now, processing only new data that arrives after the query has started.
  • per-partition assignment -- specify the exact offset to start from for every partition, allowing fine-grained control over where processing should begin. For example, this option can be used to pick up exactly where some other system or query left off.
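The three choices above can be sketched against a Kafka source as follows (the broker address and topic name are placeholders, not from the original question):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("offsets-demo").getOrCreate()

// earliest: replay everything Kafka still retains for the topic
val fromEarliest = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "mytopic")                      // placeholder topic
  .option("startingOffsets", "earliest")
  .load()

// latest: only data that arrives after the query starts
val fromLatest = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "latest")
  .load()

// per-partition assignment: a JSON map of topic -> partition -> offset
// (-2 means earliest and -1 means latest for that partition)
val fromOffsets = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", """{"mytopic":{"0":23,"1":-2}}""")
  .load()
```

These snippets require the spark-sql-kafka-0-10 connector on the classpath and a reachable Kafka broker, so they are a sketch rather than something runnable in isolation.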

When no directory name for the checkpoint location can be found, createQuery reports an AnalysisException:

checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)

Here is the Apache Spark code:

  private def createQuery(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      df: DataFrame,
      extraOptions: Map[String, String],
      sink: BaseStreamingSink,
      outputMode: OutputMode,
      useTempCheckpointLocation: Boolean,
      recoverFromCheckpointLocation: Boolean,
      trigger: Trigger,
      triggerClock: Clock): StreamingQueryWrapper = {
    var deleteCheckpointOnStop = false
    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
      new Path(userSpecified).toUri.toString
    }.orElse {
      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
      }
    }.getOrElse {
      if (useTempCheckpointLocation) {
        // Delete the temp checkpoint when a query is being stopped without errors.
        deleteCheckpointOnStop = true
        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
      } else {
        throw new AnalysisException(
          "checkpointLocation must be specified either " +
            """through option("checkpointLocation", ...) or """ +
            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
      }
    }
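The code above also shows the session-level alternative that the error message mentions: if spark.sql.streaming.checkpointLocation is set on the SparkSession, createQuery derives a per-query checkpoint directory under that root (the query name, or a random UUID when no name is given), so individual queries no longer need the option. A minimal sketch, where the root path is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-default").getOrCreate()

// One global root; per createQuery above, each query gets <root>/<queryName-or-UUID>.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") // placeholder root

spark.readStream.format("mysource").load()
  .writeStream.format("mywriter")
  .queryName("myquery") // optional: makes the checkpoint subdirectory predictable
  .outputMode("append")
  .start()
```

This keeps user code free of per-query checkpointLocation options while still giving Spark a durable location for offsets and state.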

Score 4
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/50936964
