首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用火花流处理实时流数据/日志?

如何使用火花流处理实时流数据/日志?
EN

Stack Overflow用户
提问于 2016-04-19 06:28:34
回答 1查看 1.4K关注 0票数 0

我是斯派克和斯卡拉的新手。

我想实现一个实时Spark,它可以每分钟读取网络日志,从Kafka获取大约1GB的JSON日志行/分钟,最后将聚合的值存储在ElasticSearch中。

聚合基于少量的值,如bytes_in、bytes_out等,使用组合密钥,如:客户端MAC、客户端IP、服务器MAC、服务器IP等。

我所写的“火花消费者”是:

代码语言:javascript
复制
object LogsAnalyzerScalaCS{
    def main(args : Array[String]) {
          val sparkConf = new SparkConf().setAppName("LOGS-AGGREGATION")
          sparkConf.set("es.nodes", "my ip address")
          sparkConf.set("es.port", "9200")
          sparkConf.set("es.index.auto.create", "true")
          sparkConf.set("es.nodes.discovery", "false")

          val elasticResource = "conrec_1min/1minute"
          val ssc = new StreamingContext(sparkConf, Seconds(30))
          val zkQuorum = "my zk quorum IPs:2181"
          val consumerGroupId = "LogsConsumer"
          val topics = "Logs"
          val topicMap = topics.split(",").map((_,3)).toMap
          val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
          val logJSON = json.map(_._2)
          try{
            logJSON.foreachRDD( rdd =>{
              if(!rdd.isEmpty()){
                  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
                  import sqlContext.implicits._
                  val df = sqlContext.read.json(rdd)
                  val groupedData = 
((df.groupBy("id","start_time_formated","l2_c","l3_c",
"l4_c","l2_s","l3_s","l4_s")).agg(count("f_id") as "total_f", sum("p_out") as "total_p_out",sum("p_in") as "total_p_in",sum("b_out") as "total_b_out",sum("b_in") as "total_b_in", sum("duration") as "total_duration"))
                  val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
                  dataForES.saveToEs(elasticResource)
                  dataForES.show();
                }
              })
             }
          catch{
            case e: Exception => print("Exception has occurred : "+e.getMessage)
          }
          ssc.start()
          ssc.awaitTermination()
        }

object SQLContextSingleton {
    @transient  private var instance: org.apache.spark.sql.SQLContext = _
    def getInstance(sparkContext: SparkContext): org.apache.spark.sql.SQLContext = {
      if (instance == null) {
        instance = new org.apache.spark.sql.SQLContext(sparkContext)
      }
      instance
    }
  }
}

首先我想知道我的方法是正确的还是没有考虑到我需要1分钟的日志聚合

使用此代码似乎存在一个问题:

  1. 此使用者将每30秒从Kafka broker中提取数据,并将最终聚合保存到Elasticsearch中,以保存30秒数据,从而增加Elasticsearch中的行数,使其每分钟至少有2个条目。UI工具,比方说Kibana需要做进一步的聚合。如果我将轮询时间从30秒增加到60秒,那么聚合需要大量时间,因此根本不需要实时。
  2. 我希望以这样一种方式来实现它,即在ElasticSearch中,每个键只保存一行。因此,我想要执行聚合,直到我没有在数据集中获得新的键值,这是从Kafka broker每分钟提取的。在做了一些googling之后,我发现可以使用groupByKey()和updateStateByKey()函数来实现这一点,但是如果我将JSON线转换成一个平面值的日志行,然后在那里使用这些函数,我就无法理解在我的情况下如何使用它了?如果我要使用这些函数,那么什么时候应该将最终的聚合值保存到ElasticSearch中?
  3. 还有其他方法来实现这一目标吗?

您的快速帮助将不胜感激。

你好,Bhupesh

EN

回答 1

Stack Overflow用户

发布于 2018-02-13 18:21:09

代码语言:javascript
复制
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main {
def main(args: Array[String]): Unit = {


val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(15))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)//,localhost:9094,localhost:9095"

val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val out = stream.map(record =>
  record.value
)

val words = out.flatMap(_.split(" "))
val count = words.map(word => (word, 1))
val wdc = count.reduceByKey(_+_)

val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())

wdc.foreachRDD{rdd=>
        val es = sqlContext.createDataFrame(rdd).toDF("word","count")
        import org.elasticsearch.spark.sql._
        es.saveToEs("wordcount/testing")
  es.show()
}

ssc.start()
ssc.awaitTermination()

 }
}

查看完整的示例和sbt

阿帕奇火花斯卡拉hadoop卡夫卡apache火花-sql 火花流apache-spark-2.0弹力

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36710221

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档