基于swoole的协程redis+redis zset实现延迟队列,当消费成功实现 1s,3s,5s,10s,30s 阶梯性异步通知,一旦通知返回 SUCCESS
的话,后续不通知。不然就一直通知下去,直到5次结束。
# RedisZsetNotify.php
<?php
declare(strict_types=1);
namespace App\Utils;
class RedisZsetNotify
{
protected \Swoole\Coroutine\Redis $_server;
protected string $_key = 'apiNotify';
public function __construct()
{
$this->_server = $this->connect();
}
public function connect(){
$server = new \Swoole\Coroutine\Redis();
$server->connect("127.0.0.1", 6379);
$server->auth("7GNR8pI5LOlh");
$server->setOptions(['compatibility_mode' => true]);
$server->select(15);
return $server;
}
public function zAddData($score, $value)
{
$this->_server->zAdd($this->_key, $score, $value);
}
/**
* 开始执行队列
*/
public function run()
{
$list = $this->_server->zRangeByScore($this->_key, "0", (string)time(), ['withscores' => true]);
if (!empty($list)) {
foreach ($list as $value => $score) {
$this->dispatch($value);
}
}
}
/**
* 数据处理
* @param $value
*/
public function dispatch($value)
{
$rt = $this->_server->zRem($this->_key, $value);
if (!$rt) {
return;
}
// 执行业务逻辑
go(function () use ($value) {
$data_info = json_decode($value, true);
// 模拟网络请求
$rt = file_get_contents($data_info["url"]);
file_put_contents(EASYSWOOLE_ROOT . "/Log/{$data_info["num"]}.txt", $value . ' --- ' . ((new \DateTime())->format('Y-m-d H:i:s.u')) . ' - ' . $rt . "\n", FILE_APPEND | LOCK_EX);
if (!empty($rt) && false !== strpos($rt, "ok")) {
return;
}
$num = $data_info["num"] + 1;
// 到时间的话消费,再去给下次消费,5次内
if ($num <= 5) {
$time = $this->notifyTime($num);
$data_info["num"] = $num;
$data_info["updated"] = (new \DateTime())->format('Y-m-d H:i:s.u');
$server = $this->connect();
$server->zAdd($this->_key, $time, json_encode($data_info));
$server->close();
}
});
}
/**
* 通知时间间隔
*
* @param $num
* @return int
*/
public function notifyTime($num)
{
$time = 0;
switch ($num) {
case 1:
$time = time() + 1;
break;
case 2:
$time = time() + 3;
break;
case 3:
$time = time() + 5;
break;
case 4:
$time = time() + 10;
break;
case 5:
$time = time() + 30;
break;
default:
break;
}
return $time;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# 生产者
for ($i = 0; $i < 1000; $i++) {
go(function () {
$obj = new RedisZsetNotify();
$val = [
"num" => 1,
"sn" => RandomHelper::makeSnowFlake() . rand(1000, 9999),
"url" => "http://www.zjwoo.com/v1/health",
"updated" => (new \DateTime())->format('Y-m-d H:i:s.u')
];
$obj->zAddData((string)time(), json_encode($val));
});
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 消费者
?php
declare(strict_types=1);
namespace App\Process;
use App\Utils\RedisZsetNotify;
class RedisConsumer extends \EasySwoole\Component\Process\AbstractProcess
{
protected function run($arg)
{
while (true) {
$obj = new RedisZsetNotify();
$obj->run();
\co::sleep(0.01);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 注册进程
利用 easyswoole EasySwooleEvent mainServerCreate去注册金进程。
for ($i = 1; $i < 11; $i++) {
$processConfig = new \EasySwoole\Component\Process\Config([
'processName' => 'RedisConsumer' . $i, // 设置 进程名称为 TickProcess
'processGroup' => 'Custom', // 设置 进程组名称为 Tick
'enableCoroutine' => true, // 设置 自定义进程自动开启协程
]);
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new RedisConsumer($processConfig));
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
需要注意的是,dispatch
方法里面的 zrem
的原子性问题,因为开了多个消费,防止拿到相同的数据,所以直接进来就zrem
操作,然后执行成功的执行下面的逻辑。