首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在工作流模板火花作业中传递参数

如何在工作流模板火花作业中传递参数
EN

Stack Overflow用户
提问于 2021-08-24 16:49:22
回答 1查看 818关注 0票数 3

我的spark dataproc工作流有问题。

这在发射时起作用:

代码语言:javascript
复制
gcloud dataproc jobs submit spark \
--project myproject \
--cluster=mycluster \
--region=europe-west3 \
--jars=gs:path\file.jar,gs://path//depende.jar \
--class=it.flow \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0  
--  20210820 010000 000 0 000 TRY

我创建了一个dataproc工作流和python代码,通过composer启动它,它可以工作。

现在,我必须使最终参数动态(-- 20210820 010000 000 0 000 TRY)。

但是,我无法将参数传递给工作流:

代码语言:javascript
复制
gcloud dataproc workflow-templates create try1 --region=europe-west3
 
gcloud dataproc workflow-templates add-job spark \
--workflow-template=try1 \
--step-id=create_try1 \
--class=it.flow \
 --region=europe-west3 \
--jars=gs:path\file.jar,gs://path//depende.jar \
 --properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
 -- $arg1 $arg2  
 
gcloud dataproc workflow-templates set-cluster-selector TRY1  --region=europe-west3 --cluster-labels=goog-dataproc-cluster-name=cluster

这一呼吁:

代码语言:javascript
复制
gcloud dataproc workflow-templates instantiate TRY1  --region=europe-west3 --parameters="arg1=20210820"

导致以下错误:

(gcloud.dataproc.workflow-templates.instantiate) 错误: INVALID_ARGUMENT: INVALID_ARGUMENT不包含名为arg1.的参数

我如何解决这个问题?

yaml文件

代码语言:javascript
复制
id: create_file
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - gs://mybucket/try_file.jar
    - gs://mybucket/try_dependencies_2.jar
    mainClass: org.apache.hadoop.examples.tryFile
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_file_try
  parameters:
- name: ARG1
  fields:
  - jobs['create_file_try'].sparkJob.args[0]
- name: ARG2
  fields:
  - jobs['create_file_try'].sparkJob.args[1]
name: projects/My-project-id/regions/europe-west3/workflowTemplates/create_file
updateTime: '2021-08-25T07:49:59.251096Z'
EN

回答 1

Stack Overflow用户

发布于 2021-08-25 03:42:25

对于要接受参数的工作流模板,最好使用yaml文件。运行完整命令gcloud dataproc workflow-templates add-job spark时,可以获得yaml文件。它将在CLI上返回yaml配置。

在本例中,我只使用了Dataproc文档中的示例代码,并在--properties上使用了您的值,以便进行测试。

注意:我在这个示例的yaml文件中使用了一个虚拟project-id。确保您使用了实际的project-id,这样您就不会遇到任何问题。

示例命令:

代码语言:javascript
复制
gcloud dataproc workflow-templates add-job spark \
--workflow-template=try1 \
--step-id=create_try1 \
--class=org.apache.hadoop.examples.WordCount \
--region=europe-west3 \
--jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
--properties spark.num.executors=2,spark.executor.cores=3,spark.executor.memory=5g,spark.driver.cores=2,spark.driver.memory=10g,spark.dynamicAllocation.enabled=false,spark.executor.userClassPathFirst=true,spark.driver.userClassPathFirst=true,spark.jars.packages=com.google.cloud:google-cloud-logging:2.2.0 \
-- ARG1 ARG2  

CLI输出(yaml配置):

代码语言:javascript
复制
id: try1
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - file:///usr/lib/spark/examples/jars/spark-examples.jar
    mainClass: org.apache.hadoop.examples.WordCount
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_try1
name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
placement:
  managedCluster:
    clusterName: mycluster
updateTime: '2021-08-25T03:30:47.365244Z'
version: 3

复制生成的yaml配置,打开文本编辑器并添加parameters:字段。它将包含您要接受的论点。

代码语言:javascript
复制
parameters:
- name: ARG1
  fields:
  - jobs['create_try1'].sparkJob.args[0] # use the stepId in jobs[], in this example it is 'create_try1'
- name: ARG2
  fields:
  - jobs['create_try1'].sparkJob.args[1]

在本例中,我将其放在stepId:之后。

编辑的yaml配置:

代码语言:javascript
复制
id: try1
jobs:
- sparkJob:
    args:
    - ARG1
    - ARG2
    jarFileUris:
    - file:///usr/lib/spark/examples/jars/spark-examples.jar
    mainClass: org.apache.hadoop.examples.WordCount
    properties:
      spark.driver.cores: '2'
      spark.driver.memory: 10g
      spark.driver.userClassPathFirst: 'true'
      spark.dynamicAllocation.enabled: 'false'
      spark.executor.cores: '3'
      spark.executor.memory: 5g
      spark.executor.userClassPathFirst: 'true'
      spark.jars.packages: com.google.cloud:google-cloud-logging:2.2.0
      spark.num.executors: '2'
  stepId: create_try1
parameters:
- name: ARG1
  fields:
  - jobs['create_try1'].sparkJob.args[0]
- name: ARG2
  fields:
  - jobs['create_try1'].sparkJob.args[1]
name: projects/your-project-id/regions/europe-west3/workflowTemplates/try1
placement:
  managedCluster:
    clusterName: mycluster
updateTime: '2021-08-25T03:13:25.014685Z'
version: 3

使用已编辑的yaml文件覆盖工作流模板:

代码语言:javascript
复制
gcloud dataproc workflow-templates import try1 \
    --region=europe-west3 \
    --source=config.yaml

使用gcloud dataproc workflow-templates instantiate运行模板

有关更多细节,您可以参考工作流模板的参数化

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68911200

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档