Watch & List 机制深度解析
List-Watch 原理
K8s 的事件驱动架构基于 List-Watch 机制:
1. List:获取资源的当前完整状态(带 ResourceVersion)
2. Watch:从 ResourceVersion 开始监听增量变化
API Server
│ List(初始化,获取所有 Pod + ResourceVersion)
│ Watch(从 ResourceVersion 开始,接收增量事件)
▼
Reflector(client-go)
│ 将事件写入 DeltaFIFO
▼
Indexer(本地缓存)ResourceVersion 机制
go
// 第一次 List,获取所有资源和 ResourceVersion
pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
rv := pods.ResourceVersion // 例如 "12345"
// 从 rv 开始 Watch,不丢失任何事件
watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{
ResourceVersion: rv,
})
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Added:
pod := event.Object.(*corev1.Pod)
fmt.Printf("Added: %s\n", pod.Name)
case watch.Modified:
pod := event.Object.(*corev1.Pod)
fmt.Printf("Modified: %s\n", pod.Name)
case watch.Deleted:
pod := event.Object.(*corev1.Pod)
fmt.Printf("Deleted: %s\n", pod.Name)
case watch.Error:
// 处理错误,重新 List-Watch
}
}Bookmark 事件
go
// 启用 Bookmark,定期更新 ResourceVersion(避免重新 List)
watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{
ResourceVersion: "0",
AllowWatchBookmarks: true,
})
for event := range watcher.ResultChan() {
if event.Type == watch.Bookmark {
// 更新本地 ResourceVersion
rv = event.Object.(*corev1.Pod).ResourceVersion
}
}分页 List
go
// 大量资源时使用分页
var continueToken string
for {
list, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
Limit: 500,
Continue: continueToken,
})
if err != nil {
break
}
for _, pod := range list.Items {
// 处理 pod
}
if list.Continue == "" {
break
}
continueToken = list.Continue
}Watch 超时处理
go
// Watch 连接会超时(默认 5-10 分钟),需要重连
func watchWithRetry(ctx context.Context, clientset kubernetes.Interface) {
for {
watcher, err := clientset.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{
ResourceVersion: "0",
TimeoutSeconds: pointer.Int64(300),
})
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for event := range watcher.ResultChan() {
// 处理事件
}
// Watch 结束,重新连接
watcher.Stop()
}
}使用 Informer(推荐)
实际开发中应使用 Informer 而不是直接 Watch,Informer 内部处理了重连、缓存同步等复杂逻辑:
go
// Informer 内部自动处理:
// 1. List-Watch 初始化
// 2. Watch 断线重连
// 3. ResourceVersion 管理
// 4. 本地缓存维护
// 5. 事件去重
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
podInformer := factory.Core().V1().Pods().Informer()
// 直接使用,无需关心底层细节