I'm trying to send a Spark DataFrame to an Elasticsearch cluster. I have a Spark DataFrame (df).
I created the index = "spark", but when I run this command:
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes.wan.only", "true") \
    .option("es.port", "9092") \
    .option("es.net.ssl", "true") \
    .option("es.nodes", "localhost") \
    .save("spark/docs")

I get this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o144.save.
: java.lang.NoClassDefFoundError: scala/Product$class

Spark version: spark-3.0.0-bin-hadoop2.7
Elasticsearch version: elasticsearch-7.7.0
Added dependency: elasticsearch-hadoop-7.7.0.jar
Posted on 2020-05-20 16:00:52
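A note on the stack trace: scala.Product$class exists only in Scala 2.11, while Spark 3.0 is built against Scala 2.12, so a connector jar compiled for 2.11 (such as the plain elasticsearch-hadoop-7.7.0.jar) cannot load. A minimal sketch of launching with a Scala-2.12 build of the connector; the exact artifact coordinate is an assumption here, so verify it on Maven Central first:

```shell
# Assumption: org.elasticsearch:elasticsearch-spark-20_2.12 is the
# Scala-2.12 build of the ES-Hadoop Spark connector for this version.
# Check Maven Central for the exact coordinate before relying on it.
pyspark --packages org.elasticsearch:elasticsearch-spark-20_2.12:7.7.0
```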
I believe you should specify es.resource when writing, and the format can be given as 'es'. The code below works for me on Spark 2.4.5 (running on Docker) and ES version 7.5.1. First, make sure you are running pyspark with the following package:

PYSPARK_SUBMIT_ARGS="--packages org.elasticsearch:elasticsearch-hadoop:7.5.1 pyspark-shell"

On the PySpark side, for example in a notebook:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
conf = SparkConf()
conf.setMaster("local").setAppName("ES Test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "elasticsearch") # name of my docker container, you might keep localhost
conf.set("es.port", "9200")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
colnames = [('col_' + str(i + 1)) for i in range(11)]
df1 = sc.parallelize([
    [it for it in range(11)],
    [it for it in range(1, 12)]
]).toDF(colnames)
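As a side note, the column names generated above are col_1 through col_11 (with an underscore), which matters when querying the written documents later. A quick pure-Python check of what toDF() receives, no Spark needed:

```python
# Reproduces the column-name list passed to toDF() above.
colnames = [('col_' + str(i + 1)) for i in range(11)]

print(colnames[0])    # first generated name
print(colnames[-1])   # last generated name
print(len(colnames))  # number of columns
```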
(
    df1
    .write
    .format('es')
    .option('es.resource', '%s/%s' % ('<resource_name>', '<table_name>'))
    .save()
)

Additionally, verification that this was written, using the elasticsearch Python package:
from elasticsearch import Elasticsearch
esclient = Elasticsearch(['elasticsearch:9200'])
response = esclient.search(
    index='<resource_name>*',
    body={
        "query": {
            "match": {
                "col_1": 1
            }
        },
        "aggs": {
            "test_agg": {
                "terms": {
                    "field": "col_1",
                    "size": 10
                }
            }
        }
    }
)

https://stackoverflow.com/questions/61907055