Skip to content

controller-runtime 框架深度解析

概述

controller-runtime 是 kubebuilder 和 Operator SDK 的底层框架,提供了构建 K8s 控制器所需的核心组件。

核心组件

Manager(管理器)
├── Client(读写 K8s 资源)
├── Cache(本地缓存,基于 Informer)
├── Scheme(类型注册)
├── Recorder(事件记录)
└── Controller(控制器)
    ├── Reconciler(业务逻辑)
    ├── Watches(监听资源)
    └── Predicates(事件过滤)

完整 Reconciler 示例

go
package controller

import (
    "context"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"

    appsv1alpha1 "github.com/mycompany/my-operator/api/v1alpha1"
)

type MyAppReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=apps.mycompany.io,resources=myapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.mycompany.io,resources=myapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete

func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // 1. 获取 CR
    myApp := &appsv1alpha1.MyApp{}
    if err := r.Get(ctx, req.NamespacedName, myApp); err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil  // CR 已删除,忽略
        }
        return ctrl.Result{}, err
    }

    // 2. 处理删除(Finalizer)
    if !myApp.DeletionTimestamp.IsZero() {
        return r.handleDeletion(ctx, myApp)
    }

    // 3. 添加 Finalizer
    if !controllerutil.ContainsFinalizer(myApp, "apps.mycompany.io/finalizer") {
        controllerutil.AddFinalizer(myApp, "apps.mycompany.io/finalizer")
        if err := r.Update(ctx, myApp); err != nil {
            return ctrl.Result{}, err
        }
    }

    // 4. Reconcile Deployment
    if err := r.reconcileDeployment(ctx, myApp); err != nil {
        // 更新 Status 为 Failed
        myApp.Status.Phase = "Failed"
        r.Status().Update(ctx, myApp)
        return ctrl.Result{}, err
    }

    // 5. Reconcile Service
    if err := r.reconcileService(ctx, myApp); err != nil {
        return ctrl.Result{}, err
    }

    // 6. 更新 Status
    myApp.Status.Phase = "Running"
    if err := r.Status().Update(ctx, myApp); err != nil {
        return ctrl.Result{}, err
    }

    logger.Info("Reconcile 完成", "name", myApp.Name)
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

func (r *MyAppReconciler) reconcileDeployment(ctx context.Context, myApp *appsv1alpha1.MyApp) error {
    deploy := &appsv1.Deployment{}
    err := r.Get(ctx, client.ObjectKey{Name: myApp.Name, Namespace: myApp.Namespace}, deploy)

    if errors.IsNotFound(err) {
        // 创建 Deployment
        deploy = r.buildDeployment(myApp)
        // 设置 OwnerReference(级联删除)
        ctrl.SetControllerReference(myApp, deploy, r.Scheme)
        return r.Create(ctx, deploy)
    }
    if err != nil {
        return err
    }

    // 更新 Deployment
    deploy.Spec.Replicas = &myApp.Spec.Replicas
    deploy.Spec.Template.Spec.Containers[0].Image = myApp.Spec.Image
    return r.Update(ctx, deploy)
}

func (r *MyAppReconciler) buildDeployment(myApp *appsv1alpha1.MyApp) *appsv1.Deployment {
    labels := map[string]string{"app": myApp.Name}
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      myApp.Name,
            Namespace: myApp.Namespace,
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &myApp.Spec.Replicas,
            Selector: &metav1.LabelSelector{MatchLabels: labels},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  "app",
                        Image: myApp.Spec.Image,
                    }},
                },
            },
        },
    }
}

// 注册 Controller
func (r *MyAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appsv1alpha1.MyApp{}).           // 主要监听资源
        Owns(&appsv1.Deployment{}).            // 监听拥有的 Deployment
        Owns(&corev1.Service{}).               // 监听拥有的 Service
        WithOptions(controller.Options{
            MaxConcurrentReconciles: 3,        // 并发 Reconcile 数
        }).
        Complete(r)
}

Predicates 事件过滤

go
import "sigs.k8s.io/controller-runtime/pkg/predicate"

ctrl.NewControllerManagedBy(mgr).
    For(&appsv1alpha1.MyApp{},
        builder.WithPredicates(
            // 只处理 Generation 变化(忽略 Status 更新)
            predicate.GenerationChangedPredicate{},
        ),
    ).
    Watches(
        &corev1.ConfigMap{},
        handler.EnqueueRequestsFromMapFunc(r.findMyAppsForConfigMap),
        builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
    ).
    Complete(r)

返回值控制

go
// 立即重新入队
return ctrl.Result{Requeue: true}, nil

// 延迟重新入队
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil

// 不重新入队(成功或不可恢复错误)
return ctrl.Result{}, nil

// 错误重试(指数退避)
return ctrl.Result{}, err

本站内容由 褚成志 整理编写,仅供学习参考