首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Elasticsearch to Spark Streaming

Elasticsearch to Spark Streaming
EN

Stack Overflow用户
提问于 2017-05-10 17:53:54
回答 1查看 2K关注 0票数 4

我正在分析日志,我有这样的架构:

kafka -> spark streaming ->弹性搜索

我的主要目标是在流媒体中创建机器学习模型。我认为我可以做两件事:

1) Kafka -> spark Streaming (ML) ->弹性搜索

2) Kafka -> spark Streaming-> elasticsearch -> spark streaming(ML)

-I认为第二种架构是最好的,因为spark streaming将直接使用索引数据。你认为如何?-If我们在spark streaming中创建一个模型(在弹性搜索之后)我们必须在这个地方使用这个模型(在elasticsearch之后)还是我们可以在spark streaming中使用它(在kafka之后直接使用)?#use==预测在elasticsearch使我们的模型静态(或不是在实时方法中)后实时-Does创建模型

谢谢。

EN

回答 1

Stack Overflow用户

发布于 2017-08-28 16:05:01

你是认真的吗?

kafka -> spark Streaming -> elasticsearch数据库

代码语言:javascript
复制
val sqlContext = new SQLContext(sc)

//kafka group
val group_id = "receiveScanner"
// kafka topic
val topic = Map("testStreaming"-> 1)
// zk connect
val zkParams = Map(
  "zookeeper.connect" ->"localhost",
  "zookeeper.connection.timeout.ms" -> "10000",
  "group.id" -> group_id)

// Kafka
val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER)
val receiveData = kafkaConsumer.map(_._2 )
// printer kafka data
receiveData.print()
receiveData.foreachRDD{ rdd=>
  val transform = rdd.map{ line =>
    val data = Json.parse(line)
    // play json parse
    val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0}
    val name = ( data \ "name"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
    val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0}
    val address = ( data \ "address"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
    Row(id,name,age,address)
  }

  val transfromrecive = sqlContext.createDataFrame(transform,schameType)
  import org.apache.spark.sql.functions._
  import org.elasticsearch.spark.sql._
  //filter age < 20 , to ES database
  transfromrecive.where(col("age").<(20)).orderBy(col("age").asc)
    .saveToEs("member/user",Map("es.mapping.id" -> "id"))
}

}

/** * dataframe schame * */

代码语言:javascript
复制
def schameType =  StructType(
  StructField("id",IntegerType,false)::
  StructField("name",StringType,false)::
  StructField("age",IntegerType,false)::
  StructField("address",StringType,false)::
  Nil
)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43889053

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档