Skip to content

第26章 实战:Web应用开发

学习目标

完成本章学习后,读者应能够:

  1. 掌握Web应用架构设计:理解分层架构、领域驱动设计在Web应用中的实践,构建可维护、可扩展的应用结构
  2. 精通配置管理:实现多环境配置、敏感信息保护与配置热更新机制
  3. 构建数据持久层:运用SQLAlchemy实现复杂关系建模、数据验证、迁移管理与查询优化
  4. 实现完整认证授权体系:基于JWT构建无状态认证、角色权限控制与OAuth2集成
  5. 设计RESTful API:遵循 Richardson 成熟度模型,实现资源化API、版本管理与HATEOAS
  6. 掌握安全防护实践:防御OWASP Top 10安全威胁,实现CSP、CORS、速率限制等安全策略
  7. 实施测试策略:构建单元测试、集成测试、API测试与端到端测试的完整测试金字塔
  8. 完成生产部署:掌握Docker容器化、Nginx反向代理、数据库优化与监控告警体系

26.1 Web应用架构设计理论

26.1.1 分层架构

生产级Web应用通常采用分层架构,将关注点分离到不同层次:

┌─────────────────────────────────────────────┐
│              表示层 (Presentation)            │
│   Templates / Static Assets / API Serializers│
├─────────────────────────────────────────────┤
│              业务逻辑层 (Business Logic)      │
│   Services / Domain Models / Business Rules  │
├─────────────────────────────────────────────┤
│              数据访问层 (Data Access)          │
│   Repositories / ORM / Query Builders        │
├─────────────────────────────────────────────┤
│              基础设施层 (Infrastructure)       │
│   Database / Cache / Message Queue / Storage  │
└─────────────────────────────────────────────┘

各层职责与交互原则:

层次职责依赖方向关键模式
表示层请求解析、响应序列化、输入验证→ 业务逻辑层MVC、序列化器
业务逻辑层核心业务规则、工作流编排→ 数据访问层服务模式、领域模型
数据访问层数据持久化、查询优化→ 基础设施层仓储模式、Unit of Work
基础设施层外部系统集成、技术实现无上层依赖适配器模式

26.1.2 领域驱动设计核心概念

在复杂Web应用中,领域驱动设计(DDD)提供了组织业务逻辑的系统方法:

  • 实体(Entity):具有唯一标识的领域对象,如用户、文章
  • 值对象(Value Object):无唯一标识、通过属性定义的对象,如邮箱地址
  • 聚合根(Aggregate Root):一组相关实体的边界,外部只能通过聚合根访问
  • 领域服务(Domain Service):不属于任何实体的业务操作
  • 仓储(Repository):聚合根的持久化抽象接口

26.1.3 项目技术选型

组件选型选型理由
Web框架Flask 3.x微框架灵活性,适合渐进式架构演进
ORMSQLAlchemy 2.xPython生态最成熟的ORM,支持异步与类型注解
数据库迁移AlembicSQLAlchemy官方迁移工具,支持分支合并
认证PyJWT + Flask-LoginJWT用于API认证,Session用于Web认证
表单WTForms服务端验证,与Jinja2深度集成
任务队列Celery + Redis异步任务处理(邮件发送、数据统计)
缓存Flask-Caching + Redis多级缓存策略,支持Redis/Memcached
测试pytest + Factory Boy工厂模式创建测试数据,pytest fixtures管理
部署Docker + Gunicorn容器化部署,Gunicorn作为WSGI服务器

26.2 项目概述

本章将构建一个生产级博客平台 flaskblog,涵盖以下核心功能:

  • 用户注册、登录、OAuth2第三方登录
  • 文章的增删改查、草稿管理、Markdown渲染
  • 文章分类与标签系统
  • 评论与回复功能(支持嵌套)
  • 全文搜索(Whoosh/PostgreSQL全文索引)
  • RESTful API(JWT认证、版本管理、分页、过滤)
  • 管理后台(数据统计、用户管理、内容审核)
  • 异步任务(邮件通知、数据统计)
  • 缓存策略
  • Docker容器化部署

26.3 项目结构

flaskblog/
├── app/
│   ├── __init__.py              # 应用工厂
│   ├── extensions.py            # 扩展初始化
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py              # 用户模型
│   │   ├── post.py              # 文章模型
│   │   ├── comment.py           # 评论模型
│   │   └── mixins.py            # 通用混入类
│   ├── services/
│   │   ├── __init__.py
│   │   ├── auth.py              # 认证服务
│   │   ├── post.py              # 文章服务
│   │   ├── search.py            # 搜索服务
│   │   └── email.py             # 邮件服务
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py          # API认证端点
│   │   │   ├── posts.py         # 文章API
│   │   │   ├── users.py         # 用户API
│   │   │   └── comments.py      # 评论API
│   │   └── errors.py            # API错误处理
│   ├── views/
│   │   ├── __init__.py
│   │   ├── auth.py              # Web认证视图
│   │   ├── posts.py             # Web文章视图
│   │   ├── admin.py             # 管理后台视图
│   │   └── errors.py            # 错误页面视图
│   ├── forms/
│   │   ├── __init__.py
│   │   ├── auth.py              # 认证表单
│   │   └── post.py              # 文章表单
│   ├── templates/               # Jinja2模板
│   ├── static/                  # 静态资源
│   └── utils/
│       ├── __init__.py
│       ├── decorators.py        # 自定义装饰器
│       ├── pagination.py        # 分页工具
│       └── serializers.py       # 序列化工具
├── migrations/                  # Alembic迁移文件
├── tests/
│   ├── conftest.py              # pytest fixtures
│   ├── factories.py             # Factory Boy工厂
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── celery_app.py                # Celery配置
├── config.py                    # 配置类
├── pyproject.toml               # 项目元数据与依赖
├── Dockerfile                   # 容器构建
├── docker-compose.yml           # 编排配置
├── nginx.conf                   # Nginx配置
└── .env.example                 # 环境变量示例

26.4 配置管理

26.4.1 多环境配置

python
import os
from datetime import timedelta
from pathlib import Path


class BaseConfig:
    SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_RECORD_QUERIES = True

    BASE_DIR = Path(__file__).resolve().parent

    SQLITE_DB_PATH = BASE_DIR / "instance" / "app.db"
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL",
        f"sqlite:///{SQLITE_DB_PATH}",
    )

    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

    CACHE_TYPE = "RedisCache"
    CACHE_REDIS_URL = REDIS_URL
    CACHE_DEFAULT_TIMEOUT = 300

    JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "jwt-dev-secret")
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)

    MAIL_SERVER = os.environ.get("MAIL_SERVER", "localhost")
    MAIL_PORT = int(os.environ.get("MAIL_PORT", 25))
    MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS", "false").lower() == "true"
    MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
    MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
    MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER", "noreply@flaskblog.local")

    CELERY_BROKER_URL = REDIS_URL
    CELERY_RESULT_BACKEND = REDIS_URL

    POSTS_PER_PAGE = 10
    COMMENTS_PER_PAGE = 20
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024

    ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
    UPLOAD_FOLDER = BASE_DIR / "app" / "static" / "uploads"

    SECURITY_PASSWORD_SALT = os.environ.get("SECURITY_PASSWORD_SALT", "password-salt")

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(BaseConfig):
    DEBUG = True
    SQLALCHEMY_ECHO = True
    CACHE_TYPE = "SimpleCache"


class TestingConfig(BaseConfig):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite://"
    CACHE_TYPE = "NullCache"
    WTF_CSRF_ENABLED = False
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=5)

    @staticmethod
    def init_app(app):
        import tempfile
        app.config["UPLOAD_FOLDER"] = Path(tempfile.mkdtemp())


class ProductionConfig(BaseConfig):
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 10,
        "pool_recycle": 3600,
        "pool_pre_ping": True,
        "connect_args": {"connect_timeout": 5},
    }

    SESSION_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    SESSION_COOKIE_SAMESITE = "Lax"
    REMEMBER_COOKIE_SECURE = True
    REMEMBER_COOKIE_HTTPONLY = True

    @staticmethod
    def init_app(app):
        import logging
        from logging.handlers import RotatingFileHandler
        handler = RotatingFileHandler(
            "flaskblog.log", maxBytes=10 * 1024 * 1024, backupCount=5
        )
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s %(threadName)s %(message)s"
        ))
        handler.setLevel(logging.INFO)
        app.logger.addHandler(handler)


config_map = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "production": ProductionConfig,
    "default": DevelopmentConfig,
}


def get_config():
    env = os.environ.get("FLASK_ENV", "default")
    return config_map.get(env, DevelopmentConfig)

26.4.2 环境变量管理

.env.example

bash
FLASK_ENV=development
SECRET_KEY=your-secret-key-here
JWT_SECRET_KEY=your-jwt-secret-key-here
SECURITY_PASSWORD_SALT=your-password-salt-here
DATABASE_URL=postgresql://user:password@localhost:5432/flaskblog
REDIS_URL=redis://localhost:6379/0
MAIL_SERVER=smtp.example.com
MAIL_PORT=587
MAIL_USE_TLS=true
MAIL_USERNAME=your-email@example.com
MAIL_PASSWORD=your-email-password
MAIL_DEFAULT_SENDER=noreply@example.com

26.5 应用工厂与扩展初始化

26.5.1 扩展初始化

app/extensions.py

