Skip to content

Watch & List 机制深度解析

List-Watch 原理

K8s 的事件驱动架构基于 List-Watch 机制:

1. List:获取资源的当前完整状态(带 ResourceVersion)
2. Watch:从 ResourceVersion 开始监听增量变化

API Server
    │ List(初始化,获取所有 Pod + ResourceVersion)
    │ Watch(从 ResourceVersion 开始,接收增量事件)

Reflector(client-go)
    │ 将事件写入 DeltaFIFO

Indexer(本地缓存)

ResourceVersion 机制

go
// 第一次 List,获取所有资源和 ResourceVersion
pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
rv := pods.ResourceVersion  // 例如 "12345"

// 从 rv 开始 Watch,不丢失任何事件
watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{
    ResourceVersion: rv,
})

for event := range watcher.ResultChan() {
    switch event.Type {
    case watch.Added:
        pod := event.Object.(*corev1.Pod)
        fmt.Printf("Added: %s\n", pod.Name)
    case watch.Modified:
        pod := event.Object.(*corev1.Pod)
        fmt.Printf("Modified: %s\n", pod.Name)
    case watch.Deleted:
        pod := event.Object.(*corev1.Pod)
        fmt.Printf("Deleted: %s\n", pod.Name)
    case watch.Error:
        // 处理错误,重新 List-Watch
    }
}

Bookmark 事件

go
// 启用 Bookmark,定期更新 ResourceVersion(避免重新 List)
watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{
    ResourceVersion:     "0",
    AllowWatchBookmarks: true,
})

for event := range watcher.ResultChan() {
    if event.Type == watch.Bookmark {
        // 更新本地 ResourceVersion
        rv = event.Object.(*corev1.Pod).ResourceVersion
    }
}

分页 List

go
// 大量资源时使用分页
var continueToken string
for {
    list, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
        Limit:    500,
        Continue: continueToken,
    })
    if err != nil {
        break
    }
    for _, pod := range list.Items {
        // 处理 pod
    }
    if list.Continue == "" {
        break
    }
    continueToken = list.Continue
}

Watch 超时处理

go
// Watch 连接会超时(默认 5-10 分钟),需要重连
func watchWithRetry(ctx context.Context, clientset kubernetes.Interface) {
    for {
        watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{
            ResourceVersion: "0",
            TimeoutSeconds:  pointer.Int64(300),
        })
        if err != nil {
            time.Sleep(5 * time.Second)
            continue
        }

        for event := range watcher.ResultChan() {
            // 处理事件
        }

        // Watch 结束,重新连接
        watcher.Stop()
    }
}

使用 Informer(推荐)

实际开发中应使用 Informer 而不是直接 Watch,Informer 内部处理了重连、缓存同步等复杂逻辑:

go
// Informer 内部自动处理:
// 1. List-Watch 初始化
// 2. Watch 断线重连
// 3. ResourceVersion 管理
// 4. 本地缓存维护
// 5. 事件去重

factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods().Informer()
// 直接使用,无需关心底层细节

本站内容由 褚成志 整理编写,仅供学习参考