Informer 机制深度解析
Informer 架构
Informer 是 client-go 的核心机制,实现了高效的资源监听和本地缓存:
API Server
│ List(初始化)+ Watch(增量更新)
▼
Reflector(反射器)
│ 将事件写入 DeltaFIFO
▼
DeltaFIFO(增量队列)
│ Pop 事件
▼
Indexer(本地缓存 + 索引)
│ 触发事件处理器
▼
ResourceEventHandler(用户回调)
│ 将 Key 加入 WorkQueue
▼
WorkQueue(工作队列)
│ Worker 协程消费
▼
Reconcile(业务逻辑)完整示例
go
package main
import (
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
func main() {
config, _ := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
clientset, _ := kubernetes.NewForConfig(config)
// 创建 SharedInformerFactory(30s 重新同步一次)
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
30*time.Second,
informers.WithNamespace("default"), // 只监听 default 命名空间
)
// 获取 Pod Informer
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()
lister := podInformer.Lister()
// 创建工作队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 注册事件处理器
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
})
// 启动 Informer
stopCh := make(chan struct{})
defer close(stopCh)
factory.Start(stopCh)
// 等待缓存同步
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
panic("缓存同步失败")
}
// 启动 Worker
go worker(queue, lister)
// 阻塞主协程
<-stopCh
}
func worker(queue workqueue.RateLimitingInterface, lister corev1listers.PodLister) {
for processNextItem(queue, lister) {
}
}
func processNextItem(queue workqueue.RateLimitingInterface, lister corev1listers.PodLister) bool {
key, quit := queue.Get()
if quit {
return false
}
defer queue.Done(key)
// 业务逻辑
err := syncPod(key.(string), lister)
if err != nil {
// 失败重试(指数退避)
queue.AddRateLimited(key)
fmt.Printf("处理失败,重试: %s, 错误: %v\n", key, err)
return true
}
// 成功,清除重试计数
queue.Forget(key)
return true
}
func syncPod(key string, lister corev1listers.PodLister) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 从缓存读取(不访问 API Server)
pod, err := lister.Pods(namespace).Get(name)
if err != nil {
return err
}
fmt.Printf("处理 Pod: %s/%s, Phase: %s\n", namespace, name, pod.Status.Phase)
return nil
}Lister 索引查询
go
// 列出所有 Pod
pods, err := lister.Pods("default").List(labels.Everything())
// 标签选择器查询
selector := labels.SelectorFromSet(labels.Set{"app": "nginx"})
pods, err := lister.Pods("default").List(selector)
// 获取单个 Pod
pod, err := lister.Pods("default").Get("my-pod")自定义索引
go
// 添加自定义索引(按 NodeName 索引 Pod)
informer.AddIndexers(cache.Indexers{
"byNode": func(obj interface{}) ([]string, error) {
pod := obj.(*corev1.Pod)
return []string{pod.Spec.NodeName}, nil
},
})
// 查询指定节点上的所有 Pod
objs, err := informer.GetIndexer().ByIndex("byNode", "node1")
for _, obj := range objs {
pod := obj.(*corev1.Pod)
fmt.Println(pod.Name)
}Informer 最佳实践
- 使用 SharedInformerFactory:避免重复 Watch 同一资源
- 合理设置 ResyncPeriod:30s-5min,过短增加 API Server 负担
- 使用 Lister 读取:从缓存读取,不要直接调用 Clientset
- 处理 Tombstone:DeleteFunc 中使用
cache.DeletionHandlingMetaNamespaceKeyFunc - WorkQueue 解耦:事件处理器只负责入队,Worker 负责业务逻辑