根据我读过的这个问题和文档,星火流的foreachRDD( someFunction )只会在驱动程序进程中执行someFunction本身,尽管如果对RDD进行操作,那么它们将在执行程序上执行-- RDDs位于的位置。
尽管我注意到,如果打开检查点,那么spark似乎正在尝试序列化foreachRDD(someFunction)中的所有内容并发送到某个地方--这对我来说也同样有效,这给我带来了麻烦,因为使用的对象之一是不可序列化的(即schemaRegistryClient)。我试过Kryo串行口,但也没有运气。
如果我关闭检查点,序列化问题就会消失。
是否有一种方法可以让Spark不序列化foreachRDD(someFunc)中使用的内容,同时也继续使用检查点?
非常感谢。
发布于 2016-09-22 18:10:59
是否有一种方法可以让Spark不序列化foreachRDD(someFunc)中使用的内容,同时也继续使用检查点?
检查点不应该与你的问题有任何关系。根本的问题是,您有一个不可序列化的对象实例,需要发送给您的工作人员。
当您有这样一个依赖项时,可以在星火中使用一个通用模式。创建一个具有延迟瞬态属性的object,该属性将在需要时加载到工作节点中:
object RegisteryWrapper {
@transient lazy val schemaClient: SchemaRegisteryClient = new SchemaRegisteryClient()
}当您需要在foreachRDD内部使用它时
someStream.foreachRDD {
rdd => rdd.foreachPartition { iterator =>
val schemaClient = RegisteryWrapper.schemaClient
iterator.foreach(schemaClient.send(_))
}
}发布于 2016-09-22 17:58:44
这里有几件事很重要:
发布于 2016-09-22 19:29:12
问题可能与检查点数据有关,如果您更改了代码中的任何内容,则需要删除旧的检查点元数据。
https://stackoverflow.com/questions/39645485
复制相似问题