python
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_migrate import Migrate
from flask_caching import Cache
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_cors import CORS
from flask_wtf.csrf import CSRFProtect


db = SQLAlchemy()
login_manager = LoginManager()
migrate = Migrate()
cache = Cache()
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379/1",
)
cors = CORS()
csrf = CSRFProtect()


login_manager.login_view = "auth.login"
login_manager.login_message_category = "info"
login_manager.session_protection = "strong"


@login_manager.user_loader
def load_user(user_id):
    from app.models.user import User
    return db.session.get(User, int(user_id))

26.5.2 应用工厂

app/__init__.py

python
from flask import Flask
from config import get_config
from app.extensions import db, login_manager, migrate, cache, limiter, cors, csrf


def create_app(config_class=None):
    app = Flask(__name__)

    if config_class is None:
        config_class = get_config()
    app.config.from_object(config_class)
    config_class.init_app(app)

    _init_extensions(app)
    _register_blueprints(app)
    _register_error_handlers(app)
    _register_template_filters(app)
    _register_context_processors(app)

    return app


def _init_extensions(app):
    db.init_app(app)
    login_manager.init_app(app)
    migrate.init_app(app, db)
    cache.init_app(app)
    limiter.init_app(app)
    cors.init_app(app, resources={r"/api/*": {"origins": "*"}})
    csrf.init_app(app)
    csrf.exempt(app.blueprints.get("api"))

    with app.app_context():
        from app import models  # noqa: F401
        db.create_all()


def _register_blueprints(app):
    from app.views.auth import auth_bp
    from app.views.posts import posts_bp
    from app.views.admin import admin_bp
    from app.views.errors import errors_bp
    from app.api.v1 import api_v1_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(posts_bp)
    app.register_blueprint(admin_bp, url_prefix="/admin")
    app.register_blueprint(errors_bp)
    app.register_blueprint(api_v1_bp, url_prefix="/api/v1")


def _register_error_handlers(app):
    from app.views.errors import page_not_found, internal_error, forbidden

    app.register_error_handler(404, page_not_found)
    app.register_error_handler(500, internal_error)
    app.register_error_handler(403, forbidden)


def _register_template_filters(app):
    import markdown as md
    import bleach

    @app.template_filter("markdown")
    def markdown_filter(text):
        allowed_tags = [
            "a", "abbr", "acronym", "b", "blockquote", "br", "code",
            "dd", "del", "dl", "dt", "em", "h1", "h2", "h3", "h4",
            "h5", "h6", "hr", "i", "img", "li", "ol", "p", "pre",
            "span", "strong", "table", "tbody", "td", "th", "thead",
            "tr", "ul",
        ]
        allowed_attrs = {
            "*": ["class"],
            "a": ["href", "rel", "title"],
            "img": ["alt", "src", "title"],
        }
        html = md.markdown(text, extensions=[
            "fenced_code", "tables", "toc", "nl2br", "codehilite",
        ])
        return bleach.clean(html, tags=allowed_tags, attributes=allowed_attrs)

    @app.template_filter("truncate_chars")
    def truncate_chars_filter(text, length=200):
        if len(text) <= length:
            return text
        return text[:length].rsplit(" ", 1)[0] + "..."


def _register_context_processors(app):
    from datetime import datetime

    @app.context_processor
    def inject_globals():
        return {
            "now": datetime.utcnow,
            "site_name": app.config.get("SITE_NAME", "FlaskBlog"),
        }

26.6 数据模型层

26.6.1 通用混入类

app/models/mixins.py

python
from datetime import datetime
from app.extensions import db


class TimestampMixin:
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(
        db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
    )


class SoftDeleteMixin:
    is_deleted = db.Column(db.Boolean, default=False, nullable=False, index=True)
    deleted_at = db.Column(db.DateTime, nullable=True)

    def soft_delete(self):
        self.is_deleted = True
        self.deleted_at = datetime.utcnow()

    def restore(self):
        self.is_deleted = False
        self.deleted_at = None


class SlugMixin:
    slug = db.Column(db.String(200), unique=True, nullable=False, index=True)

    @staticmethod
    def generate_slug(title: str) -> str:
        import re
        import unicodedata
        slug = unicodedata.normalize("NFKD", title)
        slug = slug.encode("ascii", "ignore").decode("ascii")
        slug = re.sub(r"[^\w\s-]", "", slug).strip().lower()
        slug = re.sub(r"[-\s]+", "-", slug)
        return slug[:200]

26.6.2 用户模型

app/models/user.py

python
from datetime import datetime, timedelta
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin


class UserRole:
    READER = "reader"
    AUTHOR = "author"
    EDITOR = "editor"
    ADMIN = "admin"


user_follows = db.Table(
    "user_follows",
    db.Column("follower_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
    db.Column("followed_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
    db.Column("created_at", db.DateTime, default=datetime.utcnow),
)


class User(UserMixin, TimestampMixin, SoftDeleteMixin, db.Model):
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256), nullable=False)
    role = db.Column(db.String(20), default=UserRole.READER, nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    email_verified = db.Column(db.Boolean, default=False, nullable=False)
    email_verified_at = db.Column(db.DateTime, nullable=True)

    avatar = db.Column(db.String(200), nullable=True)
    bio = db.Column(db.Text, nullable=True)
    website = db.Column(db.String(200), nullable=True)
    location = db.Column(db.String(100), nullable=True)

    last_login_at = db.Column(db.DateTime, nullable=True)
    last_login_ip = db.Column(db.String(45), nullable=True)
    login_count = db.Column(db.Integer, default=0)

    posts = db.relationship("Post", backref="author", lazy="dynamic")
    comments = db.relationship("Comment", backref="author", lazy="dynamic")

    following = db.relationship(
        "User",
        secondary=user_follows,
        primaryjoin=(user_follows.c.follower_id == id),
        secondaryjoin=(user_follows.c.followed_id == id),
        backref=db.backref("followers", lazy="dynamic"),
        lazy="dynamic",
    )

    __table_args__ = (
        db.Index("ix_users_username_email", "username", "email"),
    )

    def set_password(self, password: str) -> None:
        self.password_hash = generate_password_hash(password, method="pbkdf2:sha256", salt_length=16)

    def check_password(self, password: str) -> bool:
        return check_password_hash(self.password_hash, password)

    @property
    def is_admin(self) -> bool:
        return self.role == UserRole.ADMIN

    @property
    def is_editor(self) -> bool:
        return self.role in (UserRole.EDITOR, UserRole.ADMIN)

    @property
    def is_author(self) -> bool:
        return self.role in (UserRole.AUTHOR, UserRole.EDITOR, UserRole.ADMIN)

    def follow(self, user: "User") -> None:
        if not self.is_following(user):
            self.following.append(user)

    def unfollow(self, user: "User") -> None:
        if self.is_following(user):
            self.following.remove(user)

    def is_following(self, user: "User") -> bool:
        return self.following.filter(user_follows.c.followed_id == user.id).count() > 0

    def update_login_info(self, ip_address: str) -> None:
        self.last_login_at = datetime.utcnow()
        self.last_login_ip = ip_address
        self.login_count += 1

    def get_reset_password_token(self, expires_in: int = 3600) -> str:
        import jwt
        from flask import current_app
        return jwt.encode(
            {"reset_password": self.id, "exp": datetime.utcnow() + timedelta(seconds=expires_in)},
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )

    @staticmethod
    def verify_reset_password_token(token: str) -> "User | None":
        import jwt
        from flask import current_app
        try:
            data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=["HS256"])
            return db.session.get(User, data.get("reset_password"))
        except jwt.PyJWTError:
            return None

    def to_dict(self, include_email: bool = False) -> dict:
        data = {
            "id": self.id,
            "username": self.username,
            "role": self.role,
            "bio": self.bio,
            "avatar": self.avatar,
            "website": self.website,
            "location": self.location,
            "post_count": self.posts.filter_by(is_deleted=False).count(),
            "follower_count": self.followers.count(),
            "following_count": self.following.count(),
            "created_at": self.created_at.isoformat(),
        }
        if include_email:
            data["email"] = self.email
        return data

    def __repr__(self):
        return f"<User {self.username!r}>"

26.6.3 文章模型

app/models/post.py

python
from datetime import datetime
from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin, SlugMixin


post_tags = db.Table(
    "post_tags",
    db.Column("post_id", db.Integer, db.ForeignKey("posts.id"), primary_key=True),
    db.Column("tag_id", db.Integer, db.ForeignKey("tags.id"), primary_key=True),
)


