vlambda博客
学习文章列表

flask简单的redis封装与使用​

  1. 创建文件 brdies.py(命名随意)

  2. 代码如下,每个地方都写上备注,方便理解

import _pickle as cPickle # 导入序列号包import redis # 导入redis包
try: import simplejson as jsonexcept ImportError: import json
_CACHE_KEY_PREFIX = "blog:" # 定义reids key已什么开头

def require_prefix(f): # 校验key是否已_CACHE_KEY_PREFIX 定义的值开头 def func(self, key, *args, **kws): if not key.startswith(_CACHE_KEY_PREFIX): raise KeyError('%s is not starts with %s'.format(key, _CACHE_KEY_PREFIX)) return f(self, key, *args, **kws)
return func

class BRedis: # redis 封装类 (名字随意)
def __init__(self, app=None): # 初始化redis 类 self._client = None
if app is not None: self.init_app(app)
def __getattr__(self, name): return getattr(self._client, name)
def init_app(self, app): # 初始化redis 配置 uri = app.config.get('REDIS_URL', 'redis://localhost:6379/0') max_conn = app.config.get('REDIS_MAX_CONNECTIONS', 1024) self._client = redis.StrictRedis.from_url(uri, max_connections=max_conn)
if not hasattr(app, 'extensions'): app.extensions = {} app.extensions['br'] = self
def get(self, key): r = self._client.get(key) if r: return json.loads(r) else: return None
@require_prefix def set(self, key, value, expire=0, compress=False): if value is None: return None # todo: 如果以后要优化,value可以压缩, 暂时compress那个参数没有使用 v = json.dumps(value) if expire: return self._client.setex(key, expire, v) else: return self._client.set(key, v)
@require_prefix def add(self, key, value, expire=0): if value is None and expire > 0: self._client.expire(key, expire) return self.get(key)
if value: r = self._client.incrby(key, value) if expire: self._client.expire(key, expire) return r
@require_prefix def lpush(self, name, *values): return self._client.lpush(name, *values)
@require_prefix def lpop(self, name): return self._client.pop(name)
@require_prefix def llen(self, name): return self._client.llen(name)
@require_prefix def lrange(self, name, start, end): return self._client.lrange(name, start, end)
@require_prefix def lrem(self, name, count, value): return self._client.lrem(name, count, value)
def delete(self, *keys): if len(keys) < 1: return return self._client.delete(*keys)
def flushdb(self):        self._client.flushdb()
br = BRedis() # 实例化类
  1. app 的配置文件,redis的配置如下

DEFAULT_CONFIGS = { 'REDIS_URL': 'redis://:@127.0.0.1:6379/1', 'REDIS_MAX_CONNECTIONS': 1024,}
  1. flask app初始化redis

from **.bredis import br # 导入实例化变量brdef create_app(): """初始化app""" app = Flask(__name__) ... br.init_app(app) return app

5.redis的使用 这里举个简单的例子 邮箱验证码的发送接口

  • 发送接口如下

from blog.corelibs.bredis import br # 要导入redis实例化的变量# 获取邮箱验证码@bp.route('/getEmailCode', methods=['POST'])def get_email_code(): parsed_data = request.json token = request.headers.get('token')  # BLOG_USER_INFO = 'blog:user_token:{0}' redis的key定义以及 过期时间统一写到一个文件中,方便管理 # 直接get 对应的key就能正常获取值了 user_info = br.get(BLOG_USER_INFO.format(token)) user_id = user_info.get('id') code = str(random.randint(0, 999999))    code = code.zfill(6) # BLOG_EMAIL_CODE = 'blog:email_code:{0}' # 同理set 写入 redis # CACHE_THREE_HOUR = 60 * 60 * 3 缓存过期时间 # 格式是 key value 过期时间 br.set(BLOG_EMAIL_CODE.format(user_id), code, CACHE_WEEK)  email = user_info.email # 发送邮箱,发送邮箱的逻辑代码下次发出来 send_mail(title='邮箱认证', to_email=email, code=code, type='checkEmail') return partner_success()


  1. flask 使用reids还是相对简单的,当然我讲解的也不是太详细,如果有什么不懂的欢迎到下发留言