为什么需要异步通信?

在我们预设定的接口中,我们需要完成一个重置密码的功能。基本流程为,用户提交需要重置密码的邮箱,系统接收到后向邮箱发送一则消息,用户点击邮箱中带有加密信息的邮件再次向系统发起请求,系统通过验证后重置用户的密码。在这一个流程当中,发送邮件是一个耗时操作,如果采用同步的方式,一方面这会导致大量的请求浪费(因为要监听状态需要发起轮询请求),另一方面会导致接口数量不断增长变得臃肿,另外,对一些耗时操作同步请求会影响用户体验。基于上面的种种原因,我们有必要为系统接入基于事件异步通信,这样不仅为系统带来解耦,同时可以基于消息队列进行多个订阅处理,从而提高系统的运行效率。在go-micro中,我们可以通过broker组件来实现上述的异步通信。这里我们选择go-micro插件支持rabbitmq作为broker的驱动。

docker-compose安装rabbitmq

.env中添加配置信息

  1. ...
  2. #设置rabbitmq镜像版本
  3. RABBITMQ_VERSION=3.8.3-management
  4. #rabbitmq默认用户名称
  5. RABBITMQ_USER=root
  6. #rabbitmq默认密码
  7. RABBTIMQ_PASSWORD=root
  8. ...

修改docker-compose.yaml

  1. micro-rabbitmq:
  2. image: rabbitmq:${RABBITMQ_VERSION}
  3. restart: always
  4. ports:
  5. - 15672:15672
  6. - 5672:5672
  7. environment:
  8. - RABBITMQ_DEFAULT_USER=${RABBITMQ_USER}
  9. - RABBITMQ_DEFAULT_PASS=${RABBTIMQ_PASSWORD}
  10. networks:
  11. - micro-network

检查rabbitmq是否正常运行

检查容器是否正常运行

image.png

访问rabbitmq可视化管理界面

打开http://127.0.0.1:15672输入配置的用户名密码
image.png

编写重置密码服务

创建重置密码记录模型

  1. touch pkg/model/password.go
  1. package model
  2. import (
  3. db "github.com/869413421/micro-service/common/pkg/db"
  4. )
  5. // PasswordReset 重置密码模型
  6. type PasswordReset struct {
  7. db.BaseModel
  8. Token string `gorm:"column:token;type:varchar(255) not null;index" `
  9. Email string `gorm:"column:email;type:varchar(255) not null;index" valid:"email"`
  10. Verify int8 `gorm:"column:verify;type:tinyint(1);not null;default:0"`
  11. }
  12. // Store 创建重置记录
  13. func (model *PasswordReset) Store() (err error) {
  14. result := db.GetDB().Create(&model)
  15. err = result.Error
  16. if err != nil {
  17. return err
  18. }
  19. return nil
  20. }
  21. // Delete 删除数据库重置记录
  22. func (model *PasswordReset) Delete() (rowsAffected int64, err error) {
  23. result := db.GetDB().Delete(&model)
  24. err = result.Error
  25. if err != nil {
  26. return 0, err
  27. }
  28. rowsAffected = result.RowsAffected
  29. return rowsAffected, nil
  30. }
  31. // Update 更新数据库重置记录
  32. func (model *PasswordReset) Update() (rowsAffected int64, err error) {
  33. result := db.GetDB().Save(&model)
  34. err = result.Error
  35. if err != nil {
  36. return 0, err
  37. }
  38. rowsAffected = result.RowsAffected
  39. return rowsAffected, nil
  40. }

