接續上一篇 RabbitMQ(1)
工作隊列 (Worker Queues)
RabbitMQ-WorkerQueue | repository
在 worker 之間分配任務(競爭消費者模式 - the competing consumers pattern)
Preparation 寓意 主要目的:
- 避免立即執行密集資源的任務,要等待處理完成 或是
- 工作隊列可以平行處理任務,並且可以透過增加 worker 來提升效能。
說明:
該範例一樣是接收與消費者的功能:
- 任務發佈者(producer)傳入字串,並且該字串可增加
.的數量來模擬任務處理時間。- 消費者(consumer/worker)會接收到發布者的任務,再以
.的數量來模擬處理時間。Round-robin dispatching (輪詢調度)
當推入的任務過多時,我們可以增加消費者的數量來平行處理任務。
(增加消費者來加快消化速度)在範例中,使用
basic_qos設定消化設定,比方說;預先取得消化大小、數量、是否共享;
如果未添加這段程式碼,他會一直推送任務給消費者。(此時如果有設定 no_ack 則會直接消化,不會等待消費者處理完畢)在 RabbitMQ 中設定 basic_qos 時,a_global 參數決定了 prefetch count 的作用範圍:
- a_global = true:prefetch count 限制套用於整個 channel,所有 consumer 共享同一個配額
- a_global = false:prefetch count 限制套用於個別 consumer,每個 consumer 都有獨立的配額
舉例來說,如果設定 prefetch_count = 10:
a_global = true 時,無論有多少個 consumer,channel 中最多只會有 10 個未確認的訊息
a_global = false 時,每個 consumer 都可以有 10 個未確認的訊息,總數會隨 consumer 數量增加而增加
$channel->basic_qos( prefetch_size: 0, // 預先取得大小(0為不限制, 預設值) prefetch_count: 1, // 預先取得數量 (0為不限制, 預設值) a_global: false, // 全域,共享? 設定為共享時,不會因為不同消費者而有不同的預先取得數量 (也就是說,設定共享後,設定多少就是多少,不會因為多開 worker 而增加) );本身來說, queue 與 message 是可以設定持久化的,並且設定其持久化設定,可以透過
durable來設定。
- by queue_declare 設定持久化;隊列持久化
$channel->queue_declare( queue: 'task_queue', // queue 名稱 passive: false, // 是否被動,被動時不會創建 queue durable: true, // 是否持久化 exclusive: false, // 是否排他 auto_delete: false, // 是否自動刪除 // ...略 );
- by message 設定持久化
$msg = new AMQPMessage( body: $msgStr, properties: ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] ); $channel->basic_publish( message: $message, // 訊息 exchange: '', // 交換機 routing_key: 'task_queue', // 路由鍵 // ...略 );
Message acknowledgment (訊息確認)
當消費者處理完畢後,會回傳一個確認訊息給 RabbitMQ,告訴他已經處理完畢。
如果設定no_ack為true時,則會直接消化訊息,不會等待消費者處理完畢。
反之,如果設定no_ack為false時,則會等待消費者處理完畢後,再進行消化。
下面程式碼就是屬於手動觸發 ack 進行回應手動確認消息:
$callback = function (AMQPMessage $msg) use (&$output) { $output->writeln('<comment> [x] 得到訊息: ' . $msg->body . '</comment>'); $sleepCount = substr_count($msg->body, '.'); $output->writeln("<comment> [x] 模擬情境需要花時間, sleep ($sleepCount)</comment>"); sleep($sleepCount); $output->writeln('<info> [x] 處理訊息: ' . $msg->body . '完成!</info>'); $msg->ack(); // <------ 手動觸發 ack };
Publish/Subscribe (發布/訂閱)
一次發送多個訊息給多個消費者 在這個例子中,就有點像是 linux 中的 tee 指令(就是我一個 log 紀錄,我要可以查看且同時也要寫到文件裡面)
Preparation 寓意 主要目的:
- pub/sub 屬於是一對多的模式,一個生產者可以發送多個訊息給多個消費者。很常有的情況是,一個訊息會被多個消費者消化。
說明:
在先前的所有範例中,我們都是直接針對消息直接推入 queue 中,而直接進行消化;但是事實上,
producer 並不會直接推送訊息到 queue 中,而是透過 exchange 來進行轉發。(就有點像是事件觸發,而非直接推送)
由於先前都沒有用到交換機Exchange,那這個範例中,我們將會使用到交換機(Exchange)來進行訊息的轉發。Exchange (聲明一個交換機,並且指定交換機類型)
建立一個 exchange 並且設定其類型為
fanout,這樣所有的 queue 都會接收到訊息。$channel->exchange_declare( exchange: self::EXCHANGE_NAME, // 交換機名稱 type: 'fanout', // 交換機類型 allow: fanout|direct|topic|headers passive: false, // 被動,檢查Exchange是否存在, 不存在則報錯 durable: false, // 耐用,是否持久化… 重啟後是否存在 auto_delete: false // 自動刪除,當最後一個消費者取消訂閱時,Exchange是否自動刪除 );建立完 exchange 後,我們應該要設定哪些隊列應該可以消化 exchange 推送的訊息。
Temporary queues (臨時隊列)
當然,我們可以建立一個臨時隊列,這樣當消費者取消訂閱時,該隊列會自動刪除。
list($queueName, ,) = $channel->queue_declare(queue: ''); // 建立臨時隊列此時,
$queueName就是我們建立的臨時隊列名稱… 通常名稱類似amq.gen-mWKw9yFMi6F5XX4WtenB9g這樣的格式名稱。到目前為止,我已經有了 exchange 和 queue,接下來就是要將 queue 與 exchange 進行綁定。
Bindings (綁定)
透過下面的程式碼,我們可以將 queue 與 exchange 進行綁定。
$channel->queue_bind( queue: $queueName, // 隊列名稱 exchange: self::EXCHANGE_NAME // 交換機名稱 );此時,交換機的
type為fanout,因此所有的 queue 都會接收到訊息。