k8s的flexvolume-plugin介绍

k8s的卷管理目前都是in-tree的,也就是写在k8s代码中,实际看了下对国内用户唯一有价值的就ceph相关的cephfs和rbd以及glusterfs,其他基本都不是国内用户在用的。
除此之外我们可以扩展的就flexvolomeplugin或者是csi,csi需要高版本的k8s以及有点麻烦,这里就不关注了。还是主要关注flexvolumeplugin

flexvolume有多个用处:

  1. 用于区分本地的磁盘,分出特定的大小,挂载到容器中,用于动态控制用户可用的空间,而不依赖于docker的配置
  2. 在有client的情况下,挂载远端的(分布式)文件系统

我这里的用途主要是用于挂载分布式存储,在实现插件的过程中,顺道看下flexvolume的相关代码
首先需要了解的是:我们需要实现一个可执行程序放在/usr/libexec/kubernetes/kubelet-plugins/volume/exec/<vendor~driver>/ 这样的目录, 实现以下接口
init:kubelet/kube-controller-manager 初始化存储插件时调用,插件需要返回是否需要要 attach 和 detach 操作
attach:将存储卷挂载到 Node 上
detach:将存储卷从 Node 上卸载
waitforattach: 等待 attach 操作成功(超时时间为 10 分钟)
isattached:检查存储卷是否已经挂载
mountdevice:将设备挂载到指定目录中以便后续 bind mount 使用
unmountdevice:将设备取消挂载
mount:将存储卷挂载到指定目录中
umount:将存储卷取消挂载
具体的可以看官方设计文档https://github.com/kubernetes/community/blob/master/contributors/devel/sig-storage/flexvolume.md

下面看代码:
flexVolumePlugin的主要代码在pkg/volume/flexvolume目录下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

func (pluginFactory) NewFlexVolumePlugin(pluginDir, name string) (volume.VolumePlugin, error) {
execPath := path.Join(pluginDir, name)

driverName := utilstrings.UnescapePluginName(name)

flexPlugin := &flexVolumePlugin{
driverName: driverName,
execPath: execPath,
runner: exec.New(),
unsupportedCommands: []string{},
}

// Initialize the plugin and probe the capabilities
call := flexPlugin.NewDriverCall(initCmd)
ds, err := call.Run()
if err != nil {
return nil, err
}
flexPlugin.capabilities = *ds.Capabilities

if flexPlugin.capabilities.Attach {
// Plugin supports attach/detach, so return flexVolumeAttachablePlugin
return &flexVolumeAttachablePlugin{flexVolumePlugin: flexPlugin}, nil
} else {
return flexPlugin, nil
}
}

一个细节是会根据init 接口的返回来判断走不通的逻辑,支持则是device模式,不支持则是普通的模式
区别是device模式需要实现attach detach waitforattach isattached mountdevicve unmountdevice 这些接口 比如用于挂lvm 设备,需要有个先创建设备的过程
普通模式实现mount unmount就可以了 比如用于挂nfs
为什么这么设计呢: 其实很容器理解,kubelet有些reconile 重试的操作,以lvm举例,如果把lvcreate只写在mount中的话会导致device重新被创建,但是显然只是不应该的,且是非幂等有问题的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//我们先看简单的只需要mount umount的
//注意下列函数实际在2个文件中,我这放一起方便查看

