vlambda博客
学习文章列表

实战:1 小时上线 Flask 短信微服务

优质文章,第一时间送达!

前言

Flask 是 Python 开发的轻量 Web 框架,有多轻量呢?10 行以内就可以开发一个 Web 服务,不过这只能用来做演示,今天我就用 1 个小时来开发一个用于生产环境的短信微服务。以下是我们生产环境脱敏后直接可用的服务代码,绝非示例教程。

为什么要开发短信微服务?

短信服务我们都是依赖公有云的实现,通过公有云的 API 直接调用,那为什么还要自己封装呢?

  • 因为微服务环境下我们要减少代码的重复量,如果有多个微服务需要使用短信服务,那就要复制多遍代码,把公有云的 API 包装成我们自己的微服务 API 可以将代码的复制减少为一行 Http 请求。
  • 调用 API 的 accesskey 和 secret 不需要复制给多个服务,减少安全风险。
  • 可以根据我们的业务需求加入共用的业务逻辑。

多了一层调用有没有性能影响?

多了一层调用是多了一个网络请求,但是影响微乎其微。我们不可能因为面向对象的方式太多调用就写逐行执行的代码吧。

  • 公有云短信服务本就是异步调用,错误处理也是异步回调的方式。
  • 微服务内部网络的调用应该是非常快的,可以同虚拟机部署或者同机房部署。

开始

首先我们建立项目的骨架。

为什么要建立项目的骨架呢?

因为 Flask 太过于轻量,所以例如配置、路由等规范需要由开发人员自己定义。一般成熟的开发团队都有自己的一套开发骨架,要统一配置,统一开发规范,统一集成相关系统等。我这里就分享一套适用于生产环境的非常简单的开发骨架。

新建一个项目目录,然后在里面建立 app 和 config 两个 Python 目录。app 用于存放业务相关代码,config 用于存放配置相关代码。

配置类

config/config.py 中添加如下内容,配置的设计因人而异,Flask 也没有做任何限制。我这里的设计是使用 BaseConfig 作为配置基类,存放所有共用的配置,而不同的环境使用不同的配置子类,子类只需要修改特定的值就可以,便于查看。

如果配置的值需要在运行是注入(如数据库连接等),则可以使用环境变量的方式(如下面的 SECRET_KEY),我同时使用 or 提供了没有环境变量的默认值。

import os


class BaseConfig:
"""
配置基类,用于存放共用的配置
"""

SECRET_KEY = os.environ.get('SECRET_KEY') or os.urandom(16)
DEBUG = False
TESTING = False


class ProductionConfig(BaseConfig):
"""
生产环境配置类,用于存放生产环境的配置
"""

pass


class DevelopmentConfig(BaseConfig):
"""
开发环境配置类,用于存放开发环境的配置
"""

DEBUG = True


class TestingConfig(BaseConfig):
"""
测试环境配置类,用于存放开发环境的配置
"""

DEBUG = True
TESTING = True


registered_app = [
'app'
]

config_map = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'testing': TestingConfig
}

至于后面的 registered_appconfig_map 有什么用?可以做自动注入,这个我后面会讲。

然后我加一个日志的配置,日志的配置非常重要,不同的开发团队往往有一套规范的日志配置模版,一般不会改变,所以可以直接定义在代码里,也可以用配置文件的方式。

config/logger.py

from logging.config import dictConfig


def config_logger(enable_console_handler=True, enable_file_handler=True, log_file='app.log', log_level='ERROR',
log_file_max_bytes=5000000, log_file_max_count=5)
:

# 定义输出到控制台的日志处理器
console_handler = {
'class': 'logging.StreamHandler',
'formatter': 'default',
'level': log_level,
'stream': 'ext://flask.logging.wsgi_errors_stream'
}
# 定义输出到文件的日志处理器
file_handler = {
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'detail',
'filename': log_file,
'level': log_level,
'maxBytes': log_file_max_bytes,
'backupCount': log_file_max_count
}
# 定义日志输出格式
default_formatter = {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
}
detail_formatter = {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
}
handlers = []
if enable_console_handler:
handlers.append('console')
if enable_file_handler:
handlers.append('file')
d = {
'version': 1,
'formatters': {
'default': default_formatter,
'detail': detail_formatter
},
'handlers': {
'console': console_handler,
'file': file_handler
},
'root': {
'level': log_level,
'handlers': handlers
}
}
dictConfig(d)

上面就是一个典型的 Python 日志配置方法,把可变的部分定义为参数(日志文件、级别等),定义了两个日志处理器(文件和控制台),使用时只需要调用这个方法即可。

