首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >foreachRDD()中使用的对象的序列化( CheckPointing )

foreachRDD()中使用的对象的序列化( CheckPointing )
EN

Stack Overflow用户
提问于 2016-09-22 17:38:20
回答 3查看 927关注 0票数 3

根据我读过的这个问题和文档,星火流的foreachRDD( someFunction )只会在驱动程序进程中执行someFunction本身,尽管如果对RDD进行操作,那么它们将在执行程序上执行-- RDDs位于的位置。

尽管我注意到,如果打开检查点,那么spark似乎正在尝试序列化foreachRDD(someFunction)中的所有内容并发送到某个地方--这对我来说也同样有效,这给我带来了麻烦,因为使用的对象之一是不可序列化的(即schemaRegistryClient)。我试过Kryo串行口,但也没有运气。

如果我关闭检查点,序列化问题就会消失。

是否有一种方法可以让Spark不序列化foreachRDD(someFunc)中使用的内容,同时也继续使用检查点?

非常感谢。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2016-09-22 18:10:59

是否有一种方法可以让Spark不序列化foreachRDD(someFunc)中使用的内容,同时也继续使用检查点?

检查点不应该与你的问题有任何关系。根本的问题是,您有一个不可序列化的对象实例,需要发送给您的工作人员。

当您有这样一个依赖项时,可以在星火中使用一个通用模式。创建一个具有延迟瞬态属性的object,该属性将在需要时加载到工作节点中:

代码语言:javascript
复制
object RegisteryWrapper {
  @transient lazy val schemaClient: SchemaRegisteryClient = new SchemaRegisteryClient()
}

当您需要在foreachRDD内部使用它时

代码语言:javascript
复制
someStream.foreachRDD { 
   rdd => rdd.foreachPartition { iterator => 
       val schemaClient = RegisteryWrapper.schemaClient
       iterator.foreach(schemaClient.send(_))
  }
}
票数 5
EN

Stack Overflow用户

发布于 2016-09-22 17:58:44

这里有几件事很重要:

  1. 不能在对工作人员(即RDD内部)执行的代码中使用此客户端。
  2. 您可以使用瞬态客户端字段创建对象,并在作业重新启动后重新创建对象。如何实现这一点的示例可以找到这里。
  3. 同样的原则也适用于广播和累加器变量。
  4. 检查点保存数据、作业元数据和代码逻辑。当代码被更改时,检查点将变得无效。
票数 0
EN

Stack Overflow用户

发布于 2016-09-22 19:29:12

问题可能与检查点数据有关,如果您更改了代码中的任何内容,则需要删除旧的检查点元数据。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39645485

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档