对接流程描述

云主机,云硬盘,RDS,Redis,MongoDB的事件数据统一推送至rabbitMQ中类型为fanoutexchange中, 计量计费模块将queue绑定至该exchange,然后进行消费。

目前只接收 资源创建,资源升降配,资源销毁三种事件, 进行后付费的计算。

事件推送规范

vhost name

dcloud

exchange name

DCLOUD_EVENT_EXCHANGE

exchange type

fanout

推送格式示例
  1. {
  2. "method": "rds_create", //事件类型,以_分割,前面是服务类型,比如rds/mongo/redis/s3/h3pc/h3storage/hwpc/hwstorage,后面是发生的动作,例如create/upgrade/downgrade/delete,create代表资源创建,upgrade代表升配,downgrade代表降配,delete代表资源销毁
  3. "payload":{
  4. "occurTime": 112313223, //事件发生的时间,时间戳,精确到毫秒
  5. "name": "资源名称",
  6. "ident": "资源英文标识",
  7. "uuid": "UUID", //每个创建出来的资源实例的唯一id,一定要有,以后这个实例的所有事件都要使用这个resourceId
  8. "extend": "{
  9. \"name\": \"rds.m3.c2.d400\", //资源的规格名称,由业务线自定义
  10. \"cpu\": 1, //cpu个数 ,若没有则写0
  11. \"mem\": \"4068Gi\", //内存大小,单位必须是Gi,若没有则传0Gi,单位仅支持KMGTPKi,Mi,Gi,Ti,Pi这几种,其他一律视为无效数据
  12. \"disk\": \"10086002Gi\", //存储容量,单位必须是Gi 若没有则传0Gi,单位仅支持KMGTPKi,Mi,Gi,Ti,Pi这几种,其他一律视为无效数据
  13. }", //当前的规格描述,是个字符串,"
  14. "eventId": "UUID", //为这个事件创建一个唯一的id,一定要有,与实例无关,每个事件的eventId都应该不一样,一样的就会被认为是重复数据
  15. "labels":"{}", //资源自定义的标签,是个字符串
  16. "product": "rds/mongo/redis/s3/h3pc/h3storage/hwpc/hwstorage", //标明自己是哪个产品,枚举了字符串类别,与method字段前半部分一致
  17. "userId": 3, //进行操作的用户id
  18. "tenantId": 10, //资源所属的租户id
  19. "projectId": 4, //资源所属的项目id
  20. "nid": 12 //资源所挂载的服务树节点id
  21. }
  22. }

