我们很多项目类似与支付宝,微信支付这些支付成功后,阶梯式的异步通知服务端接口的情况,方案有很多,可以利用数据库扫描, Redis zset,RabbitMQ, RocketMQ,Pulsar等
本文中主要利用的 RabbitMQ
的延迟消息去实现。EasySwoole+Task
假设我们有 1s
,3s
,5s
,15s
,30s
这5种情况阶梯式的通知。
# 注册进程
EasySwooleEvent.php
<?php
namespace EasySwoole\EasySwoole;
use App\Process\ConsumerProcess;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;
class EasySwooleEvent implements Event
{
public static function initialize()
{
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
self::process();
}
public static function process()
{
$arr = [1, 3, 5, 15, 30];
for ($i = 0; $i < count($arr); $i++) {
$processConfig = new \EasySwoole\Component\Process\Config([
'processName' => 'ConsumerProcess' . $arr[$i], // 设置 进程名称为 TickProcess
'processGroup' => 'Custom', // 设置 进程组名称为 Tick
'enableCoroutine' => true, // 设置 自定义进程自动开启协程
'arg' => [
'i' => $arr[$i]
],
]);
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ConsumerProcess($processConfig));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 消费进程
ConsumerProcess.php
<?php
declare(strict_types=1);
namespace App\Process;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Swoole\Process;
class ConsumerProcess extends \EasySwoole\Component\Process\AbstractProcess
{
protected function run($arg)
{
try {
$args = $this->getArg();
$i = $args["i"];
//连接
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/', true);
$channel = $connection->channel();
//定义普通队列
$channel->queue_declare(
'delay_queue' . $i,
false,
true,
false,
false,
);
// callback回调
$callback = function ($msg) use ($channel, $i) {
file_put_contents(EASYSWOOLE_ROOT . "/Log/{$i}.txt", ' [' . $i . '] Received ' . $msg->body . ' ---> ' . date('Y-m-d H:i:s') . "\n", FILE_APPEND | LOCK_EX);
// TODO...
// 异步执行业务逻辑,开协程去做, 然后再去通知下一个
go(function () use ($channel, $i) {
// 模拟延迟
\co::sleep(rand(1, 5));
$arr = [1, 3, 5, 15, 30];
$count = count($arr);
$key = array_keys($arr, $i);
if (!empty($key) && $key[0] < $count - 1) {
$next = $key[0] + 1;
$v = $arr[$next];
$s = 1000 * $v;
$message = new AMQPMessage('[' . $v . '] hello world ' . date('Y-m-d H:i:s'), [
'application_headers' => new AMQPTable([
'x-delay' => $s
])
]);
$channel->basic_publish($message, 'delay', 'delay_queue' . $v, true);
}
});
};
// 消费
$channel->basic_consume('delay_queue' . $i, 'delay_queue' . $i, false, true, false, false, $callback);
// 监听,阻塞
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
} catch (\Exception $exception) {
echo "异常信息" . $exception->getMessage();
}
}
protected function onPipeReadable(Process $process)
{
// 该回调可选
// 当主进程对子进程发送消息的时候 会触发
$recvMsgFromMain = $process->read(); // 用于获取主进程给当前进程发送的消息
var_dump('收到主进程发送的消息: ');
var_dump($recvMsgFromMain);
}
protected function onException(\Throwable $throwable, ...$args)
{
// 该回调可选
// 捕获 run 方法内抛出的异常
// 这里可以通过记录异常信息来帮助更加方便地知道出现问题的代码
}
protected function onShutDown()
{
// 该回调可选
// 进程意外退出 触发此回调
// 大部分用于清理工作
}
protected function onSigTerm()
{
// 当进程接收到 SIGTERM 信号触发该回调
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# 生产消息
<?php
declare(strict_types=1);
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
try {
//连接
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/', true);
$channel = $connection->channel();
//普通交换机
$channel->exchange_declare('delay', 'x-delayed-message', false, true, false, false, false, ["x-delayed-type" => ["S", "topic"]]);
for ($i = 1; $i < 500; $i++) {
go(function () use ($channel, $i) {
$r = 1;
//定义普通队列
$channel->queue_declare(
'delay_queue' . $r,
false,
true,
false,
false,
);
//绑定
$channel->queue_bind('delay_queue' . $r, 'delay', 'delay_queue' . $r);
$s = 1000 * $r;
$message = new AMQPMessage('[' . $i . ' - ' . $r . '] hello world ' . date('Y-m-d H:i:s'), [
'application_headers' => new AMQPTable([
'x-delay' => $s
])
]);
$channel->basic_publish($message, 'delay', 'delay_queue' . $r, true);
});
}
$channel->close();
$connection->close();
} catch (\Exception $exception) {
echo "异常信息" . $exception->getMessage();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
上面的优化点还有很多,如 rabbitmq的多路复用(连接池)等。