class Post(TimestampMixin, SoftDeleteMixin, SlugMixin, db.Model):
    __tablename__ = "posts"

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    summary = db.Column(db.String(500), nullable=True)
    cover_image = db.Column(db.String(200), nullable=True)

    status = db.Column(db.String(20), default="draft", nullable=False, index=True)
    is_pinned = db.Column(db.Boolean, default=False, nullable=False)

    view_count = db.Column(db.Integer, default=0, nullable=False)
    like_count = db.Column(db.Integer, default=0, nullable=False)
    comment_count = db.Column(db.Integer, default=0, nullable=False)

    user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
    category_id = db.Column(db.Integer, db.ForeignKey("categories.id"), nullable=True)
    published_at = db.Column(db.DateTime, nullable=True, index=True)

    tags = db.relationship("Tag", secondary=post_tags, backref=db.backref("posts", lazy="dynamic"))
    comments = db.relationship(
        "Comment", backref="post", lazy="dynamic",
        order_by="Comment.created_at.desc()",
    )

    __table_args__ = (
        db.Index("ix_posts_status_published", "status", "published_at"),
        db.Index("ix_posts_user_status", "user_id", "status"),
    )

    STATUS_DRAFT = "draft"
    STATUS_PUBLISHED = "published"
    STATUS_ARCHIVED = "archived"

    def publish(self) -> None:
        if self.status != self.STATUS_PUBLISHED:
            self.status = self.STATUS_PUBLISHED
            self.published_at = datetime.utcnow()

    def archive(self) -> None:
        self.status = self.STATUS_ARCHIVED

    def increment_view(self) -> None:
        self.view_count += 1

    def to_dict(self, include_content: bool = True) -> dict:
        data = {
            "id": self.id,
            "slug": self.slug,
            "title": self.title,
            "summary": self.summary,
            "status": self.status,
            "is_pinned": self.is_pinned,
            "view_count": self.view_count,
            "like_count": self.like_count,
            "comment_count": self.comment_count,
            "author": self.author.to_dict() if self.author else None,
            "category": self.category.to_dict() if self.category else None,
            "tags": [tag.to_dict() for tag in self.tags],
            "published_at": self.published_at.isoformat() if self.published_at else None,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
        if include_content:
            data["content"] = self.content
        return data

    def __repr__(self):
        return f"<Post {self.title!r}>"


class Category(TimestampMixin, db.Model):
    __tablename__ = "categories"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False, index=True)
    description = db.Column(db.String(200), nullable=True)
    sort_order = db.Column(db.Integer, default=0, nullable=False)

    posts = db.relationship("Post", backref="category", lazy="dynamic")

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "slug": self.slug,
            "description": self.description,
            "post_count": self.posts.filter_by(status="published", is_deleted=False).count(),
        }


class Tag(TimestampMixin, db.Model):
    __tablename__ = "tags"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False, index=True)

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "slug": self.slug,
            "post_count": self.posts.filter_by(status="published", is_deleted=False).count(),
        }

26.6.4 评论模型

app/models/comment.py

python
from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin


class Comment(TimestampMixin, SoftDeleteMixin, db.Model):
    __tablename__ = "comments"

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    is_approved = db.Column(db.Boolean, default=True, nullable=False)

    user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
    post_id = db.Column(db.Integer, db.ForeignKey("posts.id"), nullable=False, index=True)
    parent_id = db.Column(db.Integer, db.ForeignKey("comments.id"), nullable=True, index=True)

    replies = db.relationship(
        "Comment",
        backref=db.backref("parent", remote_side=[id]),
        lazy="dynamic",
        order_by="Comment.created_at.asc()",
    )

    __table_args__ = (
        db.Index("ix_comments_post_approved", "post_id", "is_approved"),
    )

    @property
    def is_reply(self) -> bool:
        return self.parent_id is not None

    @property
    def reply_count(self) -> int:
        return self.replies.filter_by(is_deleted=False, is_approved=True).count()

    def to_dict(self, include_replies: bool = False) -> dict:
        data = {
            "id": self.id,
            "content": self.content,
            "is_approved": self.is_approved,
            "author": self.author.to_dict() if self.author else None,
            "post_id": self.post_id,
            "parent_id": self.parent_id,
            "is_reply": self.is_reply,
            "reply_count": self.reply_count,
            "created_at": self.created_at.isoformat(),
        }
        if include_replies:
            data["replies"] = [
                reply.to_dict() for reply in self.replies.filter_by(
                    is_deleted=False, is_approved=True
                ).all()
            ]
        return data

26.6.5 模型注册

app/models/__init__.py

python
from app.models.user import User, UserRole, user_follows
from app.models.post import Post, Category, Tag, post_tags
from app.models.comment import Comment

__all__ = [
    "User", "UserRole", "user_follows",
    "Post", "Category", "Tag", "post_tags",
    "Comment",
]

26.7 服务层

服务层封装业务逻辑,使视图层保持简洁,同时便于测试和复用。

26.7.1 认证服务

app/services/auth.py

python
from datetime import datetime, timedelta
import jwt
from flask import current_app
from app.extensions import db
from app.models.user import User, UserRole


class AuthService:
    @staticmethod
    def register(username: str, email: str, password: str) -> User:
        if User.query.filter_by(username=username).first():
            raise ValueError(f"Username {username!r} already exists")
        if User.query.filter_by(email=email).first():
            raise ValueError(f"Email {email!r} already registered")

        user = User(
            username=username,
            email=email,
            role=UserRole.READER,
        )
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        return user

    @staticmethod
    def authenticate(username: str, password: str) -> User | None:
        user = User.query.filter(
            (User.username == username) | (User.email == username)
        ).first()
        if user and user.check_password(password) and user.is_active:
            return user
        return None

    @staticmethod
    def generate_tokens(user: User) -> dict:
        now = datetime.utcnow()
        access_payload = {
            "sub": user.id,
            "username": user.username,
            "role": user.role,
            "iat": now,
            "exp": now + current_app.config["JWT_ACCESS_TOKEN_EXPIRES"],
            "type": "access",
        }
        refresh_payload = {
            "sub": user.id,
            "iat": now,
            "exp": now + current_app.config["JWT_REFRESH_TOKEN_EXPIRES"],
            "type": "refresh",
        }

        access_token = jwt.encode(
            access_payload,
            current_app.config["JWT_SECRET_KEY"],
            algorithm="HS256",
        )
        refresh_token = jwt.encode(
            refresh_payload,
            current_app.config["JWT_SECRET_KEY"],
            algorithm="HS256",
        )
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "expires_in": int(current_app.config["JWT_ACCESS_TOKEN_EXPIRES"].total_seconds()),
        }

    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> dict | None:
        try:
            payload = jwt.decode(
                token,
                current_app.config["JWT_SECRET_KEY"],
                algorithms=["HS256"],
            )
            if payload.get("type") != token_type:
                return None
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

    @staticmethod
    def refresh_access_token(refresh_token: str) -> dict | None:
        payload = AuthService.verify_token(refresh_token, token_type="refresh")
        if payload is None:
            return None
        user = db.session.get(User, payload["sub"])
        if user is None or not user.is_active:
            return None
        return AuthService.generate_tokens(user)

    @staticmethod
    def change_password(user: User, old_password: str, new_password: str) -> bool:
        if not user.check_password(old_password):
            return False
        user.set_password(new_password)
        db.session.commit()
        return True

26.7.2 文章服务

app/services/post.py

python
from flask import current_app
from sqlalchemy import or_
from app.extensions import db
from app.models.post import Post, Category, Tag
from app.models.mixins import SlugMixin


class PostService:
    @staticmethod
    def create_post(user_id: int, title: str, content: str, **kwargs) -> Post:
        slug = SlugMixin.generate_slug(title)
        existing = Post.query.filter_by(slug=slug).first()
        if existing:
            slug = f"{slug}-{Post.query.count() + 1}"

        post = Post(
            title=title,
            slug=slug,
            content=content,
            user_id=user_id,
            summary=kwargs.get("summary", content[:200] if content else ""),
            cover_image=kwargs.get("cover_image"),
            status=kwargs.get("status", Post.STATUS_DRAFT),
            is_pinned=kwargs.get("is_pinned", False),
        )

        if kwargs.get("category_name"):
            post.category = PostService._get_or_create_category(kwargs["category_name"])

        if kwargs.get("tag_names"):
            for tag_name in kwargs["tag_names"]:
                tag = PostService._get_or_create_tag(tag_name)
                post.tags.append(tag)

        if post.status == Post.STATUS_PUBLISHED:
            post.publish()

        db.session.add(post)
        db.session.commit()
        return post

    @staticmethod
    def update_post(post: Post, **kwargs) -> Post:
        if "title" in kwargs and kwargs["title"] != post.title:
            post.title = kwargs["title"]
            post.slug = SlugMixin.generate_slug(kwargs["title"])

        for field in ("content", "summary", "cover_image", "is_pinned"):
            if field in kwargs:
                setattr(post, field, kwargs[field])

        if "status" in kwargs:
            if kwargs["status"] == Post.STATUS_PUBLISHED:
                post.publish()
            elif kwargs["status"] == Post.STATUS_ARCHIVED:
                post.archive()
            else:
                post.status = kwargs["status"]

        if "category_name" in kwargs:
            post.category = PostService._get_or_create_category(kwargs["category_name"])

        if "tag_names" in kwargs:
            post.tags = []
            for tag_name in kwargs["tag_names"]:
                tag = PostService._get_or_create_tag(tag_name)
                post.tags.append(tag)

        db.session.commit()
        return post

    @staticmethod
    def get_published_posts(page: int = 1, per_page: int = None):
        per_page = per_page or current_app.config["POSTS_PER_PAGE"]
        return Post.query.filter_by(
            status=Post.STATUS_PUBLISHED, is_deleted=False
        ).order_by(
            Post.is_pinned.desc(), Post.published_at.desc()
        ).paginate(page=page, per_page=per_page, error_out=False)

    @staticmethod
    def get_post_by_slug(slug: str) -> Post | None:
        return Post.query.filter_by(slug=slug, is_deleted=False).first()

    @staticmethod
    def get_user_posts(user_id: int, page: int = 1, status: str = None):
        query = Post.query.filter_by(user_id=user_id, is_deleted=False)
        if status:
            query = query.filter_by(status=status)
        return query.order_by(Post.created_at.desc()).paginate(
            page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
        )

    @staticmethod
    def search_posts(query: str, page: int = 1):
        search_filter = or_(
            Post.title.contains(query),
            Post.content.contains(query),
            Post.summary.contains(query),
        )
        return Post.query.filter(
            search_filter, Post.status == Post.STATUS_PUBLISHED, Post.is_deleted == False  # noqa: E712
        ).order_by(Post.published_at.desc()).paginate(
            page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
        )

    @staticmethod
    def _get_or_create_category(name: str) -> Category:
        category = Category.query.filter_by(name=name).first()
        if not category:
            category = Category(name=name, slug=SlugMixin.generate_slug(name))
            db.session.add(category)
        return category

    @staticmethod
    def _get_or_create_tag(name: str) -> Tag:
        tag = Tag.query.filter_by(name=name).first()
        if not tag:
            tag = Tag(name=name, slug=SlugMixin.generate_slug(name))
            db.session.add(tag)
        return tag

