# 前置条件
- 安装php swoole
- 搭建rabbitmq
- composer require php-amqplib/php-amqplib
# 生产者
producer.php
<?php
/***
* 生产者
* User: gan
* Date: 2020/11/3
* Time: 10:10 下午
*/
require_once __DIR__ . '/../vendor/autoload.php';
$params = ["127.0.0.1",5672,"guest","guest"];
// 建立连接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(...$params);
// 创建通道
$channel = $conn->channel();
// 创建交换机
/**
* name:xxx 交换机名称
* type:direct 类型 fanut,direct,topic,headers
* passive:false 不存在自动创建,如果设置true的话,返回OK,否则失败
* durable:false 是否持久化
* auto_delete:false 自动删除,最后一个
*/
$exName = "rabbitmq_exchange";
$channel->exchange_declare($exName,AMQP_EX_TYPE_DIRECT,false,true,false);
// 创建队列
/**
* name:xxx 队列名称
* passive:false 不存在自动创建,如果设置true的话,返回OK,否则失败
* durable:false 是否持久化
* exclusive:false 是否排他,如果为true的话,只对当前连接有效,连接断开后自动删除
* auto_delete:false 自动删除,最后一个
*/
$queueName = "rabbitmq_exchange_queue";
$channel->queue_declare($queueName,false,true,false,false);
// 绑定
/**
* $queue 队列名称
* $exchange 交换机名称
* $routing_key 路由名称
*/
$routeKey = "rabbitmq_exchange_queue_route";
$channel->queue_bind($queueName,$exName,$routeKey);
for ($i=0;$i<10000;$i++) {
// push
$data = [
"requestTime"=>time(),
"requestId"=>uniqid(),
"name"=>"hello world".$i
];
$msg = new \PhpAmqpLib\Message\AMQPMessage(
json_encode($data),
["delivery_mode"=>\PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]
);
$channel->basic_publish($msg,$exName,$routeKey);
}
$channel->close();
$conn->close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 消费者
consumer.php
<?php
/***
* 消费者
* User: gan
* Date: 2020/11/3
* Time: 10:10 下午
*/
require_once __DIR__ . '/../vendor/autoload.php';
$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);
$pool->on("WorkerStart", function ($pool, $workerId) {
echo "Worker#{$workerId} is started\n";
try {
$params = ["127.0.0.1",5672,"guest","guest"];
// 建立连接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(...$params);
// 创建通道
$channel = $conn->channel();
// 创建交换机
/**
* name:xxx 交换机名称
* type:direct 类型 fanut,direct,topic,headers
* passive:false 不存在自动创建,如果设置true的话,返回OK,否则失败
* durable:false 是否持久化
* auto_delete:false 自动删除,最后一个
*/
$exName = "rabbitmq_exchange";
$channel->exchange_declare($exName,AMQP_EX_TYPE_DIRECT,false,true,false);
// 创建队列
/**
* name:xxx 队列名称
* passive:false 不存在自动创建,如果设置true的话,返回OK,否则失败
* durable:false 是否持久化
* exclusive:false 是否排他,如果为true的话,只对当前连接有效,连接断开后自动删除
* auto_delete:false 自动删除,最后一个
*/
$queueName = "rabbitmq_exchange_queue";
$channel->queue_declare($queueName,false,true,false,false);
// 绑定
/**
* $queue 队列名称
* $exchange 交换机名称
* $routing_key 路由名称
*/
$routeKey = "rabbitmq_exchange_queue_route";
$channel->queue_bind($queueName,$exName,$routeKey);
// 消费
/**
* $queue = '', 被消费队列名称
* $consumer_tag = '', 消费者客户端标识,用于区分客户端
* $no_local = false, 这个功能属于amqp的标准,但是rabbitmq未实现
* $no_ack = false, 收到消息后,是否要ack应答才算被消费
* $exclusive = false, 是否排他,即为这个队列只能由一个消费者消费,适用于任务不允许并发处理
* $nowait = false, 不返回直接结果,但是如果排他开启的话,则必须需要等待结果的,如果二个都开启会报错
* $callback = null, 回调函数处理逻辑
*/
// 回调
$callback = function ($msg) use ($workerId) {
var_dump($workerId."-----".$msg->body);
// 响应ack
$msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};
$channel->basic_consume($queueName,"",false,false,false,false,$callback);
// 监听
while ($channel->is_consuming()) {
$channel->wait();
}
} catch (\Throwable $e) {
}
});
$pool->on("WorkerStop", function ($pool, $workerId) {
echo "Worker#{$workerId} is stopped\n";
});
$pool->start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
← 安装 Swoole基础镜像 →