我们很多项目类似与支付宝,微信支付这些支付成功后,阶梯式的异步通知服务端接口的情况,方案有很多,可以利用数据库扫描, Redis zset,RabbitMQ, RocketMQ,Pulsar等

本文中主要利用的 RabbitMQ 的延迟消息去实现。EasySwoole+Task

假设我们有 1s,3s,5s,15s,30s 这5种情况阶梯式的通知。

# 注册进程

EasySwooleEvent.php


<?php

namespace EasySwoole\EasySwoole;


use App\Process\ConsumerProcess;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;

class EasySwooleEvent implements Event
{
    public static function initialize()
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        self::process();
    }

    public static function process()
	{
		$arr = [1, 3, 5, 15, 30];
		for ($i = 0; $i < count($arr); $i++) {
			$processConfig = new \EasySwoole\Component\Process\Config([
				'processName'     => 'ConsumerProcess' . $arr[$i], // 设置 进程名称为 TickProcess
				'processGroup'    => 'Custom',                     // 设置 进程组名称为 Tick
				'enableCoroutine' => true,                         // 设置 自定义进程自动开启协程
				'arg'             => [
					'i' => $arr[$i]
				],
			]);
			\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ConsumerProcess($processConfig));
		}
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 消费进程

ConsumerProcess.php

<?php

declare(strict_types=1);

namespace App\Process;

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Swoole\Process;

class ConsumerProcess extends \EasySwoole\Component\Process\AbstractProcess
{
    protected function run($arg)
    {
        try {

            $args = $this->getArg();
            $i    = $args["i"];

            //连接
            $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/', true);
            $channel    = $connection->channel();

            //定义普通队列
            $channel->queue_declare(
                'delay_queue' . $i,
                false,
                true,
                false,
                false,
            );

            // callback回调
            $callback = function ($msg) use ($channel, $i) {

                file_put_contents(EASYSWOOLE_ROOT . "/Log/{$i}.txt", ' [' . $i . '] Received ' . $msg->body . ' ---> ' . date('Y-m-d H:i:s') . "\n", FILE_APPEND | LOCK_EX);

                // TODO...
                // 异步执行业务逻辑,开协程去做, 然后再去通知下一个
                go(function () use ($channel, $i) {
                    // 模拟延迟
					\co::sleep(rand(1, 5));
					
                    $arr   = [1, 3, 5, 15, 30];
                    $count = count($arr);
                    $key   = array_keys($arr, $i);
                    if (!empty($key) && $key[0] < $count - 1) {
                        $next    = $key[0] + 1;
                        $v       = $arr[$next];
                        $s       = 1000 * $v;
                        $message = new AMQPMessage('[' . $v . '] hello world ' . date('Y-m-d H:i:s'), [
                            'application_headers' => new AMQPTable([
                                'x-delay' => $s
                            ])
                        ]);
                        $channel->basic_publish($message, 'delay', 'delay_queue' . $v, true);
                    }
                });
            };

            // 消费
            $channel->basic_consume('delay_queue' . $i, 'delay_queue' . $i, false, true, false, false, $callback);

            // 监听,阻塞
            while ($channel->is_consuming()) {
                $channel->wait();
            }

            $channel->close();
            $connection->close();
        } catch (\Exception $exception) {
            echo "异常信息" . $exception->getMessage();
        }
    }


    protected function onPipeReadable(Process $process)
    {
        // 该回调可选
        // 当主进程对子进程发送消息的时候 会触发
        $recvMsgFromMain = $process->read(); // 用于获取主进程给当前进程发送的消息
        var_dump('收到主进程发送的消息: ');
        var_dump($recvMsgFromMain);
    }

    protected function onException(\Throwable $throwable, ...$args)
    {
        // 该回调可选
        // 捕获 run 方法内抛出的异常
        // 这里可以通过记录异常信息来帮助更加方便地知道出现问题的代码
    }

    protected function onShutDown()
    {
        // 该回调可选
        // 进程意外退出 触发此回调
        // 大部分用于清理工作
    }

    protected function onSigTerm()
    {
        // 当进程接收到 SIGTERM 信号触发该回调
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

# 生产消息

<?php

declare(strict_types=1);
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

try {
    //连接
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/', true);
    $channel    = $connection->channel();

    //普通交换机
    $channel->exchange_declare('delay', 'x-delayed-message', false, true, false, false, false, ["x-delayed-type" => ["S", "topic"]]);

    for ($i = 1; $i < 500; $i++) {
        go(function () use ($channel, $i) {
			
            $r    = 1;

            //定义普通队列
            $channel->queue_declare(
                'delay_queue' . $r,
                false,
                true,
                false,
                false,
            );

            //绑定
            $channel->queue_bind('delay_queue' . $r, 'delay', 'delay_queue' . $r);

            $s       = 1000 * $r;
            $message = new AMQPMessage('[' . $i . ' - ' . $r . '] hello world ' . date('Y-m-d H:i:s'), [
                'application_headers' => new AMQPTable([
                    'x-delay' => $s
                ])
            ]);
            $channel->basic_publish($message, 'delay', 'delay_queue' . $r, true);
        });
    }
    $channel->close();
    $connection->close();
} catch (\Exception $exception) {
    echo "异常信息" . $exception->getMessage();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

上面的优化点还有很多,如 rabbitmq的多路复用(连接池)等。