26.7.3 邮件服务

app/services/email.py

python
from threading import Thread
from flask import current_app, render_template
from app.extensions import mail
from flask_mail import Message


class EmailService:
    @staticmethod
    def send_async_email(app, msg):
        with app.app_context():
            mail.send(msg)

    @staticmethod
    def send_email(to: str | list[str], subject: str, template: str, **kwargs):
        app = current_app._get_current_object()
        msg = Message(
            subject=app.config.get("MAIL_PREFIX", "[FlaskBlog] ") + subject,
            sender=app.config["MAIL_DEFAULT_SENDER"],
            recipients=[to] if isinstance(to, str) else to,
        )
        msg.body = render_template(f"emails/{template}.txt", **kwargs)
        msg.html = render_template(f"emails/{template}.html", **kwargs)

        thread = Thread(target=EmailService.send_async_email, args=(app, msg))
        thread.start()
        return thread

    @staticmethod
    def send_verification_email(user, token):
        EmailService.send_email(
            to=user.email, subject="Verify Your Email",
            template="verify_email", user=user, token=token,
        )

    @staticmethod
    def send_password_reset_email(user, token):
        EmailService.send_email(
            to=user.email, subject="Reset Your Password",
            template="reset_password", user=user, token=token,
        )

    @staticmethod
    def send_new_comment_notification(post_author, comment):
        EmailService.send_email(
            to=post_author.email,
            subject=f"New Comment on '{comment.post.title}'",
            template="new_comment", author=post_author, comment=comment,
        )

26.8 表单验证

app/forms/auth.py

python
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Email, EqualTo, Length, ValidationError
from app.models.user import User


class LoginForm(FlaskForm):
    username = StringField("Username", validators=[DataRequired(), Length(1, 64)])
    password = PasswordField("Password", validators=[DataRequired()])
    remember_me = BooleanField("Remember Me")
    submit = SubmitField("Sign In")


class RegistrationForm(FlaskForm):
    username = StringField("Username", validators=[DataRequired(), Length(3, 64)])
    email = StringField("Email", validators=[DataRequired(), Email(), Length(1, 120)])
    password = PasswordField("Password", validators=[DataRequired(), Length(8, 128)])
    password2 = PasswordField(
        "Repeat Password",
        validators=[DataRequired(), EqualTo("password", message="Passwords must match.")],
    )
    submit = SubmitField("Register")

    def validate_username(self, field):
        if User.query.filter_by(username=field.data).first():
            raise ValidationError("Username already in use.")

    def validate_email(self, field):
        if User.query.filter_by(email=field.data).first():
            raise ValidationError("Email already registered.")


class ChangePasswordForm(FlaskForm):
    old_password = PasswordField("Old Password", validators=[DataRequired()])
    new_password = PasswordField("New Password", validators=[DataRequired(), Length(8, 128)])
    new_password2 = PasswordField(
        "Repeat New Password",
        validators=[DataRequired(), EqualTo("new_password")],
    )
    submit = SubmitField("Update Password")


class PasswordResetRequestForm(FlaskForm):
    email = StringField("Email", validators=[DataRequired(), Email()])
    submit = SubmitField("Reset Password")


class PasswordResetForm(FlaskForm):
    password = PasswordField("New Password", validators=[DataRequired(), Length(8, 128)])
    password2 = PasswordField(
        "Repeat Password", validators=[DataRequired(), EqualTo("password")],
    )
    submit = SubmitField("Reset Password")

app/forms/post.py

python
from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, SelectField, SubmitField, BooleanField
from wtforms.validators import DataRequired, Length, Optional


class PostForm(FlaskForm):
    title = StringField("Title", validators=[DataRequired(), Length(1, 200)])
    content = TextAreaField("Content", validators=[DataRequired()])
    summary = TextAreaField("Summary", validators=[Optional(), Length(0, 500)])
    category = StringField("Category", validators=[Optional(), Length(0, 50)])
    tags = StringField("Tags (comma-separated)", validators=[Optional()])
    status = SelectField(
        "Status",
        choices=[("draft", "Draft"), ("published", "Published")],
        default="draft",
    )
    is_pinned = BooleanField("Pin this post")
    submit = SubmitField("Save")


class CommentForm(FlaskForm):
    content = TextAreaField("Comment", validators=[DataRequired(), Length(1, 2000)])
    submit = SubmitField("Submit Comment")

26.9 Web视图层

26.9.1 认证视图

app/views/auth.py

python
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, current_user
from app.extensions import db
from app.models.user import User
from app.forms.auth import (
    LoginForm, RegistrationForm, PasswordResetRequestForm, PasswordResetForm,
)
from app.services.auth import AuthService
from app.services.email import EmailService
from app.utils.decorators import anonymous_required

auth_bp = Blueprint("auth", __name__)


@auth_bp.route("/register", methods=["GET", "POST"])
@anonymous_required
def register():
    form = RegistrationForm()
    if form.validate_on_submit():
        try:
            AuthService.register(
                username=form.username.data,
                email=form.email.data,
                password=form.password.data,
            )
            flash("Registration successful! Please log in.", "success")
            return redirect(url_for("auth.login"))
        except ValueError as e:
            flash(str(e), "danger")
    return render_template("auth/register.html", form=form)


@auth_bp.route("/login", methods=["GET", "POST"])
@anonymous_required
def login():
    form = LoginForm()
    if form.validate_on_submit():
        user = AuthService.authenticate(form.username.data, form.password.data)
        if user:
            login_user(user, remember=form.remember_me.data)
            user.update_login_info(request.remote_addr)
            db.session.commit()
            next_page = request.args.get("next")
            flash("Login successful!", "success")
            return redirect(next_page or url_for("posts.index"))
        flash("Invalid username or password.", "danger")
    return render_template("auth/login.html", form=form)


@auth_bp.route("/logout")
def logout():
    logout_user()
    flash("You have been logged out.", "info")
    return redirect(url_for("posts.index"))


@auth_bp.route("/reset-password", methods=["GET", "POST"])
def reset_password_request():
    if current_user.is_authenticated:
        return redirect(url_for("posts.index"))
    form = PasswordResetRequestForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            token = user.get_reset_password_token()
            EmailService.send_password_reset_email(user, token)
        flash("Check your email for instructions to reset your password.", "info")
        return redirect(url_for("auth.login"))
    return render_template("auth/reset_password_request.html", form=form)


@auth_bp.route("/reset-password/<token>", methods=["GET", "POST"])
def reset_password(token):
    if current_user.is_authenticated:
        return redirect(url_for("posts.index"))
    user = User.verify_reset_password_token(token)
    if not user:
        return redirect(url_for("posts.index"))
    form = PasswordResetForm()
    if form.validate_on_submit():
        user.set_password(form.password.data)
        db.session.commit()
        flash("Your password has been reset.", "success")
        return redirect(url_for("auth.login"))
    return render_template("auth/reset_password.html", form=form)

26.9.2 文章视图

app/views/posts.py

python
from flask import Blueprint, render_template, redirect, url_for, flash, request, abort, current_app
from flask_login import login_required, current_user
from app.extensions import db, cache
from app.models.post import Post, Category, Tag
from app.models.comment import Comment
from app.forms.post import PostForm, CommentForm
from app.services.post import PostService

posts_bp = Blueprint("posts", __name__)


@posts_bp.route("/")
@posts_bp.route("/index")
@cache.cached(timeout=60, query_string=True)
def index():
    page = request.args.get("page", 1, type=int)
    pagination = PostService.get_published_posts(page=page)
    categories = Category.query.order_by(Category.sort_order).all()
    return render_template(
        "posts/index.html",
        posts=pagination.items,
        pagination=pagination,
        categories=categories,
    )


@posts_bp.route("/post/<slug>")
def detail(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    if post.status != Post.STATUS_PUBLISHED and post.author != current_user:
        abort(404)

    post.increment_view()
    db.session.commit()

    comment_form = CommentForm()
    page = request.args.get("page", 1, type=int)
    comments = Comment.query.filter_by(
        post_id=post.id, is_deleted=False, is_approved=True, parent_id=None,
    ).order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=current_app.config.get("COMMENTS_PER_PAGE", 20),
        error_out=False,
    )
    return render_template(
        "posts/detail.html", post=post, comments=comments, comment_form=comment_form,
    )


