kubelet 删除 Pod 前的逻辑分析。参考的代码为 1.28-1.32 版本,社区仍在持续迭代,细节可能会有区别,仅供参考。
Kubelet 感知到用户发起了删除 Pod 的操作后,需要做的主要有以下几件事:

删除 Pod 时,删除请求首先会提交到 API server(由 controller 管理的 Pod 也是如此)。对于优雅删除,API server 收到删除请求后并不会立即删除 Pod 对象,而是为 Pod 设置 metadata.deletionTimestamp 字段(同时设置 metadata.deletionGracePeriodSeconds,后文 kubelet 计算优雅退出时间时会用到它)。
以下是一个发送到 API server 的 delete 请求的示例:

Kubelet 通过 ListWatch 方式感知 API server 的 Pod 配置改变。
kubelet 使用 field selector spec.nodeName 进行过滤,只处理调度到当前节点的 Pod。
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
// The Reflector responsible for watching pods at the apiserver should be run only after
// the node sync with the apiserver has completed.
klog.InfoS("Waiting for node sync before watching apiserver pods")
gofunc() {
for {
if nodeHasSynced() {
klog.V(4).InfoS("node sync completed")
break
}
time.Sleep(WaitForAPIServerSyncPeriod)
klog.V(4).InfoS("node sync has not completed yet")
}
klog.InfoS("Watching apiserver")
newSourceApiserverFromLW(lw, updates)
}()
}
处理 Pod 配置改变之前,kubelet 会先对这些【改变】进行分类(adds、updates、deletes、removes、reconciles),以便交给不同的 handler 处理:
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{}
pods := s.pods[source]
if pods == nil {
pods = make(map[types.UID]*v1.Pod)
}
// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
// After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
// ignore static pods
if !kubetypes.IsStaticPod(ref) {
s.startupSLIObserver.ObservedPodOnWatch(ref, time.Now())
}
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} elseif needReconcile {
reconcilePods = append(reconcilePods, existing)
} elseif needGracefulDelete {
deletePods =append(deletePods, existing)
}
continue
}
recordFirstSeenTime(ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}
update := change.(kubetypes.PodUpdate)
switch update.Op {
// 省略部分内容
case kubetypes.SET:
klog.V(4).InfoS("Setting pods for source","source", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
}
default:
klog.InfoS("Received invalid update type", "type", update)
}
s.pods[source] = pods
//省略部分逻辑
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
return adds, updates, deletes, removes, reconciles
}
对于已经在缓存中的 Pod,checkAndUpdatePod 会判断这次变更:只要发生了有意义的变更且 metadata.deletionTimestamp 不为空,就归类为 DELETE(优雅删除):
// checkAndUpdatePod updates existing, and:
//   - if ref makes a meaningful change that is not a graceful deletion, returns needUpdate=true
//   - if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
//   - if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
//   - else return all false
//
// At most one of needUpdate, needGracefulDelete and needReconcile is true.
//
// Excerpt: only the final classification step is shown.
func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
	// ... (omitted, including step 1 and the in-place update of existing)

	// 2. this is a graceful delete
	if ref.DeletionTimestamp != nil {
		needGracefulDelete = true
	} else {
		// 3. this is an update
		needUpdate = true
	}

	return
}
回到 kubelet 的主逻辑 syncLoop:syncLoopIteration 从 channel 读取 Pod 事件,并分发给不同的 handler 处理。收到 DELETE 事件时,会打印以下日志:
Sep 02 16:43:25 iv-ye2vaomk8wwh2yp48s1p kubelet[376496]: I0902 16:43:25.113416 376496 kubelet.go:2484] "SyncLoop DELETE" source="api" pods=["default/ebs-dp-5c9c948759-86xtq"]
由于存在优雅删除,DELETE 事件会被当作 UPDATE 处理,即调用 HandlePodUpdates,处理逻辑如下:
// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
//
// Each pod is first stored in the pod manager, then resolved to its
// (pod, mirror pod) pair and dispatched to its pod worker as a
// SyncPodUpdate. All pods in one call share the same StartTime. Pods that
// carry a deletionTimestamp are dispatched the same way; the pod worker
// decides to start termination based on that field.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod)

		// Note: this shadows the loop variable pod with the resolved pod.
		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
		if wasMirror {
			// A mirror pod whose pod cannot be found: there is nothing to
			// hand to the pod worker.
			if pod == nil {
				klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
				continue
			}
		}

		kl.podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			MirrorPod:  mirrorPod,
			UpdateType: kubetypes.SyncPodUpdate,
			StartTime:  start,
		})
	}
}
UpdatePod 的注释解释了更新 Pod 的具体步骤:
// UpdatePod notifies the pod worker of a change to a pod, which will then
// be processed in FIFO order by a goroutine per pod UID. The state of the
// pod will be passed to the syncPod method until either the pod is marked
// as deleted, it reaches a terminal phase (Succeeded/Failed), or the pod
// is evicted by the kubelet. Once that occurs the syncTerminatingPod method
// will be called until it exits successfully, and after that all further
// UpdatePod() calls will be ignored for that pod until it has been forgotten
// due to significant time passing. A pod that is terminated will never be
// restarted.
//
// This is an interface method; the enclosing interface declaration is not
// shown in this excerpt.
UpdatePod(options UpdatePodOptions)
UpdatePod 的逻辑比较复杂。首先看一下该方法的参数 UpdatePodOptions:
// UpdatePodOptions is an options struct to pass to a UpdatePod operation.
//
// Either Pod or RunningPod identifies the target: Pod is the normal case,
// RunningPod is only used when Pod is nil.
type UpdatePodOptions struct {
	// The type of update (create, update, sync, kill).
	UpdateType kubetypes.SyncPodType
	// StartTime is an optional timestamp for when this update was created. If set,
	// when this update is fully realized by the pod worker it will be recorded in
	// the PodWorkerDuration metric.
	StartTime time.Time
	// Pod to update. Required.
	Pod *v1.Pod
	// MirrorPod is the mirror pod if Pod is a static pod. Optional when UpdateType
	// is kill or terminated.
	MirrorPod *v1.Pod
	// RunningPod is a runtime pod that is no longer present in config. Required
	// if Pod is nil, ignored if Pod is set.
	RunningPod *kubecontainer.Pod
	// KillPodOptions is used to override the default termination behavior of the
	// pod or to update the pod status after an operation is completed. Since a
	// pod can be killed for multiple reasons, PodStatusFunc is invoked in order
	// and later kills have an opportunity to override the status (i.e. a preemption
	// may be later turned into an eviction).
	KillPodOptions *KillPodOptions
}
这里面包括一个关键信息:
UpdatePod 首先需要确定待处理 Pod 的 uid、namespace 和 name,这些信息来自 options.Pod 或 options.RunningPod:
// UpdatePod (excerpt): resolve the identity (uid, namespace, name) of the pod
// this update refers to.
//
// When only RunningPod is set, the update describes a runtime pod that is no
// longer present in config, and it is accepted only as a SyncPodKill. When
// both Pod and RunningPod are set, RunningPod is dropped and Pod is used.
// Otherwise options.Pod is required.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	var isRuntimePod bool
	var uid types.UID
	var name, ns string
	if runningPod := options.RunningPod; runningPod != nil {
		if options.Pod == nil {
			// the synthetic pod created here is used only as a placeholder and not tracked
			if options.UpdateType != kubetypes.SyncPodKill {
				klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType)
				return
			}
			uid, ns, name = runningPod.ID, runningPod.Namespace, runningPod.Name
			isRuntimePod = true
		} else {
			options.RunningPod = nil
			uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
			klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		}
	} else {
		uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
	}

	// ... (omitted)
}
简单来说:如果传入了 RunningPod 且 Pod 为 nil,则使用 RunningPod 的信息,并且只有 UpdateType 为 SyncPodKill 时才会处理(否则直接忽略该 update);如果同时传入了 Pod,则忽略 RunningPod。其余情况都使用 Pod 的信息进行处理。
Pod 在节点的状态记录在 podWorkers 的 podSyncStatuses 中:
// podWorkers (excerpt) keeps the pod worker state, tracked per pod UID.
type podWorkers struct {
	// ... (omitted)

	// Tracks by UID the termination status of a pod - syncing, terminating,
	// terminated, and evicted.
	podSyncStatuses map[types.UID]*podSyncStatus

	// ... (omitted)
}
podSyncStatuses 的 key 为 Pod uid,如果第一次被处理,会先初始化 podSyncStatus:
// UpdatePod (excerpt): create the podSyncStatus the first time a pod UID is
// seen.
//
// Normally the new status starts in the "sync" state. If the config already
// reports a terminal phase and the runtime cache agrees, the status is instead
// created as terminated (but not finished), so that SyncTerminatedPod still runs.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// ... (omitted; uid, ns, name, now and firstTime are set up here)

	status, ok := p.podSyncStatuses[uid]
	if !ok {
		klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		firstTime = true
		status = &podSyncStatus{
			syncedAt: now,
			fullname: kubecontainer.BuildPodFullName(name, ns),
		}
		// if this pod is being synced for the first time, we need to make sure it is an active pod
		if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) {
			// Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated.
			// This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523
			// However, `filterOutInactivePods`, considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set.
			if statusCache, err := p.podCache.Get(uid); err == nil {
				if isPodStatusCacheTerminal(statusCache) {
					// At this point we know:
					// (1) The pod is terminal based on the config source.
					// (2) The pod is terminal based on the runtime cache.
					// This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart.
					// These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod.
					// As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run.
					status = &podSyncStatus{
						terminatedAt:       now,
						terminatingAt:      now,
						syncedAt:           now,
						startedTerminating: true,
						finished:           false,
						fullname:           kubecontainer.BuildPodFullName(name, ns),
					}
				}
			}
		}
		p.podSyncStatuses[uid] = status
	}

	// ... (omitted)
}
通过以下逻辑判断是否要开始终止某个 Pod:
// UpdatePod (excerpt): detect the transition into the terminating state.
//
// The switch only runs while termination has not yet been requested, so
// terminatingAt is recorded once, by the first update that asks for it.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// ... (omitted; status, pod, uid, ns, name, now and isRuntimePod are set up here)

	// check for a transition to terminating
	var becameTerminating bool
	if !status.IsTerminationRequested() {
		// Cases are evaluated in order; the first matching one wins.
		switch {
		case isRuntimePod:
			klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.deleted = true
			status.terminatingAt = now
			becameTerminating = true
		case pod.DeletionTimestamp != nil:
			// The normal `kubectl delete pod` path.
			klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.deleted = true
			status.terminatingAt = now
			becameTerminating = true
		case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
			klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.terminatingAt = now
			becameTerminating = true
		case options.UpdateType == kubetypes.SyncPodKill:
			if options.KillPodOptions != nil && options.KillPodOptions.Evict {
				klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
				status.evicted = true
			} else {
				klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			}
			status.terminatingAt = now
			becameTerminating = true
		}
	}

	// ... (omitted)
}
正常删除 Pod 的情况下,会走到 DeletionTimestamp 不为 nil 的逻辑,有以下日志:
(注:该日志的级别为 4,kubelet 需要以 -v=4 或更高的日志级别运行才会输出。)
Sep 05 14:33:07 iv-ye2t6bkyrkqc6ilj5ztn kubelet[2194045]: I0905 14:33:07.036644 2194045 pod_workers.go:854] "Pod is marked for graceful deletion, begin teardown" pod="default/ebs-dp-78bc4957b8-57kbs" podUID="8bd2d0ac-f534-45b8-afc2-bc68d1b48caf" updateType="update"
当 status.IsTerminationRequested() 返回 true(即 terminatingAt 不为零值)时,说明需要走终止逻辑,此时会计算有效的优雅退出时间并写入 KillPodOptions;如果 Pod 已经终止(IsTerminated),或尚未请求终止,则会关闭 KillPodOptions 中的 CompletedCh 并清空 KillPodOptions:
// UpdatePod (excerpt): normalize KillPodOptions for the pod's current state.
//
// Only a pod whose termination has been requested (but is not yet
// terminated) keeps KillPodOptions; there they carry the effective grace
// period. In every other state the options are dropped and any CompletedCh
// is closed right away.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// ... (omitted; status, pod, uid, ns, name and isRuntimePod are set up here)

	// once a pod is terminating, all updates are kills and the grace period can only decrease
	var wasGracePeriodShortened bool
	switch {
	case status.IsTerminated():
		// A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
		// due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
		// after the pod worker completes.
		if isRuntimePod {
			klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			return
		}

		// Release anyone waiting on CompletedCh immediately and drop the options.
		if options.KillPodOptions != nil {
			if ch := options.KillPodOptions.CompletedCh; ch != nil {
				close(ch)
			}
		}
		options.KillPodOptions = nil

	case status.IsTerminationRequested():
		if options.KillPodOptions == nil {
			options.KillPodOptions = &KillPodOptions{}
		}

		// Queue the completion channel and status hook on status rather than
		// firing them now (presumably consumed once termination completes).
		if ch := options.KillPodOptions.CompletedCh; ch != nil {
			status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
		}
		if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
			status.statusPostTerminating = append(status.statusPostTerminating, fn)
		}

		gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)

		wasGracePeriodShortened = gracePeriodShortened
		status.gracePeriod = gracePeriod
		// always set the grace period for syncTerminatingPod so we don't have to recalculate,
		// will never be zero.
		options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod

	default:
		// KillPodOptions is not valid for sync actions outside of the terminating phase
		if options.KillPodOptions != nil {
			if ch := options.KillPodOptions.CompletedCh; ch != nil {
				close(ch)
			}
			options.KillPodOptions = nil
		}
	}

	// ... (omitted)
}
其中,calculateEffectiveGracePeriod 用于计算优雅退出时间(代码注释要求 grace period 只能缩短,且最终值至少为 1 秒):
// calculateEffectiveGracePeriod sets the initial grace period for a newly terminating pod or allows a
// shorter grace period to be provided, returning the desired value.
//
// The value is computed in this order:
//  1. start from the value already recorded in status.gracePeriod;
//  2. apply pod.DeletionGracePeriodSeconds (set by the apiserver);
//  3. apply options.PodTerminationGracePeriodSecondsOverride.
//     Steps 2 and 3 only take effect when the current value is 0 or the
//     override is smaller.
//  4. fall back to pod.Spec.TerminationGracePeriodSeconds when the value is still 0;
//  5. enforce a floor of 1 second.
//
// The second result is true when status already held a non-zero grace period
// and the computed value differs from it.
func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *KillPodOptions) (int64, bool) {
	// enforce the restriction that a grace period can only decrease and track whatever our value is,
	// then ensure a calculated value is passed down to lower levels
	gracePeriod := status.gracePeriod
	// this value is bedrock truth - the apiserver owns telling us this value calculated by apiserver
	if override := pod.DeletionGracePeriodSeconds; override != nil {
		if gracePeriod == 0 || *override < gracePeriod {
			gracePeriod = *override
		}
	}
	// we allow other parts of the kubelet (namely eviction) to request this pod be terminated faster
	if options != nil {
		if override := options.PodTerminationGracePeriodSecondsOverride; override != nil {
			if gracePeriod == 0 || *override < gracePeriod {
				gracePeriod = *override
			}
		}
	}
	// make a best effort to default this value to the pod's desired intent, in the event
	// the kubelet provided no requested value (graceful termination?)
	//
	// NOTE(review): 0 doubles as "unset" here, so an override of exactly 0 from
	// either source above also lands in this branch and is replaced by the spec
	// value, which can be longer than the grace period previously recorded.
	if gracePeriod == 0 && pod.Spec.TerminationGracePeriodSeconds != nil {
		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
	}
	// no matter what, we always supply a grace period of 1
	if gracePeriod < 1 {
		gracePeriod = 1
	}
	return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod
}
如果该 Pod 是第一次被处理(podUpdates 中还没有对应的 channel),需要初始化 podUpdates,并为该 Pod 启动一个 pod worker goroutine:
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// start the pod worker goroutine if it doesn't exist
podUpdates, exists := p.podUpdates[uid]
if !exists {
// buffer the channel to avoid blocking this method
podUpdates = make(chanstruct{}, 1)
p.podUpdates[uid] = podUpdates
// ensure that static pods start in the order they are received by UpdatePod
if kubetypes.IsStaticPod(pod) {
p.waitingToStartStaticPodsByFullname[status.fullname] =
append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
}
// allow testing of delays in the pod update channel
var outCh <-chanstruct{}
if p.workerChannelFn != nil {
outCh = p.workerChannelFn(uid, podUpdates)
} else {
outCh = podUpdates
}
// spawn a pod worker
gofunc() {
// TODO: this should be a wait.Until with backoff to handle panics, and
// accept a context for shutdown
defer runtime.HandleCrash()
defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
p.podWorkerLoop(uid, outCh)
}()
}
// 省略部分内容
}
随后,UpdatePod 会把本次 update 记录到 status.pendingUpdate,并通过 channel 以非阻塞的方式通知 worker 有新的 update 需要处理:
// UpdatePod (excerpt): record the update as pending and wake the pod worker.
//
// pendingUpdate holds a single update, so a newer call overwrites an update
// the worker has not consumed yet.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// ... (omitted)

	// notify the pod worker there is a pending update
	status.pendingUpdate = &options
	status.working = true
	klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
	// Non-blocking send: if a notification is already buffered, the worker
	// will read the latest pendingUpdate when it handles that one.
	select {
	case podUpdates <- struct{}{}:
	default:
	}

	// ... (omitted)
}
正常删除 Pod 时,这次 update 的 workType 为 terminating(channel 中传递的只是通知信号,具体的 update 保存在 status.pendingUpdate 中),对应日志:
Sep 05 14:33:07 iv-ye2t6bkyrkqc6ilj5ztn kubelet[2194045]: I0905 14:33:07.036659 2194045 pod_workers.go:963] "Notifying pod of pending update" pod="default/ebs-dp-78bc4957b8-57kbs" podUID="8bd2d0ac-f534-45b8-afc2-bc68d1b48caf" workType="terminating"
podWorkerLoop 是 pod worker 的主循环,每个 Pod UID 对应一个 goroutine:
// podWorkerLoop manages sequential state updates to a pod in a goroutine, exiting once the final
// state is reached. The loop is responsible for driving the pod through four main phases:
//
// 1. Wait to start, guaranteeing no two pods with the same UID or same fullname are running at the same time
// 2. Sync, orchestrating pod setup by reconciling the desired pod spec with the runtime state of the pod
// 3. Terminating, ensuring all running containers in the pod are stopped
// 4. Terminated, cleaning up any resources that must be released before the pod can be deleted
//
// The podWorkerLoop is driven by updates delivered to UpdatePod and by SyncKnownPods. If a particular
// sync method fails, p.workerQueue is updated with backoff but it is the responsibility of the kubelet
// to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the
// caller. When a pod transitions working->terminating or terminating->terminated, the next update is
// queued immediately and no kubelet action is required.
也就是说,Pod 在 pod worker 中依次经历 4 个阶段:等待启动、Sync、Terminating 和 Terminated。
podWorkerLoop 持续处理 podUpdates channel 发过来的 update:
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
var lastSyncTime time.Time
forrange podUpdates {
ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
if !ok {
continue
}
// If the pod was terminated prior to the pod being allowed to start, we exit the loop.
if !canEverStart {
return
}
// If the pod is not yet ready to start, continue and wait for more updates.
if !canStart {
continue
}
// 省略部分内容
}
}
startPodSync 在 p.podLock 的保护下消费 pending update,做一些状态检查,并判断 Pod 是否可以启动,从而保证同一时刻不会有 UID 或 fullname 相同的两个 Pod 同时运行:
// startPodSync is invoked by each pod worker goroutine when a message arrives on the pod update channel.
// This method consumes a pending update, initializes a context, decides whether the pod is already started
// or can be started, and updates the cached pod state so that downstream components can observe what the
// pod worker goroutine is currently attempting to do. If ok is false, there is no available event. If any
// of the boolean values is false, ensure the appropriate cleanup happens before returning.
//
// This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate,
// or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started
// should never have an activeUpdate because that is exposed to downstream components on started pods.
其中,update.WorkType 由 status.WorkType() 生成:
func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
p.podLock.Lock()
defer p.podLock.Unlock()
// verify we are known to the pod worker still
status, ok := p.podSyncStatuses[podUID]
if !ok {
// pod status has disappeared, the worker should exit
klog.V(4).InfoS("Pod worker no longer has status, worker should exit", "podUID", podUID)
returnnil, update, false, false, false
}
if !status.working {
// working is used by unit tests to observe whether a worker is currently acting on this pod
klog.V(4).InfoS("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID)
}
if status.pendingUpdate == nil {
// no update available, this means we were queued without work being added or there is a
// race condition, both of which are unexpected
status.working = false
klog.V(4).InfoS("Pod worker received no pending work, programmer error?", "podUID", podUID)
returnnil, update, false, false, false
}
// consume the pending update
update.WorkType = status.WorkType()
update.Options = *status.pendingUpdate
status.pendingUpdate = nil
select {
case <-p.podUpdates[podUID]:
// ensure the pod update channel is empty (it is only ever written to under lock)
default:
}
// 省略部分内容
}
WorkType() 及其依赖方法的具体逻辑如下:
// IsTerminationRequested reports whether terminatingAt has been set, i.e.
// termination of the pod has been requested.
func (s *podSyncStatus) IsTerminationRequested() bool { return !s.terminatingAt.IsZero() }

// IsTerminated reports whether terminatedAt has been set, i.e. the pod has
// been marked as terminated.
func (s *podSyncStatus) IsTerminated() bool { return !s.terminatedAt.IsZero() }

// WorkType returns the pod's current state in the pod lifecycle state machine.
//
// TerminatedPod takes precedence over TerminatingPod, which takes precedence
// over SyncPod.
func (s *podSyncStatus) WorkType() PodWorkerState {
	if s.IsTerminated() {
		return TerminatedPod
	}
	if s.IsTerminationRequested() {
		return TerminatingPod
	}
	return SyncPod
}
简单理解:status.terminatedAt 不为零值时,WorkType 为 TerminatedPod;否则如果 status.terminatingAt 不为零值,WorkType 为 TerminatingPod;两者都为零值时为 SyncPod。
从前面的逻辑可以看到,只要 DeletionTimestamp 不为 nil,就会设置 status.terminatingAt。换句话说,当我们通过 kubectl delete pod xxx 命令发起删除 Pod 的请求后,kubelet 一旦感知到,就开始走 TerminatingPod 流程。
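上述不同的 WorkType 对应不同的处理函数:SyncPod 对应 SyncPod;TerminatingPod 对应 SyncTerminatingPod(对于只存在于运行时的孤儿 Pod,则对应 SyncTerminatingRuntimePod);TerminatedPod 对应 SyncTerminatedPod。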
这 4 个方法对应 podSyncer 的全部内容:
// podSyncer describes the core lifecycle operations of the pod state machine. A pod is first
// synced until it naturally reaches termination (true is returned) or an external agent decides
// the pod should be terminated. Once a pod should be terminating, SyncTerminatingPod is invoked
// until it returns no error. Then the SyncTerminatedPod method is invoked until it exits without
// error, and the pod is considered terminal. Implementations of this interface must be threadsafe
// for simultaneous invocation of these methods for multiple pods.
type podSyncer interface {
	// SyncPod configures the pod and starts and restarts all containers. If it returns true, the
	// pod has reached a terminal state and the presence of the error indicates succeeded or failed.
	// If an error is returned, the sync was not successful and should be rerun in the future. This
	// is a long running method and should exit early with context.Canceled if the context is canceled.
	SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
	// SyncTerminatingPod attempts to ensure the pod's containers are no longer running and to collect
	// any final status. This method is repeatedly invoked with diminishing grace periods until it exits
	// without error. Once this method exits with no error other components are allowed to tear down
	// supporting resources like volumes and devices. If the context is canceled, the method should
	// return context.Canceled unless it has successfully finished, which may occur when a shorter
	// grace period is detected.
	SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
	// SyncTerminatingRuntimePod is invoked when running containers are found that correspond to
	// a pod that is no longer known to the kubelet to terminate those containers. It should not
	// exit without error unless all containers are known to be stopped.
	SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error
	// SyncTerminatedPod is invoked after all running containers are stopped and is responsible
	// for releasing resources that should be executed right away rather than in the background.
	// Once it exits without error the pod is considered finished on the node.
	SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error
}
SyncPod 负责让 Pod 收敛到期望的状态,详细分析可参考《kubelet 创建 Pod 前发生了什么?》一文。
// SyncPod is the transaction script for the sync of a single pod (setting up)
// a pod. This method is reentrant and expected to converge a pod towards the
// desired state of the spec. The reverse (teardown) is handled in
// SyncTerminatingPod and SyncTerminatedPod. If SyncPod exits without error,
// then the pod runtime state is in sync with the desired configuration state
// (pod is running). If SyncPod exits with a transient error, the next
// invocation of SyncPod is expected to make progress towards reaching the
// desired state. SyncPod exits with isTerminal when the pod was detected to
// have reached a terminal lifecycle phase due to container exits (for
// RestartNever or RestartOnFailure) and the next method invoked will be
// SyncTerminatingPod. If the pod terminates for any other reason, SyncPod
// will receive a context cancellation and should exit as soon as possible.
//
// Arguments:
//
// updateType - whether this is a create (first time) or an update, should
// only be used for metrics since this method must be reentrant
//
// pod - the pod that is being set up
//
// mirrorPod - the mirror pod known to the kubelet for this pod, if any
//
// podStatus - the most recent pod status observed for this pod which can
// be used to determine the set of actions that should be taken during
// this loop of SyncPod
//
// The workflow is:
// - If the pod is being created, record pod worker start latency
// - Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// - If the pod is being seen as running for the first time, record pod
// start latency
// - Update the status of the pod in the status manager
// - Stop the pod's containers if it should not be running due to soft
// admission
// - Ensure any background tracking for a runnable pod is started
// - Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// - Create the data directories for the pod if they do not exist
// - Wait for volumes to attach/mount
// - Fetch the pull secrets for the pod
// - Call the container runtime's SyncPod callback
// - Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next SyncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not write an event if this operation returns an error.
对于这个方法,我们主要关注它何时返回 isTerminal=true,即 Pod 已经进入终态(Succeeded 或 Failed):
// SyncPod (excerpt): the early exit taken once the generated API status shows
// a terminal phase. In that case the status is recorded with the status manager
// and isTerminal=true is returned, so the pod worker moves on to
// SyncTerminatingPod.
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
	// ... (omitted; apiPodStatus is generated here)

	// If the pod is terminal, we don't need to continue to setup the pod
	if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		isTerminal = true
		return isTerminal, nil
	}

	// ... (omitted)
}
SyncTerminatingPod 负责停止 Pod 中所有仍在运行的容器以及 PodSandbox。如果 Pod 被强制删除并且 kubelet 发生了重启,则不保证会调用该方法,这类孤儿 Pod 由 SyncTerminatingRuntimePod 处理。
// SyncTerminatingPod is expected to terminate all running containers in a pod. Once this method
// returns without error, the pod is considered to be terminated and it will be safe to clean up any
// pod state that is tied to the lifetime of running containers. The next method invoked will be
// SyncTerminatedPod. This method is expected to return with the grace period provided and the
// provided context may be cancelled if the duration is exceeded. The method may also be interrupted
// with a context cancellation if the grace period is shortened by the user or the kubelet (such as
// during eviction). This method is not guaranteed to be called if a pod is force deleted from the
// configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned
// pods.
关键日志如下:
Sep 28 17:24:44 iv-ye2t6bkyrkqc6ilj5ztn kubelet[2194045]: I0928 17:24:44.002936 2194045 kubelet.go:2060] "SyncTerminatingPod enter" pod="default/ebs-dp-78bc4957b8-m7v4v" podUID="6a94fb3a-ea69-4776-8f55-437b52e7cbb4"
首先,通过 kl.generateAPIPodStatus(pod, podStatus, false) 方法生成 status:
// SyncTerminatingPod (excerpt): generate the API status for the terminating
// pod and record it with the status manager.
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
	// ... (omitted)

	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
	// Let the caller-supplied hook (if any) amend the generated status before
	// it is handed to the status manager.
	if podStatusFn != nil {
		podStatusFn(&apiPodStatus)
	}
	kl.statusManager.SetPodStatus(pod, apiPodStatus)

	// ... (omitted)
}
generateAPIPodStatus 是通用方法,在 SyncPod、SyncTerminatingPod 和 SyncTerminatedPod 中都会调用(SyncTerminatingRuntimePod 的参数中只有运行时的 Pod,没有 v1.Pod)。该方法会根据 Pod 的 ContainerStatus 计算 Phase,并生成 Pod 的 Conditions 内容。
生成的 status 交给 status manager 处理,status manager 会更新本地缓存和 API server 的 Pod 信息。
// syncPod syncs the given status with the API server. The caller must not hold the status lock.
//
// Excerpt: only the status PATCH against the API server and its log line are shown.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	// ... (omitted; pod and mergedStatus are computed here)

	newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
	klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes))

	// ... (omitted)
}
接下来,调用 killPod 方法:
// SyncTerminatingPod (excerpt): kill the pod's running containers and
// sandboxes. On failure a FailedToKillPod warning event is recorded and the
// error is returned, so the pod worker will invoke this method again.
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
	// ... (omitted; ctx is created here - note the incoming context parameter
	// is discarded, see the `_` in the signature)

	// Build the running-pod view (containers and sandboxes) from the runtime
	// status so killPod knows what to stop.
	p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
	if err := kl.killPod(ctx, pod, p, gracePeriod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
		// there was an error killing the pod, so we return that error directly
		utilruntime.HandleError(err)
		return err
	}

	// ... (omitted)
}
killPod 进一步调用 container runtime 的 KillPod 方法:
// killPod instructs the container runtime to kill the pod. This method requires that
// the pod status contains the result of the last syncPod, otherwise it may fail to
// terminate newly created containers and sandboxes.
//
// Only a runtime KillPod failure is returned. Updating the QoS cgroups afterwards
// is best-effort: a failure is logged and ignored.
func (kl *Kubelet) killPod(ctx context.Context, pod *v1.Pod, p kubecontainer.Pod, gracePeriodOverride *int64) error {
	// Call the container runtime KillPod method which stops all known running containers of the pod
	if err := kl.containerRuntime.KillPod(ctx, pod, p, gracePeriodOverride); err != nil {
		return err
	}
	if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
		klog.V(2).InfoS("Failed to update QoS cgroups while killing pod", "err", err)
	}
	return nil
}
这里的 KillPod 是同步调用:它会等待所有容器被 kill、所有 sandbox 被停止之后才返回:
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
// gracePeriodOverride if specified allows the caller to override the pod default grace period.
// only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
// it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
func (m *kubeGenericRuntimeManager) KillPod(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
	// Despite the local name, this is a PodSyncResult, not an error; its
	// Error() method converts the collected per-step results into the
	// returned error (presumably nil when every step succeeded).
	err := m.killPodWithSyncResult(ctx, pod, runningPod, gracePeriodOverride)
	return err.Error()
}
// killPodWithSyncResult kills a runningPod and returns SyncResult.
// Note: The pod passed in could be *nil* when kubelet restarted.
//
// It first kills all of the pod's containers and collects one result per
// container. It then stops every sandbox of the pod, recording failures in a
// single KillPodSandbox result. Sandboxes that are already gone (NotFound) are
// not treated as failures.
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
	killContainerResults := m.killContainersWithSyncResult(ctx, pod, runningPod, gracePeriodOverride)
	for _, containerResult := range killContainerResults {
		result.AddSyncResult(containerResult)
	}

	// stop sandbox, the sandbox will be removed in GarbageCollect
	// killSandboxResult is registered now and marked failed in place below;
	// this works because AddSyncResult stores the pointer.
	killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
	result.AddSyncResult(killSandboxResult)
	// Stop all sandboxes belongs to same pod
	for _, podSandbox := range runningPod.Sandboxes {
		if err := m.runtimeService.StopPodSandbox(ctx, podSandbox.ID.ID); err != nil && !crierror.IsNotFound(err) {
			killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
			// NOTE(review): err is in scope but nil is passed to ErrorS, so the
			// cause only appears in killSandboxResult, not in this log line.
			klog.ErrorS(nil, "Failed to stop sandbox", "podSandboxID", podSandbox.ID)
		}
	}

	return
}
多个容器的 kill 操作是并行执行的:每个容器启动一个 goroutine,通过 sync.WaitGroup 等待全部完成后再汇总结果。这部分在 killContainersWithSyncResult 中实现。
// killContainersWithSyncResult kills all pod's containers with sync results.
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
wg := sync.WaitGroup{}
wg.Add(len(runningPod.Containers))
var termOrdering *terminationOrdering
// we only care about container termination ordering if the sidecars feature is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) && types.HasRestartableInitContainer(pod) {
var runningContainerNames []string
for _, container := range runningPod.Containers {
runningContainerNames = append(runningContainerNames, container.Name)
}
termOrdering = newTerminationOrdering(pod, runningContainerNames)
}
for _, container := range runningPod.Containers {
gofunc(container *kubecontainer.Container) {
defer utilruntime.HandleCrash()
defer wg.Done()
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
if err := m.killContainer(ctx, pod, container.ID, container.Name, "", reasonUnknown, gracePeriodOverride, termOrdering); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
// Use runningPod for logging as the pod passed in could be *nil*.
klog.ErrorS(err, "Kill container failed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID,
"containerName", container.Name, "containerID", container.ID)
}
containerResults <- killContainerResult
}(container)
}
wg.Wait()
close(containerResults)
for containerResult := range containerResults {
syncResults = append(syncResults, containerResult)
}
return
}
m.killContainer 计算 gracePeriod,并通过 runtime 的 StopContainer API 发起停止容器的操作。
// killContainer kills a container through the following steps:
// * Run the pre-stop lifecycle hooks (if applicable).
// * Stop the container.
//
// This excerpt shows only the final CRI StopContainer call.
func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64, ordering *terminationOrdering) error {
	// ... (some content omitted; `gracePeriod` used below is declared/computed here) ...
	klog.V(2).InfoS("Killing container with a grace period", "pod", klog.KObj(pod), "podUID", pod.UID,
		"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
	// gracePeriod is handed to the runtime as the StopContainer timeout.
	err := m.runtimeService.StopContainer(ctx, containerID.ID, gracePeriod)
	// A NotFound error means the container is already gone; treat it as stopped.
	if err != nil && !crierror.IsNotFound(err) {
		klog.ErrorS(err, "Container termination failed with gracePeriod", "pod", klog.KObj(pod), "podUID", pod.UID,
			"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
		return err
	}
	klog.V(3).InfoS("Container exited normally", "pod", klog.KObj(pod), "podUID", pod.UID,
		"containerName", containerName, "containerID", containerID.String())
	// ... (some content omitted) ...
}
在调用 StopContainer 之前,如果容器配置了 preStop hook 且 gracePeriod 大于 0,会先执行 preStop hook,并从 gracePeriod 中扣除其耗时:
// killContainer (excerpt): the preStop hook step that runs before StopContainer.
func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64, ordering *terminationOrdering) error {
	// ... (some content omitted; `containerSpec` and `gracePeriod` are set here) ...
	// Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
	if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
		// The value returned by executePreStopHook is subtracted from the remaining
		// grace period; presumably it is the time the hook consumed (TODO confirm).
		gracePeriod = gracePeriod - m.executePreStopHook(ctx, pod, containerID, containerSpec, gracePeriod)
	}
	// ... (some content omitted) ...
}
如果 preStop 的执行时间超过了 gracePeriod,会超时退出。扣除 preStop 耗时后,传给 runtime service StopContainer 的超时时间最短为 2s(minimumGracePeriodInSeconds),以避免不必要的 SIGKILL。
// minimumGracePeriodInSeconds is the smallest grace period killContainer hands
// to the runtime. It guarantees containers a minimal shutdown window, avoiding
// unnecessary SIGKILLs.
const minimumGracePeriodInSeconds = 2
// killContainer (excerpt): clamps the grace period to a minimum before the
// container is stopped.
func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64, ordering *terminationOrdering) error {
	// ... (some content omitted; `gracePeriod` is computed here) ...
	// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
	// (this also raises any value the preStop hook left below the minimum).
	if gracePeriod < minimumGracePeriodInSeconds {
		gracePeriod = minimumGracePeriodInSeconds
	}
	// ... (some content omitted) ...
}
Runtime service 处理 StopContainer 的实际操作,是向容器的 1 号进程发送停止信号。该信号默认为 SIGTERM,可被容器或镜像配置的 StopSignal 覆盖。如果超过 gracePeriod 容器仍未退出,会向容器发送 SIGKILL 信号;如果传入的超时时间不大于 0,则直接发送 SIGKILL。
不同 runtime 的实现可能有区别,以 containerd 为例:
// StopContainer stops a running container with a grace period (i.e., timeout).
func (c *criService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) {
start := time.Now()
// Get container config from container store.
container, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
if !errdefs.IsNotFound(err) {
returnnil, fmt.Errorf("an error occurred when try to find container %q: %w", r.GetContainerId(), err)
}
// The StopContainer RPC is idempotent, and must not return an error if
// the container has already been stopped. Ref:
// https://github.com/kubernetes/cri-api/blob/c20fa40/pkg/apis/runtime/v1/api.proto#L67-L68
return &runtime.StopContainerResponse{}, nil
}
if err := c.stopContainer(ctx, container, time.Duration(r.GetTimeout())*time.Second); err != nil {
returnnil, err
}
// 省略部分内容
return &runtime.StopContainerResponse{}, nil
}
// stopContainer stops a container based on the container metadata.
//
// When timeout > 0 it first sends the container's stop signal: the container's
// StopSignal, else the image's StopSignal, else SIGTERM. It then waits up to
// timeout for the container to exit. If the container has not exited by then,
// or if timeout <= 0, it sends SIGKILL and then waits on ctx for the exit to be
// observed. If ctx itself is done while waiting for the stop signal to take
// effect, ctx.Err() is returned without sending SIGKILL.
func (c *criService) stopContainer(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
	id := container.ID
	// ... (some content omitted; `task` and the outer `err` used below are obtained here) ...

	// We only need to kill the task. The event handler will Delete the
	// task from containerd after it handles the Exited event.
	if timeout > 0 {
		stopSignal := "SIGTERM"
		if container.StopSignal != "" {
			stopSignal = container.StopSignal
		} else {
			// The image may have been deleted, and the `StopSignal` field is
			// just introduced to handle that.
			// However, for containers created before the `StopSignal` field is
			// introduced, still try to get the stop signal from the image config.
			// If the image has been deleted, logging an error and using the
			// default SIGTERM is still better than returning error and leaving
			// the container unstoppable. (See issue #990)
			// TODO(random-liu): Remove this logic when containerd 1.2 is deprecated.
			image, err := c.imageStore.Get(container.ImageRef)
			if err != nil {
				if !errdefs.IsNotFound(err) {
					return fmt.Errorf("failed to get image %q: %w", container.ImageRef, err)
				}
				log.G(ctx).Warningf("Image %q not found, stop container with signal %q", container.ImageRef, stopSignal)
			} else {
				if image.ImageSpec.Config.StopSignal != "" {
					stopSignal = image.ImageSpec.Config.StopSignal
				}
			}
		}
		sig, err := signal.ParseSignal(stopSignal)
		if err != nil {
			return fmt.Errorf("failed to parse stop signal %q: %w", stopSignal, err)
		}

		// Send the stop signal at most once across repeated StopContainer calls
		// with timeout > 0: only the caller that flips the flag from 0 to 1 sends
		// it. Without the flag the signal is sent unconditionally.
		var sswt bool
		if container.IsStopSignaledWithTimeout == nil {
			log.G(ctx).Infof("unable to ensure stop signal %v was not sent twice to container %v", sig, id)
			sswt = true
		} else {
			sswt = atomic.CompareAndSwapUint32(container.IsStopSignaledWithTimeout, 0, 1)
		}

		if sswt {
			log.G(ctx).Infof("Stop container %q with signal %v", id, sig)
			if err = task.Kill(ctx, sig); err != nil && !errdefs.IsNotFound(err) {
				return fmt.Errorf("failed to stop container %q: %w", id, err)
			}
		} else {
			log.G(ctx).Infof("Skipping the sending of signal %v to container %q because a prior stop with timeout>0 request already sent the signal", sig, id)
		}

		sigTermCtx, sigTermCtxCancel := context.WithTimeout(ctx, timeout)
		defer sigTermCtxCancel()
		err = c.waitContainerStop(sigTermCtx, container)
		if err == nil {
			// The stop signal (SIGTERM by default) was enough; return directly.
			// Container stopped on first signal no need for SIGKILL
			return nil
		}
		// If the parent context was cancelled or exceeded return immediately
		if ctx.Err() != nil {
			return ctx.Err()
		}
		// sigTermCtx was exceeded. Send SIGKILL
		log.G(ctx).Debugf("Stop container %q with signal %v timed out", id, sig)
	}

	// Reached when timeout <= 0, or when the stop signal did not stop the
	// container within timeout: fall back to SIGKILL.
	log.G(ctx).Infof("Kill container %q", id)
	if err = task.Kill(ctx, syscall.SIGKILL); err != nil && !errdefs.IsNotFound(err) {
		return fmt.Errorf("failed to kill container %q: %w", id, err)
	}

	// Wait for a fixed timeout until container stop is observed by event monitor.
	err = c.waitContainerStop(ctx, container)
	if err != nil {
		return fmt.Errorf("an error occurs during waiting for container %q to be killed: %w", id, err)
	}
	return nil
}
等所有 container 都被 kill 后,会调用 runtime service 的 StopPodSandbox 接口来停掉 PodSandbox。该过程和 StopContainer 类似,区别在于停掉 sandbox 容器后,还会通过 CNI(teardownPodNetwork)清理 Pod 网络相关资源,并删除网络命名空间。
func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sandbox) error {
// Use the full sandbox id.
id := sandbox.ID
//省略部分内容
if err := c.cleanupSandboxFiles(id, sandbox.Config); err != nil {
return fmt.Errorf("failed to cleanup sandbox files: %w", err)
}
// Only stop sandbox container when it's running or unknown.
state := sandbox.Status.Get().State
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
if err := c.stopSandboxContainer(ctx, sandbox); err != nil {
return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", id, state, err)
}
}
sandboxRuntimeStopTimer.WithValues(sandbox.RuntimeHandler).UpdateSince(stop)
// Teardown network for sandbox.
if sandbox.NetNS != nil {
netStop := time.Now()
// Use empty netns path if netns is not available. This is defined in:
// https://github.com/containernetworking/cni/blob/v0.7.0-alpha1/SPEC.md
if closed, err := sandbox.NetNS.Closed(); err != nil {
return fmt.Errorf("failed to check network namespace closed: %w", err)
} elseif closed {
sandbox.NetNSPath = ""
}
if sandbox.CNIResult != nil {
if err := c.teardownPodNetwork(ctx, sandbox); err != nil {
return fmt.Errorf("failed to destroy network for sandbox %q: %w", id, err)
}
}
if err := sandbox.NetNS.Remove(); err != nil {
return fmt.Errorf("failed to remove network namespace for sandbox %q: %w", id, err)
}
sandboxDeleteNetwork.UpdateSince(netStop)
}
log.G(ctx).Infof("TearDown network for sandbox %q successfully", id)
returnnil
}
等所有 container 和 sandbox 都被 kill 后,从 probeManager 移除 Pod:
// SyncTerminatingPod (excerpt): after the pod's containers have been killed,
// the pod is removed from the probe manager.
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
	// ... (some content omitted) ...
	// Once the containers are stopped, we can stop probing for liveness and readiness.
	// TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
	// the detection of a container shutdown or (for readiness) after the first failure. Tracked as
	// https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
	kl.probeManager.RemovePod(pod)
	// ... (some content omitted) ...
}
如果使用了 DRA 的动态资源分配特性,还会调用 UnprepareResources API 取消资源分配。
最后,再次调用 kl.generateAPIPodStatus(pod, stoppedPodStatus, true) 以更新 Pod 状态。
回到 pod worker 处理 update 的流程,接下来会通过 completeTerminating 方法,把 pod worker 内部状态(podSyncStatus)的 terminatedAt 设置为当前时间。这是让 Pod 进入 SyncTerminatedPod 的关键信息。注意这里的 terminatedAt 不是 Pod 对象的 status 字段。
// completeTerminating records that every container of the pod has been stopped.
// Under podLock it stamps terminatedAt with the current clock time and closes
// every notifyPostTerminating channel. It then clears the post-terminating
// bookkeeping and requeues the last update, so the worker moves on to
// syncTerminatedPod promptly. Unknown pod UIDs are ignored.
func (p *podWorkers) completeTerminating(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod terminated all containers successfully", "podUID", podUID)

	syncStatus, known := p.podSyncStatuses[podUID]
	if !known {
		return
	}

	// terminatingAt is expected to be set before termination completes. A zero
	// value is only logged; it does not block the transition.
	if syncStatus.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
	}
	syncStatus.terminatedAt = p.clock.Now()

	// Wake every waiter by closing its channel, then forget the waiters and the
	// post-terminating status.
	for _, waiter := range syncStatus.notifyPostTerminating {
		close(waiter)
	}
	syncStatus.notifyPostTerminating = nil
	syncStatus.statusPostTerminating = nil

	// The pod is now terminated and syncTerminatedPod should run as soon as
	// possible, so queue a synthetic update if none is already waiting.
	p.requeueLastPodUpdate(podUID, syncStatus)
}
和 SyncTerminatingPod 不同,走到 SyncTerminatingRuntimePod 逻辑是因为 Pod 配置已经不存在,没有完整的 Pod Spec 信息,只有运行时 Pod 信息。
// SyncTerminatingRuntimePod is expected to terminate running containers in a pod that we have no
// configuration for. Once this method returns without error, any remaining local state can be safely
// cleaned up by background processes in each subsystem. Unlike syncTerminatingPod, we lack
// knowledge of the full pod spec and so cannot perform lifecycle related operations, only ensure
// that the remnant of the running pod is terminated and allow garbage collection to proceed. We do
// not update the status of the pod because with the source of configuration removed, we have no
// place to send that status.
该方法和 SyncTerminatingPod 类似,只不过处理流程更简单,直接调用kl.killPod:
// SyncTerminatingRuntimePod kills an orphaned running pod for which no pod
// configuration remains. Only runtime information is available, so no lifecycle
// hooks or status updates are involved. killPod is called with a grace period
// override of 1 second.
func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
	// Currently, using that context causes test failures.
	ctx := context.Background()

	// Derive a *v1.Pod from the runtime pod; it is used for logging, events and killPod.
	pod := runningPod.ToAPIPod()
	klog.V(4).InfoS("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
	defer klog.V(4).InfoS("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

	// we kill the pod directly since we have lost all other information about the pod.
	klog.V(4).InfoS("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID)
	// TODO: this should probably be zero, to bypass any waiting (needs fixes in container runtime)
	// NOTE(review): killContainer raises grace periods below
	// minimumGracePeriodInSeconds (2) to that minimum. If this override goes
	// through the same clamp, the effective timeout is 2s. Confirm.
	gracePeriod := int64(1)
	if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
		// there was an error killing the pod, so we return that error directly
		utilruntime.HandleError(err)
		return err
	}
	klog.V(4).InfoS("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID)
	return nil
}
当 WorkType 为 TerminatedPod 时,进入 SyncTerminatedPod 的处理流程。只要 podSyncStatus.terminatedAt 不是零值,WorkType 就是 TerminatedPod:
// IsTerminated reports whether terminatedAt has been set for this pod worker.
func (s *podSyncStatus) IsTerminated() bool {
	return !s.terminatedAt.IsZero()
}

// WorkType returns this pod's current state in the pod lifecycle state machine.
// Terminated takes precedence over terminating, which takes precedence over syncing.
func (s *podSyncStatus) WorkType() PodWorkerState {
	switch {
	case s.IsTerminated():
		return TerminatedPod
	case s.IsTerminationRequested():
		return TerminatingPod
	default:
		return SyncPod
	}
}
首先,通过 kl.generateAPIPodStatus(pod, podStatus, true) 更新 Pod 的状态信息。
接下来,等待 Pod 挂载的所有存储卷卸载完成:
// SyncTerminatedPod (excerpt): waits until the volume manager reports that all
// of the pod's volumes are unmounted.
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// ... (some content omitted) ...
	// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
	// before syncTerminatedPod is invoked)
	if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
		return err
	}
	klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
	// ... (some content omitted) ...
}
这里是通过 volume manager 的 actualStateOfWorld 进行判断:
func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
if pod == nil {
returnnil
}
klog.V(3).InfoS("Waiting for volumes to unmount for pod", "pod", klog.KObj(pod))
uniquePodName := util.GetUniquePodName(pod)
vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
err := wait.PollUntilContextTimeout(
ctx,
podAttachAndMountRetryInterval,
podAttachAndMountTimeout,
true,
vm.verifyVolumesUnmountedFunc(uniquePodName))
if err != nil {
var mountedVolumes []string
for _, v := range vm.actualStateOfWorld.GetMountedVolumesForPod(uniquePodName) {
mountedVolumes = append(mountedVolumes, v.OuterVolumeSpecName)
}
iflen(mountedVolumes) == 0 {
returnnil
}
slices.Sort(mountedVolumes)
return fmt.Errorf(
"mounted volumes=%v: %w",
mountedVolumes,
err)
}
klog.V(3).InfoS("All volumes are unmounted for pod", "pod", klog.KObj(pod))
returnnil
}
检查逻辑由 verifyVolumesUnmountedFunc 生成:
// verifyVolumesUnmountedFunc returns a method that is true when there are no mounted volumes for this
// pod.
func (vm *volumeManager) verifyVolumesUnmountedFunc(podName types.UniquePodName) wait.ConditionWithContextFunc {
returnfunc(_ context.Context) (done bool, err error) {
if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
returntrue, errors.New(strings.Join(errs, "; "))
}
return !vm.actualStateOfWorld.PodHasMountedVolumes(podName), nil
}
}
// PodHasMountedVolumes reports whether any attached volume lists podName among
// its mounted pods with volumeMountStateForPod == VolumeMounted. Entries in any
// other mount state are ignored. The read lock is held for the whole scan.
func (asw *actualStateOfWorld) PodHasMountedVolumes(podName volumetypes.UniquePodName) bool {
	asw.RLock()
	defer asw.RUnlock()
	for _, volumeObj := range asw.attachedVolumes {
		if podObj, hasPod := volumeObj.mountedPods[podName]; hasPod {
			if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted {
				return true
			}
		}
	}
	return false
}
除了确保 actualStateOfWorld 已经没有 Pod 使用的 volume 处于 Mounted 状态,还会判断节点上 volume 的 Pod 挂载点目录是否已经卸载。
// SyncTerminatedPod (excerpt): after WaitForUnmount, additionally waits until
// podVolumesExist reports that no volume of the pod may still be mounted.
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// ... (some content omitted) ...
	// This waiting loop relies on the background cleanup which starts after pod workers respond
	// true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed.
	// Polls every 100ms with no timeout of its own; it ends early only when ctx is done.
	if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
		volumesExist := kl.podVolumesExist(pod.UID)
		if volumesExist {
			klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
		}
		return !volumesExist, nil
	}); err != nil {
		return err
	}
	klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
	// ... (some content omitted) ...
}
和通过 actualStateOfWorld 判断不同,podVolumesExist 通过 GetPossiblyMountedVolumesForPod 检查时,还包含了处于 Uncertain 状态的存储卷,并且会进一步检查磁盘上的存储卷目录是否仍是挂载点:
注:在 volume manager 中,Uncertain 是 volume 的一种中间状态。例如,已经调用了 CSI 的 NodePublishVolume 但尚未返回成功,此时无法确定存储卷是否已经挂载到节点上的 Pod 存储卷目录;又如,已经调用了 CSI 的 NodeUnpublishVolume 但尚未返回成功,此时无法确定 Pod 存储卷目录是否已经卸载完成。
// podVolumesExist checks with the volume manager and returns true any of the
// pods for the specified volume are mounted or are uncertain.
func (kl *Kubelet) podVolumesExist(podUID types.UID) bool {
if mountedVolumes :=
kl.volumeManager.GetPossiblyMountedVolumesForPod(
volumetypes.UniquePodName(podUID)); len(mountedVolumes) > 0 {
returntrue
}
// TODO: This checks pod volume paths and whether they are mounted. If checking returns error, podVolumesExist will return true
// which means we consider volumes might exist and requires further checking.
// There are some volume plugins such as flexvolume might not have mounts. See issue #61229
volumePaths, err := kl.getMountedVolumePathListFromDisk(podUID)
if err != nil {
klog.ErrorS(err, "Pod found, but error occurred during checking mounted volumes from disk", "podUID", podUID)
returntrue
}
iflen(volumePaths) > 0 {
klog.V(4).InfoS("Pod found, but volumes are still mounted on disk", "podUID", podUID, "paths", volumePaths)
returntrue
}
returnfalse
}
除此之外,还会检查 Pod 存储卷的目录是否为挂载点,从而确保 Pod 从节点上删除之前,没有泄漏的挂载点。
// getMountedVolumePathListFromDisk returns the pod's on-disk volume directories
// that are still mount points, according to mounter.IsLikelyNotMountPoint.
//
// On error it returns the paths collected so far together with the error. The
// mount-point check error is wrapped with %w, so callers can inspect the cause
// with errors.Is/As; the message text is unchanged.
func (kl *Kubelet) getMountedVolumePathListFromDisk(podUID types.UID) ([]string, error) {
	mountedVolumes := []string{}
	volumePaths, err := kl.getPodVolumePathListFromDisk(podUID)
	if err != nil {
		return mountedVolumes, err
	}
	// Only use IsLikelyNotMountPoint to check might not cover all cases. For CSI volumes that
	// either: 1) don't mount or 2) bind mount in the rootfs, the mount check will not work as expected.
	// We plan to remove this mountpoint check as a condition before deleting pods since it is
	// not reliable and the condition might be different for different types of volumes. But it requires
	// a reliable way to clean up unused volume dir to avoid problems during pod deletion. See discussion in issue #74650
	for _, volumePath := range volumePaths {
		isNotMount, err := kl.mounter.IsLikelyNotMountPoint(volumePath)
		if err != nil {
			return mountedVolumes, fmt.Errorf("fail to check mount point %q: %w", volumePath, err)
		}
		if !isNotMount {
			mountedVolumes = append(mountedVolumes, volumePath)
		}
	}
	return mountedVolumes, nil
}
Pod 使用的存储卷卸载完成。接下来,从 secret manager 和 configMap manager 移除 Pod,避免后续再同步 Pod 使用的 secret 和 configMap 内容到本地文件。
// SyncTerminatedPod (excerpt): once volumes are unmounted, the pod is
// unregistered from the secret and configmap managers (when they are configured).
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// ... (some content omitted) ...
	// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
	if kl.secretManager != nil {
		kl.secretManager.UnregisterPod(pod)
	}
	if kl.configMapManager != nil {
		kl.configMapManager.UnregisterPod(pod)
	}
	// ... (some content omitted) ...
}
接下来,删除 Pod 的 cgroupfs 目录:
// SyncTerminatedPod (excerpt): when cgroupsPerQOS is enabled, the pod's cgroup is
// destroyed through the pod container manager.
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// ... (some content omitted) ...
	// remove any cgroups in the hierarchy for pods that are no longer running.
	if kl.cgroupsPerQOS {
		pcm := kl.containerManager.NewPodContainerManager()
		// NOTE(review): the second return value of GetPodContainerName is discarded here.
		name, _ := pcm.GetPodContainerName(pod)
		// A Destroy failure aborts SyncTerminatedPod with that error.
		if err := pcm.Destroy(name); err != nil {
			return err
		}
		klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
	}
	// ... (some content omitted) ...
}
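实际处理逻辑在 podContainerManagerImpl.Destroy 中:先尝试 kill 掉仍挂在 Pod cgroup 下的所有进程,再通过 cgroupManager 删除 Pod 的 cgroup 目录。在 Linux 上,cgroupManager 底层基于 libcontainer 的 cgroups 实现。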
// Destroy destroys the pod container cgroup paths.
//
// It first tries to kill every process still attached to the pod cgroup, and
// only then removes the cgroup through cgroupManager. A failure in either step
// is logged and returned. The underlying error is wrapped with %w so callers can
// inspect it; the message text is unchanged.
func (m *podContainerManagerImpl) Destroy(podCgroup CgroupName) error {
	// Try killing all the processes attached to the pod cgroup
	if err := m.tryKillingCgroupProcesses(podCgroup); err != nil {
		klog.InfoS("Failed to kill all the processes attached to cgroup", "cgroupName", podCgroup, "err", err)
		return fmt.Errorf("failed to kill all the processes attached to the %v cgroups : %w", podCgroup, err)
	}

	// Now it's safe to remove the pod's cgroup
	containerConfig := &CgroupConfig{
		Name:               podCgroup,
		ResourceParameters: &ResourceConfig{},
	}
	if err := m.cgroupManager.Destroy(containerConfig); err != nil {
		klog.InfoS("Failed to delete cgroup paths", "cgroupName", podCgroup, "err", err)
		return fmt.Errorf("failed to delete cgroup paths for %v : %w", podCgroup, err)
	}
	return nil
}
如果启用了 user namespace 特性,还会清理 user namespace 用到的 mappings 文件:
// SyncTerminatedPod (excerpt): releases the pod's user-namespace allocation via
// usernsManager.
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// ... (some content omitted) ...
	// Presumably this removes the pod's user namespace mapping state when the
	// user-namespace feature is in use (TODO confirm against usernsManager).
	kl.usernsManager.Release(pod.UID)
	// ... (some content omitted) ...
}
最后,通过 statusManager.TerminatePod 标记 Pod 最终状态,并且触发一次 update:
// SyncTerminatedPod (excerpt): hands the pod to the status manager to record its
// final (terminal) status.
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// ... (some content omitted) ...
	kl.statusManager.TerminatePod(pod)
	// ... (some content omitted) ...
}
TerminatePod 确保 Pod 在被删除之前,其 phase 已经转换为 terminal 状态(Failed 或 Succeeded):
// TerminatePod ensures that the status of containers is properly defaulted at the end of the pod
// lifecycle. As the Kubelet must reconcile with the container runtime to observe container status
// there is always the possibility we are unable to retrieve one or more container statuses due to
// garbage collection, admin action, or loss of temporary data on a restart. This method ensures
// that any absent container status is treated as a failure so that we do not incorrectly describe
// the pod as successful. If we have not yet initialized the pod in the presence of init containers,
// the init container failure status is sufficient to describe the pod as failing, and we do not need
// to override waiting containers (unless there is evidence the pod previously started those containers).
// It also makes sure that pods are transitioned to a terminal phase (Failed or Succeeded) before
// their deletion.
TerminatePod 通过updateStatusInternal更新 status:
// TerminatePod (excerpt): after building the final status (omitted here), it
// stores it via updateStatusInternal.
func (m *manager) TerminatePod(pod *v1.Pod) {
	// ... (some content omitted; `status`, the final pod status, is built here) ...
	// Arguments are forceUpdate=true, podIsFinished=true. podIsFinished is what
	// later lets canBeDeleted allow the final deletion.
	m.updateStatusInternal(pod, status, true, true)
}
这里会更新 status,并且通过 channel 触发 status manager 处理:
// updateStatusInternal (excerpt) stores a new versioned status for pod in the
// in-memory cache and signals the status sync goroutine. podIsFinished is kept
// on the versioned status so that syncPod can later pass it to canBeDeleted.
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) {
	// ... (some content omitted; `cachedStatus`, presumably the previously cached
	// status for this pod, is obtained here) ...
	// Each update bumps the version by one relative to the cached status.
	newStatus := versionedPodStatus{
		status:        status,
		version:       cachedStatus.version + 1,
		podName:       pod.Name,
		podNamespace:  pod.Namespace,
		podIsFinished: podIsFinished,
	}
	// Multiple status updates can be generated before we update the API server,
	// so we track the time from the first status update until we retire it to
	// the API.
	if cachedStatus.at.IsZero() {
		newStatus.at = time.Now()
	} else {
		newStatus.at = cachedStatus.at
	}
	m.podStatuses[pod.UID] = newStatus
	// Non-blocking notify: if the send cannot proceed immediately, a sync is
	// already pending and will pick up the status stored above.
	select {
	case m.podStatusChannel <- struct{}{}:
	default:
		// there's already a status update pending
	}
}
处理 channel 的地方:
// Start (excerpt) launches the single goroutine that pushes pod statuses to the
// API server. On podStatusChannel it syncs updated statuses (syncBatch(false));
// on syncTicker it resyncs all statuses (syncBatch(true)).
func (m *manager) Start() {
	// ... (some content omitted; `syncTicker` is created here) ...
	// syncPod and syncBatch share the same go routine to avoid sync races.
	go wait.Forever(func() {
		for {
			select {
			case <-m.podStatusChannel:
				klog.V(4).InfoS("Syncing updated statuses")
				m.syncBatch(false)
			case <-syncTicker:
				klog.V(4).InfoS("Syncing all statuses")
				m.syncBatch(true)
			}
		}
	}, 0)
}
syncBatch 中遍历 update status 并且通过 syncPod 处理单个 status:
// syncBatch (excerpt) pushes the collected pod statuses to the API server, one
// syncPod call per entry, and returns how many were synced. all selects between
// syncing only updated statuses (false) and all statuses (true), as logged by Start.
func (m *manager) syncBatch(all bool) int {
	type podSync struct {
		podUID    types.UID
		statusUID kubetypes.MirrorPodUID
		status    versionedPodStatus
	}
	// ... (some content omitted; `updatedStatuses` is collected here) ...
	for _, update := range updatedStatuses {
		klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
		m.syncPod(update.podUID, update.status)
	}
	return len(updatedStatuses)
}
除了通过 Patch 更新 Pod 的状态,还会通过 canBeDeleted 判断是否可以删除 Pod:
// syncPod (excerpt) patches one pod's status on the API server and, once
// canBeDeleted allows it, deletes the pod immediately (grace period 0) with a
// UID precondition, then drops the cached status.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
	// TODO: make me easier to express from client code
	pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		klog.V(3).InfoS("Pod does not exist on the server",
			"podUID", uid,
			"pod", klog.KRef(status.podNamespace, status.podName))
		// If the Pod is deleted the status will be cleared in
		// RemoveOrphanedStatuses, so we just ignore the update here.
		return
	}
	if err != nil {
		klog.InfoS("Failed to get status for pod",
			"podUID", uid,
			"pod", klog.KRef(status.podNamespace, status.podName),
			"err", err)
		return
	}
	// ... (some content omitted) ...
	mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))
	// NOTE(review): newPod, unchanged and the patch err are presumably handled in
	// the omitted part below; they are not used in the visible code.
	newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
	klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes))
	// ... (some content omitted) ...
	// We don't handle graceful deletion of mirror pods.
	if m.canBeDeleted(pod, status.status, status.podIsFinished) {
		deleteOptions := metav1.DeleteOptions{
			// new(int64) points at 0: delete without any further grace period.
			GracePeriodSeconds: new(int64),
			// Use the pod UID as the precondition for deletion to prevent deleting a
			// newly created pod with the same name and namespace.
			Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
		}
		err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
		if err != nil {
			klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err)
			return
		}
		klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod))
		m.deletePodStatus(uid)
	}
}
canBeDeleted 的逻辑如下:
// canBeDeleted reports whether the status manager may now delete pod from the
// API server. All of the following must hold:
//   - the pod has a DeletionTimestamp and is not a mirror pod;
//   - pod.Status.Phase is terminal (Failed or Succeeded);
//   - podIsFinished is true, i.e. this update comes from the end of pod termination.
//
// status is the kubelet's local status; it is only used for logging.
func (m *manager) canBeDeleted(pod *v1.Pod, status v1.PodStatus, podIsFinished bool) bool {
	// Pods not being deleted, and mirror pods (whose graceful deletion is not
	// handled here), are never deleted through this path.
	if pod.DeletionTimestamp == nil || kubetypes.IsMirrorPod(pod) {
		return false
	}
	// Delay deletion of pods until the phase is terminal, based on pod.Status
	// which comes from pod manager.
	if !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
		// For debugging purposes we also log the kubelet's local phase, when the deletion is delayed.
		klog.V(3).InfoS("Delaying pod deletion as the phase is non-terminal", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
		return false
	}
	// If this is an update completing pod termination then we know the pod termination is finished.
	if podIsFinished {
		klog.V(3).InfoS("The pod termination is finished as SyncTerminatedPod completes its execution", "phase", pod.Status.Phase, "localPhase", status.Phase, "pod", klog.KObj(pod), "podUID", pod.UID)
		return true
	}
	return false
}
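其中,podIsFinished 使用的是 status.podIsFinished。它在通过 channel 触发更新之前、生成 versionedPodStatus 时赋值,值来自 updateStatusInternal 的参数;TerminatePod 调用该方法时传递的 podIsFinished 为 true。因此,对于 TerminatePod 产生的这次状态更新,需要同时满足以下条件:Pod 的 DeletionTimestamp 不为空、不是 mirror Pod,并且 Pod 的 Phase 已经是 terminal 状态(Failed 或 Succeeded)。满足后,status manager 会以 GracePeriodSeconds=0、并带上 Pod UID 前置条件的方式向 API server 发起删除 Pod 的请求;删除成功后,再从 status manager 中删除该 Pod 对应的 status 信息。如果 Phase 还不是 terminal,删除会被推迟。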