我正在开发一个看起来像这样的KeyedCoProcessFunction:
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
with CheckpointedFunction {
// To hold loaded models
@transient private var models: HashMap[(String, String), Model] = _
// For serialization purposes
@transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _
...
override def snapshotState(context: FunctionSnapshotContext): Unit = {
modelsBytes.clear() // This raises an exception when there is no active key set
for ((k, model) <- models) {
modelsBytes.put(k, model.toBytes(v))
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
modelsBytes = context.getKeyedStateStore.getMapState[String, String](
new MapStateDescriptor("modelsBytes", classOf[String], classOf[String])
)
if (context.isRestored) {
// restore models from modelsBytes
}
}
}状态由使用第三方库构建的ML模型的集合组成。在设置检查点之前,我需要将加载的模型转储到snapshotState中的字节数组中。
我的问题是,在snapshotState中,当没有活动键时,modelsBytes.clear()会引发一个异常。当我在输入流上没有任何数据的情况下从头启动应用程序时,就会发生这种情况。因此,当检查点的时间到来时,我会得到这样的错误:
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
但是,当输入流包含数据时,检查点工作得很好。我对此有点困惑,因为snapshotState不提供键控上下文(与processElement1和processElement2相反,可以通过执行ctx.getCurrentKey来访问当前键),所以在我看来,在snapshotState中对clear和put的调用应该总是失败的,因为它们应该只在键控上下文中工作。有没有人能澄清这是否是预期中的行为?
发布于 2019-12-02 16:10:43
键控状态只能用于文档中所写的键控流。
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning
* consistently together.如果您调用clear(),您将不会清除整个映射,而只是重置当前关键点的状态。在processElementX中,密钥始终是已知的。
/**
* Removes the value mapped under the current key.
*/
void clear();当您尝试在processElementX以外的函数中调用clear时,实际上应该会收到一个更好的异常。最后,您错误地使用了键控状态。
现在来看看你的实际问题。我假设您正在使用KeyedCoProcessFunction,因为模型是在单独的输入中更新的。如果它们是静态的,您可以从静态源(例如,包含在jar中)加载它们的open。此外,通常只有一个模型适用于具有不同键的所有值,那么您可以使用BroadCast状态。所以我假设你对不同类型的数据有不同的模型,用键分隔。
如果它们来自input2,那么您已经在调用processElement2时序列化了它们。
override def processElement2(model: Model, ctx: Context, collector): Unit = {
models.put(ctx.getCurrentKey, model)
modelsBytes.put(ctx.getCurrentKey, model.toBytes(v))
}那么您就不会覆盖snapshotState,因为状态已经是最新的。initializeState会急切地对模型进行反序列化,或者您也可以在processElement1中懒洋洋地实现它们。
https://stackoverflow.com/questions/59123188
复制相似问题