2019年2月13日 星期三

RabbitMQ

基本概念

官方網頁

RabbitMQ就像郵局,唯一不同的是它送數位資料,郵局送信。
Queue就是郵筒,存在郵局裡(RabbitMQ),只受限於ram跟disk。

message只存在queue裡。
多個producers可以送message給同一個queue。
多個consumer可以從同一個queue收message。
基本上每個message被收走就沒有了。
  • RabbitMQ=郵局
  • Queue=郵筒(存在於郵局RabbitMQ)
  • Message=信
  • Exchange=在producer這邊,他是收信的工具、在consumer這邊,他是送信的工具(存在於郵局RabbitMQ)
  • Producer=寄信者
  • Consumer=收信者

連線架構

TCP connection -> connection -> channel

Message 送出後,consumer會回傳ack(acknowledgement),
確認message正確收取,否則會re-queue此message。

訊息模型

Producer -> exchange -> Queue -> exchange -> Consumer
                        Queue -> exchange -> Consumer

以下為完成傳遞必要項目,缺一不可

  • 1 由Producer產生Message
  • 2 Message送到左邊的Exchange
  • 3 左邊的Exchange把Message送到綁定的Queue
  • 4 右邊的Exchange從Queue取出Message
  • 5 右邊的Exchange把Message送給Consumer
Producer及Consumer都可以宣告Exchange及Queue
若是沒宣告就是使用預設的,或是RabbitMQ會自行建立
宣告同名的Exchange或是Queue,就表示用現有的,RabbitMQ不會新建立一個
宣告同名的Exchange或是Queue,參數必須完全相同,否則報錯

基本模型(Producer/Consumer)
Producer -> Queue -> Consumer
Producer/Consumer是位於app,Queue是位於RabbitMQ
多個consumer模型(Worker Queues)
1 massage平均分配給多個consumer
2 等consumer回覆之後,才送新的message下去
Exchange模型(Publish/Subscribe)
所有要收的consumer,自行綁定上該exchange

一 Produce/Consumer(基本模型)

官方網頁

二 Work Queues(message acknowledgement、message durable)

官方網頁
  • Round-robin dispatching 輪詢式派送
    輪流派送給每個註冊的worker

  • Massage acknowledgement 訊息確認
    msgs, err := ch.Consume(
      q.Name, // queue
      "",     // consumer
      false,  // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
    )
    
    //幾本上auto-ack都要設成true,不然就要自己送
    //否則rabbitmq 會保留此message,造成資源消耗
    
    go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        d.Ack(false)
      }
    }()
    

  • Massage durability 訊息耐用(即便rabbitmq重開,未送出的message也不會消失)
    q, err := ch.QueueDeclare(
      "hello",      // name
      true,         // durable
      false,        // delete when unused
      false,        // exclusive
      false,        // no-wait
      nil,          // arguments
    )
    //producer、consumer宣告的queue durable要設成true
    //宣告同名的queue參數必須相同,否則錯誤
    
    err = ch.Publish(
      "",           // exchange
      q.Name,       // routing key
      false,        // mandatory
      false,
      amqp.Publishing {
        DeliveryMode: amqp.Persistent,
        ContentType:  "text/plain",
        Body:         []byte(body),
    })
    //publishing DeliveryMode 要設成 amqp.Persistent
    
    

  • 完成一個work才繼續派送(Fair dispatch 一節)
    基本上RabbitMQ的派送是平均派送,當他收到就派送,
    所以有可能派送給還沒完成工作的worker。
    可設定prefetch count=1
    這時rabbitMQ會收到前一個message的acknowledgement才會派送下一個。
    
    注意,此時queue有可能被填滿,你需要多設定幾個woker。
    
    err = ch.Qos(
      1,     // prefetch count
      0,     // prefetch size
      false, // global
    )
    

三 Publish/Subscribe(one message to multiple consumers)

官方網頁

Published message are going to be broadcast to all the receivers.
  • Exchanges
    4 types of exchange:direct,topic,headers,fanout
    
    err = ch.ExchangeDeclare(
      "logs",   // name
      "fanout", // type
      true,     // durable
      false,    // auto-deleted
      false,    // internal
      false,    // no-wait
      nil,      // arguments
    )
    
    //fanout:broadcast all messages to all the queues it knows
    
    

  • Bindings(Binding exchange and queue)
    err = ch.QueueBind(
      q.Name, // queue name
      "",     // routing key
      "logs", // exchange
      false,
      nil
    )
    