应用类

定义好配置,我们就开始创建我们的 Flask 应用了。用过 Flask 的同学知道,创建 Flask 应用只需要一行代码。

app = Flask(__name__)

但这不是生产可用的方式,为了生产和测试方便,我们需要用一个方法获取这个 app 对象。

def create_app(conf=None):
# initialize logger
register_logger()
# check instance path
instance_path = os.environ.get('INSTANCE_PATH') or None
# create and configure the app
app = Flask(__name__, instance_path=instance_path)
if not conf:
conf = get_config_object()
app.config.from_object(conf)
# ensure the instance folder exists
if app.instance_path:
try:
os.makedirs(app.instance_path)
except OSError:
pass
# register app
register_app(app)
return app

这里做了几个事情,一是注册日志类,二是载入配置对象,三是创建 instance 目录,四是注册应用业务。

为什么注册日志要放在第一行?

不少开发人员会把日志的配置放在配置类里,这个没太大问题,只是越早注册日志,你的日志就会越早开始收集。如果载入配置类后才配置日志,那如果创建 app 时报错就无法被我们定义的日志收集器收集到了。

注册日志的方法可以这样写

def register_logger():
log_level = os.environ.get('LOG_LEVEL') or 'INFO'
log_file = os.environ.get('LOG_FILE') or 'app.log'
config_logger(
enable_console_handler=True,
enable_file_handler=True,
log_level=log_level,
log_file=log_file
)

我还是从环境变量里获取配置,并调用之前的配置函数配置日志。

载入配置对象的方法。

def get_config_object(env=None):
if not env:
env = os.environ.get('FLASK_ENV')
else:
os.environ['FLASK_ENV'] = env
if env in config.config_map:
return config.config_map[env]
else:
# set default env if not set
env = 'production'
return config.config_map[env]

FLASK_ENV 这个环境变量获取运行的环境,然后根据之前配置类里的 config_map 获取对应的配置类,实现配置类的载入。

最后就是注册我们的业务代码。

def register_app(app):
for a in config.registered_app:
module = importlib.import_module(a)
if hasattr(module, 'register'):
getattr(module, 'register')(app)

这里就用到了配置类里的 registered_app 列表,这里定义了要载入的模块,对于微服务来说,一般只有一个模块。

我这里还需要 app/__init__.py 文件里有个 register 方法,这个方法来执行具体的注册操作,例如注册 Flask 蓝图。

def register(app):
api_bp = Blueprint('api', __name__, url_prefix='/api')
app.register_blueprint(api_bp)

为什么要搞个 register 方法?

因为每个业务模块有自己的路由、ORM 或蓝图等,这是业务自己的代码,必须与骨架解耦。用一个特定的方法作为规范一是便于自定义的代码扩展,二是便于团队理解,不需要灵活的配置,这里约定大于配置。当然你可以有自己的另一套实现。

我把上面的代码整理为 application.py 模块

import os
import importlib
from flask import Flask
from config.logger import config_logger
from config import config


def register_logger():
log_level = os.environ.get('LOG_LEVEL') or 'INFO'
log_file = os.environ.get('LOG_FILE') or 'app.log'
config_logger(
enable_console_handler=True,
enable_file_handler=True,
log_level=log_level,
log_file=log_file
)


def register_app(app):
for a in config.registered_app:
module = importlib.import_module(a)
if hasattr(module, 'register'):
getattr(module, 'register')(app)


def get_config_object(env=None):
if not env:
env = os.environ.get('FLASK_ENV')
else:
os.environ['FLASK_ENV'] = env
if env in config.config_map:
return config.config_map[env]
else:
# set default env if not set
env = 'production'
return config.config_map[env]


def create_app_by_config(conf=None):
# initialize logger
register_logger()
# check instance path
instance_path = os.environ.get('INSTANCE_PATH') or None
# create and configure the app
app = Flask(__name__, instance_path=instance_path)
if not conf:
conf = get_config_object()
app.config.from_object(conf)
# ensure the instance folder exists
if app.instance_path:
try:
os.makedirs(app.instance_path)
except OSError:
pass
# register app
register_app(app)
return app


def create_app(env=None):
conf = get_config_object(env)
return create_app_by_config(conf)

这里提供了 create_app_by_config 方法用于从配置类直接创建 app 对象,主要是便于单元测试时直接注入特定的配置类。

我们的骨架基本上就成型了,包括了最基础的配置类、日志配置和应用注册机制。然后就可以运行我们的 Flask 应用了。

