vlambda博客
学习文章列表

redis必会基础命令、数据结构、lua脚本和分布式锁等

导读

本文介绍日常工作中redis的使用,涉及到redis的数据结构、对应的命令、持久化配置和Lua脚本,以及基于redis的分布式锁实现方案,使用redis时这些都是必会的基础知识,建议保存以下命令

通用命令

# 查看当前库中key的数量
dbsize

#
 清空当前库
flushdb

#
 清空所有库
flushall

#
 查看当前库下所有key
keys *

#
 当前库下是否有指定key
exists key1

#
 查看key的值类型
type key1

#
 删除key
del key1

#
 设置指定key的过期时间,单位秒
expire key1 10

#
 查看key的剩余过期时间,-1表示永不过期,-2表示已过期
ttl key1

#
 监视key
watch key1

#
 取消监视key
unwatch key1

数据结构和命令

string

是key/value的数据结构,一个key对应一个string类型的value,单个value最大512M

这个是最常用的数据结构了

命令
# 设置key/value
set key1 value1

#
 设置key/value的同时设置过期时间
setex key1 10 value1

#
 获取key的值
get key1

#
 设置新值,同时返回旧值
getset key1 value1

#
 一次设置多个key/value,后面的....表示还可以写key3 value3等
mset key1 value1 key2 value2 ...

#
 一次获取多个key的值
mget key1 key2

#
 追加内容到指定key的值后面
append key1 xxx

#
 获取值的长度
strlen key1

#
 只有在key不存在时才成功
setnx key1 value1

#
 只有在所有key不存在时才成功
msetnx key1 value1 key2 value2 ...

#
 给指定key的值加1
incr key1

#
 减1
decr key1

#
 给指定key的值加指定数值,本例是加2
incrby key1 2

#
 给指定key的值减指定数值,本例是减2
decrby key1 2

#
 获取指定key的值中指定范围的字符,如值为abcdefg,取1至2返回bc,即包含1和2两个位置的字符
getrange key1 1 2

#
 设置指定位置的值,指定开始位置,然后直接覆盖,如下例中值为abcdefg,从第1个位置开始覆盖为cb,则结果为acbdefg
setrange key1 1 cb

list

双向链表,无序可重复的集合,一般用来做队列

命令
# 从表头添加元素,value2是新的表头
lpush key1 value1 value2 ...

#
 从表尾添加元素,value2是新的表尾
rpush key1 value1 value2 ...

#
 从表头弹出元素
lpop key1

#
 从表尾弹出元素
rpop key1

#
 从key1表尾弹出一个元素,再加到key2表头
rpoplpush key1 key2

#
 从表中查看指定索引的范围的元素
lrange key1 0 2

#
 查看整个链表
lrange key1 1 0 -1

#
 获取链表中从左向右指定索引的元素
lindex key1 1

#
 获取链表中最后一个元素
lindex key1 -1

#
 获取链表长度
llen key1

#
 向链表中的value1前面插入value2
linsert key1 before value1 value2

#
 向链表中value1后面插入value2
linsert key1 after value1 value2

#
 从链表中删除一个值为value1的元素,从左向右
lrem key1 1 value1

#
 从链表中删除一个值为value1的元素,从右向左
lrem key1 -1 value1

#
 删除链表中所有值为value1的元素
lrem key1 0 value1

set

无序不可重复的集合,常用来排除重复数据和随机抽奖功能

命令
# 向集合中添加元素,重复元素会自动跳过
sadd key1 value1 value2 ...

#
 取出集合所有元素
smembers key1

#
 判断集合中是否存在某个元素
sismember key1 value1

#
 获取集合中的元素个数
scard key1

#
 从集合中删除指定元素
srem key1 value1 value2 ...

#
 随机从集合中弹出一个元素并删除该元素
spop key1

#
 随机从集合中取出元素,但不会删除元素,后面的1表示取出元素的个数
srandmember key1 1

#
 求两个集合交集
sinter key1 key2

#
 求两个集合并集
sunion key1 key2

#
 求两个集合差集
sdiff key1 key2

zset

有序不可重复的集合,常用来做排行榜

命令
# 添加元素,相同value不同score会覆盖score
zadd key1 score1 value1 score2 value2

#
 获取元素数量
zcard key1

#
 取出全部元素,从小到大
zrange key1 0 -1

#
 取出部分元素,从小到大
zrange key1 0 4

#
 取出全部元素,从大到小
zrevrange key1 0 -1