@posts_bp.route("/create", methods=["GET", "POST"])
@login_required
def create():
    form = PostForm()
    if form.validate_on_submit():
        tag_names = [t.strip() for t in form.tags.data.split(",") if t.strip()] if form.tags.data else []
        post = PostService.create_post(
            user_id=current_user.id, title=form.title.data,
            content=form.content.data, summary=form.summary.data,
            category_name=form.category.data or None, tag_names=tag_names,
            status=form.status.data, is_pinned=form.is_pinned.data,
        )
        flash("Post created successfully!", "success")
        return redirect(url_for("posts.detail", slug=post.slug))
    return render_template("posts/create.html", form=form)


@posts_bp.route("/edit/<slug>", methods=["GET", "POST"])
@login_required
def edit(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    if post.author != current_user and not current_user.is_editor:
        abort(403)

    form = PostForm()
    if form.validate_on_submit():
        tag_names = [t.strip() for t in form.tags.data.split(",") if t.strip()] if form.tags.data else []
        PostService.update_post(
            post, title=form.title.data, content=form.content.data,
            summary=form.summary.data, category_name=form.category.data or None,
            tag_names=tag_names, status=form.status.data, is_pinned=form.is_pinned.data,
        )
        flash("Post updated successfully!", "success")
        return redirect(url_for("posts.detail", slug=post.slug))

    form.title.data = post.title
    form.content.data = post.content
    form.summary.data = post.summary
    form.category.data = post.category.name if post.category else ""
    form.tags.data = ", ".join(tag.name for tag in post.tags)
    form.status.data = post.status
    form.is_pinned.data = post.is_pinned
    return render_template("posts/edit.html", form=form, post=post)


@posts_bp.route("/delete/<slug>", methods=["POST"])
@login_required
def delete(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    if post.author != current_user and not current_user.is_admin:
        abort(403)
    post.soft_delete()
    db.session.commit()
    flash("Post deleted.", "success")
    return redirect(url_for("posts.index"))


@posts_bp.route("/search")
def search():
    query = request.args.get("q", "").strip()
    page = request.args.get("page", 1, type=int)
    if not query:
        return render_template("posts/search.html", posts=[], query="", pagination=None)
    pagination = PostService.search_posts(query, page=page)
    return render_template("posts/search.html", posts=pagination.items, pagination=pagination, query=query)


@posts_bp.route("/category/<slug>")
def category(slug):
    category = Category.query.filter_by(slug=slug).first_or_404()
    page = request.args.get("page", 1, type=int)
    pagination = Post.query.filter_by(
        category=category, status=Post.STATUS_PUBLISHED, is_deleted=False,
    ).order_by(Post.published_at.desc()).paginate(
        page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False,
    )
    return render_template("posts/category.html", category=category, posts=pagination.items, pagination=pagination)


@posts_bp.route("/tag/<slug>")
def tag(slug):
    tag = Tag.query.filter_by(slug=slug).first_or_404()
    page = request.args.get("page", 1, type=int)
    pagination = tag.posts.filter_by(
        status=Post.STATUS_PUBLISHED, is_deleted=False,
    ).order_by(Post.published_at.desc()).paginate(
        page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False,
    )
    return render_template("posts/tag.html", tag=tag, posts=pagination.items, pagination=pagination)


@posts_bp.route("/post/<int:post_id>/comment", methods=["POST"])
@login_required
def add_comment(post_id):
    post = Post.query.get_or_404(post_id)
    form = CommentForm()
    if form.validate_on_submit():
        comment = Comment(
            content=form.content.data, post_id=post.id,
            user_id=current_user.id, parent_id=request.form.get("parent_id", type=int),
        )
        db.session.add(comment)
        post.comment_count += 1
        db.session.commit()
        flash("Comment added!", "success")
    return redirect(url_for("posts.detail", slug=post.slug))

26.9.3 管理后台视图

app/views/admin.py

python
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_required, current_user
from app.extensions import db
from app.models.user import User
from app.models.post import Post
from app.models.comment import Comment
from app.utils.decorators import admin_required

admin_bp = Blueprint("admin", __name__)


@admin_bp.before_request
@login_required
@admin_required
def check_admin():
    pass


@admin_bp.route("/dashboard")
def dashboard():
    stats = {
        "total_users": User.query.filter_by(is_deleted=False).count(),
        "total_posts": Post.query.filter_by(is_deleted=False).count(),
        "published_posts": Post.query.filter_by(status="published", is_deleted=False).count(),
        "draft_posts": Post.query.filter_by(status="draft", is_deleted=False).count(),
        "total_comments": Comment.query.filter_by(is_deleted=False).count(),
        "pending_comments": Comment.query.filter_by(is_approved=False, is_deleted=False).count(),
    }
    recent_posts = Post.query.order_by(Post.created_at.desc()).limit(5).all()
    recent_comments = Comment.query.order_by(Comment.created_at.desc()).limit(5).all()
    return render_template(
        "admin/dashboard.html", stats=stats, recent_posts=recent_posts, recent_comments=recent_comments,
    )


@admin_bp.route("/users")
def users():
    page = request.args.get("page", 1, type=int)
    pagination = User.query.filter_by(is_deleted=False).order_by(User.created_at.desc()).paginate(
        page=page, per_page=20, error_out=False,
    )
    return render_template("admin/users.html", users=pagination.items, pagination=pagination)


@admin_bp.route("/users/<int:user_id>/toggle-active", methods=["POST"])
def toggle_user_active(user_id):
    user = db.session.get(User, user_id)
    if user is None:
        flash("User not found.", "danger")
        return redirect(url_for("admin.users"))
    if user.id == current_user.id:
        flash("You cannot deactivate yourself.", "warning")
        return redirect(url_for("admin.users"))
    user.is_active = not user.is_active
    db.session.commit()
    status = "activated" if user.is_active else "deactivated"
    flash(f"User {user.username} has been {status}.", "success")
    return redirect(url_for("admin.users"))


@admin_bp.route("/comments")
def comments():
    page = request.args.get("page", 1, type=int)
    status_filter = request.args.get("status", "pending")
    query = Comment.query.filter_by(is_deleted=False)
    if status_filter == "pending":
        query = query.filter_by(is_approved=False)
    elif status_filter == "approved":
        query = query.filter_by(is_approved=True)
    pagination = query.order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=20, error_out=False,
    )
    return render_template("admin/comments.html", comments=pagination.items, pagination=pagination, status_filter=status_filter)


@admin_bp.route("/comments/<int:comment_id>/approve", methods=["POST"])
def approve_comment(comment_id):
    comment = db.session.get(Comment, comment_id)
    if comment:
        comment.is_approved = True
        db.session.commit()
        flash("Comment approved.", "success")
    return redirect(url_for("admin.comments"))


@admin_bp.route("/comments/<int:comment_id>/reject", methods=["POST"])
def reject_comment(comment_id):
    comment = db.session.get(Comment, comment_id)
    if comment:
        comment.soft_delete()
        db.session.commit()
        flash("Comment rejected and removed.", "success")
    return redirect(url_for("admin.comments"))

26.9.4 错误页面视图

app/views/errors.py

python
from flask import Blueprint, render_template

errors_bp = Blueprint("errors", __name__)


@errors_bp.app_errorhandler(404)
def page_not_found(e):
    return render_template("errors/404.html"), 404


@errors_bp.app_errorhandler(500)
def internal_error(e):
    from app.extensions import db
    db.session.rollback()
    return render_template("errors/500.html"), 500


@errors_bp.app_errorhandler(403)
def forbidden(e):
    return render_template("errors/403.html"), 403

26.10 RESTful API层

26.10.1 API错误处理

app/api/errors.py

python
from flask import jsonify
from werkzeug.exceptions import HTTPException


class APIError(Exception):
    def __init__(self, message: str, status_code: int = 400, payload: dict = None):
        super().__init__()
        self.message = message
        self.status_code = status_code
        self.payload = payload or {}

    def to_dict(self) -> dict:
        result = dict(self.payload)
        result["error"] = self.message
        result["status"] = self.status_code
        return result


class NotFoundError(APIError):
    def __init__(self, message: str = "Resource not found"):
        super().__init__(message, status_code=404)


class UnauthorizedError(APIError):
    def __init__(self, message: str = "Authentication required"):
        super().__init__(message, status_code=401)


class ForbiddenError(APIError):
    def __init__(self, message: str = "Permission denied"):
        super().__init__(message, status_code=403)


class ValidationError(APIError):
    def __init__(self, message: str = "Validation error", errors: dict = None):
        payload = {"errors": errors} if errors else {}
        super().__init__(message, status_code=422, payload=payload)


def register_error_handlers(bp):
    @bp.errorhandler(APIError)
    def handle_api_error(e):
        return jsonify(e.to_dict()), e.status_code

    @bp.errorhandler(HTTPException)
    def handle_http_error(e):
        return jsonify({"error": e.description, "status": e.code}), e.code

    @bp.errorhandler(Exception)
    def handle_unexpected_error(e):
        from flask import current_app
        current_app.logger.exception("Unexpected error occurred")
        return jsonify({"error": "Internal server error", "status": 500}), 500

26.10.2 API认证装饰器

app/utils/decorators.py

python
from functools import wraps
from flask import request, redirect, url_for, abort
from flask_login import current_user
from app.services.auth import AuthService
from app.api.errors import UnauthorizedError, ForbiddenError


def anonymous_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if current_user.is_authenticated:
            return redirect(url_for("posts.index"))
        return f(*args, **kwargs)
    return decorated


def admin_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if not current_user.is_authenticated or not current_user.is_admin:
            abort(403)
        return f(*args, **kwargs)
    return decorated


def jwt_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = _extract_token()
        if token is None:
            raise UnauthorizedError("Missing authentication token")

        payload = AuthService.verify_token(token, token_type="access")
        if payload is None:
            raise UnauthorizedError("Invalid or expired token")

        from app.models.user import User
        from app.extensions import db
        current_user = db.session.get(User, payload["sub"])
        if current_user is None or not current_user.is_active:
            raise UnauthorizedError("User not found or inactive")

        request.current_user = current_user
        return f(*args, **kwargs)
    return decorated


def role_required(*roles):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            user = getattr(request, "current_user", None)
            if user is None:
                raise UnauthorizedError("Authentication required")
            if user.role not in roles:
                raise ForbiddenError("Insufficient permissions")
            return f(*args, **kwargs)
        return decorated
    return decorator


def _extract_token():
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        return auth_header[7:]
    return None

26.10.3 API序列化与分页

app/utils/serializers.py

python
from flask import request, url_for, current_app


def paginate_query(query, serializer_func, endpoint=None, **kwargs):
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get(
        "per_page", current_app.config.get("POSTS_PER_PAGE", 10), type=int,
    )
    per_page = min(per_page, 100)

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)

    result = {
        "items": [serializer_func(item) for item in pagination.items],
        "meta": {
            "page": pagination.page,
            "per_page": pagination.per_page,
            "total": pagination.total,
            "pages": pagination.pages,
            "has_next": pagination.has_next,
            "has_prev": pagination.has_prev,
        },
    }

    links = {}
    if endpoint:
        if pagination.has_next:
            links["next"] = url_for(endpoint, page=pagination.next_num, per_page=per_page, **kwargs, _external=True)
        if pagination.has_prev:
            links["prev"] = url_for(endpoint, page=pagination.prev_num, per_page=per_page, **kwargs, _external=True)
        links["self"] = url_for(endpoint, page=page, per_page=per_page, **kwargs, _external=True)
    result["_links"] = links

    return result

