Part 1: Redis Basics

  • Introduction to Redis
  • Deploying a Redis cluster on Kubernetes
  • Day-to-day operations on Redis / Redis clusters

Part 2: Developing redisClusterOperator

  • Creating the project
  • Writing the business logic
  • Testing

Part 1: Redis Basics

1. Introduction to Redis

Official website: redis.io

Redis is a high-performance key-value database. It supports a richer set of value types than most key-value stores, including string, list, set, zset (sorted set), and hash.
Redis supports master-slave replication.
Redis keeps its data in memory and supports persistence, flushing to disk periodically or after a configurable number of writes.
Redis operations are atomic: each either completes fully or has no effect at all.
Redis supports data backup via the master-slave replication model.

Performance is excellent. In the official benchmark (50 concurrent clients issuing 100,000 requests, with 256-byte string values), Redis reached about 110,000 reads/s and 81,000 writes/s.


Application scenarios
Everyone has their own take on where Redis fits best. Classic scenarios include distributed locks, sorted sets, pipelines and transactions, Lua scripting, and the more recently added Streams.

2. Deploying a Redis cluster on Kubernetes

  1. Prepare a Kubernetes cluster;
  2. Create a StorageClass in the cluster;
  3. Write redis-statefulset.yaml;
  4. Initialize the Redis cluster.

vim redis-statefulset.yaml

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: redis-cluster
  namespace: redis
data:
  update-node.sh: |
    #!/bin/sh
    REDIS_NODES="/data/nodes.conf"
    sed -i -e "/myself/ s/[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}/${POD_IP}/" ${REDIS_NODES}
    exec "$@"
  redis.conf: |+
    bind 0.0.0.0
    port 6379
    cluster-enabled yes
    cluster-require-full-coverage no
    cluster-node-timeout 15000
    cluster-config-file /data/nodes.conf
    cluster-migration-barrier 1
    appendonly yes
    protected-mode no
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis-cluster
  namespace: redis
spec:
  serviceName: redis-cluster
  replicas: 6
  selector:
    matchLabels:
      app: redis-cluster
  template:
    metadata:
      labels:
        app: redis-cluster
    spec:
      containers:
        - name: redis
          image: redis:6.2.4
          ports:
            - containerPort: 6379
              name: client
            - containerPort: 16379
              name: gossip
          command: ["/conf/update-node.sh", "redis-server", "/conf/redis.conf"]
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            - name: conf
              mountPath: /conf
              readOnly: false
            - name: data
              mountPath: /data
              readOnly: false
      volumes:
        - name: conf
          configMap:
            name: redis-cluster
            defaultMode: 0755
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 5Gi
        storageClassName: nfs
        volumeMode: Filesystem
---
apiVersion: v1
kind: Service
metadata:
  name: redis-cluster
  namespace: redis
spec:
  type: NodePort
  ports:
    - port: 6379
      targetPort: 6379
      name: client
    - port: 16379
      targetPort: 16379
      name: gossip
  selector:
    app: redis-cluster
[root@master1 home]#
[root@master1 home]# kubectl apply -f redis-statefulset.yaml
configmap/redis-cluster created
statefulset.apps/redis-cluster created
service/redis-cluster created
[root@master1 home]#
[root@master1 home]# kubectl get pods -n redis
NAME              READY   STATUS    RESTARTS   AGE
redis-cluster-0   1/1     Running   0          3s
redis-cluster-1   0/1     Pending   0          1s
[root@master1 home]# kubectl get pods -n redis
NAME              READY   STATUS    RESTARTS   AGE
redis-cluster-0   1/1     Running   0          98s
redis-cluster-1   1/1     Running   0          96s
redis-cluster-2   1/1     Running   0          93s
redis-cluster-3   1/1     Running   0          90s
redis-cluster-4   1/1     Running   0          85s
redis-cluster-5   1/1     Running   0          82s
[root@master1 home]#

Initialize the Redis cluster

[root@master1 home]# podips=`kubectl get pods -l app=redis-cluster -n redis -o jsonpath='{range.items[*]}{.status.podIP}:6379 '`
[root@master1 home]# podips=${podips% :*}
[root@master1 home]# kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster create --cluster-replicas 1 $podips
>>> Performing hash slots allocation on 6 nodes...
Master[0] -> Slots 0 - 5460
Master[1] -> Slots 5461 - 10922
Master[2] -> Slots 10923 - 16383
···
Can I set the above configuration? (type 'yes' to accept): yes
···
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.
[root@master1 home]#
[root@master1 home]# kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES
d43c1be85889411d105a06c1871ebe912137f93c 10.244.2.43:6379@16379 slave 2b5cdf33140f27bb7eb413e28c47bd6b2558d169 0 1646135095925 1 connected
0da6119ddc95176e47cf8f8de54654d9933941e9 10.244.2.41:6379@16379 master - 0 1646135094914 2 connected 5461-10922
8a828910d7383af547a3b3759664a221ab3e0d57 10.244.2.42:6379@16379 master - 0 1646135094000 3 connected 10923-16383
2b5cdf33140f27bb7eb413e28c47bd6b2558d169 10.244.2.40:6379@16379 myself,master - 0 1646135094000 1 connected 0-5460
317e93aa6b1335c0082d40908386717339e284e3 10.244.2.44:6379@16379 slave 0da6119ddc95176e47cf8f8de54654d9933941e9 0 1646135095000 2 connected
5679479f6825b235fb92d1a82179bc0e11368e9e 10.244.1.92:6379@16379 slave 8a828910d7383af547a3b3759664a221ab3e0d57 0 1646135092895 3 connected
[root@master1 home]#
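The slot ranges printed above come from Redis Cluster's fixed keyspace partitioning: every key maps to one of 16384 slots via CRC16 (XMODEM variant) modulo 16384, and `--cluster create` splits those slots evenly across the masters. A minimal Go sketch of the key-to-slot mapping, including `{hash tag}` handling:

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 implements the CRC16-CCITT (XMODEM) variant used by Redis Cluster:
// polynomial 0x1021, initial value 0, no reflection, no final XOR.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// Slot returns the cluster slot for a key. If the key contains a non-empty
// "{...}" hash tag, only the tag is hashed, so related keys can share a slot.
func Slot(key string) uint16 {
	if open := strings.Index(key, "{"); open >= 0 {
		if end := strings.Index(key[open+1:], "}"); end > 0 {
			key = key[open+1 : open+1+end]
		}
	}
	return crc16([]byte(key)) % 16384
}

func main() {
	fmt.Println(Slot("user:1000"), Slot("{user:1000}.following"))
}
```

Keys sharing a hash tag land in the same slot, which is what makes multi-key operations possible in cluster mode.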

3. Day-to-day operations on Redis / Redis clusters

Scaling out: adding nodes

[root@master1 home]# kubectl scale --replicas=8 statefulset/redis-cluster -n redis
statefulset.apps/redis-cluster scaled
[root@master1 home]# kubectl get pods -n redis
NAME              READY   STATUS    RESTARTS   AGE
redis-cluster-0   1/1     Running   0          13m
redis-cluster-1   1/1     Running   0          13m
redis-cluster-2   1/1     Running   0          13m
redis-cluster-3   1/1     Running   0          13m
redis-cluster-4   1/1     Running   0          13m
redis-cluster-5   1/1     Running   0          13m
redis-cluster-6   1/1     Running   0          13s
redis-cluster-7   1/1     Running   0          8s
(1) Gather information
# IP of pod redis-cluster-0
podIP0=`kubectl get pods redis-cluster-0 -n redis -o wide | awk 'NR>1{print $6}'`
podIP6=`kubectl get pods redis-cluster-6 -n redis -o wide | awk 'NR>1{print $6}'`
podIP7=`kubectl get pods redis-cluster-7 -n redis -o wide | awk 'NR>1{print $6}'`
# node ID of redis node redis-cluster-0
cluster0ID=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES | grep "$podIP0" | awk '{print $1}'`
(2) Add the new nodes
# first argument: the new node's ip:port; second argument: any existing node's ip:port
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster add-node $podIP6:6379 $podIP0:6379
# the new node only appears in CLUSTER NODES after it has joined, so fetch its node ID now
newMaterID=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES | grep "$podIP6" | awk '{print $1}'`
# --cluster-master-id takes the new master's node ID
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster add-node $podIP7:6379 $podIP0:6379 --cluster-slave --cluster-master-id "$newMaterID"
newSlaveID=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES | grep "$podIP7" | awk '{print $1}'`
(3) Assign hash slots
# rebalance the slots; the last argument is any node's ip:port
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster rebalance --cluster-threshold 1 --cluster-use-empty-masters $podIP0:6379
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster info $podIP0:6379
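The `grep`/`awk` pipeline above picks a node ID out of `redis-cli CLUSTER NODES` output by matching the pod IP. The same lookup can be sketched as a small Go helper (a hypothetical function for illustration, not part of the operator code):

```go
package main

import (
	"fmt"
	"strings"
)

// nodeIDByIP scans `redis-cli CLUSTER NODES` output and returns the node ID
// of the node whose address starts with the given pod IP. Each line looks
// like: "<id> <ip>:<port>@<cport> <flags> ...". Matching on "ip:" avoids
// accidental prefix matches (10.244.2.4 vs 10.244.2.41).
func nodeIDByIP(clusterNodes, podIP string) string {
	for _, line := range strings.Split(clusterNodes, "\n") {
		fields := strings.Fields(line)
		if len(fields) >= 2 && strings.HasPrefix(fields[1], podIP+":") {
			return fields[0]
		}
	}
	return ""
}

func main() {
	out := "0da6119ddc95176e47cf8f8de54654d9933941e9 10.244.2.41:6379@16379 master - 0 1646135094914 2 connected 5461-10922"
	fmt.Println(nodeIDByIP(out, "10.244.2.41"))
}
```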

Scaling in: removing nodes

(1) Remove the slave node
# a slave can be removed directly with del-node
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster del-node $podIP0:6379 $newSlaveID
(2) Remove the master node
# get the number of slots currently held by the master being removed
cluster6slot=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster info $podIP0:6379 | grep $podIP6 | awk '{print $7}'`
# migrate the slots away
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster reshard --cluster-from $newMaterID --cluster-to $cluster0ID --cluster-slots $cluster6slot $podIP0:6379
# once the slots are migrated, the master can be deleted. $podIP0:6379 is any other node's ip:port; $newMaterID is the node ID of the master being removed
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster del-node $podIP0:6379 $newMaterID
# after the nodes have been forgotten by the cluster, scale the pods back down
kubectl scale --replicas=6 sts redis-cluster -n redis
# delete the corresponding PVCs
kubectl delete pvc data-redis-cluster-6 data-redis-cluster-7 -n redis
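The `awk '{print $7}'` above depends on the fixed column layout of `redis-cli --cluster info` lines such as `10.244.2.41:6379 (0da6119d...) -> 0 keys | 4096 slots | 1 slaves.`. A slightly more defensive sketch in Go, keyed on the word `slots` instead of a column number (a hypothetical helper for illustration):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// slotCount extracts the slot count from one line of `redis-cli --cluster info`
// output by taking the number immediately before the word "slots".
func slotCount(infoLine string) (int, bool) {
	fields := strings.Fields(infoLine)
	for i, f := range fields {
		if f == "slots" && i > 0 {
			n, err := strconv.Atoi(fields[i-1])
			return n, err == nil
		}
	}
	return 0, false
}

func main() {
	n, ok := slotCount("10.244.2.41:6379 (0da6119d...) -> 0 keys | 4096 slots | 1 slaves.")
	fmt.Println(n, ok)
}
```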

Cluster backup

  TBD

Cluster restore

  TBD

Manually rebalancing slots

  TBD

Parts 1 and 2 are not strictly coupled: Part 1 only covers Redis itself and its installation, laying the groundwork for the redisClusterOperator development in Part 2.

Part 2: Developing redisClusterOperator

1. Creating the project

1.1 Design the CRD resource

Suppose we want a CRD resource like the following to create a corresponding Redis cluster:

apiVersion: redis.fuyu.io/v1alpha1
kind: RedisCluster
metadata:
  name: redis-cluster
spec:
  size: 6               # number of replicas; must be even
  image: redis:6.2.4    # image
  storageClassName: nfs
  storage: 5Gi

1.2 Initialize the project

mkdir -p gitee.com/fym89/redis-operator && cd gitee.com/fym89/redis-operator
export GO111MODULE=on
export GOPROXY=https://goproxy.cn
kubebuilder init --domain fuyu.io --owner fuyu --repo gitee.com/fym89/redis-operator
kubebuilder create api --group redis --version v1alpha1 --kind RedisCluster --resource=true --controller=true

2. Writing the business logic

2.1 Basic changes

2.1.1 Edit config/samples/redis_v1alpha1_rediscluster.yaml

apiVersion: redis.fuyu.io/v1alpha1
kind: RedisCluster
metadata:
  name: redis-cluster
spec:
  size: 6               # number of replicas; must be even
  image: redis:6.2.4    # image
  storageClassName: nfs
  storage: 5Gi

Keep the sample consistent with the CRD resource designed above.

2.1.2 Initialize the logger in main.go

if err = (&controllers.RedisClusterReconciler{
    Client: mgr.GetClient(),
    Log:    ctrl.Log.WithName("controllers").WithName("RedisCluster"), // initialize the logger
    Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "RedisCluster")
    os.Exit(1)
}

2.1.3 Edit api/v1alpha1/rediscluster_types.go
Add the spec fields to the RedisClusterSpec struct, matching the CRD resource designed above.

type RedisClusterSpec struct {
    // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
    // Important: Run "make" to regenerate code after modifying this file
    Size             *int32  `json:"size"`
    Image            string  `json:"image"`
    StorageClassName *string `json:"storageClassName,omitempty"`
    Storage          string  `json:"storage,omitempty"`
}

After editing this file, build once to regenerate the generated code.

[root@master1 redis-operator]# make
/home/gitee.com/fym89/redis-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
[root@master1 redis-operator]#

2.1.4 Edit controllers/rediscluster_controller.go
Generic business logic: the reconciliation loop, i.e. the Reconcile function

/*
Copyright 2022 fuyu.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
    "context"

    "github.com/go-logr/logr"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

    redisv1alpha1 "gitee.com/fym89/redis-operator/api/v1alpha1"
)

var initClusterEnd = false

// RedisClusterReconciler reconciles a RedisCluster object
type RedisClusterReconciler struct {
    client.Client
    Log    logr.Logger
    Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete

//+kubebuilder:rbac:groups=redis.fuyu.io,resources=redisclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=redis.fuyu.io,resources=redisclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=redis.fuyu.io,resources=redisclusters/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the RedisCluster object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := r.Log.WithValues("RedisCluster", req.NamespacedName)

    // fetch the RedisCluster instance
    var redisCluster redisv1alpha1.RedisCluster

    if err := r.Get(ctx, req.NamespacedName, &redisCluster); err != nil {
        // if the RedisCluster has been deleted, ignore the not-found error
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // create the corresponding Service, StatefulSet and ConfigMap
    // CreateOrUpdate svc
    var svc corev1.Service
    svc.Name = redisCluster.Name
    svc.Namespace = redisCluster.Namespace
    result, err := ctrl.CreateOrUpdate(ctx, r.Client, &svc, func() error {
        // build the Service object
        MutateService(&redisCluster, &svc)
        return controllerutil.SetControllerReference(&redisCluster, &svc, r.Scheme)
    })
    if err != nil {
        return ctrl.Result{}, err
    }
    log.Info("CreateOrUpdate Result", "Service", result)

    // CreateOrUpdate statefulSet
    var sts appsv1.StatefulSet
    sts.Name = redisCluster.Name
    sts.Namespace = redisCluster.Namespace
    result, err = ctrl.CreateOrUpdate(ctx, r.Client, &sts, func() error {
        // build the StatefulSet object
        MutateStatefulSet(&redisCluster, &sts)
        return controllerutil.SetControllerReference(&redisCluster, &sts, r.Scheme)
    })
    if err != nil {
        return ctrl.Result{}, err
    }

    log.Info("CreateOrUpdate Result", "StatefulSet", result)

    // CreateOrUpdate ConfigMap
    var cm corev1.ConfigMap
    cm.Name = redisCluster.Name
    cm.Namespace = redisCluster.Namespace
    result, err = ctrl.CreateOrUpdate(ctx, r.Client, &cm, func() error {
        // build the ConfigMap object
        MutateConfigMap(&cm)
        return controllerutil.SetControllerReference(&redisCluster, &cm, r.Scheme)
    })
    if err != nil {
        return ctrl.Result{}, err
    }

    log.Info("CreateOrUpdate Result", "ConfigMap", result)

    // Initialize the redis cluster. Once initialization succeeds, initClusterEnd
    // becomes true and this branch is skipped on subsequent reconciles.
    if !initClusterEnd {
        if InitRedisCluster(&redisCluster, r.Log) {
            log.Info("InitCluster Result", "InitCluster", "redis集群初始化完成")
            initClusterEnd = true
        }
    }

    // scale out the redis cluster: add new redis nodes
    if initClusterEnd {
        if result, err := AddRedisNode(&redisCluster, log); result == "失败" && err != nil {
            log.Info("AddRedisNode Result", "AddRedisNode", "失败")
        }
    }

    return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *RedisClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&redisv1alpha1.RedisCluster{}).
        Owns(&appsv1.StatefulSet{}).
        Owns(&corev1.Service{}).
        Owns(&corev1.ConfigMap{}).
        Complete(r)
}

2.2 Core code

2.2.1 Edit controllers/resources.go
Cluster-specific logic 1: implement the MutateService, MutateStatefulSet and MutateConfigMap functions. In essence they just fill in the fields of the Service, StatefulSet and ConfigMap resources.

package controllers

import (
    "fmt"
    "gitee.com/fym89/redis-operator/api/v1alpha1"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
    RedisClusterCommonLabelKey = "app"
    RedisClusterLabelKey       = "redis.fuyu.io/cluster"
    RedisDataDirName           = "data"
)

func MutateService(cluster *v1alpha1.RedisCluster, svc *corev1.Service) {
    svc.Labels = map[string]string{
        RedisClusterCommonLabelKey: "redis-cluster",
    }
    svc.Spec = corev1.ServiceSpec{
        ClusterIP: corev1.ClusterIPNone,
        // must match the pod template labels set in MutateStatefulSet below
        Selector: map[string]string{
            RedisClusterCommonLabelKey: cluster.Name,
        },
        Ports: []corev1.ServicePort{corev1.ServicePort{
            Name: "client",
            Port: 6379,
        }, corev1.ServicePort{
            Name: "gossip",
            Port: 16379,
        }},
    }
}

func MutateStatefulSet(cluster *v1alpha1.RedisCluster, sts *appsv1.StatefulSet) {
    scriptMode := int32(0755)
    storageClassName := "nfs"
    if cluster.Spec.Storage == "" {
        cluster.Spec.Storage = "1Gi"
    }
    if cluster.Spec.StorageClassName == nil {
        cluster.Spec.StorageClassName = &storageClassName
    }

    sts.Labels = map[string]string{
        RedisClusterCommonLabelKey: "redis-cluster",
    }
    sts.Spec = appsv1.StatefulSetSpec{
        Replicas:    cluster.Spec.Size,
        ServiceName: cluster.Name,
        Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
            RedisClusterCommonLabelKey: cluster.Name,
        }},
        Template: corev1.PodTemplateSpec{
            ObjectMeta: metav1.ObjectMeta{
                Labels: map[string]string{
                    RedisClusterCommonLabelKey: cluster.Name,
                },
            },
            Spec: corev1.PodSpec{
                Containers: newContainers(cluster),
                Volumes: []corev1.Volume{
                    corev1.Volume{
                        Name: "conf",
                        VolumeSource: corev1.VolumeSource{
                            ConfigMap: &corev1.ConfigMapVolumeSource{
                                LocalObjectReference: corev1.LocalObjectReference{
                                    Name: sts.Name,
                                },
                                DefaultMode: &scriptMode,
                            },
                        },
                    },
                },
            },
        },
        VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
            corev1.PersistentVolumeClaim{
                ObjectMeta: metav1.ObjectMeta{
                    Name: RedisDataDirName,
                },
                Spec: corev1.PersistentVolumeClaimSpec{
                    AccessModes: []corev1.PersistentVolumeAccessMode{
                        corev1.ReadWriteOnce,
                    },
                    Resources: corev1.ResourceRequirements{
                        Requests: corev1.ResourceList{
                            corev1.ResourceStorage: resource.MustParse(cluster.Spec.Storage),
                        },
                    },
                    StorageClassName: cluster.Spec.StorageClassName,
                },
            },
        },
    }
}

func newContainers(cluster *v1alpha1.RedisCluster) []corev1.Container {
    return []corev1.Container{
        corev1.Container{
            Name:            "redis",
            Image:           cluster.Spec.Image,
            ImagePullPolicy: corev1.PullIfNotPresent,
            Ports: []corev1.ContainerPort{
                corev1.ContainerPort{
                    Name:          "client",
                    ContainerPort: 6379,
                },
                corev1.ContainerPort{
                    Name:          "gossip",
                    ContainerPort: 16379,
                },
            },
            VolumeMounts: []corev1.VolumeMount{
                corev1.VolumeMount{
                    Name:      RedisDataDirName,
                    MountPath: "/data",
                    ReadOnly:  false,
                },
                corev1.VolumeMount{
                    Name:      "conf",
                    MountPath: "/conf",
                    ReadOnly:  false,
                },
            },
            Env: []corev1.EnvVar{
                corev1.EnvVar{
                    Name: "POD_IP",
                    ValueFrom: &corev1.EnvVarSource{
                        FieldRef: &corev1.ObjectFieldSelector{
                            FieldPath: "status.podIP",
                        },
                    },
                },
            },
            Command: []string{
                "/conf/update-node.sh",
                "redis-server",
                "/conf/redis.conf",
            },
        },
    }
}

func MutateConfigMap(cm *corev1.ConfigMap) {
    upDataNodeSh := fmt.Sprintf(`#!/bin/sh
REDIS_NODES="/data/nodes.conf"
sed -i -e "/myself/ s/[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}/${POD_IP}/" ${REDIS_NODES}
exec "$@"`)

    // redis configuration file
    configFile := fmt.Sprintf(`bind 0.0.0.0
port 6379
cluster-enabled yes
cluster-require-full-coverage no
cluster-node-timeout 15000
cluster-config-file /data/nodes.conf
cluster-migration-barrier 1
appendonly yes
protected-mode no`)

    // redis cluster initialization script
    initClusterSh := fmt.Sprintf(`#!/bin/bash
echo yes | redis-cli --cluster create --cluster-replicas 1 $1`)

    cm.Data = map[string]string{
        "update-node.sh": upDataNodeSh,
        "redis.conf":     configFile,
        "initCluster.sh": initClusterSh,
    }
}

2.2.2 Edit controllers/initRedisCluster.go
Cluster-specific logic 2: the core code that initializes the redis cluster. The flow is: read kubeconfig -> build a clientset -> list the redis pods -> check that every redis pod is Running -> exec `date` in the last pod to confirm it accepts connections -> check whether the redis cluster is already initialized -> initialize the redis cluster.
(The code is still rough in places and will be cleaned up in later iterations.)
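The "is the cluster already initialized" step boils down to inspecting `redis-cli cluster info` output: a fresh, never-initialized cluster reports `cluster_state:fail` together with `cluster_size:0`. Pulled out as a standalone helper (the file below performs the same check inline):

```go
package main

import (
	"fmt"
	"strings"
)

// needsInit reports whether `redis-cli cluster info` output describes a
// cluster that has never been initialized: state is fail and no master
// owns any slots yet (cluster_size:0).
func needsInit(clusterInfo string) bool {
	return strings.Contains(clusterInfo, "cluster_state:fail") &&
		strings.Contains(clusterInfo, "cluster_size:0")
}

func main() {
	fresh := "cluster_enabled:1\ncluster_state:fail\ncluster_size:0\n"
	ready := "cluster_enabled:1\ncluster_state:ok\ncluster_size:3\n"
	fmt.Println(needsInit(fresh), needsInit(ready))
}
```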

package controllers

import (
    "bytes"
    "context"
    "fmt"
    "gitee.com/fym89/redis-operator/api/v1alpha1"
    "github.com/go-logr/logr"
    "io"
    corev1 "k8s.io/api/core/v1"
    metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/remotecommand"
    "os"
    "path/filepath"
    "strconv"
    "strings"
)

// InitK8sClusterClient builds a client config for connecting to the k8s cluster
func InitK8sClusterClient() (*rest.Config, error) {
    var config *rest.Config
    kubeConfig := filepath.Join("/root/.kube", "config")
    config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
    if err != nil {
        return nil, err
    }
    return config, nil
}

// getPodList lists the pods in the cluster's namespace
func getPodList(cluster *v1alpha1.RedisCluster, log logr.Logger) *corev1.PodList {
    var err error
    var config *rest.Config

    config, err = InitK8sClusterClient()
    if err != nil {
        log.Error(err, "controllers.getPods", "初始化kubeConfig配置", "失败")
        return nil
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Error(err, "controllers.getPods", "使用kubeConfig配置初始化clientset", "失败")
        return nil
    }
    // list the pods in the namespace; check the error before touching pods.Items
    opts := metaV1.ListOptions{}
    pods, err := clientset.CoreV1().Pods(cluster.Namespace).List(context.Background(), opts)
    if err != nil {
        log.Error(err, "controllers.getPodList", "获取pod列表", "失败")
        return nil
    }
    log.Info(fmt.Sprintf("等待所有pod就绪 ··· %s/%s", strconv.Itoa(len(pods.Items)), strconv.FormatInt(int64(*cluster.Spec.Size), 10)))
    if pods == nil || len(pods.Items) == 0 {
        return nil
    }
    return pods
}

// execCommandsInPodContainer execs into a pod and runs a command
func execCommandsInPodContainer(cluster *v1alpha1.RedisCluster, commands []string, podName string, log logr.Logger) string {
    config, err := InitK8sClusterClient()
    if err != nil {
        log.Error(err, "controllers.execCommandsInPodContainer", "初始化kubeConfig配置", "失败")
        return ""
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Error(err, "controllers.execCommandsInPodContainer", "使用kubeConfig配置初始化clientset", "失败")
        return ""
    }

    // capture stdout
    old := os.Stdout // keep backup of the real stdout
    reader, writer, _ := os.Pipe()
    os.Stdout = writer
    req := clientset.CoreV1().RESTClient().Post().
        Resource("pods").Name(podName).Namespace(cluster.Namespace).SubResource("exec").
        VersionedParams(&corev1.PodExecOptions{
            Command: commands,
            Stdin:   true,
            Stdout:  true,
            Stderr:  true,
            TTY:     true,
        }, scheme.ParameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
    if err != nil {
        // restore stdout before bailing out
        _ = writer.Close()
        os.Stdout = old
        log.Error(err, "controllers.execCommandsInPodContainer", "创建SPDY executor", "失败")
        return ""
    }

    screen := struct {
        io.Reader
        io.Writer
    }{os.Stdin, os.Stdout}

    if err = exec.Stream(remotecommand.StreamOptions{
        Stdin:  screen,
        Stdout: screen,
        Stderr: screen,
        Tty:    false,
    }); err != nil {
        log.Error(err, "controllers.execCommandsInPodContainer", "进入pod执行命令", "失败")
    }
    outC := make(chan string)
    // copy the output in a separate goroutine so printing can't block indefinitely
    go func() {
        var buf bytes.Buffer
        _, _ = io.Copy(&buf, reader)
        outC <- buf.String()
    }()

    // back to normal state
    _ = writer.Close()
    os.Stdout = old // restoring the real stdout
    outText := <-outC
    results := fmt.Sprintf("命令: %s\n执行结果:\n%s", commands, outText)
    log.Info("controllers.execCommandsInPodContainer", "进入pod执行命令", "成功")
    fmt.Println(results)
    return outText
}

// InitRedisCluster checks whether the redis cluster is initialized, and initializes it if not
func InitRedisCluster(cluster *v1alpha1.RedisCluster, log logr.Logger) (initEnd bool) {
    // list the pods
    pods := getPodList(cluster, log)
    if pods == nil {
        log.Info("controllers.InitRedisCluster", "获取所有pod副本", "失败")
        return false
    }
    log.Info("controllers.InitRedisCluster", "获取所有pod副本", "成功")
    // check that every pod is Running
    for _, pod := range pods.Items {
        if string(pod.Status.Phase) != "Running" {
            log.Info("controllers.InitRedisCluster", "所有pod状态为Running", "false")
            return false
        }
    }
    log.Info("controllers.InitRedisCluster", "所有pod状态为Running", "true")

    // check that the last pod is ready
    // build the last pod's name: with a sts named redis-cluster and size 6, the last pod is redis-cluster-5
    LastPodName := cluster.Name + "-" + fmt.Sprint(*cluster.Spec.Size-1)
    if result1 := execCommandsInPodContainer(cluster, []string{"date"}, LastPodName, log); result1 == "" {
        log.Info("controllers.InitRedisCluster", "最后一个pod是否接受访问", "false")
        return false
    }

    // collect all pod IPs
    IPs := ""
    for _, pod := range pods.Items {
        IPs = IPs + pod.Status.PodIP + ":6379 "
    }
    FirstPodName := cluster.Name + "-0"
    // check whether the cluster is already initialized
    if result := execCommandsInPodContainer(cluster, []string{"redis-cli", "cluster", "info"}, FirstPodName, log); result != "" {
        if strings.Contains(result, "cluster_state:fail") && strings.Contains(result, "cluster_size:0") {
            // start initialization
            result := execCommandsInPodContainer(cluster, []string{"bash", "/conf/initCluster.sh", IPs}, FirstPodName, log)
            if result == "" {
                log.Info("controllers.InitRedisCluster", "redis集群初始化结果", "failed")
                return false
            }
            log.Info("controllers.InitRedisCluster", "redis集群初始化结果", "succeed")
            initEnd = true
        }
        // log.Info("redis cluster already initialized", "InitCluster", result)
        log.Info("controllers.InitRedisCluster", "redis集群已经初始化", "true")
        initEnd = true
    }
    return initEnd
}

2.2.3 Edit controllers/add_node.go
Cluster-specific logic 3: adding new redis nodes to the cluster. The flow is: fetch the redis cluster and the pod list in the current namespace ->
check whether every pod with the matching label is already a member of the redis cluster -> pods that are not yet members are newPods; once there are at least two newPods, start joining them -> join the new master node -> join the new slave node -> rebalance the slots.
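The "which pods are not yet cluster members" step can be isolated into a small helper (a sketch; the file below performs the same check inline while iterating the pod list). Note that a bare substring match would let `10.244.2.4` match inside `10.244.2.40`, so this variant matches on `ip + ":"`:

```go
package main

import (
	"fmt"
	"strings"
)

// newPodIPs returns the "<ip>:6379" addresses of pods that do not yet appear
// in `redis-cli CLUSTER NODES` output. Matching on "ip:" avoids the prefix
// collision between e.g. 10.244.2.4 and 10.244.2.40.
func newPodIPs(clusterNodes string, podIPs []string) []string {
	var fresh []string
	for _, ip := range podIPs {
		if !strings.Contains(clusterNodes, ip+":") {
			fresh = append(fresh, ip+":6379")
		}
	}
	return fresh
}

func main() {
	nodes := "2b5cdf33140f27bb7eb413e28c47bd6b2558d169 10.244.2.40:6379@16379 myself,master - 0 0 1 connected 0-5460"
	fmt.Println(newPodIPs(nodes, []string{"10.244.2.40", "10.244.2.50"}))
}
```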

package controllers

import (
    "context"
    "strings"
    "time"

    "gitee.com/fym89/redis-operator/api/v1alpha1"
    "github.com/go-logr/logr"
    metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

/*
1. Run `redis-cli CLUSTER NODES` to get the IPs of all current cluster members: RedisNodeIps;
2. List the IPs of all pods labeled app=redis-cluster in the namespace: AllPodIps;
3. If any IP in AllPodIps is not in RedisNodeIps, the cluster has been scaled out and those pods should join the redis cluster;
4. Gather the information needed to add nodes: Pod0IP (the IP of FirstPodName), newPodIP (an even number of them), newMaterID;
5. Add the new nodes.
*/

func AddRedisNode(cluster *v1alpha1.RedisCluster, log logr.Logger) (string, error) {
    FirstPodName1 := cluster.Name + "-0"

    // 1. run `redis-cli CLUSTER NODES` to get the IPs of all current cluster members
    RedisNodeInfo := execCommandsInPodContainer(cluster, []string{"redis-cli", "CLUSTER", "NODES"}, FirstPodName1, log)
    if RedisNodeInfo == "" {
        return "失败", nil
    }

    log.Info("controllers.AddRedisNode", "获取集群信息", "end")

    // keep the first redis node's IP for the add-node calls below; split twice
    firstPodIP1 := strings.Split(strings.Split(RedisNodeInfo, "@16379")[0], " ")[1]

    // 2. list the IPs of all pods with the matching label in the namespace
    config, err := InitK8sClusterClient()
    if err != nil {
        return "失败", err
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return "失败", err
    }
    pods, err := clientset.CoreV1().Pods(cluster.Namespace).List(context.Background(), metaV1.ListOptions{})
    if err != nil {
        return "失败", err
    }

    // 3. pods whose IPs are not yet in RedisNodeIps are new; the cluster has been scaled out and they should join
    var newPodIps, newPodNames []string // newPodNames is kept so we can look up newMasterNodeID
    for _, pod := range pods.Items {
        if pod.Labels[RedisClusterCommonLabelKey] == cluster.Name && pod.Status.Phase == "Running" {
            if !strings.Contains(RedisNodeInfo, pod.Status.PodIP) {
                newPodIps = append(newPodIps, pod.Status.PodIP+":6379")
                newPodNames = append(newPodNames, pod.Name)
            }
        }
    }

    // only join new nodes once at least two new pods are Running
    if len(newPodIps) > 1 {
        // get the new master redis node ID
        redisNodeInfo := execCommandsInPodContainer(cluster, []string{"redis-cli", "CLUSTER", "NODES"}, newPodNames[0], log)
        if redisNodeInfo == "" {
            return "失败", nil
        }
        newMasterNodeID := strings.Split(redisNodeInfo, " ")[0]

        log.Info("controllers.AddRedisNode", "开始添加redis新节点", newPodIps)
        // add the new redis master node
        if result := execCommandsInPodContainer(cluster, []string{
            "redis-cli",
            "--cluster",
            "add-node",
            newPodIps[0],
            firstPodIP1,
        }, FirstPodName1, log); result == "" {
            return "失败", nil
        }

        log.Info("controllers.AddRedisNode", "add new redis master node", "成功")

        time.Sleep(time.Second * 120)
        // add the new redis slave node
        if result := execCommandsInPodContainer(cluster, []string{
            "redis-cli",
            "--cluster",
            "add-node",
            newPodIps[1],
            firstPodIP1,
            "--cluster-slave",
            "--cluster-master-id",
            newMasterNodeID,
        }, FirstPodName1, log); result == "" || result == "command terminated with exit code 1" {
            log.Info("controllers.AddRedisNode", "添加slave-node失败", result)
            return "失败", nil
        }

        log.Info("controllers.AddRedisNode", "添加slave-node", "成功")

        // rebalance the slots
        if result := execCommandsInPodContainer(cluster, []string{
            "redis-cli",
            "--cluster",
            "rebalance",
            "--cluster-threshold",
            "1",
            "--cluster-use-empty-masters",
            firstPodIP1,
        }, FirstPodName1, log); result == "" || result == "command terminated with exit code 1" {
            log.Info("controllers.AddRedisNode", "均衡slot失败", result)
            return "失败", nil
        }

        log.Info("controllers.AddRedisNode", "均衡slot", "成功")
    }
    return "成功", nil
}

三、测试

测试项:

  1. 代码能否正常编译运行;
  2. 代码正常运行后,创建RedisCluster资源,能否自动创建对应的service、statefulset、configmap资源;
  3. 分别手动删除service、statefulset、configmap,是否能自动watch并调谐(CreateOrUpdate)恢复期望状态;
  4. 待所有pod正常后,进入某个pod执行redis-cli cluster info,查看redis集群状态_state和大小_size是否正常;
  5. redis集群扩容 — 在线增加两个pod,待新pod正常后执行第4项测试;

(下面待实现部分···)

  1. Validate that the pod replica count (size) must be even: set size to an odd number and check that creation fails with a clear error message.
  2. Redis cluster scale-in: run kubectl scale --replicas=6 sts xxx to remove two pods online, then repeat test item 4 shortly afterwards and check whether any data was lost.
  3. Redis cluster backup.
  4. Redis cluster restore.
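The first of the unimplemented items, the even-size check, is a pure validation step and can be sketched independently of any cluster. Below is a minimal sketch, assuming a hypothetical validateSize helper and a minimum of 6 pods for a 3-master/3-slave cluster (the real check would sit in the Reconcile function or, better, in an admission webhook):

```go
package main

import "fmt"

// validateSize is a hypothetical helper: the cluster runs one slave per
// master, so the total pod count must be even, and at least 6 for a
// minimal 3-master/3-slave redis cluster.
func validateSize(size int32) error {
	if size%2 != 0 {
		return fmt.Errorf("spec.size must be even, got %d", size)
	}
	if size < 6 {
		return fmt.Errorf("spec.size must be at least 6, got %d", size)
	}
	return nil
}

func main() {
	fmt.Println(validateSize(6)) // <nil>
	fmt.Println(validateSize(7)) // spec.size must be even, got 7
}
```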

3.1 Uploading the Code

In GoLand, select the project directory, then go to Tools → Deployment → Upload to…

3.2 Building and Installing

In the project root on the remote host, compile, install, and run the code.

[root@master1 redis-operator]# ls
api  bin  config  controllers  Dockerfile  go.mod  go.sum  hack  main.go  Makefile  PROJECT
[root@master1 redis-operator]# 
[root@master1 redis-operator]# go mod tidy && make && make install
/home/gitee.com/fym89/redis-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
/home/gitee.com/fym89/redis-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
/home/gitee.com/fym89/redis-operator/bin/kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/redisclusters.redis.fuyu.io created
[root@master1 redis-operator]# 
[root@master1 redis-operator]# 
[root@master1 redis-operator]# make run
/home/gitee.com/fym89/redis-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
/home/gitee.com/fym89/redis-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go run ./main.go
2022-03-05T17:35:18.721+0800    INFO    controller-runtime.metrics    metrics server is starting to listen    {"addr": ":8080"}

3.3 Testing RedisCluster Resource Creation

vim redisOperator-crd.yaml

apiVersion: redis.fuyu.io/v1alpha1
kind: RedisCluster
metadata:
  name: redis-cluster
  namespace: redis
spec:
  size: 6  # number of replicas; must be even
  image: redis:6.2.4  # redis container image

kubectl apply -f redisOperator-crd.yaml

Check that the resources were created:

[root@master1 home]# kubectl get cm,svc,sts,pods,pvc -n redis
NAME                      DATA   AGE
configmap/redis-cluster   2      29m

NAME                    TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)              AGE
service/redis-cluster   ClusterIP   None         <none>        6379/TCP,16379/TCP   29m

NAME                             READY   AGE
statefulset.apps/redis-cluster   6/6     29m

NAME                  READY   STATUS    RESTARTS   AGE
pod/redis-cluster-0   1/1     Running   0          29m
pod/redis-cluster-1   1/1     Running   0          29m
pod/redis-cluster-2   1/1     Running   0          29m
pod/redis-cluster-3   1/1     Running   0          29m
pod/redis-cluster-4   1/1     Running   0          29m
pod/redis-cluster-5   1/1     Running   0          29m

NAME                                         STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
persistentvolumeclaim/data-redis-cluster-0   Bound    pvc-a48d5b07-cd10-45fc-a2a5-26cc379b03b1   1Gi        RWO            nfs            29m
persistentvolumeclaim/data-redis-cluster-1   Bound    pvc-a09aac98-5df0-4276-ac21-377a2c4420c7   1Gi        RWO            nfs            29m
persistentvolumeclaim/data-redis-cluster-2   Bound    pvc-876b0459-c478-4d63-88f2-9704665144ba   1Gi        RWO            nfs            29m
persistentvolumeclaim/data-redis-cluster-3   Bound    pvc-1b9eaa01-bc4e-4475-8e2b-c060c79b3381   1Gi        RWO            nfs            29m
persistentvolumeclaim/data-redis-cluster-4   Bound    pvc-be4bf29c-2fcf-4e9d-8834-65050ec95a26   1Gi        RWO            nfs            29m
persistentvolumeclaim/data-redis-cluster-5   Bound    pvc-3d175127-45f9-4a01-bb1a-1ed2bb6e9723   1Gi        RWO            nfs            29m
[root@master1 home]#

3.4 Testing the Features Implemented by the RedisCluster Controller

3.4.1 RedisCluster Controller Feature 1 - Automatically initialize the redis cluster
Once all pods are healthy, exec into one of them and run redis-cli cluster info to check whether the cluster's cluster_state and cluster_size are normal.
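For a healthy 6-pod cluster (3 masters, 3 slaves) this should report cluster_state:ok and cluster_size:3. Checking this by eye gets tedious across repeated tests; below is a small stdlib-only sketch of how the redis-cli cluster info output could be parsed programmatically (the sample output string and the clusterHealthy name are illustrative):

```go
package main

import (
	"fmt"
	"strings"
)

// clusterHealthy parses `redis-cli cluster info` output (key:value lines)
// and reports whether the cluster is up with the expected number of masters.
func clusterHealthy(info string, wantSize int) bool {
	fields := map[string]string{}
	for _, line := range strings.Split(strings.TrimSpace(info), "\n") {
		// TrimSpace also strips the \r that redis-cli emits at line ends.
		if k, v, ok := strings.Cut(strings.TrimSpace(line), ":"); ok {
			fields[k] = v
		}
	}
	return fields["cluster_state"] == "ok" &&
		fields["cluster_size"] == fmt.Sprint(wantSize)
}

func main() {
	sample := "cluster_state:ok\r\ncluster_slots_assigned:16384\r\ncluster_size:3\r\n"
	fmt.Println(clusterHealthy(sample, 3)) // true
}
```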

3.4.2 RedisCluster Controller Feature 2 - Redis cluster scale-out and scale-in: detect changes in pod instances and run add-node/del-node against the redis cluster
1) Run the scale commands and check that pod instances are added and removed correctly (scale out first, then back in).

kubectl scale sts redis-cluster --replicas=8 -n redis
kubectl get pods -n redis
kubectl scale sts redis-cluster --replicas=6 -n redis
kubectl get pods -n redis

2) After pod instances are added, are the new pods automatically joined to the redis cluster as new nodes, with slots and data rebalanced automatically?
Edit redisOperator-crd.yaml, change size to 8, then run kubectl apply again.

(Not implemented yet…)
3) After pod instances are removed, are the deleted pods automatically removed from the redis cluster, with their data migrated to the remaining nodes?
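Behind these tests, the scale decision in the controller boils down to comparing the desired size from the spec with the number of pods currently in the cluster. Here is a simplified, stdlib-only sketch of that decision (the scaleAction name and pair-counting convention are illustrative; the real controller then execs redis-cli --cluster add-node / del-node as in the code shown earlier):

```go
package main

import "fmt"

// scaleAction compares the desired replica count with the current pod
// count and picks which redis-cli cluster subcommand to run. Nodes are
// always added or removed in master+slave pairs, hence the /2 pair count.
func scaleAction(desired, current int32) (action string, pairs int32) {
	switch {
	case desired > current:
		return "add-node", (desired - current) / 2
	case desired < current:
		return "del-node", (current - desired) / 2
	default:
		return "none", 0
	}
}

func main() {
	a, n := scaleAction(8, 6)
	fmt.Println(a, n) // add-node 1
}
```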

3.4.3 RedisCluster Controller Feature 3 - Automatic watch and reconciliation

  • Manually delete a pod instance, the service, the statefulset, and the configmap in turn, and check whether each is automatically recreated:
    kubectl delete pod redis-cluster-4 -n redis
    kubectl delete svc redis-cluster -n redis
    kubectl delete sts redis-cluster -n redis
    kubectl delete cm redis-cluster -n redis
    

The controller code runs correctly and the corresponding resources are created and watched as expected, but that by itself does not guarantee that the redis processes inside the pods, or the redis cluster as a whole, are actually reachable and usable.
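The recreate behaviour being tested here comes from the CreateOrUpdate reconcile pattern: on every reconcile the controller computes the desired object and either creates it (if it was deleted) or updates it back into shape. Reduced to a toy in-memory model (in the real controller, controller-runtime's controllerutil.CreateOrUpdate plays this role against the API server):

```go
package main

import "fmt"

// createOrUpdate is a toy in-memory model of the reconcile pattern: store
// plays the role of the cluster, desired is what the controller computes.
// A deleted or drifted object is always driven back to the desired state.
func createOrUpdate(store map[string]string, name, desired string) string {
	got, ok := store[name]
	switch {
	case !ok:
		store[name] = desired
		return "created"
	case got != desired:
		store[name] = desired
		return "updated"
	default:
		return "unchanged"
	}
}

func main() {
	store := map[string]string{}
	fmt.Println(createOrUpdate(store, "svc/redis-cluster", "headless:6379,16379")) // created
	delete(store, "svc/redis-cluster")                                            // simulate kubectl delete
	fmt.Println(createOrUpdate(store, "svc/redis-cluster", "headless:6379,16379")) // created
}
```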

3.5 Redis Cluster Health Tests

  • Check the cluster status

    kubectl get pods -n redis -o wide | grep redis-cluster
    podnumber=$(kubectl get pods -n redis -o wide | grep redis-cluster | wc -l)
    for x in $(seq 0 `expr $podnumber - 1`); do echo "redis-cluster-$x"; kubectl exec redis-cluster-$x -n redis -- redis-cli role; echo; done
    kubectl exec -it redis-cluster-0 -n redis -- redis-cli cluster info
    kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES
    
  • Local test

    kubectl exec -it redis-cluster-0 -n redis -- redis-cli -h redis-cluster-0 -c -p 6379 set meng hello    # set a key-value pair
    kubectl exec -it redis-cluster-0 -n redis -- redis-cli -h redis-cluster-0 -c -p 6379 get meng         # read the value back
    kubectl exec -it redis-cluster-4 -n redis -- redis-cli -h redis-cluster-4 -c -p 6379 get meng
    
  • Client call test

    ```shell
    [root@node1 ~]# docker run -itd --name redis-test pypy:latest
    [root@node1 ~]# docker exec -it redis-test bash
    pip install redis-py-cluster
    python
    ```

    ```python
    from rediscluster import RedisCluster
    conn = RedisCluster(host="10.0.24.78", port=30755)
    conn.get("meng")
    ```

(Some of the wording in this document could be better; your suggestions and feedback are very welcome.)