首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >无法分析数据

无法分析数据
EN

Stack Overflow用户
提问于 2019-11-02 21:03:30
回答 1查看 56关注 0票数 0

val模式= ctx.getBroadcastState(patternStateDescriptor)

我做的进口

代码语言:javascript
复制
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

下面是代码

代码语言:javascript
复制
    val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties()
  properties.setProperty("bootstrap.servers","localhost:9092")

  val patternStream = new FlinkKafkaConsumer010("patterns", new SimpleStringSchema, properties)

  val patterns = env.addSource(patternStream)

  var patternData = patterns.map {
    str =>
      val splitted_str = str.split(",")
      PatternStream(splitted_str(0).trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val logsStream = new FlinkKafkaConsumer010("logs", new SimpleStringSchema, properties)

//  logsStream.setStartFromEarliest()

  val logs = env.addSource(logsStream)

  var data = logs.map {
    str =>
      val splitted_str = str.split(",")
      LogsTest(splitted_str.head.trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val keyedData: KeyedStream[LogsTest, String] = data.keyBy(_.metric)

  val bcStateDescriptor = new MapStateDescriptor[Unit, PatternStream]("patterns", Types.UNIT, Types.of[PatternStream]) // first type defined is for the key and second data type defined is for the value

  val broadcastPatterns: BroadcastStream[PatternStream]  = patternData.broadcast(bcStateDescriptor)

  val alerts = keyedData
      .connect(broadcastPatterns)
      .process(new PatternEvaluator())

  alerts.print()

//   println(alerts.getClass)
//  val sinkProducer = new FlinkKafkaProducer010("output",  new SimpleStringSchema(), properties)



  env.execute("Flink Broadcast State Job")
}

class PatternEvaluator()
  extends KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)] {

  private lazy val patternStateDescriptor = new MapStateDescriptor("patterns", classOf[String], classOf[String])

  private var lastMetricState: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    val lastMetricDescriptor = new ValueStateDescriptor("last-metric", classOf[String])

    lastMetricState = getRuntimeContext.getState(lastMetricDescriptor)
  }

  override def processElement(reading: LogsTest,
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
                              out: Collector[(String, String, String)]): Unit = {

    val metrics = readOnlyCtx.getBroadcastState(patternStateDescriptor)
    if (metrics.contains(reading.metric)) {
      val metricPattern: String = metrics.get(reading.metric)
      val metricPatternValue: String = metrics.get(reading.value)
      val lastMetric = lastMetricState.value()

      val logsMetric = (reading.metric)
      val logsValue = (reading.value)


      if (logsMetric == metricPattern) {
        if (metricPatternValue == logsValue) {
          out.collect((reading.timestamp, reading.value, reading.metric))
        }
      }
    }
  }


  override def processBroadcastElement(
                                        update: PatternStream,
                                        ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#Context,
                                        out: Collector[(String, String, String)]
                                      ): Unit = {
    val patterns = ctx.getBroadcastState(patternStateDescriptor)

    if (update.metric == "IP") {
      patterns.put(update.metric /*,update.operator*/ , update.value)
    }
    //    else if (update.metric == "username"){
    //      patterns.put(update.metric, update.value)
    //    }
    //    else {
    //      println("No required data found")
    //    }
    //  }

  }
}

示例数据:- Logs流

代码语言:javascript
复制
"21/09/98","IP", "5.5.5.5"

模式流

代码语言:javascript
复制
"IP","==","5.5.5.5"

我无法通过获得期望的结果来分析数据,即= 21/09/98,IP,5.5.5.5

到目前为止没有错误,只是没有分析数据

代码正在读取流(选中)

EN

回答 1

Stack Overflow用户

发布于 2019-11-04 21:43:49

在这种情况下,一个常见的问题来源是API不能控制模式和数据被摄取的顺序。可能是在processBroadcastElement之前调用了processElement。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58671386

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档