
Flink KeyedCoProcessFunction using state

Stack Overflow user
Asked on 2022-03-28 13:59:06
1 answer · 319 views · 0 followers · 1 vote

I am using a KeyedCoProcessFunction to enrich the main data with data from another stream.

Code:

Code language: Scala
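import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector
// PacketData, AssetCommandState, AssetData and CREATE are defined elsewhere in the asker's project (not shown).

class AssetDataEnrichment extends KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData] with LazyLogging {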

  case class AssetStateDoc(assetId: Option[String])
  private var associatedDevices: ValueState[AssetStateDoc] = _

  override def open(parameters: Configuration): Unit = {
    val associatedDevicesDescriptor =
      new ValueStateDescriptor[AssetStateDoc]("associatedDevices", classOf[AssetStateDoc])
    associatedDevices = getRuntimeContext.getState[AssetStateDoc](associatedDevicesDescriptor)
  }

  override def processElement1(
                                packet: PacketData,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    // value() is scoped to the current key; it returns null if nothing was stored for this key yet
    val tmpState = associatedDevices.value
    val state = if (tmpState == null) AssetStateDoc(None) else tmpState
    
    state.assetId match {
      case Some(assetId) =>
        logger.debug(s"There are state for ${packet.tag.externalId} = $assetId")
        out.collect(AssetData(assetId, packet.tag.externalId.get, packet.toString))
      case None => logger.debug(s"No state for a packet ${packet.tag.externalId}")
      // Unreachable: an Option is always either Some or None
      case _ => logger.debug("Smth went wrong")
    }
  }

  override def processElement2(
                                value: AssetCommandState,
                                ctx: KeyedCoProcessFunction[String, PacketData, AssetCommandState, AssetData]#Context,
                                out: Collector[AssetData]): Unit = {
    
    value.command match {
      case CREATE =>
        logger.debug(s"Got command to CREATE state for tag: ${value.id} with value: ${value.assetId}")
        logger.debug(s"current state is ${associatedDevices.value()}")
        associatedDevices.update(AssetStateDoc(Some(value.assetId)))
        logger.debug(s"new state is ${associatedDevices.value()}")
      case _ =>
        logger.error("Got unknown AssetCommandState command")
    }
  }
}

processElement2() works fine: it receives the data and updates the state.

But in processElement1() I always end up in the branch case None => logger.debug(s"No state for a packet ${packet.tag.externalId}"), even though I expect processElement2 to have already set a value.

As an example, I followed this guide: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/


1 Answer

Stack Overflow user

Accepted answer

Posted on 2022-03-28 18:47:30

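processElement1 and processElement2 do share state, but keep in mind that it is key-partitioned state. This means that a value set in processElement2 while processing some value v2 will only be seen in processElement1 during later calls in which it is processing a value v1 that has the same key as v2.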
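To make the key-partitioning point concrete, here is a plain-Scala model of a keyed two-input function. It is a model, not the Flink API: Packet, CreateCommand and EnrichmentModel are hypothetical simplified stand-ins for the asker's PacketData, AssetCommandState and AssetDataEnrichment, and the two constructor functions play the role of the key selectors that, in Flink, come from the keyBy applied to the connected streams. The question does not show that keyBy, so this only illustrates one plausible cause of the symptom: if the two inputs are keyed by values that differ, the state written in processElement2 lives under a key that processElement1 never reads.

```scala
import scala.collection.mutable

// Plain-Scala model of Flink keyed state, NOT the Flink API.
// Packet and CreateCommand are hypothetical stand-ins for PacketData / AssetCommandState.
object KeyedStateModel {
  final case class Packet(externalId: String)
  final case class CreateCommand(id: String, assetId: String)

  // packetKey / commandKey model the two key selectors used to key the
  // connected streams; each state entry is scoped to exactly one key.
  final class EnrichmentModel(packetKey: Packet => String, commandKey: CreateCommand => String) {
    // key -> assetId; a missing entry corresponds to ValueState.value() returning null
    private val assetIdByKey = mutable.Map.empty[String, String]

    // Models processElement2: stores the asset id under the command's key.
    def onCommand(cmd: CreateCommand): Unit =
      assetIdByKey(commandKey(cmd)) = cmd.assetId

    // Models processElement1: can only read state stored under the packet's own key.
    def onPacket(p: Packet): Option[String] =
      assetIdByKey.get(packetKey(p)).map(assetId => s"$assetId:${p.externalId}")
  }
}
```

With new EnrichmentModel(_.externalId, _.id), a packet for tag-42 is enriched once the CREATE command for tag-42 has been processed. With new EnrichmentModel(_.externalId, _.assetId), the command is stored under "asset-7" while the packet looks up "tag-42", so it always falls into the None branch.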

Also keep in mind that you have no control over the race between the two streams flowing into processElement1 and processElement2.
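The race matters even when the keys match. The sketch below is again a plain-Scala model, not Flink code, with hypothetical names (ArrivalOrderModel, naiveEnrich). It replays one interleaving of the two inputs through logic equivalent to the question's processElement1/processElement2: a packet that happens to arrive before its CREATE command finds no state and is only logged, so what gets emitted depends on which stream wins.

```scala
import scala.collection.mutable

// Plain-Scala model (NOT the Flink API) of the question's enrichment logic.
object ArrivalOrderModel {
  final case class Packet(externalId: String)
  final case class CreateCommand(id: String, assetId: String)
  // Left = element from the command stream, Right = element from the packet stream
  type Event = Either[CreateCommand, Packet]

  // Processes one interleaving of both inputs; both sides are keyed by the tag id.
  def naiveEnrich(events: Seq[Event]): List[String] = {
    val assetIdByKey = mutable.Map.empty[String, String]
    val out = mutable.ListBuffer.empty[String]
    events.foreach {
      case Left(cmd) => assetIdByKey(cmd.id) = cmd.assetId
      // Like the question's case None branch: a packet with no state yet is dropped.
      case Right(p) => assetIdByKey.get(p.externalId).foreach(a => out += s"$a:${p.externalId}")
    }
    out.toList
  }
}
```

Feeding the command before the packet yields one enriched record; feeding the same two events in the opposite order yields nothing.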

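The RidesAndFares exercise from the official Apache Flink training is all about learning how to work with this part of the API. https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/etl/ is the home page for the corresponding tutorial.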
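The idea practiced in RidesAndFares is to keep whichever element arrives first in keyed state until its partner shows up. Adapted to this enrichment case, one option is to buffer packets per key until the CREATE command arrives and then flush them. The following is a plain-Scala model under assumed, hypothetical names, not the exercise's code. In a real KeyedCoProcessFunction the two maps would correspond to per-key state (for example a ValueState for the asset id and a ListState for the pending packets), and you would likely also want a timer or some other policy to clear buffers for keys whose command never arrives.

```scala
import scala.collection.mutable

// Plain-Scala model (NOT the Flink API) of "buffer until the other side arrives".
object BufferingEnrichmentModel {
  final case class Packet(externalId: String)
  final case class CreateCommand(id: String, assetId: String)
  // Left = element from the command stream, Right = element from the packet stream
  type Event = Either[CreateCommand, Packet]

  def bufferedEnrich(events: Seq[Event]): List[String] = {
    // Per-key asset id (would be a ValueState in Flink).
    val assetIdByKey = mutable.Map.empty[String, String]
    // Per-key packets that arrived before their command (would be a ListState in Flink).
    val pendingByKey = mutable.Map.empty[String, mutable.ListBuffer[Packet]]
    val out = mutable.ListBuffer.empty[String]

    events.foreach {
      case Left(cmd) =>
        assetIdByKey(cmd.id) = cmd.assetId
        // Flush packets buffered for this key; remove() also clears the buffer.
        pendingByKey.remove(cmd.id).foreach { buffered =>
          buffered.foreach(p => out += s"${cmd.assetId}:${p.externalId}")
        }
      case Right(p) =>
        assetIdByKey.get(p.externalId) match {
          case Some(assetId) => out += s"$assetId:${p.externalId}"
          // No state yet: keep the packet instead of dropping it.
          case None => pendingByKey.getOrElseUpdate(p.externalId, mutable.ListBuffer.empty) += p
        }
    }
    out.toList
  }
}
```

Unlike the naive version, no packet is lost because of arrival order; packets for a key whose command has not arrived yet simply stay buffered.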

Votes: 4
Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/71648709