推送MQ的demo,仅供参考

  1. package rabbitMQ
  2. import (
  3. "encoding/json"
  4. "github.com/streadway/amqp"
  5. "github.com/toolkits/pkg/logger"
  6. "golang.org/x/net/context"
  7. "time"
  8. )
  9. const (
  10. defaultExchange = "DCLOUD_EVENT_EXCHANGE"
  11. )
  12. type Message []byte
  13. type session struct {
  14. *amqp.Connection
  15. *amqp.Channel
  16. }
  17. func (s session) Close() error {
  18. if s.Connection == nil {
  19. return nil
  20. }
  21. return s.Connection.Close()
  22. }
  23. var (
  24. RabbitMQ chan chan session
  25. MQChannel = make(chan Message)
  26. )
  27. func Init() {
  28. conf := config.GetEnv()
  29. ctx, _ := context.WithCancel(context.Background())
  30. exchange := defaultExchange
  31. if conf.RABBIT_EXCHANGE != "" {
  32. exchange = conf.RABBIT_EXCHANGE
  33. }
  34. RabbitMQ = redial(ctx,
  35. conf.RABBIT_HOST,
  36. conf.RABBIT_PORT,
  37. conf.RABBIT_USERNAME,
  38. conf.RABBIT_PASSWORD,
  39. conf.RABBIT_VHOST,
  40. exchange,
  41. false,
  42. )
  43. routingKey := "ignored for fanout exchanges, application dependent for other exchanges"
  44. go func(){
  45. Publish(RabbitMQ, exchange, MQChannel, routingKey)
  46. }()
  47. }
  48. func redial(ctx context.Context, host string, port string,
  49. name string, pwd string, vh string, exchange string, isQueue bool) (chan chan session){
  50. sessions := make(chan chan session)
  51. go func() {
  52. sess := make(chan session)
  53. defer close(sessions)
  54. for {
  55. select {
  56. case sessions <- sess:
  57. case <-ctx.Done():
  58. logger.Infoln("shutting down session factory")
  59. return
  60. }
  61. conn, err := amqp.Dial("amqp://"+name+":"+pwd+"@"+host+":"+port+"/"+vh)
  62. if err != nil {
  63. logger.Errorf("cannot (re)dial: %v \n", err)
  64. continue //TODO opt logic
  65. }
  66. ch, err := conn.Channel()
  67. if err != nil {
  68. logger.Errorf("cannot create channel: %v \n", err)
  69. continue
  70. }
  71. if isQueue {
  72. if _, err := ch.QueueDeclare(exchange, true, false, false, false, nil); err != nil {
  73. logger.Errorf("cannot declare single queue: %v \n", err)
  74. }
  75. } else {
  76. if err := ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {
  77. logger.Errorf("cannot declare fanout exchange: %v \n", err)
  78. }
  79. }
  80. select {
  81. case sess <- session{conn, ch}:
  82. case <-ctx.Done():
  83. logger.Infoln("shutting down new session")
  84. }
  85. }
  86. }()
  87. return sessions
  88. }
  89. func Publish(sessions chan chan session, exchange string, messages <-chan Message, key string) {
  90. for c_session := range sessions {
  91. var (
  92. running bool
  93. reading = messages
  94. pending = make(chan Message, 1)
  95. confirm = make(chan amqp.Confirmation, 1)
  96. pub session
  97. )
  98. select {
  99. case pub = <-c_session:
  100. case <-time.After(time.Second * time.Duration(20)):
  101. // timeout means this c_session is unavaliable
  102. continue
  103. }
  104. if err := pub.Confirm(false); err != nil {
  105. logger.Infoln("publisher confirms not supported")
  106. close(confirm)
  107. } else {
  108. pub.NotifyPublish(confirm)
  109. }
  110. logger.Infoln("publishing...")
  111. Publish:
  112. for {
  113. var body Message
  114. select {
  115. case confirmed, ok := <-confirm:
  116. if !ok {
  117. logger.Errorln("rabbitmq error")
  118. break Publish
  119. }
  120. if !confirmed.Ack{
  121. logger.Infof("nack message %d, body: %q", confirmed.DeliveryTag, string(body))
  122. }
  123. reading = messages
  124. case body = <-pending:
  125. //routingKey := "ignored for fanout exchanges, application dependent for other exchanges"
  126. err := pub.Publish(exchange, key, false, false, amqp.Publishing{
  127. Body:body,
  128. })
  129. if err != nil {
  130. pending <- body
  131. pub.Close()
  132. logger.Errorln("rabbitmq error")
  133. break Publish
  134. }
  135. case body, running = <-reading:
  136. if !running {
  137. return
  138. }
  139. pending <- body
  140. reading = nil
  141. }
  142. }
  143. }
  144. }
  145. var (
  146. rmq = rabbitMQ.RabbitMQ
  147. mq = rabbitMQ.MQChannel
  148. )
  149. func FanoutArchitecture(m []byte) {
  150. logger.Infof("push message: %s", m)
  151. go func() {
  152. PutChan(m)
  153. //rabbitMQ.Publish(rmq, ChanByte(m))
  154. }()
  155. }
  156. func PutChan(b []byte) {
  157. mq <- b
  158. }
  159. func SendMsg(action string, msg interface{}) {
  160. js := EncodeStruct(action, msg)
  161. FanoutArchitecture(js)
  162. }
  163. func main() {
  164. go SendMsg("test", nil)
  165. }

rabbitMQ测试地址

amqp://admin:admin_202009@10.178.24.110:8002/