首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >利用气流EMR算子无法从emr集群连接雪花

利用气流EMR算子无法从emr集群连接雪花
EN

Stack Overflow用户
提问于 2021-02-20 20:17:35
回答 1查看 429关注 0票数 0

我正在尝试连接到由气流EMR操作员启动的EMR集群中的雪花,但我得到了以下错误

py4j.protocol.Py4JJavaError:调用o147.load时出错。::java.lang.ClassNotFoundException:未能找到数据源: net.snowflake.spark.snowflake。请在http://spark.apache.org/third-party-projects.html找到包裹

以下是我要添加到EMRaddsteps操作符中以运行脚本load_updates.py的步骤,并在"Args“中描述我的雪花包。

代码语言:javascript
复制
STEPS = [
    {
        "Name" : "convo_facts",
        "ActionOnFailure" : "TERMINATE_CLUSTER",
        "HadoopJarStep" : {
            "Jar" : "command-runner.jar",
            "Args" : ["spark-submit", "s3://dev-data-lake/spark_files/cf/load_updates.py", \
                      "--packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4", \
                      "INPUT=s3://dev-data-lake/table_exports/public/", \
                      "OUTPUT=s3://dev-data-lake/emr_output/cf/"]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    'Name' : 'cftest',
    'LogUri' : 's3://dev-data-lake/emr_logs/cf/log.txt',
    'ReleaseLabel' : 'emr-5.32.0',
    'Instances' : {
        'InstanceGroups' : [
            {
                'Name' : 'Master nodes',
                'Market' : 'ON_DEMAND',
                'InstanceRole' : 'MASTER',
                'InstanceType' : 'r6g.4xlarge',
                'InstanceCount' : 1,
            },
            {
                'Name' : 'Slave nodes',
                'Market' : 'ON_DEMAND',
                'InstanceRole' : 'CORE',
                'InstanceType' : 'r6g.4xlarge',
                'InstanceCount' : 3,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps' : True,
        'TerminationProtected' : False
    },
    'Applications' : [{
        'Name' : 'Spark'
    }],
    'JobFlowRole' : 'EMR_EC2_DefaultRole',
    'ServiceRole' : 'EMR_DefaultRole'
}

而且,这就是我如何在我的load_updates.py脚本中添加雪花片,以提取到pyspark中。

代码语言:javascript
复制
# Set options below
sfOptions = {
  "sfURL" : "xxxx.us-east-1.snowflakecomputing.com",
  "sfUser" : "user",
  "sfPassword" : "xxxx",
  "sfDatabase" : "",
  "sfSchema" : "PUBLIC",
  "sfWarehouse" : ""
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

query_sql = """select * from cf""";

messages_new = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", query_sql) \
  .load()

不知道我是不是漏掉了什么或者哪里做错了什么。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-02-20 20:53:45

选项--package应该放在s3://.../load_updates.py之前,放在submit命令中。否则,它将被视为应用程序参数。

试着用这个:

代码语言:javascript
复制
STEPS = [
    {
        "Name": "convo_facts",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--packages",
                "net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4",
                "s3://dev-data-lake/spark_files/cf/load_updates.py",
                "INPUT=s3://dev-data-lake/table_exports/public/",
                "OUTPUT=s3://dev-data-lake/emr_output/cf/"
            ]
        }
    }
]
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66296139

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档