创建重置密码模型数据库交互层

  1. touch pkg/repo/password.go
  1. package repo
  2. import (
  3. db "github.com/869413421/micro-service/common/pkg/db"
  4. "github.com/869413421/micro-service/user/pkg/model"
  5. "gorm.io/gorm"
  6. )
  7. //PasswordRestRepositoryInterface 重置记录操作接口
  8. type PasswordRestRepositoryInterface interface {
  9. GetByEmail(email string) (*model.PasswordReset, error)
  10. GetByToken(token string) (*model.PasswordReset, error)
  11. }
  12. //PasswordResetRepository 重置记录操作仓库
  13. type PasswordResetRepository struct {
  14. DB *gorm.DB
  15. }
  16. // NewPasswordResetRepository 新建仓库
  17. func NewPasswordResetRepository() PasswordRestRepositoryInterface {
  18. return &PasswordResetRepository{DB: db.GetDB()}
  19. }
  20. // GetByEmail 根据邮件获取
  21. func (repo *PasswordResetRepository) GetByEmail(email string) (*model.PasswordReset, error) {
  22. passwordReset := &model.PasswordReset{}
  23. err := repo.DB.Where("email = ?", email).First(&passwordReset).Error
  24. if err != nil {
  25. return nil, err
  26. }
  27. return passwordReset, nil
  28. }
  29. // GetByToken 根据token获取
  30. func (repo *PasswordResetRepository) GetByToken(token string) (*model.PasswordReset, error) {
  31. passwordReset := &model.PasswordReset{}
  32. err := repo.DB.Where("token = ?", token).First(&passwordReset).Error
  33. if err != nil {
  34. return nil, err
  35. }
  36. return passwordReset, nil
  37. }

定义protobuf

在proto/user/user.proto添加相应的定义

  1. ...
  2. service UserService {
  3. rpc Pagination(PaginationRequest) returns(PaginationResponse){}
  4. rpc Get(GetRequest) returns(UserResponse){}
  5. rpc Create(CreateRequest) returns(UserResponse){}
  6. rpc Update(UpdateRequest) returns(UserResponse){}
  7. rpc Delete(DeleteRequest) returns(UserResponse){}
  8. rpc Auth(AuthRequest) returns(TokenResponse){}
  9. rpc ValidateToken(TokenRequest) returns(TokenResponse){}
  10. rpc CreatePasswordReset(CreatePasswordResetRequest) returns(PasswordReset){}
  11. rpc ResetPassword(ResetPasswordRequest) returns(ResetPasswordResponse){}
  12. }
  13. ...
  14. ...
  15. // PasswordReset 重置密码记录
  16. message PasswordReset{
  17. uint64 id = 1;
  18. string email = 2;
  19. string token = 3;
  20. string create_at = 4;
  21. }
  22. // CreatePasswordResetRequest 创建重置密码请求
  23. message CreatePasswordResetRequest{
  24. string email = 1;
  25. }
  26. // ResetPasswordRequest 重置密码请求
  27. message ResetPasswordRequest{
  28. string token = 1 ;
  29. }
  30. // ResetPasswordResponse 重置密码相应
  31. message ResetPasswordResponse{
  32. bool resetSuccess = 1;
  33. string newPassword = 2;
  34. }
  35. ...

生成代码

  1. make proto

编写密码重置业务代码

编写一个生成随机字符串方法,用于生成用户新密码

打开common项目

  1. mkdir pkg/string
  2. touch pkg/string/string.go
  1. package string
  2. import (
  3. "math/rand"
  4. "time"
  5. )
  6. // RandString 生成随机字符串
  7. func RandString(len int) string {
  8. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  9. bytes := make([]byte, len)
  10. for i := 0; i < len; i++ {
  11. b := r.Intn(26) + 65
  12. bytes[i] = byte(b)
  13. }
  14. return string(bytes)
  15. }

