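I currently have a Flink (1.12) cluster running standalone on Kubernetes (v1.16). For our purposes, we deploy it in application cluster mode.
To make our Flink cluster more resilient to failures, we want to add HA to our current setup. I have read the documentation and followed the sample configuration it recommends for our setup.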
flink-conf.yaml
jobmanager.rpc.address: {{ $fullName }}-jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.numberOfTaskSlots: 2
taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 1728m
blob.server.port: 6124
queryable-state.proxy.ports: 6125
parallelism.default: 2
scheduler-mode: reactive
execution.checkpointing.interval: 10s
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: thoros-cluster-1
high-availability.storageDir: s3:///company-flink-{{ .Values.environment }}/recovery
job.yaml (excerpt)
...
restartPolicy: OnFailure
containers:
  - name: jobmanager
    image: "{{ .Values.thoros.image.repository }}:{{ .Chart.AppVersion }}"
    imagePullPolicy: {{ default "Always" .Values.thoros.image.pullPolicy }}
    env:
      - name: POD_IP
        valueFrom:
          fieldRef:
            apiVersion: v1
            fieldPath: status.podIP
    envFrom:
      - configMapRef:
          name: {{ $fullName }}
    # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
    args: [
      "standalone-job",
      "--host",
      "$(POD_IP)",
      "--job-classname",
      "com.company.beam.Main"]
Of course, I have left out some other configuration (I'm happy to provide it if needed).
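For testing, I set the job parallelism to 2 (which should start two JobManagers, one of them as a standby).
When I try to deploy this to K8s, the JobManager pods fail immediately with the error below. Apart from the fact that something seems to be missing and is causing a NullPointerException, I'm not sure what is missing here.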
2021-08-20 12:06:55,133 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Initializing cluster services.
2021-08-20 12:06:55,176 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to start actor system, external address 100.107.0.5:6123, bind address 0.0.0.0:6123.
2021-08-20 12:06:56,956 INFO akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
2021-08-20 12:06:57,067 INFO akka.remote.Remoting [] - Starting remoting
2021-08-20 12:06:57,469 INFO akka.remote.Remoting [] - Remoting started; listening on addresses :[akka.tcp://flink@100.107.0.5:6123]
2021-08-20 12:06:57,687 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink@100.107.0.5:6123
2021-08-20 12:06:58,671 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:338)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:296)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:178)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:585)
at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:85)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.<init>(Fabric8FlinkKubeClient.java:85)
at org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory.fromConfiguration(FlinkKubeClientFactory.java:106)
at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:37)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
... 9 more
.
2021-08-20 12:06:58,684 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping Akka RPC service.
2021-08-20 12:06:58,754 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2021-08-20 12:06:58,767 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2021-08-20 12:06:58,833 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2021-08-20 12:06:58,882 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopped Akka RPC service.
2021-08-20 12:06:58,882 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint StandaloneApplicationClusterEntryPoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint StandaloneApplicationClusterEntryPoint.
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:201) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:585) [flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:85) [flink-dist_2.12-1.12.5.jar:1.12.5]
Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:338) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:296) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:178) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
... 2 more
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.<init>(Fabric8FlinkKubeClient.java:85) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory.fromConfiguration(FlinkKubeClientFactory.java:106) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:37) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:338) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:296) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:178) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
Answered 2021-08-25 08:29:58
The problem was caused by using high-availability.cluster-id where kubernetes.cluster-id should have been used.
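For illustration, the HA-related keys from the question's flink-conf.yaml would look roughly like this after the fix. This is only a sketch: every value is copied unchanged from the question, and the only change is the renamed cluster ID key. It assumes the rest of the configuration stays as posted.

```yaml
# flink-conf.yaml (HA-related keys only; values copied from the question)
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# Renamed from high-availability.cluster-id, which was the key used in the question.
kubernetes.cluster-id: thoros-cluster-1
high-availability.storageDir: s3:///company-flink-{{ .Values.environment }}/recovery
```

This fits the stack trace, where the NullPointerException comes from a checkNotNull call in the Fabric8FlinkKubeClient constructor, which suggests a required configuration value was not set.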
https://stackoverflow.com/questions/68904352