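Chapter 17 Flask Web Development
Learning Objectives
After completing this chapter, you should be able to:
- Understand what a web framework does: the WSGI/ASGI protocols and how a framework implements the HTTP request-response lifecycle
- Understand Flask's architecture: its microframework design philosophy and the application and request context mechanisms
- Use the routing system fluently: RESTful route design, dynamic routes, modular blueprints, and URL building
- Master the template engine: Jinja2 template inheritance, custom filters, macros, and template performance tuning
- Build a persistence layer: ORM mapping, relationship modeling, migrations, and query optimization with Flask-SQLAlchemy
- Implement authentication and authorization: user authentication, session management, and access control with Flask-Login
- Apply engineering practices: the application factory pattern, blueprint architecture, and configuration management for maintainable large applications
- Apply security defenses: understand and defend against XSS, CSRF, SQL injection, clickjacking, and other web security threats
- Deploy to production: Gunicorn/uWSGI deployment, containerization, performance tuning, and monitoring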
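17.1 Web Framework Fundamentals
17.1.1 The HTTP Protocol and Web Application Architecture
A web application is, at its core, a program that exchanges requests and responses over HTTP. Understanding how HTTP works is a prerequisite for mastering any web framework.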
Client (Browser)                           Server (Web Server)
|                                                            |
|  1. DNS lookup → TCP three-way handshake → TLS negotiation |
|                                                            |
|  2. HTTP request ────────────────────────────────────────> |
|     GET /api/users HTTP/1.1                                |
|     Host: example.com                                      |
|     Accept: application/json                               |
|                                                            |
|  3. Server processes the request                           |
|     WSGI Server → Flask App → View Func                    |
|                                                            |
|  4. HTTP response <─────────────────────────────────────── |
|     HTTP/1.1 200 OK                                        |
|     Content-Type: application/json                         |
|     {"users": [...]}                                       |
|                                                            |
|  5. TCP four-way teardown                                  |

HTTP request methods and their semantics:
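| Method | Semantics | Idempotent | Safe | Typical use |
|---|---|---|---|---|
| GET | Retrieve a resource | Yes | Yes | Querying data |
| POST | Create a resource | No | No | Submitting forms, creating records |
| PUT | Full update | Yes | No | Replacing an entire resource |
| PATCH | Partial update | No | No | Modifying some fields |
| DELETE | Delete a resource | Yes | No | Deleting records |
| HEAD | Retrieve metadata | Yes | Yes | Checking whether a resource exists |
| OPTIONS | List supported methods | Yes | Yes | CORS preflight requests |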
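The idempotency column matters in practice when a client has to decide whether a failed request can be repeated automatically. The following is a minimal sketch of that decision; the `METHOD_SEMANTICS` table and the `can_retry` helper are hypothetical names that simply mirror the table above and do not come from Flask or any other library.

```python
from typing import NamedTuple


class MethodSemantics(NamedTuple):
    idempotent: bool
    safe: bool


# Mirrors the table above; illustrative only, not a library API.
METHOD_SEMANTICS = {
    "GET": MethodSemantics(idempotent=True, safe=True),
    "POST": MethodSemantics(idempotent=False, safe=False),
    "PUT": MethodSemantics(idempotent=True, safe=False),
    "PATCH": MethodSemantics(idempotent=False, safe=False),
    "DELETE": MethodSemantics(idempotent=True, safe=False),
    "HEAD": MethodSemantics(idempotent=True, safe=True),
    "OPTIONS": MethodSemantics(idempotent=True, safe=True),
}


def can_retry(method: str) -> bool:
    """Return True when repeating the request should not change the outcome."""
    semantics = METHOD_SEMANTICS.get(method.upper())
    # Unknown methods are treated conservatively as non-retryable.
    return semantics is not None and semantics.idempotent


if __name__ == "__main__":
    for name in ("GET", "POST", "PUT", "PATCH", "DELETE"):
        print(f"{name}: retry allowed = {can_retry(name)}")
```

A real client would usually combine this check with a retry limit and a backoff delay; the sketch only covers the semantic part.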
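17.1.2 WSGI in Depth
WSGI (Web Server Gateway Interface, PEP 3333) is the standard interface between Python web applications and web servers. Flask is built on WSGI, so understanding WSGI is essential to understanding how Flask runs.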
def simple_wsgi_app(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/plain; charset=utf-8")]
    start_response(status, headers)
    return [b"Hello, WSGI!"]

The two core components of the WSGI interface:
- environ: a dictionary of request information, populated by the WSGI server
- start_response: a callable used to set the response status and headers
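Because a WSGI application is just a callable, it can be exercised without any server. The sketch below plays the server's role by hand: it builds an `environ` with the standard library's `wsgiref.util.setup_testing_defaults` and passes a recording `start_response`. The helper name `call_wsgi_app` is an assumption made for this illustration.

```python
from wsgiref.util import setup_testing_defaults


def simple_wsgi_app(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/plain; charset=utf-8")]
    start_response(status, headers)
    return [b"Hello, WSGI!"]


def call_wsgi_app(app, path="/"):
    """Invoke a WSGI app the way a server would and collect the result."""
    environ = {"PATH_INFO": path}
    # Fills in the remaining required keys (REQUEST_METHOD, wsgi.input, ...).
    setup_testing_defaults(environ)
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    # The app returns an iterable of byte strings; join them into one body.
    body = b"".join(app(environ, start_response))
    return captured["status"], dict(captured["headers"]), body


if __name__ == "__main__":
    status, headers, body = call_wsgi_app(simple_wsgi_app)
    print(status, headers, body)
```

The same helper works for any WSGI callable, including a Flask application object, which is what makes the protocol convenient for testing.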
def detailed_wsgi_app(environ, start_response):
    # Request metadata supplied by the WSGI server.
    method = environ.get("REQUEST_METHOD", "GET")
    path = environ.get("PATH_INFO", "/")
    query_string = environ.get("QUERY_STRING", "")
    content_type = environ.get("CONTENT_TYPE", "")
    content_length = int(environ.get("CONTENT_LENGTH", 0) or 0)
    server_name = environ.get("SERVER_NAME", "localhost")
    server_port = environ.get("SERVER_PORT", "80")
    # Read the body only when the client declared a length.
    request_body = environ["wsgi.input"].read(content_length) if content_length > 0 else b""
    response_body = (
        f"Method: {method}\n"
        f"Path: {path}\n"
        f"Query: {query_string}\n"
        f"Content-Type: {content_type}\n"
        f"Body: {request_body.decode('utf-8')}\n"
        f"Server: {server_name}:{server_port}\n"
    ).encode("utf-8")
    status = "200 OK"
    headers = [
        ("Content-Type", "text/plain; charset=utf-8"),
        ("Content-Length", str(len(response_body))),
    ]
    start_response(status, headers)
    return [response_body]

The WSGI middleware pattern inserts a processing layer between the server and the application:
import time

class TimingMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        start = time.perf_counter()

        def custom_start_response(status, headers, exc_info=None):
            # Measures the time until the application calls start_response.
            elapsed = time.perf_counter() - start
            headers.append(("X-Response-Time", f"{elapsed:.4f}s"))
            return start_response(status, headers, exc_info)

        return self.app(environ, custom_start_response)

class RequestLoggingMiddleware:
    def __init__(self, app, logger=None):
        self.app = app
        self.logger = logger

    def __call__(self, environ, start_response):
        method = environ.get("REQUEST_METHOD", "GET")
        path = environ.get("PATH_INFO", "/")
        query = environ.get("QUERY_STRING", "")
        remote = environ.get("REMOTE_ADDR", "-")
        if self.logger:
            self.logger.info(f"{remote} {method} {path}?{query}")
        return self.app(environ, start_response)

17.1.3 From WSGI to ASGI: The Evolution of the Asynchronous Web
ASGI (Asynchronous Server Gateway Interface) is the asynchronous successor to WSGI. It supports scenarios such as WebSocket, HTTP/2, and long polling:
async def asgi_app(scope, receive, send):
    if scope["type"] == "http":
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [[b"content-type", b"text/plain"]],
        })
        await send({
            "type": "http.response.body",
            "body": b"Hello, ASGI!",
        })
    elif scope["type"] == "websocket":
        while True:
            message = await receive()
            if message["type"] == "websocket.connect":
                await send({"type": "websocket.accept"})
            elif message["type"] == "websocket.receive":
                await send({
                    "type": "websocket.send",
                    "text": f"Echo: {message.get('text', '')}",
                })
            elif message["type"] == "websocket.disconnect":
                break

Flask has supported async view functions since version 2.0, provided the async extra is installed. Flask itself remains a WSGI framework: each async view runs on an event loop inside a worker, so it helps with concurrent I/O within a single request rather than turning the application into an ASGI app:
import asyncio

from flask import Flask

app = Flask(__name__)

# Async views need the optional dependency: pip install "flask[async]"
@app.route("/async-endpoint")
async def async_handler():
    await asyncio.sleep(0.1)
    return {"message": "Async response"}

17.1.4 Flask's Design Philosophy
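Flask follows a "microframework" design. Its core principles are:
- Explicit over implicit: no project structure is imposed, and the developer keeps full control
- Minimal core: the core provides only routing, templating, and request contexts; everything else comes from extensions
- Composability: complex applications are assembled from blueprints and extensions
- Developer friendliness: good debugging tools and detailed error messages
Flask is built mainly on two libraries (it also installs a few smaller supporting packages such as Click, ItsDangerous, and MarkupSafe):
| Dependency | Role | Notes |
|---|---|---|
| Werkzeug | WSGI toolkit | Provides routing, request/response objects, the development server, and the debugger |
| Jinja2 | Template engine | Provides template rendering, inheritance, filters, macros, and more |
Layers of a Flask application: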
┌─────────────────────────────────────────┐
│            Flask Application            │
│  ┌───────────┐  ┌──────────┐  ┌──────┐  │
│  │  Routing  │  │ Template │  │ Ctx  │  │
│  │  System   │  │  Engine  │  │ Mgmt │  │
│  └─────┬─────┘  └────┬─────┘  └──┬───┘  │
│        │             │           │      │
│  ┌─────┴─────┐  ┌────┴─────┐  ┌──┴───┐  │
│  │ Werkzeug  │  │  Jinja2  │  │ Ctx  │  │
│  │  Routing  │  │ Template │  │ Loc  │  │
│  └───────────┘  └──────────┘  └──────┘  │
│                                         │
│  ┌───────────────────────────────────┐  │
│  │         Extension System          │  │
│  │ SQLAlchemy │ Login │ WTForms │ ...│  │
│  └───────────────────────────────────┘  │
└─────────────────────────────────────────┘

17.2 Core Mechanisms of a Flask Application
17.2.1 Application Initialization and Configuration
from flask import Flask
import os

class Config:
    SECRET_KEY = os.environ.get("SECRET_KEY", "dev-key-change-in-production")
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL", "sqlite:///app.db"
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # Pool settings are aimed at server databases such as PostgreSQL or MySQL.
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 10,
        "pool_recycle": 3600,
        "pool_pre_ping": True,
    }
    # Honored by older Flask releases only; from Flask 2.3 on,
    # set app.json.sort_keys = False on the application instead.
    JSON_SORT_KEYS = False
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16 MB upload limit

class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_ECHO = True

class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
    WTF_CSRF_ENABLED = False

class ProductionConfig(Config):
    DEBUG = False
    SQLALCHEMY_ECHO = False

config_map = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "production": ProductionConfig,
    "default": DevelopmentConfig,
}

17.2.2 The Application Factory Pattern
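The application factory pattern is Flask's recommended way to organize a project. It wraps application creation in a function, which:
- makes it easy to switch between configurations per environment
- avoids circular imports
- lets tests create isolated application instances
- allows several application instances to run side by side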
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from flask_cors import CORS

# Assumption: the config classes from 17.2.1 live in a sibling config.py module.
from .config import config_map

# Extensions are created unbound and attached to an app inside the factory.
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
cors = CORS()

def create_app(config_name="default"):
    app = Flask(__name__)
    app.config.from_object(config_map[config_name])

    db.init_app(app)
    migrate.init_app(app, db)
    login_manager.init_app(app)
    cors.init_app(app)
    login_manager.login_view = "auth.login"
    login_manager.login_message_category = "info"

    # Imported here to avoid circular imports at module load time.
    from .main import main_bp
    from .auth import auth_bp
    from .api import api_bp
    app.register_blueprint(main_bp)
    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(api_bp, url_prefix="/api/v1")

    register_error_handlers(app)
    register_template_filters(app)
    register_cli_commands(app)
    register_hooks(app)
    return app

def register_error_handlers(app):
    from flask import render_template, jsonify, request

    def wants_json_response():
        return (
            request.accept_mimetypes.best_match(["application/json", "text/html"])
            == "application/json"
        )

    @app.errorhandler(404)
    def not_found(error):
        if wants_json_response():
            return jsonify({"error": "Not found", "status": 404}), 404
        return render_template("errors/404.html"), 404

    @app.errorhandler(500)
    def internal_error(error):
        # Leave the session usable after a failed transaction.
        db.session.rollback()
        if wants_json_response():
            return jsonify({"error": "Internal server error", "status": 500}), 500
        return render_template("errors/500.html"), 500

    @app.errorhandler(403)
    def forbidden(error):
        if wants_json_response():
            return jsonify({"error": "Forbidden", "status": 403}), 403
        return render_template("errors/403.html"), 403

def register_template_filters(app):
    @app.template_filter("datetime_format")
    def datetime_format(value, format="%Y-%m-%d %H:%M"):
        if value is None:
            return ""
        return value.strftime(format)

    @app.template_filter("truncate_chars")
    def truncate_chars(value, length=100):
        if len(value) <= length:
            return value
        # Cut at the last space so a word is not split in half.
        return value[:length].rsplit(" ", 1)[0] + "..."

def register_cli_commands(app):
    import click

    @app.cli.command("init-db")
    def init_db():
        db.create_all()
        click.echo("Database initialized.")

    @app.cli.command("create-admin")
    @click.argument("username")
    @click.argument("email")
    @click.argument("password")
    def create_admin(username, email, password):
        from .models import User
        user = User(username=username, email=email, is_admin=True)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        click.echo(f"Admin user {username} created.")

def register_hooks(app):
    from flask import g
    import time

    @app.before_request
    def before_request():
        g.start_time = time.perf_counter()

    @app.after_request
    def after_request(response):
        if hasattr(g, "start_time"):
            elapsed = time.perf_counter() - g.start_time
            response.headers["X-Response-Time"] = f"{elapsed:.4f}s"
        return response

17.2.3 The Context Mechanism in Depth
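Contexts are central to Flask's architecture, and understanding them is essential for diagnosing errors such as "Working outside of application context".
Flask has two kinds of context:
| Context | Scope | Objects exposed | Lifetime |
|---|---|---|---|
| Application context | Application level | current_app, g | During a request, or while pushed manually |
| Request context | Request level | request, session | A single HTTP request |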
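The idea behind names like `request` and `current_app` is that a module-level object forwards each attribute access to whatever is bound for the code that is currently running. Recent Flask releases build this on Python's `contextvars`. The sketch below models the mechanism with the standard library only; `LocalProxy`, `FakeRequest`, `handle`, and `view` are simplified stand-ins written for this illustration and are not Flask's or Werkzeug's actual classes.

```python
import contextvars

# Holds the request object for the code that is currently executing.
_current_request = contextvars.ContextVar("current_request")


class LocalProxy:
    """Forward attribute access to the object bound in the context variable."""

    def __init__(self, var):
        self._var = var

    def __getattr__(self, name):
        # Only called for attributes not found on the proxy itself.
        try:
            target = self._var.get()
        except LookupError:
            raise RuntimeError("Working outside of request context.") from None
        return getattr(target, name)


# Importable "global" that always resolves to the active request.
request = LocalProxy(_current_request)


class FakeRequest:
    def __init__(self, path):
        self.path = path


def view():
    # The view never receives the request as an argument.
    return f"handling {request.path}"


def handle(path):
    token = _current_request.set(FakeRequest(path))  # "push" the context
    try:
        return view()
    finally:
        _current_request.reset(token)  # "pop" the context


if __name__ == "__main__":
    print(handle("/context-demo"))
```

Accessing `request.path` outside `handle` raises `RuntimeError`, which is the same class of failure as Flask's "Working outside of ... context" errors.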
from flask import Flask, current_app, g, request

app = Flask(__name__)
app.config["APP_NAME"] = "MyApp"

@app.route("/context-demo")
def context_demo():
    # Available through the application context.
    app_ctx_info = {
        "app_name": current_app.name,
        "config_app_name": current_app.config["APP_NAME"],
        "debug_mode": current_app.debug,
    }
    # Available through the request context.
    request_ctx_info = {
        "method": request.method,
        "url": request.url,
        "endpoint": request.endpoint,
        "blueprint": request.blueprint,
        "view_args": request.view_args,
    }
    # g stores per-request scratch data.
    g.request_id = "req-12345"
    g.user_ip = request.remote_addr
    return {
        "app_context": app_ctx_info,
        "request_context": request_ctx_info,
        "g_data": {"request_id": g.request_id, "user_ip": g.user_ip},
    }

Pushing a context manually (for CLI scripts, tests, and similar situations):
from flask import Flask, current_app, request

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///test.db"

with app.app_context():
    print(current_app.name)
    print(current_app.config["SQLALCHEMY_DATABASE_URI"])

with app.test_request_context("/api/test?foo=bar", method="POST"):
    print(request.method)
    print(request.url)
    print(request.args.get("foo"))

How the context stack works (a simplified model of the stack-based design used by older Flask releases; Flask 2.3 and later store contexts in contextvars instead):
# Illustrative only: _app_ctx_stack and _request_ctx_stack stand for the
# module-level context stacks and are not defined in this snippet.
class AppContext:
    def __init__(self, app):
        self.app = app
        self.g = app.app_ctx_globals_class()
        self._refcnt = 0

    def push(self):
        self._refcnt += 1
        _app_ctx_stack.push(self)

    def pop(self, exc=None):
        self._refcnt -= 1
        if self._refcnt <= 0:
            # Last reference released: run the teardown callbacks.
            self.app.do_teardown_appcontext(exc)
        _app_ctx_stack.pop()

    def __enter__(self):
        self.push()
        return self

    def __exit__(self, *args):
        self.pop()

class RequestContext:
    def __init__(self, app, environ, request=None, session=None):
        self.app = app
        self.request = request or app.request_class(environ)
        self.session = session
        self._implicit_app_ctx = None

    def push(self):
        # A request context always needs an application context underneath it.
        app_ctx = _app_ctx_stack.top
        if app_ctx is None or app_ctx.app != self.app:
            app_ctx = self.app.app_context()
            app_ctx.push()
            self._implicit_app_ctx = app_ctx
        _request_ctx_stack.push(self)

    def pop(self, exc=None):
        _request_ctx_stack.pop()
        # Pop the application context only if this request pushed it.
        if self._implicit_app_ctx is not None:
            self._implicit_app_ctx.pop(exc)
            self._implicit_app_ctx = None

17.3 The Routing System
17.3.1 Route Registration and Matching
Flask uses Werkzeug's routing system, which matches URLs with Map and Rule objects:
from flask import Flask

app = Flask(__name__)

@app.route("/")
def index():
    return "Home"

@app.route("/hello")
def hello():
    return "Hello"

@app.route("/user/<username>")
def user_profile(username):
    return f"User: {username}"

@app.route("/post/<int:post_id>")
def post_detail(post_id):
    return f"Post: {post_id}"

@app.route("/post/<int:post_id>/comment/<int:comment_id>")
def post_comment(post_id, comment_id):
    return f"Post {post_id}, Comment {comment_id}"

@app.route("/path/<path:filepath>")
def serve_path(filepath):
    return f"Path: {filepath}"

@app.route("/download/<uuid:file_id>")
def download(file_id):
    return f"File ID: {file_id}"

Route parameter converters:
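| Converter | Matches | Example URL rule | Python type |
|---|---|---|---|
| string | Any text without a slash (the default) | /user/<name> | str |
| int | Non-negative integers | /post/<int:id> | int |
| float | Non-negative floating-point numbers | /price/<float:p> | float |
| path | Any text, slashes included | /file/<path:p> | str |
| uuid | UUID strings | /item/<uuid:id> | uuid.UUID |
| any | One of the listed options | /<any(a,b):page> | str |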
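To make the table concrete, a converter can be thought of as a pair: a regular-expression fragment that decides what matches, and a function that turns the matched text into a Python value. The sketch below is a toy model built on that idea with the standard library only. It is not Werkzeug's implementation, its regular expressions are simplified, and the names `CONVERTERS`, `compile_rule`, and `match_url` are invented for this example.

```python
import re
import uuid

# converter name -> (regex fragment, cast applied to the matched text)
CONVERTERS = {
    "string": (r"[^/]+", str),
    "int": (r"\d+", int),
    "float": (r"\d+\.\d+", float),
    "path": (r".+", str),
    "uuid": (r"[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}", uuid.UUID),
}

# Matches <name> and <converter:name> placeholders in a rule.
_PLACEHOLDER = re.compile(r"<(?:(?P<conv>\w+):)?(?P<name>\w+)>")


def compile_rule(rule):
    """Translate a rule such as /post/<int:post_id> into a regex plus casts."""
    pattern = ""
    casts = {}
    pos = 0
    for placeholder in _PLACEHOLDER.finditer(rule):
        # Literal text between placeholders must match exactly.
        pattern += re.escape(rule[pos:placeholder.start()])
        fragment, cast = CONVERTERS[placeholder.group("conv") or "string"]
        name = placeholder.group("name")
        pattern += f"(?P<{name}>{fragment})"
        casts[name] = cast
        pos = placeholder.end()
    pattern += re.escape(rule[pos:])
    return re.compile(pattern), casts


def match_url(rule, url):
    """Return the converted view arguments, or None when the URL does not match."""
    regex, casts = compile_rule(rule)
    found = regex.fullmatch(url)
    if found is None:
        return None
    return {name: casts[name](text) for name, text in found.groupdict().items()}


if __name__ == "__main__":
    print(match_url("/post/<int:post_id>", "/post/42"))
    print(match_url("/file/<path:p>", "/file/docs/readme.txt"))
    print(match_url("/post/<int:post_id>", "/post/abc"))
```

The difference between `string` and `path` comes down to whether the fragment is allowed to consume a slash, which is why `/user/<name>` rejects `/user/a/b` while `/file/<path:p>` accepts nested paths.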
Custom converters:
from werkzeug.routing import BaseConverter

class ListConverter(BaseConverter):
    def to_python(self, value):
        # "a,b,c" in the URL becomes ["a", "b", "c"] in the view.
        return value.split(",")

    def to_url(self, values):
        return ",".join(str(v) for v in values)

class RegexConverter(BaseConverter):
    def __init__(self, url_map, regex_pattern):
        super().__init__(url_map)
        self.regex = regex_pattern

# app is the Flask application created earlier.
app.url_map.converters["list"] = ListConverter
app.url_map.converters["regex"] = RegexConverter

@app.route("/items/<list:items>")
def items_view(items):
    return {"items": items}

# Raw string so that \d and \. reach the converter unchanged.
@app.route(r"/version/<regex(r'\d+\.\d+\.\d+'):version>")
def version_info(version):
    return {"version": version}

17.3.2 HTTP Methods and RESTful Route Design
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/api/articles", methods=["GET", "POST"])
def articles_collection():
    if request.method == "GET":
        page = request.args.get("page", 1, type=int)
        per_page = request.args.get("per_page", 10, type=int)
        return jsonify({
            "articles": [],
            "page": page,
            "per_page": per_page,
        })
    elif request.method == "POST":
        data = request.get_json()
        if not data or "title" not in data:
            return jsonify({"error": "Title is required"}), 400
        return jsonify({"id": 1, "title": data["title"]}), 201

@app.route("/api/articles/<int:article_id>", methods=["GET", "PUT", "DELETE"])
def article_resource(article_id):
    if request.method == "GET":
        return jsonify({"id": article_id, "title": "Sample article"})
    elif request.method == "PUT":
        data = request.get_json()
        return jsonify({"id": article_id, "title": data.get("title", "")})
    elif request.method == "DELETE":
        return "", 204

@app.route("/api/articles/<int:article_id>/comments", methods=["GET", "POST"])
def article_comments(article_id):
    if request.method == "GET":
        return jsonify({"article_id": article_id, "comments": []})
    elif request.method == "POST":
        data = request.get_json()
        return jsonify({
            "id": 1,
            "article_id": article_id,
            "content": data.get("content", ""),
        }), 201

Class-based views built on MethodView organize a RESTful API more cleanly:
from flask import Flask, request, jsonify
from flask.views import MethodView

app = Flask(__name__)

class ArticleAPI(MethodView):
    def get(self, article_id):
        # article_id is None for the collection URL.
        if article_id is None:
            page = request.args.get("page", 1, type=int)
            return jsonify({"articles": [], "page": page})
        return jsonify({"id": article_id, "title": "Sample article"})

    def post(self):
        data = request.get_json()
        if not data or "title" not in data:
            return jsonify({"error": "Title is required"}), 400
        return jsonify({"id": 1, "title": data["title"]}), 201

    def put(self, article_id):
        data = request.get_json()
        return jsonify({"id": article_id, "title": data.get("title", "")})

    def delete(self, article_id):
        return "", 204

article_view = ArticleAPI.as_view("article_api")
app.add_url_rule(
    "/api/articles",
    defaults={"article_id": None},
    view_func=article_view,
    methods=["GET"],
)
app.add_url_rule("/api/articles", view_func=article_view, methods=["POST"])
app.add_url_rule(
    "/api/articles/<int:article_id>",
    view_func=article_view,
    methods=["GET", "PUT", "DELETE"],
)

17.3.3 Modular Architecture with Blueprints
Blueprints are Flask's core mechanism for modularity. They let an application be split into independent functional modules:
from flask import Blueprint

auth_bp = Blueprint(
    "auth",
    __name__,
    template_folder="templates",
    static_folder="static",
)

@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    return "Login Page"

@auth_bp.route("/register", methods=["GET", "POST"])
def register():
    return "Register Page"

@auth_bp.route("/logout")
def logout():
    return "Logged out"

from flask import Blueprint, jsonify

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")

@api_bp.route("/status")
def status():
    return jsonify({"status": "ok", "version": "1.0"})

@api_bp.route("/users", methods=["GET"])
def list_users():
    return jsonify({"users": []})

Nested blueprints build a multi-level module structure:
from flask import Blueprint

api_v1_bp = Blueprint("api_v1", __name__, url_prefix="/api/v1")
api_v2_bp = Blueprint("api_v2", __name__, url_prefix="/api/v2")
users_v1_bp = Blueprint("users_v1", __name__)
articles_v1_bp = Blueprint("articles_v1", __name__)

@users_v1_bp.route("/users")
def list_users():
    return {"version": "v1", "users": []}

@articles_v1_bp.route("/articles")
def list_articles():
    return {"version": "v1", "articles": []}

# Child routes are served under the parent's prefix, e.g. /api/v1/users.
api_v1_bp.register_blueprint(users_v1_bp)
api_v1_bp.register_blueprint(articles_v1_bp)

17.4 Requests and Responses
17.4.1 The Request Object in Detail
from flask import Flask, request

app = Flask(__name__)

@app.route("/request-info", methods=["GET", "POST", "PUT"])
def request_info():
    info = {
        "method": request.method,
        "url": request.url,
        "base_url": request.base_url,
        "url_root": request.url_root,
        "path": request.path,
        "full_path": request.full_path,
        "scheme": request.scheme,
        "is_secure": request.is_secure,
        "host": request.host,
        "remote_addr": request.remote_addr,
    }
    return info

@app.route("/query-demo")
def query_demo():
    search = request.args.get("q", "")
    page = request.args.get("page", 1, type=int)
    sort = request.args.get("sort", "created_at")
    # getlist collects repeated parameters: ?tag=a&tag=b
    tags = request.args.getlist("tag")
    return {
        "search": search,
        "page": page,
        "sort": sort,
        "tags": tags,
    }

@app.route("/form-demo", methods=["POST"])
def form_demo():
    username = request.form.get("username")
    password = request.form.get("password")  # read but never echoed back
    hobbies = request.form.getlist("hobby")
    return {
        "username": username,
        "hobbies": hobbies,
    }

@app.route("/json-demo", methods=["POST"])
def json_demo():
    # silent=True returns None instead of raising on invalid JSON.
    data = request.get_json(silent=True)
    if data is None:
        return {"error": "Invalid JSON"}, 400
    return {"received": data, "content_type": request.content_type}

@app.route("/headers-demo")
def headers_demo():
    return {
        "user_agent": request.user_agent.string,
        # Werkzeug 2.1+ no longer parses the User-Agent string, so these two
        # are None unless a custom user agent class is configured.
        "user_agent_browser": request.user_agent.browser,
        "user_agent_platform": request.user_agent.platform,
        "accept_languages": list(request.accept_languages.values()),
        "authorization": request.headers.get("Authorization"),
    }

@app.route("/cookie-demo")
def cookie_demo():
    all_cookies = dict(request.cookies)
    session_id = request.cookies.get("session_id")
    return {"all_cookies": all_cookies, "session_id": session_id}

17.4.2 Building Responses
from flask import (
    Flask,
    make_response,
    jsonify,
    Response,
    stream_with_context,
    redirect,
    url_for,
    send_file,
)
import json
import time
import io
import csv

app = Flask(__name__)

@app.route("/response-text")
def text_response():
    return "Plain text response"

@app.route("/response-json")
def json_response():
    return jsonify(
        message="Success",
        data={"id": 1, "name": "Example"},
        status="ok",
    )

@app.route("/response-custom")
def custom_response():
    resp = make_response(jsonify({"message": "Created"}), 201)
    resp.headers["X-Custom-Header"] = "CustomValue"
    resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    return resp

@app.route("/response-cookie")
def cookie_response():
    resp = make_response("Cookie set")
    resp.set_cookie(
        "session_token",
        value="abc123def456",
        max_age=3600,
        httponly=True,   # not readable from JavaScript
        secure=True,     # sent over HTTPS only
        samesite="Lax",
    )
    return resp

@app.route("/response-redirect")
def redirect_response():
    return redirect(url_for("text_response"))

@app.route("/response-stream")
def stream_response():
    def generate():
        # Emits one server-sent event every half second.
        for i in range(10):
            json_data = json.dumps({"count": i, "timestamp": time.time()})
            yield f"data: {json_data}\n\n"
            time.sleep(0.5)

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

@app.route("/response-download")
def download_response():
    buffer = io.BytesIO(b"Hello, this is a downloadable file content.")
    return send_file(
        buffer,
        as_attachment=True,
        download_name="example.txt",
        mimetype="text/plain",
    )

@app.route("/response-csv")
def csv_response():
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["ID", "Name", "Email"])
    writer.writerow([1, "Alice", "alice@example.com"])
    writer.writerow([2, "Bob", "bob@example.com"])
    resp = make_response(output.getvalue())
    resp.headers["Content-Type"] = "text/csv; charset=utf-8"
    resp.headers["Content-Disposition"] = "attachment; filename=users.csv"
    return resp

17.4.3 Request Hooks
Request hooks let you insert shared logic at different stages of request handling:
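Before looking at real hooks, it can help to see the dispatch order as plain Python. The sketch below is a toy dispatcher that models only the ordering rules: before-request hooks run in registration order, after-request hooks run in reverse registration order, and teardown hooks always run, even when the view raises. `MiniApp` and `demo` are hypothetical names for this illustration, and the class is not how Flask is implemented.

```python
class MiniApp:
    """Toy dispatcher that models hook ordering only."""

    def __init__(self):
        self.before_funcs = []
        self.after_funcs = []
        self.teardown_funcs = []

    def before_request(self, func):
        self.before_funcs.append(func)
        return func

    def after_request(self, func):
        self.after_funcs.append(func)
        return func

    def teardown_request(self, func):
        self.teardown_funcs.append(func)
        return func

    def handle(self, view):
        error = None
        try:
            response = None
            for func in self.before_funcs:  # registration order
                response = func()
                if response is not None:  # a hook may answer the request itself
                    break
            if response is None:
                response = view()
            for func in reversed(self.after_funcs):  # reverse registration order
                response = func(response)
            return response
        except Exception as exc:
            error = exc
            raise
        finally:
            for func in reversed(self.teardown_funcs):  # runs on success and failure
                func(error)


def demo():
    """Register two hooks of each kind and return the order in which they ran."""
    log = []
    app = MiniApp()
    app.before_request(lambda: log.append("before-1"))
    app.before_request(lambda: log.append("before-2"))

    def after_1(response):
        log.append("after-1")
        return response

    def after_2(response):
        log.append("after-2")
        return response

    app.after_request(after_1)
    app.after_request(after_2)
    app.teardown_request(lambda exc: log.append("teardown"))

    def view():
        log.append("view")
        return "ok"

    app.handle(view)
    return log


if __name__ == "__main__":
    print(demo())
```

The reverse order for after-request hooks means the hook registered first wraps all the others, much like nested middleware.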
from flask import Flask, g, request
import time
import uuid

app = Flask(__name__)

@app.before_request
def before_request_hook():
    g.request_id = str(uuid.uuid4())[:8]
    g.start_time = time.perf_counter()
    g.user = None
    auth_header = request.headers.get("Authorization")
    if auth_header and auth_header.startswith("Bearer "):
        token = auth_header[7:]  # strip the "Bearer " prefix
        g.user = verify_token(token)

@app.after_request
def after_request_hook(response):
    if hasattr(g, "request_id"):
        response.headers["X-Request-ID"] = g.request_id
    if hasattr(g, "start_time"):
        elapsed = time.perf_counter() - g.start_time
        response.headers["X-Response-Time"] = f"{elapsed:.4f}s"
    return response

@app.teardown_request
def teardown_request_hook(exception=None):
    if exception:
        app.logger.error(
            f"Request failed: {exception}, "
            f"Request-ID: {getattr(g, 'request_id', 'N/A')}"
        )

@app.teardown_appcontext
def teardown_appcontext_hook(exception=None):
    from . import db
    if exception:
        db.session.rollback()
    db.session.remove()

def verify_token(token):
    # Placeholder check for demonstration; a real application would validate a signed token.
    if token == "valid-token":
        return {"id": 1, "username": "admin"}
    return None

Hook execution order:
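Request arrives
  │
  ▼
before_request (runs in registration order)
  │
  ▼
View function
  │
  ▼
after_request (runs in reverse registration order)
  │
  ▼
teardown_request (runs whether or not an exception occurred)
  │
  ▼
teardown_appcontext (cleans up the application context)

The before_first_request hook found in older tutorials was removed in Flask 2.3, so one-time setup belongs in the application factory instead.

17.5 The Jinja2 Template Engine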
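17.5.1 Template Rendering and Inheritance
Jinja2 is Flask's default template engine. It provides template inheritance, macros, filters, and other powerful features.
Base template templates/base.html: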
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}MyApp{% endblock %}</title>
    <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
    {% block extra_css %}{% endblock %}
</head>
<body>
    <nav class="navbar">
        <div class="container">
            <a class="brand" href="{{ url_for('main.index') }}">MyApp</a>
            <ul class="nav-links">
                <li><a href="{{ url_for('main.index') }}">Home</a></li>
                <li><a href="{{ url_for('main.about') }}">About</a></li>
                {% if current_user.is_authenticated %}
                <li><a href="{{ url_for('auth.profile') }}">{{ current_user.username }}</a></li>
                <li><a href="{{ url_for('auth.logout') }}">Log out</a></li>
                {% else %}
                <li><a href="{{ url_for('auth.login') }}">Log in</a></li>
                <li><a href="{{ url_for('auth.register') }}">Register</a></li>
                {% endif %}
            </ul>
        </div>
    </nav>
    <main class="container">
        {% with messages = get_flashed_messages(with_categories=true) %}
        {% if messages %}
        <div class="flash-messages">
            {% for category, message in messages %}
            <div class="alert alert-{{ category }}">{{ message }}</div>
            {% endfor %}
        </div>
        {% endif %}
        {% endwith %}
        {% block content %}{% endblock %}
    </main>
    <footer class="footer">
        <div class="container">
            <p>© 2026 MyApp. All rights reserved.</p>
        </div>
    </footer>
    <script src="{{ url_for('static', filename='js/main.js') }}"></script>
    {% block extra_js %}{% endblock %}
</body>
</html>

Child template templates/main/index.html:
{% extends "base.html" %}
{% block title %}Home - {{ super() }}{% endblock %}
{% block content %}
<section class="hero">
    <h1>Welcome to MyApp</h1>
    <p>A web application built with Flask</p>
</section>
<section class="features">
    <h2>Core Features</h2>
    <div class="grid">
        {% for feature in features %}
        <div class="card">
            <h3>{{ feature.title }}</h3>
            <p>{{ feature.description }}</p>
            <a href="{{ feature.url }}">Learn more →</a>
        </div>
        {% endfor %}
    </div>
</section>
{% endblock %}

List template templates/main/articles.html:
{% extends "base.html" %}
{% block title %}Articles - {{ super() }}{% endblock %}
{% block content %}
<h1>Articles</h1>
{% if articles %}
<table class="table">
    <thead>
        <tr>
            <th>Title</th>
            <th>Author</th>
            <th>Published</th>
            <th>Actions</th>
        </tr>
    </thead>
    <tbody>
        {% for article in articles %}
        <tr>
            <td><a href="{{ url_for('main.article_detail', article_id=article.id) }}">{{ article.title }}</a></td>
            <td>{{ article.author.username }}</td>
            <td>{{ article.created_at | datetime_format }}</td>
            <td>
                <a href="{{ url_for('main.edit_article', article_id=article.id) }}">Edit</a>
                <form method="post" action="{{ url_for('main.delete_article', article_id=article.id) }}" style="display:inline">
                    <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
                    <button type="submit" onclick="return confirm('Delete this article?')">Delete</button>
                </form>
            </td>
        </tr>
        {% endfor %}
    </tbody>
</table>
{% if pagination.pages > 1 %}
<nav class="pagination">
    {% if pagination.has_prev %}
    <a href="{{ url_for('main.articles', page=pagination.prev_num) }}">« Previous</a>
    {% endif %}
    {% for page_num in pagination.iter_pages() %}
    {% if page_num %}
    <a href="{{ url_for('main.articles', page=page_num) }}"
       class="{{ 'active' if page_num == pagination.page else '' }}">{{ page_num }}</a>
    {% else %}
    <span class="ellipsis">...</span>
    {% endif %}
    {% endfor %}
    {% if pagination.has_next %}
    <a href="{{ url_for('main.articles', page=pagination.next_num) }}">Next »</a>
    {% endif %}
</nav>
{% endif %}
{% else %}
<p class="empty-state">No articles yet.</p>
{% endif %}
{% endblock %}

17.5.2 Custom Filters and Macros
import re
from datetime import datetime

from flask import Flask

app = Flask(__name__)

@app.template_filter("datetime_format")
def datetime_format(value, format="%Y-%m-%d %H:%M"):
    if value is None:
        return ""
    return value.strftime(format)

@app.template_filter("currency")
def currency_format(value, symbol="¥"):
    if value is None:
        return ""
    return f"{symbol}{value:,.2f}"

@app.template_filter("pluralize")
def pluralize(count, singular="", plural="s"):
    if count == 1:
        return singular
    return plural

@app.template_filter("truncate_html")
def truncate_html(value, length=200, end="..."):
    # Strip tags before measuring so markup does not count toward the limit.
    text = re.sub(r"<[^>]+>", "", value)
    if len(text) <= length:
        return value
    return text[:length].rsplit(" ", 1)[0] + end

@app.template_filter("time_ago")
def time_ago(value):
    # Assumes value is a naive datetime in local time.
    now = datetime.now()
    diff = now - value
    seconds = int(diff.total_seconds())
    intervals = [
        (31536000, "year"),
        (2592000, "month"),
        (604800, "week"),
        (86400, "day"),
        (3600, "hour"),
        (60, "minute"),
        (1, "second"),
    ]
    for interval, label in intervals:
        count = seconds // interval
        if count > 0:
            suffix = "" if count == 1 else "s"
            return f"{count} {label}{suffix} ago"
    return "just now"

Template macros are reusable template components:
{# templates/macros/forms.html #}
{% macro render_field(field, label_visible=true) -%}
<div class="form-group {% if field.errors %}has-error{% endif %}">
    {% if label_visible %}
    <label for="{{ field.id }}">{{ field.label.text }}</label>
    {% endif %}
    {{ field(class="form-control" + (" is-invalid" if field.errors else ""), **kwargs) }}
    {% if field.errors %}
    <div class="invalid-feedback">
        {% for error in field.errors %}
        <span>{{ error }}</span>
        {% endfor %}
    </div>
    {% elif field.description %}
    <small class="form-text text-muted">{{ field.description }}</small>
    {% endif %}
</div>
{%- endmacro %}
{% macro render_pagination(pagination, endpoint) -%}
{% if pagination.pages > 1 %}
<nav aria-label="Pagination">
    <ul class="pagination">
        <li class="page-item {{ 'disabled' if not pagination.has_prev else '' }}">
            <a class="page-link" href="{{ url_for(endpoint, page=pagination.prev_num) if pagination.has_prev else '#' }}">«</a>
        </li>
        {% for page_num in pagination.iter_pages(left_edge=1, right_edge=1, left_current=2, right_current=2) %}
        {% if page_num %}
        <li class="page-item {{ 'active' if page_num == pagination.page else '' }}">
            <a class="page-link" href="{{ url_for(endpoint, page=page_num) }}">{{ page_num }}</a>
        </li>
        {% else %}
        <li class="page-item disabled"><span class="page-link">...</span></li>
        {% endif %}
        {% endfor %}
        <li class="page-item {{ 'disabled' if not pagination.has_next else '' }}">
            <a class="page-link" href="{{ url_for(endpoint, page=pagination.next_num) if pagination.has_next else '#' }}">»</a>
        </li>
    </ul>
</nav>
{% endif %}
{%- endmacro %}

17.6 Form Handling and Validation
17.6.1 Flask-WTF Integration
from flask_wtf import FlaskForm
from wtforms import (
    StringField,
    PasswordField,
    TextAreaField,
    SelectField,
    BooleanField,
    FileField,
)
from wtforms.validators import (
    DataRequired,
    Email,
    Length,
    EqualTo,
    Optional,
    Regexp,
    ValidationError,
)

class RegistrationForm(FlaskForm):
    username = StringField(
        "Username",
        validators=[
            DataRequired(message="Username is required"),
            Length(min=3, max=20, message="Username must be 3-20 characters long"),
            Regexp(
                r"^[a-zA-Z][a-zA-Z0-9_]*$",
                message="Username may contain only letters, digits, and underscores, and must start with a letter",
            ),
        ],
    )
    email = StringField(
        "Email",
        validators=[
            DataRequired(message="Email is required"),
            # The Email validator needs the email_validator package installed.
            Email(message="Please enter a valid email address"),
        ],
    )
    password = PasswordField(
        "Password",
        validators=[
            DataRequired(message="Password is required"),
            Length(min=8, message="Password must be at least 8 characters long"),
        ],
    )
    confirm_password = PasswordField(
        "Confirm password",
        validators=[
            DataRequired(message="Please confirm your password"),
            EqualTo("password", message="The two passwords do not match"),
        ],
    )
    agree_terms = BooleanField(
        "I agree to the terms of service",
        validators=[DataRequired(message="You must accept the terms of service to register")],
    )

    # Methods named validate_<field> run automatically as inline validators.
    def validate_username(self, field):
        from .models import User
        if User.query.filter_by(username=field.data).first():
            raise ValidationError("This username is already taken")

    def validate_email(self, field):
        from .models import User
        if User.query.filter_by(email=field.data).first():
            raise ValidationError("This email address is already registered")

class ArticleForm(FlaskForm):
    title = StringField(
        "Title",
        validators=[
            DataRequired(message="Title is required"),
            Length(max=200, message="Title must not exceed 200 characters"),
        ],
    )
    content = TextAreaField(
        "Content",
        validators=[DataRequired(message="Content is required")],
    )
    category_id = SelectField(
        "Category",
        coerce=int,
        validators=[DataRequired(message="Please choose a category")],
    )
    tags = StringField("Tags (comma-separated)", validators=[Optional()])
    is_published = BooleanField("Publish", default=True)

class ChangePasswordForm(FlaskForm):
    old_password = PasswordField(
        "Current password",
        validators=[DataRequired(message="Please enter your current password")],
    )
    new_password = PasswordField(
        "New password",
        validators=[
            DataRequired(message="Please enter a new password"),
            Length(min=8, message="Password must be at least 8 characters long"),
        ],
    )
    confirm_password = PasswordField(
        "Confirm new password",
        validators=[
            DataRequired(message="Please confirm the new password"),
            EqualTo("new_password", message="The two passwords do not match"),
        ],
    )

    def validate_new_password(self, field):
        if field.data == self.old_password.data:
            raise ValidationError("The new password must differ from the current password")

17.6.2 Form Views and Template Integration
from urllib.parse import urlparse

from flask import (
    Blueprint,
    render_template,
    redirect,
    url_for,
    flash,
    request,
)
from flask_login import login_user, logout_user, login_required, current_user

auth_bp = Blueprint("auth", __name__)

def is_safe_redirect_target(target):
    # Illustrative helper: accept only site-relative paths so that the
    # "next" parameter cannot be abused for open redirects.
    if not target or "\\" in target:
        return False
    parsed = urlparse(target)
    return not parsed.scheme and not parsed.netloc and target.startswith("/")

@auth_bp.route("/register", methods=["GET", "POST"])
def register():
    from .forms import RegistrationForm
    from .models import User, db
    if current_user.is_authenticated:
        return redirect(url_for("main.index"))
    form = RegistrationForm()
    if form.validate_on_submit():
        user = User(
            username=form.username.data,
            email=form.email.data,
        )
        user.set_password(form.password.data)
        db.session.add(user)
        db.session.commit()
        flash("Registration successful. Please log in.", "success")
        return redirect(url_for("auth.login"))
    return render_template("auth/register.html", form=form)

@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    # LoginForm is assumed to define username, password, and remember fields.
    from .forms import LoginForm
    from .models import User
    if current_user.is_authenticated:
        return redirect(url_for("main.index"))
    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.username.data).first()
        if user and user.check_password(form.password.data):
            login_user(user, remember=form.remember.data)
            next_page = request.args.get("next")
            if not is_safe_redirect_target(next_page):
                next_page = url_for("main.index")
            flash("Logged in successfully.", "success")
            return redirect(next_page)
        flash("Invalid username or password", "danger")
    return render_template("auth/login.html", form=form)

@auth_bp.route("/logout")
@login_required
def logout():
    logout_user()
    flash("You have been logged out.", "info")
    return redirect(url_for("main.index"))

Form template:
{% extends "base.html" %}
{% from "macros/forms.html" import render_field %}
{% block title %}Register - {{ super() }}{% endblock %}
{% block content %}
<div class="auth-container">
    <h1>Create an Account</h1>
    <form method="post" novalidate>
        {{ form.hidden_tag() }}
        {{ render_field(form.username, placeholder="Enter a username") }}
        {{ render_field(form.email, placeholder="Enter your email", type="email") }}
        {{ render_field(form.password, placeholder="Enter a password") }}
        {{ render_field(form.confirm_password, placeholder="Enter the password again") }}
        {{ render_field(form.agree_terms) }}
        <button type="submit" class="btn btn-primary">Register</button>
    </form>
    <p>Already have an account? <a href="{{ url_for('auth.login') }}">Log in</a></p>
</div>
{% endblock %}

17.6.3 Handling File Uploads
import os
import uuid
from pathlib import Path
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename

upload_bp = Blueprint("upload", __name__)

ALLOWED_IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
ALLOWED_DOCUMENT_EXTENSIONS = {"pdf", "doc", "docx", "xls", "xlsx", "txt"}
MAX_IMAGE_SIZE = 5 * 1024 * 1024       # 5 MB
MAX_DOCUMENT_SIZE = 20 * 1024 * 1024   # 20 MB

def allowed_file(filename, allowed_extensions):
    return (
        "." in filename
        and filename.rsplit(".", 1)[1].lower() in allowed_extensions
    )

def generate_safe_filename(filename):
    # secure_filename strips path separators and unsafe characters;
    # the random prefix avoids collisions between uploads.
    name = secure_filename(filename)
    return f"{uuid.uuid4().hex[:8]}_{name}"

def get_upload_path(category="images"):
    upload_root = Path(current_app.config.get("UPLOAD_FOLDER", "uploads"))
    target_dir = upload_root / category
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir

@upload_bp.route("/upload/image", methods=["POST"])
def upload_image():
    if "file" not in request.files:
        return jsonify({"error": "No file part in the request"}), 400
    file = request.files["file"]
    if file.filename == "":
        return jsonify({"error": "No file selected"}), 400
    if not allowed_file(file.filename, ALLOWED_IMAGE_EXTENSIONS):
        return jsonify({"error": "Unsupported file type"}), 400
    # Measure the size by seeking to the end, then rewind before saving.
    file.seek(0, 2)
    file_size = file.tell()
    file.seek(0)
    if file_size > MAX_IMAGE_SIZE:
        return jsonify({"error": "File exceeds the 5 MB size limit"}), 400
    filename = generate_safe_filename(file.filename)
    save_path = get_upload_path("images") / filename
    file.save(str(save_path))
    return jsonify({
        "message": "Upload successful",
        "filename": filename,
        "url": f"/uploads/images/{filename}",
        "size": file_size,
    }), 201

17.7 Database Integration
17.7.1 Defining SQLAlchemy ORM Models

from datetime import datetime, timezone

from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash

db = SQLAlchemy()


class User(UserMixin, db.Model):
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256), nullable=False)
    nickname = db.Column(db.String(50))
    bio = db.Column(db.Text)
    avatar_url = db.Column(db.String(256))
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    is_admin = db.Column(db.Boolean, default=False, nullable=False)
    created_at = db.Column(
        db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False
    )
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    posts = db.relationship("Post", backref="author", lazy="dynamic")
    comments = db.relationship("Comment", backref="author", lazy="dynamic")

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f"<User {self.username}>"


# Association table for the many-to-many relationship between posts and tags.
post_tags = db.Table(
    "post_tags",
    db.Column("post_id", db.Integer, db.ForeignKey("posts.id"), primary_key=True),
    db.Column("tag_id", db.Integer, db.ForeignKey("tags.id"), primary_key=True),
)


class Post(db.Model):
    __tablename__ = "posts"

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    slug = db.Column(db.String(200), unique=True, nullable=False, index=True)
    content = db.Column(db.Text, nullable=False)
    summary = db.Column(db.String(500))
    is_published = db.Column(db.Boolean, default=False, index=True)
    view_count = db.Column(db.Integer, default=0)
    author_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey("categories.id"))
    created_at = db.Column(
        db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False, index=True
    )
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    comments = db.relationship("Comment", backref="post", lazy="dynamic")
    tags = db.relationship(
        "Tag", secondary=post_tags, backref=db.backref("posts", lazy="dynamic")
    )

    def __repr__(self):
        return f"<Post {self.title}>"


class Category(db.Model):
    __tablename__ = "categories"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False)
    description = db.Column(db.String(200))

    posts = db.relationship("Post", backref="category", lazy="dynamic")

    def __repr__(self):
        return f"<Category {self.name}>"


class Tag(db.Model):
    __tablename__ = "tags"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False)

    def __repr__(self):
        return f"<Tag {self.name}>"


class Comment(db.Model):
    __tablename__ = "comments"

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    is_approved = db.Column(db.Boolean, default=False)
    post_id = db.Column(db.Integer, db.ForeignKey("posts.id"), nullable=False, index=True)
    author_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey("comments.id"))
    created_at = db.Column(
        db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False
    )

    # Self-referential relationship for nested replies.
    replies = db.relationship(
        "Comment", backref=db.backref("parent", remote_side=[id]), lazy="dynamic"
    )

    def __repr__(self):
        return f"<Comment {self.id}>"

17.7.2 Managing Database Migrations

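from flask_migrate import Migrate

migrate = Migrate()

Migration commands:

flask db init                      # initialize the migration repository
flask db migrate -m "description"  # generate a migration script
flask db upgrade                   # apply pending migrations
flask db downgrade                 # roll back the last migration
flask db history                   # show the migration history
flask db current                   # show the current revision
flask db stamp head                # mark the database as up to date without running migrations

Example of a hand-edited migration script:

import sqlalchemy as sa
from alembic import op


def upgrade():
    op.add_column("users", sa.Column("last_login_at", sa.DateTime(), nullable=True))
    op.create_index("ix_users_last_login", "users", ["last_login_at"])
    # Backfill existing rows so the new column is not left empty.
    op.execute(
        "UPDATE users SET last_login_at = created_at WHERE last_login_at IS NULL"
    )


def downgrade():
    op.drop_index("ix_users_last_login", table_name="users")
    op.drop_column("users", "last_login_at")

17.7.3 Query Optimization
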
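The N+1 problem discussed in this section is easier to recognize once the statements are counted. The following is a minimal sketch that uses the standard-library `sqlite3` module instead of SQLAlchemy, with a made-up two-table schema; it records every executed statement through `set_trace_callback` and compares a per-row lookup with a single JOIN, which is the kind of statement `joinedload` asks the ORM to emit.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, author_id INTEGER);
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO posts VALUES (1, 'a', 1), (2, 'b', 2), (3, 'c', 1);
""")

statements = []
conn.set_trace_callback(statements.append)  # record each executed SQL statement


def authors_n_plus_one():
    """One query for the posts, then one more query per post for its author."""
    names = []
    posts = conn.execute("SELECT id, author_id FROM posts ORDER BY id").fetchall()
    for _, author_id in posts:
        row = conn.execute(
            "SELECT username FROM users WHERE id = ?", (author_id,)
        ).fetchone()
        names.append(row[0])
    return names


def authors_joined():
    """The same data fetched with a single JOIN."""
    rows = conn.execute(
        "SELECT u.username FROM posts p JOIN users u ON u.id = p.author_id "
        "ORDER BY p.id"
    ).fetchall()
    return [row[0] for row in rows]


statements.clear()
slow_result = authors_n_plus_one()
n_plus_one_count = len(statements)

statements.clear()
fast_result = authors_joined()
joined_count = len(statements)
```

Both functions return the same author names, but the first issues one statement per post on top of the initial query, so its cost grows with the size of the result set while the JOIN stays at a single round trip.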
from sqlalchemy import func, or_, desc
from sqlalchemy.orm import joinedload, subqueryload, contains_eager

from .models import Category, Post, User, db


class PostQuery:
    @staticmethod
    def get_published(page=1, per_page=10):
        return (
            Post.query.filter_by(is_published=True)
            .options(joinedload(Post.author), joinedload(Post.category))
            .order_by(Post.created_at.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )

    @staticmethod
    def get_by_slug(slug):
        return (
            Post.query.filter_by(slug=slug, is_published=True)
            .options(joinedload(Post.author), subqueryload(Post.tags))
            .first_or_404()
        )

    @staticmethod
    def search(keyword, page=1, per_page=10):
        # The keyword is passed as a bound parameter, so this is not an
        # injection risk; "%" and "_" inside it still act as LIKE wildcards.
        search_filter = or_(
            Post.title.ilike(f"%{keyword}%"),
            Post.content.ilike(f"%{keyword}%"),
            Post.summary.ilike(f"%{keyword}%"),
        )
        return (
            Post.query.filter(search_filter, Post.is_published.is_(True))
            .options(joinedload(Post.author))
            .order_by(Post.created_at.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )

    @staticmethod
    def get_by_category(category_slug, page=1, per_page=10):
        return (
            Post.query.join(Post.category)
            .filter(Category.slug == category_slug, Post.is_published.is_(True))
            .options(contains_eager(Post.category), joinedload(Post.author))
            .order_by(Post.created_at.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )

    @staticmethod
    def get_stats():
        # date_trunc is a PostgreSQL function; other databases need a
        # different expression for grouping by month.
        month = func.date_trunc("month", Post.created_at)
        return (
            db.session.query(
                month.label("month"),
                func.count(Post.id).label("count"),
            )
            .filter(Post.is_published.is_(True))
            .group_by(month)
            .order_by(desc("month"))
            .all()
        )

    @staticmethod
    def get_popular(limit=10):
        return (
            Post.query.filter_by(is_published=True)
            .order_by(Post.view_count.desc())
            .limit(limit)
            .all()
        )

The N+1 query problem and its solutions:

# N+1 query problem (inefficient)
posts = Post.query.all()
for post in posts:
    print(post.author.username)  # every iteration issues one more query

# Solution 1: joinedload (a single JOIN query)
posts = Post.query.options(joinedload(Post.author)).all()

# Solution 2: subqueryload (a second query built from a subquery);
# current SQLAlchemy documentation generally recommends selectinload
# for collections instead
posts = Post.query.options(subqueryload(Post.tags)).all()

# Solution 3: contains_eager (reuse a JOIN you wrote yourself)
posts = (
    Post.query.join(Post.author)
    .filter(User.is_active.is_(True))
    .options(contains_eager(Post.author))
    .all()
)

17.8 User Authentication and Authorization

17.8.1 Flask-Login Integration

from flask import jsonify, redirect, request, url_for
from flask_login import LoginManager

from .models import User, db

login_manager = LoginManager()
login_manager.login_view = "auth.login"
login_manager.login_message = "Please log in to access this page"
login_manager.login_message_category = "info"
login_manager.session_protection = "strong"


@login_manager.user_loader
def load_user(user_id):
    return db.session.get(User, int(user_id))


@login_manager.unauthorized_handler
def unauthorized():
    # Clients that prefer JSON get a 401 body; browsers are redirected to
    # the login page.
    if (
        request.accept_mimetypes.best_match(["application/json", "text/html"])
        == "application/json"
    ):
        return jsonify({"error": "Unauthorized", "status": 401}), 401
    return redirect(url_for("auth.login"))

17.8.2 JWT Authentication

import jwt
from datetime import datetime, timedelta, timezone
from functools import wraps

from flask import request, jsonify, current_app


class TokenService:
    @staticmethod
    def _generate(user_id, token_type, expires_in):
        now = datetime.now(timezone.utc)
        payload = {
            # The JWT specification defines "sub" as a string, and recent
            # PyJWT releases reject non-string subjects when decoding.
            "sub": str(user_id),
            "iat": now,
            "exp": now + timedelta(seconds=expires_in),
            "type": token_type,
        }
        return jwt.encode(
            payload,
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )

    @staticmethod
    def generate_access_token(user_id, expires_in=3600):
        return TokenService._generate(user_id, "access", expires_in)

    @staticmethod
    def generate_refresh_token(user_id, expires_in=2592000):  # 30 days
        return TokenService._generate(user_id, "refresh", expires_in)

    @staticmethod
    def decode_token(token):
        try:
            return jwt.decode(
                token,
                current_app.config["SECRET_KEY"],
                algorithms=["HS256"],
            )
        except jwt.InvalidTokenError:
            # Also covers ExpiredSignatureError, which is a subclass.
            return None


def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return jsonify({"error": "Missing or malformed authentication token"}), 401
        token = auth_header[7:]
        payload = TokenService.decode_token(token)
        if payload is None:
            return jsonify({"error": "Invalid or expired token"}), 401
        if payload.get("type") != "access":
            return jsonify({"error": "Wrong token type"}), 401

        from .models import User, db

        try:
            user_id = int(payload["sub"])
        except (KeyError, TypeError, ValueError):
            return jsonify({"error": "Invalid or expired token"}), 401
        user = db.session.get(User, user_id)
        if user is None or not user.is_active:
            return jsonify({"error": "User does not exist or is disabled"}), 401
        request.current_user = user
        return f(*args, **kwargs)
    return decorated


def admin_required(f):
    @wraps(f)
    @token_required
    def decorated(*args, **kwargs):
        if not request.current_user.is_admin:
            return jsonify({"error": "Administrator privileges required"}), 403
        return f(*args, **kwargs)
    return decorated

17.8.3 Permission Control System

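The permission system in this section stores each permission as one bit and each role as the bitwise OR of its permissions. As a small illustration of that arithmetic, here is a sketch using the standard-library `enum.IntFlag`; the names `Perm`, `ROLES`, and `has_all` are hypothetical and only mirror the constants used in the listing.

```python
from enum import IntFlag


class Perm(IntFlag):
    """One bit per permission, mirroring the Permission constants."""
    READ = 0x01
    CREATE = 0x02
    UPDATE = 0x04
    DELETE = 0x08
    ADMIN = 0x80


# A role is the OR of the permissions it grants.
ROLES = {
    "guest": Perm.READ,
    "author": Perm.READ | Perm.CREATE | Perm.UPDATE,
    "editor": Perm.READ | Perm.CREATE | Perm.UPDATE | Perm.DELETE,
}


def has_all(granted: int, required: int) -> bool:
    """True only when every bit in `required` is also set in `granted`."""
    return (granted & required) == required
```

Comparing the masked value with `required`, rather than testing it for truthiness, matters when several permissions are requested at once: `granted & required` is non-zero as soon as any one bit matches, which would wrongly let a guest pass a `READ | CREATE` check.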
from functools import wraps

from flask import abort
from flask_login import current_user


class Permission:
    READ = 0x01
    CREATE = 0x02
    UPDATE = 0x04
    DELETE = 0x08
    ADMIN = 0x80


class Role:
    # Each role is the bitwise OR of the permissions it grants.
    GUEST = Permission.READ
    AUTHOR = Permission.READ | Permission.CREATE | Permission.UPDATE
    EDITOR = AUTHOR | Permission.DELETE
    ADMIN = EDITOR | Permission.ADMIN


def permission_required(permission):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            if not current_user.is_authenticated:
                abort(401)
            if not has_permission(current_user, permission):
                abort(403)
            return f(*args, **kwargs)
        return decorated
    return decorator


def has_permission(user, permission):
    if user.is_admin:
        return True
    role_permissions = getattr(user, "role_permissions", 0)
    return (role_permissions & permission) == permission


def ownership_required(model_class, id_param="id", owner_attr="author_id"):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            # Anonymous users have no id or is_admin attribute to compare.
            if not current_user.is_authenticated:
                abort(401)
            resource = model_class.query.get_or_404(kwargs.get(id_param))
            is_owner = getattr(resource, owner_attr) == current_user.id
            if not (is_owner or current_user.is_admin):
                abort(403)
            # Pass the loaded resource on every path so the view signature
            # is the same for owners and administrators.
            return f(*args, **kwargs, resource=resource)
        return decorated
    return decorator

17.9 Security

17.9.1 CSRF Protection
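A CSRF token works because the server can tell a token it issued for this session from one an attacker made up. The following is a conceptual sketch using only the standard library, not Flask-WTF's actual implementation: the token carries a timestamp and a random nonce, and an HMAC binds both to the session. The function names and the token layout are assumptions made for illustration.

```python
import hashlib
import hmac
import secrets
import time


def _mac(secret_key: bytes, session_id: str, timestamp: str, nonce: str) -> str:
    message = f"{session_id}:{timestamp}:{nonce}".encode("utf-8")
    return hmac.new(secret_key, message, hashlib.sha256).hexdigest()


def issue_csrf_token(secret_key: bytes, session_id: str, now=None) -> str:
    """Create a token that is bound to one session and one point in time."""
    timestamp = str(int(time.time() if now is None else now))
    nonce = secrets.token_hex(16)
    return f"{timestamp}:{nonce}:{_mac(secret_key, session_id, timestamp, nonce)}"


def check_csrf_token(secret_key: bytes, session_id: str, token: str,
                     max_age: int = 3600, now=None) -> bool:
    """Accept the token only if the MAC matches and it has not expired."""
    try:
        timestamp, nonce, mac = token.split(":")
        issued_at = int(timestamp)
    except ValueError:
        return False
    expected = _mac(secret_key, session_id, timestamp, nonce)
    current = time.time() if now is None else now
    fresh = 0 <= current - issued_at <= max_age
    # Compare as bytes in constant time.
    return fresh and hmac.compare_digest(expected.encode(), mac.encode())
```

A page from another origin can make the browser send the session cookie, but it cannot read this token out of the legitimate page, so it cannot place a valid value in the form field or the `X-CSRFToken` header.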
from flask_wtf.csrf import CSRFProtect

csrf = CSRFProtect()

Passing the CSRF token in AJAX requests:

// Attach the CSRF token to an AJAX request.
// "data" stands for the request payload defined elsewhere.
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;

fetch("/api/endpoint", {
    method: "POST",
    headers: {
        "Content-Type": "application/json",
        "X-CSRFToken": csrfToken,
    },
    body: JSON.stringify(data),
});

<!-- Add the CSRF meta tag to the base template -->
<meta name="csrf-token" content="{{ csrf_token() }}">

17.9.2 Security Header Configuration

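Security headers are ordinary HTTP response headers, so the idea behind the configuration in this section can be shown at the WSGI level. The sketch below is not how Flask-Talisman is implemented; it is a hypothetical middleware, `SecurityHeadersMiddleware`, that wraps any WSGI application and appends a fixed set of headers unless the application has already set them.

```python
SECURITY_HEADERS = (
    ("Strict-Transport-Security", "max-age=31536000; includeSubDomains"),
    ("X-Content-Type-Options", "nosniff"),
    ("Referrer-Policy", "strict-origin-when-cross-origin"),
    ("Content-Security-Policy", "default-src 'self'; frame-ancestors 'none'"),
)


class SecurityHeadersMiddleware:
    """Append fixed security headers to every response of a WSGI app."""

    def __init__(self, app, headers=SECURITY_HEADERS):
        self.app = app
        self.headers = list(headers)

    def __call__(self, environ, start_response):
        def patched_start_response(status, response_headers, exc_info=None):
            # Keep any header the application set itself.
            present = {name.lower() for name, _ in response_headers}
            extra = [(n, v) for n, v in self.headers if n.lower() not in present]
            return start_response(status, list(response_headers) + extra, exc_info)

        return self.app(environ, patched_start_response)


def hello_app(environ, start_response):
    """Minimal WSGI application used to exercise the middleware."""
    start_response("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
    return [b"hello"]


def call_wsgi(app):
    """Tiny harness: run a WSGI app once and capture its response."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(app({"REQUEST_METHOD": "GET", "PATH_INFO": "/"}, start_response))
    return captured["status"], dict(captured["headers"]), body
```

With a Flask application the same wrapper could be installed as `app.wsgi_app = SecurityHeadersMiddleware(app.wsgi_app)`, although an extension that also handles HTTPS redirects and per-view policies is usually the more practical choice.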
from flask import Flask
from flask_talisman import Talisman

app = Flask(__name__)

talisman = Talisman(
    app,
    force_https=True,
    strict_transport_security=True,
    strict_transport_security_max_age=31536000,  # one year, in seconds
    strict_transport_security_include_subdomains=True,
    content_security_policy={
        "default-src": "'self'",
        "script-src": ["'self'", "https://cdn.jsdelivr.net"],
        "style-src": ["'self'", "'unsafe-inline'", "https://cdn.jsdelivr.net"],
        "img-src": ["'self'", "data:", "https:"],
        "font-src": ["'self'", "https://cdn.jsdelivr.net"],
        "connect-src": ["'self'", "https://api.example.com"],
        "frame-ancestors": "'none'",
    },
    referrer_policy="strict-origin-when-cross-origin",
)

17.9.3 Input Validation and SQL Injection Protection

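The SQL injection risk covered in this section can be reproduced in a few lines. The sketch below uses the standard-library `sqlite3` module and a throwaway in-memory table rather than the chapter's models; `find_user_unsafe` is vulnerable on purpose, and both function names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany("INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)])


def find_user_unsafe(username):
    # Vulnerable on purpose: user input becomes part of the SQL text.
    sql = f"SELECT username FROM users WHERE username = '{username}'"
    return [row[0] for row in conn.execute(sql)]


def find_user_safe(username):
    # The value is sent as a bound parameter, separate from the SQL text.
    sql = "SELECT username FROM users WHERE username = ?"
    return [row[0] for row in conn.execute(sql, (username,))]


malicious = "nobody' OR '1'='1"
leaked = find_user_unsafe(malicious)   # the injected OR clause matches every row
blocked = find_user_safe(malicious)    # no user has this literal name
```

The unsafe version returns every user because the quote in the input closes the string literal and the rest is parsed as SQL. The parameterized version treats the same input as a plain value, which is also what the ORM and `text()` with bound parameters do.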
from markupsafe import Markup
import bleach


def sanitize_html(content, allowed_tags=None, allowed_attributes=None):
    default_tags = [
        "p", "br", "strong", "em", "u", "h1", "h2", "h3",
        "h4", "h5", "h6", "ul", "ol", "li", "a", "blockquote",
        "code", "pre", "img",
    ]
    default_attributes = {
        "a": ["href", "title"],
        "img": ["src", "alt", "width", "height"],
        "code": ["class"],
    }
    tags = allowed_tags or default_tags
    attrs = allowed_attributes or default_attributes
    cleaned = bleach.clean(
        content,
        tags=tags,
        attributes=attrs,
        strip=True,  # remove disallowed tags instead of escaping them
    )
    # Mark the result as safe only after it has been cleaned.
    return Markup(cleaned)

The core rule for preventing SQL injection is to always use parameterized queries:

from sqlalchemy import text

# Dangerous: SQL injection vulnerability
query = f"SELECT * FROM users WHERE username = '{username}'"

# Safe: ORM query
user = User.query.filter_by(username=username).first()

# Safe: raw SQL with bound parameters
result = db.session.execute(
    text("SELECT * FROM users WHERE username = :username"),
    {"username": username},
)

17.9.4 Password Security

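The `pbkdf2:sha256` method used in this section is a salted, deliberately slow key-derivation function. The following sketch shows the same idea with the standard library only. The storage format and the names `hash_password`, `verify_password`, and `ITERATIONS` are assumptions for illustration and are not compatible with the strings Werkzeug produces.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 600_000  # assumption: tune to your hardware and current guidance


def hash_password(password: str, iterations: int = ITERATIONS) -> str:
    """Derive a salted PBKDF2-SHA256 hash and pack it into one string."""
    salt = secrets.token_bytes(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        if scheme != "pbkdf2_sha256":
            return False
        candidate = hashlib.pbkdf2_hmac(
            "sha256",
            password.encode("utf-8"),
            bytes.fromhex(salt_hex),
            int(iterations),
        )
        return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
    except ValueError:
        return False
```

Because the salt is random, hashing the same password twice gives two different strings, and storing the iteration count next to the hash lets the cost be raised later without invalidating existing records.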
from werkzeug.security import generate_password_hash, check_password_hash
import secrets


class PasswordPolicy:
    MIN_LENGTH = 8
    MAX_LENGTH = 128
    REQUIRE_UPPERCASE = True
    REQUIRE_LOWERCASE = True
    REQUIRE_DIGIT = True
    REQUIRE_SPECIAL = True

    @classmethod
    def validate(cls, password):
        """Return a list of policy violations; an empty list means valid."""
        errors = []
        if len(password) < cls.MIN_LENGTH:
            errors.append(f"Password must be at least {cls.MIN_LENGTH} characters long")
        if len(password) > cls.MAX_LENGTH:
            errors.append(f"Password must be at most {cls.MAX_LENGTH} characters long")
        if cls.REQUIRE_UPPERCASE and not any(c.isupper() for c in password):
            errors.append("Password must contain an uppercase letter")
        if cls.REQUIRE_LOWERCASE and not any(c.islower() for c in password):
            errors.append("Password must contain a lowercase letter")
        if cls.REQUIRE_DIGIT and not any(c.isdigit() for c in password):
            errors.append("Password must contain a digit")
        if cls.REQUIRE_SPECIAL and not any(
            c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password
        ):
            errors.append("Password must contain a special character")
        return errors

    @staticmethod
    def hash_password(password):
        return generate_password_hash(password, method="pbkdf2:sha256", salt_length=16)

    @staticmethod
    def verify_password(password, password_hash):
        return check_password_hash(password_hash, password)

    @staticmethod
    def generate_token(nbytes=32):
        return secrets.token_urlsafe(nbytes)

17.10 RESTful API Development

17.10.1 API Blueprint and Serialization

from flask import Blueprint, request, jsonify
from marshmallow import Schema, fields, validate, EXCLUDE

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")


class UserSchema(Schema):
    id = fields.Integer(dump_only=True)
    username = fields.String(required=True, validate=validate.Length(min=3, max=20))
    email = fields.Email(required=True)
    nickname = fields.String(load_default="")
    bio = fields.String(load_default="")
    created_at = fields.DateTime(dump_only=True)

    class Meta:
        unknown = EXCLUDE  # silently drop fields the schema does not declare


class PostSchema(Schema):
    id = fields.Integer(dump_only=True)
    title = fields.String(required=True, validate=validate.Length(min=1, max=200))
    content = fields.String(required=True)
    summary = fields.String(load_default="")
    is_published = fields.Boolean(load_default=False)
    slug = fields.String(dump_only=True)
    author = fields.Nested(UserSchema, dump_only=True)
    # Accepts tag names on input. Post.tags holds Tag objects, so map them to
    # names before dumping (or use fields.Pluck with a Tag schema).
    # A callable default avoids sharing one mutable list between loads.
    tags = fields.List(fields.String(), load_default=list)
    created_at = fields.DateTime(dump_only=True)
    updated_at = fields.DateTime(dump_only=True)


class PaginatedSchema(Schema):
    items = fields.List(fields.Dict())
    page = fields.Integer()
    per_page = fields.Integer()
    total = fields.Integer()
    pages = fields.Integer()
    has_next = fields.Boolean()
    has_prev = fields.Boolean()

17.10.2 Implementing API Views

import re
import uuid

from flask import Blueprint, request, jsonify
from marshmallow import ValidationError

from .models import User, Post, db
from .schemas import UserSchema, PostSchema
from .auth import token_required, admin_required

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")

user_schema = UserSchema()
users_schema = UserSchema(many=True)
post_schema = PostSchema()
posts_schema = PostSchema(many=True)


def make_slug(title):
    # Post.slug is unique and NOT NULL, so derive one from the title and add
    # a short random suffix to avoid collisions (illustrative helper).
    base = re.sub(r"[^\w]+", "-", title.lower()).strip("-") or "post"
    return f"{base[:180]}-{uuid.uuid4().hex[:8]}"


@api_bp.route("/posts", methods=["GET"])
def get_posts():
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", 10, type=int)
    per_page = max(1, min(per_page, 100))  # clamp to a sane range
    pagination = (
        Post.query.filter_by(is_published=True)
        .order_by(Post.created_at.desc())
        .paginate(page=page, per_page=per_page, error_out=False)
    )
    return jsonify({
        "items": posts_schema.dump(pagination.items),
        "page": pagination.page,
        "per_page": pagination.per_page,
        "total": pagination.total,
        "pages": pagination.pages,
    })


@api_bp.route("/posts/<int:post_id>", methods=["GET"])
def get_post(post_id):
    post = Post.query.get_or_404(post_id)
    return jsonify(post_schema.dump(post))


@api_bp.route("/posts", methods=["POST"])
@token_required
def create_post():
    json_data = request.get_json(silent=True)
    if not json_data:
        return jsonify({"error": "No input data provided"}), 400
    # load() validates and deserializes in one step.
    try:
        data = post_schema.load(json_data)
    except ValidationError as err:
        return jsonify({"errors": err.messages}), 422
    post = Post(
        title=data["title"],
        slug=make_slug(data["title"]),
        content=data["content"],
        summary=data.get("summary", ""),
        is_published=data.get("is_published", False),
        author_id=request.current_user.id,
    )
    db.session.add(post)
    db.session.commit()
    return jsonify(post_schema.dump(post)), 201


@api_bp.route("/posts/<int:post_id>", methods=["PUT"])
@token_required
def update_post(post_id):
    post = Post.query.get_or_404(post_id)
    if post.author_id != request.current_user.id and not request.current_user.is_admin:
        return jsonify({"error": "You are not allowed to modify this post"}), 403
    json_data = request.get_json(silent=True)
    if not json_data:
        return jsonify({"error": "No input data provided"}), 400
    try:
        data = post_schema.load(json_data, partial=True)
    except ValidationError as err:
        return jsonify({"errors": err.messages}), 422
    # Tag names must be resolved to Tag rows separately; assigning raw
    # strings to the relationship would fail.
    data.pop("tags", None)
    for key, value in data.items():
        setattr(post, key, value)
    db.session.commit()
    return jsonify(post_schema.dump(post))


@api_bp.route("/posts/<int:post_id>", methods=["DELETE"])
@token_required
def delete_post(post_id):
    post = Post.query.get_or_404(post_id)
    if post.author_id != request.current_user.id and not request.current_user.is_admin:
        return jsonify({"error": "You are not allowed to delete this post"}), 403
    db.session.delete(post)
    db.session.commit()
    return "", 204

17.10.3 API Error Handling and Pagination

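The pagination fields returned by the list endpoint (`page`, `per_page`, `total`, `pages`) follow from simple arithmetic. The following sketch shows that calculation in isolation. The function `paginate_meta` is a hypothetical helper, not a Flask-SQLAlchemy API, and its choice to report at least one page for an empty result is a design decision of this sketch.

```python
import math


def paginate_meta(total: int, page: int = 1, per_page: int = 10,
                  max_per_page: int = 100) -> dict:
    """Compute pagination metadata and the OFFSET/LIMIT for one page."""
    # Clamp client-supplied values so a request cannot ask for a
    # non-positive or unbounded page size.
    per_page = max(1, min(per_page, max_per_page))
    pages = max(1, math.ceil(total / per_page))
    page = max(1, min(page, pages))
    return {
        "page": page,
        "per_page": per_page,
        "total": total,
        "pages": pages,
        "has_prev": page > 1,
        "has_next": page < pages,
        "offset": (page - 1) * per_page,
        "limit": per_page,
    }
```

For example, `paginate_meta(95, page=10, per_page=10)` describes the last of ten pages, with an offset of 90 and `has_next` set to `False`. Out-of-range pages are clamped here; an API could equally return a 404 for them, which is what `paginate(..., error_out=True)` does.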
from flask import jsonify
from werkzeug.exceptions import HTTPException

# api_bp is the blueprint defined in section 17.10.2.


class APIError(Exception):
    def __init__(self, message, status_code=400, payload=None):
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        rv = dict(self.payload or {})
        rv["error"] = self.message
        rv["status"] = self.status_code
        return rv


@api_bp.errorhandler(APIError)
def handle_api_error(error):
    return jsonify(error.to_dict()), error.status_code


# Blueprint-level handlers only see errors raised inside the blueprint's
# views. A 404 for a URL that matches no route is handled at the
# application level.
@api_bp.errorhandler(HTTPException)
def handle_http_error(error):
    return jsonify({"error": error.description, "status": error.code}), error.code

17.11 Testing

17.11.1 Test Configuration and Fixtures

import pytest

from app import create_app, db as _db
from app.models import User


@pytest.fixture(scope="session")
def app():
    # The "testing" configuration is expected to use a throwaway database
    # and to disable CSRF checks so that form posts work in tests.
    app = create_app("testing")
    yield app


@pytest.fixture(scope="function")
def db(app):
    # Create a clean schema for every test and drop it afterwards.
    with app.app_context():
        _db.create_all()
        yield _db
        _db.session.remove()
        _db.drop_all()


@pytest.fixture(scope="function")
def client(app, db):
    return app.test_client()


@pytest.fixture(scope="function")
def sample_user(db):
    user = User(username="testuser", email="test@example.com")
    user.set_password("TestPass123!")
    db.session.add(user)
    db.session.commit()
    return user


@pytest.fixture(scope="function")
def auth_headers(app, sample_user):
    from app.auth import TokenService

    with app.app_context():
        token = TokenService.generate_access_token(sample_user.id)
    return {"Authorization": f"Bearer {token}"}

17.11.2 View Tests

class TestAuthViews:
    def test_register_page(self, client):
        resp = client.get("/auth/register")
        assert resp.status_code == 200
        # Non-ASCII text cannot appear in a bytes literal, so it is encoded
        # explicitly.
        assert b"register" in resp.data.lower() or "注册".encode("utf-8") in resp.data

    def test_register_success(self, client, db):
        resp = client.post("/auth/register", data={
            "username": "newuser",
            "email": "new@example.com",
            "password": "NewPass123!",
            "confirm_password": "NewPass123!",
            "agree_terms": True,
        }, follow_redirects=True)
        assert resp.status_code == 200
        user = User.query.filter_by(username="newuser").first()
        assert user is not None

    def test_register_duplicate_username(self, client, sample_user):
        resp = client.post("/auth/register", data={
            "username": sample_user.username,
            "email": "other@example.com",
            "password": "NewPass123!",
            "confirm_password": "NewPass123!",
            "agree_terms": True,
        })
        assert resp.status_code == 200
        assert b"already" in resp.data or "已被".encode("utf-8") in resp.data

    def test_login_success(self, client, sample_user):
        resp = client.post("/auth/login", data={
            "username": "testuser",
            "password": "TestPass123!",
        }, follow_redirects=True)
        assert resp.status_code == 200

    def test_login_invalid_password(self, client, sample_user):
        resp = client.post("/auth/login", data={
            "username": "testuser",
            "password": "wrongpassword",
        })
        assert resp.status_code == 200
        body = resp.data
        assert (
            b"invalid" in body.lower()
            or b"error" in body.lower()
            or "错误".encode("utf-8") in body
        )

    def test_logout(self, client, sample_user):
        client.post("/auth/login", data={
            "username": "testuser",
            "password": "TestPass123!",
        })
        resp = client.get("/auth/logout", follow_redirects=True)
        assert resp.status_code == 200

17.11.3 API Tests

class TestPostAPI:
    def test_get_posts(self, client):
        resp = client.get("/api/v1/posts")
        assert resp.status_code == 200
        data = resp.get_json()
        assert "items" in data
        assert "page" in data

    def test_create_post_unauthorized(self, client):
        resp = client.post("/api/v1/posts", json={"title": "Test"})
        assert resp.status_code == 401

    def test_create_post_success(self, client, auth_headers):
        resp = client.post(
            "/api/v1/posts",
            json={"title": "Test Post", "content": "Content here"},
            headers=auth_headers,
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert data["title"] == "Test Post"

    def test_create_post_validation_error(self, client, auth_headers):
        resp = client.post(
            "/api/v1/posts",
            json={"title": ""},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    def test_update_post(self, client, auth_headers, sample_user, db):
        from app.models import Post

        # slug is NOT NULL in the model, and the author must be the user
        # that auth_headers was issued for.
        post = Post(title="Old", slug="old", content="Old content", author_id=sample_user.id)
        db.session.add(post)
        db.session.commit()
        resp = client.put(
            f"/api/v1/posts/{post.id}",
            json={"title": "New Title"},
            headers=auth_headers,
        )
        assert resp.status_code == 200
        assert resp.get_json()["title"] == "New Title"

    def test_delete_post(self, client, auth_headers, sample_user, db):
        from app.models import Post

        post = Post(
            title="Delete Me", slug="delete-me", content="Content",
            author_id=sample_user.id,
        )
        db.session.add(post)
        db.session.commit()
        resp = client.delete(
            f"/api/v1/posts/{post.id}",
            headers=auth_headers,
        )
        assert resp.status_code == 204

17.12 Production Deployment

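17.12.1 Deploying with Gunicorn

pip install gunicorn

# Four synchronous worker processes
gunicorn -w 4 -b 0.0.0.0:8000 "app:create_app()"

# Four worker processes with two threads each
gunicorn -w 4 -b 0.0.0.0:8000 --worker-class gthread --threads 2 "app:create_app()"

# uvicorn.workers.UvicornWorker expects an ASGI application. Flask is a WSGI
# framework, so this command only works if the factory returns an
# ASGI-wrapped app (for example via asgiref's WsgiToAsgi); otherwise use one
# of the commands above.
gunicorn -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000 "app:create_app()"

Gunicorn configuration file gunicorn.conf.py:

import multiprocessing

bind = "0.0.0.0:8000"
workers = multiprocessing.cpu_count() * 2 + 1  # common (2 x cores) + 1 heuristic
worker_class = "gthread"
threads = 2
timeout = 120
keepalive = 5
accesslog = "-"   # "-" writes to stdout
errorlog = "-"    # "-" writes to stderr
loglevel = "info"
preload_app = True
max_requests = 5000         # recycle a worker after this many requests
max_requests_jitter = 500   # randomize the limit so workers do not restart together

17.12.2 Containerizing with Docker
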
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Create the unprivileged user first and give it ownership of /app;
# otherwise the application cannot create its uploads directory at runtime.
RUN useradd --create-home appuser \
    && mkdir -p /app/uploads \
    && chown -R appuser:appuser /app
COPY --chown=appuser:appuser . .
USER appuser

EXPOSE 8000
CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:create_app()"]

version: "3.8"

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Flask 2.3 and later no longer read FLASK_ENV; it only has an effect
      # if create_app() reads it itself.
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - DATABASE_URL=postgresql://user:pass@db:5432/app
    depends_on:
      - db
      - redis
    restart: unless-stopped

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: app
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass  # example value only; use a secret in production
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./certs:/etc/nginx/certs:ro
    depends_on:
      - web

volumes:
  pgdata:

17.12.3 Nginx Reverse Proxy

upstream flask_app {
    server web:8000;
}

# Redirect all plain HTTP traffic to HTTPS.
server {
    listen 80;
    server_name example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name example.com;

    ssl_certificate /etc/nginx/certs/cert.pem;
    ssl_certificate_key /etc/nginx/certs/key.pem;

    client_max_body_size 16M;

    location / {
        proxy_pass http://flask_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # The two locations below are served by nginx directly, so the static
    # and uploads directories must be available inside the nginx container.
    location /static/ {
        alias /app/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }

    location /uploads/ {
        alias /app/uploads/;
        expires 7d;
    }
}

17.12.4 Performance Optimization Strategies

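The connection pool settings shown at the end of this section apply to each worker process separately, so they interact with the Gunicorn worker count. The following back-of-the-envelope sketch makes that arithmetic explicit. The function names are hypothetical, and the figure of 100 used in the usage note is PostgreSQL's default `max_connections`.

```python
def gunicorn_workers(cpu_count: int) -> int:
    """The (2 x cores) + 1 heuristic used in gunicorn.conf.py."""
    return cpu_count * 2 + 1


def max_db_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)


def largest_pool_size(workers: int, connection_limit: int,
                      max_overflow: int = 0, reserved: int = 5) -> int:
    """Largest per-process pool_size that keeps the total under the limit.

    `reserved` leaves room for migrations, monitoring, and admin sessions.
    """
    per_worker = (connection_limit - reserved) // workers
    return max(per_worker - max_overflow, 0)


workers = gunicorn_workers(4)                     # 9 workers on a 4-core host
worst_case = max_db_connections(workers, 20, 10)  # 9 * (20 + 10) = 270
```

On a 4-core host, a `pool_size` of 20 with a `max_overflow` of 10 could therefore open up to 270 connections, far more than a default PostgreSQL server accepts. Under the same assumptions, `largest_pool_size(9, 100, max_overflow=2)` gives 8, which keeps the total at 90. With only two threads per worker, a pool of a few connections per process is usually enough.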
from flask import Flask, jsonify
from flask_caching import Cache

app = Flask(__name__)
app.config["CACHE_TYPE"] = "RedisCache"
app.config["CACHE_REDIS_URL"] = "redis://localhost:6379/0"
app.config["CACHE_DEFAULT_TIMEOUT"] = 300
cache = Cache(app)

# The views below assume that Post defines a to_dict() method.


@app.route("/api/posts")
@cache.cached(timeout=60, query_string=True)  # the query string is part of the cache key
def get_posts():
    from .models import Post

    posts = Post.query.filter_by(is_published=True).all()
    return jsonify([p.to_dict() for p in posts])


@app.route("/api/posts/<int:post_id>")
@cache.cached(timeout=300)
def get_post(post_id):
    from .models import Post

    post = Post.query.get_or_404(post_id)
    return jsonify(post.to_dict())

Database connection pool tuning:

app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
    "pool_size": 20,        # connections kept open per process
    "max_overflow": 10,     # extra connections allowed under load
    "pool_recycle": 3600,   # replace connections older than one hour
    "pool_pre_ping": True,  # test a connection before handing it out
    "echo_pool": False,
}

17.13 Recent Developments

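17.13.1 Flask 3.0 and Later

Flask 3.0 was released in September 2023. It is mainly a cleanup release that removes previously deprecated APIs and requires Werkzeug 3.0 or later. Several features often associated with it were introduced during the 2.x series and remain available:
- Deferred extension initialization: extensions can be created without an app instance and bound later with init_app(), a long-standing convention that the application factory pattern relies on
- Async views: async def view functions have been supported since Flask 2.0, provided the async extra is installed (pip install "flask[async]")
- Routing: URL matching is implemented by Werkzeug, so changes in route matching behavior come from Werkzeug releases
- JSON handling: since Flask 2.2, JSON behavior is customized through the app.json provider, which uses the built-in json module by default and replaces the older json_encoder attribute

from flask import Flask
from flask.json.provider import DefaultJSONProvider


class StringFallbackJSONProvider(DefaultJSONProvider):
    @staticmethod
    def default(o):
        # Called for objects the standard encoder cannot serialize. This
        # replaces the built-in handling of types such as datetime and UUID.
        return str(o)


app = Flask(__name__)
app.json = StringFallbackJSONProvider(app)


@app.route("/async-data")
async def async_data():
    import httpx

    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com/data")
    return resp.json()

17.13.2 Integrating Flask with Front-End Frameworks
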
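Modern Flask applications are often paired with front-end frameworks such as Vue.js or React:

import os

from flask import Flask, send_from_directory

app = Flask(__name__, static_folder="dist")


@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_spa(path):
    # Serve a built asset when it exists; otherwise return index.html so
    # that the client-side router can handle the URL.
    if path and os.path.isfile(os.path.join(app.static_folder, path)):
        return send_from_directory(app.static_folder, path)
    return send_from_directory(app.static_folder, "index.html")

17.13.3 Flask in a Microservice Architecture
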
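Because it is lightweight, Flask is widely used to build individual services in a microservice architecture:

from flask import Flask, jsonify
from flask_consulate import Consul

app = Flask(__name__)

# Register this service with Consul so that other services can discover it.
consul = Consul(app, host="consul-server", port=8500)
consul.register_service(
    name="user-service",
    port=8000,
    tags=["flask", "user", "api"],
)


@app.route("/health")
def health():
    # Health check endpoint for the service registry or load balancer.
    return jsonify({"status": "healthy"})


@app.route("/api/users/<int:user_id>")
def get_user(user_id):
    return jsonify({"id": user_id, "username": "example"})

17.14 Chapter Summary
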
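This chapter covered the core body of knowledge for Flask web development:
- Web framework fundamentals: the principles of the WSGI/ASGI protocols and the HTTP request-response life cycle
- Flask core mechanisms: the application factory pattern, the application and request contexts, and configuration management
- Routing: dynamic routes, custom converters, RESTful design, and modular blueprints
- Requests and responses: request object attributes, building responses, streaming responses, and request hooks
- Template engine: Jinja2 template inheritance, custom filters, and macros
- Form handling: validation with Flask-WTF and secure file uploads
- Database integration: the SQLAlchemy ORM, relationship modeling, migrations, and avoiding N+1 queries
- Authentication and authorization: session authentication with Flask-Login, JWT token authentication, and bitmask-based permission control
- Security: defenses against CSRF, XSS, and SQL injection, security headers, and password security
- RESTful APIs: serialization with Marshmallow, API error handling, and pagination
- Testing: pytest fixtures, view tests, and API tests
- Production deployment: Gunicorn, Docker containers, an Nginx reverse proxy, and caching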
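17.15 Exercises and Projects
Basic exercises
1. Use the application factory pattern to create a Flask application with three blueprints: a home page, an about page, and a contact page.
2. Implement a complete authentication system that supports user registration, login, and logout, including password strength validation and CSRF protection.
3. Use Flask-SQLAlchemy to design the data model for a blog system with five models (user, post, category, tag, and comment) and the relationships between them.
Intermediate exercises
1. Implement a complete RESTful API that supports CRUD operations on posts, with JWT authentication, Marshmallow serialization and validation, and pagination.
2. Design and implement a bitmask-based permission system that supports flexible combinations of roles (guest, author, editor, administrator) and permissions (read, write, delete, manage).
3. Implement file uploads for both images and documents, including file type validation, size limits, safe filename generation, and storage path management.
Capstone projects
1. Blog platform: build a fully featured blog platform with the following features:
   - User registration and login (with email verification)
   - Creating, editing, and deleting posts (with Markdown support)
   - Category and tag management
   - A comment system (with nested replies)
   - Full-text search
   - An administration back end
   - A RESTful API
   - Full test coverage
2. Task management system: build a task management system for team collaboration that includes:
   - User authentication and permission management
   - CRUD for projects and tasks
   - Task assignment and status transitions
   - Real-time notifications (WebSocket)
   - An API
   - Docker deployment configuration
Discussion questions
1. In which scenarios is Flask's "microframework" design philosophy a better fit than Django's "full-stack framework" design, and in which scenarios is the opposite true? Analyze the question along three dimensions: technical architecture, team collaboration, and project size.
2. In a microservice architecture, how can a Flask application provide the capabilities a distributed system needs, such as service discovery, a configuration center, and distributed tracing? Design a Flask-based microservice technology stack.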