使用事务进行密码重置

  1. touch service/password.go
  1. package service
  2. import (
  3. "errors"
  4. "github.com/869413421/micro-service/common/pkg/db"
  5. string2 "github.com/869413421/micro-service/common/pkg/string"
  6. "github.com/869413421/micro-service/user/pkg/repo"
  7. "gorm.io/gorm"
  8. "time"
  9. )
  10. // PasswordResetServiceInterface 重置密码业务接口
  11. type PasswordResetServiceInterface interface {
  12. Reset(token string) (string, error)
  13. }
  14. // PasswordResetService 重置密码业务
  15. type PasswordResetService struct {
  16. UserRepo repo.UserRepositoryInterface
  17. PasswordResetRepo repo.PasswordRestRepositoryInterface
  18. }
  19. // NewPasswordResetService 创建业务层
  20. func NewPasswordResetService() PasswordResetServiceInterface {
  21. return &PasswordResetService{
  22. UserRepo: repo.NewUserRepository(),
  23. PasswordResetRepo: repo.NewPasswordResetRepository(),
  24. }
  25. }
  26. // Reset 重置密码,返回新的密码
  27. func (srv *PasswordResetService) Reset(token string) (string, error) {
  28. //1.根据token获取密码重置记录
  29. passwordReset, err := srv.PasswordResetRepo.GetByToken(token)
  30. if err != nil {
  31. return "", err
  32. }
  33. //2.比较时间,查看邮件是否已经超时或已验证
  34. if passwordReset.Verify == 1 {
  35. return "", errors.New("the record has been verified")
  36. }
  37. d, _ := time.ParseDuration("+5m")
  38. expTime := passwordReset.CreatedAt.Add(d)
  39. if time.Now().After(expTime) {
  40. return "", errors.New("verify that the message has expired")
  41. }
  42. //3.获取用户更新密码(使用事務)
  43. newPassword := string2.RandString(8)
  44. db := db.GetDB()
  45. err = db.Transaction(func(tx *gorm.DB) error {
  46. user, err := srv.UserRepo.GetByEmail(passwordReset.Email)
  47. if err != nil {
  48. return err
  49. }
  50. user.Password = newPassword
  51. result := tx.Debug().Save(&user)
  52. err = result.Error
  53. if err != nil {
  54. return err
  55. }
  56. rowsAffected := result.RowsAffected
  57. if rowsAffected == 0 {
  58. return errors.New("update password fail")
  59. }
  60. //4.更新重置记录
  61. passwordReset.Verify = 1
  62. result = tx.Debug().Save(&passwordReset)
  63. err = result.Error
  64. if err != nil {
  65. return err
  66. }
  67. rowsAffected = result.RowsAffected
  68. if rowsAffected == 0 {
  69. return errors.New("update password fail")
  70. }
  71. return nil
  72. })
  73. if err != nil {
  74. return "", err
  75. }
  76. return newPassword, nil
  77. }

编写服务处理器代码

修改初始化依赖

  1. ...
  2. //UserServiceHandler 用户服务处理器
  3. type UserServiceHandler struct {
  4. UserRepo repo.UserRepositoryInterface
  5. TokenService service.Authble
  6. PasswordService service.PasswordResetServiceInterface
  7. }
  8. // NewUserServiceHandler 用户服务初始化
  9. func NewUserServiceHandler() *UserServiceHandler {
  10. return &UserServiceHandler{
  11. UserRepo: repo.NewUserRepository(),
  12. TokenService: service.NewTokenService(),
  13. PasswordService: service.NewPasswordResetService(),
  14. }
  15. }
  16. ...

增加创建重置记录,重置方法

  1. ...
  2. // CreatePasswordReset 创建密码重置记录
  3. func (srv *UserServiceHandler) CreatePasswordReset(ctx context.Context, req *pb.CreatePasswordResetRequest, rsp *pb.PasswordReset) error {
  4. //1.获取提交邮箱,检查用户是否存在
  5. _, err := srv.UserRepo.GetByEmail(req.Email)
  6. if err != nil {
  7. return errors.NotFound("User.CreatePasswordReset.GetByEmail.Error", "user not found ,check you email")
  8. }
  9. passwordReset := &model.PasswordReset{}
  10. types.Fill(passwordReset, req)
  11. //2.生成md5保存到数据库
  12. passwordReset.Token = password.Md5Str(req.Email + time.Now().String())
  13. err = passwordReset.Store()
  14. if err != nil {
  15. return err
  16. }
  17. //3.返回响应信息
  18. types.Fill(rsp, passwordReset)
  19. return nil
  20. }
  21. // ResetPassword 重置密码
  22. func (srv *UserServiceHandler) ResetPassword(ctx context.Context, req *pb.ResetPasswordRequest, rsp *pb.ResetPasswordResponse) error {
  23. //1.执行重置逻辑
  24. newPassword, err := srv.PasswordService.Reset(req.Token)
  25. if err != nil {
  26. return err
  27. }
  28. //2.返回新密码
  29. rsp.ResetSuccess = true
  30. rsp.NewPassword = newPassword
  31. return nil
  32. }
  33. ...

接入borker,编写订阅发布业务代码

上面的代码已经完成了创建密码重置记录以及密码重置等功能,但中间基于异步通信的发布消息,订阅消费(发送邮件)代码还没有实现。

使用rabbitmq作为go-microborker组件

修改docker-compose.yaml

