flask简单的redis封装与使用
创建文件 brdies.py(命名随意)
代码如下,每个地方都写上备注,方便理解
import _pickle as cPickle # 导入序列号包
import redis # 导入redis包
try:
import simplejson as json
except ImportError:
import json
_CACHE_KEY_PREFIX = "blog:" # 定义reids key已什么开头
def require_prefix(f): # 校验key是否已_CACHE_KEY_PREFIX 定义的值开头
def func(self, key, *args, **kws):
if not key.startswith(_CACHE_KEY_PREFIX):
raise KeyError('%s is not starts with %s'.format(key, _CACHE_KEY_PREFIX))
return f(self, key, *args, **kws)
return func
class BRedis: # redis 封装类 (名字随意)
def __init__(self, app=None): # 初始化redis 类
self._client = None
if app is not None:
self.init_app(app)
def __getattr__(self, name):
return getattr(self._client, name)
def init_app(self, app): # 初始化redis 配置
uri = app.config.get('REDIS_URL', 'redis://localhost:6379/0')
max_conn = app.config.get('REDIS_MAX_CONNECTIONS', 1024)
self._client = redis.StrictRedis.from_url(uri, max_connections=max_conn)
if not hasattr(app, 'extensions'):
app.extensions = {}
app.extensions['br'] = self
def get(self, key):
r = self._client.get(key)
if r:
return json.loads(r)
else:
return None
@require_prefix
def set(self, key, value, expire=0, compress=False):
if value is None:
return None
# todo: 如果以后要优化,value可以压缩, 暂时compress那个参数没有使用
v = json.dumps(value)
if expire:
return self._client.setex(key, expire, v)
else:
return self._client.set(key, v)
@require_prefix
def add(self, key, value, expire=0):
if value is None and expire > 0:
self._client.expire(key, expire)
return self.get(key)
if value:
r = self._client.incrby(key, value)
if expire:
self._client.expire(key, expire)
return r
@require_prefix
def lpush(self, name, *values):
return self._client.lpush(name, *values)
@require_prefix
def lpop(self, name):
return self._client.pop(name)
@require_prefix
def llen(self, name):
return self._client.llen(name)
@require_prefix
def lrange(self, name, start, end):
return self._client.lrange(name, start, end)
@require_prefix
def lrem(self, name, count, value):
return self._client.lrem(name, count, value)
def delete(self, *keys):
if len(keys) < 1:
return
return self._client.delete(*keys)
def flushdb(self):
self._client.flushdb()
br = BRedis() # 实例化类
app 的配置文件,redis的配置如下
DEFAULT_CONFIGS = {
'REDIS_URL': 'redis://:@127.0.0.1:6379/1',
'REDIS_MAX_CONNECTIONS': 1024,
}
flask app初始化redis
from **.bredis import br # 导入实例化变量br
def create_app():
"""初始化app"""
app = Flask(__name__)
...
br.init_app(app)
return app
5.redis的使用 这里举个简单的例子 邮箱验证码的发送接口
发送接口如下
from blog.corelibs.bredis import br
@bp.route('/getEmailCode', methods=['POST'])
def get_email_code():
parsed_data = request.json
token = request.headers.get('token')
user_info = br.get(BLOG_USER_INFO.format(token))
user_id = user_info.get('id')
code = str(random.randint(0, 999999))
code = code.zfill(6)
br.set(BLOG_EMAIL_CODE.format(user_id), code, CACHE_WEEK)
email = user_info.email
send_mail(title='邮箱认证', to_email=email, code=code, type='checkEmail')
return partner_success()
flask 使用reids还是相对简单的,当然我讲解的也不是太详细,如果有什么不懂的欢迎到下发留言