# 1. 介绍

Redis 5.0 版本新增了一个更强大的数据结构—Stream。

1) 它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

2) 它就像是个仅追加内容的消息链表,把所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。而且消息是持久化的。

# 2. 基本命令

# 2.1 消息队列相关命令

XADD - 添加消息到末尾
XTRIM - 对流进行修剪,限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID 从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表
1
2
3
4
5
6
7

# 2.2 消费者组相关命令

XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息;
XINFO GROUPS - 打印消费者组的信息;
XINFO STREAM - 打印流信息
1
2
3
4
5
6
7
8
9
10
11

# 3. 独立消费

XADD key ID field value [field value ...] - 添加消息到末尾
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
1
2

$ 这个特殊的 ID 意思是 XREAD 应该使用流 mystream 已经存储的最大 ID 作为最后一个 ID。以便我们仅接收从我们开始监听时间以后的新消息。

# 4. 消费组消费

xread 虽然可以扇形分发到 N 个客户端,然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是从同一流向许多客户端提供不同的消息子集。比如下图这样,三个消费者按轮训的方式去消费一个 Stream。

image-20211220162922362-9988964

image-20211220162948294-9988989

Redis Stream 借鉴了很多 Kafka 的设计。

Consumer Group:有了消费组的概念,每个消费组状态独立,互不影响,一个消费组可以有多个消费者

last_delivered_id :每个消费组会有个游标 last_delivered_id 在数组之上往前移动,表示当前消费组已经消费到哪条消息了

pending_ids :消费者的状态变量,作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

Stream 不像 Kafka 那样有分区的概念,如果想实现类似分区的功能,就要在客户端使用一定的策略将消息写到不同的 Stream。

xgroup create:创建消费者组

xgreadgroup:读取消费组中的消息

xack:ack 掉指定消息

img

# 4.1 创建消费组

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY  key groupname] [DELCONSUMER key groupname consumername]
1

说明:

key :队列名称,如果不存在就创建

groupname :组名。

$ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

# 4.2 读取消费组中的消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

比如:XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
1
2
3

# 5. 实战

# 生产者

produce.php

<?php

$redis = new  Redis();
$redis->connect("127.0.0.1", 6379);
$redis->auth("7GNR8pI5LOlh");
$redis->select(15);

for ($i = 0; $i < 100; $i++) {
    $str = ['id' => $i . '-' . microtime(true)];
    $rt  = $redis->xAdd('test', "*", $str);
    file_put_contents("./p.txt", "[" . microtime(true) . "] " . $rt . ' ' . json_encode($str) . PHP_EOL, FILE_APPEND | LOCK_EX);
    var_dump($rt);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 消费者1

consumer01.php

<?php

$redis = new  Redis();
$redis->connect("127.0.0.1", 6379);
$redis->auth("7GNR8pI5LOlh");
$redis->select(15);

$stream       = "test";
$group        = "groupA";
$consumerName = "c1";

// 创建组消费
function initGroup()
{
    global $redis, $group, $stream;
    $groups     = $redis->xInfo('GROUPS', $stream);
    $existGroup = false;
    if (!empty($groups)) {
        foreach ($groups as $g) {
            if ($g["name"] === $group) {
                $existGroup = true;
                break;
            }
        }
    }

    if (!$existGroup) {
        $redis->xGroup('CREATE', $stream, $group, "0");
    }
}

initGroup();

while (true) {
    try {
        $getResult = $redis->xReadGroup($group, $consumerName, [$stream => ">"], 1);
        if ($getResult && isset($getResult[$stream]) && !empty($getResult[$stream])) {
            foreach ($getResult[$stream] as $id => $value) {
                file_put_contents("./" . $consumerName . ".txt", "[" . microtime(true) . "] " . $id . " " . json_encode($value) . PHP_EOL, FILE_APPEND | LOCK_EX);
                usleep(rand(100, 200));
                $redis->xAck($stream, $group, [$id]);
            }
        }
    } catch (Throwable $e) {
        var_dump($e->getMessage());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 消费者2

consumer02.php

<?php

$redis = new  Redis();
$redis->connect("127.0.0.1", 6379);
$redis->auth("7GNR8pI5LOlh");
$redis->select(15);

$stream       = "test";
$group        = "groupA";
$consumerName = "c2";

// 创建组消费
function initGroup()
{
    global $redis, $group, $stream;
    $groups     = $redis->xInfo('GROUPS', $stream);
    $existGroup = false;
    if (!empty($groups)) {
        foreach ($groups as $g) {
            if ($g["name"] === $group) {
                $existGroup = true;
                break;
            }
        }
    }

    if (!$existGroup) {
        $redis->xGroup('CREATE', $stream, $group, "0");
    }
}

initGroup();

while (true) {
    try {
        $getResult = $redis->xReadGroup($group, $consumerName, [$stream => ">"], 1);
        if ($getResult && isset($getResult[$stream]) && !empty($getResult[$stream])) {
            foreach ($getResult[$stream] as $id => $value) {
                file_put_contents("./" . $consumerName . ".txt", "[" . microtime(true) . "] " . $id . " " . json_encode($value) . PHP_EOL, FILE_APPEND | LOCK_EX);
                usleep(rand(100, 200));
                $redis->xAck($stream, $group, [$id]);
            }
        }
    } catch (Throwable $e) {
        var_dump($e->getMessage());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

二个消费者可以单独部署,或用多进程去消费。

<?php
/**
 * Description:this is description
 * User:ligan
 * Date:2021/12/16
 * Time:4:22 下午
 */

use Swoole\Process;
use Swoole\Coroutine;


function getRedis()
{
    $redis = new \Redis();
    $redis->connect("127.0.0.1", 6379);
    $redis->auth("7GNR8pI5LOlh");
    $redis->select(15);
    return $redis;
}

$stream = "test";
$group  = "groupA";

// 创建组消费
function initGroup()
{
    global $group, $stream;
    $redis      = getRedis();
    $groups     = $redis->xInfo('GROUPS', $stream);
    $existGroup = false;
    if (!empty($groups)) {
        foreach ($groups as $g) {
            if ($g["name"] === $group) {
                $existGroup = true;
                break;
            }
        }
    }

    if (!$existGroup) {
        $redis->xGroup('CREATE', $stream, $group, "0");
    }
}

const POOL_NUM = 5;
$pool = new Process\Pool(POOL_NUM);
$pool->set(['enable_coroutine' => true]);
$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) use ($group, $stream) {
    /** 当前是 Worker 进程 */
    static $running = true;
    Process::signal(SIGTERM, function () use (&$running) {
        $running = false;
        echo "TERM\n";
    });
    echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
    initGroup();
    while ($running) {
        $consumerName = "c-" . (posix_getpid() % POOL_NUM);
        $redis        = getRedis();
        $getResult    = $redis->xReadGroup($group, $consumerName, [$stream => ">"], 1);
        if ($getResult && isset($getResult[$stream]) && !empty($getResult[$stream])) {
            foreach ($getResult[$stream] as $id => $value) {
                file_put_contents("./" . $consumerName . ".txt", "[" . microtime(true) . "] " . $id . " " . json_encode($value) . PHP_EOL, FILE_APPEND | LOCK_EX);
                usleep(rand(100, 200));
                $redis->xAck($stream, $group, [$id]);
            }
        } else {
            \co::sleep(1);
        }
    }
});
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
    echo("[Worker #{$workerId}] WorkerStop\n");
});
$pool->start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

# 参考

PHP 操作 Redis Stream 消息队列 (opens new window)

Redis Stream (opens new window)