开发测试

Flask 提供了 flask run 命令来运行测试应用,不过还需要提供 FLASK_APPFLASK_ENV 两个环境变量来启动,这步我们也可以简化下。

编写 run.py

import click
from envparse import env
from application import create_app


@click.command()
@click.option('-h', '--host', help='Bind host', default='localhost', show_default=True)
@click.option('-p', '--port', help='Bind port', default=8000, type=int, show_default=True)
@click.option('-e', '--env', help='Running env, override environment FLASK_ENV.', default='development', show_default=True)
@click.option('-f', '--env-file', help='Environment from file', type=click.Path(exists=True))
def main(**kwargs):
if kwargs['env_file']:
env.read_envfile(kwargs['env_file'])
app = create_app(kwargs['env'])
app.run(host=kwargs['host'], port=kwargs['port'])


if __name__ == '__main__':
main()

这里用 click 创建了一个简单的命令行脚本,可以通过命令行参数直接启动一个测试用服务。当然默认参数直接可用,使用 python run.py 或者 IDE 里右键运行即可。同时,还提供了 env-file 选项,用户可提供环境变量的文件。

为什么要使用环境变量文件?

因为生产环境和开发环境的许多配置是不同的,例如公有云密钥,数据库连接等,这些信息是绝对不能提交到 git 等版本控制软件的,所以我们可以创建一个 .env 文件如下

ACCESS_KEY=xxx
ACCESS_SECRET=xxx

把这个文件加入 gitignore 中,然后使用 --env-file 载入这个文件就可以在开发环境中直接使用了,而不需要每次都手动输入了。

部署

生产环境我们肯定不会使用测试的方式启动,需要类似 gunicorn 等工具启动一个正式服务,我们也可以使用 Docker 等容器技术把生产部署过程自动化。

编写 server.py

from application import create_app

app = create_app()

这里很简单,创建一个 Flask app 对象即可,然后可以通过 gunicorn server:app 启动。

编写 requirements.txt 文件,用于自动安装依赖。后期可以把用到的依赖写进去。

flask
flask-restful
click
envparse
gunicorn

编写 Dockerfile 文件

FROM python:3.8

COPY . /opt
WORKDIR /opt
RUN pip install --no-cache-dir -r requirements.txt
CMD ["gunicorn", "-b", "0.0.0.0:80", "server:app"]

然后就可以使用如下命令用 Docker 启动服务容器了。

docker build -t myapp:0.1 .
docker run -d --name myapp -p 80:80 myapp:0.1

至此,一个简单的 Flask 骨架就完成了,大家可以在下面看到完整的项目。

Github Flask 骨架示例

编写业务

上面大概用了 20 分钟搞了个 Flask 的骨架,对于开发团队来说,骨架只要开发一次,后续的项目直接克隆就行了。下面我们就来编写具体的发送短信业务。

使用哪个公有云?

实际业务中我们可能使用单一一个云,也可能混合使用多个云。在我们的实际业务中,具体用哪个公有云的服务,不是取决于我们,而是取决于谁的价格低,谁的优惠多,谁的功能强。😄

所以我们可以提取短信业务的共性写一个抽象类。短信服务的共同点主要有短信模版,签名,接收人,模版参数等。

一个简单的抽象类

class SmsProvider:

def __init__(self, **kwargs):
self.conf = kwargs

def send(self, template, receivers, **kwargs):
pass

然后有基于阿里云的实现,以下代码根据官方示例修改

class AliyunSmsProvider(SmsProvider):

def send(self, template, receivers, **kwargs):
from aliyunsdkcore.request import CommonRequest
client = self.get_client(self.conf['app_key'], self.conf['app_secret'], self.conf['region_id'])
request = CommonRequest()
request.set_accept_format('json')
request.set_domain(self.conf['domain'])
request.set_method('POST')
request.set_protocol_type('https')
request.set_version(self.conf['version'])
request.set_action_name('SendSms')
request.add_query_param('RegionId', self.conf['region_id'])
request.add_query_param('PhoneNumbers', receivers)
request.add_query_param('SignName', self.conf['sign_name'])
request.add_query_param('TemplateCode', self.get_template_id(template))
request.add_query_param('TemplateParam', self.build_template_params(**kwargs))
return client.do_action_with_exception(request)

def get_template_id(self, name):
if name in self.conf['template_id_map']:
return self.conf['template_id_map'][name]
else:
raise ValueError('no template {} found!'.format(name))

