首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将Spark与Elasticsearch集成

将Spark与Elasticsearch集成
EN

Stack Overflow用户
提问于 2020-05-20 15:07:37
回答 1查看 463关注 0票数 0

我正在尝试将sparkdataframe发送到Elasticsearch集群。我有Spark dataframe(df)。

我创建了index = "spark“,但是,当我运行这个命令时:

代码语言:javascript
复制
   df.write
     .format("org.elasticsearch.spark.sql")
     .option("es.nodes.wan.only","true")
     .option("es.port","9092")
     .option("es.net.ssl","true")
     .option("es.nodes", "localhost")
     .save("spark/docs")

我遇到了这个错误:

代码语言:javascript
复制
py4j.protocol.Py4JJavaError: An error occurred while calling o144.save.
: java.lang.NoClassDefFoundError: scala/Product$class

Spark版本: spark-3.0.0-bin-hadoop2.7

Elasticsearch版本: elasticsearch-7.7.0

添加依赖项: elasticsearch-hadoop-7.7.0.jar

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-20 16:00:52

我相信你应该在写的时候指定es.resource,格式可以指定为es。下面的代码适用于我在Spark 2.4.5 (运行在docker上)和ES版本7.5.1上。首先,确保您正在使用以下软件包运行pyspark

代码语言:javascript
复制
PYSPARK_SUBMIT_ARGS --packages org.elasticsearch:elasticsearch-hadoop:7.5.1 pyspark-shell

在PySpark端,例如在笔记本中:

代码语言:javascript
复制
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local").setAppName("ES Test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "elasticsearch")  # name of my docker container, you might keep localhost
conf.set("es.port", "9200")

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

colnames = [('col_' + str(i+1)) for i in range(11)]
df1 = spark._sc.parallelize([
  [it for it in range(11)], 
  [it for it in range(1,12)]]
).toDF((colnames))

(
  df1
  .write
  .format('es')
  .option(
    'es.resource', '%s/%s' % ('<resource_name>', '<table_name>'))
  .save()
)

Additonal验证这是使用elasticsearch Python包编写的:

代码语言:javascript
复制
from elasticsearch import Elasticsearch
esclient = Elasticsearch(['elasticsearch:9200'])


response = esclient.search(
    index='<resource_name>*',
    body={
        "query": {
            "match": {
                "col1": 1
            }
        },
        "aggs": {
            "test_agg": {
                "terms": {
                    "field": "col1",
                    "size": 10
                }
            }
        }
    }
)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61907055

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档