# 1.死信模式

在RabbitMQ中,队列支持下面几个属性. x-message-ttl: 10000 表示队列中的消息只能存活10秒,变成死信无特殊配置时,消息到期将被丢弃. 若不希望死信直接丢弃,可以通过队列的其他属性进行配置.把死信转发给另一个交换机

  1. x-dead-letter-exchange: myExchange带有此属性的队列,会把死信投递给此处设置的myExchange交换机
  2. x-dead-letter-routing-key: myKey,该属性设置了死信转发时使用的路由键.
composer require php-amqplib/php-amqplib -vvv
1
<?php
  
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

try{
    //连接
    $connection = new AMQPStreamConnection('xxx.com', 5672, 'admin', 123456,'/',true);
    $channel = $connection->channel();

    //普通交换机
    $channel->exchange_declare('PHP-Direct-Exchange','direct');
    //定义普通队列
    $channel->queue_declare(
        'PHP-Queue',
        false,
        true,
        false,
        false,
        false,
        new AMQPTable([
            'x-message-ttl'=>15000,
            'x-dead-letter-exchange'=>'PHP-Dead-Exchange',
            'x-dead-letter-routing-key'=>'PHP-Dead-Key'
        ])
    );
    //绑定
    $channel->queue_bind('PHP-Queue','PHP-Direct-Exchange','key');

    //死信交换机
    $channel->exchange_declare('PHP-Dead-Exchange','direct');
    //死信队列
    $channel->queue_declare('PHP-Dead-Queue');
    //死信绑定
    $channel->queue_bind('PHP-Dead-Queue','PHP-Dead-Exchange','PHP-Dead-Key');

    //向普通队列发送消息
    $message = new AMQPMessage('hello world');
    $channel->basic_publish($message,'PHP-Direct-Exchange','key',true);

    $channel->close();
    $connection->close();
    
}catch (\Exception $exception){
    echo "异常信息".$exception->getMessage();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 2.插件模式

rabbitmq的3.6.*版本中可以使用一个插件rabbitmq-delayed-message-exchange (opens new window) 构建一个的延迟队列

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins list
1
2

如果出现

# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@1005ebc1bac8:
rabbitmq_delayed_message_exchange
Error:
{:plugins_not_found, [:rabbitmq_delayed_message_exchange]}
1
2
3
4
5

需要从仓库 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange (opens new window) 下载对应的版本到插件目录下。

plugin在安装的时候会默认读几个目录,怎么查找这些目录:

% rabbitmq-plugins directories -s

将出输出目录:将上述下载的plugin放到Plugin archives directory下:

  • Plugin archives directory:/usr/local/Cellar/rabbitmq/3.9.16/plugins
  • Plugin expansion directory: /usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost-plugins-expand
  • Enabled plugins file: /usr/local/etc/rabbitmq/enabled_plugins
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1
2

安装成功如下:

# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@1005ebc1bac8:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@1005ebc1bac8...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.
1
2
3
4
5
6
7
8
9
10
11
12
13
14

image-20220609133458386

# 3.简单使用

composer require php-amqplib/php-amqplib -vvv
1
<?php
declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

try {
    //连接
    $connection = new AMQPStreamConnection('xxx.com', 5672, 'admin', 123456,'/',true);
    $channel    = $connection->channel();

    //普通交换机
    $channel->exchange_declare('delay', 'x-delayed-message', false, true, false, false, false, ["x-delayed-type" => ["S", "topic"]]);
    
    //定义普通队列
    $channel->queue_declare(
        'delay_queue',
        false,
        true,
        false,
        false,
    );
    //绑定
    $channel->queue_bind('delay_queue', 'delay', 'delay_queue');
    $message = new AMQPMessage('hello world' . time(), [
        'application_headers' => new AMQPTable([
            'x-delay' => 5000
        ])
    ]);

    $channel->basic_publish($message, 'delay', 'delay_queue', true);
    $channel->close();
    $connection->close();

} catch (\Exception $exception) {
    echo "异常信息" . $exception->getMessage();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40