如何实现系列二:实现一个自己的admission controller

准入控制器(Admission Controller)位于 API Server 中,在对象被持久化之前,准入控制器拦截对 API Server 的请求,一般用来做身份验证和授权,不过从设计上由于位于apiserver把数据写入etcd之前,所以我们可以加入一些自己的逻辑,或者改写apisever到etcd中的数据。线上我们长开的admission主要是NamespaceLifecycle,ResourceQuota,namespacelifecycle是保证删namespace的时候不会再有pod创建;resourcequota主要是总控namespace下的request和limit,一般我们用来做总体资源限制,其他admission的具体介绍可以参考官方文档
https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/

admission的各个策略的实现很容器找到,代码都在中src/k8s.io/kubernetes/plugin/pkg/admission 中,
不过admission本身的代码实现就有点难找了,藏在了src/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/admission/initializer/initializer.go 中,
一般ide还不一定能找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 这里可以看到admission能够看到的,也就是未来能修改,验证的东西
func NewPluginInitializer(
internalClient internalclientset.Interface,
sharedInformers informers.SharedInformerFactory,
cloudConfig []byte,
restMapper meta.RESTMapper,
quotaConfiguration quota.Configuration,
) *PluginInitializer {
return &PluginInitializer{
internalClient: internalClient,
informers: sharedInformers,
cloudConfig: cloudConfig,
restMapper: restMapper,
quotaConfiguration: quotaConfiguration,
}
}

// 这里很关键,尤其注意,这里判断了几个WantsInternalKubeClientSet, WantsInternalKubeInformerFactory接口等是否实现了
// 便于admission 在代码中使用kubeclient以及informer的功能
func (i *PluginInitializer) Initialize(plugin admission.Interface) {
if wants, ok := plugin.(WantsInternalKubeClientSet); ok {
wants.SetInternalKubeClientSet(i.internalClient)
}

if wants, ok := plugin.(WantsInternalKubeInformerFactory); ok {
wants.SetInternalKubeInformerFactory(i.informers)
}

if wants, ok := plugin.(WantsCloudConfig); ok {
wants.SetCloudConfig(i.cloudConfig)
}

if wants, ok := plugin.(WantsRESTMapper); ok {
wants.SetRESTMapper(i.restMapper)
}

if wants, ok := plugin.(WantsQuotaConfiguration); ok {
wants.SetQuotaConfiguration(i.quotaConfiguration)
}
}

下面可以看到admission 给我们提供的2个接口,admit和validate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Admit performs an admission control check using a chain of handlers, and returns immediately on first error
func (admissionHandler chainAdmissionHandler) Admit(a Attributes) error {
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
continue
}
if mutator, ok := handler.(MutationInterface); ok {
err := mutator.Admit(a)
if err != nil {
return err
}
}
}
return nil
}

// Validate performs an admission control check using a chain of handlers, and returns immediately on first error
func (admissionHandler chainAdmissionHandler) Validate(a Attributes) error {
for _, handler := range admissionHandler {
if !handler.Handles(a.GetOperation()) {
continue
}
if validator, ok := handler.(ValidationInterface); ok {
err := validator.Validate(a)
if err != nil {
return err
}
}
}
return nil
}

了解基础背景后,回到我们的正题,由于一些业务需要,我们需要在apiserver这个层面对pod做一定的处理,那我们看下我们自己如何实现一个adminssion

  1. 首先我们需要定义好plugin,并且实现register方法,总的admission chain会用这个方法来注册

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    const PluginName = "AlwaysAdmit"

    // Register registers a plugin
    func Register(plugins *admission.Plugins) {
    plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
    return NewAlwaysAdmit(), nil
    })
    }
    func NewAlwaysAdmit() *Plugin {
    return &Plugin{
    Handler: admission.NewHandler(admission.Delete), //这个可以定义这个admission处理的操作是什么,比如这里我只处理delete的请求
    }
    }
    //并且在kubeapiserver/options中注册上,这个是在另外一个文件中
    func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
    admit.Register(plugins)
  2. 插件本身的定义,要实现admission.Handler这个接口,此处由于我还需要从k8s获取一些pod的具体信息,也实现了一个clientset

    1
    2
    3
    4
    5
    6
    7
    8
    type Plugin struct {
    *admission.Handler
    client internalclientset.Interface
    }

    //这2个很有用,可以保证你实现了相应的接口,IDE就可以有提示,不用等到编译环境
    var _ admission.ValidationInterface = &Plugin{}
    var _ = kubeapiserveradmission.WantsInternalKubeClientSet(&Plugin{})
  3. 此处就是实现了上面说的kubeapiserveradmission.WantsInternalKubeClientSet

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // SetInternalKubeClientSet implements the WantsInternalKubeClientSet interface.
    func (p *Plugin) SetInternalKubeClientSet(client internalclientset.Interface) {
    p.client = client
    } // ValidateInitialization implements the InitializationValidator interface.
    func (p *Plugin) ValidateInitialization() error {
    if p.client == nil {
    return fmt.Errorf("missing client")
    }
    return nil
    }
  4. 实现具体的admin和validate实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    func (p *Plugin) Validate(attributes admission.Attributes) (err error) {
    // Ignore all calls to subresources or resources other than pods.
    if len(attributes.GetSubresource()) != 0 || attributes.GetResource().GroupResource() != api.Resource("pods") {
    return nil
    }
    //admission.Attributes 中只有一些基本信息,所以这里我用client去获取更具体的内容
    pod, err := p.client.Core().Pods(attributes.GetNamespace()).Get(attributes.GetName(), metav1.GetOptions{})
    if err != nil {
    return admission.NewForbidden(attributes, err)
    }
    //xxxx 自己的业务逻辑,此处略去
    return nil
    }

具体的其他实现可以参考已有的代码。admission算是一个比较方便,并且优雅的做一些自定义逻辑的地方,包括之前的一起改动从这里来改会看起来更好一些,而且admission是可以通过kube-apiserver的参数来控制启用和停止的。主要不同的apiserver可以实现不同的功能