四 Routing(用路由方式傳送message給不同Queue)

官方網頁

Subscribe only to a subset of the messages.
  • 基本概念
    exchange publish時,可以加上路由(routing key)
    queue可以綁定exchange及routing key
    一個queue可以綁定一個exchange及多個routing key
    這樣就可以一個exchange 用多種routing發布,然後consumer利用不同routing接收
    

  • Bindings
    //Binding 是綁定exchange 及 queue
    //Binding時可以設定路由,用routing key parameter
    
    err = ch.QueueBind(
      q.Name,    // queue name
      "black",   // routing key
      "logs",    // exchange
      false,
      nil)
    
    //route to black
    
    

  • Direct exchange
    //用direct exchange才能使用binding的routing key
    //用fanout exchange會忽略routing key參數,合理,所以他才叫fanout
    
    err = ch.ExchangeDeclare(
                    "logs_direct", // name
                    "direct",      // type
                    true,          // durable
                    false,         // auto-deleted
                    false,         // internal
                    false,         // no-wait
                    nil,           // arguments
    )
    
     q, err := ch.QueueDeclare(
                    "",    // name
                    false, // durable
                    false, // delete when usused
                    true,  // exclusive
                    false, // no-wait
                    nil,   // arguments
     )
            
    err = ch.QueueBind(
      q.Name, // queue name
      "",     // routing key
      "logs", // exchange
      false,
      nil
    )
    

五 Publish/Subscribe(one message to multiple consumers)

官方網頁

Published message are going to be broadcast to all the receivers.
  • Exchanges
    4 types of exchange:direct,topic,headers,fanout
    
    err = ch.ExchangeDeclare(
      "logs",   // name
      "fanout", // type
      true,     // durable
      false,    // auto-deleted
      false,    // internal
      false,    // no-wait
      nil,      // arguments
    )
    
    //fanout:broadcast all messages to all the queues it knows
    
    

  • Bindings(Binding exchange and queue)
    err = ch.QueueBind(
      q.Name, // queue name
      "",     // routing key
      "logs", // exchange
      false,
      nil
    )
    
  • 
    

  • 
    

正式產品需要注意的主題

官方說明文件

  • Connection Management

  • Error Handling

  • Connection Recovery

  • Concurrency

  • Metric Collection

參考頁面
Publisher Confirms and Consumer Acknowledgements
Production Checklist
Mornitoring

linux安裝

主目錄 /opt/rabbitmq
看相關變數 printenv | grep rabbitmq
執行檔 /opt/rabbitmq/sbin
執行檔 /usr/local/bin

服務啟動
systemctl start rabbitmq-server


mac home brew 安裝

官網說明

相關的scripts及cli tools安裝在/usr/local/opt/rabbitmq/sbin,
需要自己加到path
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin

服務啟動
brew services start rabbitmq
brew services stoprabbitmq
啟動在前景
/usr/local/Cellar/rabbitmq/3.7.11/sbin/rabbitmq-server

管理 rabbitmqctl

用這個程式管理rabbitmq大部分項目,像是增加使用者、列出使用者、重啟、列出參數等。

範例



增加使用者 add_user [userName][password]
rabbitmqctl add_user admin admin
更改使用者tag,改成管理者 set_user_tags [...]
rabbitmqctl set_user_tags admin administrator
更改使用者權限 set_permissions [name]
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
更改密碼 change_password [username] [password]
rabbitmqctl change_password admin admin
查詢未確認的message(unacknowledged) list_queues [queueName] messages_ready messages_unacknowledged
sudo rabbitmqctl list_queues queueName messages_ready messages_unacknowledged

監控

監控程式啟動在 http://localhost:15672 使用瀏覽器就可以開啟,要先建立一個user
rabbitmq-plugins enable rabbitmq-management

外掛管理

rabbitmq-plugins [-n [node]] [-t [timeout]] [-l] [-q] [command] [command options]

Eg.
rabbitmq-plugins [-n ] [-t ] [-l] [-q] is_enabled [plugin1] [plugin2] 

參數

-n node

-q quiet

-h  help

list 列出所有外掛

is_enabled [plugin1][,[plugin2]]   查看plugin是否啟用

enable [plugin]  啟用外掛

範例


啟用外掛
rabbitmq-plugins enable rabbitmq_management

管理工具

rabbitmqadmin

沒有留言:

張貼留言