controller-runtime 框架深度解析
概述
controller-runtime 是 kubebuilder 和 Operator SDK 的底层框架,提供了构建 K8s 控制器所需的核心组件。
核心组件
Manager(管理器)
├── Client(读写 K8s 资源)
├── Cache(本地缓存,基于 Informer)
├── Scheme(类型注册)
├── Recorder(事件记录)
└── Controller(控制器)
    ├── Reconciler(业务逻辑)
    ├── Watches(监听资源)
    └── Predicates(事件过滤)

完整 Reconciler 示例
go
package controller
import (
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	appsv1alpha1 "github.com/mycompany/my-operator/api/v1alpha1"
)
// MyAppReconciler reconciles MyApp objects.
type MyAppReconciler struct {
// Embedded client used to read and write cluster objects (Get, Create, Update, Status).
client.Client
// Scheme is handed to ctrl.SetControllerReference when owner references are set.
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=apps.mycompany.io,resources=myapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.mycompany.io,resources=myapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete

// Reconcile drives a single MyApp object toward its desired state.
//
// It fetches the MyApp named by req, delegates to handleDeletion when the
// object carries a deletion timestamp, makes sure the finalizer is present,
// reconciles the Deployment and the Service, and finally writes the phase to
// the status subresource.
//
// On success it requests a requeue after 30 seconds. When reconciling the
// Deployment or the Service fails, the phase is set to "Failed" (best effort)
// and the wrapped error is returned.
func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	const finalizerName = "apps.mycompany.io/finalizer"

	logger := log.FromContext(ctx)

	// 1. Fetch the custom resource.
	myApp := &appsv1alpha1.MyApp{}
	if err := r.Get(ctx, req.NamespacedName, myApp); err != nil {
		if errors.IsNotFound(err) {
			// The object has already been deleted; nothing left to do.
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, fmt.Errorf("getting MyApp %s: %w", req.NamespacedName, err)
	}

	// 2. The object is being deleted: run the finalizer logic instead.
	if !myApp.DeletionTimestamp.IsZero() {
		return r.handleDeletion(ctx, myApp)
	}

	// 3. Make sure the finalizer is registered before creating anything.
	if !controllerutil.ContainsFinalizer(myApp, finalizerName) {
		controllerutil.AddFinalizer(myApp, finalizerName)
		if err := r.Update(ctx, myApp); err != nil {
			return ctrl.Result{}, fmt.Errorf("adding finalizer to MyApp %s: %w", req.NamespacedName, err)
		}
	}

	// markFailed records the Failed phase. A failed status write is only
	// logged so that the reconcile error that caused it is the one returned.
	markFailed := func() {
		myApp.Status.Phase = "Failed"
		if err := r.Status().Update(ctx, myApp); err != nil {
			logger.Error(err, "unable to record Failed phase", "name", myApp.Name)
		}
	}

	// 4. Reconcile the Deployment.
	if err := r.reconcileDeployment(ctx, myApp); err != nil {
		markFailed()
		return ctrl.Result{}, fmt.Errorf("reconciling Deployment: %w", err)
	}

	// 5. Reconcile the Service.
	if err := r.reconcileService(ctx, myApp); err != nil {
		markFailed()
		return ctrl.Result{}, fmt.Errorf("reconciling Service: %w", err)
	}

	// 6. Record success in the status subresource.
	myApp.Status.Phase = "Running"
	if err := r.Status().Update(ctx, myApp); err != nil {
		return ctrl.Result{}, fmt.Errorf("updating status of MyApp %s: %w", req.NamespacedName, err)
	}

	logger.Info("Reconcile 完成", "name", myApp.Name)
	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// reconcileDeployment makes sure a Deployment with the same name and namespace
// as myApp exists and carries the replica count and image from myApp.Spec.
//
// When the Deployment is missing it is built with buildDeployment, given a
// controller owner reference to myApp, and created. Otherwise its replica
// count and the image of its first container are overwritten and the object
// is updated. It returns a wrapped error when any API call or the owner
// reference fails, or when the existing Deployment has no containers.
func (r *MyAppReconciler) reconcileDeployment(ctx context.Context, myApp *appsv1alpha1.MyApp) error {
	key := client.ObjectKey{Name: myApp.Name, Namespace: myApp.Namespace}

	deploy := &appsv1.Deployment{}
	err := r.Get(ctx, key, deploy)
	switch {
	case errors.IsNotFound(err):
		// Create path: build the desired object from scratch.
		deploy = r.buildDeployment(myApp)
		// The owner reference enables cascading deletion; without it the
		// Deployment would be orphaned, so a failure here must not be ignored.
		if err := ctrl.SetControllerReference(myApp, deploy, r.Scheme); err != nil {
			return fmt.Errorf("setting owner reference on Deployment %s: %w", key, err)
		}
		if err := r.Create(ctx, deploy); err != nil {
			return fmt.Errorf("creating Deployment %s: %w", key, err)
		}
		return nil
	case err != nil:
		return fmt.Errorf("getting Deployment %s: %w", key, err)
	}

	// Update path: guard the index so an unexpected object cannot panic the
	// controller.
	containers := deploy.Spec.Template.Spec.Containers
	if len(containers) == 0 {
		return fmt.Errorf("deployment %s has no containers to update", key)
	}

	// Copy the value so the Deployment does not point into the MyApp object.
	replicas := myApp.Spec.Replicas
	deploy.Spec.Replicas = &replicas
	containers[0].Image = myApp.Spec.Image

	if err := r.Update(ctx, deploy); err != nil {
		return fmt.Errorf("updating Deployment %s: %w", key, err)
	}
	return nil
}
// buildDeployment assembles the Deployment for myApp: same name and namespace
// as the MyApp, replicas and image taken from myApp.Spec, a single container
// named "app", and an "app=<name>" label used both as the selector and as the
// pod template label.
func (r *MyAppReconciler) buildDeployment(myApp *appsv1alpha1.MyApp) *appsv1.Deployment {
	podLabels := map[string]string{"app": myApp.Name}

	appContainer := corev1.Container{
		Name:  "app",
		Image: myApp.Spec.Image,
	}

	podTemplate := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{Labels: podLabels},
		Spec:       corev1.PodSpec{Containers: []corev1.Container{appContainer}},
	}

	deployment := &appsv1.Deployment{}
	deployment.ObjectMeta.Name = myApp.Name
	deployment.ObjectMeta.Namespace = myApp.Namespace
	deployment.Spec.Replicas = &myApp.Spec.Replicas
	deployment.Spec.Selector = &metav1.LabelSelector{MatchLabels: podLabels}
	deployment.Spec.Template = podTemplate
	return deployment
}
// 注册 Controller
func (r *MyAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&appsv1alpha1.MyApp{}). // 主要监听资源
Owns(&appsv1.Deployment{}). // 监听拥有的 Deployment
Owns(&corev1.Service{}). // 监听拥有的 Service
WithOptions(controller.Options{
MaxConcurrentReconciles: 3, // 并发 Reconcile 数
}).
Complete(r)
}Predicates 事件过滤
go
import "sigs.k8s.io/controller-runtime/pkg/predicate"
ctrl.NewControllerManagedBy(mgr).
For(&appsv1alpha1.MyApp{},
builder.WithPredicates(
// 只处理 Generation 变化(忽略 Status 更新)
predicate.GenerationChangedPredicate{},
),
).
Watches(
&corev1.ConfigMap{},
handler.EnqueueRequestsFromMapFunc(r.findMyAppsForConfigMap),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
Complete(r)返回值控制
go
// Each return below is an alternative way to end Reconcile; only one applies per call.
// Requeue immediately.
// NOTE(review): newer controller-runtime releases appear to deprecate the Requeue field in favor of RequeueAfter — confirm against the version pinned in go.mod.
return ctrl.Result{Requeue: true}, nil
// Requeue after a delay (30 seconds here).
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
// Do not requeue (success, or an error that retrying cannot fix).
return ctrl.Result{}, nil
// Return the error so the request is retried (with exponential backoff).
return ctrl.Result{}, err