amqp协议格式

  1. amqp://<user>:<password>@<ip>:<port>

开发环境

安装官方教程 https://www.rabbitmq.com/download.html
dokcer hub 地址https://registry.hub.docker.com/_/rabbitmq/

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

默认用户名和密码都是guest,这个账户只能通过本地地址访问。远程如果修改默认用户名和密码,需要设置环境变量
RABBITMQ_DEFAULT_USER 和 RABBITMQ_DEFAULT_PASS


$ docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=654321 rabbitmq:3-management

HelloWorld

创建生产者

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    if err != nil {
        log.Fatal("Failed to connect to RabbitMQ:", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal("Failed to open a channel:", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )

    if err != nil {
        log.Fatal("Failed to declare a queue:", err)
    }
    body := "Hello World!"
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    if err != nil {
        log.Fatal("Failed to publish a message:", err)
    }
    log.Printf(" [x] Sent %s", body)
}

创建消费者

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

参考

https://github.com/streadway/amqp
https://www.jianshu.com/p/dca01aad6bc8