# 1. 介绍
Redis 5.0 版本新增了一个更强大的数据结构—Stream。
1) 它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
2) 它就像是个仅追加内容的消息链表,把所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。而且消息是持久化的。
# 2. 基本命令
# 2.1 消息队列相关命令
XADD - 添加消息到末尾
XTRIM - 对流进行修剪,限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID 从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表
2
3
4
5
6
7
# 2.2 消费者组相关命令
XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息;
XINFO GROUPS - 打印消费者组的信息;
XINFO STREAM - 打印流信息
2
3
4
5
6
7
8
9
10
11
# 3. 独立消费
XADD key ID field value [field value ...] - 添加消息到末尾
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
2
$ 这个特殊的 ID 意思是 XREAD 应该使用流 mystream 已经存储的最大 ID 作为最后一个 ID。以便我们仅接收从我们开始监听时间以后的新消息。
# 4. 消费组消费
xread 虽然可以扇形分发到 N 个客户端,然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是从同一流向许多客户端提供不同的消息子集。比如下图这样,三个消费者按轮训的方式去消费一个 Stream。
Redis Stream 借鉴了很多 Kafka 的设计。
Consumer Group:有了消费组的概念,每个消费组状态独立,互不影响,一个消费组可以有多个消费者
last_delivered_id :每个消费组会有个游标 last_delivered_id 在数组之上往前移动,表示当前消费组已经消费到哪条消息了
pending_ids :消费者的状态变量,作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
Stream 不像 Kafka 那样有分区的概念,如果想实现类似分区的功能,就要在客户端使用一定的策略将消息写到不同的 Stream。
xgroup create:创建消费者组
xgreadgroup:读取消费组中的消息
xack:ack 掉指定消息
# 4.1 创建消费组
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
说明:
key :队列名称,如果不存在就创建
groupname :组名。
$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。
# 4.2 读取消费组中的消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
比如:XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
2
3
# 5. 实战
# 生产者
produce.php
<?php
$redis = new Redis();
$redis->connect("127.0.0.1", 6379);
$redis->auth("7GNR8pI5LOlh");
$redis->select(15);
for ($i = 0; $i < 100; $i++) {
$str = ['id' => $i . '-' . microtime(true)];
$rt = $redis->xAdd('test', "*", $str);
file_put_contents("./p.txt", "[" . microtime(true) . "] " . $rt . ' ' . json_encode($str) . PHP_EOL, FILE_APPEND | LOCK_EX);
var_dump($rt);
}
2
3
4
5
6
7
8
9
10
11
12
13
# 消费者1
consumer01.php
<?php
$redis = new Redis();
$redis->connect("127.0.0.1", 6379);
$redis->auth("7GNR8pI5LOlh");
$redis->select(15);
$stream = "test";
$group = "groupA";
$consumerName = "c1";
// 创建组消费
function initGroup()
{
global $redis, $group, $stream;
$groups = $redis->xInfo('GROUPS', $stream);
$existGroup = false;
if (!empty($groups)) {
foreach ($groups as $g) {
if ($g["name"] === $group) {
$existGroup = true;
break;
}
}
}
if (!$existGroup) {
$redis->xGroup('CREATE', $stream, $group, "0");
}
}
initGroup();
while (true) {
try {
$getResult = $redis->xReadGroup($group, $consumerName, [$stream => ">"], 1);
if ($getResult && isset($getResult[$stream]) && !empty($getResult[$stream])) {
foreach ($getResult[$stream] as $id => $value) {
file_put_contents("./" . $consumerName . ".txt", "[" . microtime(true) . "] " . $id . " " . json_encode($value) . PHP_EOL, FILE_APPEND | LOCK_EX);
usleep(rand(100, 200));
$redis->xAck($stream, $group, [$id]);
}
}
} catch (Throwable $e) {
var_dump($e->getMessage());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 消费者2
consumer02.php
<?php
$redis = new Redis();
$redis->connect("127.0.0.1", 6379);
$redis->auth("7GNR8pI5LOlh");
$redis->select(15);
$stream = "test";
$group = "groupA";
$consumerName = "c2";
// 创建组消费
function initGroup()
{
global $redis, $group, $stream;
$groups = $redis->xInfo('GROUPS', $stream);
$existGroup = false;
if (!empty($groups)) {
foreach ($groups as $g) {
if ($g["name"] === $group) {
$existGroup = true;
break;
}
}
}
if (!$existGroup) {
$redis->xGroup('CREATE', $stream, $group, "0");
}
}
initGroup();
while (true) {
try {
$getResult = $redis->xReadGroup($group, $consumerName, [$stream => ">"], 1);
if ($getResult && isset($getResult[$stream]) && !empty($getResult[$stream])) {
foreach ($getResult[$stream] as $id => $value) {
file_put_contents("./" . $consumerName . ".txt", "[" . microtime(true) . "] " . $id . " " . json_encode($value) . PHP_EOL, FILE_APPEND | LOCK_EX);
usleep(rand(100, 200));
$redis->xAck($stream, $group, [$id]);
}
}
} catch (Throwable $e) {
var_dump($e->getMessage());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
二个消费者可以单独部署,或用多进程去消费。
<?php
/**
* Description:this is description
* User:ligan
* Date:2021/12/16
* Time:4:22 下午
*/
use Swoole\Process;
use Swoole\Coroutine;
function getRedis()
{
$redis = new \Redis();
$redis->connect("127.0.0.1", 6379);
$redis->auth("7GNR8pI5LOlh");
$redis->select(15);
return $redis;
}
$stream = "test";
$group = "groupA";
// 创建组消费
function initGroup()
{
global $group, $stream;
$redis = getRedis();
$groups = $redis->xInfo('GROUPS', $stream);
$existGroup = false;
if (!empty($groups)) {
foreach ($groups as $g) {
if ($g["name"] === $group) {
$existGroup = true;
break;
}
}
}
if (!$existGroup) {
$redis->xGroup('CREATE', $stream, $group, "0");
}
}
const POOL_NUM = 5;
$pool = new Process\Pool(POOL_NUM);
$pool->set(['enable_coroutine' => true]);
$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) use ($group, $stream) {
/** 当前是 Worker 进程 */
static $running = true;
Process::signal(SIGTERM, function () use (&$running) {
$running = false;
echo "TERM\n";
});
echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
initGroup();
while ($running) {
$consumerName = "c-" . (posix_getpid() % POOL_NUM);
$redis = getRedis();
$getResult = $redis->xReadGroup($group, $consumerName, [$stream => ">"], 1);
if ($getResult && isset($getResult[$stream]) && !empty($getResult[$stream])) {
foreach ($getResult[$stream] as $id => $value) {
file_put_contents("./" . $consumerName . ".txt", "[" . microtime(true) . "] " . $id . " " . json_encode($value) . PHP_EOL, FILE_APPEND | LOCK_EX);
usleep(rand(100, 200));
$redis->xAck($stream, $group, [$id]);
}
} else {
\co::sleep(1);
}
}
});
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStop\n");
});
$pool->start();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76