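Chapter 17: Flask Web Development

Learning Objectives

After completing this chapter, readers should be able to:

  1. Understand what a web framework is: the principles of the WSGI and ASGI protocols, and how a framework implements the HTTP request-response lifecycle
  2. Understand Flask's architecture: its microframework design philosophy and the application-context and request-context mechanisms
  3. Use the routing system fluently: RESTful route design, dynamic routes, modularization with blueprints, and URL building
  4. Master the template engine: Jinja2 template inheritance, custom filters, macros, and template performance tuning
  5. Build a persistence layer: ORM mapping, relationship modeling, migrations, and query optimization with Flask-SQLAlchemy
  6. Implement authentication and authorization: user authentication, session management, and access control built on Flask-Login
  7. Apply engineering practices: the application factory pattern, blueprint architecture, and configuration management for maintainable large applications
  8. Harden applications: understand and defend against XSS, CSRF, SQL injection, clickjacking, and other web security threats
  9. Deploy to production: Gunicorn/uWSGI deployment, containerization, performance tuning, and monitoring

17.1 Web Framework Fundamentals

17.1.1 The HTTP Protocol and Web Application Architecture

A web application is, at its core, a program that exchanges requests and responses over HTTP. Understanding how HTTP works is a prerequisite for mastering any web framework.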

Client (Browser)                           Server (Web Server)
    |                                          |
    |  1. DNS lookup → TCP handshake → TLS     |
    |                                          |
    |  2. HTTP request ──────────────────────> |
    |     GET /api/users HTTP/1.1              |
    |     Host: example.com                    |
    |     Accept: application/json             |
    |                                          |
    |  3. Server processes the request         |
    |     WSGI Server → Flask App → View Func  |
    |                                          |
    |  4. HTTP response <───────────────────── |
    |     HTTP/1.1 200 OK                      |
    |     Content-Type: application/json       |
    |     {"users": [...]}                     |
    |                                          |
    |  5. TCP teardown (four-way close)        |

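HTTP request methods and their semantics:

| Method | Semantics | Idempotent | Safe | Typical use |
|---|---|---|---|---|
| GET | Retrieve a resource | Yes | Yes | Querying data |
| POST | Create a resource | No | No | Submitting forms, creating records |
| PUT | Full update | Yes | No | Replacing an entire resource |
| PATCH | Partial update | No | No | Modifying some fields |
| DELETE | Delete a resource | Yes | No | Deleting records |
| HEAD | Retrieve metadata | Yes | Yes | Checking whether a resource exists |
| OPTIONS | Discover supported methods | Yes | Yes | CORS preflight requests |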

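17.1.2 WSGI in Depth

WSGI (Web Server Gateway Interface, PEP 3333) is the standard interface between Python web applications and web servers. Flask is built on WSGI, so understanding WSGI is essential to understanding how Flask runs.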

python
def simple_wsgi_app(environ, start_response):
    status = "200 OK"
    headers = [("Content-Type", "text/plain; charset=utf-8")]
    start_response(status, headers)
    return [b"Hello, WSGI!"]

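The two core components of the WSGI interface:

  • environ: a dictionary of request information, populated by the WSGI server
  • start_response: a callable the application invokes to set the response status and headers

python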
def detailed_wsgi_app(environ, start_response):
    method = environ.get("REQUEST_METHOD", "GET")
    path = environ.get("PATH_INFO", "/")
    query_string = environ.get("QUERY_STRING", "")
    content_type = environ.get("CONTENT_TYPE", "")
    content_length = int(environ.get("CONTENT_LENGTH", 0) or 0)
    server_name = environ.get("SERVER_NAME", "localhost")
    server_port = environ.get("SERVER_PORT", "80")

    request_body = environ["wsgi.input"].read(content_length) if content_length > 0 else b""

    response_body = (
        f"Method: {method}\n"
        f"Path: {path}\n"
        f"Query: {query_string}\n"
        f"Content-Type: {content_type}\n"
        f"Body: {request_body.decode('utf-8')}\n"
        f"Server: {server_name}:{server_port}\n"
    ).encode("utf-8")

    status = "200 OK"
    headers = [
        ("Content-Type", "text/plain; charset=utf-8"),
        ("Content-Length", str(len(response_body))),
    ]
    start_response(status, headers)
    return [response_body]
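
The server side of this contract can be sketched with the standard library alone. The helper below is a hypothetical stand-in for a WSGI server (call_wsgi and echo_app are illustrative names, not part of any library): it builds an environ, supplies a start_response callable, and joins the returned body chunks.

```python
from io import BytesIO
from wsgiref.util import setup_testing_defaults


def echo_app(environ, start_response):
    """Tiny WSGI app (hypothetical) that echoes the method and path."""
    body = f"{environ['REQUEST_METHOD']} {environ['PATH_INFO']}".encode("utf-8")
    start_response("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
    return [body]


def call_wsgi(app, path="/", method="GET"):
    """Play the role of the server: build environ, capture status and headers."""
    environ = {"PATH_INFO": path, "REQUEST_METHOD": method, "wsgi.input": BytesIO()}
    setup_testing_defaults(environ)  # fills in the remaining required keys
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    # The application returns an iterable of byte strings.
    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], body


status, headers, body = call_wsgi(echo_app, "/hello")
print(status, body)  # 200 OK b'GET /hello'
```

Calling an application this way is also a quick way to unit-test WSGI code without opening a socket.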

The WSGI middleware pattern inserts a processing layer between the server and the application:

python
class TimingMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        import time
        start = time.perf_counter()

        def custom_start_response(status, headers, exc_info=None):
            elapsed = time.perf_counter() - start
            headers.append(("X-Response-Time", f"{elapsed:.4f}s"))
            return start_response(status, headers, exc_info)

        return self.app(environ, custom_start_response)


class RequestLoggingMiddleware:
    def __init__(self, app, logger=None):
        self.app = app
        self.logger = logger

    def __call__(self, environ, start_response):
        method = environ.get("REQUEST_METHOD", "GET")
        path = environ.get("PATH_INFO", "/")
        query = environ.get("QUERY_STRING", "")
        remote = environ.get("REMOTE_ADDR", "-")

        if self.logger:
            self.logger.info(f"{remote} {method} {path}?{query}")

        return self.app(environ, start_response)
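
Because a middleware is itself a WSGI callable, middlewares can be stacked. The sketch below uses a hypothetical HeaderMiddleware (not taken from the text above) to show the order in which stacked layers touch the response; it relies only on the standard library.

```python
from wsgiref.util import setup_testing_defaults


def hello_app(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello"]


class HeaderMiddleware:
    """Hypothetical middleware that appends one fixed response header."""

    def __init__(self, app, name, value):
        self.app = app
        self.name = name
        self.value = value

    def __call__(self, environ, start_response):
        def wrapped_start_response(status, headers, exc_info=None):
            # Copy the list so the inner application's list is not mutated.
            new_headers = list(headers) + [(self.name, self.value)]
            return start_response(status, new_headers, exc_info)

        return self.app(environ, wrapped_start_response)


# The outermost wrapper sees the request first and the response last.
wrapped = HeaderMiddleware(HeaderMiddleware(hello_app, "X-Inner", "1"), "X-Outer", "2")

environ = {}
setup_testing_defaults(environ)
seen = {}


def record(status, headers, exc_info=None):
    seen["status"] = status
    seen["headers"] = headers


body = b"".join(wrapped(environ, record))
header_names = [name for name, _ in seen["headers"]]
print(header_names)  # ['Content-Type', 'X-Inner', 'X-Outer']
```

In a Flask project the usual place to attach such a layer is app.wsgi_app, for example app.wsgi_app = TimingMiddleware(app.wsgi_app), which keeps the Flask object itself intact.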

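17.1.3 From WSGI to ASGI: The Evolution of Asynchronous Web Serving

ASGI (Asynchronous Server Gateway Interface) is the asynchronous successor to WSGI. It supports scenarios such as WebSocket, HTTP/2, and long polling: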

python
async def asgi_app(scope, receive, send):
    if scope["type"] == "http":
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [[b"content-type", b"text/plain"]],
        })
        await send({
            "type": "http.response.body",
            "body": b"Hello, ASGI!",
        })
    elif scope["type"] == "websocket":
        while True:
            message = await receive()
            if message["type"] == "websocket.connect":
                await send({"type": "websocket.accept"})
            elif message["type"] == "websocket.receive":
                await send({
                    "type": "websocket.send",
                    "text": f"Echo: {message.get('text', '')}",
                })
            elif message["type"] == "websocket.disconnect":
                break
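
To see the scope/receive/send protocol in action without an ASGI server, the application can be driven by hand. This is a minimal sketch: call_asgi is a hypothetical helper, and the scope it builds contains only a few of the keys a real server would provide.

```python
import asyncio


async def hello_asgi(scope, receive, send):
    """HTTP-only ASGI app, a trimmed variant of the example above."""
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [[b"content-type", b"text/plain"]],
    })
    await send({"type": "http.response.body", "body": b"Hello, ASGI!"})


async def call_asgi(app, path="/"):
    """Stand in for the server: supply scope/receive and collect sent events."""
    scope = {"type": "http", "method": "GET", "path": path, "headers": []}
    sent = []

    async def receive():
        # An empty request body, delivered as a single event.
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(event):
        sent.append(event)

    await app(scope, receive, send)
    return sent


events = asyncio.run(call_asgi(hello_asgi))
print([event["type"] for event in events])
```

Where WSGI returns the whole response from one function call, ASGI exchanges a sequence of events, which is what lets one connection stay open for WebSocket traffic.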

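Flask has supported async view functions since version 2.0 (install the flask[async] extra). Flask itself remains a WSGI application, however: each async view runs on an event loop inside the worker that handles the request, so a worker still serves one request at a time. Serving a Flask app from an ASGI server requires an adapter such as asgiref's WsgiToAsgi.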

python
from flask import Flask

app = Flask(__name__)

@app.route("/async-endpoint")
async def async_handler():
    import asyncio
    await asyncio.sleep(0.1)
    return {"message": "Async response"}

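17.1.4 Flask's Design Philosophy

Flask follows the "microframework" design approach. Its core principles are:

  1. Explicit over implicit: no project structure is imposed, and the developer stays in full control
  2. Minimal core: the core provides only routing, templating, and request contexts; everything else comes from extensions
  3. Composability: complex applications are assembled from blueprints and extensions
  4. Developer friendliness: excellent debugging tools and detailed error messages

Flask's functionality rests mainly on two libraries (it also installs a few small supporting packages: Click, ItsDangerous, MarkupSafe, and Blinker):

| Dependency | Role | Description |
|---|---|---|
| Werkzeug | WSGI toolkit | Routing, request/response objects, development server, debugger |
| Jinja2 | Template engine | Template rendering, inheritance, filters, macros, and more |

Layers of a Flask application: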

┌─────────────────────────────────────────┐
│              Flask Application           │
│  ┌───────────┐  ┌──────────┐  ┌──────┐ │
│  │  Routing   │  │ Template │  │ Ctx  │ │
│  │  System    │  │  Engine  │  │ Mgmt │ │
│  └─────┬─────┘  └────┬─────┘  └──┬───┘ │
│        │              │           │      │
│  ┌─────┴─────┐  ┌────┴─────┐  ┌─┴───┐ │
│  │  Werkzeug  │  │  Jinja2  │  │ Ctx │ │
│  │  Routing   │  │ Template │  │ Loc │ │
│  └───────────┘  └──────────┘  └─────┘ │
│                                         │
│  ┌─────────────────────────────────────┐│
│  │         Extension System            ││
│  │  SQLAlchemy │ Login │ WTForms │ ... ││
│  └─────────────────────────────────────┘│
└─────────────────────────────────────────┘

17.2 Core Mechanisms of a Flask Application

17.2.1 Application Initialization and Configuration

python
from flask import Flask
import os


class Config:
    SECRET_KEY = os.environ.get("SECRET_KEY", "dev-key-change-in-production")
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL", "sqlite:///app.db"
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 10,
        "pool_recycle": 3600,
        "pool_pre_ping": True,
    }
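    # JSON_SORT_KEYS was removed in Flask 2.3; set app.json.sort_keys = False
    # on the application object instead (for example inside create_app).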
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024


class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_ECHO = True


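class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
    # Pool sizing does not apply to an in-memory SQLite database, and the pool
    # class used for it may reject pool_size, so drop the inherited options.
    SQLALCHEMY_ENGINE_OPTIONS = {}
    WTF_CSRF_ENABLED = False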


class ProductionConfig(Config):
    DEBUG = False
    SQLALCHEMY_ECHO = False


config_map = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "production": ProductionConfig,
    "default": DevelopmentConfig,
}
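
One way to consume such a mapping is to pick the class from an environment variable and fall back to the default for unknown names. The sketch below is illustrative only: the variable name APP_CONFIG and the helper select_config are assumptions, and the classes are reduced to two flags so the block runs on its own.

```python
import os


class Config:
    DEBUG = False
    TESTING = False


class DevelopmentConfig(Config):
    DEBUG = True


class TestingConfig(Config):
    TESTING = True


config_map = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "default": DevelopmentConfig,
}


def select_config(env=None):
    """Return the config class named by APP_CONFIG (hypothetical variable name)."""
    source = os.environ if env is None else env
    name = source.get("APP_CONFIG", "default")
    # Unknown names fall back to the default instead of raising KeyError.
    return config_map.get(name, config_map["default"])


print(select_config({"APP_CONFIG": "testing"}).__name__)  # TestingConfig
```

The selected class is then passed to app.config.from_object, which copies only its upper-case attributes into the application configuration.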

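17.2.2 The Application Factory Pattern

The application factory pattern is the project layout Flask recommends. It wraps application creation in a function, which:

  • allows switching between configurations per environment
  • avoids circular imports
  • makes it easy to create isolated application instances in tests
  • allows several application instances to run side by side

python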
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
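from flask_cors import CORS

# Assumption: the Config classes and config_map from 17.2.1 live in config.py
# inside the same package.
from .config import config_map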

db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
cors = CORS()


def create_app(config_name="default"):
    app = Flask(__name__)
    app.config.from_object(config_map[config_name])

    db.init_app(app)
    migrate.init_app(app, db)
    login_manager.init_app(app)
    cors.init_app(app)

    login_manager.login_view = "auth.login"
    login_manager.login_message_category = "info"

    from .main import main_bp
    from .auth import auth_bp
    from .api import api_bp

    app.register_blueprint(main_bp)
    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(api_bp, url_prefix="/api/v1")

    register_error_handlers(app)
    register_template_filters(app)
    register_cli_commands(app)
    register_hooks(app)

    return app


def register_error_handlers(app):
    from flask import render_template, jsonify

    def wants_json_response():
        from flask import request
        return (
            request.accept_mimetypes.best_match(["application/json", "text/html"])
            == "application/json"
        )

    @app.errorhandler(404)
    def not_found(error):
        if wants_json_response():
            return jsonify({"error": "Not found", "status": 404}), 404
        return render_template("errors/404.html"), 404

    @app.errorhandler(500)
    def internal_error(error):
        db.session.rollback()
        if wants_json_response():
            return jsonify({"error": "Internal server error", "status": 500}), 500
        return render_template("errors/500.html"), 500

    @app.errorhandler(403)
    def forbidden(error):
        if wants_json_response():
            return jsonify({"error": "Forbidden", "status": 403}), 403
        return render_template("errors/403.html"), 403


def register_template_filters(app):
    @app.template_filter("datetime_format")
    def datetime_format(value, format="%Y-%m-%d %H:%M"):
        if value is None:
            return ""
        return value.strftime(format)

    @app.template_filter("truncate_chars")
    def truncate_chars(value, length=100):
        if len(value) <= length:
            return value
        return value[:length].rsplit(" ", 1)[0] + "..."


def register_cli_commands(app):
    import click

    @app.cli.command("init-db")
    def init_db():
        db.create_all()
        click.echo("Database initialized.")

    @app.cli.command("create-admin")
    @click.argument("username")
    @click.argument("email")
    @click.argument("password")
    def create_admin(username, email, password):
        from .models import User
        user = User(username=username, email=email, is_admin=True)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        click.echo(f"Admin user {username} created.")


def register_hooks(app):
    from flask import g, request
    import time

    @app.before_request
    def before_request():
        g.start_time = time.perf_counter()

    @app.after_request
    def after_request(response):
        if hasattr(g, "start_time"):
            elapsed = time.perf_counter() - g.start_time
            response.headers["X-Response-Time"] = f"{elapsed:.4f}s"
        return response
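
The isolation a factory provides is easiest to see with a stripped-down version. The following sketch assumes a hypothetical factory with a single route and a made-up GREETING setting; it creates two applications in one process and shows that each keeps its own configuration.

```python
from flask import Flask


def create_app(test_config=None):
    """Minimal factory (hypothetical): no extensions, one route."""
    app = Flask(__name__)
    app.config["GREETING"] = "hello"
    if test_config:
        # Per-instance overrides, as a test suite would supply them.
        app.config.update(test_config)

    @app.route("/greet")
    def greet():
        return {"greeting": app.config["GREETING"]}

    return app


app_a = create_app()
app_b = create_app({"GREETING": "hi", "TESTING": True})

reply_a = app_a.test_client().get("/greet").get_json()
reply_b = app_b.test_client().get("/greet").get_json()
print(reply_a, reply_b)
```

With a module-level app object, the second configuration would have overwritten the first; with a factory, each call starts from a clean instance.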

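17.2.3 The Context Mechanism in Depth

Contexts are central to Flask's architecture, and understanding them is essential for diagnosing errors such as "Working outside of application context".

Flask has two kinds of context:

| Context | Scope | Objects | Lifetime |
|---|---|---|---|
| Application context | Application level | current_app, g | For the duration of a request, or while pushed manually |
| Request context | Request level | request, session | A single HTTP request |

python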
from flask import Flask, current_app, g, request

app = Flask(__name__)
app.config["APP_NAME"] = "MyApp"


@app.route("/context-demo")
def context_demo():
    app_ctx_info = {
        "app_name": current_app.name,
        "config_app_name": current_app.config["APP_NAME"],
        "debug_mode": current_app.debug,
    }

    request_ctx_info = {
        "method": request.method,
        "url": request.url,
        "endpoint": request.endpoint,
        "blueprint": request.blueprint,
        "view_args": request.view_args,
    }

    g.request_id = "req-12345"
    g.user_ip = request.remote_addr

    return {
        "app_context": app_ctx_info,
        "request_context": request_ctx_info,
        "g_data": {"request_id": g.request_id, "user_ip": g.user_ip},
    }

Pushing a context manually (for CLI scripts, tests, and similar situations):

python
from flask import Flask

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///test.db"


with app.app_context():
    from flask import current_app
    print(current_app.name)
    print(current_app.config["SQLALCHEMY_DATABASE_URI"])


with app.test_request_context("/api/test?foo=bar", method="POST"):
    from flask import request
    print(request.method)
    print(request.url)
    print(request.args.get("foo"))
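
The error this mechanism guards against is easy to reproduce. This short sketch touches current_app once without a context and once inside app.app_context(); the variable names are illustrative.

```python
from flask import Flask, current_app

app = Flask(__name__)

try:
    current_app.name  # no context has been pushed yet
except RuntimeError as exc:
    outside_error = str(exc)
else:
    outside_error = None

with app.app_context():
    # While the context is active, the proxy resolves to this application.
    inside_name = current_app.name

print(outside_error is not None, inside_name == app.name)
```

current_app is a proxy, not the application itself, so it only resolves while some context is active; background threads and standalone scripts therefore need an explicit app.app_context() block.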

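How the context stack works in principle (a simplified model of the stack-based design used before Flask 2.2; later versions keep the same push/pop semantics but store contexts in contextvars):

python
class _Stack:
    """Minimal LIFO stack standing in for Flask's former context stacks."""

    def __init__(self):
        self._items = []

    def push(self, obj):
        self._items.append(obj)

    def pop(self):
        return self._items.pop() if self._items else None

    @property
    def top(self):
        return self._items[-1] if self._items else None


_app_ctx_stack = _Stack()
_request_ctx_stack = _Stack()


class AppContext:
    def __init__(self, app):
        self.app = app
        self.g = app.app_ctx_globals_class()

    def push(self):
        _app_ctx_stack.push(self)

    def pop(self, exc=None):
        _app_ctx_stack.pop()

    def __enter__(self):
        self.push()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.pop(exc_value)


class RequestContext:
    def __init__(self, app, environ, request=None, session=None):
        self.app = app
        self.request = request or app.request_class(environ)
        self.session = session
        self._implicit_app_ctx = None

    def push(self):
        # A request context always needs an application context beneath it.
        app_ctx = _app_ctx_stack.top
        if app_ctx is None or app_ctx.app is not self.app:
            self._implicit_app_ctx = AppContext(self.app)
            self._implicit_app_ctx.push()
        _request_ctx_stack.push(self)

    def pop(self, exc=None):
        _request_ctx_stack.pop()
        # Pop the application context only if this request pushed it.
        if self._implicit_app_ctx is not None:
            self._implicit_app_ctx.pop(exc)
            self._implicit_app_ctx = None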
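
Since Flask 2.2 the contexts are tracked with Python's contextvars rather than explicit stack objects. The following is a rough sketch of that idea, not Flask's actual code: the names _current_app and get_current_app are made up for illustration, and plain strings stand in for application objects.

```python
import contextvars

# Hypothetical stand-in for Flask's internal context variable.
_current_app = contextvars.ContextVar("current_app")


class AppContext:
    def __init__(self, app):
        self.app = app
        self._tokens = []

    def __enter__(self):
        # set() returns a token that remembers the previous value.
        self._tokens.append(_current_app.set(self.app))
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # reset() restores whatever was active before this push.
        _current_app.reset(self._tokens.pop())


def get_current_app():
    try:
        return _current_app.get()
    except LookupError:
        raise RuntimeError("Working outside of application context.") from None


with AppContext("outer"):
    with AppContext("inner"):
        nested = get_current_app()
    restored = get_current_app()

print(nested, restored)  # inner outer
```

Context variables are isolated per thread and per asyncio task, which gives each concurrent request its own view of the current application without any explicit locking.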

17.3 The Routing System

17.3.1 Route Registration and Matching

Flask uses Werkzeug's routing system, which matches URLs using its Map and Rule classes:

python
from flask import Flask

app = Flask(__name__)


@app.route("/")
def index():
    return "Home"


@app.route("/hello")
def hello():
    return "Hello"


@app.route("/user/<username>")
def user_profile(username):
    return f"User: {username}"


@app.route("/post/<int:post_id>")
def post_detail(post_id):
    return f"Post: {post_id}"


@app.route("/post/<int:post_id>/comment/<int:comment_id>")
def post_comment(post_id, comment_id):
    return f"Post {post_id}, Comment {comment_id}"


@app.route("/path/<path:filepath>")
def serve_path(filepath):
    return f"Path: {filepath}"


@app.route("/download/<uuid:file_id>")
def download(file_id):
    return f"File ID: {file_id}"

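Route parameter converters:

| Converter | Matches | Example URL | Python type |
|---|---|---|---|
| string | Any text without a slash (the default) | /user/<name> | str |
| int | Non-negative integers | /post/<int:id> | int |
| float | Non-negative floating-point numbers | /price/<float:p> | float |
| path | Any text, slashes included | /file/<path:p> | str |
| uuid | UUID strings | /item/<uuid:id> | uuid.UUID |
| any | One of the listed options | /<any(a,b):page> | str |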
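
The converters can be tried out directly against Werkzeug, without a Flask application. In this sketch the endpoint names mirror the view functions above, and the host name passed to bind is arbitrary.

```python
from werkzeug.exceptions import NotFound
from werkzeug.routing import Map, Rule

url_map = Map([
    Rule("/user/<username>", endpoint="user_profile"),
    Rule("/post/<int:post_id>", endpoint="post_detail"),
    Rule("/path/<path:filepath>", endpoint="serve_path"),
])
urls = url_map.bind("example.com")  # the host name is arbitrary here

# match() returns the endpoint plus the converted arguments.
print(urls.match("/post/42"))  # ('post_detail', {'post_id': 42})
print(urls.match("/path/docs/readme.txt"))

try:
    urls.match("/post/abc")  # "abc" fails the int converter
    matched_bad_path = True
except NotFound:
    matched_bad_path = False

# build() is the reverse operation that url_for relies on.
print(urls.build("post_detail", {"post_id": 7}))  # /post/7
```

A failed conversion is reported as "no rule matched", which is why a request to /post/abc produces a 404 rather than reaching the view with a bad argument.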

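Custom converters:

python
from flask import Flask
from werkzeug.routing import BaseConverter

app = Flask(__name__)


class ListConverter(BaseConverter):
    """Map a comma-separated URL segment to a Python list and back."""

    def to_python(self, value):
        return value.split(",")

    def to_url(self, values):
        return ",".join(str(v) for v in values)


class RegexConverter(BaseConverter):
    """Match a URL segment against a regular expression given in the rule."""

    def __init__(self, url_map, regex_pattern):
        super().__init__(url_map)
        self.regex = regex_pattern


# Converters must be registered before any route that uses them.
app.url_map.converters["list"] = ListConverter
app.url_map.converters["regex"] = RegexConverter


@app.route("/items/<list:items>")
def items_view(items):
    return {"items": items}


# Raw string: otherwise \d and \. would be invalid escape sequences.
@app.route(r"/version/<regex(r'\d+\.\d+\.\d+'):version>")
def version_info(version):
    return {"version": version}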

17.3.2 HTTP Methods and RESTful Route Design

python
from flask import Flask, request, jsonify

app = Flask(__name__)


@app.route("/api/articles", methods=["GET", "POST"])
def articles_collection():
    if request.method == "GET":
        page = request.args.get("page", 1, type=int)
        per_page = request.args.get("per_page", 10, type=int)
        return jsonify({
            "articles": [],
            "page": page,
            "per_page": per_page,
        })
    elif request.method == "POST":
        data = request.get_json()
        if not data or "title" not in data:
            return jsonify({"error": "Title must not be empty"}), 400
        return jsonify({"id": 1, "title": data["title"]}), 201


@app.route("/api/articles/<int:article_id>", methods=["GET", "PUT", "DELETE"])
def article_resource(article_id):
    if request.method == "GET":
        return jsonify({"id": article_id, "title": "Sample article"})
    elif request.method == "PUT":
        data = request.get_json()
        return jsonify({"id": article_id, "title": data.get("title", "")})
    elif request.method == "DELETE":
        return "", 204


@app.route("/api/articles/<int:article_id>/comments", methods=["GET", "POST"])
def article_comments(article_id):
    if request.method == "GET":
        return jsonify({"article_id": article_id, "comments": []})
    elif request.method == "POST":
        data = request.get_json()
        return jsonify({
            "id": 1,
            "article_id": article_id,
            "content": data.get("content", ""),
        }), 201

MethodView provides class-based views, a tidier way to organize a RESTful API:

python
from flask import Flask, request, jsonify
from flask.views import MethodView

app = Flask(__name__)


class ArticleAPI(MethodView):
    def get(self, article_id):
        if article_id is None:
            page = request.args.get("page", 1, type=int)
            return jsonify({"articles": [], "page": page})
        return jsonify({"id": article_id, "title": "Sample article"})

    def post(self):
        data = request.get_json()
        if not data or "title" not in data:
            return jsonify({"error": "Title must not be empty"}), 400
        return jsonify({"id": 1, "title": data["title"]}), 201

    def put(self, article_id):
        data = request.get_json()
        return jsonify({"id": article_id, "title": data.get("title", "")})

    def delete(self, article_id):
        return "", 204


article_view = ArticleAPI.as_view("article_api")
app.add_url_rule(
    "/api/articles",
    defaults={"article_id": None},
    view_func=article_view,
    methods=["GET"],
)
app.add_url_rule("/api/articles", view_func=article_view, methods=["POST"])
app.add_url_rule(
    "/api/articles/<int:article_id>",
    view_func=article_view,
    methods=["GET", "PUT", "DELETE"],
)
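
A class-based view can be exercised with Flask's test client. The sketch below uses a hypothetical in-memory NoteAPI resource rather than the ArticleAPI above, so that the block is self-contained; it shows per-method dispatch and the automatic 405 for methods the class does not define.

```python
from flask import Flask, jsonify, request
from flask.views import MethodView

app = Flask(__name__)


class NoteAPI(MethodView):
    """Hypothetical in-memory resource used only for this demonstration."""

    notes = {}  # shared class-level store; a real app would use a database

    def get(self, note_id):
        if note_id not in self.notes:
            return jsonify({"error": "not found"}), 404
        return jsonify({"id": note_id, "text": self.notes[note_id]})

    def put(self, note_id):
        data = request.get_json(silent=True) or {}
        self.notes[note_id] = data.get("text", "")
        return jsonify({"id": note_id, "text": self.notes[note_id]})

    def delete(self, note_id):
        self.notes.pop(note_id, None)
        return "", 204


# Allowed methods are derived from the methods defined on the class.
app.add_url_rule("/notes/<int:note_id>", view_func=NoteAPI.as_view("note_api"))

client = app.test_client()
first_put = client.put("/notes/1", json={"text": "hello"})
second_put = client.put("/notes/1", json={"text": "hello"})  # same result: idempotent
fetched = client.get("/notes/1")
deleted = client.delete("/notes/1")
missing = client.get("/notes/1")
post_attempt = client.post("/notes/1")  # NoteAPI defines no post() method

print(fetched.get_json(), deleted.status_code, missing.status_code, post_attempt.status_code)
```

Repeating the PUT leaves the resource in the same state, which is the idempotency property listed for PUT in section 17.1.1.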

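17.3.3 Modular Architecture with Blueprints

Blueprints are Flask's core mechanism for modularization. They let an application be split into independent functional modules: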

python
from flask import Blueprint

auth_bp = Blueprint(
    "auth",
    __name__,
    template_folder="templates",
    static_folder="static",
)


@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    return "Login Page"


@auth_bp.route("/register", methods=["GET", "POST"])
def register():
    return "Register Page"


@auth_bp.route("/logout")
def logout():
    return "Logged out"

python
from flask import Blueprint, jsonify

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")


@api_bp.route("/status")
def status():
    return jsonify({"status": "ok", "version": "1.0"})


@api_bp.route("/users", methods=["GET"])
def list_users():
    return jsonify({"users": []})

Nested blueprints build a multi-level module structure:

python
from flask import Blueprint

api_v1_bp = Blueprint("api_v1", __name__, url_prefix="/api/v1")
api_v2_bp = Blueprint("api_v2", __name__, url_prefix="/api/v2")

users_v1_bp = Blueprint("users_v1", __name__)
articles_v1_bp = Blueprint("articles_v1", __name__)


@users_v1_bp.route("/users")
def list_users():
    return {"version": "v1", "users": []}


@articles_v1_bp.route("/articles")
def list_articles():
    return {"version": "v1", "articles": []}


api_v1_bp.register_blueprint(users_v1_bp)
api_v1_bp.register_blueprint(articles_v1_bp)

17.4 Requests and Responses

17.4.1 The Request Object

python
from flask import Flask, request

app = Flask(__name__)


@app.route("/request-info", methods=["GET", "POST", "PUT"])
def request_info():
    info = {
        "method": request.method,
        "url": request.url,
        "base_url": request.base_url,
        "url_root": request.url_root,
        "path": request.path,
        "full_path": request.full_path,
        "scheme": request.scheme,
        "is_secure": request.is_secure,
        "host": request.host,
        "remote_addr": request.remote_addr,
    }
    return info


@app.route("/query-demo")
def query_demo():
    search = request.args.get("q", "")
    page = request.args.get("page", 1, type=int)
    sort = request.args.get("sort", "created_at")
    tags = request.args.getlist("tag")
    return {
        "search": search,
        "page": page,
        "sort": sort,
        "tags": tags,
    }


@app.route("/form-demo", methods=["POST"])
def form_demo():
    username = request.form.get("username")
    password = request.form.get("password")
    hobbies = request.form.getlist("hobby")
    return {
        "username": username,
        "hobbies": hobbies,
    }


@app.route("/json-demo", methods=["POST"])
def json_demo():
    data = request.get_json(silent=True)
    if data is None:
        return {"error": "Invalid JSON"}, 400
    return {"received": data, "content_type": request.content_type}


@app.route("/headers-demo")
def headers_demo():
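    return {
        "user_agent": request.user_agent.string,
        # Werkzeug 2.1 removed built-in user agent parsing: browser and
        # platform are None unless a custom user_agent_class is configured.
        "user_agent_browser": request.user_agent.browser,
        "user_agent_platform": request.user_agent.platform,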
        "accept_languages": [str(l) for l in request.accept_languages],
        "authorization": request.headers.get("Authorization"),
    }


@app.route("/cookie-demo")
def cookie_demo():
    all_cookies = {k: v for k, v in request.cookies.items()}
    session_id = request.cookies.get("session_id")
    return {"all_cookies": all_cookies, "session_id": session_id}

17.4.2 Building Responses

python
from flask import (
    Flask,
    make_response,
    jsonify,
    Response,
    stream_with_context,
    redirect,
    url_for,
    send_file,
)
import json
import time
import io
import csv

app = Flask(__name__)


@app.route("/response-text")
def text_response():
    return "Plain text response"


@app.route("/response-json")
def json_response():
    return jsonify(
        message="Success",
        data={"id": 1, "name": "Example"},
        status="ok",
    )


@app.route("/response-custom")
def custom_response():
    resp = make_response(jsonify({"message": "Created"}), 201)
    resp.headers["X-Custom-Header"] = "CustomValue"
    resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    return resp


@app.route("/response-cookie")
def cookie_response():
    resp = make_response("Cookie set")
    resp.set_cookie(
        "session_token",
        value="abc123def456",
        max_age=3600,
        httponly=True,
        secure=True,
        samesite="Lax",
    )
    return resp


@app.route("/response-redirect")
def redirect_response():
    return redirect(url_for("text_response"))


@app.route("/response-stream")
def stream_response():
    def generate():
        for i in range(10):
            json_data = json.dumps({"count": i, "timestamp": time.time()})
            yield f"data: {json_data}\n\n"
            time.sleep(0.5)

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@app.route("/response-download")
def download_response():
    buffer = io.BytesIO(b"Hello, this is a downloadable file content.")
    return send_file(
        buffer,
        as_attachment=True,
        download_name="example.txt",
        mimetype="text/plain",
    )


@app.route("/response-csv")
def csv_response():
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["ID", "Name", "Email"])
    writer.writerow([1, "Alice", "alice@example.com"])
    writer.writerow([2, "Bob", "bob@example.com"])

    resp = make_response(output.getvalue())
    resp.headers["Content-Type"] = "text/csv; charset=utf-8"
    resp.headers["Content-Disposition"] = "attachment; filename=users.csv"
    return resp

17.4.3 Request Hooks

Request hooks let you run shared logic at different stages of request handling:

python
from flask import Flask, g, request
import time
import uuid

app = Flask(__name__)


@app.before_request
def before_request_hook():
    g.request_id = str(uuid.uuid4())[:8]
    g.start_time = time.perf_counter()
    g.user = None

    auth_header = request.headers.get("Authorization")
    if auth_header and auth_header.startswith("Bearer "):
        token = auth_header[7:]
        g.user = verify_token(token)


@app.after_request
def after_request_hook(response):
    if hasattr(g, "request_id"):
        response.headers["X-Request-ID"] = g.request_id
    if hasattr(g, "start_time"):
        elapsed = time.perf_counter() - g.start_time
        response.headers["X-Response-Time"] = f"{elapsed:.4f}s"
    return response


@app.teardown_request
def teardown_request_hook(exception=None):
    if exception:
        app.logger.error(
            f"Request failed: {exception}, "
            f"Request-ID: {getattr(g, 'request_id', 'N/A')}"
        )


@app.teardown_appcontext
def teardown_appcontext_hook(exception=None):
    from . import db
    if exception:
        db.session.rollback()
    db.session.remove()


def verify_token(token):
    if token == "valid-token":
        return {"id": 1, "username": "admin"}
    return None

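Hook execution order:

Request arrives
      ↓
before_request        (in registration order; returning a value skips the view)
      ↓
View function
      ↓
after_request         (in reverse registration order)
      ↓
teardown_request      (runs whether or not an exception occurred)
      ↓
teardown_appcontext   (cleans up the application context)

Note: before_first_request was deprecated in Flask 2.2 and removed in Flask 2.3. Run one-time setup while creating the application instead, for example inside create_app.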
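
The ordering can be checked empirically by having each hook append to a list. This is a small illustrative sketch; the hook and variable names are made up.

```python
from flask import Flask

app = Flask(__name__)
calls = []


@app.before_request
def first_before():
    calls.append("before:1")


@app.before_request
def second_before():
    calls.append("before:2")


@app.after_request
def first_after(response):
    calls.append("after:1")
    return response


@app.after_request
def second_after(response):
    calls.append("after:2")
    return response


@app.teardown_request
def teardown(exception=None):
    calls.append("teardown")


@app.route("/")
def index():
    calls.append("view")
    return "ok"


app.test_client().get("/")
print(calls)
# ['before:1', 'before:2', 'view', 'after:2', 'after:1', 'teardown']
```

The reverse order of after_request mirrors nested wrappers: the hook registered last sits closest to the view and therefore sees the response first.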

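17.5 The Jinja2 Template Engine

17.5.1 Template Rendering and Inheritance

Jinja2 is Flask's default template engine. It provides template inheritance, macros, filters, and more.

Base template: templates/base.html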

html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}MyApp{% endblock %}</title>
    <link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
    {% block extra_css %}{% endblock %}
</head>
<body>
    <nav class="navbar">
        <div class="container">
            <a class="brand" href="{{ url_for('main.index') }}">MyApp</a>
            <ul class="nav-links">
                <li><a href="{{ url_for('main.index') }}">Home</a></li>
                <li><a href="{{ url_for('main.about') }}">About</a></li>
                {% if current_user.is_authenticated %}
                <li><a href="{{ url_for('auth.profile') }}">{{ current_user.username }}</a></li>
                <li><a href="{{ url_for('auth.logout') }}">Log out</a></li>
                {% else %}
                <li><a href="{{ url_for('auth.login') }}">Log in</a></li>
                <li><a href="{{ url_for('auth.register') }}">Register</a></li>
                {% endif %}
            </ul>
        </div>
    </nav>

    <main class="container">
        {% with messages = get_flashed_messages(with_categories=true) %}
        {% if messages %}
        <div class="flash-messages">
            {% for category, message in messages %}
            <div class="alert alert-{{ category }}">{{ message }}</div>
            {% endfor %}
        </div>
        {% endif %}
        {% endwith %}

        {% block content %}{% endblock %}
    </main>

    <footer class="footer">
        <div class="container">
            <p>&copy; 2026 MyApp. All rights reserved.</p>
        </div>
    </footer>

    <script src="{{ url_for('static', filename='js/main.js') }}"></script>
    {% block extra_js %}{% endblock %}
</body>
</html>

Child template: templates/main/index.html

html
{% extends "base.html" %}

{% block title %}Home - {{ super() }}{% endblock %}

{% block content %}
<section class="hero">
    <h1>Welcome to MyApp</h1>
    <p>A web application built with Flask</p>
</section>

<section class="features">
    <h2>Core features</h2>
    <div class="grid">
        {% for feature in features %}
        <div class="card">
            <h3>{{ feature.title }}</h3>
            <p>{{ feature.description }}</p>
            <a href="{{ feature.url }}">Learn more &rarr;</a>
        </div>
        {% endfor %}
    </div>
</section>
{% endblock %}
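
Template inheritance can be tried outside Flask with Jinja2 alone. The sketch below keeps two cut-down templates in a DictLoader (an in-memory loader, used here only to avoid template files) and shows how super() pulls in the parent block and how autoescaping treats user data.

```python
from jinja2 import DictLoader, Environment

# Cut-down versions of base.html and index.html, kept in memory.
templates = {
    "base.html": (
        "<title>{% block title %}MyApp{% endblock %}</title>"
        "<main>{% block content %}{% endblock %}</main>"
    ),
    "index.html": (
        '{% extends "base.html" %}'
        "{% block title %}Home - {{ super() }}{% endblock %}"
        "{% block content %}<h1>{{ heading }}</h1>{% endblock %}"
    ),
}

env = Environment(loader=DictLoader(templates), autoescape=True)
html = env.get_template("index.html").render(heading="Tom & Jerry")
print(html)
# <title>Home - MyApp</title><main><h1>Tom &amp; Jerry</h1></main>
```

Flask enables autoescaping for .html templates by default, which is the first line of defense against XSS in rendered pages.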

List template: templates/main/articles.html

html
{% extends "base.html" %}

{% block title %}Articles - {{ super() }}{% endblock %}

{% block content %}
<h1>Articles</h1>

{% if articles %}
<table class="table">
    <thead>
        <tr>
            <th>Title</th>
            <th>Author</th>
            <th>Published</th>
            <th>Actions</th>
        </tr>
    </thead>
    <tbody>
        {% for article in articles %}
        <tr>
            <td><a href="{{ url_for('main.article_detail', article_id=article.id) }}">{{ article.title }}</a></td>
            <td>{{ article.author.username }}</td>
            <td>{{ article.created_at | datetime_format }}</td>
            <td>
                <a href="{{ url_for('main.edit_article', article_id=article.id) }}">Edit</a>
                <form method="post" action="{{ url_for('main.delete_article', article_id=article.id) }}" style="display:inline">
                    <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
                    <button type="submit" onclick="return confirm('Delete this article?')">Delete</button>
                </form>
            </td>
        </tr>
        {% endfor %}
    </tbody>
</table>

{% if pagination.pages > 1 %}
<nav class="pagination">
    {% if pagination.has_prev %}
    <a href="{{ url_for('main.articles', page=pagination.prev_num) }}">&laquo; Previous</a>
    {% endif %}

    {% for page_num in pagination.iter_pages() %}
        {% if page_num %}
        <a href="{{ url_for('main.articles', page=page_num) }}"
           class="{{ 'active' if page_num == pagination.page else '' }}">{{ page_num }}</a>
        {% else %}
        <span class="ellipsis">...</span>
        {% endif %}
    {% endfor %}

    {% if pagination.has_next %}
    <a href="{{ url_for('main.articles', page=pagination.next_num) }}">Next &raquo;</a>
    {% endif %}
</nav>
{% endif %}

{% else %}
<p class="empty-state">No articles yet.</p>
{% endif %}
{% endblock %}

17.5.2 Custom Filters and Macros

python
from flask import Flask

app = Flask(__name__)


@app.template_filter("datetime_format")
def datetime_format(value, format="%Y-%m-%d %H:%M"):
    if value is None:
        return ""
    return value.strftime(format)


@app.template_filter("currency")
def currency_format(value, symbol="¥"):
    if value is None:
        return ""
    return f"{symbol}{value:,.2f}"


@app.template_filter("pluralize")
def pluralize(count, singular="", plural="s"):
    if count == 1:
        return singular
    return plural


@app.template_filter("truncate_html")
def truncate_html(value, length=200, end="..."):
    import re
    text = re.sub(r"<[^>]+>", "", value)
    if len(text) <= length:
        return value
    return text[:length].rsplit(" ", 1)[0] + end


@app.template_filter("time_ago")
def time_ago(value):
    from datetime import datetime
    now = datetime.now()
    diff = now - value
    seconds = int(diff.total_seconds())

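    # Interval lengths in seconds, largest first.
    intervals = [
        (31536000, "year"),
        (2592000, "month"),
        (604800, "week"),
        (86400, "day"),
        (3600, "hour"),
        (60, "minute"),
        (1, "second"),
    ]

    for interval, label in intervals:
        count = seconds // interval
        if count > 0:
            suffix = "" if count == 1 else "s"
            return f"{count} {label}{suffix} ago"
    return "just now"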
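
A filter is just a function whose first argument is the value to the left of the pipe. The sketch below registers the currency filter on a bare Jinja2 Environment (Flask's app.template_filter decorator performs the equivalent registration on the application's environment) and renders it with and without an explicit argument.

```python
from jinja2 import Environment


def currency_format(value, symbol="¥"):
    """Same logic as the currency filter above."""
    if value is None:
        return ""
    return f"{symbol}{value:,.2f}"


env = Environment()
env.filters["currency"] = currency_format  # register under the template name

template = env.from_string("{{ price | currency }} / {{ price | currency('$') }}")
rendered = template.render(price=1234.5)
print(rendered)  # ¥1,234.50 / $1,234.50
```

Arguments in parentheses after the filter name are passed as the second and later parameters of the function.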

Template macros are reusable template components:

html
{# templates/macros/forms.html #}

{% macro render_field(field, label_visible=true) -%}
<div class="form-group {% if field.errors %}has-error{% endif %}">
    {% if label_visible %}
    <label for="{{ field.id }}">{{ field.label.text }}</label>
    {% endif %}
    {{ field(class="form-control" + (" is-invalid" if field.errors else ""), **kwargs) }}
    {% if field.errors %}
    <div class="invalid-feedback">
        {% for error in field.errors %}
        <span>{{ error }}</span>
        {% endfor %}
    </div>
    {% elif field.description %}
    <small class="form-text text-muted">{{ field.description }}</small>
    {% endif %}
</div>
{%- endmacro %}


{% macro render_pagination(pagination, endpoint) -%}
{% if pagination.pages > 1 %}
<nav aria-label="Pagination">
    <ul class="pagination">
        <li class="page-item {{ 'disabled' if not pagination.has_prev else '' }}">
            <a class="page-link" href="{{ url_for(endpoint, page=pagination.prev_num) if pagination.has_prev else '#' }}">&laquo;</a>
        </li>
        {% for page_num in pagination.iter_pages(left_edge=1, right_edge=1, left_current=2, right_current=2) %}
            {% if page_num %}
            <li class="page-item {{ 'active' if page_num == pagination.page else '' }}">
                <a class="page-link" href="{{ url_for(endpoint, page=page_num) }}">{{ page_num }}</a>
            </li>
            {% else %}
            <li class="page-item disabled"><span class="page-link">...</span></li>
            {% endif %}
        {% endfor %}
        <li class="page-item {{ 'disabled' if not pagination.has_next else '' }}">
            <a class="page-link" href="{{ url_for(endpoint, page=pagination.next_num) if pagination.has_next else '#' }}">&raquo;</a>
        </li>
    </ul>
</nav>
{% endif %}
{%- endmacro %}
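
To make the `iter_pages(left_edge=1, right_edge=1, left_current=2, right_current=2)` call in the macro easier to follow, here is a simplified pure-Python sketch of the windowing idea. `page_window` is a hypothetical helper written for illustration, not Flask-SQLAlchemy's implementation, and the library's edge-case behaviour may differ between versions.

```python
def page_window(page, pages, left_edge=1, left_current=2,
                right_current=2, right_edge=1):
    """Yield the page numbers to display; None marks a skipped run of pages."""
    last = 0
    for num in range(1, pages + 1):
        near_left = num <= left_edge
        near_current = page - left_current <= num <= page + right_current
        near_right = num > pages - right_edge
        if near_left or near_current or near_right:
            if last + 1 != num:
                # A gap since the last shown page: the macro renders "..." here.
                yield None
            yield num
            last = num


print(list(page_window(page=5, pages=10)))
```

The macro treats a falsy item as the ellipsis entry, which is why the sketch yields `None` for each gap.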

17.6 Form Handling and Validation

17.6.1 Flask-WTF Integration

python
from flask_wtf import FlaskForm
from wtforms import (
    StringField,
    PasswordField,
    TextAreaField,
    SelectField,
    BooleanField,
    FileField,
)
from wtforms.validators import (
    DataRequired,
    Email,
    Length,
    EqualTo,
    Optional,
    Regexp,
    ValidationError,
)


class RegistrationForm(FlaskForm):
    username = StringField(
        "用户名",
        validators=[
            DataRequired(message="用户名不能为空"),
            Length(min=3, max=20, message="用户名长度需在3-20个字符之间"),
            Regexp(
                r"^[a-zA-Z][a-zA-Z0-9_]*$",
                message="用户名只能包含字母、数字和下划线,且以字母开头",
            ),
        ],
    )
    email = StringField(
        "邮箱",
        validators=[
            DataRequired(message="邮箱不能为空"),
            Email(message="请输入有效的邮箱地址"),
        ],
    )
    password = PasswordField(
        "密码",
        validators=[
            DataRequired(message="密码不能为空"),
            Length(min=8, message="密码长度不能少于8个字符"),
        ],
    )
    confirm_password = PasswordField(
        "确认密码",
        validators=[
            DataRequired(message="请确认密码"),
            EqualTo("password", message="两次输入的密码不一致"),
        ],
    )
    agree_terms = BooleanField(
        "同意服务条款",
        validators=[DataRequired(message="必须同意服务条款才能注册")],
    )

    def validate_username(self, field):
        from .models import User
        if User.query.filter_by(username=field.data).first():
            raise ValidationError("该用户名已被注册")

    def validate_email(self, field):
        from .models import User
        if User.query.filter_by(email=field.data).first():
            raise ValidationError("该邮箱已被注册")


class ArticleForm(FlaskForm):
    title = StringField(
        "标题",
        validators=[
            DataRequired(message="标题不能为空"),
            Length(max=200, message="标题不能超过200个字符"),
        ],
    )
    content = TextAreaField(
        "内容",
        validators=[DataRequired(message="内容不能为空")],
    )
    category_id = SelectField(
        "分类",
        coerce=int,
        validators=[DataRequired(message="请选择分类")],
    )
    tags = StringField("标签(用逗号分隔)", validators=[Optional()])
    is_published = BooleanField("发布", default=True)


class ChangePasswordForm(FlaskForm):
    old_password = PasswordField(
        "当前密码",
        validators=[DataRequired(message="请输入当前密码")],
    )
    new_password = PasswordField(
        "新密码",
        validators=[
            DataRequired(message="请输入新密码"),
            Length(min=8, message="密码长度不能少于8个字符"),
        ],
    )
    confirm_password = PasswordField(
        "确认新密码",
        validators=[
            DataRequired(message="请确认新密码"),
            EqualTo("new_password", message="两次输入的密码不一致"),
        ],
    )

    def validate_new_password(self, field):
        if field.data == self.old_password.data:
            raise ValidationError("新密码不能与当前密码相同")
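
The username rules in `RegistrationForm` can be checked outside of any form with the standard library. The sketch below reuses the same pattern and length limits; `username_errors` is a hypothetical helper, and the note about `match()` assumes the validator applies the pattern with `re.match`, under which `$` also matches just before a trailing newline.

```python
import re

# Same pattern as the Regexp validator in RegistrationForm.
USERNAME_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]*$")


def username_errors(name: str) -> list[str]:
    """Return a list of rule violations; an empty list means the name passes."""
    errors = []
    if not 3 <= len(name) <= 20:
        errors.append("length must be 3-20 characters")
    # fullmatch() rejects a trailing newline that `$` with match() lets through.
    if not USERNAME_RE.fullmatch(name):
        errors.append("must start with a letter; letters, digits, underscore only")
    return errors


print(username_errors("alice_01"))
print(username_errors("1abc"))
print(USERNAME_RE.match("abc\n") is not None)
```

If the trailing-newline case matters for your application, ending the pattern with `\Z` instead of `$` is one way to close it.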

17.6.2 Integrating Form Views and Templates

python
from flask import (
    Blueprint,
    render_template,
    redirect,
    url_for,
    flash,
    request,
)
from flask_login import login_user, logout_user, login_required, current_user

auth_bp = Blueprint("auth", __name__)


@auth_bp.route("/register", methods=["GET", "POST"])
def register():
    from .forms import RegistrationForm
    from .models import User, db

    if current_user.is_authenticated:
        return redirect(url_for("main.index"))

    form = RegistrationForm()
    if form.validate_on_submit():
        user = User(
            username=form.username.data,
            email=form.email.data,
        )
        user.set_password(form.password.data)
        db.session.add(user)
        db.session.commit()

        flash("注册成功!请登录。", "success")
        return redirect(url_for("auth.login"))

    return render_template("auth/register.html", form=form)


@auth_bp.route("/login", methods=["GET", "POST"])
def login():
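    from urllib.parse import urlsplit

    from .forms import LoginForm
    from .models import User

    if current_user.is_authenticated:
        return redirect(url_for("main.index"))

    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.username.data).first()
        if user and user.check_password(form.password.data):
            login_user(user, remember=form.remember.data)
            # Only follow same-site relative paths; an unchecked "next"
            # parameter is an open-redirect vector.
            next_page = request.args.get("next", "")
            target = urlsplit(next_page)
            if (
                not next_page.startswith("/")
                or "\\" in next_page
                or target.scheme
                or target.netloc
            ):
                next_page = url_for("main.index")
            flash("登录成功!", "success")
            return redirect(next_page)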
        flash("用户名或密码错误", "danger")

    return render_template("auth/login.html", form=form)


@auth_bp.route("/logout")
@login_required
def logout():
    logout_user()
    flash("已退出登录。", "info")
    return redirect(url_for("main.index"))

Form template:

html
{% extends "base.html" %}
{% from "macros/forms.html" import render_field %}

{% block title %}注册 - {{ super() }}{% endblock %}

{% block content %}
<div class="auth-container">
    <h1>创建账户</h1>

    <form method="post" novalidate>
        {{ form.hidden_tag() }}

        {{ render_field(form.username, placeholder="请输入用户名") }}
        {{ render_field(form.email, placeholder="请输入邮箱", type="email") }}
        {{ render_field(form.password, placeholder="请输入密码") }}
        {{ render_field(form.confirm_password, placeholder="请再次输入密码") }}
        {{ render_field(form.agree_terms) }}

        <button type="submit" class="btn btn-primary">注册</button>
    </form>

    <p>已有账户?<a href="{{ url_for('auth.login') }}">立即登录</a></p>
</div>
{% endblock %}

17.6.3 File Upload Handling

python
import os
import uuid
from pathlib import Path
from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename

upload_bp = Blueprint("upload", __name__)

ALLOWED_IMAGE_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
ALLOWED_DOCUMENT_EXTENSIONS = {"pdf", "doc", "docx", "xls", "xlsx", "txt"}
MAX_IMAGE_SIZE = 5 * 1024 * 1024
MAX_DOCUMENT_SIZE = 20 * 1024 * 1024


def allowed_file(filename, allowed_extensions):
    return (
        "." in filename
        and filename.rsplit(".", 1)[1].lower() in allowed_extensions
    )


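def generate_safe_filename(filename):
    # secure_filename() strips non-ASCII characters, so a name such as
    # "图片.png" collapses to "png" and loses its extension. Fall back to a
    # generic stem and re-attach the extension in that case.
    name = secure_filename(filename)
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if "." not in name:
        name = f"upload.{ext}" if ext.isalnum() else "upload"
    return f"{uuid.uuid4().hex[:8]}_{name}"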


def get_upload_path(category="images"):
    upload_root = Path(current_app.config.get("UPLOAD_FOLDER", "uploads"))
    target_dir = upload_root / category
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir


@upload_bp.route("/upload/image", methods=["POST"])
def upload_image():
    if "file" not in request.files:
        return jsonify({"error": "未找到文件"}), 400

    file = request.files["file"]
    if file.filename == "":
        return jsonify({"error": "未选择文件"}), 400

    if not allowed_file(file.filename, ALLOWED_IMAGE_EXTENSIONS):
        return jsonify({"error": "不支持的文件类型"}), 400

    file.seek(0, 2)
    file_size = file.tell()
    file.seek(0)

    if file_size > MAX_IMAGE_SIZE:
        return jsonify({"error": "文件大小超过5MB限制"}), 400

    filename = generate_safe_filename(file.filename)
    save_path = get_upload_path("images") / filename
    file.save(str(save_path))

    return jsonify({
        "message": "上传成功",
        "filename": filename,
        "url": f"/uploads/images/{filename}",
        "size": file_size,
    }), 201
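
The extension check in `allowed_file` only looks at the name the client sent. A common complement is to inspect the first bytes of the upload. The sketch below is one minimal way to do that with the standard library; `sniff_image_type` and `_SIGNATURES` are hypothetical names, and the signature table covers only the formats listed in `ALLOWED_IMAGE_EXTENSIONS`.

```python
from typing import Optional

# Leading bytes ("magic numbers") of the allowed image formats.
_SIGNATURES = {
    "png": b"\x89PNG\r\n\x1a\n",
    "jpg": b"\xff\xd8\xff",
    "gif": b"GIF8",
}


def sniff_image_type(head: bytes) -> Optional[str]:
    """Guess the image type from the first bytes of a file, or return None."""
    for kind, signature in _SIGNATURES.items():
        if head.startswith(signature):
            return kind
    # WebP is a RIFF container: "RIFF", four size bytes, then "WEBP".
    if head[:4] == b"RIFF" and head[8:12] == b"WEBP":
        return "webp"
    return None


print(sniff_image_type(b"GIF89a..."))
print(sniff_image_type(b"<?php echo 1; ?>"))
```

In the upload view, this could be fed with the first 12 bytes read from the uploaded stream, followed by a `seek(0)` before saving. A signature match is a sanity check rather than proof that the file is a well-formed image.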

17.7 Database Integration

17.7.1 Defining SQLAlchemy ORM Models

python
from datetime import datetime, timezone
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash

db = SQLAlchemy()


class User(UserMixin, db.Model):
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256), nullable=False)
    nickname = db.Column(db.String(50))
    bio = db.Column(db.Text)
    avatar_url = db.Column(db.String(256))
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    is_admin = db.Column(db.Boolean, default=False, nullable=False)
    created_at = db.Column(
        db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False
    )
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    posts = db.relationship("Post", backref="author", lazy="dynamic")
    comments = db.relationship("Comment", backref="author", lazy="dynamic")

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f"<User {self.username}>"


post_tags = db.Table(
    "post_tags",
    db.Column("post_id", db.Integer, db.ForeignKey("posts.id"), primary_key=True),
    db.Column("tag_id", db.Integer, db.ForeignKey("tags.id"), primary_key=True),
)


class Post(db.Model):
    __tablename__ = "posts"

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    slug = db.Column(db.String(200), unique=True, nullable=False, index=True)
    content = db.Column(db.Text, nullable=False)
    summary = db.Column(db.String(500))
    is_published = db.Column(db.Boolean, default=False, index=True)
    view_count = db.Column(db.Integer, default=0)
    author_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey("categories.id"))
    created_at = db.Column(
        db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False, index=True
    )
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    comments = db.relationship("Comment", backref="post", lazy="dynamic")
    tags = db.relationship(
        "Tag", secondary=post_tags, backref=db.backref("posts", lazy="dynamic")
    )

    def __repr__(self):
        return f"<Post {self.title}>"


class Category(db.Model):
    __tablename__ = "categories"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False)
    description = db.Column(db.String(200))

    posts = db.relationship("Post", backref="category", lazy="dynamic")

    def __repr__(self):
        return f"<Category {self.name}>"


class Tag(db.Model):
    __tablename__ = "tags"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False)

    def __repr__(self):
        return f"<Tag {self.name}>"


class Comment(db.Model):
    __tablename__ = "comments"

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    is_approved = db.Column(db.Boolean, default=False)
    post_id = db.Column(db.Integer, db.ForeignKey("posts.id"), nullable=False, index=True)
    author_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
    parent_id = db.Column(db.Integer, db.ForeignKey("comments.id"))
    created_at = db.Column(
        db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False
    )

    replies = db.relationship(
        "Comment", backref=db.backref("parent", remote_side=[id]), lazy="dynamic"
    )

    def __repr__(self):
        return f"<Comment {self.id}>"

17.7.2 Managing Database Migrations

python
from flask_migrate import Migrate

migrate = Migrate()

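Migration commands:

bash
flask db init                        # create the migrations repository
flask db migrate -m "description"    # generate a migration script
flask db upgrade                     # apply pending migrations
flask db downgrade                   # roll back the last migration
flask db history                     # show the migration history
flask db current                     # show the current revision
flask db stamp head                  # mark the database as current without running migrations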

Example of a hand-edited migration script:

python
import sqlalchemy as sa
from alembic import op


def upgrade():
    op.add_column("users", sa.Column("last_login_at", sa.DateTime(), nullable=True))
    op.create_index("ix_users_last_login", "users", ["last_login_at"])
    op.execute(
        "UPDATE users SET last_login_at = created_at WHERE last_login_at IS NULL"
    )


def downgrade():
    op.drop_index("ix_users_last_login", table_name="users")
    op.drop_column("users", "last_login_at")

17.7.3 Query Optimization

python
from sqlalchemy import func, or_, desc
from sqlalchemy.orm import joinedload, subqueryload, contains_eager


class PostQuery:
    @staticmethod
    def get_published(page=1, per_page=10):
        return (
            Post.query.filter_by(is_published=True)
            .options(joinedload(Post.author), joinedload(Post.category))
            .order_by(Post.created_at.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )

    @staticmethod
    def get_by_slug(slug):
        return (
            Post.query.filter_by(slug=slug, is_published=True)
            .options(joinedload(Post.author), subqueryload(Post.tags))
            .first_or_404()
        )

    @staticmethod
    def search(keyword, page=1, per_page=10):
        search_filter = or_(
            Post.title.ilike(f"%{keyword}%"),
            Post.content.ilike(f"%{keyword}%"),
            Post.summary.ilike(f"%{keyword}%"),
        )
        return (
            Post.query.filter(search_filter, Post.is_published == True)
            .options(joinedload(Post.author))
            .order_by(Post.created_at.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )

    @staticmethod
    def get_by_category(category_slug, page=1, per_page=10):
        return (
            Post.query.join(Post.category)
            .filter(Category.slug == category_slug, Post.is_published == True)
            .options(contains_eager(Post.category), joinedload(Post.author))
            .order_by(Post.created_at.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )

    @staticmethod
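    def get_stats():
        # date_trunc() is PostgreSQL-specific; other backends need their own
        # month-bucketing expression (for example strftime on SQLite).
        return db.session.query(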
            func.date_trunc("month", Post.created_at).label("month"),
            func.count(Post.id).label("count"),
        ).filter(
            Post.is_published == True
        ).group_by(
            func.date_trunc("month", Post.created_at)
        ).order_by(
            desc("month")
        ).all()

    @staticmethod
    def get_popular(limit=10):
        return (
            Post.query.filter_by(is_published=True)
            .order_by(Post.view_count.desc())
            .limit(limit)
            .all()
        )

The N+1 query problem and ways to avoid it:

python
# N+1 problem (inefficient)
posts = Post.query.all()
for post in posts:
    print(post.author.username)  # each iteration issues one more query

# Option 1: joinedload (a single JOIN query)
posts = Post.query.options(joinedload(Post.author)).all()

# Option 2: subqueryload (a second query built from a subquery)
posts = Post.query.options(subqueryload(Post.tags)).all()

# Option 3: contains_eager (reuse a JOIN you wrote yourself)
posts = (
    Post.query.join(Post.author)
    .filter(User.is_active == True)
    .options(contains_eager(Post.author))
    .all()
)
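
The difference in query count can be observed without an ORM. The following sketch uses the standard library's `sqlite3` module with an in-memory database and a trace callback that records each statement; the table layout is a cut-down assumption modelled on the `users` and `posts` tables above, and the function names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL);
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        title TEXT NOT NULL,
        author_id INTEGER NOT NULL REFERENCES users(id)
    );
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO posts VALUES (1, 'A', 1), (2, 'B', 2), (3, 'C', 1);
    """
)

# Record every SQL statement SQLite runs from here on.
statements = []
conn.set_trace_callback(statements.append)


def authors_n_plus_one():
    """One query for the posts, then one more per post for its author."""
    names = []
    posts = conn.execute("SELECT id, author_id FROM posts ORDER BY id").fetchall()
    for _post_id, author_id in posts:
        row = conn.execute(
            "SELECT username FROM users WHERE id = ?", (author_id,)
        ).fetchone()
        names.append(row[0])
    return names


def authors_joined():
    """A single JOIN returns each post's author in one round trip."""
    rows = conn.execute(
        "SELECT u.username FROM posts AS p "
        "JOIN users AS u ON u.id = p.author_id ORDER BY p.id"
    ).fetchall()
    return [row[0] for row in rows]
```

Clearing `statements` before each call and comparing its length afterwards shows the per-post queries growing with the number of posts, while the joined variant stays at one statement.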

17.8 User Authentication and Authorization

17.8.1 Flask-Login Integration

python
from flask_login import LoginManager
from .models import User, db

login_manager = LoginManager()
login_manager.login_view = "auth.login"
login_manager.login_message = "请登录后访问此页面"
login_manager.login_message_category = "info"
login_manager.session_protection = "strong"


@login_manager.user_loader
def load_user(user_id):
    return db.session.get(User, int(user_id))


@login_manager.unauthorized_handler
def unauthorized():
    from flask import jsonify, redirect, request, url_for

    if (
        request.accept_mimetypes.best_match(["application/json", "text/html"])
        == "application/json"
    ):
        return jsonify({"error": "未授权访问", "status": 401}), 401
    return redirect(url_for("auth.login"))

17.8.2 JWT Authentication

python
import jwt
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import request, jsonify, current_app


class TokenService:
    @staticmethod
    def generate_access_token(user_id, expires_in=3600):
        payload = {
            # RFC 7519 defines "sub" as a string, and PyJWT 2.10+ rejects
            # a non-string value when decoding.
            "sub": str(user_id),
            "iat": datetime.now(timezone.utc),
            "exp": datetime.now(timezone.utc) + timedelta(seconds=expires_in),
            "type": "access",
        }
        return jwt.encode(
            payload,
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )

    @staticmethod
    def generate_refresh_token(user_id, expires_in=2592000):
        payload = {
            "sub": str(user_id),
            "iat": datetime.now(timezone.utc),
            "exp": datetime.now(timezone.utc) + timedelta(seconds=expires_in),
            "type": "refresh",
        }
        return jwt.encode(
            payload,
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )

    @staticmethod
    def decode_token(token):
        try:
            return jwt.decode(
                token,
                current_app.config["SECRET_KEY"],
                algorithms=["HS256"],
            )
        except jwt.InvalidTokenError:
            # Also covers ExpiredSignatureError, which is a subclass.
            return None


def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return jsonify({"error": "缺少有效的认证令牌"}), 401

        token = auth_header[7:]
        payload = TokenService.decode_token(token)

        if payload is None:
            return jsonify({"error": "令牌无效或已过期"}), 401

        if payload.get("type") != "access":
            return jsonify({"error": "令牌类型错误"}), 401

        from .models import User, db
        user = db.session.get(User, int(payload["sub"]))
        if user is None or not user.is_active:
            return jsonify({"error": "用户不存在或已禁用"}), 401

        request.current_user = user
        return f(*args, **kwargs)

    return decorated


def admin_required(f):
    @wraps(f)
    @token_required
    def decorated(*args, **kwargs):
        if not request.current_user.is_admin:
            return jsonify({"error": "需要管理员权限"}), 403
        return f(*args, **kwargs)

    return decorated
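
To show what an HS256 token consists of, the sketch below signs and verifies one using only the standard library. It is an illustration of the token format under simplifying assumptions (no header validation, only the `exp` claim checked), not a replacement for PyJWT; `sign_hs256` and `verify_hs256` are hypothetical names.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature, each part base64url-encoded."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str, now=None):
    """Return the payload if the signature and `exp` check out, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _signature(f"{header_b64}.{payload_b64}", secret)
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        return None
    if not isinstance(payload, dict):
        return None
    exp = payload.get("exp")
    current = time.time() if now is None else now
    if isinstance(exp, (int, float)) and exp < current:
        return None
    return payload
```

The payload is only encoded, not encrypted, so anyone holding the token can read its claims; the signature protects integrity, not confidentiality.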

17.8.3 Permission Control

python
from functools import wraps
from flask import abort
from flask_login import current_user


class Permission:
    READ = 0x01
    CREATE = 0x02
    UPDATE = 0x04
    DELETE = 0x08
    ADMIN = 0x80


class Role:
    GUEST = 0x01
    AUTHOR = 0x01 | 0x02 | 0x04
    EDITOR = 0x01 | 0x02 | 0x04 | 0x08
    ADMIN = 0x01 | 0x02 | 0x04 | 0x08 | 0x80


def permission_required(permission):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            if not current_user.is_authenticated:
                abort(401)
            if not has_permission(current_user, permission):
                abort(403)
            return f(*args, **kwargs)
        return decorated
    return decorator


def has_permission(user, permission):
    if user.is_admin:
        return True
    role_permissions = getattr(user, "role_permissions", 0)
    return (role_permissions & permission) == permission


def ownership_required(model_class, id_param="id", owner_attr="author_id"):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            resource_id = kwargs.get(id_param)
            resource = model_class.query.get_or_404(resource_id)

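            # Anonymous users have neither `id` nor `is_admin`.
            if not current_user.is_authenticated:
                abort(401)

            owner_id = getattr(resource, owner_attr)
            if owner_id != current_user.id and not current_user.is_admin:
                abort(403)

            # Admins and owners both receive the loaded resource.
            return f(*args, **kwargs, resource=resource)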
        return decorated
    return decorator
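
The bitmask check in `has_permission` can be tried in isolation. This sketch swaps the plain integer constants for the standard library's `enum.IntFlag`, which keeps the same bit values while making combined roles readable; `can`, `AUTHOR`, and `EDITOR` are hypothetical names mirroring the `Role` values above.

```python
from enum import IntFlag


class Permission(IntFlag):
    READ = 0x01
    CREATE = 0x02
    UPDATE = 0x04
    DELETE = 0x08
    ADMIN = 0x80


AUTHOR = Permission.READ | Permission.CREATE | Permission.UPDATE
EDITOR = AUTHOR | Permission.DELETE


def can(role_permissions: int, required: int) -> bool:
    """True only if every bit in `required` is set in `role_permissions`."""
    return (role_permissions & required) == required


print(can(AUTHOR, Permission.UPDATE))
print(can(AUTHOR, Permission.READ | Permission.DELETE))
```

Comparing the masked value against `required`, rather than testing it for truthiness, is what makes a combined requirement such as read plus delete fail when only one of the bits is present.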

17.9 Security

17.9.1 CSRF Protection

python
from flask_wtf.csrf import CSRFProtect

csrf = CSRFProtect()

Passing the CSRF token in AJAX requests:

javascript
// Read the CSRF token from the page and send it with the AJAX request
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;

fetch("/api/endpoint", {
    method: "POST",
    headers: {
        "Content-Type": "application/json",
        "X-CSRFToken": csrfToken,
    },
    body: JSON.stringify(data),
});

html
<!-- Add the CSRF meta tag to the base template -->
<meta name="csrf-token" content="{{ csrf_token() }}">

17.9.2 Security Header Configuration

python
from flask import Flask
from flask_talisman import Talisman

app = Flask(__name__)

talisman = Talisman(
    app,
    force_https=True,
    strict_transport_security=True,
    strict_transport_security_max_age=31536000,
    strict_transport_security_include_subdomains=True,
    content_security_policy={
        "default-src": "'self'",
        "script-src": ["'self'", "https://cdn.jsdelivr.net"],
        "style-src": ["'self'", "'unsafe-inline'", "https://cdn.jsdelivr.net"],
        "img-src": ["'self'", "data:", "https:"],
        "font-src": ["'self'", "https://cdn.jsdelivr.net"],
        "connect-src": ["'self'", "https://api.example.com"],
        "frame-ancestors": "'none'",
    },
    referrer_policy="strict-origin-when-cross-origin",
)
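
The `content_security_policy` dictionary ends up as a single `Content-Security-Policy` header value. The sketch below shows roughly how such a mapping serializes; `build_csp` is a hypothetical helper for illustration, and Talisman's own output may differ in detail.

```python
def build_csp(policy: dict) -> str:
    """Serialize a {directive: source or [sources]} mapping into a CSP value."""
    parts = []
    for directive, sources in policy.items():
        if isinstance(sources, str):
            sources = [sources]
        parts.append(f"{directive} {' '.join(sources)}")
    return "; ".join(parts)


print(build_csp({
    "default-src": "'self'",
    "script-src": ["'self'", "https://cdn.jsdelivr.net"],
    "frame-ancestors": "'none'",
}))
```

Keywords such as `'self'` and `'none'` keep their single quotes in the header, which is why they appear quoted inside the Python strings.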

17.9.3 Input Validation and SQL Injection Protection

python
from markupsafe import escape, Markup
import bleach


def sanitize_html(content, allowed_tags=None, allowed_attributes=None):
    default_tags = [
        "p", "br", "strong", "em", "u", "h1", "h2", "h3",
        "h4", "h5", "h6", "ul", "ol", "li", "a", "blockquote",
        "code", "pre", "img",
    ]
    default_attributes = {
        "a": ["href", "title"],
        "img": ["src", "alt", "width", "height"],
        "code": ["class"],
    }

    tags = allowed_tags or default_tags
    attrs = allowed_attributes or default_attributes

    cleaned = bleach.clean(
        content,
        tags=tags,
        attributes=attrs,
        strip=True,
    )
    return Markup(cleaned)

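The core rule for preventing SQL injection is to always use parameterized queries:

python
from sqlalchemy import text

# Dangerous: string interpolation creates a SQL injection hole
query = f"SELECT * FROM users WHERE username = '{username}'"

# Safe: ORM query
user = User.query.filter_by(username=username).first()

# Safe: parameterized raw SQL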
result = db.session.execute(
    text("SELECT * FROM users WHERE username = :username"),
    {"username": username},
)
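
The effect of interpolation versus parameter binding can be reproduced with the standard library's `sqlite3` module. This is a minimal sketch against an in-memory database; the table and the function names `find_unsafe` and `find_safe` are assumptions made for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)]
)

# Input crafted to turn the WHERE clause into a condition that is always true.
malicious = "nobody' OR '1'='1"


def find_unsafe(username):
    # The input becomes part of the SQL text, so its quotes change the query.
    sql = f"SELECT username FROM users WHERE username = '{username}'"
    return conn.execute(sql).fetchall()


def find_safe(username):
    # The input is bound as a value and is never parsed as SQL.
    sql = "SELECT username FROM users WHERE username = ?"
    return conn.execute(sql, (username,)).fetchall()


print(len(find_unsafe(malicious)))
print(len(find_safe(malicious)))
```

With the interpolated query the crafted input matches every row, while the bound version looks for a user literally named `nobody' OR '1'='1` and finds none.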

17.9.4 Password Security

python
from werkzeug.security import generate_password_hash, check_password_hash
import secrets


class PasswordPolicy:
    MIN_LENGTH = 8
    MAX_LENGTH = 128
    REQUIRE_UPPERCASE = True
    REQUIRE_LOWERCASE = True
    REQUIRE_DIGIT = True
    REQUIRE_SPECIAL = True

    @classmethod
    def validate(cls, password):
        errors = []
        if len(password) < cls.MIN_LENGTH:
            errors.append(f"密码长度不能少于{cls.MIN_LENGTH}个字符")
        if len(password) > cls.MAX_LENGTH:
            errors.append(f"密码长度不能超过{cls.MAX_LENGTH}个字符")
        if cls.REQUIRE_UPPERCASE and not any(c.isupper() for c in password):
            errors.append("密码必须包含大写字母")
        if cls.REQUIRE_LOWERCASE and not any(c.islower() for c in password):
            errors.append("密码必须包含小写字母")
        if cls.REQUIRE_DIGIT and not any(c.isdigit() for c in password):
            errors.append("密码必须包含数字")
        if cls.REQUIRE_SPECIAL and not any(
            c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password
        ):
            errors.append("密码必须包含特殊字符")
        return errors

    @staticmethod
    def hash_password(password):
        return generate_password_hash(password, method="pbkdf2:sha256", salt_length=16)

    @staticmethod
    def verify_password(password, password_hash):
        return check_password_hash(password_hash, password)

    @staticmethod
    def generate_token(nbytes=32):
        return secrets.token_urlsafe(nbytes)
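
What `method="pbkdf2:sha256"` does conceptually can be sketched with `hashlib.pbkdf2_hmac`: a random salt, many iterations of HMAC-SHA256, and a constant-time comparison on verification. The storage format and the iteration count below are assumptions for this sketch and are not Werkzeug's format; in an application, keep using `generate_password_hash` and `check_password_hash`.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 600_000  # assumed work factor for this sketch; tune for your hardware


def hash_password(password: str, iterations: int = ITERATIONS) -> str:
    """Return "iterations$salt$hex_digest" with a fresh random salt."""
    salt = secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt.encode("ascii"), iterations
    )
    return f"{iterations}${salt}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    iterations, salt, expected = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt.encode("ascii"), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), expected)
```

Because the salt is random, hashing the same password twice gives different strings, so equal passwords cannot be spotted by comparing stored hashes.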

17.10 RESTful API Development

17.10.1 API Blueprint and Serialization

python
from flask import Blueprint, request, jsonify
from marshmallow import Schema, fields, validate, post_load, EXCLUDE

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")


class UserSchema(Schema):
    id = fields.Integer(dump_only=True)
    username = fields.String(required=True, validate=validate.Length(min=3, max=20))
    email = fields.Email(required=True)
    nickname = fields.String(load_default="")
    bio = fields.String(load_default="")
    created_at = fields.DateTime(dump_only=True)

    class Meta:
        unknown = EXCLUDE


class PostSchema(Schema):
    id = fields.Integer(dump_only=True)
    title = fields.String(required=True, validate=validate.Length(min=1, max=200))
    content = fields.String(required=True)
    summary = fields.String(load_default="")
    is_published = fields.Boolean(load_default=False)
    slug = fields.String(dump_only=True)
    author = fields.Nested(UserSchema, dump_only=True)
    tags = fields.List(fields.String(), load_default=list)
    created_at = fields.DateTime(dump_only=True)
    updated_at = fields.DateTime(dump_only=True)


class PaginatedSchema(Schema):
    items = fields.List(fields.Dict())
    page = fields.Integer()
    per_page = fields.Integer()
    total = fields.Integer()
    pages = fields.Integer()
    has_next = fields.Boolean()
    has_prev = fields.Boolean()

17.10.2 Implementing API Views

python
from flask import Blueprint, request, jsonify
from .models import User, Post, db
from .schemas import UserSchema, PostSchema
from .auth import token_required, admin_required

api_bp = Blueprint("api", __name__, url_prefix="/api/v1")

user_schema = UserSchema()
users_schema = UserSchema(many=True)
post_schema = PostSchema()
posts_schema = PostSchema(many=True)


@api_bp.route("/posts", methods=["GET"])
def get_posts():
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", 10, type=int)
    per_page = min(per_page, 100)

    pagination = (
        Post.query.filter_by(is_published=True)
        .order_by(Post.created_at.desc())
        .paginate(page=page, per_page=per_page, error_out=False)
    )

    return jsonify({
        "items": posts_schema.dump(pagination.items),
        "page": pagination.page,
        "per_page": pagination.per_page,
        "total": pagination.total,
        "pages": pagination.pages,
    })


@api_bp.route("/posts/<int:post_id>", methods=["GET"])
def get_post(post_id):
    post = Post.query.get_or_404(post_id)
    return jsonify(post_schema.dump(post))


@api_bp.route("/posts", methods=["POST"])
@token_required
def create_post():
    import uuid

    json_data = request.get_json()
    if not json_data:
        return jsonify({"error": "未提供输入数据"}), 400

    errors = post_schema.validate(json_data)
    if errors:
        return jsonify({"errors": errors}), 422

    data = post_schema.load(json_data)
    post = Post(
        title=data["title"],
        # Post.slug is NOT NULL and unique. A random hex value is used here
        # as a placeholder assumption; derive it from the title in a real app.
        slug=uuid.uuid4().hex,
        content=data["content"],
        summary=data.get("summary", ""),
        is_published=data.get("is_published", False),
        author_id=request.current_user.id,
    )
    db.session.add(post)
    db.session.commit()

    return jsonify(post_schema.dump(post)), 201


@api_bp.route("/posts/<int:post_id>", methods=["PUT"])
@token_required
def update_post(post_id):
    post = Post.query.get_or_404(post_id)

    if post.author_id != request.current_user.id and not request.current_user.is_admin:
        return jsonify({"error": "无权限修改此文章"}), 403

    json_data = request.get_json()
    if not json_data:
        return jsonify({"error": "未提供输入数据"}), 400

    errors = post_schema.validate(json_data, partial=True)
    if errors:
        return jsonify({"errors": errors}), 422

    data = post_schema.load(json_data, partial=True)
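    # "tags" arrives as a list of strings; assigning it to the Post.tags
    # relationship would fail, so only scalar columns are copied here.
    for key in ("title", "content", "summary", "is_published"):
        if key in data:
            setattr(post, key, data[key])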

    db.session.commit()
    return jsonify(post_schema.dump(post))


@api_bp.route("/posts/<int:post_id>", methods=["DELETE"])
@token_required
def delete_post(post_id):
    post = Post.query.get_or_404(post_id)

    if post.author_id != request.current_user.id and not request.current_user.is_admin:
        return jsonify({"error": "无权限删除此文章"}), 403

    db.session.delete(post)
    db.session.commit()
    return "", 204
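
The `Post` model requires a unique, non-null `slug`, so a view that creates posts needs some way to produce one. The sketch below is one possible title-based approach using only the standard library; `slugify` is a hypothetical helper, and it does not by itself guarantee uniqueness, which would still need a database check or a suffix.

```python
import re
import secrets
import unicodedata


def slugify(title: str, max_length: int = 200) -> str:
    """Lowercase ASCII slug; falls back to a random token if nothing survives."""
    # NFKD splits accented letters into base letter + mark; the marks are
    # then dropped by the ASCII encode step.
    ascii_text = (
        unicodedata.normalize("NFKD", title)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_text.lower()).strip("-")
    return slug[:max_length].rstrip("-") or secrets.token_hex(6)


print(slugify("Hello, Flask World!"))
print(slugify("Café au lait"))
```

Titles written entirely in non-Latin scripts reduce to an empty string under this scheme, which is why the function falls back to a random token instead of returning an empty slug.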

17.10.3 API Error Handling and Pagination

python
from flask import jsonify
from werkzeug.exceptions import HTTPException


class APIError(Exception):
    def __init__(self, message, status_code=400, payload=None):
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        rv = dict(self.payload or {})
        rv["error"] = self.message
        rv["status"] = self.status_code
        return rv


@api_bp.errorhandler(APIError)
def handle_api_error(error):
    return jsonify(error.to_dict()), error.status_code


@api_bp.errorhandler(HTTPException)
def handle_http_error(error):
    return jsonify({"error": error.description, "status": error.code}), error.code

17.11 Testing

17.11.1 Test Configuration and Fixtures

python
import pytest
from app import create_app, db as _db
from app.models import User


@pytest.fixture(scope="session")
def app():
    app = create_app("testing")
    yield app


@pytest.fixture(scope="function")
def db(app):
    with app.app_context():
        _db.create_all()
        yield _db
        _db.session.remove()
        _db.drop_all()


@pytest.fixture(scope="function")
def client(app, db):
    return app.test_client()


@pytest.fixture(scope="function")
def sample_user(db):
    user = User(username="testuser", email="test@example.com")
    user.set_password("TestPass123!")
    db.session.add(user)
    db.session.commit()
    return user


@pytest.fixture(scope="function")
def auth_headers(app, sample_user):
    from app.auth import TokenService
    with app.app_context():
        token = TokenService.generate_access_token(sample_user.id)
    return {"Authorization": f"Bearer {token}"}

17.11.2 View Tests

python
class TestAuthViews:
    def test_register_page(self, client):
        resp = client.get("/auth/register")
        assert resp.status_code == 200
        assert b"register" in resp.data.lower()

    def test_register_success(self, client, db):
        resp = client.post("/auth/register", data={
            "username": "newuser",
            "email": "new@example.com",
            "password": "NewPass123!",
            "confirm_password": "NewPass123!",
            "agree_terms": True,
        }, follow_redirects=True)
        assert resp.status_code == 200
        user = User.query.filter_by(username="newuser").first()
        assert user is not None

    def test_register_duplicate_username(self, client, sample_user):
        resp = client.post("/auth/register", data={
            "username": sample_user.username,
            "email": "other@example.com",
            "password": "NewPass123!",
            "confirm_password": "NewPass123!",
            "agree_terms": True,
        })
        assert resp.status_code == 200
        # Bytes literals may only contain ASCII, so compare decoded text.
        body = resp.get_data(as_text=True)
        assert "already" in body or "已被" in body

    def test_login_success(self, client, sample_user):
        resp = client.post("/auth/login", data={
            "username": "testuser",
            "password": "TestPass123!",
        }, follow_redirects=True)
        assert resp.status_code == 200

    def test_login_invalid_password(self, client, sample_user):
        resp = client.post("/auth/login", data={
            "username": "testuser",
            "password": "wrongpassword",
        })
        assert resp.status_code == 200
        body = resp.get_data(as_text=True)
        assert "error" in body.lower() or "错误" in body

    def test_logout(self, client, sample_user):
        client.post("/auth/login", data={
            "username": "testuser",
            "password": "TestPass123!",
        })
        resp = client.get("/auth/logout", follow_redirects=True)
        assert resp.status_code == 200

17.11.3 API Tests

python
import json


class TestPostAPI:
    def test_get_posts(self, client):
        resp = client.get("/api/v1/posts")
        assert resp.status_code == 200
        data = resp.get_json()
        assert "items" in data
        assert "page" in data

    def test_create_post_unauthorized(self, client):
        resp = client.post("/api/v1/posts", json={"title": "Test"})
        assert resp.status_code == 401

    def test_create_post_success(self, client, auth_headers):
        resp = client.post(
            "/api/v1/posts",
            json={"title": "Test Post", "content": "Content here"},
            headers=auth_headers,
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert data["title"] == "Test Post"

    def test_create_post_validation_error(self, client, auth_headers):
        resp = client.post(
            "/api/v1/posts",
            json={"title": ""},
            headers=auth_headers,
        )
        assert resp.status_code == 422

    def test_update_post(self, client, auth_headers, db):
        from app.models import Post
        post = Post(title="Old", content="Old content", author_id=1)

        db.session.add(post)
        db.session.commit()

        resp = client.put(
            f"/api/v1/posts/{post.id}",
            json={"title": "New Title"},
            headers=auth_headers,
        )
        assert resp.status_code == 200
        assert resp.get_json()["title"] == "New Title"

    def test_delete_post(self, client, auth_headers, db):
        from app.models import Post
        post = Post(title="Delete Me", content="Content", author_id=1)
        db.session.add(post)
        db.session.commit()

        resp = client.delete(
            f"/api/v1/posts/{post.id}",
            headers=auth_headers,
        )
        assert resp.status_code == 204

17.12 Production Deployment

17.12.1 Deploying with Gunicorn

bash
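pip install gunicorn
# 4 sync worker processes; the app is built by the factory function
gunicorn -w 4 -b 0.0.0.0:8000 "app:create_app()"
# 4 worker processes with 2 threads each (gthread worker class)
gunicorn -w 4 -b 0.0.0.0:8000 --worker-class gthread --threads 2 "app:create_app()"
# UvicornWorker serves ASGI apps only. A Flask (WSGI) app must first be wrapped,
# e.g. with asgiref's WsgiToAsgi; "asgi:asgi_app" is a hypothetical module exposing that wrapper
gunicorn -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000 "asgi:asgi_app"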

Gunicorn configuration file (gunicorn.conf.py):

python
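import multiprocessing

bind = "0.0.0.0:8000"                          # listen on all interfaces, port 8000
workers = multiprocessing.cpu_count() * 2 + 1  # common rule of thumb: (2 x cores) + 1
worker_class = "gthread"                       # threaded workers
threads = 2                                    # threads per worker process
timeout = 120                                  # seconds before a silent worker is killed and restarted
keepalive = 5                                  # seconds to hold idle keep-alive connections
accesslog = "-"                                # "-" writes the access log to stdout
errorlog = "-"                                 # "-" writes the error log to stderr
loglevel = "info"
preload_app = True                             # load the app once before forking workers
max_requests = 5000                            # recycle a worker after this many requests
max_requests_jitter = 500                      # randomize recycling so workers do not restart together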
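
The `workers` line above uses the widely cited `(2 x cores) + 1` rule of thumb. On a host with many cores this can start more processes than memory allows, so a deployment may want an upper bound. The following is a minimal sketch of that idea; `recommended_workers` and its `cap` parameter are hypothetical names chosen for illustration, not Gunicorn settings.

```python
import multiprocessing


def recommended_workers(cores: int, cap: int = 12) -> int:
    """Return (2 * cores) + 1, limited to at most `cap` worker processes."""
    if cores < 1:
        raise ValueError("cores must be at least 1")
    return min(cores * 2 + 1, cap)


if __name__ == "__main__":
    # In gunicorn.conf.py this could be used as:
    #   workers = recommended_workers(multiprocessing.cpu_count())
    print(recommended_workers(multiprocessing.cpu_count()))
```

The right cap depends on how much memory each worker uses, so it is best chosen from measurements rather than taken from this sketch.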

17.12.2 Containerizing with Docker

dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

RUN useradd --create-home appuser
USER appuser

EXPOSE 8000

CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:create_app()"]

yaml
version: "3.8"
services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Debug mode off (FLASK_ENV was removed in Flask 2.3; FLASK_DEBUG controls debug mode)
      - FLASK_DEBUG=0
      - SECRET_KEY=${SECRET_KEY}
      - DATABASE_URL=postgresql://user:pass@db:5432/app
    depends_on:
      - db
      - redis
    restart: unless-stopped

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: app
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    volumes:
      - pgdata:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./certs:/etc/nginx/certs:ro
    depends_on:
      - web

volumes:
  pgdata:
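
The Compose file passes `SECRET_KEY` and `DATABASE_URL` to the web container as environment variables. On the application side, one way to consume them is to read them once at startup and fail fast when one is missing. This is a minimal sketch under that assumption; `Settings` and `load_settings` are hypothetical names, not part of Flask.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    secret_key: str
    database_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read required settings from the environment; raise if any is missing or empty."""
    required = ("SECRET_KEY", "DATABASE_URL")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return Settings(secret_key=env["SECRET_KEY"], database_url=env["DATABASE_URL"])
```

Inside an application factory the result could feed the Flask configuration, for example `app.config["SQLALCHEMY_DATABASE_URI"] = settings.database_url`.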

17.12.3 Nginx Reverse Proxy

nginx
upstream flask_app {
    server web:8000;
}

server {
    listen 80;
    server_name example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name example.com;

    ssl_certificate /etc/nginx/certs/cert.pem;
    ssl_certificate_key /etc/nginx/certs/key.pem;

    client_max_body_size 16M;

    location / {
        proxy_pass http://flask_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location /static/ {
        alias /app/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }

    location /uploads/ {
        alias /app/uploads/;
        expires 7d;
    }
}
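
The proxy configuration forwards the original client address and scheme in the `X-Forwarded-For` and `X-Forwarded-Proto` headers. Flask does not trust these headers by default, so without further setup `request.remote_addr` is the address of the Nginx container. Werkzeug ships a `ProxyFix` middleware for this purpose. The sketch below assumes exactly one trusted proxy sits in front of the application; the `/whoami` route is a hypothetical endpoint added only to make the effect visible.

```python
from flask import Flask, jsonify, request
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)

# Trust one proxy hop for the client address and the scheme.
# Trusting more hops than really exist lets clients spoof these values.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)


@app.route("/whoami")
def whoami():
    # With ProxyFix applied, these reflect the forwarded values, not the proxy's
    return jsonify({"client_ip": request.remote_addr, "scheme": request.scheme})
```

With the scheme restored to `https`, URLs generated by `url_for(..., _external=True)` also use the public scheme.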

17.12.4 Performance Optimization Strategies

python
from flask import Flask, jsonify
from flask_caching import Cache

app = Flask(__name__)
app.config["CACHE_TYPE"] = "RedisCache"
app.config["CACHE_REDIS_URL"] = "redis://localhost:6379/0"
app.config["CACHE_DEFAULT_TIMEOUT"] = 300

cache = Cache(app)


@app.route("/api/posts")
@cache.cached(timeout=60, query_string=True)
def get_posts():
    from .models import Post
    posts = Post.query.filter_by(is_published=True).all()
    return jsonify([p.to_dict() for p in posts])


@app.route("/api/posts/<int:post_id>")
@cache.cached(timeout=300)
def get_post(post_id):
    from .models import Post
    post = Post.query.get_or_404(post_id)
    return jsonify(post.to_dict())
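
To make the effect of a `timeout` concrete, the following sketch shows time-based caching in plain Python. It is not how Flask-Caching is implemented: it keeps results in a per-process dictionary, whereas the Redis backend configured above is shared by all workers. The name `ttl_cache` and the injectable `clock` parameter are assumptions made for illustration.

```python
import time
from functools import wraps


def ttl_cache(timeout, clock=time.monotonic):
    """Cache a function's results per argument tuple for `timeout` seconds."""
    def decorator(func):
        store = {}  # maps args -> (expires_at, value)

        @wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]  # entry is still fresh: skip the expensive call
            value = func(*args)
            store[args] = (now + timeout, value)
            return value

        return wrapper
    return decorator
```

The trade-off is the same as with `@cache.cached`: a longer timeout saves more database queries but lets clients see stale data for longer.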

Database connection pool tuning:

python
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
    "pool_size": 20,
    "max_overflow": 10,
    "pool_recycle": 3600,
    "pool_pre_ping": True,
    "echo_pool": False,
}
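
Pool settings apply per process. Each Gunicorn worker keeps its own pool, so the number of connections the database may see is the per-process maximum multiplied by the worker count. The small calculation below is a sketch of that check; `max_db_connections` is a hypothetical helper name.

```python
def max_db_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)


if __name__ == "__main__":
    # 9 workers (a 4-core host with the 2n+1 rule) and the pool settings shown above
    print(max_db_connections(workers=9, pool_size=20, max_overflow=10))
```

For 9 workers the bound is 9 × (20 + 10) = 270, well above PostgreSQL's default `max_connections` of 100, so either the pool values or the server limit would need adjusting in such a setup.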

17.13 Recent Developments

17.13.1 New in Flask 3.0+

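Flask 3.0 was released at the end of September 2023. The release itself mostly removed previously deprecated code and raised the minimum versions of its dependencies. The capabilities below, often associated with modern Flask, arrived during the 2.x series and remain available in 3.0:

  • Deferred extension initialization: an extension can be created without an app and bound later with init_app(app), which is what makes the application factory pattern work
  • Async views: async def view functions are supported since Flask 2.0 and require installing the flask[async] extra
  • Stricter route matching: some edge-case route conflicts were fixed
  • JSON provider: JSON handling goes through app.json (since Flask 2.2), which uses Python's built-in json module by default and can be replaced with a custom provider class
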
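python
import httpx  # third-party HTTP client: pip install httpx
from flask import Flask
from flask.json.provider import DefaultJSONProvider


class StrFallbackJSONProvider(DefaultJSONProvider):
    """Serialize objects that json.dumps() cannot handle with str()."""

    # Note: this replaces the built-in handling of datetime, UUID, Decimal, etc.
    @staticmethod
    def default(o):
        return str(o)


app = Flask(__name__)
# Install the custom provider on the app
app.json = StrFallbackJSONProvider(app)


# Async views need the extra: pip install "flask[async]"
@app.route("/async-data")
async def async_data():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com/data")
    return resp.json()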

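17.13.2 Integrating Flask with Front-End Frameworks

Modern Flask applications are often paired with front-end frameworks such as Vue.js or React. Flask serves the compiled bundle and falls back to index.html for any path it does not recognize, so that the front-end router can handle it: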

python
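import os

from flask import Flask, send_from_directory

app = Flask(__name__, static_folder="dist")


@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_spa(path):
    # app.static_folder is the absolute path of dist/, independent of the working directory
    dist_dir = app.static_folder
    # Serve real build artifacts (JS, CSS, images) directly
    if path and os.path.isfile(os.path.join(dist_dir, path)):
        return send_from_directory(dist_dir, path)
    # Everything else falls back to index.html for client-side routing
    return send_from_directory(dist_dir, "index.html")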

17.13.3 Flask in Microservice Architectures

Because it is lightweight, Flask is widely used in microservice architectures:

python
from flask import Flask, jsonify
from flask_consulate import Consul

app = Flask(__name__)

consul = Consul(app, host="consul-server", port=8500)
consul.register_service(
    name="user-service",
    port=8000,
    tags=["flask", "user", "api"],
)


@app.route("/health")
def health():
    return jsonify({"status": "healthy"})


@app.route("/api/users/<int:user_id>")
def get_user(user_id):
    return jsonify({"id": user_id, "username": "example"})
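
A service registry typically polls a health endpoint to decide whether an instance should keep receiving traffic, so a `/health` route that always answers "healthy" conveys little. The sketch below shows one possible refinement that returns 503 when a dependency check fails. The `checks` registry and `check_database` are hypothetical placeholders, and a real check would run something like `SELECT 1` against the database.

```python
from flask import Flask, jsonify

app = Flask(__name__)

# Hypothetical registry of dependency checks: name -> callable returning True when healthy
checks = {}


def check_database():
    """Placeholder check; a real service would query the database here."""
    return True


checks["database"] = check_database


@app.route("/health")
def health():
    results = {}
    for name, check in checks.items():
        try:
            results[name] = bool(check())
        except Exception:
            results[name] = False  # a crashing check counts as unhealthy
    healthy = all(results.values())
    body = {"status": "healthy" if healthy else "unhealthy", "checks": results}
    return jsonify(body), (200 if healthy else 503)
```

Keeping each check cheap matters, because the registry calls this endpoint on a short interval.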

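17.14 Chapter Summary

This chapter covered the core body of knowledge for Flask web development:

  1. Web framework fundamentals: the WSGI/ASGI protocols and the HTTP request-response lifecycle
  2. Flask core mechanisms: the application factory pattern, the application and request contexts, configuration management
  3. Routing: dynamic routes, custom converters, RESTful design, modularization with blueprints
  4. Requests and responses: request object attributes, building responses, streaming responses, request hooks
  5. Template engine: Jinja2 template inheritance, custom filters, macros
  6. Form handling: validation with Flask-WTF, secure file uploads
  7. Database integration: the SQLAlchemy ORM, relationship modeling, migrations, fixing N+1 queries
  8. Authentication and authorization: session authentication with Flask-Login, JWT token authentication, bitwise permission control
  9. Security: defenses against CSRF, XSS, and SQL injection, security headers, password handling
  10. RESTful APIs: serialization with Marshmallow, API error handling, pagination
  11. Testing: pytest fixtures, view tests, API tests
  12. Production deployment: Gunicorn, Docker containers, Nginx reverse proxy, caching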

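17.15 Exercises and Projects

Basic exercises

  1. Use the application factory pattern to create a Flask application with three blueprints: a home page, an about page, and a contact page.

  2. Implement a complete authentication system with user registration, login, and logout, including password strength validation and CSRF protection.

  3. Use Flask-SQLAlchemy to design the data model of a blog system with five models (user, post, category, tag, comment) and the relationships between them.

Intermediate exercises

  1. Implement a complete RESTful API that supports CRUD operations on posts, with JWT authentication, Marshmallow serialization and validation, and pagination.

  2. Design and implement a bitwise permission control system that supports flexible combinations of roles (guest, author, editor, administrator) and permissions (read, write, delete, manage).

  3. Implement file uploads for two kinds of files, images and documents, including file type validation, size limits, safe file name generation, and storage path management.

Capstone projects

  1. Blog platform: build a full-featured blog platform with the following features:

    • User registration/login (with email verification)
    • Creating/editing/deleting posts (with Markdown support)
    • Category and tag management
    • Comment system (with nested replies)
    • Full-text search
    • Admin back end
    • RESTful API
    • Complete test coverage
  2. Task management system: build a task management system for team collaboration, including:

    • User authentication and permission management
    • CRUD for projects and tasks
    • Task assignment and status transitions
    • Real-time notifications (WebSocket)
    • API endpoints
    • Docker deployment configuration

Discussion questions

  1. In which scenarios is Flask's "microframework" philosophy a better fit than Django's "full-stack framework" design, and in which is the opposite true? Analyze along three dimensions: technical architecture, team collaboration, and project size.

  2. In a microservice architecture, how can a Flask application obtain the capabilities a distributed system needs, such as service discovery, a configuration center, and distributed tracing? Design a Flask-based microservice technology stack.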



Youth Creative Programming - High School Python Group - Jiangsu Sucheng Secondary Vocational School