
How do I run an Apache Beam Python pipeline on a Flink cluster on Kubernetes?

Asked by a Stack Overflow user on 2020-09-29 00:53:23
1 answer · 689 views · 0 votes

I am trying to run the word count example on minikube, following the Flink Kubernetes instructions here, but the job never finishes. The Python Beam SDK worker pool does not appear to be doing any work.

In addition to the instructions for setting up a Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. If I understand correctly, the purpose of the worker pool is to execute the Python parts of the pipeline. See the full k8s deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
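For completeness, the deployment above can be applied and the jobmanager's REST port forwarded so the Beam client can reach it at localhost:8081. This is a sketch: the manifest filename is hypothetical, and the Service name flink-jobmanager is the one used in the official Flink Kubernetes instructions; adjust both if yours differ.

```shell
# Apply the taskmanager deployment (filename assumed) in the flink-test namespace.
kubectl -n flink-test apply -f taskmanager-deployment.yaml

# Forward the jobmanager's REST port so --flink_master=localhost:8081 works
# from the machine running the Beam client.
kubectl -n flink-test port-forward service/flink-jobmanager 8081:8081
```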

I run the example as follows:

python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

I used the documentation at https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set the values of environment_type and environment_config.

The job gets added to the job manager and I can see it in the Flink UI, but it never completes. Looking at the container logs, I noticed the started worker pool logs the following:

Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options:<... protobuf struct fields truncated in the original post ...> logging_endpoint: artifact_endpoint: control_endpoint: dependencies:
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = 

Likewise, the taskmanager keeps logging:

2020-09-28 16:46:00,155 INFO  org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory  - Still waiting for startup of environment from localhost:50000 for worker id 1-1

Not sure what I am missing. I checked /tmp/staged on the worker pool and it is empty.
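For reference, the staging directory inside the sidecar can be inspected like this (a sketch: `deploy/flink-taskmanager` resolves to a pod of the deployment shown above, and the container name matches the manifest):

```shell
# List the staged artifacts inside the beam-worker-pool sidecar container.
kubectl -n flink-test exec deploy/flink-taskmanager -c beam-worker-pool -- \
  ls -la /tmp/staged
```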

Note: this question is similar to the earlier questions "How do I run Beam Python pipelines using Flink deployed on Kubernetes?" and "Running Apache Beam python pipelines in Kubernetes".


1 Answer

Stack Overflow user

Accepted answer

Answered on 2020-09-29 07:16:21

By default (at the time of writing), Beam stages runtime dependencies ("artifacts") to a directory (/tmp/staged by default) that needs to be accessible by both the job server (the client, in your case) and the Beam workers.

You can get around this by setting the --flink_submit_uber_jar pipeline option. When --flink_submit_uber_jar is set, Beam wraps all of the dependencies into a single jar that is submitted to Flink.
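Applied to the command from the question, that would look like the following (same flags as before, plus the new option):

```shell
# Re-run wordcount with --flink_submit_uber_jar so Beam bundles all
# dependencies into one jar submitted to Flink, instead of staging
# artifacts under a shared /tmp/staged directory.
python -m apache_beam.examples.wordcount \
  --output /tmp/results/count \
  --runner FlinkRunner \
  --flink_master=localhost:8081 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000 \
  --flink_submit_uber_jar
```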

1 vote
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/64106427