26.10.4 认证API

app/api/v1/auth.py

python
from flask import request, jsonify
from app.api.v1 import api_v1_bp
from app.services.auth import AuthService
from app.api.errors import UnauthorizedError, ValidationError
from app.utils.decorators import jwt_required
from app.extensions import db


@api_v1_bp.route("/auth/register", methods=["POST"])
def api_register():
    data = request.get_json()
    if not data:
        raise ValidationError("Request body is required")

    required_fields = ["username", "email", "password"]
    missing = [f for f in required_fields if not data.get(f)]
    if missing:
        raise ValidationError(
            "Missing required fields",
            errors={f: "This field is required" for f in missing},
        )

    try:
        user = AuthService.register(
            username=data["username"], email=data["email"], password=data["password"],
        )
    except ValueError as e:
        raise ValidationError(str(e))

    tokens = AuthService.generate_tokens(user)
    return jsonify({"user": user.to_dict(include_email=True), **tokens}), 201


@api_v1_bp.route("/auth/login", methods=["POST"])
def api_login():
    data = request.get_json()
    if not data or not data.get("username") or not data.get("password"):
        raise ValidationError("Username and password are required")

    user = AuthService.authenticate(data["username"], data["password"])
    if user is None:
        raise UnauthorizedError("Invalid credentials")

    user.update_login_info(request.remote_addr)
    db.session.commit()

    tokens = AuthService.generate_tokens(user)
    return jsonify({"user": user.to_dict(include_email=True), **tokens})


@api_v1_bp.route("/auth/refresh", methods=["POST"])
def api_refresh():
    data = request.get_json()
    if not data or not data.get("refresh_token"):
        raise ValidationError("Refresh token is required")

    result = AuthService.refresh_access_token(data["refresh_token"])
    if result is None:
        raise UnauthorizedError("Invalid or expired refresh token")
    return jsonify(result)


@api_v1_bp.route("/auth/me", methods=["GET"])
@jwt_required
def api_me():
    return jsonify(request.current_user.to_dict(include_email=True))


@api_v1_bp.route("/auth/change-password", methods=["POST"])
@jwt_required
def api_change_password():
    data = request.get_json()
    if not data or not data.get("old_password") or not data.get("new_password"):
        raise ValidationError("Old password and new password are required")

    success = AuthService.change_password(
        request.current_user, data["old_password"], data["new_password"],
    )
    if not success:
        raise ValidationError("Old password is incorrect")
    return jsonify({"message": "Password changed successfully"})

26.10.5 文章API

app/api/v1/posts.py

python
from flask import request, jsonify
from sqlalchemy import or_
from app.api.v1 import api_v1_bp
from app.models.post import Post, Category, Tag
from app.services.post import PostService
from app.api.errors import NotFoundError, ForbiddenError, ValidationError
from app.utils.decorators import jwt_required
from app.utils.serializers import paginate_query
from app.extensions import db


@api_v1_bp.route("/posts", methods=["GET"])
def api_list_posts():
    query = Post.query.filter_by(status=Post.STATUS_PUBLISHED, is_deleted=False)

    category_slug = request.args.get("category")
    if category_slug:
        category = Category.query.filter_by(slug=category_slug).first()
        if category:
            query = query.filter_by(category_id=category.id)

    tag_slug = request.args.get("tag")
    if tag_slug:
        tag = Tag.query.filter_by(slug=tag_slug).first()
        if tag:
            query = query.filter(Post.tags.contains(tag))

    search = request.args.get("q")
    if search:
        query = query.filter(or_(Post.title.contains(search), Post.content.contains(search)))

    query = query.order_by(Post.is_pinned.desc(), Post.published_at.desc())
    return jsonify(paginate_query(query, lambda p: p.to_dict(include_content=False), endpoint="api_v1.api_list_posts"))


@api_v1_bp.route("/posts/<slug>", methods=["GET"])
def api_get_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.status != Post.STATUS_PUBLISHED:
        user = getattr(request, "current_user", None)
        if user is None or (post.author != user and not user.is_editor):
            raise NotFoundError("Post not found")
    return jsonify(post.to_dict())


@api_v1_bp.route("/posts", methods=["POST"])
@jwt_required
def api_create_post():
    data = request.get_json()
    if not data or not data.get("title") or not data.get("content"):
        raise ValidationError("Title and content are required")

    post = PostService.create_post(
        user_id=request.current_user.id, title=data["title"], content=data["content"],
        summary=data.get("summary"), category_name=data.get("category"),
        tag_names=data.get("tags", []), status=data.get("status", "draft"),
    )
    return jsonify(post.to_dict()), 201


@api_v1_bp.route("/posts/<slug>", methods=["PUT"])
@jwt_required
def api_update_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.author != request.current_user and not request.current_user.is_editor:
        raise ForbiddenError("You can only edit your own posts")

    data = request.get_json()
    if not data:
        raise ValidationError("Request body is required")

    post = PostService.update_post(
        post, title=data.get("title"), content=data.get("content"),
        summary=data.get("summary"), category_name=data.get("category"),
        tag_names=data.get("tags"), status=data.get("status"),
    )
    return jsonify(post.to_dict())


@api_v1_bp.route("/posts/<slug>", methods=["DELETE"])
@jwt_required
def api_delete_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.author != request.current_user and not request.current_user.is_admin:
        raise ForbiddenError("You can only delete your own posts")

    post.soft_delete()
    db.session.commit()
    return "", 204


@api_v1_bp.route("/categories", methods=["GET"])
def api_list_categories():
    categories = Category.query.order_by(Category.sort_order).all()
    return jsonify([c.to_dict() for c in categories])


@api_v1_bp.route("/tags", methods=["GET"])
def api_list_tags():
    tags = Tag.query.order_by(Tag.name).all()
    return jsonify([t.to_dict() for t in tags])

26.10.6 用户与评论API

app/api/v1/users.py

python
from flask import jsonify
from app.api.v1 import api_v1_bp
from app.models.user import User
from app.api.errors import NotFoundError
from app.utils.serializers import paginate_query


@api_v1_bp.route("/users", methods=["GET"])
def api_list_users():
    query = User.query.filter_by(is_deleted=False).order_by(User.created_at.desc())
    return jsonify(paginate_query(query, lambda u: u.to_dict(), endpoint="api_v1.api_list_users"))


@api_v1_bp.route("/users/<username>", methods=["GET"])
def api_get_user(username):
    user = User.query.filter_by(username=username, is_deleted=False).first()
    if user is None:
        raise NotFoundError("User not found")
    return jsonify(user.to_dict())

app/api/v1/comments.py

python
from flask import request, jsonify
from app.api.v1 import api_v1_bp
from app.models.comment import Comment
from app.models.post import Post
from app.api.errors import ValidationError
from app.utils.decorators import jwt_required
from app.extensions import db


@api_v1_bp.route("/posts/<int:post_id>/comments", methods=["GET"])
def api_list_comments(post_id):
    Post.query.get_or_404(post_id)
    page = request.args.get("page", 1, type=int)
    per_page = min(request.args.get("per_page", 20, type=int), 50)

    pagination = Comment.query.filter_by(
        post_id=post_id, is_deleted=False, is_approved=True, parent_id=None,
    ).order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False,
    )

    return jsonify({
        "items": [c.to_dict(include_replies=True) for c in pagination.items],
        "meta": {
            "page": pagination.page, "per_page": pagination.per_page,
            "total": pagination.total, "pages": pagination.pages,
        },
    })


