I am analyzing logs, and I have this architecture:
Kafka -> Spark Streaming -> Elasticsearch
My main goal is to build machine learning models on the stream. I see two options:
1) Kafka -> Spark Streaming (ML) -> Elasticsearch
2) Kafka -> Spark Streaming -> Elasticsearch -> Spark Streaming (ML)
- I think the second architecture is best, because Spark Streaming would work directly on the indexed data. What do you think?
- If we build a model in Spark Streaming after Elasticsearch, must we use the model at that point (after Elasticsearch), or can we use it in the first Spark Streaming stage (directly after Kafka)?
- Does building the model after Elasticsearch, and then using it for real-time prediction, make the model static (i.e., no longer a real-time approach)?
Thanks.
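For reference, option 1 would mean applying a model (trained offline beforehand) inside the streaming job itself, before anything is indexed. A minimal sketch, where the model path, the `kafkaStream` DStream, and the `toFeatures` helper are all assumptions for illustration:

```scala
// Sketch of option 1: Kafka -> Spark Streaming (ML) -> Elasticsearch.
// Assumes a spark.mllib model trained offline and saved to a known path,
// plus a hypothetical toFeatures() that turns a log line into a feature vector.
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.elasticsearch.spark._

val model = LogisticRegressionModel.load(sc, "hdfs:///models/logreg") // assumed path

kafkaStream.map(_._2).foreachRDD { rdd =>
  val scored = rdd.map { line =>
    val features = toFeatures(line)          // hypothetical feature extractor
    Map("line" -> line, "label" -> model.predict(features))
  }
  scored.saveToEs("logs/scored")             // index the predictions in Elasticsearch
}
```

The key point is that the model object is loaded once on the driver and broadcast to the executors with the closure, so prediction stays real-time even though training happened offline.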
Posted on 2017-08-28 16:05:01
Do you mean something like this?
Kafka -> Spark Streaming -> Elasticsearch (database)
// Required imports (Spark Streaming, spark-sql, elasticsearch-hadoop, play-json)
import kafka.serializer.StringDecoder
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._
import play.api.libs.json.Json

val sqlContext = new SQLContext(sc)
// Kafka consumer group
val group_id = "receiveScanner"
// Kafka topic -> number of consumer threads
val topic = Map("testStreaming" -> 1)
// ZooKeeper connection settings
val zkParams = Map(
  "zookeeper.connect" -> "localhost",
  "zookeeper.connection.timeout.ms" -> "10000",
  "group.id" -> group_id)
// Create the Kafka receiver stream
val kafkaConsumer = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, zkParams, topic, StorageLevel.MEMORY_ONLY_SER)
val receiveData = kafkaConsumer.map(_._2)
// Print the incoming Kafka messages
receiveData.print()
receiveData.foreachRDD { rdd =>
  val transform = rdd.map { line =>
    // Parse each message with play-json, falling back to defaults for missing fields
    val data = Json.parse(line)
    val id = (data \ "id").asOpt[Int].getOrElse(0)
    val name = (data \ "name").asOpt[String].getOrElse("")
    val age = (data \ "age").asOpt[Int].getOrElse(0)
    val address = (data \ "address").asOpt[String].getOrElse("")
    Row(id, name, age, address)
  }
  val transformReceive = sqlContext.createDataFrame(transform, schemaType)
  // Keep only users younger than 20, sort by age, and write to Elasticsearch
  transformReceive.where(col("age") < 20).orderBy(col("age").asc)
    .saveToEs("member/user", Map("es.mapping.id" -> "id"))
}

/** DataFrame schema */
def schemaType = StructType(
  StructField("id", IntegerType, false) ::
  StructField("name", StringType, false) ::
  StructField("age", IntegerType, false) ::
  StructField("address", StringType, false) ::
  Nil
)

https://stackoverflow.com/questions/43889053
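The second architecture from the question would then read the indexed documents back out of Elasticsearch to train a model. A minimal sketch, assuming the elasticsearch-hadoop connector and the member/user index written above; note that this is batch training, which illustrates the question's concern: the model reflects the index at training time, not the live stream.

```scala
// Sketch of option 2: train on data already indexed in Elasticsearch
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.elasticsearch.spark.sql._

// Read the indexed documents back as a DataFrame
val users = sqlContext.esDF("member/user")

// Train k-means on the "age" column (2 clusters, 20 iterations);
// a static snapshot of the index, so the model must be retrained
// (or loaded into the Kafka-side streaming job) to stay current
val points = users.select("age").rdd.map(r => Vectors.dense(r.getInt(0).toDouble))
val model = KMeans.train(points, 2, 20)
```

To keep predictions real-time, a saved model like this one can be loaded in the first streaming stage (directly after Kafka), which answers the question's second point: the model does not have to be used where it was trained.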