#
 取出部分元素,从大到小
zrevrange key1 0 4

#
 取出score在指定范围内的元素,从小到大,其中min和max是score的范围
zrangebyscore key1 min max withscores

#
 取出score在指定范围内的元素,从大到小
zrevrangebyscore key1 max min withscores

#
 为指定value的元素的score递增,其中1是每次递增多少,可以为负数
zincrby key1 1 value1

#
 删除指定元素
zrem key1 value1

#
 统计集合中score在范围内的元素个数
zcount key1 min max

#
 返回指定值在集合中的排名,从小到大,排名从0开始
zrank key1 value1

#
 返回指定值在集合中的排名,从大到小
zrevrank key1 value1

hash

类似于Java中的Map<String, String>

命令
# 添加一个键值对
hset key1 field1 value1

#
 获取键值
hget key1 field1

#
 批量设置键值对
hmset key1 field1 value1 field2 value2 ...

#
 检查键是否存在
hexists key1 field1

#
 获取所有键
hkeys key1

#
 获取所有值
hvals key1

#
 键值递增,后面的1表示每次递增多少,可以为负数,当是负数时表示递减
hincrby key1 field1 1

#
 键不存在时成功
hsetnx key1 field1 value1

#
 获取所有键值对,奇数为键,偶数为值
hgetall key1

bitmap

bitmap以bit为单位设置各个位的值(要么是0,要么是1),根据实际应用场景可以设计出节省空间的算法,如布隆过滤器,本文以记录用户签到为例,假设用户ID为1,每年一个key,并且key=用户ID_年份,如1_2021

ID=1的用户在2021-01-01这一天签到,这一天是2021年第1天(也就是第0天),可以执行以下命令,保存签到记录

# 设置1_2021这个key的第0个bit值为1,以此表示第0天签到成功
setbit 1_2021 0 1

该用户在2021-01-03这一天签到,则执行以下命令

# 设置1_2021这个key的第2个bit值为1,以此表示第2天签到成功
setbit 1_2021 2 1

现在想查询该用户在2021年的签到情况,可通过get命令实现

get 1_2021
# 输出\xa0

get命令输出0xa0,这是十六进制,转成二进制,就是10100000,二进制中为1的位就表示那一天签到了,所以第0天和第2天签到了

判断该用户2021-01-03是否签到

getbit 1_2021 2

统计该用户2021年有多少次签到,实际上是统计有多个位是1

bitcount 1_2021
# 输出2

统计该用户指定日期范围内的签到次数,这个不太好实现,redis提供的命令中指定范围的单位是byte,比如统计2021年1月的次数,就是第0个字节到第3个字节(第0 1 2 3共4个字节),这样多统计了1天,即把2021-02-01这一天也统计进来,如下:

# 统计第0到第3个字节中为1的位个数,包含第3个字节
bitcount 1_2021 0 3

这种情况要么按月来设定key值,要么单独查询2021-02-01这一天是否签到,如果签到则总次数就减1

通过上面的例子,可以看到以bit为单位存储非常节省空间,用8个bit就可以表示8天内的签到情况。也可以用bitmap来存储所有用户一天内的签到情况,这种就以用户ID作为bit的偏移量,如果用户ID很大,超过了bitmap的最大范围,可以通过用户ID分片到不同的bitmap上

地理位置

在同一个key内添加多个位置(经纬度),计算位置各个位置之间的距离,也可指定圆心按半径查找符合条件的位置,可实现附近的xxx功能

命令
# 向key1添加一个叫company的位置,经纬度为116.404844 39.915378
geoadd key1 116.404844 39.915378 company

#
 向key1添加一个叫home的位置,经纬度为116.370924 39.930871
geoadd key1 116.370924 39.930871 home

#
 查询指定位置的经纬度
geopos key1 company

#
 查询多个位置的经纬度
geopos key1 company home

#
 计算两个位置的距离,单位是m
geodist key1 company home

#
 计算两个位置的距离,指定单位为km
geodist key1 company home km

#
 以指定经纬度为圆心,查询指定半径内的所有位置,其中116.370924 39.930871是圆心点的经纬度,2000 m是半径大小,单位m,withdist表示输出符合条件的位置与圆心的距离,withcoord表示输出符合条件的位置的经纬度,asc表示按距离从小到大排序
georadius key1 116.370924 39.930871 2000 m withdist withcoord asc

#
 以指定位置为圆心,查询指定半径内的所有位置,返回结果中包含圆心自身,其他可选参数与上一条georadius相同
