首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >是否为不同查询创建新的SparkSession?

是否为不同查询创建新的SparkSession?
EN

Stack Overflow用户
提问于 2019-08-21 10:43:02
回答 1查看 289关注 0票数 0

我想从elasticsearch获取两个数据

一个是用查询过滤的,另一个没有过滤器。

代码语言:javascript
复制
 // with query
 session = get_spark_session(query=query)

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()


 df.show() // empty result

 // without query
 session = get_spark_session()

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()

 df.show() // empty result


 def get_spark_session(query=None, excludes=[]):

     conf = pyspark.SparkConf()
     conf.set("spark.driver.allowMultipleContexts", "true")
     conf.set("es.index.auto.create", "true")
     conf.set("es.nodes.discovery", "true")
     conf.set("es.scroll.size", 10000)
     conf.set("es.read.field.exclude", excludes)
     conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
     if query:
         conf.set("es.query", query)


     sc = SparkSession.builder.config(conf=conf).getOrCreate()

     return sc 

问题是会话是否被重用。

当我第一次运行filtered查询,第二次运行non-filtered查询时,两者都给出了空结果

但是当我第一次运行non-filtered查询时,它显示了一些结果,随后的filtered查询显示空结果。

代码语言:javascript
复制
 // below, I reverse the order
 // without query
 session = get_spark_session()

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()

 df.show() // some result

 // with query
 session = get_spark_session(query=query)

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()


 df.show() // empty result

**编辑

这样我就可以用下面的代码得到想要的结果:

代码语言:javascript
复制
def get_spark_session(query=None, excludes=[]):

    conf = pyspark.SparkConf()
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", 10000)
    conf.set("es.read.field.exclude", excludes)
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    else:
        conf.set("es.query", "") # unset the query 
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-08-21 11:22:23

SparkSession.builder获取现有的SparkSession,或者,如果没有现有的,则基于此构建器中设置的选项创建一个新的构建器。在您的例子中,spark配置正在被重用。从配置中删除"es.query“应该可以解决这个问题:

代码语言:javascript
复制
def get_spark_session(query=None, excludes=[]):
     conf = pyspark.SparkConf()
     conf.unset("es.query")
     conf.set("spark.driver.allowMultipleContexts", "true")
     conf.set("es.index.auto.create", "true")
     conf.set("es.nodes.discovery", "true")
     conf.set("es.scroll.size", 10000)
     conf.set("es.read.field.exclude", excludes)
     conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
     if query:
         conf.set("es.query", query)    

     sc = SparkSession.builder.config(conf=conf).getOrCreate()    
     return sc 
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57583792

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档