基本概念
官方網頁RabbitMQ就像郵局,唯一不同的是它送數位資料,郵局送信。
Queue就是郵筒,存在郵局裡(RabbitMQ),只受限於ram跟disk。
message只存在queue裡。
多個producers可以送message給同一個queue。
多個consumer可以從同一個queue收message。
基本上每個message被收走就沒有了。
- RabbitMQ=郵局
- Queue=郵筒(存在於郵局RabbitMQ)
- Message=信
- Exchange=在producer這邊,他是收信的工具、在consumer這邊,他是送信的工具(存在於郵局RabbitMQ)
- Producer=寄信者
- Consumer=收信者
連線架構
TCP connection -> connection -> channel
Message 送出後,consumer會回傳ack(acknowledgement),
確認message正確收取,否則會re-queue此message。
訊息模型
Producer -> exchange -> Queue -> exchange -> Consumer Queue -> exchange -> Consumer
以下為完成傳遞必要項目,缺一不可
- 1 由Producer產生Message
- 2 Message送到左邊的Exchange
- 3 左邊的Exchange把Message送到綁定的Queue
- 4 右邊的Exchange從Queue取出Message
- 5 右邊的Exchange把Message送給Consumer
Producer及Consumer都可以宣告Exchange及Queue
若是沒宣告就是使用預設的,或是RabbitMQ會自行建立
宣告同名的Exchange或是Queue,就表示用現有的,RabbitMQ不會新建立一個
宣告同名的Exchange或是Queue,參數必須完全相同,否則報錯
基本模型(Producer/Consumer)
Producer -> Queue -> Consumer
Producer/Consumer是位於app,Queue是位於RabbitMQ
多個consumer模型(Worker Queues)
1 massage平均分配給多個consumer
2 等consumer回覆之後,才送新的message下去
Exchange模型(Publish/Subscribe)
所有要收的consumer,自行綁定上該exchange
一 Produce/Consumer(基本模型)
官方網頁二 Work Queues(message acknowledgement、message durable)
官方網頁- Round-robin dispatching 輪詢式派送
輪流派送給每個註冊的worker
- Massage acknowledgement 訊息確認
msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) //幾本上auto-ack都要設成true,不然就要自己送 //否則rabbitmq 會保留此message,造成資源消耗 go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }()
- Massage durability 訊息耐用(即便rabbitmq重開,未送出的message也不會消失)
q, err := ch.QueueDeclare( "hello", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) //producer、consumer宣告的queue durable要設成true //宣告同名的queue參數必須相同,否則錯誤 err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing { DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) //publishing DeliveryMode 要設成 amqp.Persistent
- 完成一個work才繼續派送(Fair dispatch 一節)
基本上RabbitMQ的派送是平均派送,當他收到就派送, 所以有可能派送給還沒完成工作的worker。 可設定prefetch count=1 這時rabbitMQ會收到前一個message的acknowledgement才會派送下一個。 注意,此時queue有可能被填滿,你需要多設定幾個woker。 err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global )
三 Publish/Subscribe(one message to multiple consumers)
官方網頁Published message are going to be broadcast to all the receivers.
- Exchanges
4 types of exchange:direct,topic,headers,fanout err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) //fanout:broadcast all messages to all the queues it knows
- Bindings(Binding exchange and queue)
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil )
四 Routing(用路由方式傳送message給不同Queue)
官方網頁Subscribe only to a subset of the messages.
- 基本概念
exchange publish時,可以加上路由(routing key) queue可以綁定exchange及routing key 一個queue可以綁定一個exchange及多個routing key 這樣就可以一個exchange 用多種routing發布,然後consumer利用不同routing接收
- Bindings
//Binding 是綁定exchange 及 queue //Binding時可以設定路由,用routing key parameter err = ch.QueueBind( q.Name, // queue name "black", // routing key "logs", // exchange false, nil) //route to black
- Direct exchange
//用direct exchange才能使用binding的routing key //用fanout exchange會忽略routing key參數,合理,所以他才叫fanout err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil )
五 Publish/Subscribe(one message to multiple consumers)
官方網頁Published message are going to be broadcast to all the receivers.
- Exchanges
4 types of exchange:direct,topic,headers,fanout err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) //fanout:broadcast all messages to all the queues it knows
- Bindings(Binding exchange and queue)
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil )
正式產品需要注意的主題
官方說明文件- Connection Management
- Error Handling
- Connection Recovery
- Concurrency
- Metric Collection
Publisher Confirms and Consumer Acknowledgements
Production Checklist
Mornitoring
linux安裝
主目錄 /opt/rabbitmq看相關變數 printenv | grep rabbitmq
執行檔 /opt/rabbitmq/sbin
執行檔 /usr/local/bin
服務啟動
systemctl start rabbitmq-server
mac home brew 安裝
官網說明
相關的scripts及cli tools安裝在/usr/local/opt/rabbitmq/sbin,
需要自己加到path
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
brew services start rabbitmq brew services stoprabbitmq啟動在前景
/usr/local/Cellar/rabbitmq/3.7.11/sbin/rabbitmq-server
管理 rabbitmqctl
用這個程式管理rabbitmq大部分項目,像是增加使用者、列出使用者、重啟、列出參數等。範例
增加使用者 add_user [userName][password]
rabbitmqctl add_user admin admin更改使用者tag,改成管理者 set_user_tags
rabbitmqctl set_user_tags admin administrator更改使用者權限 set_permissions [name]
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"更改密碼 change_password [username] [password]
rabbitmqctl change_password admin admin查詢未確認的message(unacknowledged) list_queues [queueName] messages_ready messages_unacknowledged
sudo rabbitmqctl list_queues queueName messages_ready messages_unacknowledged
監控
監控程式啟動在 http://localhost:15672 使用瀏覽器就可以開啟,要先建立一個userrabbitmq-plugins enable rabbitmq-management
外掛管理
rabbitmq-plugins [-n [node]] [-t [timeout]] [-l] [-q] [command] [command options] Eg. rabbitmq-plugins [-n] [-t ] [-l] [-q] is_enabled [plugin1] [plugin2]
參數
-n node -q quiet -h help list 列出所有外掛 is_enabled [plugin1][,[plugin2]] 查看plugin是否啟用 enable [plugin] 啟用外掛
範例
啟用外掛
rabbitmq-plugins enable rabbitmq_management
沒有留言:
張貼留言