georadiusbymember key1 home 4000 m

持久化

redis大部分时间用来做缓存,通常数据丢失也可以恢复,但是有时候也会用来存储热门的数据,或者nginx直接连接redis做一些重要数据的存储(丢失后很难恢复),所以redis中的数据需要持久化

redis提供了RDB和AOF两种持久化方式,RDB是对当前数据的全量备份(理解成快照),AOF采用追加的方式记录所有写入的命令,所以一般AOF文件更大,可能导致硬盘被占满,这一点需要注意,需要及时的对AOF文件进行瘦身

RDB

执行savebgsave命令会生成一份当前内存数据的快照到.rdb文件内,其中bgsave命令是另起一个线程去执行,因此不会阻塞主线程

redis默认开启了RDB,redis会自动进行RDB存储,RDB常用配置参数:

save 300 1000 # 每隔300秒,如果有1000个key发生了变化,则备份一次
save 30 10000 # 每隔30秒,如果有10000个key发生了变化,则备份一次

上面的参数不能乱改,要根据redis的写入数据情况来设置,不能太频繁的生成RDB快照,这会影响redis的性能

AOF

一般做持久化要同时开启RDB和AOF,下面介绍工作中如何设置redis的AOF,编辑redis.conf配置文件,修改以下配置项

# 开启AOF
appendonly yes

#
 AOF文件名
appendfilename "xxx.aof"

#
 将命令刷入磁盘的间隔,everysec是每秒一次
appendfsync everysec

#
 在执行bgrewrite的时候不向磁盘刷新数据
no-appendfsync-on-rewrite no

关于appendfsync的意思:在计算机组成原理中,我们知道相对于内存而言对磁盘的读写是很慢的,所以CPU将数据写入内存缓冲区,定时或存满后再写入磁盘,redis的AOF中appendfsync这个配置就是设置多久写入磁盘一次,设为一秒是比较保险的,如果发生故障只会丢失最近1秒内的数据

RDB和AOF对比

  1. RDB定期对内存中的数据进行快照,会影响redis的性能,所以不能太频繁
  2. RDB在快照之间如何发生错误会丢失此段时间内的数据
  3. RDB在重启redis时恢复速度更快,不像AOF那样需要一条一条命令执行
  4. AOF可设置每秒追加一次写入命令到aof文件中,所以发生故障时丢失数据最少
  5. AOF文件中保存的是redis的写入命令,所以可以打开文件进行修改,删除不需要的命令
  6. AOF的缺点是要记录redis做的每一步写入命令,所以文件很大,需要及时进行瘦身

lua脚本

介绍

redis中默认就支持lua脚本,我们通常会使用lua脚本来代替redis事务,可解决超卖和少卖的情况

以下是redis的lua脚本特性:

  1. 原子操作,lua脚本会作为一个整体执行,不会被其他连接的命令中断,因此可替代事务
  2. lua脚本加载后可重复使用
  3. 减少网络请求的开销,一次性发出一个lua脚本到redis,redis执行完后返回结果,不用多次请求

用法

直接执行
eval 脚本 参数数量 参数名1 参数名2 值1 值2

示例:

eval "return KEYS[1]..ARGV[1]" 1 key1 val1
# 输出
key1val1

lua脚本中KEYSARGV两个数组名是固定的,且索引从1开始,上面例子中参数数量为1,参数名1为key1,值1为val1,而脚本是拼接key1和val1,所以结果就是key1val1了

注意:如果脚本有多个参数,则参数名写在一起,参数值写在一起,如key1 key2 val1 val2,而不是key1 val1 key2 val2

加载脚本

加载脚本的目的是重复使用,通过script load命令实现,返回一个sha1的hash值,之后通过此值可调用已加载的脚本

script load "return KEYS[1]..ARGV[1]"
# 假设返回3783a90bf1f43b15a1e06c4e7664da956ed959d9
调用脚本
# 3783a90bf1f43b15a1e06c4e7664da956ed959d9 是script load返回的结果
# 1 是参数数量
# key1 是参数名1
# val1 是参数值1
evalsha 3783a90bf1f43b15a1e06c4e7664da956ed959d9 1 key1 val1

指定加载脚本返回的sha1 hash值来调用脚本,并指定参数数量和参数名以及参数值

判断脚本是否加载
script exists 3783a90bf1f43b15a1e06c4e7664da956ed959d9

Java中使用redis lua脚本

本例使用spring提供的RedisTemplate,首先引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

