基于swoole的协程redis+redis zset实现延迟队列,当消费成功实现 1s,3s,5s,10s,30s 阶梯性异步通知,一旦通知返回 SUCCESS 的话,后续不通知。不然就一直通知下去,直到5次结束。

# RedisZsetNotify.php

<?php

declare(strict_types=1);

namespace App\Utils;


class RedisZsetNotify
{

    protected \Swoole\Coroutine\Redis $_server;

    protected string $_key = 'apiNotify';

    public function __construct()
    {
		$this->_server =  $this->connect();
    }
	
	public function connect(){
		$server = new \Swoole\Coroutine\Redis();
		$server->connect("127.0.0.1", 6379);
		$server->auth("7GNR8pI5LOlh");
		$server->setOptions(['compatibility_mode' => true]);
		$server->select(15);
		return $server;
	}


    public function zAddData($score, $value)
    {
        $this->_server->zAdd($this->_key, $score, $value);
    }

    /**
     * 开始执行队列
     */
    public function run()
    {
        $list = $this->_server->zRangeByScore($this->_key, "0", (string)time(), ['withscores' => true]);
        if (!empty($list)) {
            foreach ($list as $value => $score) {
                $this->dispatch($value);
            }
        }
    }

    /**
     * 数据处理
     * @param $value
     */
    public function dispatch($value)
    {
        $rt = $this->_server->zRem($this->_key, $value);
        if (!$rt) {
            return;
        }

        // 执行业务逻辑
        go(function () use ($value) {
            $data_info = json_decode($value, true);

            // 模拟网络请求
            $rt = file_get_contents($data_info["url"]);

            file_put_contents(EASYSWOOLE_ROOT . "/Log/{$data_info["num"]}.txt", $value . ' --- ' . ((new \DateTime())->format('Y-m-d H:i:s.u')) . ' - ' . $rt . "\n", FILE_APPEND | LOCK_EX);

            if (!empty($rt) && false !== strpos($rt, "ok")) {
                return;
            }

            $num = $data_info["num"] + 1;

            // 到时间的话消费,再去给下次消费,5次内
            if ($num <= 5) {
                $time                 = $this->notifyTime($num);
                $data_info["num"]     = $num;
                $data_info["updated"] = (new \DateTime())->format('Y-m-d H:i:s.u');

                $server = $this->connect();
                $server->zAdd($this->_key, $time, json_encode($data_info));
                $server->close();
            }
        });
    }


    /**
     * 通知时间间隔
     *
     * @param $num
     * @return int
     */
    public function notifyTime($num)
    {
        $time = 0;
        switch ($num) {
            case 1:
                $time = time() + 1;
                break;
            case 2:
                $time = time() + 3;
                break;
            case 3:
                $time = time() + 5;
                break;
            case 4:
                $time = time() + 10;
                break;
            case 5:
                $time = time() + 30;
                break;
            default:
                break;
        }
        return $time;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

# 生产者

for ($i = 0; $i < 1000; $i++) {
	go(function () {
		$obj = new RedisZsetNotify();
		$val = [
			"num"     => 1,
			"sn"      => RandomHelper::makeSnowFlake() . rand(1000, 9999),
			"url"     => "http://www.zjwoo.com/v1/health",
			"updated" => (new \DateTime())->format('Y-m-d H:i:s.u')
		];
		$obj->zAddData((string)time(), json_encode($val));
	});
}
1
2
3
4
5
6
7
8
9
10
11
12

# 消费者

?php

declare(strict_types=1);

namespace App\Process;


use App\Utils\RedisZsetNotify;

class RedisConsumer extends \EasySwoole\Component\Process\AbstractProcess
{

    protected function run($arg)
    {
        while (true) {
            $obj = new RedisZsetNotify();
            $obj->run();
            \co::sleep(0.01);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 注册进程

利用 easyswoole EasySwooleEvent mainServerCreate去注册金进程。

for ($i = 1; $i < 11; $i++) {
	$processConfig = new \EasySwoole\Component\Process\Config([
		'processName'     => 'RedisConsumer' . $i,              // 设置 进程名称为 TickProcess
		'processGroup'    => 'Custom',                          // 设置 进程组名称为 Tick
		'enableCoroutine' => true,                              // 设置 自定义进程自动开启协程
	]);
	\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new RedisConsumer($processConfig));
}
1
2
3
4
5
6
7
8

需要注意的是,dispatch 方法里面的 zrem 的原子性问题,因为开了多个消费,防止拿到相同的数据,所以直接进来就zrem 操作,然后执行成功的执行下面的逻辑。