通过环境变量为go-micro指定borker

  1. ...
  2. micro-user-service:
  3. depends_on: # 启动依赖,需要等etcd集群启动后才启动当前容器
  4. - etcd1
  5. - etcd2
  6. - etcd3
  7. - micro-user-db
  8. build: ./user # dockerfile所在目录
  9. environment:
  10. TZ: ${TZ}
  11. MICRO_SERVER_ADDRESS: ":9091" # 服务端口
  12. MICRO_REGISTRY: "etcd" # 注册中心类型
  13. MICRO_REGISTRY_ADDRESS: "etcd1:2379,etcd2:2379,etcd3:2379" # 注册中心集群地址
  14. DB_HOST: "micro-user-db:3306"
  15. DB_DATABASE: ${USER_DB_DATABASE}
  16. DB_USER: ${USER_DB_USER}
  17. DB_PASSWORD: ${USER_DB_PASSWORD}
  18. DB_MAX_CONNECTIONS: ${USER_DB_MAX_CONNECTIONS}
  19. DB_MAX_IDE_CONNECTIONS: ${USER_DB_MAX_IDE_CONNECTIONS}
  20. DB_CONNECTIONS_MAX_LIFE_TIME: ${USER_DB_CONNECTIONS_MAX_LIFE_TIME}
  21. MICRO_BROKER: "rabbitmq"
  22. MICRO_BROKER_ADDRESS: "amqp://${RABBITMQ_USER}:${RABBTIMQ_PASSWORD}@micro-rabbitmq:5672"
  23. restart: always
  24. ports:
  25. - 9092:9091
  26. volumes:
  27. - ./user:/app
  28. networks:
  29. - micro-network
  30. ...

获取rabbitmq插件包

  1. go get -u github.com/micro/go-plugins/broker/rabbitmq/v2

修改plugin.go

  1. package main
  2. import (
  3. // rabbitmq broker
  4. _ "github.com/micro/go-plugins/broker/rabbitmq/v2"
  5. )

修改makefile

  1. ...
  2. .PHONY: build
  3. build: proto
  4. CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-w' -i -o micro-user-service ./main.go ./plugin.go
  5. ...

通过上述步骤,基于插件机制使rabbitmq成为go-microborker组件的默认驱动。

封装container,Json包

考虑到整个程序的生命周期中,我们有许多对象需要全局使用,我们定义一个容器包将对象存储到当中,在需要时再从容器中取出使用。
打开common项目

  1. mkdir pkg/container
  2. touch pkg/container/service.go
  1. package container
  2. import (
  3. "github.com/micro/go-micro/v2"
  4. "github.com/micro/go-micro/v2/broker"
  5. )
  6. var service micro.Service
  7. // SetService 设置服务实例
  8. func SetService(srv micro.Service) {
  9. service = srv
  10. }
  11. // GetService 返回服务实例
  12. func GetService() micro.Service {
  13. return service
  14. }
  15. // GetServiceBroker 返回服务Broker实例
  16. func GetServiceBroker() broker.Broker {
  17. return service.Options().Broker
  18. }
  1. mkdir pkg/encoder
  2. touch pkg/encoder/encoder.go
  1. package encoder
  2. import jsoniter "github.com/json-iterator/go"
  3. var JsonHandler jsoniter.API
  4. func init() {
  5. JsonHandler = jsoniter.ConfigCompatibleWithStandardLibrary
  6. }

下载相关依赖go mod tidy

基于事件编写发布代码

  1. touch pkg/model/password_hooks.go
  1. package model
  2. import (
  3. "fmt"
  4. "github.com/869413421/micro-service/common/pkg/container"
  5. "github.com/869413421/micro-service/common/pkg/encoder"
  6. "github.com/micro/go-micro/v2/broker"
  7. "gorm.io/gorm"
  8. )
  9. var createTopic = "create.password.reset"
  10. // AfterCreate 创建成功后
  11. func (model *PasswordReset) AfterCreate(tx *gorm.DB) (err error) {
  12. if model.Email != "" {
  13. err := pushCreateEvent(model)
  14. if err != nil {
  15. return err
  16. }
  17. }
  18. return nil
  19. }
  20. // pushCreateEvent 推送创建消息
  21. func pushCreateEvent(model *PasswordReset) error {
  22. //1.获取发布连接
  23. publisher := container.GetServiceBroker()
  24. //2.构建broker消息
  25. body, err := encoder.JsonHandler.Marshal(model)
  26. if err != nil {
  27. return err
  28. }
  29. msg := &broker.Message{
  30. Header: map[string]string{
  31. "email": model.Email,
  32. },
  33. Body: body,
  34. }
  35. //3.发送消息到mq
  36. err = publisher.Publish(createTopic, msg)
  37. if err != nil {
  38. return err
  39. } else {
  40. fmt.Println("[pub] pubbed message:", string(msg.Body))
  41. }
  42. return nil
  43. }

