基本概念
官方網頁
RabbitMQ就像郵局,唯一不同的是它送數位資料,郵局送信。
Queue就是郵筒,存在郵局裡(RabbitMQ),只受限於ram跟disk。
message只存在queue裡。
多個producers可以送message給同一個queue。
多個consumer可以從同一個queue收message。
基本上每個message被收走就沒有了。
- RabbitMQ=郵局
- Queue=郵筒(存在於郵局RabbitMQ)
- Message=信
- Exchange=在producer這邊,他是收信的工具、在consumer這邊,他是送信的工具(存在於郵局RabbitMQ)
- Producer=寄信者
- Consumer=收信者
連線架構
TCP connection -> connection -> channel
Message 送出後,consumer會回傳ack(acknowledgement),
確認message正確收取,否則會re-queue此message。
訊息模型
Producer -> exchange -> Queue -> exchange -> Consumer
Queue -> exchange -> Consumer
以下為完成傳遞必要項目,缺一不可
- 1 由Producer產生Message
- 2 Message送到左邊的Exchange
- 3 左邊的Exchange把Message送到綁定的Queue
- 4 右邊的Exchange從Queue取出Message
- 5 右邊的Exchange把Message送給Consumer
Producer及Consumer都可以宣告Exchange及Queue
若是沒宣告就是使用預設的,或是RabbitMQ會自行建立
宣告同名的Exchange或是Queue,就表示用現有的,RabbitMQ不會新建立一個
宣告同名的Exchange或是Queue,參數必須完全相同,否則報錯
基本模型(Producer/Consumer)
Producer -> Queue -> Consumer
Producer/Consumer是位於app,Queue是位於RabbitMQ
多個consumer模型(Worker Queues)
1 massage平均分配給多個consumer
2 等consumer回覆之後,才送新的message下去
Exchange模型(Publish/Subscribe)
所有要收的consumer,自行綁定上該exchange
一 Produce/Consumer(基本模型)
官方網頁
二 Work Queues(message acknowledgement、message durable)
官方網頁
- Round-robin dispatching 輪詢式派送
輪流派送給每個註冊的worker
- Massage acknowledgement 訊息確認
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
//幾本上auto-ack都要設成true,不然就要自己送
//否則rabbitmq 會保留此message,造成資源消耗
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
- Massage durability 訊息耐用(即便rabbitmq重開,未送出的message也不會消失)
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
//producer、consumer宣告的queue durable要設成true
//宣告同名的queue參數必須相同,否則錯誤
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
//publishing DeliveryMode 要設成 amqp.Persistent
- 完成一個work才繼續派送(Fair dispatch 一節)
基本上RabbitMQ的派送是平均派送,當他收到就派送,
所以有可能派送給還沒完成工作的worker。
可設定prefetch count=1
這時rabbitMQ會收到前一個message的acknowledgement才會派送下一個。
注意,此時queue有可能被填滿,你需要多設定幾個woker。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
三 Publish/Subscribe(one message to multiple consumers)
官方網頁
Published message are going to be broadcast to all the receivers.
- Exchanges
4 types of exchange:direct,topic,headers,fanout
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
//fanout:broadcast all messages to all the queues it knows
- Bindings(Binding exchange and queue)
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil
)
四 Routing(用路由方式傳送message給不同Queue)
官方網頁
Subscribe only to a subset of the messages.
- 基本概念
exchange publish時,可以加上路由(routing key)
queue可以綁定exchange及routing key
一個queue可以綁定一個exchange及多個routing key
這樣就可以一個exchange 用多種routing發布,然後consumer利用不同routing接收
- Bindings
//Binding 是綁定exchange 及 queue
//Binding時可以設定路由,用routing key parameter
err = ch.QueueBind(
q.Name, // queue name
"black", // routing key
"logs", // exchange
false,
nil)
//route to black
- Direct exchange
//用direct exchange才能使用binding的routing key
//用fanout exchange會忽略routing key參數,合理,所以他才叫fanout
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil
)
五 Publish/Subscribe(one message to multiple consumers)
官方網頁
Published message are going to be broadcast to all the receivers.
- Exchanges
4 types of exchange:direct,topic,headers,fanout
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
//fanout:broadcast all messages to all the queues it knows
- Bindings(Binding exchange and queue)
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil
)
-
-
正式產品需要注意的主題
官方說明文件
- Connection Management
- Error Handling
- Connection Recovery
- Concurrency
- Metric Collection
參考頁面
Publisher Confirms and Consumer Acknowledgements
Production Checklist
Mornitoring
linux安裝
主目錄 /opt/rabbitmq
看相關變數 printenv | grep rabbitmq
執行檔 /opt/rabbitmq/sbin
執行檔 /usr/local/bin
服務啟動
systemctl start rabbitmq-server
mac home brew 安裝
官網說明
相關的scripts及cli tools安裝在/usr/local/opt/rabbitmq/sbin,
需要自己加到path
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
服務啟動
brew services start rabbitmq
brew services stoprabbitmq
啟動在前景
/usr/local/Cellar/rabbitmq/3.7.11/sbin/rabbitmq-server
管理 rabbitmqctl
用這個程式管理rabbitmq大部分項目,像是增加使用者、列出使用者、重啟、列出參數等。
範例
增加使用者 add_user [userName][password]
rabbitmqctl add_user admin admin
更改使用者tag,改成管理者 set_user_tags
[...]
rabbitmqctl set_user_tags admin administrator
更改使用者權限 set_permissions [name]
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
更改密碼 change_password [username] [password]
rabbitmqctl change_password admin admin
查詢未確認的message(unacknowledged) list_queues [queueName] messages_ready messages_unacknowledged
sudo rabbitmqctl list_queues queueName messages_ready messages_unacknowledged
監控
監控程式啟動在 http://localhost:15672
使用瀏覽器就可以開啟,要先建立一個user
rabbitmq-plugins enable rabbitmq-management
外掛管理
rabbitmq-plugins [-n [node]] [-t [timeout]] [-l] [-q] [command] [command options]
Eg.
rabbitmq-plugins [-n ] [-t ] [-l] [-q] is_enabled [plugin1] [plugin2]
參數
-n node
-q quiet
-h help
list 列出所有外掛
is_enabled [plugin1][,[plugin2]] 查看plugin是否啟用
enable [plugin] 啟用外掛
範例
啟用外掛
rabbitmq-plugins enable rabbitmq_management
管理工具
rabbitmqadmin