@api_v1_bp.route("/posts/<int:post_id>/comments", methods=["POST"])
@jwt_required
def api_create_comment(post_id):
    post = Post.query.get_or_404(post_id)
    data = request.get_json()
    if not data or not data.get("content"):
        raise ValidationError("Comment content is required")

    comment = Comment(
        content=data["content"], post_id=post.id,
        user_id=request.current_user.id,
        parent_id=data.get("parent_id"),
    )
    db.session.add(comment)
    post.comment_count += 1
    db.session.commit()
    return jsonify(comment.to_dict()), 201

26.10.7 API蓝图注册

app/api/v1/__init__.py

python
from flask import Blueprint
from app.api.errors import register_error_handlers

api_v1_bp = Blueprint("api_v1", __name__)

from app.api.v1 import auth, posts, users, comments  # noqa: E402, F401

register_error_handlers(api_v1_bp)

26.11 安全防护实践

26.11.1 OWASP Top 10 防护

威胁防护措施本项目实现
A01 访问控制失效基于角色的权限控制,最小权限原则role_required 装饰器
A02 密码学失败pbkdf2:sha256 哈希,HTTPS强制set_password 方法
A03 注入参数化查询,输入验证,bleach清洗SQLAlchemy ORM + WTForms
A04 不安全设计威胁建模,安全默认配置ProductionConfig 安全设置
A05 安全配置错误安全Header,错误页面不泄露信息Nginx安全Header
A06 过时组件依赖定期更新,安全公告监控pip audit
A07 身份认证失败速率限制,强密码策略,JWT过期flask_limiter + JWT
A08 数据完整性失败CSRF保护,CORS限制flask_wtf.csrf
A09 日志监控不足结构化日志,异常追踪RotatingFileHandler
A10 服务端请求伪造URL白名单,网络隔离内部服务验证

26.11.2 安全Header配置

Nginx安全Header:

nginx
add_header X-Content-Type-Options "nosniff" always;
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:;" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
add_header Permissions-Policy "camera=(), microphone=(), geolocation=()" always;

26.12 测试策略

26.12.1 测试Fixtures

tests/conftest.py

python
import pytest
from app import create_app
from app.extensions import db as _db
from app.models.user import User, UserRole


class TestUser:
    def test_user_creation(self, app, db):
        with app.app_context():
            user = User(username="testuser", email="test@example.com")
            user.set_password("TestPass123!")
            db.session.add(user)
            db.session.commit()
            assert user.id is not None
            assert user.check_password("TestPass123!")


class TestPost:
    def test_post_creation(self, app, db, sample_user):
        with app.app_context():
            from app.models.post import Post
            post = Post(title="Test Post", content="Content", author_id=sample_user.id)
            db.session.add(post)
            db.session.commit()
            assert post.id is not None
            assert post.slug is not None

26.13 本章小结

本章通过构建完整的博客系统,系统实践了以下核心知识:

  1. 项目架构:分层架构设计、蓝图模块化、工厂模式
  2. 数据模型:用户系统、文章系统、评论系统、分类标签
  3. 认证授权:Flask-Login会话认证、JWT API认证、权限装饰器
  4. 视图层:模板继承、表单处理、分页、搜索
  5. RESTful API:序列化、分页、错误处理、认证
  6. 安全防护:OWASP Top 10防护、安全Header、输入验证
  7. 测试策略:模型测试、视图测试、API测试
  8. 部署运维:Gunicorn、Nginx、Docker、监控

26.14 延伸阅读

