Skip to content

RabbitMQ(2)

Published: at 05:08 PM

接續上一篇 RabbitMQ(1)

工作隊列 (Worker Queues)

RabbitMQ-WorkerQueue | repository worker

在 worker 之間分配任務(競爭消費者模式 - the competing consumers pattern)

Preparation 寓意 主要目的:

  • 避免立即執行密集資源的任務,要等待處理完成 或是
  • 工作隊列可以平行處理任務,並且可以透過增加 worker 來提升效能。

說明:

該範例一樣是接收與消費者的功能:

  • 任務發佈者(producer)傳入字串,並且該字串可增加 . 的數量來模擬任務處理時間。
  • 消費者(consumer/worker)會接收到發布者的任務,再以 . 的數量來模擬處理時間。

Round-robin dispatching (輪詢調度)

當推入的任務過多時,我們可以增加消費者的數量來平行處理任務。
(增加消費者來加快消化速度)

在範例中,使用 basic_qos 設定消化設定,比方說;預先取得消化大小、數量、是否共享;
如果未添加這段程式碼,他會一直推送任務給消費者。(此時如果有設定 no_ack 則會直接消化,不會等待消費者處理完畢)

在 RabbitMQ 中設定 basic_qos 時,a_global 參數決定了 prefetch count 的作用範圍:

  • a_global = true:prefetch count 限制套用於整個 channel,所有 consumer 共享同一個配額
  • a_global = false:prefetch count 限制套用於個別 consumer,每個 consumer 都有獨立的配額

舉例來說,如果設定 prefetch_count = 10:

  • a_global = true 時,無論有多少個 consumer,channel 中最多只會有 10 個未確認的訊息

  • a_global = false 時,每個 consumer 都可以有 10 個未確認的訊息,總數會隨 consumer 數量增加而增加

$channel->basic_qos(
    prefetch_size: 0, // 預先取得大小(0為不限制, 預設值)
    prefetch_count: 1, // 預先取得數量 (0為不限制, 預設值)
    a_global: false, // 全域,共享? 設定為共享時,不會因為不同消費者而有不同的預先取得數量 (也就是說,設定共享後,設定多少就是多少,不會因為多開 worker 而增加)
);

本身來說, queue 與 message 是可以設定持久化的,並且設定其持久化設定,可以透過 durable 來設定。

  • by queue_declare 設定持久化;隊列持久化
$channel->queue_declare(
    queue: 'task_queue', // queue 名稱
    passive: false, // 是否被動,被動時不會創建 queue
    durable: true, // 是否持久化
    exclusive: false, // 是否排他
    auto_delete: false, // 是否自動刪除
    // ...略
);
  • by message 設定持久化

$msg = new AMQPMessage(
    body: $msgStr,
    properties: ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

$channel->basic_publish(
    message: $message, // 訊息
    exchange: '', // 交換機
    routing_key: 'task_queue', // 路由鍵
    // ...略
);

Message acknowledgment (訊息確認)

當消費者處理完畢後,會回傳一個確認訊息給 RabbitMQ,告訴他已經處理完畢。
如果設定 no_acktrue 時,則會直接消化訊息,不會等待消費者處理完畢。
反之,如果設定 no_ackfalse 時,則會等待消費者處理完畢後,再進行消化。
下面程式碼就是屬於手動觸發 ack 進行回應

手動確認消息:

$callback = function (AMQPMessage $msg) use (&$output) {
    $output->writeln('<comment> [x] 得到訊息: ' . $msg->body . '</comment>');
    $sleepCount = substr_count($msg->body, '.');
    $output->writeln("<comment> [x] 模擬情境需要花時間, sleep ($sleepCount)</comment>");
    sleep($sleepCount);

    $output->writeln('<info> [x] 處理訊息: ' . $msg->body . '完成!</info>');
    $msg->ack(); // <------ 手動觸發 ack
};

Publish/Subscribe (發布/訂閱)

RabbitMQ-Pub/Sub | repository worker

一次發送多個訊息給多個消費者 在這個例子中,就有點像是 linux 中的 tee 指令(就是我一個 log 紀錄,我要可以查看且同時也要寫到文件裡面)

Preparation 寓意 主要目的:

  • pub/sub 屬於是一對多的模式,一個生產者可以發送多個訊息給多個消費者。很常有的情況是,一個訊息會被多個消費者消化。

說明:

在先前的所有範例中,我們都是直接針對消息直接推入 queue 中,而直接進行消化;但是事實上,
producer 並不會直接推送訊息到 queue 中,而是透過 exchange 來進行轉發。(就有點像是事件觸發,而非直接推送)
由於先前都沒有用到交換機Exchange,那這個範例中,我們將會使用到交換機(Exchange)來進行訊息的轉發。

Exchange (聲明一個交換機,並且指定交換機類型)

建立一個 exchange 並且設定其類型為 fanout,這樣所有的 queue 都會接收到訊息。

$channel->exchange_declare(
    exchange: self::EXCHANGE_NAME, // 交換機名稱
    type: 'fanout', // 交換機類型 allow: fanout|direct|topic|headers
    passive: false, // 被動,檢查Exchange是否存在, 不存在則報錯
    durable: false, // 耐用,是否持久化… 重啟後是否存在
    auto_delete: false // 自動刪除,當最後一個消費者取消訂閱時,Exchange是否自動刪除
);

建立完 exchange 後,我們應該要設定哪些隊列應該可以消化 exchange 推送的訊息。

Temporary queues (臨時隊列)

當然,我們可以建立一個臨時隊列,這樣當消費者取消訂閱時,該隊列會自動刪除

list($queueName, ,) = $channel->queue_declare(queue: ''); // 建立臨時隊列

此時, $queueName 就是我們建立的臨時隊列名稱… 通常名稱類似 amq.gen-mWKw9yFMi6F5XX4WtenB9g 這樣的格式名稱。

到目前為止,我已經有了 exchange 和 queue,接下來就是要將 queue 與 exchange 進行綁定。

Bindings (綁定)

透過下面的程式碼,我們可以將 queue 與 exchange 進行綁定。

$channel->queue_bind(
    queue: $queueName, // 隊列名稱
    exchange: self::EXCHANGE_NAME // 交換機名稱
);

此時,交換機的 typefanout,因此所有的 queue 都會接收到訊息。


Previous Post
RabbitMQ(3)
Next Post
Chrome DevTools Protocol