首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用sparkSession在数据采集卡中使用火花卡桑德拉连接器写火花放电

如何使用sparkSession在数据采集卡中使用火花卡桑德拉连接器写火花放电
EN

Stack Overflow用户
提问于 2020-06-10 16:29:43
回答 1查看 1.4K关注 0票数 1

我使用的是卡桑德拉数据库( cassandra )中的spark-cassandra-connector_2.11-2.3.0.jar吡火花。我正在从一个键空间读取数据,并将其写入另一个不同的密钥空间。这两个密钥空间有不同的用户名和密码。

我使用以下方法创建了sparkSession:

代码语言:javascript
复制
spark_session = None

def set_up_spark(sparkconf,config):
    """
    sets up spark configuration and create a session
    :return: None
    """
    try:
        logger.info("spark conf set up Started")
        global spark_session
        spark_conf = SparkConf()
        for key, val in sparkconf.items():
            spark_conf.set(key, val)
        spark_session = SparkSession.builder.config(conf=spark_conf).getOrCreate()
        logger.info("spark conf set up Completed")
    except Exception as e:
        raise e

我使用这个sparkSession将数据读取为:

代码语言:javascript
复制
table_df = spark_session.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .load()

我能够使用上面的会话读取数据。spark_session附加到上面的查询。

现在我需要创建另一个会话,因为写表的凭据是不同的。我有这样的写查询:

代码语言:javascript
复制
table_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace_name) \
            .mode("append") \
            .save()

我找不到如何在cassandra中为上面的写操作附加一个新的sparkSession。

如何用火花卡桑德拉连接器连接一个新的SparkSession,以便在火花放电中写入操作?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-06-10 17:02:28

您可以简单地将该信息作为选项传递给特定的readwrite操作,其中包括:spark.cassandra.connection.host

请注意,您需要将这些选项放入字典中,并传递此字典,而不是直接传递,如文档中所述。

代码语言:javascript
复制
read_options = { "table": "..", "keyspace": "..", 
  "spark.cassandra.connection.host": "IP1", 
  "spark.cassandra.auth.username": "username1", 
  "spark.cassandra.auth.password":"password1"}
table_df = spark_session.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(**read_options) \
            .load()

write_options = { "table": "..", "keyspace": "..", 
  "spark.cassandra.connection.host": "IP2", 
  "spark.cassandra.auth.username": "username2", 
  "spark.cassandra.auth.password":"password1"}
table_df.write \
            .format("org.apache.spark.sql.cassandra") \
            .options(**write_options) \
            .mode("append") \
            .save()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62308511

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档