Skip to content

WorkQueue 工作队列

为什么需要 WorkQueue

直接在 Informer 事件处理器中执行业务逻辑有以下问题:

  • 阻塞 Informer 的事件处理
  • 无法重试失败的操作
  • 短时间内多次更新同一对象会触发多次处理

WorkQueue 解决了这些问题:事件处理器只负责入队,Worker 协程异步消费。

队列类型

go
import "k8s.io/client-go/util/workqueue"

// 基础队列(去重)
queue := workqueue.New()

// 限速队列(推荐,支持指数退避重试)
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// 延迟队列
queue := workqueue.NewDelayingQueue()

// 命名队列(带 Prometheus 指标)
queue := workqueue.NewNamedRateLimitingQueue(
    workqueue.DefaultControllerRateLimiter(),
    "my-controller",
)

限速器

go
// 默认限速器(指数退避 + 令牌桶)
workqueue.DefaultControllerRateLimiter()
// 基础延迟 5ms,最大延迟 1000s,令牌桶 10 QPS,突发 100

// 自定义限速器
workqueue.NewItemExponentialFailureRateLimiter(
    5*time.Millisecond,  // 基础延迟
    1000*time.Second,    // 最大延迟
)

// 组合限速器
workqueue.NewMaxOfRateLimiter(
    workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

完整控制器示例

go
type Controller struct {
    clientset kubernetes.Interface
    lister    corev1listers.PodLister
    synced    cache.InformerSynced
    queue     workqueue.RateLimitingInterface
}

func NewController(clientset kubernetes.Interface, factory informers.SharedInformerFactory) *Controller {
    podInformer := factory.Core().V1().Pods()

    c := &Controller{
        clientset: clientset,
        lister:    podInformer.Lister(),
        synced:    podInformer.Informer().HasSynced,
        queue:     workqueue.NewNamedRateLimitingQueue(
            workqueue.DefaultControllerRateLimiter(), "pods"),
    }

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.enqueue,
        UpdateFunc: func(old, new interface{}) { c.enqueue(new) },
        DeleteFunc: c.enqueue,
    })

    return c
}

func (c *Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        return
    }
    c.queue.Add(key)
}

func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
    defer c.queue.ShutDown()

    if !cache.WaitForCacheSync(stopCh, c.synced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    <-stopCh
}

func (c *Controller) runWorker() {
    for c.processNextItem() {}
}

func (c *Controller) processNextItem() bool {
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    defer c.queue.Done(key)

    err := c.syncHandler(key.(string))
    c.handleErr(err, key)
    return true
}

func (c *Controller) handleErr(err error, key interface{}) {
    if err == nil {
        c.queue.Forget(key)
        return
    }

    // 重试次数限制
    if c.queue.NumRequeues(key) < 5 {
        c.queue.AddRateLimited(key)
        return
    }

    // 超过重试次数,放弃
    c.queue.Forget(key)
    utilruntime.HandleError(fmt.Errorf("处理 %v 失败: %v", key, err))
}

func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    pod, err := c.lister.Pods(namespace).Get(name)
    if errors.IsNotFound(err) {
        // Pod 已删除,正常情况
        return nil
    }
    if err != nil {
        return err
    }

    // 业务逻辑
    fmt.Printf("Reconciling Pod: %s/%s\n", namespace, pod.Name)
    return nil
}

队列指标

WorkQueue 自动暴露 Prometheus 指标:

workqueue_depth{name="pods"}                    # 队列深度
workqueue_adds_total{name="pods"}               # 入队总数
workqueue_queue_duration_seconds{name="pods"}   # 在队列中等待时长
workqueue_work_duration_seconds{name="pods"}    # 处理耗时
workqueue_retries_total{name="pods"}            # 重试次数
workqueue_unfinished_work_seconds{name="pods"}  # 未完成工作时长

本站内容由 褚成志 整理编写,仅供学习参考