RabbitMQ với brew và Hypertext Preprocessor
Tới công chiện rồi quí zị... Khách hàng nói là đã nhắn tin không đồng bộ cho cửa hàng, mong nhân viên pub/sub gấp gấp! Nghe xong là thấy nó lùng bùng cái hệ nơ-ron luôn á.
Quý zị muốn tìm hiểu và cài đặt RabbitMQ bước đầu thì xem hướng dẫn bên dưới.
1. Installation of RabbitMQ bằng brew:
brew update
brew install rabbitmq
// Xác định folder đã cài đặt:
brew info rabbitmq
Thêm RabbitMQ vào PATH để có thể chạy liền những command line chẳng hạn như rabbitmqctl
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
source ~/.bash_profile
- ở background:
brew services start rabbitmq
- hoặc foreground:
rabbitmq-server
- Enable tất cả cờ:
rabbitmqctl enable_feature_flag all
brew services stop rabbitmq
hoặc
rabbitmqctl shutdown
2. Triển khai trong Hypertext Preprocessor:
RabbitMQ có 3 biệt ngữ chính: queue, producer và consumer.
queue là nơi lưu trữ messages
producer là module gởi message lên queue
consumer là module chờ để nhận message
Trước tiên, require package này trong composer.json:
"php-amqplib/php-amqplib": "^3.2"
2.1. Tạo file [producer.php]
/* Include libraries */
require('vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/* Tạo connection tới server */
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
/* Tạo channel và queue rồi gởi message lên */
$channel = $connection->channel();
$channel->queue_declare('test_queue', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
/* Đóng channel và connection */
$channel->close();
$connection->close();
2.2. Tạo file [consumer.php]
/* Include libraries */
require('vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/* Tạo connection tới server */
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
/* Tạo channel và chờ để nhận message */
$channel = $connection->channel();
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};
$channel->basic_consume('test_queue', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
Sau khi đã có 2 file thì chạy thôi các bạn.. chờ chi.
// Chạy module nhận message
php consumer.php
// rồi chạy module gởi message
php producer.php
Vậy là các bạn đã thử sơ qua cái gọi là 'không đồng bộ' của rmq rồi. Giờ coi xem pub/sub là gì.
Pub/sub được dùng khi apps muốn gởi cùng một message tới nhiều người nhận cùng 1 lúc. Ví dụ như ghi log, notify số lượng hàng trong kho khi có một order mới...
Trong pub/sub, giữa publisher và queue có thêm một layer nữa là exchange. Exchange sẽ phân tán message lên nhiều queue và những queue này sẽ được đọc bởi nhiều subscribers.
Sau đây là cách dùng pub/sub để ghi log.
3.1. File phát sinh text [publish_text.php]
<?php
require('vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Tạo exchange thay vì queue
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
3.2. File nhận log [receive_logs.php]
<?php
require('vendor/autoload.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function (AMQPMessage $msg) {
echo ' [x] ', $msg->getBody(), "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
try {
$channel->consume();
} catch (\Throwable $exception) {
echo $exception->getMessage();
}
$channel->close();
$connection->close();
Bên dưới là cách chạy 2 scripts này.
- Nếu các bạn muốn ghi log vào file:
php receive_logs.php > logs_hello.log
- Nếu muốn xem log trên màn hình thì chỉ đơn giản:
Sau đó là chạy file phát sinh text
Để xem những exchange đã tạo, chạy câu lệnh:
sudo rabbitmqctl list_bindings
Okay quý zị, vậy là Ri đã dẫn quý zị bước những bước đầu tiên vào mảnh đất hấp dẫn của RabbitMQ. Chúc quý zị sẽ có những điều thú vị khi tìm hiểu sâu hơn về công nghệ này. Ri thank you và thân chào.