Skip to content

Informer 机制深度解析

Informer 架构

Informer 是 client-go 的核心机制,实现了高效的资源监听和本地缓存:

API Server
    │ List(初始化)+ Watch(增量更新)

Reflector(反射器)
    │ 将事件写入 DeltaFIFO

DeltaFIFO(增量队列)
    │ Pop 事件

Indexer(本地缓存 + 索引)
    │ 触发事件处理器

ResourceEventHandler(用户回调)
    │ 将 Key 加入 WorkQueue

WorkQueue(工作队列)
    │ Worker 协程消费

Reconcile(业务逻辑)

完整示例

go
package main

import (
    "fmt"
    "time"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    config, _ := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    clientset, _ := kubernetes.NewForConfig(config)

    // 创建 SharedInformerFactory(30s 重新同步一次)
    factory := informers.NewSharedInformerFactoryWithOptions(
        clientset,
        30*time.Second,
        informers.WithNamespace("default"),  // 只监听 default 命名空间
    )

    // 获取 Pod Informer
    podInformer := factory.Core().V1().Pods()
    informer := podInformer.Informer()
    lister := podInformer.Lister()

    // 创建工作队列
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // 注册事件处理器
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(newObj)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    })

    // 启动 Informer
    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)

    // 等待缓存同步
    if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
        panic("缓存同步失败")
    }

    // 启动 Worker
    go worker(queue, lister)

    // 阻塞主协程
    <-stopCh
}

func worker(queue workqueue.RateLimitingInterface, lister corev1listers.PodLister) {
    for processNextItem(queue, lister) {
    }
}

func processNextItem(queue workqueue.RateLimitingInterface, lister corev1listers.PodLister) bool {
    key, quit := queue.Get()
    if quit {
        return false
    }
    defer queue.Done(key)

    // 业务逻辑
    err := syncPod(key.(string), lister)
    if err != nil {
        // 失败重试(指数退避)
        queue.AddRateLimited(key)
        fmt.Printf("处理失败,重试: %s, 错误: %v\n", key, err)
        return true
    }

    // 成功,清除重试计数
    queue.Forget(key)
    return true
}

func syncPod(key string, lister corev1listers.PodLister) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // 从缓存读取(不访问 API Server)
    pod, err := lister.Pods(namespace).Get(name)
    if err != nil {
        return err
    }

    fmt.Printf("处理 Pod: %s/%s, Phase: %s\n", namespace, name, pod.Status.Phase)
    return nil
}

Lister 索引查询

go
// 列出所有 Pod
pods, err := lister.Pods("default").List(labels.Everything())

// 标签选择器查询
selector := labels.SelectorFromSet(labels.Set{"app": "nginx"})
pods, err := lister.Pods("default").List(selector)

// 获取单个 Pod
pod, err := lister.Pods("default").Get("my-pod")

自定义索引

go
// 添加自定义索引(按 NodeName 索引 Pod)
informer.AddIndexers(cache.Indexers{
    "byNode": func(obj interface{}) ([]string, error) {
        pod := obj.(*corev1.Pod)
        return []string{pod.Spec.NodeName}, nil
    },
})

// 查询指定节点上的所有 Pod
objs, err := informer.GetIndexer().ByIndex("byNode", "node1")
for _, obj := range objs {
    pod := obj.(*corev1.Pod)
    fmt.Println(pod.Name)
}

Informer 最佳实践

  1. 使用 SharedInformerFactory:避免重复 Watch 同一资源
  2. 合理设置 ResyncPeriod:30s-5min,过短增加 API Server 负担
  3. 使用 Lister 读取:从缓存读取,不要直接调用 Clientset
  4. 处理 Tombstone:DeleteFunc 中使用 cache.DeletionHandlingMetaNamespaceKeyFunc
  5. WorkQueue 解耦:事件处理器只负责入队,Worker 负责业务逻辑

本站内容由 褚成志 整理编写,仅供学习参考