Part 1: Getting Started with Redis
- Introducing Redis
- Deploying a Redis cluster on Kubernetes
- Day-to-day operations on Redis / Redis Cluster
Part 2: Developing the Redis Operator
- Creating the project
- Writing the business logic
- Testing
Part 1: Getting Started with Redis
1. Introducing Redis
Redis official site: redis.io
Redis is a high-performance key-value database. It supports a richer set of value types than most key-value stores, including string, list, set, zset (sorted set), and hash.
Redis supports master-slave replication.
Redis keeps its data in memory and supports persistence, writing to disk periodically or after a given number of changes.
Redis operations are atomic: a command either executes completely or not at all.
Redis supports data backup via the master-slave model.
Performance is very high. In the official benchmark (50 concurrent clients issuing 100,000 requests, with 256-byte string values), Redis reached roughly 110,000 SETs/s and 81,000 GETs/s.

Application scenarios
Everyone develops their own sense of where Redis fits. Here we pick a few classic scenarios: distributed locks, sorted sets, pipelines and transactions, Lua scripting, and the recently added Streams.
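To make the distributed-lock scenario concrete, here is a minimal sketch using the go-redis client (the client library, key, and values are illustrative and not part of this article's code): a distributed lock is essentially SET NX with a TTL, so the lock frees itself if its holder crashes.
```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8" // assumed client library for this sketch
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// SET lock:order-42 worker-1 NX PX 10000:
	// acquired only if the key does not exist yet, auto-expires after 10s
	ok, err := rdb.SetNX(ctx, "lock:order-42", "worker-1", 10*time.Second).Result()
	if err != nil {
		panic(err)
	}
	if !ok {
		fmt.Println("lock held by someone else")
		return
	}
	defer rdb.Del(ctx, "lock:order-42") // naive release; production code should verify the holder first
	fmt.Println("lock acquired, doing work...")
}
```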
2. Deploying a Redis Cluster on Kubernetes
- Prepare a Kubernetes cluster;
- Prepare a StorageClass in the cluster;
- Write redis-statefulset.yaml;
- Initialize the Redis cluster.
The manifests below live in the redis namespace, so create it first (kubectl create namespace redis), then:
vim redis-statefulset.yaml
```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: redis-cluster
  namespace: redis
data:
  update-node.sh: |
    #!/bin/sh
    REDIS_NODES="/data/nodes.conf"
    sed -i -e "/myself/ s/[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}/${POD_IP}/" ${REDIS_NODES}
    exec "$@"
  redis.conf: |+
    bind 0.0.0.0
    port 6379
    cluster-enabled yes
    cluster-require-full-coverage no
    cluster-node-timeout 15000
    cluster-config-file /data/nodes.conf
    cluster-migration-barrier 1
    appendonly yes
    protected-mode no
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis-cluster
  namespace: redis
spec:
  serviceName: redis-cluster
  replicas: 6
  selector:
    matchLabels:
      app: redis-cluster
  template:
    metadata:
      labels:
        app: redis-cluster
    spec:
      containers:
      - name: redis
        image: redis:6.2.4
        ports:
        - containerPort: 6379
          name: client
        - containerPort: 16379
          name: gossip
        command: ["/conf/update-node.sh", "redis-server", "/conf/redis.conf"]
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        volumeMounts:
        - name: conf
          mountPath: /conf
          readOnly: false
        - name: data
          mountPath: /data
          readOnly: false
      volumes:
      - name: conf
        configMap:
          name: redis-cluster
          defaultMode: 0755
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 5Gi
      storageClassName: nfs
      volumeMode: Filesystem
---
apiVersion: v1
kind: Service
metadata:
  name: redis-cluster
  namespace: redis
spec:
  type: NodePort
  ports:
  - port: 6379
    targetPort: 6379
    name: client
  - port: 16379
    targetPort: 16379
    name: gossip
  selector:
    app: redis-cluster
```
```shell
[root@master1 home]# kubectl apply -f redis-statefulset.yaml
configmap/redis-cluster created
statefulset.apps/redis-cluster created
service/redis-cluster created
[root@master1 home]# kubectl get pods -n redis
NAME              READY   STATUS    RESTARTS   AGE
redis-cluster-0   1/1     Running   0          3s
redis-cluster-1   0/1     Pending   0          1s
[root@master1 home]# kubectl get pods -n redis
NAME              READY   STATUS    RESTARTS   AGE
redis-cluster-0   1/1     Running   0          98s
redis-cluster-1   1/1     Running   0          96s
redis-cluster-2   1/1     Running   0          93s
redis-cluster-3   1/1     Running   0          90s
redis-cluster-4   1/1     Running   0          85s
redis-cluster-5   1/1     Running   0          82s
[root@master1 home]#
```
Initialize the Redis cluster
```shell
[root@master1 home]# podips=`kubectl get pods -l app=redis-cluster -n redis -o jsonpath='{range.items[*]}{.status.podIP}:6379 '`
[root@master1 home]# podips=${podips% :*}
[root@master1 home]# kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster create --cluster-replicas 1 $podips
>>> Performing hash slots allocation on 6 nodes...
Master[0] -> Slots 0 - 5460
Master[1] -> Slots 5461 - 10922
Master[2] -> Slots 10923 - 16383
...
Can I set the above configuration? (type 'yes' to accept): yes
...
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.
[root@master1 home]# kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES
d43c1be85889411d105a06c1871ebe912137f93c 10.244.2.43:6379@16379 slave 2b5cdf33140f27bb7eb413e28c47bd6b2558d169 0 1646135095925 1 connected
0da6119ddc95176e47cf8f8de54654d9933941e9 10.244.2.41:6379@16379 master - 0 1646135094914 2 connected 5461-10922
8a828910d7383af547a3b3759664a221ab3e0d57 10.244.2.42:6379@16379 master - 0 1646135094000 3 connected 10923-16383
2b5cdf33140f27bb7eb413e28c47bd6b2558d169 10.244.2.40:6379@16379 myself,master - 0 1646135094000 1 connected 0-5460
317e93aa6b1335c0082d40908386717339e284e3 10.244.2.44:6379@16379 slave 0da6119ddc95176e47cf8f8de54654d9933941e9 0 1646135095000 2 connected
5679479f6825b235fb92d1a82179bc0e11368e9e 10.244.1.92:6379@16379 slave 8a828910d7383af547a3b3759664a221ab3e0d57 0 1646135092895 3 connected
[root@master1 home]#
```
3. Day-to-Day Operations on Redis / Redis Cluster
Scaling out: adding nodes
```shell
[root@master1 home]# kubectl scale --replicas=8 statefulset/redis-cluster -n redis
statefulset.apps/redis-cluster scaled
[root@master1 home]# kubectl get pods -n redis
NAME              READY   STATUS    RESTARTS   AGE
redis-cluster-0   1/1     Running   0          13m
redis-cluster-1   1/1     Running   0          13m
redis-cluster-2   1/1     Running   0          13m
redis-cluster-3   1/1     Running   0          13m
redis-cluster-4   1/1     Running   0          13m
redis-cluster-5   1/1     Running   0          13m
redis-cluster-6   1/1     Running   0          13s
redis-cluster-7   1/1     Running   0          8s
```
```shell
# (1) Gather information
# IP of pod redis-cluster-0
podIP0=`kubectl get pods redis-cluster-0 -n redis -o wide | awk 'NR>1{print $6}'`
podIP6=`kubectl get pods redis-cluster-6 -n redis -o wide | awk 'NR>1{print $6}'`
podIP7=`kubectl get pods redis-cluster-7 -n redis -o wide | awk 'NR>1{print $6}'`
# node ID of the redis node running in redis-cluster-0
cluster0ID=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES | grep "$podIP0" | awk '{print $1}'`

# (2) Add the new nodes
# first argument: the new node's ip:port; second argument: any existing node's ip:port
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster add-node $podIP6:6379 $podIP0:6379
# a new node's ID only appears in CLUSTER NODES after it has joined
newMasterID=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES | grep "$podIP6" | awk '{print $1}'`
# the last argument is the new master's node ID
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster add-node $podIP7:6379 $podIP0:6379 --cluster-slave --cluster-master-id "$newMasterID"
newSlaveID=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES | grep "$podIP7" | awk '{print $1}'`

# (3) Distribute the hash slots
# rebalance the slots; the last argument is any node's ip:port
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster rebalance --cluster-threshold 1 --cluster-use-empty-masters $podIP0:6379
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster info $podIP0:6379
```
Scaling in: removing nodes
```shell
# (1) Remove the slave node
# a slave holds no slots, so del-node removes it directly
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster del-node $podIP0:6379 $newSlaveID

# (2) Remove the master node
# count the slots still held by the master being removed
cluster6slot=`kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster info $podIP0:6379 | grep $podIP6 | awk '{print $7}'`
# migrate the slots away
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster reshard --cluster-from $newMasterID --cluster-to $cluster0ID --cluster-slots $cluster6slot $podIP0:6379
# with its slots migrated, the master can be removed; $podIP0:6379 is any other node's ip:port, $newMasterID is the node ID of the master being removed
kubectl exec -it redis-cluster-0 -n redis -- redis-cli --cluster del-node $podIP0:6379 $newMasterID
# once the nodes have been forgotten by the cluster, the pods can be taken offline
kubectl scale --replicas=6 sts redis-cluster -n redis
# delete the corresponding PVCs
kubectl delete pvc data-redis-cluster-6 data-redis-cluster-7 -n redis
```
Cluster backup
To be added.
Cluster restore
To be added.
Manually rebalancing slots
To be added.
There is no hard dependency between Part 1 and Part 2; Part 1 only builds familiarity with Redis and its installation, laying the groundwork for the RedisCluster operator development in Part 2.
Part 2: Developing the RedisCluster Operator
1. Creating the Project
1.1 Designing the CRD resource
Suppose we want the following CRD resource to create a corresponding Redis cluster:
```yaml
apiVersion: redis.fuyu.io/v1alpha1
kind: RedisCluster
metadata:
  name: redis-cluster
spec:
  size: 6              # number of replicas; must be even
  image: redis:6.2.4   # image
  storageClassName: nfs
  storage: 5Gi
```
1.2 Initializing the project
```shell
mkdir -p gitee.com/fym89/redis-operator && cd gitee.com/fym89/redis-operator
export GO111MODULE=on
export GOPROXY=https://goproxy.cn
kubebuilder init --domain fuyu.io --owner fuyu --repo gitee.com/fym89/redis-operator
kubebuilder create api --group redis --version v1alpha1 --kind RedisCluster --resource=true --controller=true
```
2. Writing the Business Logic
2.1 Basic changes
2.1.1 Edit config/samples/redis_v1alpha1_rediscluster.yaml
```yaml
apiVersion: redis.fuyu.io/v1alpha1
kind: RedisCluster
metadata:
  name: redis-cluster
spec:
  size: 6              # number of replicas; must be even
  image: redis:6.2.4   # image
  storageClassName: nfs
  storage: 5Gi
```
This keeps the sample consistent in format with the CRD resource we designed above.
2.1.2 Add logger initialization in main.go
```go
if err = (&controllers.RedisClusterReconciler{
	Client: mgr.GetClient(),
	Log:    ctrl.Log.WithName("controllers").WithName("RedisCluster"), // initialize the logger
	Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
	setupLog.Error(err, "unable to create controller", "controller", "RedisCluster")
	os.Exit(1)
}
```
2.1.3 Edit api/v1alpha1/rediscluster_types.go
Add the YAML-mapped fields to the RedisClusterSpec struct, matching the CRD design above. Note that StorageClassName is a pointer so the controller can tell "unset" apart from an empty string, and that each json tag must match the key used in the sample manifest.
```go
type RedisClusterSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	Size             *int32  `json:"size"`
	Image            string  `json:"image"`
	StorageClassName *string `json:"storageClassName,omitempty"`
	Storage          string  `json:"storage,omitempty"`
}
```
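The test plan in Part 3 notes that size must be even but that the check is still unimplemented. One declarative way to get it, assuming we rely on kubebuilder validation markers rather than controller code (the original project does not do this), is to annotate the Size field; controller-gen compiles the markers into OpenAPI constraints in the generated CRD:
```go
// Size is the number of redis pods; the smallest redis cluster is
// 3 masters with 1 replica each, so it must be even and at least 6.
// +kubebuilder:validation:Minimum=6
// +kubebuilder:validation:MultipleOf=2
Size *int32 `json:"size"`
```
With these markers, applying a RedisCluster with an odd size is rejected by the API server before the controller ever sees the object.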
After modifying this file, run a build to make sure everything still compiles.
```shell
[root@master1 redis-operator]# make
/home/gitee.com/fym89/redis-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
[root@master1 redis-operator]#
```
2.1.4 Edit controllers/rediscluster_controller.go
Generic logic: the reconciliation loop, i.e. the Reconcile function.
```go
/*
Copyright 2022 fuyu.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"github.com/go-logr/logr"
"golang.org/x/net/context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
redisv1alpha1 "gitee.com/fym89/redis-operator/api/v1alpha1"
)
var initClusterEnd = false
// RedisClusterReconciler reconciles a RedisCluster object
type RedisClusterReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=redis.fuyu.io,resources=redisclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=redis.fuyu.io,resources=redisclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=redis.fuyu.io,resources=redisclusters/finalizers,verbs=update
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the RedisCluster object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("RedisCluster", req.NamespacedName)
	// fetch the RedisCluster instance
var redisCluster redisv1alpha1.RedisCluster
if err := r.Get(ctx, req.NamespacedName, &redisCluster); err != nil {
		// if the RedisCluster has been deleted, ignore the not-found error
return ctrl.Result{}, client.IgnoreNotFound(err)
}
	// create the corresponding Service, StatefulSet, and ConfigMap
	// CreateOrUpdate the Service
var svc corev1.Service
svc.Name = redisCluster.Name
svc.Namespace = redisCluster.Namespace
result, err := ctrl.CreateOrUpdate(ctx, r.Client, &svc, func() error {
		// build the Service
MutateService(&redisCluster, &svc)
return controllerutil.SetControllerReference(&redisCluster, &svc, r.Scheme)
})
if err != nil {
return ctrl.Result{}, err
}
log.Info("CreateOrUpdate Result", "Service", result)
// CreateOrUpdate statefulSet
var sts appsv1.StatefulSet
sts.Name = redisCluster.Name
sts.Namespace = redisCluster.Namespace
result, err = ctrl.CreateOrUpdate(ctx, r.Client, &sts, func() error {
		// build the StatefulSet
MutateStatefulSet(&redisCluster, &sts)
return controllerutil.SetControllerReference(&redisCluster, &sts, r.Scheme)
})
if err != nil {
return ctrl.Result{}, err
}
log.Info("CreateOrUpdate Result", "StatefulSet", result)
// CreateOrUpdate ConfigMap
var cm corev1.ConfigMap
cm.Name = redisCluster.Name
cm.Namespace = redisCluster.Namespace
result, err = ctrl.CreateOrUpdate(ctx, r.Client, &cm, func() error {
		// build the ConfigMap
MutateConfigMap(&cm)
return controllerutil.SetControllerReference(&redisCluster, &cm, r.Scheme)
})
if err != nil {
return ctrl.Result{}, err
}
log.Info("CreateOrUpdate Result", "ConfigMap", result)
	// initialize the redis cluster; once it succeeds, initClusterEnd becomes true and initialization is skipped from then on
if !initClusterEnd {
if InitRedisCluster(&redisCluster, r.Log) {
log.Info("InitCluster Result", "InitCluster", "redis集群初始化完成")
initClusterEnd = true
}
}
	// scale out the redis cluster: add new redis nodes
	if initClusterEnd {
		if result, err := AddRedisNode(&redisCluster, log); result == "failed" || err != nil {
			log.Info("AddRedisNode Result", "AddRedisNode", "failed")
}
}
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *RedisClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&redisv1alpha1.RedisCluster{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
}
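```

One caveat about the loop above: initClusterEnd is a package-level flag, so it is shared by every RedisCluster object the controller manages and is reset whenever the manager restarts. A sketch of an alternative, assuming a hypothetical Initialized field were added to RedisClusterStatus (it is not part of this project), records the fact in the status and requeues until the cluster is ready:
```go
// assumes RedisClusterStatus has an Initialized bool field and "time" is imported
if !redisCluster.Status.Initialized {
	if InitRedisCluster(&redisCluster, r.Log) {
		redisCluster.Status.Initialized = true
		if err := r.Status().Update(ctx, &redisCluster); err != nil {
			return ctrl.Result{}, err
		}
	} else {
		// not initialized yet: retry soon instead of waiting for the next watch event
		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
	}
}
```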
2.2 Core Code
2.2.1 Edit controllers/resources.go
Differentiated logic 1: implement the MutateService, MutateStatefulSet, and MutateConfigMap functions. In essence they fill in the fields of the Service, StatefulSet, and ConfigMap resources.
```go
package controllers
import (
"fmt"
"gitee.com/fym89/redis-operator/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
RedisClusterCommonLabelKey = "app"
RedisClusterLabelKey = "redis.fuyu.io/cluster"
RedisDataDirName = "data"
)
func MutateService(cluster *v1alpha1.RedisCluster, svc *corev1.Service) {
svc.Labels = map[string]string{
RedisClusterCommonLabelKey: "redis-cluster",
}
svc.Spec = corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
		Selector: map[string]string{
			// must match the pod template labels set in MutateStatefulSet
			RedisClusterCommonLabelKey: cluster.Name,
		},
Ports: []corev1.ServicePort{corev1.ServicePort{
Name: "client",
Port: 6379,
}, corev1.ServicePort{
Name: "gossip",
Port: 16379,
}},
}
}
func MutateStatefulSet(cluster *v1alpha1.RedisCluster, sts *appsv1.StatefulSet) {
scriptMode := int32(0755)
storageClassName := "nfs"
if cluster.Spec.Storage == "" {
cluster.Spec.Storage = "1Gi"
}
if cluster.Spec.StorageClassName == nil {
cluster.Spec.StorageClassName = &storageClassName
}
sts.Labels = map[string]string{
RedisClusterCommonLabelKey: "redis-cluster",
}
sts.Spec = appsv1.StatefulSetSpec{
Replicas: cluster.Spec.Size,
ServiceName: cluster.Name,
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
RedisClusterCommonLabelKey: cluster.Name,
}},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
RedisClusterCommonLabelKey: cluster.Name,
},
},
Spec: corev1.PodSpec{
Containers: newContainers(cluster),
Volumes: []corev1.Volume{
corev1.Volume{
Name: "conf",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: sts.Name,
},
DefaultMode: &scriptMode,
},
},
},
},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: RedisDataDirName,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(cluster.Spec.Storage),
},
},
StorageClassName: cluster.Spec.StorageClassName,
},
},
},
}
}
func newContainers(cluster *v1alpha1.RedisCluster) []corev1.Container {
return []corev1.Container{
corev1.Container{
Name: "redis",
Image: cluster.Spec.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
Name: "client",
ContainerPort: 6379,
},
corev1.ContainerPort{
Name: "gossip",
ContainerPort: 16379,
},
},
VolumeMounts: []corev1.VolumeMount{
corev1.VolumeMount{
Name: RedisDataDirName,
MountPath: "/data",
ReadOnly: false,
},
corev1.VolumeMount{
Name: "conf",
MountPath: "/conf",
ReadOnly: false,
},
},
Env: []corev1.EnvVar{
corev1.EnvVar{
Name: "POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
},
Command: []string{
"/conf/update-node.sh",
"redis-server",
"/conf/redis.conf",
},
},
}
}
func MutateConfigMap(cm *corev1.ConfigMap) {
	updateNodeSh := fmt.Sprintf(`#!/bin/sh
REDIS_NODES="/data/nodes.conf"
sed -i -e "/myself/ s/[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}\.[0-9]\{1,3\}/${POD_IP}/" ${REDIS_NODES}
exec "$@"`)
	// redis configuration file
configFile := fmt.Sprintf(`bind 0.0.0.0
port 6379
cluster-enabled yes
cluster-require-full-coverage no
cluster-node-timeout 15000
cluster-config-file /data/nodes.conf
cluster-migration-barrier 1
appendonly yes
protected-mode no`)
	// redis cluster initialization script
initClusterSh := fmt.Sprintf(`#!/bin/bash
echo yes | redis-cli --cluster create --cluster-replicas 1 $1`)
cm.Data = map[string]string{
"update-node.sh": upDataNodeSh,
"redis.conf": configFile,
"initCluster.sh": initClusterSh,
}
}
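```

Since CreateOrUpdate runs these mutate functions on every reconcile, a mismatch between the Service selector and the pod template labels would silently leave the headless service without endpoints. A small unit test along these lines (hypothetical; the project has no tests yet) pins the two together:
```go
package controllers

import (
	"testing"

	"gitee.com/fym89/redis-operator/api/v1alpha1"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

func TestServiceSelectorMatchesPodLabels(t *testing.T) {
	size := int32(6)
	cluster := &v1alpha1.RedisCluster{}
	cluster.Name = "redis-cluster"
	cluster.Spec.Size = &size
	cluster.Spec.Image = "redis:6.2.4"

	var svc corev1.Service
	var sts appsv1.StatefulSet
	MutateService(cluster, &svc)
	MutateStatefulSet(cluster, &sts)

	// every selector key/value on the Service must appear on the pod template,
	// otherwise the headless service selects no endpoints
	for k, v := range svc.Spec.Selector {
		if sts.Spec.Template.Labels[k] != v {
			t.Errorf("service selector %s=%s not found in pod labels %v", k, v, sts.Spec.Template.Labels)
		}
	}
}
```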
2.2.2 Edit controllers/initRedisCluster.go
Differentiated logic 2: the core code that initializes the redis cluster. The flow is: read the kubeconfig -> build a clientset -> list the redis pods -> verify that every redis pod is Running -> exec `date` in the last pod to double-check it is ready -> check whether the redis cluster is already initialized -> initialize the redis cluster.
(I'm new at this and the code is still a bit rough; please bear with me, improvements are ongoing.)
```go
package controllers
import (
"bytes"
"context"
"fmt"
"gitee.com/fym89/redis-operator/api/v1alpha1"
"github.com/go-logr/logr"
"io"
corev1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"os"
"path/filepath"
"strconv"
"strings"
)
// InitK8sClusterClient builds the rest.Config used to connect to the k8s cluster
func InitK8sClusterClient() (*rest.Config, error) {
var config *rest.Config
kubeConfig := filepath.Join("/root/.kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
return nil, err
}
return config, nil
}
// getPodList lists the pods in the cluster's namespace
func getPodList(cluster *v1alpha1.RedisCluster, log logr.Logger) *corev1.PodList {
var err error
var config *rest.Config
config, err = InitK8sClusterClient()
if err != nil {
log.Error(err, "controllers.getPods", "初始化kubeConfig配置", "失败")
return nil
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error(err, "controllers.getPods", "使用kubeConfig配置初始化clientset", "失败")
return nil
}
	// list the pods in the namespace
	opts := metaV1.ListOptions{}
	pods, err := clientset.CoreV1().Pods(cluster.Namespace).List(context.Background(), opts)
	if err != nil {
		log.Error(err, "controllers.getPodList", "list pods", "failed")
		return nil
	}
	log.Info(fmt.Sprintf("waiting for all pods to be ready ... %s/%s", strconv.Itoa(len(pods.Items)), strconv.FormatInt(int64(*cluster.Spec.Size), 10)))
if pods == nil || len(pods.Items) == 0 {
return nil
}
return pods
}
// execCommandsInPodContainer execs into a pod and runs the given command
func execCommandsInPodContainer(cluster *v1alpha1.RedisCluster, commands []string, podName string, log logr.Logger) string {
config, err := InitK8sClusterClient()
if err != nil {
log.Error(err, "controllers.execCommandsInPodContainer", "初始化kubeConfig配置", "失败")
return ""
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error(err, "controllers.execCommandsInPodContainer", "使用kubeConfig配置初始化clientset", "失败")
return ""
}
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").Name(podName).Namespace(cluster.Namespace).SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Command: commands,
			Stdin:   true,
			Stdout:  true,
			Stderr:  true,
			TTY:     true,
		}, scheme.ParameterCodec)
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		log.Error(err, "controllers.execCommandsInPodContainer", "create SPDY executor", "failed")
		return ""
	}
	// capture standard output
	old := os.Stdout // keep backup of the real stdout
	reader, writer, _ := os.Pipe()
	os.Stdout = writer
	screen := struct {
		io.Reader
		io.Writer
	}{os.Stdin, os.Stdout}
if err = exec.Stream(remotecommand.StreamOptions{
Stdin: screen,
Stdout: screen,
Stderr: screen,
Tty: false,
}); err != nil {
log.Error(err, "controllers.execCommandsInPodContainer", "进入pod执行命令", "失败")
}
outC := make(chan string)
// copy the output in a separate goroutine so printing can't block indefinitely
go func() {
var buf bytes.Buffer
_, _ = io.Copy(&buf, reader)
outC <- buf.String()
}()
// back to normal state
_ = writer.Close()
os.Stdout = old // restoring the real stdout
outText := <-outC
results := fmt.Sprintf("命令: %s\n执行结果:\n%s", commands, outText)
log.Info("controllers.execCommandsInPodContainer", "进入pod执行命令", "成功")
fmt.Println(results)
return outText
}
// InitRedisCluster checks whether the redis cluster is initialized and, if not, initializes it
func InitRedisCluster(cluster *v1alpha1.RedisCluster, log logr.Logger) (initEnd bool) {
	// fetch the pod list
pods := getPodList(cluster, log)
if pods == nil {
log.Info("controllers.InitRedisCluster", "获取所有pod副本", "失败")
return false
}
log.Info("controllers.InitRedisCluster", "获取所有pod副本", "成功")
	// check that every pod is Running
	for _, pod := range pods.Items {
		if string(pod.Status.Phase) != "Running" {
			log.Info("controllers.InitRedisCluster", "all pods Running", "false")
			return false
		}
	}
	log.Info("controllers.InitRedisCluster", "all pods Running", "true")
	// check that the last pod responds
	// build the last pod's name: with a StatefulSet named redis-cluster and size 6, the last pod is redis-cluster-5
LastPodName := cluster.Name + "-" + fmt.Sprint(*cluster.Spec.Size-1)
if result1 := execCommandsInPodContainer(cluster, []string{"date"}, LastPodName, log); result1 == "" {
log.Info("controllers.InitRedisCluster", "最后一个pod是否接受访问", "false")
return false
}
	// collect all pod IPs
IPs := ""
for _, pod := range pods.Items {
IPs = IPs + pod.Status.PodIP + ":6379 "
}
FirstPodName := cluster.Name + "-0"
	// check whether the cluster has already been initialized
	if result := execCommandsInPodContainer(cluster, []string{"redis-cli", "cluster", "info"}, FirstPodName, log); result != "" {
		if strings.Contains(result, "cluster_state:fail") && strings.Contains(result, "cluster_size:0") {
			// start the initialization
			result := execCommandsInPodContainer(cluster, []string{"bash", "/conf/initCluster.sh", IPs}, FirstPodName, log)
			if result == "" {
				log.Info("controllers.InitRedisCluster", "redis cluster initialization", "failed")
				return false
			}
			log.Info("controllers.InitRedisCluster", "redis cluster initialization", "succeeded")
			initEnd = true
		} else {
			log.Info("controllers.InitRedisCluster", "redis cluster already initialized", "true")
			initEnd = true
		}
	}
return initEnd
}
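```

Note that InitK8sClusterClient hard-codes /root/.kube/config, which only works when the controller runs outside the cluster, as make run does later in this article. If the operator were deployed as a Pod, a common pattern, sketched here under that assumption, is to try the in-cluster service-account config first and fall back to a local kubeconfig:
```go
package controllers

import (
	"path/filepath"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

// loadConfig prefers the in-cluster service-account config (available when
// the operator runs as a Pod) and falls back to the local kubeconfig
// (useful for `make run` on a dev machine).
func loadConfig() (*rest.Config, error) {
	if cfg, err := rest.InClusterConfig(); err == nil {
		return cfg, nil
	}
	return clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
}
```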
2.2.3 Edit controllers/add_node.go
Differentiated logic 3: add new redis nodes to the cluster. The flow is: fetch the redis cluster and the pod list in the current namespace -> check which pods carrying the cluster label are not yet members of the redis cluster -> those pods are newPods; once there are at least two newPods, start adding them -> join the new master node -> join the new slave node -> rebalance the slots.
```go
package controllers
import (
	"context"
	"strings"
	"time"

	"gitee.com/fym89/redis-operator/api/v1alpha1"
	"github.com/go-logr/logr"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)
/*
1. Run redis-cli CLUSTER NODES to collect the IPs of the current redis cluster nodes: RedisNodeIps;
2. List the IPs of all pods labeled app=redis-cluster in the namespace: AllPodIps;
3. If any IP in AllPodIps is not in RedisNodeIps, the cluster was scaled out and those pods should join the redis cluster;
4. Gather the information needed to add nodes: Pod0IP (the IP of FirstPodName), newPodIPs (an even number of them), newMasterID;
5. Add the new nodes.
*/
func AddRedisNode(cluster *v1alpha1.RedisCluster, log logr.Logger) (string, error) {
FirstPodName1 := cluster.Name + "-0"
	// 1. run redis-cli CLUSTER NODES to collect the IPs of the current redis cluster nodes
	RedisNodeInfo := execCommandsInPodContainer(cluster, []string{"redis-cli", "CLUSTER", "NODES"}, FirstPodName1, log)
	if RedisNodeInfo == "" {
		return "failed", nil
	}
	log.Info("controllers.AddRedisNode", "fetch cluster info", "done")
	// remember the first redis node's ip:port for the join commands below (two successive splits)
	firstPodIP1 := strings.Split(strings.Split(RedisNodeInfo, "@16379")[0], " ")[1]
	// 2. list the IPs of all pods labeled app=redis-cluster in the namespace
	config, err := InitK8sClusterClient()
	if err != nil {
		return "failed", err
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return "failed", err
	}
	pods, err := clientset.CoreV1().Pods(cluster.Namespace).List(context.Background(), metaV1.ListOptions{})
	if err != nil {
		return "failed", err
	}
	// 3. if any pod IP is not yet in RedisNodeIps, the cluster was scaled out; prepare those pods to join
	var newPodIps, newPodNames []string // newPodNames is kept so we can look up newMasterNodeID
for _, pod := range pods.Items {
if pod.Labels[RedisClusterCommonLabelKey] == cluster.Name && pod.Status.Phase == "Running" {
if !strings.Contains(RedisNodeInfo, pod.Status.PodIP) {
newPodIps = append(newPodIps, pod.Status.PodIP+":6379")
newPodNames = append(newPodNames, pod.Name)
}
}
}
	// only join the redis cluster once at least two new pods are Running
if len(newPodIps) > 1 {
		// get the new master's redis node ID
		redisNodeInfo := execCommandsInPodContainer(cluster, []string{"redis-cli", "CLUSTER", "NODES"}, newPodNames[0], log)
		if redisNodeInfo == "" {
			return "failed", nil
		}
		newMasterNodeID := strings.Split(redisNodeInfo, " ")[0]
		log.Info("controllers.AddRedisNode", "start adding new redis nodes", newPodIps)
		// add the new redis master node
if result := execCommandsInPodContainer(cluster, []string{
"redis-cli",
"--cluster",
"add-node",
newPodIps[0],
firstPodIP1,
}, FirstPodName1, log); result == "" {
return "失败", nil
}
log.Info("controllers.AddRedisNode", "add new redis master node", "成功")
time.Sleep(time.Second * 120)
		// add the new redis slave node
if result := execCommandsInPodContainer(cluster, []string{
"redis-cli",
"--cluster",
"add-node",
newPodIps[1],
firstPodIP1,
"--cluster-slave",
"--cluster-master-id",
newMasterNodeID,
}, FirstPodName1, log); result == "" || result == "command terminated with exit code 1" {
log.Info("controllers.AddRedisNode", "添加slave-node失败", result)
return "失败", nil
}
log.Info("controllers.AddRedisNode", "添加slave-node", "成功")
		// rebalance the slots
if result := execCommandsInPodContainer(cluster, []string{
"redis-cli",
"--cluster",
"rebalance",
"--cluster-threshold",
"1",
"--cluster-use-empty-masters",
firstPodIP1,
}, FirstPodName1, log); result == "" || result == "command terminated with exit code 1" {
log.Info("controllers.AddRedisNode", "均衡slot失败", result)
return "失败", nil
}
log.Info("controllers.AddRedisNode", "均衡slot", "成功")
}
return "成功", nil
}
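```

AddRedisNode decides membership with strings.Contains over the raw CLUSTER NODES output, which can match accidentally: 10.244.2.4 is a substring of 10.244.2.43, for example. A stricter parse, sketched below as a hypothetical helper, extracts the exact host of every known node first:
```go
package controllers

import "strings"

// clusterNodeIPs parses `redis-cli CLUSTER NODES` output and returns the set
// of host IPs already in the cluster, e.g. {"10.244.2.40": true, ...}.
// Each line looks like: <id> 10.244.2.40:6379@16379 master - 0 ... connected
func clusterNodeIPs(nodesOutput string) map[string]bool {
	ips := make(map[string]bool)
	for _, line := range strings.Split(strings.TrimSpace(nodesOutput), "\n") {
		fields := strings.Fields(line)
		if len(fields) < 2 {
			continue
		}
		hostPort := strings.SplitN(fields[1], "@", 2)[0] // 10.244.2.40:6379
		ips[strings.SplitN(hostPort, ":", 2)[0]] = true
	}
	return ips
}
```
Membership then becomes an exact lookup, `!clusterNodeIPs(RedisNodeInfo)[pod.Status.PodIP]`, instead of `!strings.Contains(RedisNodeInfo, pod.Status.PodIP)`.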
3. Testing
Test items:
- the code compiles and runs;
- with the controller running, creating a RedisCluster resource automatically creates the corresponding Service, StatefulSet, and ConfigMap;
- after manually deleting the Service, StatefulSet, or ConfigMap, the controller watches the change and reconciles (CreateOrUpdate) back to the desired state;
- once all pods are up, exec into a pod and run redis-cli cluster info to check that cluster_state and cluster_size are normal;
- scale the redis cluster out by two pods online, then repeat test item 4 once the new pods are up;
(the items below are not yet implemented)
- enforce that the replica count size must be even: does creating a RedisCluster with an odd size fail with a clear error?
- scale the redis cluster in: kubectl scale --replicas=6 sts xxx removes two pods online; afterwards repeat test item 4 and check whether any data was lost;
- redis cluster backup;
- redis cluster restore.
3.1 Upload the code
In GoLand, select the project directory, then Tools -> Deployment -> Upload to...
3.2 Build and install
In the project root on the remote machine, build, install, and run the code.
[root@master1 redis-operator]# ls
api bin config controllers Dockerfile go.mod go.sum hack main.go Makefile PROJECT
[root@master1 redis-operator]#
[root@master1 redis-operator]# go mod tidy && make && make install
/home/gitee.com/fym89/redis-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go
/home/gitee.com/fym89/redis-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
/home/gitee.com/fym89/redis-operator/bin/kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/redisclusters.redis.fuyu.io created
[root@master1 redis-operator]#
[root@master1 redis-operator]#
[root@master1 redis-operator]# make run
/home/gitee.com/fym89/redis-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
/home/gitee.com/fym89/redis-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go run ./main.go
2022-03-05T17:35:18.721+0800 INFO controller-runtime.metrics metrics server is starting to listen {"addr": ":8080"}
3.3 Creating a RedisCluster resource for testing
vim redisOperator-crd.yaml
apiVersion: redis.fuyu.io/v1alpha1
kind: RedisCluster
metadata:
name: redis-cluster
namespace: redis
spec:
  size: 6 # number of replicas; must be even
  image: redis:6.2.4 # image
kubectl apply -f redisOperator-crd.yaml
Check the created resources:
[root@master1 home]# kubectl get cm,svc,sts,pods,pvc -n redis
NAME DATA AGE
configmap/redis-cluster 2 29m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/redis-cluster ClusterIP None <none> 6379/TCP,16379/TCP 29m
NAME READY AGE
statefulset.apps/redis-cluster 6/6 29m
NAME READY STATUS RESTARTS AGE
pod/redis-cluster-0 1/1 Running 0 29m
pod/redis-cluster-1 1/1 Running 0 29m
pod/redis-cluster-2 1/1 Running 0 29m
pod/redis-cluster-3 1/1 Running 0 29m
pod/redis-cluster-4 1/1 Running 0 29m
pod/redis-cluster-5 1/1 Running 0 29m
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
persistentvolumeclaim/data-redis-cluster-0 Bound pvc-a48d5b07-cd10-45fc-a2a5-26cc379b03b1 1Gi RWO nfs 29m
persistentvolumeclaim/data-redis-cluster-1 Bound pvc-a09aac98-5df0-4276-ac21-377a2c4420c7 1Gi RWO nfs 29m
persistentvolumeclaim/data-redis-cluster-2 Bound pvc-876b0459-c478-4d63-88f2-9704665144ba 1Gi RWO nfs 29m
persistentvolumeclaim/data-redis-cluster-3 Bound pvc-1b9eaa01-bc4e-4475-8e2b-c060c79b3381 1Gi RWO nfs 29m
persistentvolumeclaim/data-redis-cluster-4 Bound pvc-be4bf29c-2fcf-4e9d-8834-65050ec95a26 1Gi RWO nfs 29m
persistentvolumeclaim/data-redis-cluster-5 Bound pvc-3d175127-45f9-4a01-bb1a-1ed2bb6e9723 1Gi RWO nfs 29m
[root@master1 home]#
3.4 Testing the RedisCluster controller features
3.4.1 Feature 1: automatic redis cluster initialization
Once all pods are up, exec into a pod and run redis-cli cluster info to check that cluster_state and cluster_size are as expected.
3.4.2 Feature 2: scaling the redis cluster out and in by detecting pod changes and running add-node / del-node
1) Run the scale commands and confirm pods are added and removed as expected. Scale out first, then back in.
kubectl scale sts redis-cluster --replicas=8 -n redis
kubectl get pods -n redis
kubectl scale sts redis-cluster --replicas=6 -n redis
kubectl get pods -n redis
2) After the new pods appear, do they automatically join the redis cluster as new nodes, with slots and data rebalanced;
Edit redisOperator-crd.yaml, change size to 8, and kubectl apply it again;
(the item below is not yet implemented)
3) After scaling in, are the removed pods automatically taken out of the redis cluster, with their data migrated to the remaining nodes.
3.4.3 Feature 3: automatic watch-and-reconcile of owned resources
- Delete a pod, the Service, the StatefulSet, and the ConfigMap by hand, one at a time, and check that each is recreated automatically;
```shell
kubectl delete pod redis-cluster-4 -n redis
kubectl delete svc redis-cluster -n redis
kubectl delete sts redis-cluster -n redis
kubectl delete cm redis-cluster -n redis
```
Our controller code runs, and the owned resources are created and watched correctly, but that alone does not guarantee that the redis processes inside the pods, or the redis cluster itself, are actually reachable and usable.
3.5 Redis cluster health tests
Check the cluster state:
```shell
kubectl get pods -n redis -o wide | grep redis-cluster
podnumber=$(kubectl get pods -n redis -o wide | grep redis-cluster | wc -l)
for x in $(seq 0 `expr $podnumber - 1`); do echo "redis-cluster-$x"; kubectl exec redis-cluster-$x -n redis -- redis-cli role; echo; done
kubectl exec -it redis-cluster-0 -n redis -- redis-cli cluster info
kubectl exec -it redis-cluster-0 -n redis -- redis-cli CLUSTER NODES
```
Local test
```shell
kubectl exec -it redis-cluster-0 -n redis -- redis-cli -h redis-cluster-0 -c -p 6379 set meng hello   # set a key-value pair
kubectl exec -it redis-cluster-0 -n redis -- redis-cli -h redis-cluster-0 -c -p 6379 get meng         # read the value back
kubectl exec -it redis-cluster-4 -n redis -- redis-cli -h redis-cluster-4 -c -p 6379 get meng
```
Client call test
```shell
[root@node1 ~]# docker run -itd --name redis-test pypy:latest
[root@node1 ~]# docker exec -it redis-test bash
pip install redis-py-cluster
python
```
```python
from rediscluster import RedisCluster
conn = RedisCluster(host="10.0.24.78", port=30755)
conn.get("meng")
```
(Some of the wording here is rough; your suggestions and insights are very welcome.)
