WorkQueue 工作队列
为什么需要 WorkQueue
直接在 Informer 事件处理器中执行业务逻辑有以下问题:
- 阻塞 Informer 的事件处理
- 无法重试失败的操作
- 短时间内多次更新同一对象会触发多次处理
WorkQueue 解决了这些问题:事件处理器只负责入队,Worker 协程异步消费。
队列类型
go
import "k8s.io/client-go/util/workqueue"
// 基础队列(去重)
queue := workqueue.New()
// 限速队列(推荐,支持指数退避重试)
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// 延迟队列
queue := workqueue.NewDelayingQueue()
// 命名队列(带 Prometheus 指标)
queue := workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
"my-controller",
)限速器
go
// 默认限速器(指数退避 + 令牌桶)
workqueue.DefaultControllerRateLimiter()
// 基础延迟 5ms,最大延迟 1000s,令牌桶 10 QPS,突发 100
// 自定义限速器
workqueue.NewItemExponentialFailureRateLimiter(
5*time.Millisecond, // 基础延迟
1000*time.Second, // 最大延迟
)
// 组合限速器
workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)完整控制器示例
go
type Controller struct {
clientset kubernetes.Interface
lister corev1listers.PodLister
synced cache.InformerSynced
queue workqueue.RateLimitingInterface
}
func NewController(clientset kubernetes.Interface, factory informers.SharedInformerFactory) *Controller {
podInformer := factory.Core().V1().Pods()
c := &Controller{
clientset: clientset,
lister: podInformer.Lister(),
synced: podInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "pods"),
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
UpdateFunc: func(old, new interface{}) { c.enqueue(new) },
DeleteFunc: c.enqueue,
})
return c
}
func (c *Controller) enqueue(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return
}
c.queue.Add(key)
}
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer c.queue.ShutDown()
if !cache.WaitForCacheSync(stopCh, c.synced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
}
func (c *Controller) runWorker() {
for c.processNextItem() {}
}
func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncHandler(key.(string))
c.handleErr(err, key)
return true
}
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
return
}
// 重试次数限制
if c.queue.NumRequeues(key) < 5 {
c.queue.AddRateLimited(key)
return
}
// 超过重试次数,放弃
c.queue.Forget(key)
utilruntime.HandleError(fmt.Errorf("处理 %v 失败: %v", key, err))
}
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
pod, err := c.lister.Pods(namespace).Get(name)
if errors.IsNotFound(err) {
// Pod 已删除,正常情况
return nil
}
if err != nil {
return err
}
// 业务逻辑
fmt.Printf("Reconciling Pod: %s/%s\n", namespace, pod.Name)
return nil
}队列指标
WorkQueue 自动暴露 Prometheus 指标:
workqueue_depth{name="pods"} # 队列深度
workqueue_adds_total{name="pods"} # 入队总数
workqueue_queue_duration_seconds{name="pods"} # 在队列中等待时长
workqueue_work_duration_seconds{name="pods"} # 处理耗时
workqueue_retries_total{name="pods"} # 重试次数
workqueue_unfinished_work_seconds{name="pods"} # 未完成工作时长