# 1.安装 PHP 扩展

composer require php-amqplib/php-amqplib -vvv
1

# 2.hello world

send.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 连接,channel
$connection = new AMQPStreamConnection('172.50.0.100', 5677, 'guest', 'guest');
$channel = $connection->channel();

// 定义queue
$channel->queue_declare('hello', false, false, false, false);

// send msg
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

// chose
$channel->close();
$connection->close();
?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

receive.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 连接,channel
$connection = new AMQPStreamConnection('172.50.0.100', 5677, 'guest', 'guest');
$channel = $connection->channel();

// 定义queue
$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

// callback回调
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

// 消费
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 监听,阻塞
while ($channel->is_consuming()) {
    $channel->wait();
}

// chose
$channel->close();
$connection->close();
?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

运行 php send.php,php receive.php 即可。

列出队列(Listing queues)

如果你想查看 Rabbitmq 队列,并且想知道有多少消息存在其中,你(作为特权用户)可以使用 rabbitmqctl 工具:

rabbitmqctl list_queues
1

# 3.工作队列

图片.png

task.php

<?php

require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

try {

    //建立一个连接通道,声明一个可以发送消息的队列hello
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('task_queue', false, true, false, false);

    $data = "Hello World!".(10000*microtime(true));
    $msg = new AMQPMessage($data,['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
    $channel->basic_publish($msg, '', 'task_queue');

    //发送完成后打印消息告诉发布消息的人:发送成功
    echo " [x] Sent ", $data, "\n";

} catch (Exception $e){
    var_dump($e->getMessage());
} finally {
    //关闭连接
    $channel->close();
    $connection->close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

work.php

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

try {
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare('hello', false, false, false, false);

    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    $callback = function($msg){
        echo " [x] Received ", $msg->body, "\n";
        sleep(rand(1,5));
        echo " [x] Done", "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    #只有consumer已经处理并确认了上一条message时queue才分派新的message给它,公平调度
    $channel->basic_qos(null, 1, null);

    $channel->basic_consume('task_queue', '', false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }
} catch (Exception $e){
    var_dump($e->getMessage());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
rabbitmqctl list_queues #查看队列信息
rabbitmqctl list_queues name messages_ready messages_unacknowledged # 排查哪些未ack
rabbitmqctl list_exchanges #交换机列表
rabbitmqctl list_bindings #绑定列表
1
2
3
4

# 4.发布/订阅

图片.png

emit_log.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();
?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

receive_logs.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 5.路由

图片.png

emit_log_direct.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo ' [x] Sent ', $severity, ':', $data, "\n";

$channel->close();
$connection->close();
?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

receive_logs_direct.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 6.主题交换机

发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由** . 分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过 255 字节。

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:

  • * (星号) 用来表示一个单词.
  • # (井号) 用来表示任意数量(零个或多个)单词。

下边用图说明:

图片.png

这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个 . 分割开。路由键里的第一个单词描述的是动物的速度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: <celerity>.<colour>.<species>

我们创建了三个绑定:Q1 的绑定键为 *.orange.*,Q2 的绑定键为 *.*.rabbitlazy.#

这三个绑定键被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣。
  • Q2 则是对所有的兔子所有懒惰的动物感兴趣。

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

如果我们违反约定,发送了一个携带有一个单词或者四个单词(**"orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。

但是另一方面,即使 **"lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

# 主题交换机

主题交换机是很强大的,它可以表现出跟其他交换机类似的行为

当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

* (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

emit_log_topic.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ",$routing_key,':',$data," \n";

$channel->close();
$connection->close();

?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

receive_logs_topic.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

执行下边命令 接收所有日志: $ php receive_logs_topic.php "#"

执行下边命令 接收来自”kern“设备的日志: $ php receive_logs_topic.php "kern.*"

执行下边命令 只接收严重程度为”critical“的日志: $ php receive_logs_topic.php "*.critical"

执行下边命令 建立多个绑定: $ php receive_logs_topic.php "kern.*" "*.critical"

执行下边命令 发送路由键为 "kern.critical" 的日志: $ php emit_log_topic.php "kern.critical" "A critical kernel error"

执行上边命令试试看效果吧。另外,上边代码不会对路由键和绑定键做任何假设,所以你可以在命令中使用超过两个路由键参数。

# 7.远程调用 RPC

一般我们前端调用 mq 服务的时候,这个时候需要同步返回结果的话,就需要 rpc 了。

# 消息属性

AMQP 协议给消息预定义了一系列的 14 个属性。大多数属性很少会用到,除了以下几个:

  • delivery_mode(投递模式):将消息标记为持久的(值为 2)或暂存的(除了 2 之外的其他任何值)。第二篇教程里接触过这个属性,记得吧?
  • content_type(内容类型):用来描述编码的 mime-type。例如在实际使用中常常使用 application/json 来描述 JOSN 编码类型。
  • reply_to(回复目标):通常用来命名回调队列。
  • correlation_id(关联标识):用来将 RPC 的响应和请求关联起来。

图片.png

上图我们可以看出,push 的时候定义一个** reply_to 回调队列,correlation_id 通过关联 ID 关联起来。

我们的 RPC 如此工作:

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 **reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 **rpc_queue 队列中。
  • RPC 工作者(又名:服务器)等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用。

rpc_client.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class FibonacciRpcClient {
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    public function __construct() {
        $this->connection = new AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        list($this->callback_queue, ,) = $this->channel->queue_declare(
            "", false, false, true, false);
        $this->channel->basic_consume(
            $this->callback_queue, '', false, false, false, false,
            array($this, 'on_response'));
    }
    public function on_response($rep) {
        if($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    public function call($n) {
        $this->response = null;
        $this->corr_id = uniqid();

        $msg = new AMQPMessage(
            (string) $n,
            array('correlation_id' => $this->corr_id,
                  'reply_to' => $this->callback_queue)
            );
        $this->channel->basic_publish($msg, '', 'rpc_queue');
        while(!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

rpc_server.php (opens new window)

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {
    if ($n == 0)
        return 0;
    if ($n == 1)
        return 1;
    return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function($req) {
    $n = intval($req->body);
    echo " [.] fib(", $n, ")\n";

    $msg = new AMQPMessage(
        (string) fib($n),
        array('correlation_id' => $req->get('correlation_id'))
        );

    $req->delivery_info['channel']->basic_publish(
        $msg, '', $req->get('reply_to'));
    $req->delivery_info['channel']->basic_ack(
        $req->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

服务器端代码相当简单:

  • 像往常一样,我们建立连接,声明队列
  • 我们为 basic_consume 声明了一个回调函数,这是 RPC 服务器端的核心。它执行实际的操作并且作出响应。
  • 或许我们希望能在服务器上多开几个线程。为了能将负载平均地分摊到多个服务器,我们需要在 $channel.basic_qos 中设置 prefetch_count。

# 8.事务

主要保证发布消息的时候,一定把消息给了 broker

channel.txSelect()
channel.txCommit()
channel.txRollback()
1
2
3

# 参考资料