# 1.死信模式
在RabbitMQ中,队列支持下面几个属性.
x-message-ttl: 10000 表示队列中的消息只能存活10秒,变成死信无特殊配置时,消息到期将被丢弃.
若不希望死信直接丢弃,可以通过队列的其他属性进行配置.把死信转发给另一个交换机
x-dead-letter-exchange: myExchange带有此属性的队列,会把死信投递给此处设置的myExchange交换机x-dead-letter-routing-key: myKey,该属性设置了死信转发时使用的路由键.
composer require php-amqplib/php-amqplib -vvv
1
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
try{
//连接
$connection = new AMQPStreamConnection('xxx.com', 5672, 'admin', 123456,'/',true);
$channel = $connection->channel();
//普通交换机
$channel->exchange_declare('PHP-Direct-Exchange','direct');
//定义普通队列
$channel->queue_declare(
'PHP-Queue',
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl'=>15000,
'x-dead-letter-exchange'=>'PHP-Dead-Exchange',
'x-dead-letter-routing-key'=>'PHP-Dead-Key'
])
);
//绑定
$channel->queue_bind('PHP-Queue','PHP-Direct-Exchange','key');
//死信交换机
$channel->exchange_declare('PHP-Dead-Exchange','direct');
//死信队列
$channel->queue_declare('PHP-Dead-Queue');
//死信绑定
$channel->queue_bind('PHP-Dead-Queue','PHP-Dead-Exchange','PHP-Dead-Key');
//向普通队列发送消息
$message = new AMQPMessage('hello world');
$channel->basic_publish($message,'PHP-Direct-Exchange','key',true);
$channel->close();
$connection->close();
}catch (\Exception $exception){
echo "异常信息".$exception->getMessage();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 2.插件模式
rabbitmq的3.6.*版本中可以使用一个插件rabbitmq-delayed-message-exchange (opens new window) 构建一个的延迟队列。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins list
1
2
2
如果出现
# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@1005ebc1bac8:
rabbitmq_delayed_message_exchange
Error:
{:plugins_not_found, [:rabbitmq_delayed_message_exchange]}
1
2
3
4
5
2
3
4
5
需要从仓库 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange (opens new window) 下载对应的版本到插件目录下。
plugin在安装的时候会默认读几个目录,怎么查找这些目录:
% rabbitmq-plugins directories -s
将出输出目录:将上述下载的plugin放到Plugin archives directory下:
- Plugin archives directory:/usr/local/Cellar/rabbitmq/3.9.16/plugins
- Plugin expansion directory: /usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost-plugins-expand
- Enabled plugins file: /usr/local/etc/rabbitmq/enabled_plugins
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1
2
2
安装成功如下:
# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@1005ebc1bac8:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@1005ebc1bac8...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14

# 3.简单使用
composer require php-amqplib/php-amqplib -vvv
1
<?php
declare(strict_types=1);
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
try {
//连接
$connection = new AMQPStreamConnection('xxx.com', 5672, 'admin', 123456,'/',true);
$channel = $connection->channel();
//普通交换机
$channel->exchange_declare('delay', 'x-delayed-message', false, true, false, false, false, ["x-delayed-type" => ["S", "topic"]]);
//定义普通队列
$channel->queue_declare(
'delay_queue',
false,
true,
false,
false,
);
//绑定
$channel->queue_bind('delay_queue', 'delay', 'delay_queue');
$message = new AMQPMessage('hello world' . time(), [
'application_headers' => new AMQPTable([
'x-delay' => 5000
])
]);
$channel->basic_publish($message, 'delay', 'delay_queue', true);
$channel->close();
$connection->close();
} catch (\Exception $exception) {
echo "异常信息" . $exception->getMessage();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40