首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >有没有人能够在Spark中使用elasticsearch xpack sql?

有没有人能够在Spark中使用elasticsearch xpack sql?
EN

Stack Overflow用户
提问于 2019-01-31 08:04:05
回答 1查看 649关注 0票数 3

使用PySpark,我试图从elasticsearch中读取数据。通常,我会将查询设置为类似的内容(参见下面的查询),并将索引设置为“my_ es.resource /doc”,这样我就可以将数据读取到spark中:

代码语言:javascript
复制
q ="""{
          "query": {
              "match_all": {}
          }  
      }"""

但是,最近我在kibana上尝试了_xpack/sql,在其他SQL客户端上尝试了JDBC,它们在获取数据方面都工作得很好。但是,当我尝试在我的pyspark代码中引用_xpack时,我得到了以下错误:

代码语言:javascript
复制
Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: 
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: 
invalid_index_name_exception: Invalid index name [_xpack], must not start with '_'.
null

下面你会发现我的代码摘录,我正在尝试使用它来执行pyspark,提前谢谢!

代码语言:javascript
复制
q = """{"query": "select * from eg_flight limit 1"}"""

es_read_conf = {
    "es.nodes" : "192.168.1.71,192.168.1.72,192.168.1.73",
    "es.port" : "9200",
    "es.resource" :  "_xpack/sql",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)
EN

回答 1

Stack Overflow用户

发布于 2019-01-31 14:08:39

我不认为这个功能是受支持的。PySpark中的另一种解决方案是使用JDBC驱动程序,我确实尝试过。我尝试了以下几种方法:

代码语言:javascript
复制
es_df = spark.read.jdbc(url="jdbc:es://http://192.168.1.71:9200", table = "(select * from eg_flight) mytable")

我得到了以下错误:

代码语言:javascript
复制
Py4JJavaError: An error occurred while calling o2488.jdbc.
: java.sql.SQLFeatureNotSupportedException: Found 1 problem(s)
line 1:8: Unexecutable item

...

另一种方法是使用核心Python并进行请求,但我不建议将其用于大型数据集。

代码语言:javascript
复制
import requests as r
import json


es_template = {
    "query": "select * from eg_flight"
}

es_link = "http://192.168.1.71:9200/_xpack/sql"
headers = {'Content-type': 'application/json'}


if __name__ == "__main__":

    load = r.post(es_link, data=json.dumps(es_template), headers=headers)
    if load.status_code == 200:
        load = load.json()
        #do something with it
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54451452

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档