使用指定的spark连接器org.elasticsearch:elasticsearch-spark-20_2.11:6.3.2从Elasticsearch v6.2读取spark的速度慢得可怕。这来自一个带有索引的3节点ES集群:
curl https://server/_cat/indices?v
green open db MmVwAwYfTz4eE_L-tncbwQ 5 1 199983131 9974871 105.1gb 51.8gb在(10个节点、1tb内存、>50个VCPU) spark群集上读取:
val query = """{
"query": {
"match_all": {}
}
}"""
val df = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.nodes","server")
.option("es.port", "443")
.option("es.net.ssl","true")
.option("es.nodes.wan.only","true")
.option("es.input.use.sliced.partitions", "false")
.option("es.scroll.size", "1000")
.option("es.read.field.include", "f1,f2,f3")
.option("es.query",query)
.load("db")
df.take(1)这花了10分钟来执行。

这是(缓慢的)工作方式,还是我做错了什么?
发布于 2021-05-16 19:01:05
这并不是它应该有多慢,答案可以在你分享的截图中找到:
Spark UI中的列Stages: Succeeded/Total只显示了一个运行读取操作的任务,我不认为这是您所期望的,否则,拥有整个集群的意义何在。
我也遇到过同样的问题,我花了一段时间才弄清楚Spark将一个任务(分区)与Elasticsearch索引中的每个分片相关联。
这就是我们的答案,为了更快地进行,我们应该并行处理,如何做到这一点?通过将源索引分布到多个分片中。
默认情况下,Elasticsearch会创建一个带有一个分片的索引,但也可以对其进行个性化设置,如下所示:
PUT /index-name
{
"settings": {
"index": {
"number_of_shards": x,
"number_of_replicas": xx
}
}
}分片的数量可以高于弹性节点的数量,这对Spark是完全透明的。如果索引已经存在,请尝试创建新的inex,然后使用Elasticsearch Reindex API
我希望这解决了你的问题。
https://stackoverflow.com/questions/51856184
复制相似问题