Implementing rollout ordering and pause points for Kubernetes Deployments

A Kubernetes Deployment performs a rolling update by adjusting the replica counts of two ReplicaSets: it deletes pods from the rs with the old spec while scaling up pods in the rs with the new spec.
Controlling the rollout order therefore reduces to controlling the order in which the old rs deletes its pods.
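For reference, how aggressively the controller adjusts the two ReplicaSets at any moment is capped by the Deployment's rolling-update strategy. Here is a minimal sketch of those knobs, using the real appsv1 API with example values:

import (
    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

// rollingStrategy shows the two fields that bound how far the controller may
// scale the old rs down and the new rs up at any point during an update.
func rollingStrategy() appsv1.DeploymentStrategy {
    maxUnavailable := intstr.FromInt(1) // at most 1 pod below the desired count
    maxSurge := intstr.FromInt(1)       // at most 1 pod above the desired count
    return appsv1.DeploymentStrategy{
        Type: appsv1.RollingUpdateDeploymentStrategyType,
        RollingUpdate: &appsv1.RollingUpdateDeployment{
            MaxUnavailable: &maxUnavailable,
            MaxSurge:       &maxSurge,
        },
    }
}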

Looking at the Kubernetes source, you can see that the adjustment of an rs's replica count is controlled by the following function (excerpted; elided parts are marked with // ...):

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    // ...
    if diff > 0 { // diff is the current pod count minus the rs's desired replicas; > 0 means scale down
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, diff) // the pod deletion order is decided in here

        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()
        // ...
    }
    // ...
}
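The excerpt elides some setup; in the same upstream function, diff and rsKey are computed near the top, roughly like this (paraphrased from the Kubernetes source, so treat it as orientation rather than an exact quote):

    // diff: how many more pods exist than the rs asks for
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    // rsKey: the cache key used to track deletion expectations for this rs
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }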

Let's look at the logic of getPodsToDelete in detail:

func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
    // No need to sort pods if we are about to delete all of them.
    // diff will always be <= len(filteredPods), so not need to handle > case.
    if diff < len(filteredPods) {
        // Sort the pods in the order such that not-ready < ready, unscheduled
        // < scheduled, and pending < running. This ensures that we delete pods
        // in the earlier stages whenever possible.
        sort.Sort(controller.ActivePods(filteredPods))
    }
    return filteredPods[:diff]
}
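The pattern is simply: sort so that the most expendable pods come first, then take the diff-long prefix. Here is a self-contained toy version of the same pattern, with a made-up pod type instead of the Kubernetes ones:

package main

import (
    "fmt"
    "sort"
)

type pod struct {
    name  string
    ready bool
}

// byExpendability sorts not-ready pods before ready ones,
// mirroring the shape of controller.ActivePods.
type byExpendability []pod

func (s byExpendability) Len() int           { return len(s) }
func (s byExpendability) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s byExpendability) Less(i, j int) bool { return !s[i].ready && s[j].ready }

func main() {
    pods := []pod{{"a", true}, {"b", false}, {"c", true}}
    diff := 2
    sort.Sort(byExpendability(pods))
    fmt.Println(pods[:diff]) // "b" is picked first; then one of the ready pods
}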

As you can see, it picks the diff pods to delete simply by taking the first diff entries of the slice after an ActivePods sort, so the deletion order is whatever that sort produces. It is not random but deliberate. The precedence is roughly: unassigned < PodPending < PodUnknown < PodRunning < not ready < recently ready < long ready, then pods with higher container restart counts, then newer pods. For example, a Pending pod that has not yet been scheduled to a node is deleted before a Running, Ready pod. Here is the type's custom sort logic:

func (s ActivePods) Less(i, j int) bool {
    // 1. Unassigned < assigned
    // If only one of the pods is unassigned, the unassigned one is smaller
    if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
        return len(s[i].Spec.NodeName) == 0
    }
    // 2. PodPending < PodUnknown < PodRunning
    m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
    if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
        return m[s[i].Status.Phase] < m[s[j].Status.Phase]
    }
    // 3. Not ready < ready
    // If only one of the pods is not ready, the not ready one is smaller
    if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
        return !podutil.IsPodReady(s[i])
    }
    // TODO: take availability into account when we push minReadySeconds information from deployment into pods,
    // see https://github.com/kubernetes/kubernetes/issues/22065
    // 4. Been ready for empty time < less time < more time
    // If both pods are ready, the latest ready one is smaller
    if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
        return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
    }
    // 5. Pods with containers with higher restart counts < lower restart counts
    if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
        return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
    }
    // 6. Empty creation time pods < newer pods < older pods
    if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
        return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
    }
    return false
}

Now we add a custom ordering rule of our own and place it in front of all the existing ones. By annotating pods with different numeric values, we can control the order in which the rs deletes pods, and therefore the order in which the Deployment updates them. The core code is below; see the GitHub PR I submitted for the full change.

// 0. controller.alpha.kubernetes.io/delete-priority=1 < controller.alpha.kubernetes.io/delete-priority=0
// If a pod carries the controller.alpha.kubernetes.io/delete-priority annotation, the larger value sorts smaller (is deleted earlier)
if s[i].Annotations[DeletePriorityPodAnnotationKey] != s[j].Annotations[DeletePriorityPodAnnotationKey] {
    s1, err1 := strconv.Atoi(s[i].Annotations[DeletePriorityPodAnnotationKey])
    s2, err2 := strconv.Atoi(s[j].Annotations[DeletePriorityPodAnnotationKey])
    // If either annotation is missing or not an integer, fall through to the default rules below.
    if err1 == nil && err2 == nil {
        return s1 > s2
    }
}
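With that rule in place, controlling the rollout order is just a matter of annotating pods, e.g. kubectl annotate pod web-1 controller.alpha.kubernetes.io/delete-priority=10 (the pod name is made up). A minimal sketch of doing the same from Go with client-go, assuming a recent client-go where Patch takes a context:

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
)

// setDeletePriority annotates a pod so the patched ReplicaSet controller will delete it earlier.
func setDeletePriority(cs kubernetes.Interface, ns, pod string, prio int) error {
    patch := []byte(fmt.Sprintf(
        `{"metadata":{"annotations":{"controller.alpha.kubernetes.io/delete-priority":"%d"}}}`, prio))
    _, err := cs.CoreV1().Pods(ns).Patch(context.TODO(), pod, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
    return err
}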

As for the pause-point feature:
Today Kubernetes pauses a rollout via kubectl rollout pause deployment.v1.apps/nginx-deployment, which depends entirely on a human picking the right moment. We can reuse the approach above: put another annotation on a pod, and when the rs is about to delete that pod, invoke the same internal call the command above makes and stop there. The application layer then exposes kubectl rollout resume deployment.v1.apps/nginx-deployment so users can continue.
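I'd picture the hook roughly like the sketch below, written as if inside the ReplicaSet controller (reusing its kubeClient). To be clear, none of this exists yet: the annotation name and the method are assumptions of mine. The solid part is that kubectl rollout pause works by setting spec.paused on the Deployment, which is what the sketch replicates.

// PausePodAnnotationKey is an assumed annotation name for marking a pause point.
const PausePodAnnotationKey = "controller.alpha.kubernetes.io/pause-before-delete"

// maybePauseBeforeDelete would be called before deleting each pod. If the pod
// marks a pause point, it flips spec.paused on the owning Deployment (the same
// field `kubectl rollout pause` sets) and reports that deletion should stop.
func (rsc *ReplicaSetController) maybePauseBeforeDelete(ctx context.Context, rs *apps.ReplicaSet, pod *v1.Pod) (bool, error) {
    if _, ok := pod.Annotations[PausePodAnnotationKey]; !ok {
        return false, nil
    }
    // Follow the rs's controller ownerRef up to its Deployment.
    owner := metav1.GetControllerOf(rs)
    if owner == nil || owner.Kind != "Deployment" {
        return false, nil
    }
    deploy, err := rsc.kubeClient.AppsV1().Deployments(rs.Namespace).Get(ctx, owner.Name, metav1.GetOptions{})
    if err != nil {
        return false, err
    }
    if !deploy.Spec.Paused {
        deploy.Spec.Paused = true
        if _, err := rsc.kubeClient.AppsV1().Deployments(rs.Namespace).Update(ctx, deploy, metav1.UpdateOptions{}); err != nil {
            return false, err
        }
    }
    return true, nil // stay paused until `kubectl rollout resume` clears spec.paused
}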

I haven't written the concrete implementation yet and am looking for someone to implement it and submit it on GitHub. If you're reading this and feel like digging in: yes, I mean you.