An analysis of GPU sharing implementations in Kubernetes

Sharing a GPU across containers is something many teams need, but upstream Kubernetes does not provide it. The changes I made for this at a previous company were far too in-tree to open source. Alibaba Cloud has now open-sourced a GPU sharing solution, so this post walks through their approach and uses it to summarize the topic.

Alibaba Cloud's code lives in two repositories:
https://github.com/AliyunContainerService/gpushare-scheduler-extender
https://github.com/AliyunContainerService/gpushare-device-plugin
They implement a gpushare device plugin plus the corresponding scheduling logic.

Since the actual sharing logic lives on the node, let's start with the device plugin.
I have looked at NVIDIA's official device plugin before; it is conceptually simple. A plugin implements two gRPC methods: 1. ListAndWatch, which reports the list of devices on the machine, and 2. Allocate, which is called when the kubelet assigns devices and lets the plugin do whatever setup those devices need. This design is not great; as mentioned in my earlier post on the differences between NVIDIA's Kubernetes fork and upstream, it limits how much control a device plugin has over device scheduling. (I will cover the device plugin machinery itself in a follow-up post.)
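
For reference, here is a minimal sketch of those two methods against the v1beta1 device plugin API. This is my own illustration, not Aliyun's code: the import path differs across Kubernetes versions, and a real plugin also has to register itself with the kubelet and implement GetDevicePluginOptions / PreStartContainer.

package sketch

import (
	"context"

	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)

type demoPlugin struct {
	devs []*pluginapi.Device // the (possibly fake) devices advertised to the kubelet
}

// ListAndWatch streams the device list; this is what determines the node's
// allocatable count for the extended resource.
func (p *demoPlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
	return s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devs})
}

// Allocate is only told which device IDs the kubelet already picked; the plugin can
// react (env vars, mounts, device nodes) but cannot influence the choice itself.
func (p *demoPlugin) Allocate(_ context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	resp := &pluginapi.AllocateResponse{}
	for range reqs.ContainerRequests {
		resp.ContainerResponses = append(resp.ContainerResponses, &pluginapi.ContainerAllocateResponse{})
	}
	return resp, nil
}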

The entry point is essentially the same as NVIDIA's official plugin: it loads the NVML library (via cgo) to query the GPUs on the machine,
and sets up an fsnotify watcher to implement the restart behavior the device plugin API expects:

func (ngm *sharedGPUManager) Run() error {
	log.V(1).Infoln("Loading NVML")
	if err := nvml.Init(); err != nil {
		log.V(1).Infof("Failed to initialize NVML: %s.", err)
		log.V(1).Infof("If this is a GPU node, did you set the docker default runtime to `nvidia`?")
		select {}
	}
	defer func() { log.V(1).Infoln("Shutdown of NVML returned:", nvml.Shutdown()) }()

	log.V(1).Infoln("Fetching devices.")
	if getDeviceCount() == uint(0) {
		log.V(1).Infoln("No devices found. Waiting indefinitely.")
		select {}
	}

	log.V(1).Infoln("Starting FS watcher.")
	watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
	if err != nil {
		log.V(1).Infoln("Failed to created FS watcher.")
		return err
	}
	defer watcher.Close()

	log.V(1).Infoln("Starting OS watcher.")
	sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	restart := true
	var devicePlugin *NvidiaDevicePlugin

L:
	for {
		if restart {
			if devicePlugin != nil {
				devicePlugin.Stop()
			}
			devicePlugin = NewNvidiaDevicePlugin(ngm.enableMPS, ngm.healthCheck)
			if err := devicePlugin.Serve(); err != nil {
				log.Warningf("Failed to start device plugin due to %v", err)
			} else {
				restart = false
			}
		}

		select {
		case event := <-watcher.Events:
			if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
				log.V(1).Infof("inotify: %s created, restarting.", pluginapi.KubeletSocket)
				restart = true
			}

		case err := <-watcher.Errors:
			log.Warningf("inotify: %s", err)

		case s := <-sigs:
			switch s {
			case syscall.SIGHUP:
				log.V(1).Infoln("Received SIGHUP, restarting.")
				restart = true
			case syscall.SIGQUIT:
				t := time.Now()
				timestamp := fmt.Sprint(t.Format("20060102150405"))
				log.Infoln("generate core dump")
				coredump("/etc/kubernetes/go_" + timestamp + ".txt")
			default:
				log.V(1).Infof("Received signal \"%v\", shutting down.", s)
				devicePlugin.Stop()
				break L
			}
		}
	}

	return nil
}

The ListAndWatch logic:

// ListAndWatch returns m.Devs, which is built by the function below.
func getDevices() ([]*pluginapi.Device, map[string]uint) {
	n, err := nvml.GetDeviceCount()
	check(err)

	var devs []*pluginapi.Device
	realDevNames := map[string]uint{}
	for i := uint(0); i < n; i++ {
		d, err := nvml.NewDevice(i)
		check(err)
		// realDevNames = append(realDevNames, d.UUID)
		var id uint
		log.Infof("Deivce %s's Path is %s", d.UUID, d.Path)
		_, err = fmt.Sscanf(d.Path, "/dev/nvidia%d", &id)
		check(err)
		realDevNames[d.UUID] = id
		// var KiB uint64 = 1024
		log.Infof("# device Memory: %d", uint(*d.Memory))
		if getGPUMemory() == uint(0) {
			setGPUMemory(uint(*d.Memory))
		}
		for j := uint(0); j < getGPUMemory(); j++ {
			// Everything above is much like the official plugin; the interesting part is here.
			// A fake ID is built from the real UUID plus a counter from 0 to N-1, so one physical
			// GPU is advertised as "memory size" many devices. A clever trick.
			fakeID := generateFakeDeviceID(d.UUID, j)
			if j == 0 {
				log.Infoln("# Add first device ID: " + fakeID)
			}
			if j == getGPUMemory()-1 {
				log.Infoln("# Add last device ID: " + fakeID)
			}
			devs = append(devs, &pluginapi.Device{
				ID:     fakeID,
				Health: pluginapi.Healthy,
			})
		}
	}

	return devs, realDevNames
}
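
The fake-ID trick deserves a small illustration. The sketch below is mine, not the plugin's code, and the separator is an assumption; any encoding works as long as the real UUID can be recovered from the fake ID later.

package main

import (
	"fmt"
	"strings"
)

// One physical GPU with N (GiB of) memory is advertised as N fake devices; the kubelet
// never learns they belong to the same card.
func generateFakeDeviceID(realUUID string, index uint) string {
	return fmt.Sprintf("%s-_-%d", realUUID, index)
}

func realDeviceID(fakeID string) string {
	return strings.SplitN(fakeID, "-_-", 2)[0]
}

func main() {
	fake := generateFakeDeviceID("GPU-7fa23c11", 3)
	fmt.Println(fake)               // GPU-7fa23c11-_-3
	fmt.Println(realDeviceID(fake)) // GPU-7fa23c11
}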

The actual allocation logic:

// Allocate which return list of devices.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context,
	reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	responses := pluginapi.AllocateResponse{} // the response struct required by the device plugin API

	log.Infoln("----Allocating GPU for gpu mem is started----")
	var (
		podReqGPU uint
		found     bool
		assumePod *v1.Pod
	)

	// podReqGPU = uint(0)
	for _, req := range reqs.ContainerRequests {
		podReqGPU += uint(len(req.DevicesIDs)) // these device IDs are the fake ones
	}
	// Summing over the pod's containers gives the number of requested devices, which is really the amount of GPU memory.
	log.Infof("RequestPodGPUs: %d", podReqGPU)

	m.Lock()
	defer m.Unlock()
	log.Infoln("checking...")
	// Fetch the pending pods on this node that request aliyun.com/gpu-mem (see the sketch after this block).
	pods, err := getCandidatePods()
	if err != nil {
		log.Infof("invalid allocation requst: Failed to find candidate pods due to %v", err)
		return buildErrResponse(reqs, podReqGPU), nil
	}

	// Debug output, can be ignored.
	if log.V(4) {
		for _, pod := range pods {
			log.Infof("Pod %s in ns %s request GPU Memory %d with timestamp %v",
				pod.Name,
				pod.Namespace,
				getGPUMemoryFromPodResource(pod),
				getAssumeTimeFromPodAnnotation(pod))
		}
	}

	for _, pod := range pods {
		if getGPUMemoryFromPodResource(pod) == podReqGPU {
			log.Infof("Found Assumed GPU shared Pod %s in ns %s with GPU Memory %d",
				pod.Name,
				pod.Namespace,
				podReqGPU)
			assumePod = pod
			found = true
			break
		}
	}

	if found {
		id := getGPUIDFromPodAnnotation(assumePod) // read the real GPU index from the pod's annotation
		if id < 0 {
			log.Warningf("Failed to get the dev ", assumePod)
		}

		candidateDevID := ""
		if id >= 0 {
			ok := false
			candidateDevID, ok = m.GetDeviceNameByIndex(uint(id))
			if !ok {
				log.Warningf("Failed to find the dev for pod %v because it's not able to find dev with index %d",
					assumePod,
					id)
				id = -1
			}
		}

		if id < 0 {
			return buildErrResponse(reqs, podReqGPU), nil
		}

		// 1. Create container requests
		for _, req := range reqs.ContainerRequests {
			reqGPU := uint(len(req.DevicesIDs))
			response := pluginapi.ContainerAllocateResponse{
				Envs: map[string]string{
					// This is the key part! envNVGPU is NVIDIA_VISIBLE_DEVICES: by passing this
					// environment variable, nvidia-docker decides which device the container sees,
					// so the kubelet's own device selection is effectively meaningless here.
					envNVGPU:               candidateDevID,
					EnvResourceIndex:       fmt.Sprintf("%d", id),
					EnvResourceByPod:       fmt.Sprintf("%d", podReqGPU),
					EnvResourceByContainer: fmt.Sprintf("%d", reqGPU),
					// A few more variables are passed for the workload inside the container to consume.
					EnvResourceByDev: fmt.Sprintf("%d", getGPUMemory()),
				},
			}
			responses.ContainerResponses = append(responses.ContainerResponses, &response)
		}

		// 2. Update Pod spec
		newPod := updatePodAnnotations(assumePod)
		// This step means the device plugin also has to talk to the Kubernetes API server.
		_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
		if err != nil {
			// the object has been modified; please apply your changes to the latest version and try again
			if err.Error() == OptimisticLockErrorMsg {
				// retry
				pod, err := clientset.CoreV1().Pods(assumePod.Namespace).Get(assumePod.Name, metav1.GetOptions{})
				if err != nil {
					log.Warningf("Failed due to %v", err)
					return buildErrResponse(reqs, podReqGPU), nil
				}
				newPod = updatePodAnnotations(pod)
				_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
				if err != nil {
					log.Warningf("Failed due to %v", err)
					return buildErrResponse(reqs, podReqGPU), nil
				}
			} else {
				log.Warningf("Failed due to %v", err)
				return buildErrResponse(reqs, podReqGPU), nil
			}
		}

	} else {
		log.Warningf("invalid allocation requst: request GPU memory %d can't be satisfied.",
			podReqGPU)
		// return &responses, fmt.Errorf("invalid allocation requst: request GPU memory %d can't be satisfied", reqGPU)
		return buildErrResponse(reqs, podReqGPU), nil
	}

	log.Infof("new allocated GPUs info %v", &responses)
	log.Infoln("----Allocating GPU for gpu mem is ended----")
	// // Add this to make sure the container is created at least
	// currentTime := time.Now()

	// currentTime.Sub(lastAllocateTime)

	return &responses, nil
}
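
getCandidatePods is worth spelling out, because it is what ties the kubelet-side Allocate back to the pod the scheduler extender already placed. Below is a hedged sketch of the idea, not the plugin's actual code: the NODE_NAME variable (assumed to come from the downward API), the resource-name constant, and the function name are my own; the client-go calls follow the same older, context-free API as the code above; and the real implementation additionally consults the annotations written by the scheduler extender (the assume timestamp seen in the debug logging above).

package sketch

import (
	"os"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

const resourceName = "aliyun.com/gpu-mem"

// candidatePods lists pods already bound to this node, still pending, that request
// aliyun.com/gpu-mem, so Allocate can match an incoming request against one of them.
func candidatePods(clientset kubernetes.Interface) ([]*v1.Pod, error) {
	nodeName := os.Getenv("NODE_NAME") // assumed to be injected via the downward API
	podList, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{
		FieldSelector: "spec.nodeName=" + nodeName + ",status.phase=" + string(v1.PodPending),
	})
	if err != nil {
		return nil, err
	}
	var out []*v1.Pod
	for i := range podList.Items {
		pod := &podList.Items[i]
		for _, c := range pod.Spec.Containers {
			if _, ok := c.Resources.Limits[v1.ResourceName(resourceName)]; ok {
				out = append(out, pod)
				break
			}
		}
	}
	return out, nil
}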

The device plugin controls how the device is actually used, so we can see that Alibaba's solution does not really enforce GPU resource limits; it only makes the card shareable. In the end the workload itself has to honor the limit based on the environment variables passed in.
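
For completeness, this is roughly what "the workload adapting" looks like from inside the container. Purely illustrative: the exact environment variable names are whatever the plugin's EnvResourceByContainer / EnvResourceByDev constants expand to, and the names used below are my assumption of that pattern, not something to copy blindly.

package main

import (
	"log"
	"os"
	"strconv"
)

func main() {
	// GPU memory granted to this container and total memory of the card, in the
	// plugin's accounting unit. The variable names here are assumed, not verified.
	granted, _ := strconv.Atoi(os.Getenv("ALIYUN_COM_GPU_MEM_CONTAINER"))
	total, _ := strconv.Atoi(os.Getenv("ALIYUN_COM_GPU_MEM_DEV"))
	if total == 0 {
		log.Fatal("gpushare environment variables not set; running without the plugin?")
	}
	fraction := float64(granted) / float64(total)
	// The application (for example a TensorFlow session) is expected to cap itself
	// to this fraction; nothing on the node enforces it.
	log.Printf("this container should use at most %.2f of the GPU memory", fraction)
}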

For the scheduler side, we first need to understand the scheduler extender design:
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/scheduler_extender.md
In short, upstream provides a mechanism for extending the default scheduler.
You only have to supply a policy file such as https://github.com/AliyunContainerService/gpushare-scheduler-extender/blob/master/config/scheduler-policy-config.json, configuring a few verbs, and the default scheduler will call your extender at the corresponding phases so it can apply your logic:

{
  "kind": "Policy",
  "apiVersion": "v1",
  "extenders": [
    {
      "urlPrefix": "http://127.0.0.1:32766/gpushare-scheduler",
      "filterVerb": "filter",
      "bindVerb": "bind",
      "enableHttps": false,
      "nodeCacheCapable": true,
      "managedResources": [
        {
          "name": "aliyun.com/gpu-mem",
          "ignoredByScheduler": false
        }
      ],
      "ignorable": false
    }
  ]
}

From this policy file we can see that Alibaba implements the filter and bind verbs to take over the predicate stage for the gpu-mem resource; I will cover the details in a separate post.
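
To make the extender contract concrete, here is a hedged sketch of the filter verb only. The structs are trimmed stand-ins for the upstream scheduler extender API types (the real project imports those instead of redefining them, and implements bind as well), and the nodeFits helper is my placeholder; the URL path simply follows the urlPrefix and filterVerb from the policy file above.

package main

import (
	"encoding/json"
	"log"
	"net/http"

	v1 "k8s.io/api/core/v1"
)

// Trimmed stand-ins for the scheduler extender request/response types; check the
// field names against the scheduler API version you actually vendor.
type extenderArgs struct {
	Pod       v1.Pod    `json:"pod"`
	NodeNames *[]string `json:"nodenames,omitempty"` // populated because nodeCacheCapable is true
}

type extenderFilterResult struct {
	NodeNames   *[]string         `json:"nodenames,omitempty"`
	FailedNodes map[string]string `json:"failedNodes,omitempty"`
	Error       string            `json:"error,omitempty"`
}

// nodeFits is a placeholder: the real extender keeps its own cache of how much
// gpu-mem is left on each card of each node.
func nodeFits(node string, pod *v1.Pod) bool { return true }

func filterHandler(w http.ResponseWriter, r *http.Request) {
	var args extenderArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		json.NewEncoder(w).Encode(extenderFilterResult{Error: err.Error()})
		return
	}
	fit, failed := []string{}, map[string]string{}
	if args.NodeNames != nil {
		for _, n := range *args.NodeNames {
			if nodeFits(n, &args.Pod) {
				fit = append(fit, n)
			} else {
				failed[n] = "no single GPU has enough free gpu-mem"
			}
		}
	}
	json.NewEncoder(w).Encode(extenderFilterResult{NodeNames: &fit, FailedNodes: failed})
}

func main() {
	// Matches urlPrefix http://127.0.0.1:32766/gpushare-scheduler plus filterVerb "filter".
	http.HandleFunc("/gpushare-scheduler/filter", filterHandler)
	log.Fatal(http.ListenAndServe(":32766", nil))
}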

Now a few words about my own earlier GPU sharing changes. They take a different route from Alibaba's: I modified the existing nvidia.com/gpu resource so that it accepts fractional values.
The changes touch the scheduler and the kubelet. The advantage over Alibaba's approach is that, because an existing resource is reused, the scheduling side needs very little work. The only scheduler changes are in
plugin/pkg/scheduler/algorithm/predicates/predicates.go and plugin/pkg/scheduler/schedulercache/node_info.go, where the resource accounting switches from Value() to MilliValue().
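
To make the Value() to MilliValue() point concrete, this is how a fractional request behaves with resource.Quantity: Value() rounds up to a whole number, so the fraction only survives if the accounting is done in milli-units.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	q := resource.MustParse("0.5") // e.g. a request for half a nvidia.com/gpu
	fmt.Println(q.Value())         // 1: rounded up, the fraction is lost
	fmt.Println(q.MilliValue())    // 500: the fraction is preserved
}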

The remaining changes are in the kubelet, in pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go. Because fractions are supported there, and, unlike Alibaba's solution, a pod can share across multiple GPUs,
the work is mainly extending the existing data structures and changing the logic inside the AllocateGPU(pod v1.Pod, container v1.Container) func. I won't list those details here, but the extended structures look like this:

// Extends the original structures to record, per pod and per container, how much of each GPU
// is in use, so that the allocation path can consult it.
type podGpuUsage struct {
	podGPUMapping map[string]containerToGpuUsage
}

func newPodGpuUsage() *podGpuUsage {
	return &podGpuUsage{
		podGPUMapping: make(map[string]containerToGpuUsage),
	}
}

type containerToGpuUsage map[string]gpuToUsage

type gpuToUsage map[string]*resource.Quantity
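
As a hypothetical illustration only (the actual AllocateGPU changes are not published here): recording a container's share into this structure could look like the helper below, where resource is k8s.io/apimachinery/pkg/api/resource and the function name is mine.

// recordUsage notes that a container of a pod holds q of the GPU identified by gpuUUID.
func recordUsage(u *podGpuUsage, podUID, container, gpuUUID string, q resource.Quantity) {
	if _, ok := u.podGPUMapping[podUID]; !ok {
		u.podGPUMapping[podUID] = containerToGpuUsage{}
	}
	if _, ok := u.podGPUMapping[podUID][container]; !ok {
		u.podGPUMapping[podUID][container] = gpuToUsage{}
	}
	u.podGPUMapping[podUID][container][gpuUUID] = &q
}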

To sum up: none of the current GPU sharing solutions for Kubernetes containers implement real resource control. They only let multiple containers share a card, and the workload itself has to adapt to that.
However, I have also come across some vGPU solutions, and they generally achieve real control by rewriting NVIDIA's CUDA API. The simplest example is a shared library that reimplements the cuMemGetInfo CUDA call, so that whatever framework is running, TensorFlow or anything else, sees only the capped amount of memory, which avoids the problems caused by workloads that never adapt.

One more note: GPUs are the kind of resource best allocated when in use and reclaimed when idle, so this sharing approach works best combined with autoscaling, especially for online inference services.