对接流程描述
云主机,云硬盘,RDS,Redis,MongoDB的事件数据统一推送至rabbitMQ中类型为fanout的exchange中, 计量计费模块将queue绑定至该exchange,然后进行消费。
目前只接收 资源创建,资源升降配,资源销毁三种事件, 进行后付费的计算。
事件推送规范
vhost name
exchange name
exchange type
fanout
推送格式示例
{"method": "rds_create", //事件类型,以_分割,前面是服务类型,比如rds/mongo/redis/s3/h3pc/h3storage/hwpc/hwstorage,后面是发生的动作,例如create/upgrade/downgrade/delete,create代表资源创建,upgrade代表升配,downgrade代表降配,delete代表资源销毁"payload":{"occurTime": 112313223, //事件发生的时间,时间戳,精确到毫秒"name": "资源名称","ident": "资源英文标识","uuid": "UUID", //每个创建出来的资源实例的唯一id,一定要有,以后这个实例的所有事件都要使用这个resourceId"extend": "{\"name\": \"rds.m3.c2.d400\", //资源的规格名称,由业务线自定义\"cpu\": 1, //cpu个数 ,若没有则写0\"mem\": \"4068Gi\", //内存大小,单位必须是Gi,若没有则传0Gi,单位仅支持K,M,G,T,P,Ki,Mi,Gi,Ti,Pi这几种,其他一律视为无效数据\"disk\": \"10086002Gi\", //存储容量,单位必须是Gi, 若没有则传0Gi,单位仅支持K,M,G,T,P,Ki,Mi,Gi,Ti,Pi这几种,其他一律视为无效数据}", //当前的规格描述,是个字符串,""eventId": "UUID", //为这个事件创建一个唯一的id,一定要有,与实例无关,每个事件的eventId都应该不一样,一样的就会被认为是重复数据"labels":"{}", //资源自定义的标签,是个字符串"product": "rds/mongo/redis/s3/h3pc/h3storage/hwpc/hwstorage", //标明自己是哪个产品,枚举了字符串类别,与method字段前半部分一致"userId": 3, //进行操作的用户id"tenantId": 10, //资源所属的租户id"projectId": 4, //资源所属的项目id"nid": 12 //资源所挂载的服务树节点id}}
推送MQ的demo,仅供参考
package rabbitMQimport ("encoding/json""github.com/streadway/amqp""github.com/toolkits/pkg/logger""golang.org/x/net/context""time")const (defaultExchange = "DCLOUD_EVENT_EXCHANGE")type Message []bytetype session struct {*amqp.Connection*amqp.Channel}func (s session) Close() error {if s.Connection == nil {return nil}return s.Connection.Close()}var (RabbitMQ chan chan sessionMQChannel = make(chan Message))func Init() {conf := config.GetEnv()ctx, _ := context.WithCancel(context.Background())exchange := defaultExchangeif conf.RABBIT_EXCHANGE != "" {exchange = conf.RABBIT_EXCHANGE}RabbitMQ = redial(ctx,conf.RABBIT_HOST,conf.RABBIT_PORT,conf.RABBIT_USERNAME,conf.RABBIT_PASSWORD,conf.RABBIT_VHOST,exchange,false,)routingKey := "ignored for fanout exchanges, application dependent for other exchanges"go func(){Publish(RabbitMQ, exchange, MQChannel, routingKey)}()}func redial(ctx context.Context, host string, port string,name string, pwd string, vh string, exchange string, isQueue bool) (chan chan session){sessions := make(chan chan session)go func() {sess := make(chan session)defer close(sessions)for {select {case sessions <- sess:case <-ctx.Done():logger.Infoln("shutting down session factory")return}conn, err := amqp.Dial("amqp://"+name+":"+pwd+"@"+host+":"+port+"/"+vh)if err != nil {logger.Errorf("cannot (re)dial: %v \n", err)continue //TODO opt logic}ch, err := conn.Channel()if err != nil {logger.Errorf("cannot create channel: %v \n", err)continue}if isQueue {if _, err := ch.QueueDeclare(exchange, true, false, false, false, nil); err != nil {logger.Errorf("cannot declare single queue: %v \n", err)}} else {if err := ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {logger.Errorf("cannot declare fanout exchange: %v \n", err)}}select {case sess <- session{conn, ch}:case <-ctx.Done():logger.Infoln("shutting down new session")}}}()return sessions}func Publish(sessions chan chan session, exchange string, messages <-chan Message, key string) {for c_session := range sessions {var (running boolreading = messagespending = make(chan Message, 1)confirm = make(chan amqp.Confirmation, 1)pub session)select {case pub = <-c_session:case <-time.After(time.Second * time.Duration(20)):// timeout means this c_session is unavaliablecontinue}if err := pub.Confirm(false); err != nil {logger.Infoln("publisher confirms not supported")close(confirm)} else {pub.NotifyPublish(confirm)}logger.Infoln("publishing...")Publish:for {var body Messageselect {case confirmed, ok := <-confirm:if !ok {logger.Errorln("rabbitmq error")break Publish}if !confirmed.Ack{logger.Infof("nack message %d, body: %q", confirmed.DeliveryTag, string(body))}reading = messagescase body = <-pending://routingKey := "ignored for fanout exchanges, application dependent for other exchanges"err := pub.Publish(exchange, key, false, false, amqp.Publishing{Body:body,})if err != nil {pending <- bodypub.Close()logger.Errorln("rabbitmq error")break Publish}case body, running = <-reading:if !running {return}pending <- bodyreading = nil}}}}var (rmq = rabbitMQ.RabbitMQmq = rabbitMQ.MQChannel)func FanoutArchitecture(m []byte) {logger.Infof("push message: %s", m)go func() {PutChan(m)//rabbitMQ.Publish(rmq, ChanByte(m))}()}func PutChan(b []byte) {mq <- b}func SendMsg(action string, msg interface{}) {js := EncodeStruct(action, msg)FanoutArchitecture(js)}func main() {go SendMsg("test", nil)}
rabbitMQ测试地址
amqp://admin:admin_202009@10.178.24.110:8002/