@staticmethod
def get_client(app_key, app_secret, region_id):
from aliyunsdkcore.client import AcsClient
return AcsClient(app_key, app_secret, region_id)

@staticmethod
def build_template_params(**kwargs):
if 'params' in kwargs and kwargs['params']:
return json.dumps(kwargs['params'])
else:
return ''

然后在 BaseConfig 添加以下配置,是一些公有云 API 的基本配置,需要在运行是通过环境变量载入,其中 template_id_map 里的内容是模版的名称和对应的 ID,用于区分不同的短信模版,如验证码,推广等,名称作为参数供调用方使用,避免了直接传递 ID。

    # SMS config
SMS_CONF = {
'aliyun': {
'provider_cls': 'app.sms.AliyunSmsProvider',
'config': {
'domain': 'dysmsapi.aliyuncs.com',
'version': os.environ.get('ALIYUN_SMS_VERSION') or '2017-05-25',
'app_key': os.environ.get('ALIYUN_SMS_APP_KEY'),
'app_secret': os.environ.get('ALIYUN_SMS_APP_SECRET'),
'region_id': os.environ.get('ALIYUN_SMS_REGION_ID'),
'sign_name': os.environ.get('ALIYUN_SMS_SIGN_NAME'),
'template_id_map': {
'captcha': 'xxx'
}
}
}
}

其中模版 ID,签名,App Key,App Secret 需要在阿里云控制台获取,模版和签名需要审核后才能获得。

同样的方法可以添加华为云的 API,也可直接从示例修改,只是华为云暂时没有 SDK,需要通过 API 调用,大同小异。

class HuaweiSmsProvider(SmsProvider):

def send(self, template, receivers, **kwargs):
header = {'Authorization': 'WSSE realm="SDP",profile="UsernameToken",type="Appkey"',
'X-WSSE': self.build_wsse_header(self.conf['app_key'], self.conf['app_secret'])}
form_data = {
'from': self.conf['sender'],
'to': receivers,
'templateId': self.get_template_id(template),
'templateParas': self.build_template_params(**kwargs),
}
r = requests.post(self.conf['url'], data=form_data, headers=header, verify=False)
return r

def get_template_id(self, name):
if name in self.conf['template_id_map']:
return self.conf['template_id_map'][name]
else:
raise ValueError('no template {} found!'.format(name))

@staticmethod
def build_wsse_header(app_key, app_secret):
now = time.strftime('%Y-%m-%dT%H:%M:%SZ')
nonce = str(uuid.uuid4()).replace('-', '')
digest = hashlib.sha256((nonce + now + app_secret).encode()).hexdigest()
digest_base64 = base64.b64encode(digest.encode()).decode()
return 'UsernameToken Username="{}",PasswordDigest="{}",Nonce="{}",Created="{}"'.format(app_key, digest_base64, nonce, now)

@staticmethod
def build_template_params(**kwargs):
if 'params' in kwargs and kwargs['params']:
return json.dumps(list(kwargs['params'].values()))
else:
return ''

也是添加配置,最后的 BaseConfig 如下所示,其中 SMS_PROVIDER 配置指定 SMS_CONF 的键,指定我们现在使用的是哪个公有云服务:

class BaseConfig:
SECRET_KEY = os.environ.get('SECRET_KEY') or os.urandom(16)
DEBUG = False
TESTING = False

# SMS config
SMS_PROVIDER = os.environ.get('SMS_PROVIDER')
SMS_CONF = {
'aliyun': {
'provider_cls': 'app.sms.AliyunSmsProvider',
'config': {
'domain': 'dysmsapi.aliyuncs.com',
'version': os.environ.get('ALIYUN_SMS_VERSION') or '2017-05-25',
'app_key': os.environ.get('ALIYUN_SMS_APP_KEY'),
'app_secret': os.environ.get('ALIYUN_SMS_APP_SECRET'),
'region_id': os.environ.get('ALIYUN_SMS_REGION_ID'),
'sign_name': os.environ.get('ALIYUN_SMS_SIGN_NAME'),
'template_id_map': {
'captcha': 'xxx'
}
}
},
'huawei': {
'provider_cls': 'app.sms.HuaweiSmsProvider',
'config': {
'url': os.environ.get('HUAWEI_URL'),
'app_key': os.environ.get('HUAWEI_SMS_APP_KEY'),
'app_secret': os.environ.get('HUAWEI_SMS_APP_SECRET'),
'sender': os.environ.get('HUAWEI_SMS_SENDER_ID'),
'template_id_map': {
'captcha': 'xxx'
}
}
}
}

其他的公有云也可以通过类似的方式添加。

