对接流程描述
云主机,云硬盘,RDS,Redis,MongoDB的事件数据统一推送至rabbitMQ中类型为fanout的exchange中, 计量计费模块将queue绑定至该exchange,然后进行消费。
目前只接收 资源创建,资源升降配,资源销毁三种事件, 进行后付费的计算。
事件推送规范
vhost name
exchange name
exchange type
fanout
推送格式示例
{
"method": "rds_create", //事件类型,以_分割,前面是服务类型,比如rds/mongo/redis/s3/h3pc/h3storage/hwpc/hwstorage,后面是发生的动作,例如create/upgrade/downgrade/delete,create代表资源创建,upgrade代表升配,downgrade代表降配,delete代表资源销毁
"payload":{
"occurTime": 112313223, //事件发生的时间,时间戳,精确到毫秒
"name": "资源名称",
"ident": "资源英文标识",
"uuid": "UUID", //每个创建出来的资源实例的唯一id,一定要有,以后这个实例的所有事件都要使用这个resourceId
"extend": "{
\"name\": \"rds.m3.c2.d400\", //资源的规格名称,由业务线自定义
\"cpu\": 1, //cpu个数 ,若没有则写0
\"mem\": \"4068Gi\", //内存大小,单位必须是Gi,若没有则传0Gi,单位仅支持K,M,G,T,P,Ki,Mi,Gi,Ti,Pi这几种,其他一律视为无效数据
\"disk\": \"10086002Gi\", //存储容量,单位必须是Gi, 若没有则传0Gi,单位仅支持K,M,G,T,P,Ki,Mi,Gi,Ti,Pi这几种,其他一律视为无效数据
}", //当前的规格描述,是个字符串,"
"eventId": "UUID", //为这个事件创建一个唯一的id,一定要有,与实例无关,每个事件的eventId都应该不一样,一样的就会被认为是重复数据
"labels":"{}", //资源自定义的标签,是个字符串
"product": "rds/mongo/redis/s3/h3pc/h3storage/hwpc/hwstorage", //标明自己是哪个产品,枚举了字符串类别,与method字段前半部分一致
"userId": 3, //进行操作的用户id
"tenantId": 10, //资源所属的租户id
"projectId": 4, //资源所属的项目id
"nid": 12 //资源所挂载的服务树节点id
}
}
推送MQ的demo,仅供参考
package rabbitMQ
import (
"encoding/json"
"github.com/streadway/amqp"
"github.com/toolkits/pkg/logger"
"golang.org/x/net/context"
"time"
)
const (
defaultExchange = "DCLOUD_EVENT_EXCHANGE"
)
type Message []byte
type session struct {
*amqp.Connection
*amqp.Channel
}
func (s session) Close() error {
if s.Connection == nil {
return nil
}
return s.Connection.Close()
}
var (
RabbitMQ chan chan session
MQChannel = make(chan Message)
)
func Init() {
conf := config.GetEnv()
ctx, _ := context.WithCancel(context.Background())
exchange := defaultExchange
if conf.RABBIT_EXCHANGE != "" {
exchange = conf.RABBIT_EXCHANGE
}
RabbitMQ = redial(ctx,
conf.RABBIT_HOST,
conf.RABBIT_PORT,
conf.RABBIT_USERNAME,
conf.RABBIT_PASSWORD,
conf.RABBIT_VHOST,
exchange,
false,
)
routingKey := "ignored for fanout exchanges, application dependent for other exchanges"
go func(){
Publish(RabbitMQ, exchange, MQChannel, routingKey)
}()
}
func redial(ctx context.Context, host string, port string,
name string, pwd string, vh string, exchange string, isQueue bool) (chan chan session){
sessions := make(chan chan session)
go func() {
sess := make(chan session)
defer close(sessions)
for {
select {
case sessions <- sess:
case <-ctx.Done():
logger.Infoln("shutting down session factory")
return
}
conn, err := amqp.Dial("amqp://"+name+":"+pwd+"@"+host+":"+port+"/"+vh)
if err != nil {
logger.Errorf("cannot (re)dial: %v \n", err)
continue //TODO opt logic
}
ch, err := conn.Channel()
if err != nil {
logger.Errorf("cannot create channel: %v \n", err)
continue
}
if isQueue {
if _, err := ch.QueueDeclare(exchange, true, false, false, false, nil); err != nil {
logger.Errorf("cannot declare single queue: %v \n", err)
}
} else {
if err := ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {
logger.Errorf("cannot declare fanout exchange: %v \n", err)
}
}
select {
case sess <- session{conn, ch}:
case <-ctx.Done():
logger.Infoln("shutting down new session")
}
}
}()
return sessions
}
func Publish(sessions chan chan session, exchange string, messages <-chan Message, key string) {
for c_session := range sessions {
var (
running bool
reading = messages
pending = make(chan Message, 1)
confirm = make(chan amqp.Confirmation, 1)
pub session
)
select {
case pub = <-c_session:
case <-time.After(time.Second * time.Duration(20)):
// timeout means this c_session is unavaliable
continue
}
if err := pub.Confirm(false); err != nil {
logger.Infoln("publisher confirms not supported")
close(confirm)
} else {
pub.NotifyPublish(confirm)
}
logger.Infoln("publishing...")
Publish:
for {
var body Message
select {
case confirmed, ok := <-confirm:
if !ok {
logger.Errorln("rabbitmq error")
break Publish
}
if !confirmed.Ack{
logger.Infof("nack message %d, body: %q", confirmed.DeliveryTag, string(body))
}
reading = messages
case body = <-pending:
//routingKey := "ignored for fanout exchanges, application dependent for other exchanges"
err := pub.Publish(exchange, key, false, false, amqp.Publishing{
Body:body,
})
if err != nil {
pending <- body
pub.Close()
logger.Errorln("rabbitmq error")
break Publish
}
case body, running = <-reading:
if !running {
return
}
pending <- body
reading = nil
}
}
}
}
var (
rmq = rabbitMQ.RabbitMQ
mq = rabbitMQ.MQChannel
)
func FanoutArchitecture(m []byte) {
logger.Infof("push message: %s", m)
go func() {
PutChan(m)
//rabbitMQ.Publish(rmq, ChanByte(m))
}()
}
func PutChan(b []byte) {
mq <- b
}
func SendMsg(action string, msg interface{}) {
js := EncodeStruct(action, msg)
FanoutArchitecture(js)
}
func main() {
go SendMsg("test", nil)
}
rabbitMQ测试地址
amqp://admin:admin_202009@10.178.24.110:8002/