首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KeyedCoProcessFunction serving ML模型的状态处理

KeyedCoProcessFunction serving ML模型的状态处理
EN

Stack Overflow用户
提问于 2019-12-01 14:40:39
回答 1查看 299关注 0票数 1

我正在开发一个看起来像这样的KeyedCoProcessFunction:

代码语言:javascript
复制
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState[String, String](
      new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }

}

状态由使用第三方库构建的ML模型的集合组成。在设置检查点之前,我需要将加载的模型转储到snapshotState中的字节数组中。

我的问题是,在snapshotState中,当没有活动键时,modelsBytes.clear()会引发一个异常。当我在输入流上没有任何数据的情况下从头启动应用程序时,就会发生这种情况。因此,当检查点的时间到来时,我会得到这样的错误:

java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.

但是,当输入流包含数据时,检查点工作得很好。我对此有点困惑,因为snapshotState不提供键控上下文(与processElement1processElement2相反,可以通过执行ctx.getCurrentKey来访问当前键),所以在我看来,在snapshotState中对clearput的调用应该总是失败的,因为它们应该只在键控上下文中工作。有没有人能澄清这是否是预期中的行为?

EN

回答 1

Stack Overflow用户

发布于 2019-12-02 16:10:43

键控状态只能用于文档中所写的键控流。

代码语言:javascript
复制
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.

如果您调用clear(),您将不会清除整个映射,而只是重置当前关键点的状态。在processElementX中,密钥始终是已知的。

代码语言:javascript
复制
/**
 * Removes the value mapped under the current key.
 */
void clear();

当您尝试在processElementX以外的函数中调用clear时,实际上应该会收到一个更好的异常。最后,您错误地使用了键控状态。

现在来看看你的实际问题。我假设您正在使用KeyedCoProcessFunction,因为模型是在单独的输入中更新的。如果它们是静态的,您可以从静态源(例如,包含在jar中)加载它们的open。此外,通常只有一个模型适用于具有不同键的所有值,那么您可以使用BroadCast状态。所以我假设你对不同类型的数据有不同的模型,用键分隔。

如果它们来自input2,那么您已经在调用processElement2时序列化了它们。

代码语言:javascript
复制
override def processElement2(model: Model, ctx: Context, collector): Unit = {
    models.put(ctx.getCurrentKey, model)
    modelsBytes.put(ctx.getCurrentKey, model.toBytes(v))
}

那么您就不会覆盖snapshotState,因为状态已经是最新的。initializeState会急切地对模型进行反序列化,或者您也可以在processElement1中懒洋洋地实现它们。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59123188

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档