
openkruise 面向自动化场景的 Kubernetes workload 扩展 controller,它是一组 controller,可在应用程序工作负载管理上扩展和补充 Kubernetes 核心控制器。cloneset 在很多方面上借鉴了 statefulset ,只是没有 statefulset 的 ordinal 序号。
CloneSet 控制器提供了高效管理无状态应用的能力,一个简单的 CloneSet yaml 文件如下:
apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
metadata:
labels:
app: sample
name: sample
spec:
replicas: 5
selector:
matchLabels:
app: sample
template:
metadata:
labels:
app: sample
spec:
containers:
- name: nginx
image: nginx:alpine
spec.template 中定义了当前 CloneSet 中最新的 Pod 模板。控制器会为每次更新过的 spec.template 计算一个 revision hash 值。在运行过程中,还会额外 为 cloneset 管理的pod 加上 label=controller-revision-hash 标记 pod 所属的 revision。
运行后,describe cloneset 示例:
Name: sample
Namespace: default
Labels: app=sample
Annotations: API Version: apps.kruise.io/v1alpha1
Kind: CloneSet
Metadata:
Creation Timestamp: 2020-08-17T09:39:08Z
Generation: 1
Resource Version: 307518
Self Link: /apis/apps.kruise.io/v1alpha1/namespaces/default/clonesets/sample
UID: 65534592-a998-4374-a2c7-56eeb1dd273b
Spec:
...
Status:
Available Replicas: 5
Collision Count: 0
Label Selector: app=sample
Observed Generation: 1
Ready Replicas: 5
Replicas: 5
Update Revision: sample-5cdbb7d879
Updated Ready Replicas: 5
Updated Replicas: 5
原地升级 nginx pod 的 status 示例
Labels: app=sample
apps.kruise.io/cloneset-instance-id=75v7q
controller-revision-hash=sample-549647c4b4
Annotations: inplace-update-state:
{"revision":"sample-549647c4b4","updateTimestamp":"2020-08-14T11:04:25Z","lastContainerStatuses":...}
Controlled By: CloneSet/sample
Status: Running // Pending/Running/Succeeded/Failed/Unknown
Readiness Gates:
Type Status
InPlaceUpdateReady True // 原地升级添加的自定义Readiness Gates
Conditions:
Type Status
InPlaceUpdateReady True
Initialized True // 所有的 Init 容器 都已成功启动
Ready False // Pod 可以为请求提供服务,并且应该被添加到对应服务的负载均衡池中
ContainersReady False // Pod 中所有容器都已就绪
PodScheduled True // Pod 已经被调度到某节点
CloneSet 可以对标原生的 Deployment,但 CloneSet 提供了很多增强功能,比如扩缩容时删除特定 pod,尤其提供了丰富的升级策略。
spec:
// 升级功能
updateStrategy:
type: // 升级方式,默认为重建升级ReCreate,支持尽可能原地升级InPlaceIfPossible 和 只能原地升级InPlaceOnly
inPlaceUpdateStrategy: // 原地升级策略,比如graceful period 等
partition: // Partition 的语义是 保留旧版本 Pod 的数量,默认为 0。 如果在发布过程中设置了 partition,则控制器只会将 (replicas - partition) 数量的 Pod 更新到最新版本。
MaxUnavailable: // 最大不可用的Pod数量,是一个绝对值或百分比
MaxSurge: // 最大弹性数量,即最多能扩出来超过 replicas 的 Pod 数量,是一个绝对值或百分比。
priorityStrategy: // 优先级策略,升级顺序相关
...
scatterStrategy: // 打散策略,升级顺序相关
paused: // 为true时发布暂停
PreUpdate: // 升级前钩子函数
PostUpdate: // 升级后钩子函数
// 扩缩容功能
scaleStrategy:
podsToDelete: // 允许用户在缩小 replicas 数量的同时,指定想要删除的 Pod 名字
CloneSet status 中的字段说明:
cloneset 作者提到:cloneset partition 其实是继承了原生 statefulset 的 partition 理念,只是没有 statefulset 的 ordinal 序号。Partition 的语义是 保留旧版本 Pod 的数量,笔者曾觉得有点违反直觉。但如果 partition 来表示新版本数量的话,每次全量发布、扩容时都应同步设置 partition 的值(与 replicas 保持一致),partition 的默认值就不能是 0 或不填了。
在 kubebuilder 把 Controller 控制器模型 的代码 都自动生成之后,不同 Controller 之间的逻辑差异便只剩下 Reconcile 了
背景知识
// kruise/pkg/controller/cloneset/cloneset_controller.go
func (r *ReconcileCloneSet) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// ReconcileCloneSet.reconcileFunc = ReconcileCloneSet.doReconcile
return r.reconcileFunc(request)
}
func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcile.Result, retErr error) {
startTime := time.Now()
// Fetch the CloneSet instance
instance := &appsv1alpha1.CloneSet{}
err := r.Get(context.TODO(), request.NamespacedName, instance)
coreControl := clonesetcore.New(instance)
selector, err := metav1.LabelSelectorAsSelector(instance.Spec.Selector)
// list all active Pods and PVCs belongs to cs
filteredPods, filteredPVCs, err := r.getOwnedResource(instance)
//release Pods ownerRef
filteredPods, err = r.claimPods(instance, filteredPods)
// list all revisions and sort them
revisions, err := r.controllerHistory.ListControllerRevisions(instance, selector)
// 排序规则 byRevision.Less 优先根据 CreationTimestamp排序 其次根据Name
history.SortControllerRevisions(revisions)
// get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := r.getActiveRevisions(instance, revisions, clonesetutils.GetPodsRevisions(filteredPods))
newStatus := appsv1alpha1.CloneSetStatus{
ObservedGeneration: instance.Generation,
CurrentRevision: currentRevision.Name,
UpdateRevision: updateRevision.Name,
CollisionCount: new(int32),
LabelSelector: selector.String(),
}
*newStatus.CollisionCount = collisionCount
// scale and update pods
delayDuration, syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
// update new status
if err = r.statusUpdater.UpdateCloneSetStatus(instance, &newStatus, filteredPods); err != nil {
return reconcile.Result{}, err
}
if err = r.truncatePodsToDelete(instance, filteredPods); err != nil {
klog.Warningf("Failed to truncate podsToDelete for %s: %v", request, err)
}
if err = r.truncateHistory(instance, filteredPods, revisions, currentRevision, updateRevision); err != nil {
klog.Errorf("Failed to truncate history for %s: %v", request, err)
}
...
return reconcile.Result{RequeueAfter: delayDuration}, syncErr
}
syncCloneSet 根据 cloneSet 期望状态( 由 replicas 以及 updateStrategy 描述 )以及 pod 的真实状态, 执行 scale 或 update 逻辑
// kruise/pkg/controller/cloneset/cloneset_controller.go
func (r *ReconcileCloneSet) syncCloneSet(
instance *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus,
currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
filteredPods []*v1.Pod, filteredPVCs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
// 根据 ControllerRevision 还原CloneSet
currentSet, err := r.revisionControl.ApplyRevision(instance, currentRevision)
updateSet, err := r.revisionControl.ApplyRevision(instance, updateRevision)
scaling, podsScaleErr = r.scaleControl.Manage(currentSet, updateSet, currentRevision.Name, updateRevision.Name, filteredPods, filteredPVCs)
if scaling {
return delayDuration, podsScaleErr
}
delayDuration, podsUpdateErr = r.updateControl.Manage(updateSet, updateRevision, revisions, filteredPods, filteredPVCs)
return delayDuration, err
}
扩容逻辑:如果发布的时候设置了 maxSurge,控制器会先多扩出来 maxSurge 数量的 Pod(此时 Pod 总数为 (replicas+maxSurge)),然后再开始发布存量的 Pod。然后,当新版本 Pod 数量已经满足 replicas - partition 要求之后,控制器会再把多余的 maxSurge 数量的 Pod 删除掉,保证最终的 Pod 数量符合 replicas。此外,maxSurge 还受 升级方式(type)的影响:maxSurge 不允许配合 InPlaceOnly 更新模式使用(可以认为此时 maxSurge=0?)。另外,如果是与 InPlaceIfPossible 策略配合使用,控制器会先扩出来 maxSurge 数量的 Pod,再对存量 Pod 做原地升级。
// kruise/pkg/controller/cloneset/scale/cloneset_scale.go
func (r *realControl) Manage(
currentCS, updateCS *appsv1alpha1.CloneSet,
currentRevision, updateRevision string,
pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (bool, error) {
// 获取 podsToDelete 中指定的pod
if podsToDelete := getPodsToDelete(updateCS, pods); len(podsToDelete) > 0 {
return r.deletePods(updateCS, podsToDelete, pvcs)
}
// 符合 updateRevision 的pod 为 updatedPods ,不符合的为notUpdatedPods
updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)
// 一个CloneSet 最多允许(replicas + MaxSurge)个pod 存在,如果实际pod 小于这个数量(diff<0)则需要创建pod,否则(diff>0) 删除pod。
// diff 标记pod 总量是否ready;currentRevDiff 表示 currentRev 总量是否ready
diff, currentRevDiff := calculateDiffs(updateCS, updateRevision == currentRevision, len(pods), len(notUpdatedPods))
if diff < 0 {
// total number of this creation
expectedCreations := diff * -1
// lack number of current version
expectedCurrentCreations := 0
if currentRevDiff < 0 {
expectedCurrentCreations = currentRevDiff * -1
}
// available instance-id come from free pvc
availableIDs := getOrGenAvailableIDs(expectedCreations, pods, pvcs)
// pod 数量不足,就要创建expectedCreations 个pod,创建时需指明 currentRevision和updateRevision 的pod 分别创建几个
return r.createPods(expectedCreations, expectedCurrentCreations,
currentCS, updateCS, currentRevision, updateRevision, availableIDs.List(), existingPVCNames)
} else if diff > 0 {
// pod 数量较多 则选择 多余的 新老Revision pod 删除
podsToDelete := choosePodsToDelete(diff, currentRevDiff, notUpdatedPods, updatedPods)
return r.deletePods(updateCS, podsToDelete, pvcs)
}
// 如果diff = 0 什么都不做
return false, nil
}
更新逻辑
// 处理 升级间隔,计算真正需要 更新的pod
// kruise/pkg/controller/cloneset/update/cloneset_update.go
func (c *realControl) Manage(cs *appsv1alpha1.CloneSet,
updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
// 1. find currently updated and not-ready count and all pods waiting to update
var waitUpdateIndexes []int
for i := range pods {
// 支持 在pod 上设置一些注解,控制升级间隔,比如inplace-update-grace,如果距离上一次pod 的grace period 还未到,则先放弃本次升级
// 如果pod 的revision 不是最新的revision ,则加入到waitUpdateIndexes
if clonesetutils.GetPodRevision(pods[i]) != updateRevision.Name {
waitUpdateIndexes = append(waitUpdateIndexes, i)
}
}
// 2. sort all pods waiting to update 排序规则ActivePods.Less , 越新的 pod 越靠前
// 3. 根据 replicas/partition/MaxSurge/maxUnavailable 以及pod Status(比如not ready)等 calculate max count of pods can update
needToUpdateCount := calculateUpdateCount(coreControl, cs.Spec.UpdateStrategy, cs.Spec.MinReadySeconds, int(*cs.Spec.Replicas), waitUpdateIndexes, pods)
if needToUpdateCount < len(waitUpdateIndexes) {
waitUpdateIndexes = waitUpdateIndexes[:needToUpdateCount]
}
// 4. update pods
for _, idx := range waitUpdateIndexes {
pod := pods[idx]
if duration, err := c.updatePod(cs, coreControl, updateRevision, revisions, pod, pvcs); err != nil {...}
}
return requeueDuration.Get(), nil
}
// 根据CloneSet 升级策略,执行升级逻辑
func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,
updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
// 如果clone set 升级策略为 (尽量)原地升级,则进入原地升级流程
if cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType ||
cs.Spec.UpdateStrategy.Type == appsv1alpha1.InPlaceOnlyCloneSetUpdateStrategyType {
var oldRevision *apps.ControllerRevision
for _, r := range revisions {
if r.Name == clonesetutils.GetPodRevision(pod) {
oldRevision = r
break
}
}
res := c.inplaceControl.Update(pod, oldRevision, updateRevision, coreControl.GetUpdateOptions())
...
return ...
}
// 如果不是原地升级,则本次Reconcile 删除pod,待下次Reconcile 扩容时创建pod
if err := c.Delete(context.TODO(), pod); err != nil {
return 0, err
}
// handle pvc
return 0, nil
}
虽然 spec.updateStrategy.partition 指定了旧版的数量。但 update 逻辑的主要目的是 更新 (replicas - partition) 个 updateRevision 实例。如果连续多次灰度发布,则旧版 可能存在多个 revision(也就是说不是最新的 revision 都是旧版,旧版不都是某一个 revision),整个 cloneset 可能存在 2 个以上 revision 的 pod。这与直觉上的 多版本 pod 管理 还是不一样的
如何为 Kubernetes 实现原地升级? 如何在 Kubernetes 中实现容器原地升级一个 Pod 中可能包含了主业务容器,还有不可剥离的依赖业务容器,以及 SideCar 组件容器等,如果因为要更新其中一个 SideCar Container 而继续按照 ReCreate Pod 的方式进行整个 Pod 的重建,那负担还是很大的。更新一个轻量的 SideCar 却导致了分钟级的单个 Pod 的重建过程,因此,我们迫切希望能实现,只升级 Pod 中的某个 Container,而不用重建整个 Pod。
Kubernetes 把容器原地升级的能力只做在 Kubelet 这一层,并没有暴露在 Deployment、StatefulSet 等 Controller 中直接提供给用户,原因很简单,还是建议大家把 Pod 作为完整的部署单元。为了实现容器原地升级,我们更改 Pod.Spec 中对应容器的 Image,就会生成 kubetypes.UPDATE 类型的事件,kubelet 将容器优雅终止。旧的容器被杀死之后,kubelet 启动新的容器,如此即完成 Pod 不重建的前提下实现容器的原地升级。
不过这样可能存在的几个风险:
// kruise/pkg/util/inplaceupdate/inplace_utils.go
func (c *realControl) Update(pod *v1.Pod, oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) UpdateResult {
// 1. calculate inplace update spec
var spec *UpdateSpec
if opts == nil || opts.CustomizeSpecCalculate == nil {
// 只对更新了 spec.containers[x].image 的pod 进行原地升级
spec = calculateInPlaceUpdateSpec(oldRevision, newRevision)
}
if opts != nil && opts.GracePeriodSeconds > 0 {
spec.GraceSeconds = opts.GracePeriodSeconds
}
// 2. update condition for pod with readiness-gate
if containsReadinessGate(pod) {
newCondition := v1.PodCondition{
Type: appsv1alpha1.InPlaceUpdateReady,
LastTransitionTime: c.now(),
Status: v1.ConditionFalse,
Reason: "StartInPlaceUpdate",
}
if err := c.updateCondition(pod, newCondition); err != nil {
return UpdateResult{InPlaceUpdate: true, UpdateErr: err}
}
}
// 3. update container images
if err := c.updatePodInPlace(pod, spec, opts); err != nil {
return UpdateResult{InPlaceUpdate: true, UpdateErr: err}
}
...
}
func (c *realControl) updatePodInPlace(pod *v1.Pod, spec *UpdateSpec, opts *UpdateOptions) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
clone, err := c.adp.getPod(pod.Namespace, pod.Name)
// update new revision 给pod 加上revision label
if c.revisionKey != "" {
clone.Labels[c.revisionKey] = spec.Revision
}
// record old containerStatuses
inPlaceUpdateStateJSON, _ := json.Marshal(inPlaceUpdateState)
clone.Annotations[appsv1alpha1.InPlaceUpdateStateKey] = string(inPlaceUpdateStateJSON)
if spec.GraceSeconds <= 0 {
if clone, err = patchUpdateSpecToPod(clone, spec, opts); err != nil {
return err
}
delete(clone.Annotations, appsv1alpha1.InPlaceUpdateGraceKey)
} else {
inPlaceUpdateSpecJSON, _ := json.Marshal(spec)
clone.Annotations[appsv1alpha1.InPlaceUpdateGraceKey] = string(inPlaceUpdateSpecJSON)
}
// 使用k8s api 更新pod
return c.adp.updatePod(clone)
})
}
计算待更新 pod 的 spec ,condition,container status 等数据, 加上 revision label, inplace-update-grace annotation ,最终使用 k8s api 更新 pod 到 k8s cluster
事件一直在产生 并由不同的协程处理, 如果一个协程正在对 cloneset 做扩容操作,那么另一个协程需要等待一下,所以需要一个协程间的协调机制。
type ScaleExpectations interface {
ExpectScale(controllerKey string, action ScaleAction, name string)
ObserveScale(controllerKey string, action ScaleAction, name string)
SatisfiedExpectations(controllerKey string) (bool, time.Duration, map[ScaleAction][]string)
DeleteExpectations(controllerKey string)
GetExpectations(controllerKey string) map[ScaleAction]sets.String
}
type realScaleExpectations struct {
sync.Mutex
// key: parent key, workload namespace/name
controllerCache map[string]*realControllerScaleExpectations
}
一个协程进行某操作前先 ExpectScale,操作完成后再 ObserveScale,另一个协程可以通过 SatisfiedExpectations 来检查 操作是否完成。
// github.com/openkruise/kruise/pkg/controller/cloneset/scale/cloneset_scale.go
func (r *realControl) createPods(...){
// 先ExpectScale
for _, p := range newPods {
clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, p.Name)
podsCreationChan <- p
}
// 创建pod 逻辑
...
// 完成后 ObserveScale
for _, pod := range newPods {
if _, ok := successPodNames.Load(pod.Name); !ok {
clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(updateCS), expectations.Create, pod.Name)
}
}
}
// github.com/openkruise/kruise/pkg/controller/cloneset/cloneset_controller.go
func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcile.Result, retErr error) {
...
// 另一个协程做检测
if scaleSatisfied, unsatisfiedDuration, scaleDirtyPods := clonesetutils.ScaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
if unsatisfiedDuration >= expectations.ExpectationTimeout {
klog.Warningf("Expectation unsatisfied overtime for %v, scaleDirtyPods=%v, overtime=%v", request.String(), scaleDirtyPods, unsatisfiedDuration)
return reconcile.Result{}, nil
}
klog.V(4).Infof("Not satisfied scale for %v, scaleDirtyPods=%v", request.String(), scaleDirtyPods)
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}
...
}
apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
spec:
# ...
updateStrategy:
priorityStrategy:
weightPriority:
- weight: 50
matchSelector:
matchLabels:
test-key: foo
- weight: 30
matchSelector:
matchLabels:
test-key: bar
在操作 cloneset 发布之前,为 pod 打上 label,则 test-key=foo 会比 test-key= bar 的 pod 先升级。
CloneSet 管理的 Pod 有以下状态

apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
spec:
# 通过 finalizer 定义 hook
lifecycle:
preDelete:
finalizersHandler:
- example.io/unready-blocker
inPlaceUpdate:
finalizersHandler:
- example.io/unready-blocker
# 或者也可以通过 label 定义
lifecycle:
inPlaceUpdate:
labelsHandler:
example.io/block-unready: "true"
如果定义了 lifecycle hook /preDelete,cloneset 先只将 Pod 状态改为 PreparingDelete,当开发移除 label/finalizer 后,kruise 才执行 Pod 删除,否则会直接删除 pod。PS:也就是说,如果没有定义 lifecycle hook/preDelete,pod 是没有 PreparingDelete 状态的。
原文链接:https://qiankunli.github.io/2020/08/12/openkruise_cloneset_note.html
K8S 进阶训练营