在创建密码重置记录成功后出发了模型事件,这时候将消息推送到rabbitmq,完成消息发布流程

编写订阅代码

  1. touch subscriber/password.go
  1. package subscriber
  2. import (
  3. "fmt"
  4. "github.com/micro/go-micro/v2/broker"
  5. )
  6. // 重置密码事件
  7. const createPasswordResetTopic = "create.password.reset"
  8. // EventSubscriberInterface 事件订阅者启动器接口
  9. type EventSubscriberInterface interface {
  10. Subscriber() error
  11. }
  12. // EventSubscriber 事件订阅者启动器
  13. type EventSubscriber struct {
  14. Broker broker.Broker
  15. }
  16. // NewEventSubscriber 创建事件订阅启动器
  17. func NewEventSubscriber(brk broker.Broker) EventSubscriberInterface {
  18. return &EventSubscriber{Broker: brk}
  19. }
  20. // Subscriber 启动订阅
  21. func (subscriber EventSubscriber) Subscriber() error {
  22. //1.重置密码事件订阅
  23. _, err := subscriber.Broker.Subscribe(createPasswordResetTopic, func(event broker.Event) error {
  24. // TODO 发送邮件
  25. fmt.Println("[sub] received message:", string(event.Message().Body), "header", event.Message().Header)
  26. return nil
  27. }, broker.Queue(createPasswordResetTopic), broker.DisableAutoAck())
  28. if err != nil {
  29. return err
  30. }
  31. return nil
  32. }

修改main.go启动订阅

  1. package main
  2. import (
  3. "github.com/869413421/micro-service/common/pkg/container"
  4. "github.com/869413421/micro-service/common/pkg/db"
  5. "github.com/869413421/micro-service/user/handler"
  6. "github.com/869413421/micro-service/user/pkg/model"
  7. "github.com/869413421/micro-service/user/subscriber"
  8. "github.com/micro/go-micro/v2"
  9. log "github.com/micro/go-micro/v2/logger"
  10. proto "github.com/869413421/micro-service/user/proto/user"
  11. )
  12. func main() {
  13. // 1.准备数据库连接,并且执行数据库迁移
  14. db := db.GetDB()
  15. db.AutoMigrate(&model.User{})
  16. db.AutoMigrate(&model.PasswordReset{})
  17. // 2.创建服务
  18. service := micro.NewService(
  19. micro.Name("micro.service.user"),
  20. micro.Version("v1"),
  21. )
  22. // 3.初始化服务,全局化service对象
  23. service.Init()
  24. container.SetService(service)
  25. // 4.初始化borker
  26. brk := service.Options().Broker
  27. defer brk.Disconnect()
  28. err := brk.Init()
  29. if err != nil {
  30. log.Fatal(err)
  31. return
  32. }
  33. err = brk.Connect()
  34. if err != nil {
  35. log.Fatal("connection broker error:", err)
  36. return
  37. }
  38. // 5.订阅事件
  39. eventSubscriber := subscriber.NewEventSubscriber(brk)
  40. err = eventSubscriber.Subscriber()
  41. if err != nil {
  42. log.Fatal("subscriber broker error:", err)
  43. return
  44. }
  45. // 6.注册服务处理器
  46. proto.RegisterUserServiceHandler(service.Server(), handler.NewUserServiceHandler())
  47. // 7.运行服务
  48. if err := service.Run(); err != nil {
  49. log.Fatal(err)
  50. }
  51. }

至此,发布订阅代码完成

测试重置密码相关接口

编译代码,然后重启容器

  1. make build
  2. docker-compose up -d micro-user-service

调用了创建重置密码的记录后我们可以看到发布订阅代码中打印的相关信息都显示了
image.png
拿到日志中的token调用重置接口
image.png
接口返回新生成的密码。