安装

  1. import "github.com/streadway/amqp"
  2. go mod tidy

删除交换器 - Delete exchange

  1. // ExchangeDelete removes the named exchange from the server.
  2. func (r *RabbitMQ) DeleteExchange(name string) (err error) {
  3. err = r.channel.ExchangeDelete(name, false, false)
  4. if err != nil {
  5. log.Printf("[amqp] delete exchange error: %s\n", err)
  6. return err
  7. }
  8. return nil
  9. }

创建队列 - Add a new queue

  1. // QueueDeclare declares a queue to hold messages and deliver to consumers.
  2. func (r *RabbitMQ) DeclareQueue(name string, durable, autodelete, exclusive, nowait bool) (err error) {
  3. _, err = r.channel.QueueDeclare(name, durable, autodelete, exclusive, nowait, nil)
  4. if err != nil {
  5. log.Printf("[amqp] declare queue error: %s\n", err)
  6. return err
  7. }
  8. return nil
  9. }

路由绑定 - Bindings

  1. // QueueBind binds an exchange to a queue
  2. func (r *RabbitMQ) BindQueue(queue, exchange string, keys []string, nowait bool) (err error) {
  3. for _, key := range keys {
  4. if err = r.channel.QueueBind(queue, key, exchange, nowait, nil); err != nil {
  5. log.Printf("[amqp] bind queue error: %s\n", err)
  6. return err
  7. }
  8. }
  9. return nil
  10. }

发布消息 - Publish message

  1. func (m *mqController) PublishHandler(w http.ResponseWriter, r *http.Request) {
  2. //...
  3. err = RabbitMQ.Publish(MessageEntity.Exchange, MessageEntity.Key, MessageEntity.DeliveryMode, MessageEntity.Priority, MessageEntity.Body);
  4. //...
  5. }

消费消息 - Consume message

  1. func (m *mqController) QueueHandler(w http.ResponseWriter, r *http.Request) {
  2. //...
  3. err := RabbitMQ.ConsumeQueue(name, message);
  4. //...
  5. }

总结