首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >气流emrAddStepsOperator无法执行火花遮阳罐

气流emrAddStepsOperator无法执行火花遮阳罐
EN

Stack Overflow用户
提问于 2022-06-08 09:26:39
回答 1查看 454关注 0票数 0

火花应用程序应该采取什么步骤类型..。我面临的问题是,主型号不能设定或无法识别纱线.在使用emrAddStepsOperator时,似乎是将应用程序视为简单的jar而不是火花提交模式。请找到附加的气流数据,错误和电子病历屏幕截图。

amazon云控制台手动添加火花作业作为步骤

在添加火花罐类型步骤后,而不是自定义jar步骤。分别给出了火花提交args和主方法args的选项。

step类型可以是流,也可以是星火应用程序或自定义jar。

我的错误信息:

代码语言:javascript
复制
> Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

这是AWS EMR管道的一个示例。

从创建集群开始,添加步骤/操作,检查步骤,最后结束集群。

代码语言:javascript
复制
import time    
from airflow.operators.python import PythonOperator     
from datetime import timedelta    

from airflow import DAG    
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator    
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator    
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator     
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor    
from airflow.utils.dates import days_ago    

SPARK_STEPS = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['spark-submit',    
                    '--deploy-mode',    
                    'cluster',    
                    '--master',    
                    'yarn',    
                    '--class',    
                    'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',     
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },
    }
]
SPARK_STEPS2 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
            'MainClass': 'com.sadim.scalatestnadeem.Test',
            'Args': ['spark-submit',    
                '--deploy-mode',    
                'client',    
                '--master',    
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS3 = [    
    {    
        'Name': 'sadim_test3',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',    
            'MainClass': 'com.sadim.TestSadim',    
            'Args': ['spark-submit',     
                '--deploy-mode',    
                'client',     
                '--master',     
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS4 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                'spark-submit',    
                '--deploy-mode',    
                'client',    
                 '--master',    
                 'yarn',                    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
SPARK_STEPS5 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
JOB_FLOW_OVERRIDES = {    
    'Name': 'ob_emr_airflow_automation',    
    'ReleaseLabel': 'emr-6.6.0',    
    'LogUri': 's3://test-data/emr_logs/',    
    'Instances': {    
        'InstanceGroups': [    
            {    
                'Name': 'Master node',    
                'Market': 'ON_DEMAND',    
                'InstanceRole': 'MASTER',    
                'InstanceType': 'm5.xlarge',    
                'InstanceCount': 1    
            },    
            {    
                    'Name': "Slave nodes",    
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',    
                    'InstanceType': 'm5.xlarge',    
                    'InstanceCount': 1    
            }    
        ],    
        'Ec2SubnetId': 'subnet-03129248888a14196',    
        'Ec2KeyName': 'datalake-emr-nodes',    
        'KeepJobFlowAliveWhenNoSteps': True,    
        'TerminationProtected': False    
    },    
    'BootstrapActions': [    
                {    
                        'Name': 'Java11InstallBootstrap',    
                        'ScriptBootstrapAction': {    
                            'Path': 's3://test-data/jars/bootstrap.sh',    
                            'Args': [    
                            ]    
                        }    
                }    
    ],    
    'Configurations': [    
        {    
            "Classification":"spark-defaults",    
                    "Properties":{    
                    "spark.driver.defaultJavaOptions":"-XX:OnOutOfMemoryError='kill -9 %p' -    XX:MaxHeapFreeRatio=70",    
                    "spark.executor.defaultJavaOptions":"-verbose:gc -Xlog:gc*::time -    XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
                                }    
        }    
    ],    
    'JobFlowRole': 'DL_EMR_EC2_DefaultRole',    
    'ServiceRole': 'EMR_DefaultRole',    
}    

with DAG(    
    dag_id='emr_job_flow_manual_steps_dag_v6',    
    default_args={    
        'owner': 'airflow',    
        'depends_on_past': False,    
        'email': ['sadim.nadeem@sadim.com'],    
        'email_on_failure': False,    
        'email_on_retry': False,    
    },    
    dagrun_timeout=timedelta(hours=1),    
    start_date=days_ago(1),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:

    # [START howto_operator_emr_manual_steps_tasks]
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=dag,
                                                   python_callable=lambda: time.sleep(400))

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS5,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
       aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover

    # [END howto_operator_emr_manual_steps_tasks]

    # Task dependencies created via `XComArgs`:
    #   cluster_creator >> step_checker
    #   cluster_creator >> cluster_remover
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-06-10 08:19:19

问题解决了。我们需要在emrAddStepsOperator中使用命令runner作为jar选项,并将您的特定ETL作业jar传递到args中

代码语言:javascript
复制
SPARK_STEPS = [
    {
        'Name': 'DetectAndLoadOrphanTransactions',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            #'MainClass': 'com.sadim.main',
            'Args': [ 
            'spark-submit',
            '--deploy-mode',
                'cluster',
                 '--master',
                 'yarn',
                 '--class',
                    'com.sadim.DetectAndLoadOrphanTransactions',
                 's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
                '--Txnspath',
                's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
                '--demographyPath',
                's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2'
                '--outPath',
                's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2'],
        },
    }
]
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72543252

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档