然后我们添加一个方法,获取 Provider 的单例对象。这里使用 Flask 的 g 对象,把我们的 Provider 对象注册成全局的单例对象。

from flask import g, current_app
from werkzeug.utils import import_string


def create_sms():
provider = current_app.config['SMS_PROVIDER']
sms_config = current_app.config['SMS_CONF']
if provider in sms_config:
cls = sms_config[provider]['provider_cls']
conf = sms_config[provider]['config']
sms = import_string(cls)(**conf)
return sms
return None


def get_sms():
if 'sms' not in g:
g.sms = create_sms()
return g.sms

这些都完成后,就可以添加一个视图类,这里用到了 Flask-Restful 库,生成 API 视图。

app/api/sms.py

import logging
from flask_restful import Resource, reqparse
from app.sms import get_sms


# 定义参数,参考 https://flask-restful.readthedocs.io/en/latest/reqparse.html
parser = reqparse.RequestParser(bundle_errors=True)
parser.add_argument('receivers', help='Comma separated receivers.', required=True)
parser.add_argument('template', help='Notification template name.', required=True)
parser.add_argument('params', help='Notification template params.', type=dict)


class Sms(Resource):

def post(self):
args = parser.parse_args()
sms = get_sms()
try:
res = sms.send(**args)
except Exception as e:
logging.error(e)
return {'message': 'failed'}, 500
if res.status_code < 300:
return {'message': 'send'}, 200
else:
logging.error('Send sms failed with {}'.format(res.text))
return {'message': 'failed'}, 500

然后我们定义路由。

app/api/__init__.py

from flask import Blueprint
from flask_restful import Api
from app.api.health import Health
from app.api.sms import Sms


api_bp = Blueprint('api', __name__, url_prefix='/api')
api = Api(api_bp)

api.add_resource(Sms, '/sms')

最后记得在我们的应用 app 模块里注册蓝图。

app/__init__.py

from app.api import api_bp


# register blueprint
def register(app):
app.register_blueprint(api_bp)

至此,我们的短信微服务就完成了。可以通过我们上面的方法进行测试和部署。

其中我们定义了一些环境变量,在测试时可通过环境变量文件载入,运行时可通过容器的环境变量载入。放在 instance 目录下是因为 instance 是我们默认的 Flask 实例目录,这个目录是不会提交到 git 里的。

instance/env

SMS_PROVIDER=huawei
HUAWEI_URL=https://rtcsms.cn-north-1.myhuaweicloud.com:10743/sms/batchSendSms/v1
HUAWEI_SMS_APP_KEY=aaa
HUAWEI_SMS_APP_SECRET=bbb
HUAWEI_SMS_SENDER_ID=ccc

运行时通过环境变量载入

docker run -d --name sms -p 80:80 \
-e "SMS_PROVIDER=aliyun" \
-e "ALIYUN_SMS_APP_KEY=aaa" \
-e "ALIYUN_SMS_APP_SECRET=bbb" \
-e "ALIYUN_SMS_REGION_ID=cn-hangzhou" \
-e "ALIYUN_SMS_SIGN_NAME=ccc" \
myapp:0.1

完整的项目可在这里查看。

示例项目代码

然后我们可以做如下测试,注意修改配置中的模版 ID 和环境变量,并根据自己的模版参数修改 params。

实战:1 小时上线 Flask 短信微服务

结语

对于老鸟来说,开发这个项目,可能根本不需要 1 个小时。对于规范的线上项目来说,还是缺少一些东西的,例如单元测试。大家的生产 API 服务是怎么样的?欢迎讨论!

这里的短信微服务只是抛砖引玉,其实所有的公有云 API 服务都可以一样的套用。1 小时上线一个微服务,剩下 7 小时划水刷掘金😄。

参考

  • palletsprojects.com/p/flask/
  • flask-restful.readthedocs.io/en/latest/
  • github.com/wwtg99/flas…
  • 阿里云短信 API
  • 华为云短信 API
  • github.com/wwtg99/sms-…
  • zhuanlan.zhihu.com/p/104380919


回复下方「关键词」,获取优质资源


回复关键词「 pybook03」,立即获取主页君与小伙伴一起翻译的《Think Python 2e》电子版

回复关键词「入门资料」,立即获取主页君整理的 10 本 Python 入门书的电子版

回复关键词「m」,立即获取Python精选优质文章合集

回复关键词「book 数字」,将数字替换成 0 及以上数字,有惊喜好礼哦~


题图:pexels,CC0 授权。

好文章,我在看❤️