// newMounterInternal创建了相关的结构用于实际的mount操作
func (plugin *flexVolumePlugin) newMounterInternal(spec *volume.Spec, pod *api.Pod, mounter mount.Interface, runner exec.Interface) (volume.Mounter, error) {
source, readOnly := getVolumeSource(spec)
return &flexVolumeMounter{
flexVolume: &flexVolume{
driverName: source.Driver,
execPath: plugin.getExecutable(),
mounter: mounter,
plugin: plugin,
podUID: pod.UID,
podNamespace: pod.Namespace,
volName: spec.Name(),
},
runner: runner,
spec: spec,
readOnly: readOnly,
blockDeviceMounter: &mount.SafeFormatAndMount{Interface: mounter, Runner: runner},
}, nil
}
// 这里实际完成了mount的操作 代码在mounter.go 中
func (f *flexVolumeMounter) SetUpAt(dir string, fsGroup *int64) error {
glog.Infof("Mount dir:%s", dir)
// Mount only once.
alreadyMounted, err := prepareForMount(f.mounter, dir)
if err != nil {
return err
}
if alreadyMounted {
return nil
}

call := f.plugin.NewDriverCall(mountCmd) //drivercall是实际去执行命令的方法,后面单独讲下

// Interface parameters
call.Append(dir) //这是你的mount命令能收到的第一个参数 即挂载的目录

extraOptions := make(map[string]string)

// Extract secret and pass it as options.
if err := addSecretsToOptions(extraOptions, f.spec, f.podNamespace, f.driverName, f.plugin.host); err != nil {
return err
}

// Implicit parameters
if fsGroup != nil {
extraOptions[optionFSGroup] = strconv.FormatInt(*fsGroup, 10)
}

call.AppendSpec(f.spec, f.plugin.host, extraOptions) //这是第二个参数,注意这里是以json格式传递的 所以这里能看到你能拿到的相关信息

_, err = call.Run()
if isCmdNotSupportedErr(err) {
err = (*mounterDefaults)(f).SetUpAt(dir, fsGroup)
}

if err != nil {
return err
}

if !f.readOnly {
volume.SetVolumeOwnership(f, fsGroup)
}

return nil
}

attach看了下,本质区别不大,就是步骤多点,代码太多这里不贴出来了,下面看下driverCall的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

func (plugin *flexVolumePlugin) NewDriverCallWithTimeout(command string, timeout time.Duration) *DriverCall {
return &DriverCall{
Command: command,
Timeout: timeout,
plugin: plugin,
args: []string{command},
}
}

//注意append和appendSpec的区别

func (dc *DriverCall) Append(arg string) {
dc.args = append(dc.args, arg)
}

func (dc *DriverCall) AppendSpec(spec *volume.Spec, host volume.VolumeHost, extraOptions map[string]string) error {
optionsForDriver, err := NewOptionsForDriver(spec, host, extraOptions)
if err != nil {
return err
}

jsonBytes, err := json.Marshal(optionsForDriver)//这里json编码了下
if err != nil {
return fmt.Errorf("Failed to marshal spec, error: %s", err.Error())
}

dc.Append(string(jsonBytes))
return nil
}

func (dc *DriverCall) Run() (*DriverStatus, error) {
if dc.plugin.isUnsupported(dc.Command) {
return nil, errors.New(StatusNotSupported)
}
execPath := dc.plugin.getExecutable()

cmd := dc.plugin.runner.Command(execPath, dc.args...)

timeout := false
if dc.Timeout > 0 {
timer := time.AfterFunc(dc.Timeout, func() { //这里实现了一个超时机制,注意插件挂目录有个2分钟的超时,这个时间是hardcode的,不可修改
timeout = true
cmd.Stop()
})
defer timer.Stop()
}

output, execErr := cmd.CombinedOutput()
if execErr != nil {
if timeout {
return nil, TimeoutError
}
_, err := handleCmdResponse(dc.Command, output)
if err == nil {
glog.Errorf("FlexVolume: driver bug: %s: exec error (%s) but no error in response.", execPath, execErr)
return nil, execErr
}
if isCmdNotSupportedErr(err) {
dc.plugin.unsupported(dc.Command)
} else {
glog.Warningf("FlexVolume: driver call failed: executable: %s, args: %s, error: %s, output: %s", execPath, dc.args, execErr.Error(), output)
}
return nil, err
}

status, err := handleCmdResponse(dc.Command, output)
if err != nil {
if isCmdNotSupportedErr(err) {
dc.plugin.unsupported(dc.Command)
}
return nil, err
}

return status, nil
}

总体来看flexvolume还是个比较简单的逻辑根据用户的程序init的返回走不同实现,用driver-call来调用用户的命令,来把目录挂到相应的路径。
需要留意的点是:以上步骤都在pause创建出来前。