I'm trying to run the word count example on minikube following the Flink Kubernetes instructions here, but the job never completes. The Python Beam SDK worker pool doesn't appear to do any work.
In addition to the instructions for configuring a Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. If I understand correctly, the purpose of the worker pool is to execute the Python portions of the pipeline. See the full k8s deployment below.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

I run the example as follows:
python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
  --environment_config=localhost:50000

I used the documentation at https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set the values of environment_type and environment_config.
The job is added to the job manager and I can see it in the Flink UI, but it never completes. I started looking through the container logs and noticed the worker pool has the following logs:
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options: <protobuf field dump garbled in extraction> logging_endpoint: artifact_endpoint: control_endpoint: dependencies:
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc =

Similarly, the task manager is logging:
2020-09-28 16:46:00,155 INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory - Still waiting for startup of environment from localhost:50000 for worker id 1-1

Not sure what I'm missing. I checked /tmp/staging/pickled_main_session on the worker pool and it is empty.
Note: this question is similar to these earlier questions: How do I run Beam Python pipelines using Flink deployed on Kubernetes? and Running Apache Beam python pipelines in Kubernetes.
Answered 2020-09-29 07:16:21
By default (at the time of writing), Beam stages runtime dependencies ("artifacts") to a directory (/tmp/staged by default) that must be accessible to both the job server (in your case, the client) and the Beam workers.
You can work around this by setting the --flink_submit_uber_jar pipeline option. When --flink_submit_uber_jar is set, Beam wraps all dependencies in a single jar that is submitted to Flink.
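Applied to the command from the question, the invocation might look like this (a sketch: it adds only the flag named in this answer, and assumes a Beam version recent enough to support it; it still requires a reachable Flink cluster and worker pool):

```
python -m apache_beam.examples.wordcount \
  --output /tmp/results/count \
  --runner FlinkRunner \
  --flink_master=localhost:8081 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000 \
  --flink_submit_uber_jar
```

With the uber jar path, the artifacts travel inside the jar submitted to Flink, so the client and workers no longer need a shared /tmp/staged directory.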
https://stackoverflow.com/questions/64106427