vlambda博客
学习文章列表

Redis有序数集封装实例

当数据流量较快的时候,如果有明确的排序要求,而且数据写入非常频繁,那么Redis天然成为了优质选择。因为Redis天然的写入保护性能远超以硬盘为后端的数据库,可以节省大量成本。而且自然地,稍微参考一下文档就可以考虑用Redis内置的数据结构有序数集(sorted sets)。


Sorted sets的使用场景


Redis sorted sets使用场景非常广泛,一般来说都是作为二次索引的手段,最常见的包括时间序列分析,用户年龄统计 ,日志搜索等等。当然也包括下面重点要讲的,也是最复杂的积分排名。这些事情InnoDB都可以办到,但是面对数据浪涌的时候,不管是InnoDB还是PostgreSQL,以硬盘为后端的数据库引擎的事务处理能力会大大受限,扩容起来成本高且弹性不足。


Sorted sets的不足


Redis sorted sets用64位双精度的浮点数保存某个值的分数,然后按分数先后有序保存值序列,支持值序列任意位置的快速读取,读写时间复杂度都是O(log(n))。但是只有值和分数的设定虽然让性能飞起,但是也带来了一个不足,就是其他排序信息介入的问题,比如时间先后的问题。一个典型的场景就是,用户获得了相同的积分,那么先获得这么多积分的应该排在前面,这在Redis sorted sets里面是无法保证的。


这个问题的根源是Redis sorted sets排序标的唯一导致的。


要想解决这个问题,笨办法是把主要排序结果取出后二次排序。例如把用户最后获取积分的时间戳保存在用户的数据里面,从sorted sets里面取出排名后,结合用户数据二次排名。但是这显然还是不科学的,你无法保证边界情况是否理想,如果要取出前10名,而2到10000名用户都是同分,那sorted sets意义就没有那么大了。


另一个办法很简单,更直观,也是Redis sorted sets的正确用法,就是把次要排序值混入分数中。前面说了Redis sorted sets保存的是double,那么很自然地会想到,用整数部分保存值的分数,用小数部分保存数据更新的时间戳,那么排序的时候就可以实现先按分数,再按时间排序的目的了。甚至如果有更复杂的排序要求,则可以把分数保存在用户的字段里面,用更精细的排序值计算方案确定用户排序值。


而按照时间排序是个足够常见的场景,我们可以考虑实现一个封装,在保留时间戳的前提下,直接读写用户的积分,这样有的时候就可以省去二次索引,尤其是大量读取,例如数据统计的时候。


下面就用Python示例如何一步步实现这样的封装。


实现带时间戳的Redis sorted sets封装


这里上游客户端用的是aredis,前置的mixin可以根据实际情况实现


class _RedisMixin(object):
def _key(self, key): return NotImplemented
async def get_connection(self): return NotImplemented

class _RedisAdapterMixin(_RedisMixin):
def _key(self, key): return f"my_key_prefix:{key}"
async def get_connection(self): rda = await my_upstream.get_redis_adapter() return await rda.get_async_connection()

class _RedisZSetMixin(_RedisMixin):
pass # TODO

class Redis( _RedisAdapterMixin, _RedisZSetMixin,):
def __init__(self):
pass


我们的工作重点是实现_RedisZSetMixin,主要实现ZADDZSCOREZINCRBYZRANKZRANGE五个命令的封装。


首先要做的是时间戳值的确定方式,Python里面的timestamp是个Unix time,我们保留时间戳精度到jiffy也就是0.01秒的话,时间戳值要乘以100。同时我们需要一个参数来确定时间戳值是越早的值越大,还是越晚的值越大。越晚的值越大很简单,直接记录转换后的时间戳值就行了,但是要越晚的值越小,那我们只能用一个固定大小的值去减它。


这里我们可以设定这个固定值是1 << 40,为什么呢,因为双精度浮点数小数位只有52位,详情可以参考Double-precision floating-point format。这个固定值越大,我们可以用的时间精度越高,但是太大了前面分数能用的数位就有限了。分数值域过大,虽然整体时间排序不会错,但是精度就难以保证了。对于10000分以内来说,就1 << 40来说,0.01秒精度,datetime.fromtimestamp((1 << 40) / 100)可以安全用到2318年,0.001秒精度,datetime.fromtimestamp((1 << 42) / 1000)可以安全用到2109年,我想应该都足够了,那个时候还用不用计算机了其实都说不定,地球是不是还安全都说不定。实际生产环境下可以根据应用场景中分数的值域确定调节这个值的大小,我觉得0.01秒精度,1 << 40应该是比较理想的值。


