flask简单的redis封装与使用
创建文件 bredis.py(命名随意,下文的导入语句以 bredis 为例)
代码如下,每个地方都写上备注,方便理解
import _pickle as cPickle  # serialization package (imported by the original; not used below)
import functools

import redis  # redis client package

try:
    import simplejson as json
except ImportError:
    import json

# Every key passed to a @require_prefix method must start with this prefix.
_CACHE_KEY_PREFIX = "blog:"


def require_prefix(f):
    """Decorate a method so that its first argument (the key) is validated.

    The wrapped method is only called when the key starts with
    ``_CACHE_KEY_PREFIX``.

    Raises:
        KeyError: if the key does not start with ``_CACHE_KEY_PREFIX``.
    """

    @functools.wraps(f)  # keep the wrapped method's name/docstring
    def func(self, key, *args, **kws):
        if not key.startswith(_CACHE_KEY_PREFIX):
            # The original mixed %-placeholders with str.format, so the key
            # and prefix were never interpolated into the message.
            raise KeyError('%s is not starts with %s' % (key, _CACHE_KEY_PREFIX))
        return f(self, key, *args, **kws)

    return func


class BRedis:
    """Thin wrapper around a redis client (the class name is arbitrary).

    Values written with ``set`` are JSON-encoded and values read with ``get``
    are JSON-decoded. Any attribute not defined here is forwarded to the
    underlying client via ``__getattr__``.
    """

    def __init__(self, app=None):
        """Create the wrapper; connect immediately if ``app`` is given."""
        self._client = None
        if app is not None:
            self.init_app(app)

    def __getattr__(self, name):
        """Forward unknown attributes to the underlying redis client."""
        if name == '_client':
            # Reached only when ``_client`` has not been assigned yet (e.g.
            # while the object is being copied or unpickled); without this
            # guard the lookup below would recurse forever.
            raise AttributeError(name)
        return getattr(self._client, name)

    def init_app(self, app):
        """Create the redis client from ``app.config`` and register on ``app``.

        Reads ``REDIS_URL`` (default ``redis://localhost:6379/0``) and
        ``REDIS_MAX_CONNECTIONS`` (default 1024), then stores this instance
        under ``app.extensions['br']``.
        """
        uri = app.config.get('REDIS_URL', 'redis://localhost:6379/0')
        max_conn = app.config.get('REDIS_MAX_CONNECTIONS', 1024)
        self._client = redis.StrictRedis.from_url(uri, max_connections=max_conn)
        if not hasattr(app, 'extensions'):
            app.extensions = {}
        app.extensions['br'] = self

    def get(self, key):
        """Return the JSON-decoded value stored at ``key``, or None if empty/missing."""
        r = self._client.get(key)
        if r:
            return json.loads(r)
        return None

    @require_prefix
    def set(self, key, value, expire=0, compress=False):
        """Store ``value`` JSON-encoded at ``key``.

        Args:
            key: redis key; must start with ``_CACHE_KEY_PREFIX``.
            value: JSON-serializable value; ``None`` is not stored.
            expire: TTL passed to ``setex``; falsy means no expiry.
            compress: currently unused (reserved for future value compression).

        Returns:
            The redis client's reply, or None when ``value`` is None.
        """
        if value is None:
            return None
        # TODO: values could be compressed in the future; ``compress`` is not used yet.
        v = json.dumps(value)
        if expire:
            return self._client.setex(key, expire, v)
        return self._client.set(key, v)

    @require_prefix
    def add(self, key, value, expire=0):
        """Increment the counter at ``key`` by ``value`` and/or refresh its TTL.

        * ``value is None`` and ``expire > 0``: only refresh the TTL and
          return the current (JSON-decoded) value.
        * truthy ``value``: ``INCRBY`` the key, optionally set the TTL, and
          return the new counter value.
        * otherwise: do nothing and return None.
        """
        if value is None and expire > 0:
            self._client.expire(key, expire)
            return self.get(key)
        if value:
            r = self._client.incrby(key, value)
            if expire:
                self._client.expire(key, expire)
            return r
        return None

    @require_prefix
    def lpush(self, name, *values):
        """Push ``values`` onto the head of list ``name``."""
        return self._client.lpush(name, *values)

    @require_prefix
    def lpop(self, name):
        """Pop and return the first element of list ``name``."""
        # The original called ``self._client.pop``, which the redis client
        # does not provide; the list-pop command is ``lpop``.
        return self._client.lpop(name)

    @require_prefix
    def llen(self, name):
        """Return the length of list ``name``."""
        return self._client.llen(name)

    @require_prefix
    def lrange(self, name, start, end):
        """Return the slice ``start``..``end`` of list ``name``."""
        return self._client.lrange(name, start, end)

    @require_prefix
    def lrem(self, name, count, value):
        """Remove occurrences of ``value`` from list ``name`` (see redis LREM)."""
        return self._client.lrem(name, count, value)

    def delete(self, *keys):
        """Delete ``keys``; return the client's reply, or None if no keys were given."""
        if not keys:
            return None
        return self._client.delete(*keys)

    def flushdb(self):
        """Flush the currently selected redis database."""
        self._client.flushdb()


br = BRedis()  # module-level shared instance; call br.init_app(app) before use
app 的配置文件,redis的配置如下
# Default application configuration; the two keys below are the Redis settings.
DEFAULT_CONFIGS = {
    # Connection URL (empty username/password, local server, database 1).
    'REDIS_URL': 'redis://:@127.0.0.1:6379/1',
    # Upper bound on the number of pooled connections.
    'REDIS_MAX_CONNECTIONS': 1024,
}
flask app初始化redis
# Import the shared BRedis instance. ``from **.bredis`` in the original was a
# placeholder and not valid Python; adjust the package path to your project.
from blog.corelibs.bredis import br


def create_app():
    """Create the Flask app and initialise the shared redis wrapper on it."""
    app = Flask(__name__)
    ...  # other application setup omitted
    br.init_app(app)
    return app
5.redis的使用 这里举个简单的例子 邮箱验证码的发送接口
发送接口如下
from blog.corelibs.bredis import br


@bp.route('/getEmailCode', methods=['POST'])
def get_email_code():
    """Generate a 6-digit email verification code, cache it, and mail it.

    The current user is looked up in redis by the ``token`` request header;
    the code is stored under the user's id with a ``CACHE_WEEK`` expiry and
    then sent to the user's email address.
    """
    import secrets  # local import: the code below is a security-relevant secret

    # Not used below; retained from the original.
    # NOTE(review): accessing request.json presumably makes Flask parse the
    # body — drop this line if that is not intended.
    parsed_data = request.json
    token = request.headers.get('token')
    user_info = br.get(BLOG_USER_INFO.format(token))
    # NOTE(review): br.get returns None for a missing/expired key, in which
    # case the lookups below raise AttributeError — return the project's
    # error response here once the appropriate helper is known.
    user_id = user_info.get('id')
    # Zero-padded 6-digit code in 000000..999999, drawn from a CSPRNG rather
    # than the predictable ``random`` module.
    code = str(secrets.randbelow(1000000)).zfill(6)
    br.set(BLOG_EMAIL_CODE.format(user_id), code, CACHE_WEEK)
    # user_info is the JSON-decoded dict from br.get, so the address must be
    # read by key; the original ``user_info.email`` raised AttributeError.
    email = user_info.get('email')
    send_mail(title='邮箱认证', to_email=email, code=code, type='checkEmail')
    return partner_success()
flask 使用 redis 还是相对简单的,当然我讲解的也不是太详细,如果有什么不懂的欢迎到下方留言