26.14.1 Flask高级主题

  • Flask官方文档 (https://flask.palletsprojects.com/) — Flask权威指南
  • Flask Web Development (Miguel Grinberg) — Flask开发经典
  • Architecture Patterns with Python — Python架构模式

26.14.2 安全与性能

26.14.3 部署与运维

26.14.4 测试与质量


from config import TestingConfig

@pytest.fixture(scope="session") def app(): app = create_app(TestingConfig) yield app

@pytest.fixture(scope="function") def db(app): with app.app_context(): _db.create_all() yield _db _db.session.rollback() _db.drop_all()

@pytest.fixture(scope="function") def client(app, db): return app.test_client()

@pytest.fixture(scope="function") def authenticated_client(client, db): user = User(username="testuser", email="test@example.com", role=UserRole.AUTHOR) user.set_password("password123") _db.session.add(user) _db.session.commit()

with client.session_transaction() as session:
    session["_user_id"] = user.id

return client

@pytest.fixture def admin_client(client, db): admin = User(username="admin", email="admin@example.com", role=UserRole.ADMIN) admin.set_password("adminpass123") _db.session.add(admin) _db.session.commit()

with client.session_transaction() as session:
    session["_user_id"] = admin.id

return client

@pytest.fixture def auth_headers(app, db): from app.services.auth import AuthService

user = User(username="apiuser", email="api@example.com", role=UserRole.AUTHOR)
user.set_password("password123")
_db.session.add(user)
_db.session.commit()

tokens = AuthService.generate_tokens(user)
return {"Authorization": f"Bearer {tokens['access_token']}"}

### 26.12.2 测试工厂

`tests/factories.py`:

```python
import factory
from app.models.user import User, UserRole
from app.models.post import Post, Category, Tag
from app.models.comment import Comment
from app.extensions import db


class UserFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    username = factory.Sequence(lambda n: f"user{n}")
    email = factory.LazyAttribute(lambda obj: f"{obj.username}@example.com")
    role = UserRole.READER
    is_active = True

    @factory.post_generation
    def password(obj, create, extracted, **kwargs):
        if extracted:
            obj.set_password(extracted)
        else:
            obj.set_password("defaultpassword")


class CategoryFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Category
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    name = factory.Sequence(lambda n: f"Category {n}")
    slug = factory.LazyAttribute(lambda obj: obj.name.lower().replace(" ", "-"))


class PostFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    title = factory.Sequence(lambda n: f"Post Title {n}")
    slug = factory.LazyAttribute(lambda obj: obj.title.lower().replace(" ", "-"))
    content = factory.Faker("text")
    status = Post.STATUS_PUBLISHED
    author = factory.SubFactory(UserFactory, role=UserRole.AUTHOR)
    category = factory.SubFactory(CategoryFactory)


class CommentFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Comment
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    content = factory.Faker("text")
    author = factory.SubFactory(UserFactory)
    post = factory.SubFactory(PostFactory)
    is_approved = True

26.12.3 单元测试

tests/unit/test_models.py

python
import pytest
from app.models.user import User, UserRole
from app.models.post import Post
from app.models.comment import Comment


class TestUserModel:
    def test_set_and_check_password(self, app, db):
        with app.app_context():
            user = User(username="test", email="test@example.com")
            user.set_password("securepassword")
            assert user.check_password("securepassword") is True
            assert user.check_password("wrongpassword") is False

    def test_role_properties(self, app, db):
        with app.app_context():
            admin = User(username="admin", email="admin@example.com", role=UserRole.ADMIN)
            assert admin.is_admin is True
            assert admin.is_editor is True
            assert admin.is_author is True

            reader = User(username="reader", email="reader@example.com", role=UserRole.READER)
            assert reader.is_admin is False
            assert reader.is_editor is False
            assert reader.is_author is False

    def test_follow_unfollow(self, app, db):
        with app.app_context():
            user1 = User(username="user1", email="user1@example.com")
            user1.set_password("password")
            user2 = User(username="user2", email="user2@example.com")
            user2.set_password("password")
            db.session.add_all([user1, user2])
            db.session.commit()

            user1.follow(user2)
            assert user1.is_following(user2) is True
            assert user2.is_following(user1) is False

            user1.unfollow(user2)
            assert user1.is_following(user2) is False


class TestPostModel:
    def test_publish(self, app, db):
        with app.app_context():
            user = User(username="author", email="author@example.com", role=UserRole.AUTHOR)
            user.set_password("password")
            db.session.add(user)
            db.session.commit()

            post = Post(title="Test", slug="test", content="Content", user_id=user.id)
            db.session.add(post)
            db.session.commit()

            assert post.status == Post.STATUS_DRAFT
            assert post.published_at is None

            post.publish()
            assert post.status == Post.STATUS_PUBLISHED
            assert post.published_at is not None

    def test_soft_delete(self, app, db):
        with app.app_context():
            user = User(username="author2", email="author2@example.com", role=UserRole.AUTHOR)
            user.set_password("password")
            db.session.add(user)
            db.session.commit()

            post = Post(title="Delete Test", slug="delete-test", content="Content", user_id=user.id)
            db.session.add(post)
            db.session.commit()

            post.soft_delete()
            assert post.is_deleted is True
            assert post.deleted_at is not None

26.12.4 API集成测试

tests/integration/test_api.py

python
import json
import pytest


class TestAuthAPI:
    def test_register(self, client, app):
        with app.app_context():
            response = client.post("/api/v1/auth/register", json={
                "username": "newuser",
                "email": "new@example.com",
                "password": "securepass123",
            })
            assert response.status_code == 201
            data = response.get_json()
            assert "access_token" in data
            assert "refresh_token" in data
            assert data["user"]["username"] == "newuser"

    def test_register_duplicate_username(self, client, app, db):
        with app.app_context():
            from app.models.user import User
            user = User(username="existing", email="existing@example.com")
            user.set_password("password")
            db.session.add(user)
            db.session.commit()

            response = client.post("/api/v1/auth/register", json={
                "username": "existing",
                "email": "another@example.com",
                "password": "securepass123",
            })
            assert response.status_code == 422

    def test_login(self, client, app, db):
        with app.app_context():
            from app.models.user import User
            user = User(username="loginuser", email="login@example.com")
            user.set_password("password123")
            db.session.add(user)
            db.session.commit()

            response = client.post("/api/v1/auth/login", json={
                "username": "loginuser",
                "password": "password123",
            })
            assert response.status_code == 200
            data = response.get_json()
            assert "access_token" in data

    def test_login_invalid_credentials(self, client, app, db):
        with app.app_context():
            response = client.post("/api/v1/auth/login", json={
                "username": "nonexistent",
                "password": "wrongpassword",
            })
            assert response.status_code == 401


class TestPostsAPI:
    def test_list_posts(self, client, app, db):
        with app.app_context():
            response = client.get("/api/v1/posts")
            assert response.status_code == 200
            data = response.get_json()
            assert "items" in data
            assert "meta" in data

    def test_create_post(self, client, app, db, auth_headers):
        with app.app_context():
            response = client.post("/api/v1/posts", json={
                "title": "API Test Post",
                "content": "This is a test post created via API.",
                "status": "published",
            }, headers=auth_headers)
            assert response.status_code == 201
            data = response.get_json()
            assert data["title"] == "API Test Post"

    def test_create_post_unauthorized(self, client, app, db):
        with app.app_context():
            response = client.post("/api/v1/posts", json={
                "title": "Unauthorized Post",
                "content": "Should fail.",
            })
            assert response.status_code == 401

    def test_delete_post(self, client, app, db, auth_headers):
        with app.app_context():
            create_resp = client.post("/api/v1/posts", json={
                "title": "To Delete", "content": "Will be deleted", "status": "published",
            }, headers=auth_headers)
            slug = create_resp.get_json()["slug"]

            response = client.delete(f"/api/v1/posts/{slug}", headers=auth_headers)
            assert response.status_code == 204

26.13 生产部署

26.13.1 Docker容器化

Dockerfile

dockerfile
FROM python:3.12-slim AS builder

WORKDIR /app

RUN pip install --no-cache-dir poetry

COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.create false && poetry install --no-dev --no-interaction --no-ansi

FROM python:3.12-slim

WORKDIR /app

RUN addgroup --system appgroup && adduser --system --ingroup appgroup appuser

COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY . .

RUN mkdir -p instance app/static/uploads && chown -R appuser:appgroup /app

USER appuser

EXPOSE 8000

CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "--threads", "2", "--timeout", "120", "--access-logfile", "-", "app:create_app()"]

26.13.2 Docker Compose编排

docker-compose.yml

yaml
version: "3.8"

services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=postgresql://flaskblog:${DB_PASSWORD}@db:5432/flaskblog
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    restart: unless-stopped

  db:
    image: postgres:16-alpine
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=flaskblog
      - POSTGRES_USER=flaskblog
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U flaskblog"]
      interval: 5s
      timeout: 5s
      retries: 5
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

  celery_worker:
    build: .
    command: celery -A celery_app.celery worker --loglevel=info --concurrency=2
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - DATABASE_URL=postgresql://flaskblog:${DB_PASSWORD}@db:5432/flaskblog
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
      - ./app/static:/usr/share/nginx/static:ro
    depends_on:
      - web
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

26.13.3 Nginx反向代理

nginx.conf

nginx
upstream flask_app {
    server web:8000;
}

server {
    listen 80;
    server_name example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name example.com;

    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;

    client_max_body_size 16M;

    add_header X-Content-Type-Options "nosniff" always;
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Referrer-Policy "strict-origin-when-cross-origin" always;

    location /static/ {
        alias /usr/share/nginx/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
    }

    location / {
        proxy_pass http://flask_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_redirect off;
    }
}

26.13.4 Celery异步任务

celery_app.py

python
from celery import Celery
from config import BaseConfig


def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config["CELERY_BROKER_URL"],
        backend=app.config["CELERY_RESULT_BACKEND"],
    )
    celery.config_from_object({
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        "timezone": "UTC",
        "enable_utc": True,
        "task_track_started": True,
        "worker_prefetch_multiplier": 1,
        "task_acks_late": True,
    })

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


from app import create_app  # noqa: E402

flask_app = create_app()
celery = make_celery(flask_app)


@celery.task(bind=True, max_retries=3)
def send_email_task(self, to, subject, template, **kwargs):
    from app.services.email import EmailService
    try:
        EmailService.send_email(to=to, subject=subject, template=template, **kwargs)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)


@celery.task
def update_post_stats():
    from app.extensions import db
    from app.models.post import Post
    with flask_app.app_context():
        posts = Post.query.filter_by(status="published", is_deleted=False).all()
        for post in posts:
            post.comment_count = post.comments.filter_by(is_deleted=False, is_approved=True).count()
        db.session.commit()

26.14 性能优化

26.14.1 数据库查询优化

优化策略实现方式效果
索引优化复合索引覆盖高频查询减少全表扫描
查询优化joinedload/subqueryload 预加载消除N+1查询
分页优化基于游标的分页替代 OFFSET大数据集性能稳定
连接池pool_size + pool_pre_ping减少连接建立开销
慢查询日志SQLALCHEMY_RECORD_QUERIES定位性能瓶颈

预加载示例:

python
from sqlalchemy.orm import joinedload

posts = Post.query.options(
    joinedload(Post.author),
    joinedload(Post.category),
    joinedload(Post.tags),
).filter_by(status="published").all()

26.14.2 缓存策略

python
from app.extensions import cache


@cache.cached(timeout=300, key_prefix="post_list")
def get_cached_posts(page=1):
    return PostService.get_published_posts(page=page)


@cache.memoize(timeout=600)
def get_cached_post(slug):
    return PostService.get_post_by_slug(slug)


def invalidate_post_cache(slug=None):
    cache.delete("post_list")
    if slug:
        cache.delete_memoized(get_cached_post, slug)

26.14.3 Gunicorn调优

bash
gunicorn \
    --bind 0.0.0.0:8000 \
    --workers 4 \
    --threads 2 \
    --worker-class gthread \
    --timeout 120 \
    --graceful-timeout 30 \
    --max-requests 1000 \
    --max-requests-jitter 50 \
    --access-logfile - \
    --error-logfile - \
    "app:create_app()"

Workers数量推荐公式:(2 × CPU核心数) + 1


26.15 前沿技术动态

26.15.1 异步Web框架

python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    name: str
    price: float

@app.post("/items/")
async def create_item(item: Item):
    return {"item": item, "status": "created"}

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}

26.15.2 现代前端集成

python
from flask import Flask, send_from_directory

app = Flask(__name__, static_folder='../frontend/dist')

@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def serve_spa(path):
    if path != "" and os.path.exists(app.static_folder + '/' + path):
        return send_from_directory(app.static_folder, path)
    return send_from_directory(app.static_folder, 'index.html')

26.15.3 容器化部署

dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
EXPOSE 8000
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "app:create_app()"]

26.15.4 现代ORM实践

python
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String

class User(Base):
    __tablename__ = "users"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)

26.16 本章小结

本章从零构建了一个生产级博客平台,系统覆盖了以下核心实践:

  1. 架构设计:采用分层架构(表示层→业务逻辑层→数据访问层→基础设施层),结合DDD概念组织领域模型
  2. 配置管理:多环境配置(开发/测试/生产),敏感信息通过环境变量注入,生产环境强制安全Cookie
  3. 数据模型:运用SQLAlchemy Mixin模式实现时间戳、软删除、Slug等通用能力,支持复杂关系(多对多、自引用)
  4. 认证授权:双轨认证体系(Web端Session + API端JWT),基于角色的访问控制,密码重置Token机制
  5. RESTful API:版本化API(/api/v1),统一错误处理,HATEOAS分页链接,JWT Bearer认证
  6. 安全防护:CSRF保护、XSS清洗(bleach)、CORS限制、速率限制、安全Header
  7. 测试策略:测试金字塔(单元→集成→端到端),Factory Boy数据工厂,pytest fixtures管理
  8. 生产部署:Docker多阶段构建,Docker Compose编排(Web + DB + Redis + Celery + Nginx),SSL终止

26.17 扩展练习

基础练习

  1. 为用户模型添加头像上传功能,支持图片裁剪与缩略图生成
  2. 实现文章草稿自动保存功能(前端定时保存 + 后端API)
  3. 添加文章点赞功能,要求使用Redis缓存计数

进阶练习

  1. 实现OAuth2第三方登录(GitHub/Google),集成Authlib库
  2. 构建WebSocket实时通知系统(Flask-SocketIO),实现评论实时推送
  3. 实现基于PostgreSQL全文搜索的搜索引擎,支持中文分词

高级练习

  1. 设计并实现API速率限制的滑动窗口算法,替代固定窗口
  2. 构建完整的CI/CD流水线(GitHub Actions),包含自动测试、构建、部署
  3. 实现基于Redis的分布式锁,确保并发场景下数据一致性
  4. 设计微服务拆分方案,将用户服务、文章服务、通知服务独立部署

下一章:第27章 实战:数据分析

青少年创意编程 - 高中Python组 - 江苏省宿城中等专业学校