I am writing a library that integrates Apache Spark with a custom environment. I am implementing custom stream sources and stream writers.
Some of the sources I am developing are not recoverable, at least not after an application crash: if the application restarts, all the data has to be reloaded. We would therefore like to spare users from having to explicitly set the 'checkpointLocation' option. However, if that option is not provided, we get the following error:
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);

However, if I use the console stream writer, everything works fine.
Is there a way to get the same behavior?
Note: for the stream readers/writers we are using the Spark DataSource V2 interfaces.
Spark log:
18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook

This is how I start the streaming job:
spark.readStream().format("mysource").load()
  .writeStream().format("mywriter").outputMode(OutputMode.Append()).start();

Instead, everything works fine if I run:

spark.readStream().format("mysource").load()
  .writeStream().format("console").outputMode(OutputMode.Append()).start();

I cannot share the full code of my data writer. In short, I did something like this:
class MySourceProvider extends DataSourceRegister with StreamWriteSupport {
  def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = {
    new MyStreamWriter(...)
  }

  def shortName(): String = {
    "mywriter"
  }
}

class MyStreamWriter(...) extends StreamWriter {
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}

  def createWriterFactory(): DataWriterFactory[Row] = {
    new MyDataWriterFactory()
  }
}

Posted on 2018-06-29 16:00:00
You need to add the checkpointLocation option in your code:

option("checkpointLocation", "/tmp/vaquarkhan/checkpoint") // <-- checkpoint directory

Example:
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start

You have the following three options for your question:
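Equivalently, per the error message itself, the checkpoint root can be configured once on the session instead of per query. A minimal sketch; the path and the `spark` session variable are illustrative, not from the original post:

```scala
// Session-wide default checkpoint root; each query then gets its own
// subdirectory under it, so no per-query option is needed.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")
```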
.option("startingOffsets", "latest") // read data from the end of the stream

If createQuery cannot resolve a checkpoint location, it reports an AnalysisException:

checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)

Here is the Apache Spark code:
private def createQuery(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    df: DataFrame,
    extraOptions: Map[String, String],
    sink: BaseStreamingSink,
    outputMode: OutputMode,
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    trigger: Trigger,
    triggerClock: Clock): StreamingQueryWrapper = {
  var deleteCheckpointOnStop = false
  val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
      new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
    }
  }.getOrElse {
    if (useTempCheckpointLocation) {
      // Delete the temp checkpoint when a query is being stopped without errors.
      deleteCheckpointOnStop = true
      Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
    } else {
      throw new AnalysisException(
        "checkpointLocation must be specified either " +
        """through option("checkpointLocation", ...) or """ +
        s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
    }
  }
}

https://stackoverflow.com/questions/50936964
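This excerpt also suggests why the console sink works without the option: as far as I can tell, DataStreamWriter.start passes useTempCheckpointLocation = true for it, so a temporary checkpoint directory is created instead of throwing. A minimal plain-Scala sketch of the resolution order (no Spark required; the function and its names are mine, not Spark's):

```scala
import java.util.UUID

// Hypothetical helper mirroring createQuery's resolution order:
// explicit option -> session conf -> temp dir (only if allowed) -> error.
def resolveCheckpoint(
    userSpecified: Option[String],
    sessionConf: Option[String],
    useTempCheckpointLocation: Boolean): Either[String, String] =
  userSpecified.orElse(sessionConf) match {
    case Some(loc) => Right(loc)
    case None if useTempCheckpointLocation =>
      // Like the console sink: fall back to a throwaway temp directory.
      Right(s"/tmp/temporary-${UUID.randomUUID()}")
    case None =>
      // Like a custom writer started without the option: fail.
      Left("checkpointLocation must be specified")
  }
```

With this model, `resolveCheckpoint(None, None, useTempCheckpointLocation = true)` succeeds (the console case), while the same call with `false` fails (the custom writer case).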