基本概念
官方網頁
RabbitMQ就像郵局,唯一不同的是它送數位資料,郵局送信。
Queue就是郵筒,存在郵局裡(RabbitMQ),只受限於ram跟disk。
message只存在queue裡。
多個producers可以送message給同一個queue。
多個consumer可以從同一個queue收message。
基本上每個message被收走就沒有了。
- RabbitMQ=郵局
- Queue=郵筒(存在於郵局RabbitMQ)
- Message=信
- Exchange=在producer這邊,他是收信的工具、在consumer這邊,他是送信的工具(存在於郵局RabbitMQ)
- Producer=寄信者
- Consumer=收信者
連線架構
TCP connection -> connection -> channel
Message 送出後,consumer會回傳ack(acknowledgement),
確認message正確收取,否則會re-queue此message。
訊息模型
Producer -> exchange -> Queue -> exchange -> Consumer
                        Queue -> exchange -> Consumer
以下為完成傳遞必要項目,缺一不可
- 1 由Producer產生Message
- 2 Message送到左邊的Exchange
- 3 左邊的Exchange把Message送到綁定的Queue
-  4 右邊的Exchange從Queue取出Message
- 5 右邊的Exchange把Message送給Consumer
Producer及Consumer都可以宣告Exchange及Queue
若是沒宣告就是使用預設的,或是RabbitMQ會自行建立
宣告同名的Exchange或是Queue,就表示用現有的,RabbitMQ不會新建立一個
宣告同名的Exchange或是Queue,參數必須完全相同,否則報錯
基本模型(Producer/Consumer)
Producer -> Queue -> Consumer
Producer/Consumer是位於app,Queue是位於RabbitMQ
多個consumer模型(Worker Queues)
1 massage平均分配給多個consumer
2 等consumer回覆之後,才送新的message下去
Exchange模型(Publish/Subscribe)
所有要收的consumer,自行綁定上該exchange
一 Produce/Consumer(基本模型)
官方網頁
二 Work Queues(message acknowledgement、message durable)
官方網頁
- Round-robin dispatching 輪詢式派送 
輪流派送給每個註冊的worker 
- Massage acknowledgement 訊息確認
msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
//幾本上auto-ack都要設成true,不然就要自己送
//否則rabbitmq 會保留此message,造成資源消耗
go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false)
  }
}()
- Massage durability 訊息耐用(即便rabbitmq重開,未送出的message也不會消失)
q, err := ch.QueueDeclare(
  "hello",      // name
  true,         // durable
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
//producer、consumer宣告的queue durable要設成true
//宣告同名的queue參數必須相同,否則錯誤
err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
})
//publishing DeliveryMode 要設成 amqp.Persistent
- 完成一個work才繼續派送(Fair dispatch 一節)
基本上RabbitMQ的派送是平均派送,當他收到就派送,
所以有可能派送給還沒完成工作的worker。
可設定prefetch count=1
這時rabbitMQ會收到前一個message的acknowledgement才會派送下一個。
注意,此時queue有可能被填滿,你需要多設定幾個woker。
err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
 
三 Publish/Subscribe(one message to multiple consumers)
官方網頁
Published message are going to be broadcast to all the receivers.
- Exchanges4 types of exchange:direct,topic,headers,fanout
err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
//fanout:broadcast all messages to all the queues it knows
 
- Bindings(Binding exchange and queue)
err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil
)
 
四 Routing(用路由方式傳送message給不同Queue)
官方網頁
Subscribe only to a subset of the messages.
- 基本概念
exchange publish時,可以加上路由(routing key)
queue可以綁定exchange及routing key
一個queue可以綁定一個exchange及多個routing key
這樣就可以一個exchange 用多種routing發布,然後consumer利用不同routing接收
 
- Bindings
//Binding 是綁定exchange 及 queue
//Binding時可以設定路由,用routing key parameter
err = ch.QueueBind(
  q.Name,    // queue name
  "black",   // routing key
  "logs",    // exchange
  false,
  nil)
//route to black
 
- Direct exchange
//用direct exchange才能使用binding的routing key
//用fanout exchange會忽略routing key參數,合理,所以他才叫fanout
err = ch.ExchangeDeclare(
                "logs_direct", // name
                "direct",      // type
                true,          // durable
                false,         // auto-deleted
                false,         // internal
                false,         // no-wait
                nil,           // arguments
)
 q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when usused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
 )
        
err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil
)
五 Publish/Subscribe(one message to multiple consumers)
官方網頁
Published message are going to be broadcast to all the receivers.
- Exchanges4 types of exchange:direct,topic,headers,fanout
err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
//fanout:broadcast all messages to all the queues it knows
 
- Bindings(Binding exchange and queue)
err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil
)
 
- 
- 
正式產品需要注意的主題
官方說明文件
- Connection Management
- Error Handling
- Connection Recovery
- Concurrency
- Metric Collection
參考頁面
Publisher Confirms and Consumer Acknowledgements
Production Checklist
Mornitoring
linux安裝
主目錄  /opt/rabbitmq
看相關變數 printenv | grep rabbitmq
執行檔 /opt/rabbitmq/sbin
執行檔 /usr/local/bin
服務啟動
systemctl start rabbitmq-server
mac home brew 安裝
官網說明
相關的scripts及cli tools安裝在/usr/local/opt/rabbitmq/sbin,
需要自己加到path
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
服務啟動
brew services start rabbitmq
brew services stoprabbitmq
啟動在前景
/usr/local/Cellar/rabbitmq/3.7.11/sbin/rabbitmq-server
管理 rabbitmqctl
用這個程式管理rabbitmq大部分項目,像是增加使用者、列出使用者、重啟、列出參數等。
範例
增加使用者 add_user [userName][password]
rabbitmqctl add_user admin admin
更改使用者tag,改成管理者 set_user_tags 
  [...]
rabbitmqctl set_user_tags admin administrator
更改使用者權限 set_permissions [name]
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
更改密碼 change_password [username] [password]
rabbitmqctl change_password admin admin
查詢未確認的message(unacknowledged) list_queues [queueName] messages_ready messages_unacknowledged
sudo rabbitmqctl list_queues queueName messages_ready messages_unacknowledged
監控
監控程式啟動在 http://localhost:15672
使用瀏覽器就可以開啟,要先建立一個user
rabbitmq-plugins enable rabbitmq-management
外掛管理
rabbitmq-plugins [-n [node]] [-t [timeout]] [-l] [-q] [command] [command options]
Eg.
rabbitmq-plugins [-n ] [-t ] [-l] [-q] is_enabled [plugin1] [plugin2] 
參數
-n node
-q quiet
-h  help
list 列出所有外掛
is_enabled [plugin1][,[plugin2]]   查看plugin是否啟用
enable [plugin]  啟用外掛
範例
啟用外掛
rabbitmq-plugins enable rabbitmq_management
管理工具
rabbitmqadmin