# 前置条件

  • 安装php swoole
  • 搭建rabbitmq
  • composer require php-amqplib/php-amqplib

# 生产者

producer.php

<?php
/***
 * 生产者
 * User: gan
 * Date: 2020/11/3
 * Time: 10:10 下午
 */
require_once __DIR__ . '/../vendor/autoload.php';

$params = ["127.0.0.1",5672,"guest","guest"];


// 建立连接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(...$params);

// 创建通道
$channel = $conn->channel();

// 创建交换机
/**
 * name:xxx             交换机名称
 * type:direct          类型 fanut,direct,topic,headers
 * passive:false        不存在自动创建,如果设置true的话,返回OK,否则失败
 * durable:false        是否持久化
 * auto_delete:false    自动删除,最后一个
 */
$exName = "rabbitmq_exchange";
$channel->exchange_declare($exName,AMQP_EX_TYPE_DIRECT,false,true,false);

// 创建队列
/**
 * name:xxx             队列名称
 * passive:false        不存在自动创建,如果设置true的话,返回OK,否则失败
 * durable:false        是否持久化
 * exclusive:false      是否排他,如果为true的话,只对当前连接有效,连接断开后自动删除
 * auto_delete:false    自动删除,最后一个
 */
$queueName = "rabbitmq_exchange_queue";
$channel->queue_declare($queueName,false,true,false,false);


// 绑定
/**
 * $queue           队列名称
 * $exchange        交换机名称
 * $routing_key     路由名称
 */
$routeKey = "rabbitmq_exchange_queue_route";
$channel->queue_bind($queueName,$exName,$routeKey);

for ($i=0;$i<10000;$i++) {

    // push
    $data = [
        "requestTime"=>time(),
        "requestId"=>uniqid(),
        "name"=>"hello world".$i
    ];

    $msg = new \PhpAmqpLib\Message\AMQPMessage(
        json_encode($data),
        ["delivery_mode"=>\PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]
    );
    $channel->basic_publish($msg,$exName,$routeKey);
}

$channel->close();
$conn->close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

# 消费者

consumer.php

<?php
/***
 * 消费者
 * User: gan
 * Date: 2020/11/3
 * Time: 10:10 下午
 */

require_once __DIR__ . '/../vendor/autoload.php';

$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);

$pool->on("WorkerStart", function ($pool, $workerId) {
    echo "Worker#{$workerId} is started\n";

    try {

        $params = ["127.0.0.1",5672,"guest","guest"];


	    // 建立连接
        $conn = new \PhpAmqpLib\Connection\AMQPStreamConnection(...$params);

        // 创建通道
        $channel = $conn->channel();

	    // 创建交换机

        /**
         * name:xxx             交换机名称
         * type:direct          类型 fanut,direct,topic,headers
         * passive:false        不存在自动创建,如果设置true的话,返回OK,否则失败
         * durable:false        是否持久化
         * auto_delete:false    自动删除,最后一个
         */
        $exName = "rabbitmq_exchange";
        $channel->exchange_declare($exName,AMQP_EX_TYPE_DIRECT,false,true,false);

        // 创建队列
        /**
         * name:xxx             队列名称
         * passive:false        不存在自动创建,如果设置true的话,返回OK,否则失败
         * durable:false        是否持久化
         * exclusive:false      是否排他,如果为true的话,只对当前连接有效,连接断开后自动删除
         * auto_delete:false    自动删除,最后一个
         */
        $queueName = "rabbitmq_exchange_queue";
        $channel->queue_declare($queueName,false,true,false,false);


	    // 绑定
        /**
         * $queue           队列名称
         * $exchange        交换机名称
         * $routing_key     路由名称
         */
        $routeKey = "rabbitmq_exchange_queue_route";
        $channel->queue_bind($queueName,$exName,$routeKey);

	    // 消费
        /**
         * $queue = '',         被消费队列名称
         * $consumer_tag = '',  消费者客户端标识,用于区分客户端
         * $no_local = false,   这个功能属于amqp的标准,但是rabbitmq未实现
         * $no_ack = false,     收到消息后,是否要ack应答才算被消费
         * $exclusive = false,  是否排他,即为这个队列只能由一个消费者消费,适用于任务不允许并发处理
         * $nowait = false,     不返回直接结果,但是如果排他开启的话,则必须需要等待结果的,如果二个都开启会报错
         * $callback = null,    回调函数处理逻辑
         */
         // 回调
        $callback = function ($msg) use ($workerId) {
            var_dump($workerId."-----".$msg->body);

            // 响应ack
            $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
        };

        $channel->basic_consume($queueName,"",false,false,false,false,$callback);


	     // 监听
        while ($channel->is_consuming()) {
            $channel->wait();
        }

    } catch (\Throwable $e) {

    }
});

$pool->on("WorkerStop", function ($pool, $workerId) {
    echo "Worker#{$workerId} is stopped\n";
});

$pool->start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96