class _RedisZSetMixin(_RedisMixin):
""" timestamp_signature +1 越晚的时间戳值越大 -1 越晚的时间戳值越小 0 不记录时间戳值(默认)
记录时间戳时,要求数据必须是整数,时间戳精度为0.01s """
timestamp_signature = NotImplemented
def _timestamp(self): timestamp = int(time.time() * 100) if self.timestamp_signature < 0: return (1 << 40) - timestamp return timestamp
def _value(self, value): if not self.timestamp_signature: return value
timestamp = self._timestamp() return float(f'{int(value)}.{timestamp}')
def _devalue(self, value): if not self.timestamp_signature: return float(value)
return int(float(value))
pass # TODO

class Redis( _RedisAdapterMixin, _RedisZSetMixin,):
def __init__(self, *, timestamp_signature=0):
self.timestamp_signature = timestamp_signature


这样,我们在写入Redis的时候自动打上时间戳,取出值的时候自动去掉时间戳,在数据读写的时候用这些函数处理数值,在更外面就不用考虑数值处理了。例如实现ZADDZSCORE:


 async def zadd(self, key, *nvs, **kwnvs): if len(nvs) and len(nvs) % 2: raise ValueError('ZADD requires name value pairs')
conn = await self.get_connection() key = self._key(key) vns = [] for name, value in zip(nvs[::2], nvs[1::2]): vns.append(self._value(value)) vns.append(name) for name, value in kwnvs.items(): vns.append(self._value(value)) vns.append(name)
return await conn.zadd(key, *vns) async def zscore(self, key, name): conn = await self.get_connection() key = self._key(key) value = await conn.zscore(key, name) return self._devalue(value or 0)


实现ZINCRBY则要注意,封装过的数值不能直接加,否则会让时间戳值参与计算,不过处理以后的行为逻辑在极端条件下可能会和原生的ZINCRBY有些许不同。


 async def zincrby(self, key, name, value): conn = await self.get_connection() value = self._value(value)
if not self.timestamp_signature: key = self._key(key) return await conn.zincrby(key, name, value)
value += await self.zscore(key, name) return await self.zadd(key, name, value)
 


ZRANKZRANGE的实现则比较简单,没有什么需要特别注意的,


 async def zrank(self, key, name, reverse=False): if reverse: return await self.zrevrank(key, name) conn = await self.get_connection() key = self._key(key) return await conn.zrank(key, name)
async def zrevrank(self, key, name): conn = await self.get_connection() key = self._key(key) return await conn.zrevrank(key, name)
async def zrange(self, key, start=0, end=-1, reverse=False): if reverse: return await self.zrevrange(key, start, end)
conn = await self.get_connection() key = self._key(key) r = await conn.zrange(key, start, end, withscores=True, score_cast_func=self._devalue)
return [(k.decode('utf-8'), v) for (k, v) in r]
async def zrevrange(self, key, start=0, end=-1): conn = await self.get_connection() key = self._key(key) r = await conn.zrevrange(key, start, end, withscores=True, score_cast_func=self._devalue)
return [(k.decode('utf-8'), v) for (k, v) in r]

那至此,一个完整的zset封装就实现了,事实上这也是我在生产环境中用过的积分排名缓存处理方式。


优化方向


最主要的还是时间戳的计算方式。如果要做到真正高可用,应该根据值域动态确定时间戳的生成方式,但是我暂时也没有比较好的实现思路。而且真正复杂环境下,还是用我说的精细确定分数计算方式,老老实实二次索引更好。二次索引的时候再加上缓存池,应该能形成更通用的处理办法。


另外就是多数据库记录的时候,注意把写入缓存包括在数据库写入事务里面,并且最后处理Redis的写入,防止出现写入Redis以后数据库回滚了,但是Redis不因此回滚造成的数据不同步。