# 网关限流

可查看文章Nginx限流

# 资源cdn

前端静态页面,图片,样式全走cdn。

# MySQL 悲观锁

悲观锁的方案采用的是排他读,也就是同时只能有一个进程读取到stock的值。事务在提交或回滚之后,锁会释放,其他的进程才能读取。

该方案最简单易懂,在对性能要求不高时,可以直接采用该方案。要注意的是,SELECT … FOR UPDATE要尽可能的使用索引,以便锁定尽可能少的行数;

排他锁是在事务执行结束之后才释放的,不是读取完成之后就释放,因此使用的事务应该尽可能的早些提交或回滚,以便早些释放排它锁。

$pdo = new EasyPdo([
	'host'       => '127.0.0.1',
	'port'       => 3308,
	'database'   => 'test',
	'username'   => 'root',
	'password'   => '123456',
	'charset'    => 'utf8mb4',
	'unixSocket' => null,
	'options'    => [],
	'size'       => 5,
]);

$pdo->beginTransaction();
$row = $pdo->query("SELECT stock FROM goods WHERE id=1 LIMIT 1 FOR UPDATE");
$stock = intval($row['stock']);
if ($stock > 0) {
	$affected_rows = $pdo->update("goods", ["stock[-]" => 1], ["id" => 1]);
	if ($affected_rows == 1) {
		$sn = uniqid() . rand(1000, 9999);
		$id = $pdo->insert("order", [
			"user_id"    => 1,
			"order_sn"   => $sn,
			"goods_id"   => 1,
			"num"        => 1,
			"created_at" => (new \DateTime())->format("Y-m-d H:i:s.u"),
		]);
		if ($id) {
			$pdo->commit();
			echo "success:" . $stock;
		} else {
			$pdo->rollBack();
			echo "fail1:" . $stock;
		}
	} else {
		$pdo->rollBack();
		echo "fail2:" . $stock;
	}
} else {
	$pdo->commit();
	echo "fail3:" . $stock;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# MySQL 乐观锁

悲观锁的方案保证了数据库中stock的值在同一时间只能被一个进程读取并处理,也就是并发的读取进程到这里要排队依次执行。

乐观锁的方案虽然stock的值可以被多个进程同时读取到,但是更新操作中stock的等值判断可以保证并发的更新操作在同一时间只能有一个更新成功。

假如此时stock为10,同时有5个进程读取到了stock=10,对于乐观锁的方案由于stock字段的等值判断,这5个进程只会有一个更新成功,这5个进程执行完成之后stock为9。

$pdo = new EasyPdo([
	'host'       => '127.0.0.1',
	'port'       => 3308,
	'database'   => 'test',
	'username'   => 'root',
	'password'   => '123456',
	'charset'    => 'utf8mb4',
	'unixSocket' => null,
	'options'    => [],
	'size'       => 5,
]);

$pdo->beginTransaction();
$row = $pdo->get("goods", ["stock"], ['id' => 1]);
$num = intval($row['stock']);
if ($num > 0) {
	$affected_rows = $pdo->update("goods", ["stock[-]" => 1], ["id" => 1, "stock" => $row["stock"], "stock[>]" => 1]);
	if ($affected_rows == 1) {
		$sn = uniqid() . rand(1000, 9999);
		$id = $pdo->insert("order", [
			"user_id"    => 1,
			"order_sn"   => $sn,
			"goods_id"   => 1,
			"num"        => 1,
			"created_at" => (new \DateTime())->format("Y-m-d H:i:s.u"),
		]);
		if ($id) {
			$pdo->commit();
			echo "success:" . $num;
		} else {
			$pdo->rollBack();
			echo "fail1:" . $num;
		}
	} else {
		$pdo->rollBack();
		echo "fail2:" . $num;
	}
} else {
	$pdo->commit();
	echo "fail3:" . $num;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# redis watch

watch用于监视一个(或多个) key,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。 这种方案跟mysql中的乐观锁方案类似,具体表现也是一样的。

$stock = $redis->get('stock');
if($stock > 0) {
	$redis->watch('stock');
	$res = $redis->multi()->decr('stock')->lPush('result',$stock)->exec();
	if($res == false){
		echo "fail1";
	}else{
		echo "success:".$stock;
	}
}else{
  echo "fail2";
}

1
2
3
4
5
6
7
8
9
10
11
12
13

# redis list方案

基于队列的方案利用了redis出队操作的原子性,抢购开始之前首先将商品编号放入响应的队列中,在抢购时依次从队列中弹出操作,这样可以保证每个商品只能被一个进程获取并操作,不存在超发的情况。

该方案的优点是理解和实现起来都比较简单,缺点是当商品数量较多是,需要将大量的数据存入到队列中,并且不同的商品需要存入到不同的消息队列中。

public function init()
{
	$redis->del('goods');
	for($i=1;$i<=10;$i++){
		$redis->lPush('goods',$i);
	}
	$redis->del('result');
	echo 'init done';
}


public function run()
{
	$goods_id = $redis->rPop('goods');
	if($goods_id == false) {
		echo "fail1";
	}else{
		$res = $redis->lPush('result',$goods_id);
		if($res == false){
			echo "writelog:".$goods_id;
		}else{
			echo "success".$goods_id;
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

redis5 以后加入了 stream,可以用作MQ处理,具有ack机制会更好。

# redis decr

如果我们将剩余量num设置为一个键值类型,每次先get之后判断,然后再decr是不能解决超发问题的。

但是redis中的decr操作会返回执行后的结果,可以解决超发问题。

我们首先get到num的值进行第一步判断,避免每次都去更新num的值,然后再对num执行decr操作,并判断decr的返回值,如果返回值不小于0,这说明decr之前是大于0的,用户抢购成功。

public function run()
{
	$num = $redis->get('num');
	if($num > 0) {
		$retNum = $redis->decr('num');
		if($retNum >= 0){
			$res = $redis->lPush('result',$retNum);
			if($res == false){
				echo "writeLog:".$retNum;
			}else{
				echo "success:".$retNum;
			}
		}else{
			echo "fail1";
		}
	}else{
		echo "fail2";
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# redis setnx 排它锁

redis没有像mysql中的排它锁,但是可以通过一些方式实现排它锁的功能,就类似php使用文件锁实现排它锁一样。

setnx实现了exists和set两个指令的功能,若给定的key已存在,则setnx不做任何动作,返回0;若key不存在,则执行类似set的操作,返回1。

我们设置一个超时时间timeout,每隔一定时间尝试setnx操作,如果设置成功就是获得了相应的锁,执行num的decr操作,操作完成删除相应的key,模拟释放锁的操作。


public function run()
{
	do {
		$res = $redis->setnx("numKey",1);
		$this->timeout -= 100;
	}while($res == 0 && $this->timeout>0);
	
	if($res == 0){
		echo 'fail1';
	}else{
		$num = $redis->get('num');
		if($num > 0) {
			$redis->decr('num');
			$res = $redis->lPush('result',$num);
			if($res == false){
				echo "fail2";
			}else{
				echo "success:".$num;
			}
		}else{
		  echo "fail3";
		}
		$redis->del("numKey");
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# redis + mq

public function index()
{
	// 扣减库存
	$key    = "goods:1";
	$num    = 1;//rand(1, 5);
	$userId = rand(1, 2);
	try {
		// 大量请求到达的时候,这个判断是扛不住的,会穿过这个判断的,也就是说如果库存100的情况,会瞬间打到大于库存的量
		$curr = RedisCache::defer()->get($key);
		if (empty($curr) || intval($curr) <= 0) {
			return $this->writeJson(200, ["code" => 1, "msg" => "很抱歉全部被抢光了,下次再来!"]);
		}

		$requestId = RandomHelper::makeSnowFlake() . rand(1000, 9999) . $userId;
		$lastNum   = RedisCache::defer()->decrBy($key, $num);
		if ($lastNum < 0) {
			throw new \Exception("很抱歉全部被抢光了,下次再来!");
		}

		// 推送MQ,主要的压力在这... ,优化成多路复用
		self::pushMQ($requestId, $num, $userId);

		// 直接返回请求单号,前端轮询获取订单状态
		$this->writeJson(200, ["code" => 0, "msg" => "排队成功,正在处理", "requestId" => $requestId]);

	} catch (\Throwable $e) {
		// 回滚,但是这边最好做补偿,就是回滚失败的情况,需要异步补偿
		RedisCache::defer()->incrBy($key, $num);
		$this->writeJson(200, ["code" => 1, "msg" => $e->getMessage()]);
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

mq直接消费即可。但是会存在一个问题,就是多个mq消费者的时候,加mysql乐观锁会导致部分失败的情况(同时操作同一个资源)。

# 本地库存分片 + 队列

针对于超大并发的情况,如:12306购票,双11 这种。很多都会本地存库存。

WechatIMG22.png

base.php

<?php

class Base
{
    static $redisObj;

    static function conRedis($config = array())
    {
        if (self::$redisObj) return self::$redisObj;
        self::$redisObj = new \Redis();
        self::$redisObj->connect("127.0.0.1", 6379);
        return self::$redisObj;
    }

    static function output($data = array(), $errNo = 0, $errMsg = 'ok')
    {
        $res['errno'] = $errNo;
        $res['errmsg'] = $errMsg;
        $res['data'] = $data;
        echo json_encode($res);exit();
    }
}

?>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

api.php

<?php
include('base.php');
class Api extends Base
{
    //共享信息,存储在redis中,以hash表的形式存储,%s变量代表的是商品id
    static $userId;
    static $productId;

    static $REDIS_REMOTE_HT_KEY         = "product_%s";     //共享信息key
    static $REDIS_REMOTE_TOTAL_COUNT    = "total_count";    //商品总库存
    static $REDIS_REMOTE_USE_COUNT      = "used_count";     //已售库存
    static $REDIS_REMOTE_QUEUE          = "c_order_queue";  //创建订单队列

    static $APCU_LOCAL_STOCK    = "apcu_stock_%s";       //总共剩余库存

    static $APCU_LOCAL_USE      = "apcu_stock_use_%s";   //本地已售多少
    static $APCU_LOCAL_COUNT    = "apcu_total_count_%s"; //本地分库存分摊总数

    public function __construct($productId, $userId)
    {
        self::$REDIS_REMOTE_HT_KEY  = sprintf(self::$REDIS_REMOTE_HT_KEY, $productId);
        self::$APCU_LOCAL_STOCK     = sprintf(self::$APCU_LOCAL_STOCK, $productId);
        self::$APCU_LOCAL_USE       = sprintf(self::$APCU_LOCAL_USE, $productId);
        self::$APCU_LOCAL_COUNT     = sprintf(self::$APCU_LOCAL_COUNT, $productId);
        self::$APCU_LOCAL_COUNT     = sprintf(self::$APCU_LOCAL_COUNT, $productId);
        self::$userId               = $userId;
        self::$productId            = $productId;
    }
    static  function clear(){
    apcu_delete(self::$APCU_LOCAL_STOCK);
    apcu_delete(self::$APCU_LOCAL_USE);
    apcu_delete(self::$APCU_LOCAL_COUNT);
        
    }
    /*查剩余库存*/
    static function getStock()
    {
    $stockNum = apcu_fetch(self::$APCU_LOCAL_STOCK);
        if ($stockNum === false) {
            $stockNum = self::initStock();
        }
        self::output(['stock_num' => $stockNum]);
    }

    /*抢购-减库存*/
    static function buy()
    {
        $localStockNum = apcu_fetch(self::$APCU_LOCAL_COUNT);
        if ($localStockNum === false) {
            $localStockNum = self::init();
        }

        $localUse = apcu_inc(self::$APCU_LOCAL_USE);//本已卖 + 1
        if ($localUse > $localStockNum) {//抢购失败 大部分流量在此被拦截
        echo 1;
            self::output([], -1, '该商品已售完');
        }

        //同步已售库存 + 1;
        if (!self::incUseCount()) {//改失败,返回商品已售完
            self::output([], -1, '该商品已售完');
        }

        //写入创建订单队列
        self::conRedis()->lPush(self::$REDIS_REMOTE_QUEUE, json_encode(['user_id' => self::$userId, 'product_id' => self::$productId]));
        //返回抢购成功
        self::output([], 0, '抢购成功,请从订单中心查看订单');
    }

    /*创建订单*/
    /*查询订单*/
    /*总剩余库存同步本地,定时执行就可以*/
    static function sync()
    {
    $data = self::conRedis()->hMGet(self::$REDIS_REMOTE_HT_KEY, [self::$REDIS_REMOTE_TOTAL_COUNT, self::$REDIS_REMOTE_USE_COUNT]);
        $num = $data['total_count'] - $data["used_count"];
        apcu_add(self::$APCU_LOCAL_STOCK, $num);
        self::output([], 0, '同步库存成功');
    }
    /*私有方法*/
    //库存同步
    private static function incUseCount()
    {
        //同步远端库存时,需要经过lua脚本,保证不会出现超卖现象
        $script = <<<eof
            local key = KEYS[1]
            local field1 = KEYS[2]
            local field2 = KEYS[3]
            local field1_val = redis.call('hget', key, field1)
            local field2_val = redis.call('hget', key, field2)
            if(field1_val>field2_val) then
                return redis.call('HINCRBY', key, field2,1)
            end
            return 0
eof;
        return self::conRedis()->eval($script,[self::$REDIS_REMOTE_HT_KEY,  self::$REDIS_REMOTE_TOTAL_COUNT, self::$REDIS_REMOTE_USE_COUNT] , 3);
    }
    /*初始化本地数据*/
    private static function init()
    {
        apcu_add(self::$APCU_LOCAL_COUNT, 150);
        apcu_add(self::$APCU_LOCAL_USE, 0);
    }
    static  function initStock(){
        $data = self::conRedis()->hMGet(self::$REDIS_REMOTE_HT_KEY, [self::$REDIS_REMOTE_TOTAL_COUNT, self::$REDIS_REMOTE_USE_COUNT]);
        $num = $data['total_count']- $data["used_count"];
        apcu_add(self::$APCU_LOCAL_STOCK, $num);
        return $num;
    }

}

try{
$act = $_GET['act'];
$product_id = $_GET['product_id'];
$user_id = $_GET['user_id'];

$obj = new Api($product_id, $user_id);
if (method_exists($obj, $act)) {
    $obj::$act();
    die;
}
echo 'method_error!';
} catch (\Exception $e) {
    echo 'exception_error!';
    var_dump($e);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127

# 参考资料