
Spark is not fetching all source fields from Elasticsearch

Stack Overflow user
Asked 2016-03-04 07:16:11
2 answers, 768 views, 0 follows, 1 vote

I have the following data in Elasticsearch (a local single-node server).

Search command: curl -XPOST 'localhost:9200/sparkdemo/_search?pretty' -d '{ "query": { "match_all": {} } }'

Output:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 10,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_H0lYe0cQl--Bin",
      "_score" : 1.0,
      "_source" : {
        "date" : "9/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 86,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Bir",
      "_score" : 1.0,
      "_source" : {
        "date" : "13/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 86,
        "avg" : 87,
        "stage" : "S1"
      }
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY-TolYe0cQl--Bii",
      "_score" : 1.0,
      "_source" : {
        "date" : "4/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 82,
        "avg" : 82,
        "stage" : "S0"
      }
    }, 
.......
... Few more records
..........
    }, {
      "_index" : "sparkdemo",
      "_type" : "hrinfo",
      "_id" : "AVNAY_KklYe0cQl--Biq",
      "_score" : 1.0,
      "_source" : {
        "date" : "12/Mar/2016",
        "pid" : "1",
        "propName" : "HEARTRATE",
        "var" : null,
        "propValue" : 91,
        "avg" : 89,
        "stage" : "S1"
      }
    } ]
  }
}

I am trying to fetch all of this data in a Spark program (a local standalone program run from Eclipse).

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.elasticsearch.spark._

object Test1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HRInfo")
    val sc = new SparkContext(conf)

    // esRDD yields (document id, _source fields as a Map) pairs
    val esRdd = sc.esRDD("sparkdemo/hrinfo", "?q=*")

    val searchResultRDD = esRdd.map { case (id, source) =>
      println("id:" + id + ", map:" + source)
      source
    }

    // Map.get returns an Option, so a field missing from the Map prints as None
    searchResultRDD.collect().foreach { map =>
      val stage = map.get("stage")
      val pid = map.get("pid")
      val date = map.get("date")
      val propName = map.get("propName")
      val propValue = map.get("propValue")
      val avg = map.get("avg")
      val variation = map.get("var")

      println("Info(" + stage + "," + pid + "," + date + "," + propName + "," + propValue + "," + avg + "," + variation + ")")
    }

    sc.stop()
  }
}

But the program is not fetching all the fields of the records stored in Elasticsearch.

Program output:

id:AVNAY_H0lYe0cQl--Bin, map:Map(date -> 9/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Bir, map:Map(date -> 13/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-TolYe0cQl--Bii, map:Map(date -> 4/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_H0lYe0cQl--Bio, map:Map(date -> 10/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Bip, map:Map(date -> 11/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-TolYe0cQl--Bij, map:Map(date -> 5/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bil, map:Map(date -> 7/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bim, map:Map(date -> 8/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY-Y9lYe0cQl--Bik, map:Map(date -> 6/Mar/2016, pid -> 1, propName -> HEARTRATE)
id:AVNAY_KklYe0cQl--Biq, map:Map(date -> 12/Mar/2016, pid -> 1, propName -> HEARTRATE)
Info(None,Some(1),Some(9/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(13/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(4/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(10/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(11/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(5/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(7/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(8/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(6/Mar/2016),Some(HEARTRATE),None,None,None)
Info(None,Some(1),Some(12/Mar/2016),Some(HEARTRATE),None,None,None)

The program fetches every record, but in each record the other fields (stage, propValue, avg and var) are missing. Why? Thanks a lot.


2 answers

Stack Overflow user

Accepted answer

Posted 2016-03-07 21:50:45

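This happens because your documents contain a "var": null value. In each document, the "var": null entry and every field that comes after it never make it into the Map in Scala.

You can demonstrate this by replacing one of the "var": null values with a non-null value; that document then comes back with all of its values, as you expect. Alternatively, you can put a null value at the beginning of a document. For example: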

curl -X POST 'http://localhost:9200/sparkdemo/hrinfo/5' -d '{"test":null,"date": "9/Mar/2016","pid": "1","propName": "HEARTRATE","propValue": 86,"avg": 86,"stage": "S1"}'

The Map for that document will be empty:

id:5, map:Map()
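To spot documents affected by the truncation described above, a minimal pure-Scala check can compare each returned Map against the field list visible in the curl output. This is a sketch, not part of either answer: the `FieldCheck` object and its `missingFields` helper are hypothetical names, and the expected-field list is assumed from the sample documents. A key present with a null value counts as present; only keys absent from the Map are reported.

```scala
object FieldCheck {
  // Assumed field list, taken from the sample documents in sparkdemo/hrinfo
  val ExpectedFields: Seq[String] =
    Seq("date", "pid", "propName", "var", "propValue", "avg", "stage")

  // Returns the expected field names that are absent from one document's source Map.
  // A key mapped to null is still "present"; only missing keys are reported.
  def missingFields(source: collection.Map[String, AnyRef],
                    expected: Seq[String] = ExpectedFields): Seq[String] =
    expected.filterNot(source.contains)

  def main(args: Array[String]): Unit = {
    // A Map shaped like the truncated ones printed by the question's program
    val truncated: collection.Map[String, AnyRef] =
      Map("date" -> "9/Mar/2016", "pid" -> "1", "propName" -> "HEARTRATE")
    println(missingFields(truncated)) // prints List(var, propValue, avg, stage)
  }
}
```

In the question's program, one could call `FieldCheck.missingFields(source)` on each `(id, source)` pair from `esRdd.collect()` and print the ids whose result is non-empty.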
Votes: 1

Stack Overflow user

Posted 2017-01-31 16:40:14

Try this (it reads the index as a DataFrame and prints its schema):

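import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

// sc is an existing SparkContext; esDF builds the DataFrame schema from the index mapping
val sql = new SQLContext(sc)
val index1 = sql.esDF("index/type")
println(index1.schema.treeString)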
Votes: 0
Page content provided by Stack Overflow; translation provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/35790168
