Flask 工厂模式注册初始化logger
工厂模式的意义
引用官方解释:
If you are already using packages and blueprints for your application (Modular Applications with Blueprints) there are a couple of really nice ways to further improve the experience. A common pattern is creating the application object when the blueprint is imported. But if you move the creation of this object into a function, you can then create multiple instances of this app later.
So why would you want to do this?
Testing. You can have instances of the application with different settings to test every case.
Multiple instances. Imagine you want to run different versions of the same application. Of course you could have multiple instances with different configs set up in your webserver, but if you use factories, you can have multiple instances of the same application running in the same application process which can be handy.
大概意思是:
为了方便测试,你可以随心所欲的切换不同的配置实例来测试你的用例。
相同的应用,你可以利用不同的配置实例来进行初始化,达到运行不同版本应用的目的。
官方实例的应用(autoapp.py):
def create_app(config_filename):
app = Flask(__name__)
app.config.from_pyfile(config_filename)
from yourapplication.model import db
db.init_app(app)
from yourapplication.views.admin import admin
from yourapplication.views.frontend import frontend
app.register_blueprint(admin)
app.register_blueprint(frontend)
return app
初始化
为了能让我们的日志初始化添加如下代码(autoapp.py):
def register_logging(app):
app.config.setdefault("LOG_PATH", "application.log")
log_formatter = "%(asctime)s [%(thread)d:%(threadName)s] %(filename)s:%(module)s:%(funcName)s in %(lineno)d] [%(levelname)s]: %(message)s"
app.config.setdefault("LOG_FORMATTER", log_formatter)
app.config.setdefault("LOG_MAX_BYTES", 50 * 1024 * 1024)
app.config.setdefault("LOG_BACKUP_COUNT", 10)
app.config.setdefault("LOG_INTERVAL", 1)
app.config.setdefault("LOG_WHEN", "D")
app.config.setdefault("LOG_LEVEL", "INFO")
formatter = logging.Formatter(app.config["LOG_FORMATTER"])
# 将日志输出到文件
# 指定间隔时间自动生成文件的处理器
# 实例化TimedRotatingFileHandler
# interval是时间间隔,
# backupCount是备份文件的个数,如果超过这个个数,就会自动删除
# when是间隔的时间单位,单位有以下几种:
# S 秒
# M 分
# H 小时、
# D 天、
# W 每星期(interval==0时代表星期一)
# midnight 每天凌晨
timed_rotating_file_handler = TimedRotatingFileHandler(
filename=app.config["LOG_PATH"],
interval=app.config["LOG_INTERVAL"],
when=app.config["LOG_WHEN"],
backupCount=app.config["LOG_BACKUP_COUNT"],
encoding="utf-8",
)
timed_rotating_file_handler.setFormatter(formatter) # 设置文件里写入的格式
timed_rotating_file_handler.setLevel(app.config["LOG_LEVEL"])
# StreamHandler
stream_handler = StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(app.config["LOG_LEVEL"])
# SMTPHandler
mail_handler = DelaySMTPHandler(
mailhost=app.config["MAILHOST"],
credentials=app.config["CREDENTIALS"],
fromaddr=app.config["FROMADDR"],
toaddrs=app.config["TOADDRS"],
subject=app.config["SUBJECT"],
)
mail_handler.setLevel(logging.ERROR)
mail_handler.setFormatter(formatter)
# 删除默认的handler
# app.logger.removeHandler(default_handler)
# 设置logger
for logger in (
app.logger,
logging.getLogger("sqlalchemy"),
logging.getLogger("werkzeug"),
):
logger.addHandler(stream_handler)
logger.addHandler(timed_rotating_file_handler)
if os.getenv("FLASK_ENV") == "production":
logger.addHandler(mail_handler)
# set logger for elk
# stash_handler = logstash.LogstashHandler(
# app.config.get('ELK_HOST'),
# app.config.get('ELK_PORT')
# )
# root_logger.addHandler(stashHandler)
使用log
from flask import current_app
current_app.logger.info("hello world")
这样做的好处
可以多环境随意切换配置,而不用更改代码
日志配置在
setting.py
统一配置管理
完整代码
autoapp.py:
def create_app(config_filename):
app = Flask(__name__)
app.config.from_pyfile(config_filename)
from yourapplication.model import db
db.init_app(app)
register_logging(app)
from yourapplication.views.admin import admin
from yourapplication.views.frontend import frontend
app.register_blueprint(admin)
app.register_blueprint(frontend)
return app
def register_logging(app):
app.config.setdefault("LOG_PATH", "application.log")
log_formatter = "%(asctime)s [%(thread)d:%(threadName)s] %(filename)s:%(module)s:%(funcName)s in %(lineno)d] [%(levelname)s]: %(message)s"
app.config.setdefault("LOG_FORMATTER", log_formatter)
app.config.setdefault("LOG_MAX_BYTES", 50 * 1024 * 1024)
app.config.setdefault("LOG_BACKUP_COUNT", 10)
app.config.setdefault("LOG_INTERVAL", 1)
app.config.setdefault("LOG_WHEN", "D")
app.config.setdefault("LOG_LEVEL", "INFO")
formatter = logging.Formatter(app.config["LOG_FORMATTER"])
# 将日志输出到文件
# 指定间隔时间自动生成文件的处理器
# 实例化TimedRotatingFileHandler
# interval是时间间隔,
# backupCount是备份文件的个数,如果超过这个个数,就会自动删除
# when是间隔的时间单位,单位有以下几种:
# S 秒
# M 分
# H 小时、
# D 天、
# W 每星期(interval==0时代表星期一)
# midnight 每天凌晨
timed_rotating_file_handler = TimedRotatingFileHandler(
filename=app.config["LOG_PATH"],
interval=app.config["LOG_INTERVAL"],
when=app.config["LOG_WHEN"],
backupCount=app.config["LOG_BACKUP_COUNT"],
encoding="utf-8",
)
timed_rotating_file_handler.setFormatter(formatter) # 设置文件里写入的格式
timed_rotating_file_handler.setLevel(app.config["LOG_LEVEL"])
# StreamHandler
stream_handler = StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(app.config["LOG_LEVEL"])
# SMTPHandler
mail_handler = DelaySMTPHandler(
mailhost=app.config["MAILHOST"],
credentials=app.config["CREDENTIALS"],
fromaddr=app.config["FROMADDR"],
toaddrs=app.config["TOADDRS"],
subject=app.config["SUBJECT"],
)
mail_handler.setLevel(logging.ERROR)
mail_handler.setFormatter(formatter)
# 删除默认的handler
# app.logger.removeHandler(default_handler)
# 设置logger
for logger in (
app.logger,
logging.getLogger("sqlalchemy"),
logging.getLogger("werkzeug"),
):
logger.addHandler(stream_handler)
logger.addHandler(timed_rotating_file_handler)
if os.getenv("FLASK_ENV") == "production":
logger.addHandler(mail_handler)
# set logger for elk
# stash_handler = logstash.LogstashHandler(
# app.config.get('ELK_HOST'),
# app.config.get('ELK_PORT')
# )
# root_logger.addHandler(stashHandler)
settings.py
# -*- coding: utf-8 -*-
"""Application configuration.
"""
import os
BASE_DIR = os.path.dirname(__file__)
class Config(object):
# Base settings #################################################
DEBUG = False
TESTING = False
SECRET_KEY = ""
BUNDLE_ERRORS = True
# 日志配置 ###############################################################
LOG_PATH = os.path.join(BASE_DIR, "logs", "falling-wind-service.log")
LOG_FORMATTER = (
"%(asctime)s [%(name)s] [%(thread)d:%(threadName)s] "
"%(filename)s:%(module)s:%(funcName)s "
"in %(lineno)d] "
"[%(levelname)s]: %(message)s"
)
LOG_MAX_BYTES = 50 * 1024 * 1024 # 日志文件大小
LOG_BACKUP_COUNT = 10 # 备份文件数量
LOG_INTERVAL = 1
LOG_WHEN = "D"
# 数据库配置 ####################################################
SQLALCHEMY_ENGINE_OPTIONS = {
"pool_timeout": 10, # 默认链接超时时长
"pool_size": 10, # 数据库链接池大小
}
# 提供多库链接 使用其他库进行链接的时候需要使用bind指定那个库使用
SQLALCHEMY_BINDS = {}
SQLALCHEMY_TRACK_MODIFICATIONS = True
# Celery ##################################################################
enable_utc = True
timezone = "Asia/Shanghai"
# or the actual content-type (MIME)
accept_content = ["application/json"]
# or the actual content-type (MIME)
result_accept_content = ["application/json"]
include = ["app_tasks.user_tasks"]
result_expires = 3600
# JWT ####################################################################
JWT_SECRET_KEY = ""
# JWT_BLACKLIST_ENABLED = False
# JWT_BLACKLIST_TOKEN_CHECKS = ['access', 'refresh']
class Pro(Config):
ENV = "product"
# 日志 ##################################################################
LOG_PATH = "your application log path"
# DB ##################################################################
DB_HOST = ""
DB_PORT = 3306
DB_DATABASE = ""
DB_USER = ""
DB_PASSWORD = ""
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://{}:{}@{}:{}/{}".format(
DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_DATABASE
)
# Redis ####################################################
REDIS_URL = ""
# Mail ########################################################
MAIL_SERVER = "smtp.qq.com"
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USERNAME = ""
MAIL_PASSWORD = ""
# Celery ###########################################################
# Broker settings.
broker_url = ""
# Using the redis to store task state and results.
result_backend = ""
# JWT ###############################################################
# JWT_ACCESS_TOKEN_EXPIRES = 60 * 60
JWT_SECRET_KEY = ""
# SMTPHandler ######################################################
MAILHOST = ("smtp.qq.com", 465)
CREDENTIALS = ("", "")
FROMADDR = ""
TOADDRS = [""]
SUBJECT = ""
SECURE = ("SSL",)
class Dev(Config):
DEBUG = True
ENV = "dev"
# 日志 ##################################################################
LOG_PATH = "your path"
# DB ####################################################################
DB_HOST = "localhost"
DB_PORT = 3306
DB_DATABASE = ""
DB_USER = ""
DB_PASSWORD = ""
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://{}:{}@{}:{}/{}".format(
DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_DATABASE
)
# Redis ####################################################
REDIS_URL = ""
# Mail ########################################################
MAIL_SERVER = "smtp.qq.com"
MAIL_PORT = 25
MAIL_USE_TLS = True
MAIL_USERNAME = ""
MAIL_PASSWORD = ""
# Celery ###################################################################
# Broker settings.
broker_url = ""
# Using the redis to store task state and results.
result_backend = ""
# JWT ###############################################################
JWT_ACCESS_TOKEN_EXPIRES = 60 * 60
JWT_SECRET_KEY = ""
# SMTPHandler ######################################################
MAILHOST = ("smtp.qq.com", 465)
CREDENTIALS = ("", "")
FROMADDR = ""
TOADDRS = [""]
SUBJECT = ""
class Test(Config):
TESTING = True
DEBUG = True
ENV = "test"
app.py
# -*- coding: utf-8 -*-
"""Create an application instance."""
from autoapp import create_app
import os
from extensions import celery
if os.getenv("FLASK_ENV") == "development":
app = create_app("settings.Dev")
elif os.getenv("FLASK_ENV") == "production":
app = create_app("settings.Pro")
else:
raise EnvironmentError("Please set FLASK_ENV !!!")
celery = celery
运行flask
windows
$ set FLASK_ENV=development
$ flask run
Linux
$ export FLASK_ENV=development
$ flask run
Enjoy your code!