定义一个RedisClient,对常用命令进行封装

@Component
public class RedisClient {
    @Autowired
    @Qualifier("redisTemplate")
    private RedisTemplate redisTemplate;

    /**
     * 执行脚本
     * @param clazz 返回结果类型
     * @param script lua脚本
     * @param keys lua脚本的参数名
     * @param args lua脚本的值
     * @param <T>
     * @return
     */

    public <T> execute(Class<T> clazz, String script, List<Object> keys, Object... args) {
        DefaultRedisScript<T> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(clazz);
        redisScript.setScriptText(script);

        return (T) redisTemplate.execute(redisScript, keys, args);
    }
}

测试使用,先在redis中set两个key出来,分别是test_key1和test_key2,以下lua脚本是对这两个key进行递增操作,分别递增1和2

String script = "redis.call(\"INCRBY\", KEYS[1], ARGV[1])\nredis.call(\"INCRBY\", KEYS[2], ARGV[2])";

ArrayList<Object> keys = Lists.newArrayList("test_key1""test_key2");
Object execute = redisClient.execute(Object.classscriptkeys, 1, 2);
System.out.println(execute);

分布式锁

redis有个setnx命令,在key不存在时才能设置成功,因此也经常用来实现分布式锁,但是只通过setnx命令来做分布式锁是不安全的,假设线程1执行setnx成功并设置10秒后过期,线程2再执行setnx命令肯定是失败的,如果线程1执行过程中发生故障没有及时清除锁,则其他线程只能等待10秒后才能获取锁;如果线程1在10秒内还没有执行完,由于锁已经过期,导致其他线程执行setnx成功,这就存在两个线程同时拿到锁,这样的使用方式肯定是不行的,今天介绍第三方库redisson,redisson帮我们搞定了以上两种情况需要解决的问题

先引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>

定义一个工具类,方便使用

@Component
public class RedissonUtils {
    @Autowired
    private RedissonClient redissonClient;

    private static RedissonClient client;

    @PostConstruct
    private void init() {
        client = redissonClient;
    }

    /**
     * 通过指定key加锁
     * @param key 指定key
     * @return
     */

    public static RLock lock(String key) {
        RLock lock = client.getLock(key);
        lock.lock();
        return lock;
    }

    /**
     * 通过指定key加锁指定时间
     * @param key 指定key
     * @param expire 锁定时间,默认单位:毫秒
     * @return
     */

    public static RLock lock(String key, long expire) {
        RLock lock = client.getLock(key);
        lock.lock(expire, TimeUnit.MILLISECONDS);
        return lock;
    }

    /**
     * 通过指定key加锁指定时间
     * @param key 指定key
     * @param expire 锁定时间
     * @param unit 锁定时间单位
     * @return
     */

    public static RLock lock(String key, long expire, TimeUnit unit) {
        RLock lock = client.getLock(key);
        lock.lock(expire, unit);
        return lock;
    }

    /**
     * 通过指定key尝试加锁指定时间,如果加锁失败等待指定时间
     * @param key 指定key
     * @param wait 加锁等待时间
     * @param expire 锁定时间
     * @param unit 锁定时间单位
     * @return
     */

    public static RLock tryLock(String key, long wait, long expire, TimeUnit unit) {
        RLock lock = client.getLock(key);
        try {
            if (lock.tryLock(wait, expire, unit)) {
                return lock;
            } else {
                return null;
            }
        } catch (InterruptedException e) {
            return null;
        }
    }

    /**
     * 通过指定key解锁
     * @param key
     */

    public static void unlock(String key) {
        RLock lock = client.getLock(key);
        lock.unlock();
    }

    /**
     * 通过指定锁解锁
     * @param lock
     */

    public static void unlock(RLock lock) {
        if (lock != null) {
            lock.unlock();
        }
    }
}

获取锁

RLock lock = RedissonUtils.tryLock(LockKey.XXX, 5L10L, TimeUnit.SECONDS);
Assert.isNull(lock, "获取锁失败");

// 取到锁

try {
    // 拿到锁后执行的操作
    ...
finally {
    RedissonUtils.unlock(lock);
}

需要注意的是释放锁一定要放到finally中,防止发生异常而未释放锁

最后说明一下分布式锁也不是非要使用redisson,使用redis实现分布式锁有个致命的问题,当拿到锁后主redis还未同步到从redis时,主redis挂掉,系统切换到从redis,此时其他线程仍然可以拿到锁,这样系统中就有两个线程拿到了锁