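Chapter 26 Hands-On: Web Application Development
Learning Objectives
After completing this chapter, you should be able to:
- Design a web application architecture: understand how layered architecture and domain-driven design apply to web applications, and build a maintainable, extensible application structure
- Manage configuration: implement multi-environment configuration, protection of sensitive values, and hot reloading of configuration
- Build a data persistence layer: use SQLAlchemy for complex relationship modeling, data validation, migration management, and query optimization
- Implement a complete authentication and authorization system: stateless JWT authentication, role-based permission control, and OAuth2 integration
- Design RESTful APIs: follow the Richardson Maturity Model to implement resource-oriented APIs, versioning, and HATEOAS
- Apply security practices: defend against the OWASP Top 10 threats and implement CSP, CORS, rate limiting, and related policies
- Carry out a testing strategy: build a full test pyramid of unit, integration, API, and end-to-end tests
- Deploy to production: Docker containerization, Nginx reverse proxying, database tuning, and monitoring and alerting
26.1 Web Application Architecture Design Theory
26.1.1 Layered Architecture
Production-grade web applications typically use a layered architecture that separates concerns into distinct layers: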
┌──────────────────────────────────────────────┐
│  Presentation Layer                          │
│  Templates / Static Assets / API Serializers │
├──────────────────────────────────────────────┤
│  Business Logic Layer                        │
│  Services / Domain Models / Business Rules   │
├──────────────────────────────────────────────┤
│  Data Access Layer                           │
│  Repositories / ORM / Query Builders         │
├──────────────────────────────────────────────┤
│  Infrastructure Layer                        │
│  Database / Cache / Message Queue / Storage  │
└──────────────────────────────────────────────┘
Responsibilities and interaction rules for each layer:
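| Layer | Responsibilities | Depends on | Key patterns |
|---|---|---|---|
| Presentation | Request parsing, response serialization, input validation | → Business logic layer | MVC, serializers |
| Business logic | Core business rules, workflow orchestration | → Data access layer | Service pattern, domain model |
| Data access | Data persistence, query optimization | → Infrastructure layer | Repository pattern, Unit of Work |
| Infrastructure | External system integration, technical implementation | No dependency on upper layers | Adapter pattern |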
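
The dependency direction in the table can be illustrated with a small framework-free sketch. All names here (`PostRecord`, `InMemoryPostRepository`, `PostService`, `render_post`) are hypothetical and not part of the flaskblog code; the point is only that each layer calls the one below it and never the reverse.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class PostRecord:
    id: int
    title: str


class InMemoryPostRepository:
    """Data access layer: the only place that knows how posts are stored."""

    def __init__(self) -> None:
        self._rows: Dict[int, PostRecord] = {}

    def add(self, post: PostRecord) -> None:
        self._rows[post.id] = post

    def get(self, post_id: int) -> Optional[PostRecord]:
        return self._rows.get(post_id)


class PostService:
    """Business logic layer: depends on a repository, never on HTTP details."""

    def __init__(self, repo: InMemoryPostRepository) -> None:
        self._repo = repo

    def get_title(self, post_id: int) -> str:
        post = self._repo.get(post_id)
        if post is None:
            raise LookupError(f"post {post_id} not found")
        return post.title.strip()


def render_post(service: PostService, post_id: int) -> dict:
    """Presentation layer: turns service results and errors into a response."""
    try:
        return {"status": 200, "title": service.get_title(post_id)}
    except LookupError as exc:
        return {"status": 404, "error": str(exc)}


repo = InMemoryPostRepository()
repo.add(PostRecord(id=1, title="  Hello layers  "))
service = PostService(repo)
print(render_post(service, 1))
print(render_post(service, 2))
```

Because `PostService` never touches request or response objects, the same service could sit behind an HTML view and a JSON API alike.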
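26.1.2 Core Concepts of Domain-Driven Design
In complex web applications, domain-driven design (DDD) offers a systematic way to organize business logic:
- Entity: a domain object with a unique identity, such as a user or an article
- Value Object: an object without identity, defined entirely by its attributes, such as an email address
- Aggregate Root: the entry-point entity of an aggregate (a cluster of related entities treated as one consistency boundary); outside code reaches the cluster only through it
- Domain Service: a business operation that does not belong to any single entity
- Repository: the persistence abstraction for an aggregate root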
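
A compact sketch may help tie these terms together. The classes below (`EmailAddress`, `Comment`, `Article`, `ArticleRepository`) are illustrative assumptions, not the models used later in this chapter, which follow the ORM's active-record style instead.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class EmailAddress:
    """Value object: no identity, compared by its attributes."""
    value: str

    def __post_init__(self) -> None:
        if "@" not in self.value:
            raise ValueError(f"invalid email: {self.value!r}")


@dataclass
class Comment:
    """Entity that lives inside the Article aggregate."""
    id: int
    body: str


@dataclass(eq=False)
class Article:
    """Entity and aggregate root: comments are changed only through it."""
    id: int
    title: str
    author_email: EmailAddress
    comments: List[Comment] = field(default_factory=list)

    def add_comment(self, body: str) -> Comment:
        if not body.strip():
            raise ValueError("comment body must not be empty")
        comment = Comment(id=len(self.comments) + 1, body=body)
        self.comments.append(comment)
        return comment

    def __eq__(self, other: object) -> bool:
        # Entities are equal when their identities match, whatever their state.
        return isinstance(other, Article) and other.id == self.id

    def __hash__(self) -> int:
        return hash(self.id)


class ArticleRepository:
    """Repository: persistence abstraction for the aggregate root."""

    def __init__(self) -> None:
        self._items: Dict[int, Article] = {}

    def save(self, article: Article) -> None:
        self._items[article.id] = article

    def get(self, article_id: int) -> Optional[Article]:
        return self._items.get(article_id)


article = Article(1, "DDD basics", EmailAddress("author@example.com"))
article.add_comment("Nice overview")
articles = ArticleRepository()
articles.save(article)
```

Note that two `EmailAddress` objects with the same value are interchangeable, whereas two `Article` objects are the same article only if their `id` matches.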
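26.1.3 Technology Choices
| Component | Choice | Rationale |
|---|---|---|
| Web framework | Flask 3.x | Micro-framework flexibility, well suited to incremental architectural evolution |
| ORM | SQLAlchemy 2.x | One of the most mature ORMs in the Python ecosystem, with async and type-annotation support |
| Database migrations | Alembic | Migration tool from the SQLAlchemy project, supports branching and merging |
| Authentication | PyJWT + Flask-Login | JWT for API authentication, sessions for web authentication |
| Forms | WTForms | Server-side validation, integrates closely with Jinja2 |
| Task queue | Celery + Redis | Asynchronous task processing (sending email, computing statistics) |
| Caching | Flask-Caching + Redis | Multi-level caching strategy, supports Redis and Memcached |
| Testing | pytest + Factory Boy | Factory pattern for test data, pytest fixtures for setup |
| Deployment | Docker + Gunicorn | Containerized deployment with Gunicorn as the WSGI server |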
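26.2 Project Overview
This chapter builds a production-grade blog platform, flaskblog, with the following core features:
- User registration, login, and OAuth2 third-party login
- Creating, reading, updating, and deleting posts; draft management; Markdown rendering
- Post categories and tags
- Comments and replies (with nesting)
- Full-text search (Whoosh or PostgreSQL full-text indexes)
- RESTful API (JWT authentication, versioning, pagination, filtering)
- Admin backend (statistics, user management, content moderation)
- Asynchronous tasks (email notifications, statistics)
- Caching strategy
- Docker-based containerized deployment
26.3 Project Structure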
flaskblog/
├── app/
│   ├── __init__.py            # Application factory
│   ├── extensions.py          # Extension initialization
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py            # User model
│   │   ├── post.py            # Post model
│   │   ├── comment.py         # Comment model
│   │   └── mixins.py          # Shared mixin classes
│   ├── services/
│   │   ├── __init__.py
│   │   ├── auth.py            # Authentication service
│   │   ├── post.py            # Post service
│   │   ├── search.py          # Search service
│   │   └── email.py           # Email service
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py        # API authentication endpoints
│   │   │   ├── posts.py       # Posts API
│   │   │   ├── users.py       # Users API
│   │   │   └── comments.py    # Comments API
│   │   └── errors.py          # API error handling
│   ├── views/
│   │   ├── __init__.py
│   │   ├── auth.py            # Web authentication views
│   │   ├── posts.py           # Web post views
│   │   ├── admin.py           # Admin backend views
│   │   └── errors.py          # Error page views
│   ├── forms/
│   │   ├── __init__.py
│   │   ├── auth.py            # Authentication forms
│   │   └── post.py            # Post forms
│   ├── templates/             # Jinja2 templates
│   ├── static/                # Static assets
│   └── utils/
│       ├── __init__.py
│       ├── decorators.py      # Custom decorators
│       ├── pagination.py      # Pagination helpers
│       └── serializers.py     # Serialization helpers
├── migrations/                # Alembic migration files
├── tests/
│   ├── conftest.py            # pytest fixtures
│   ├── factories.py           # Factory Boy factories
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── celery_app.py              # Celery configuration
├── config.py                  # Configuration classes
├── pyproject.toml             # Project metadata and dependencies
├── Dockerfile                 # Container build
├── docker-compose.yml         # Service orchestration
├── nginx.conf                 # Nginx configuration
└── .env.example               # Example environment variables
26.4 Configuration Management
26.4.1 Multi-Environment Configuration
import os
from datetime import timedelta
from pathlib import Path


class BaseConfig:
    # The fallback values are for local development only; override every
    # secret through environment variables in any shared deployment.
    SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-key-change-in-production")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_RECORD_QUERIES = True
    BASE_DIR = Path(__file__).resolve().parent
    SQLITE_DB_PATH = BASE_DIR / "instance" / "app.db"
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL",
        f"sqlite:///{SQLITE_DB_PATH}",
    )
    REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
    CACHE_TYPE = "RedisCache"
    CACHE_REDIS_URL = REDIS_URL
    CACHE_DEFAULT_TIMEOUT = 300
    JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "jwt-dev-secret")
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
    MAIL_SERVER = os.environ.get("MAIL_SERVER", "localhost")
    MAIL_PORT = int(os.environ.get("MAIL_PORT", 25))
    MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS", "false").lower() == "true"
    MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
    MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
    MAIL_DEFAULT_SENDER = os.environ.get("MAIL_DEFAULT_SENDER", "noreply@flaskblog.local")
    CELERY_BROKER_URL = REDIS_URL
    CELERY_RESULT_BACKEND = REDIS_URL
    POSTS_PER_PAGE = 10
    COMMENTS_PER_PAGE = 20
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16 MiB upload limit
    ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
    UPLOAD_FOLDER = BASE_DIR / "app" / "static" / "uploads"
    SECURITY_PASSWORD_SALT = os.environ.get("SECURITY_PASSWORD_SALT", "password-salt")

    @staticmethod
    def init_app(app):
        # Hook for environment-specific setup; subclasses override it.
        pass


class DevelopmentConfig(BaseConfig):
    DEBUG = True
    SQLALCHEMY_ECHO = True
    CACHE_TYPE = "SimpleCache"  # in-process cache, no Redis needed locally


class TestingConfig(BaseConfig):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite://"  # in-memory database
    CACHE_TYPE = "NullCache"
    WTF_CSRF_ENABLED = False
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=5)

    @staticmethod
    def init_app(app):
        import tempfile
        # Keep test uploads out of the real static directory.
        app.config["UPLOAD_FOLDER"] = Path(tempfile.mkdtemp())


class ProductionConfig(BaseConfig):
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 10,
        "pool_recycle": 3600,
        "pool_pre_ping": True,
        "connect_args": {"connect_timeout": 5},
    }
    SESSION_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    SESSION_COOKIE_SAMESITE = "Lax"
    REMEMBER_COOKIE_SECURE = True
    REMEMBER_COOKIE_HTTPONLY = True

    @staticmethod
    def init_app(app):
        import logging
        from logging.handlers import RotatingFileHandler
        # Rotate at 10 MiB and keep five old log files.
        handler = RotatingFileHandler(
            "flaskblog.log", maxBytes=10 * 1024 * 1024, backupCount=5
        )
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s %(threadName)s %(message)s"
        ))
        handler.setLevel(logging.INFO)
        app.logger.addHandler(handler)


config_map = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "production": ProductionConfig,
    "default": DevelopmentConfig,
}


def get_config():
    # Flask itself no longer reads FLASK_ENV (removed in Flask 2.3); here it
    # is used purely as an application-level switch between config classes.
    env = os.environ.get("FLASK_ENV", "default")
    return config_map.get(env, DevelopmentConfig)
26.4.2 Environment Variable Management
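
Because the configuration classes fall back to development defaults when a variable is missing, a deployment can silently run with a placeholder secret. A startup check along the following lines is one way to fail fast. The helper name `missing_production_settings` and the choice of required variables are assumptions for illustration, not part of the project code.

```python
from typing import List, Mapping

# Placeholder values taken from the defaults in config.py and .env.example.
PLACEHOLDERS = {
    "dev-secret-key-change-in-production",
    "jwt-dev-secret",
    "password-salt",
    "your-secret-key-here",
    "your-jwt-secret-key-here",
    "your-password-salt-here",
}
REQUIRED = ("SECRET_KEY", "JWT_SECRET_KEY", "SECURITY_PASSWORD_SALT", "DATABASE_URL")


def missing_production_settings(env: Mapping[str, str]) -> List[str]:
    """Return the required variable names that are unset or still placeholders."""
    problems = []
    for name in REQUIRED:
        value = env.get(name, "")
        if not value or value in PLACEHOLDERS:
            problems.append(name)
    return problems
```

A production entry point could call `missing_production_settings(os.environ)` before creating the application and refuse to start when the returned list is not empty.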
.env.example:
FLASK_ENV=development
SECRET_KEY=your-secret-key-here
JWT_SECRET_KEY=your-jwt-secret-key-here
SECURITY_PASSWORD_SALT=your-password-salt-here
DATABASE_URL=postgresql://user:password@localhost:5432/flaskblog
REDIS_URL=redis://localhost:6379/0
MAIL_SERVER=smtp.example.com
MAIL_PORT=587
MAIL_USE_TLS=true
MAIL_USERNAME=your-email@example.com
MAIL_PASSWORD=your-email-password
MAIL_DEFAULT_SENDER=noreply@example.com
26.5 Application Factory and Extension Initialization
26.5.1 Extension Initialization
app/extensions.py:
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_migrate import Migrate
from flask_caching import Cache
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_cors import CORS
from flask_mail import Mail
from flask_wtf.csrf import CSRFProtect

# Extensions are created unbound here and attached to an app in the factory.
db = SQLAlchemy()
login_manager = LoginManager()
migrate = Migrate()
cache = Cache()
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379/1",
)
cors = CORS()
csrf = CSRFProtect()
mail = Mail()  # imported by app/services/email.py

login_manager.login_view = "auth.login"
login_manager.login_message_category = "info"
login_manager.session_protection = "strong"


@login_manager.user_loader
def load_user(user_id):
    # Imported lazily to avoid a circular import with the models package.
    from app.models.user import User
    return db.session.get(User, int(user_id))
26.5.2 Application Factory
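
The factory relies on a two-step pattern: extension objects are created at import time without an application, then bound to a concrete app through `init_app`. The framework-free sketch below shows the idea in miniature; `MiniApp`, `MiniCache`, and `make_app` are hypothetical stand-ins, not Flask APIs.

```python
from typing import Any, Dict


class MiniApp:
    """Stand-in for an application object holding config and extension state."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = dict(config)
        self.extensions: Dict[str, Any] = {}


class MiniCache:
    """Stand-in extension: created unbound, attached to an app later."""

    def init_app(self, app: MiniApp) -> None:
        # Per-app state lives on the app, so one extension object can serve
        # several app instances (for example, one per test).
        app.extensions["cache"] = {"timeout": app.config.get("CACHE_TIMEOUT", 300)}


cache = MiniCache()  # module level, importable without an app


def make_app(config: Dict[str, Any]) -> MiniApp:
    app = MiniApp(config)
    cache.init_app(app)
    return app


prod_app = make_app({"CACHE_TIMEOUT": 600})
test_app = make_app({})
print(prod_app.extensions["cache"], test_app.extensions["cache"])
```

Keeping the state on the app rather than on the extension is what lets tests build a fresh, isolated application for each case.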
app/__init__.py:
from flask import Flask

from config import get_config
from app.extensions import db, login_manager, migrate, cache, limiter, cors, csrf, mail


def create_app(config_class=None):
    app = Flask(__name__)
    if config_class is None:
        config_class = get_config()
    app.config.from_object(config_class)
    config_class.init_app(app)
    _init_extensions(app)
    _register_blueprints(app)
    _register_error_handlers(app)
    _register_template_filters(app)
    _register_context_processors(app)
    return app


def _init_extensions(app):
    db.init_app(app)
    login_manager.init_app(app)
    migrate.init_app(app, db)
    cache.init_app(app)
    limiter.init_app(app)
    cors.init_app(app, resources={r"/api/*": {"origins": "*"}})
    csrf.init_app(app)
    mail.init_app(app)
    with app.app_context():
        from app import models  # noqa: F401
        # Convenient for local development; with Alembic in place, schema
        # changes in shared environments should go through migrations.
        db.create_all()


def _register_blueprints(app):
    from app.views.auth import auth_bp
    from app.views.posts import posts_bp
    from app.views.admin import admin_bp
    from app.views.errors import errors_bp
    from app.api.v1 import api_v1_bp
    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(posts_bp)
    app.register_blueprint(admin_bp, url_prefix="/admin")
    app.register_blueprint(errors_bp)
    app.register_blueprint(api_v1_bp, url_prefix="/api/v1")
    # The API authenticates with JWTs rather than cookies, so it is exempt
    # from CSRF checks. The exemption needs the blueprint object itself,
    # which only exists at this point, not earlier in _init_extensions.
    csrf.exempt(api_v1_bp)


def _register_error_handlers(app):
    from app.views.errors import page_not_found, internal_error, forbidden
    app.register_error_handler(404, page_not_found)
    app.register_error_handler(500, internal_error)
    app.register_error_handler(403, forbidden)


def _register_template_filters(app):
    import markdown as md
    import bleach

    @app.template_filter("markdown")
    def markdown_filter(text):
        # Render Markdown, then sanitize the HTML against an allow-list.
        allowed_tags = [
            "a", "abbr", "acronym", "b", "blockquote", "br", "code",
            "dd", "del", "dl", "dt", "em", "h1", "h2", "h3", "h4",
            "h5", "h6", "hr", "i", "img", "li", "ol", "p", "pre",
            "span", "strong", "table", "tbody", "td", "th", "thead",
            "tr", "ul",
        ]
        allowed_attrs = {
            "*": ["class"],
            "a": ["href", "rel", "title"],
            "img": ["alt", "src", "title"],
        }
        html = md.markdown(text, extensions=[
            "fenced_code", "tables", "toc", "nl2br", "codehilite",
        ])
        return bleach.clean(html, tags=allowed_tags, attributes=allowed_attrs)

    @app.template_filter("truncate_chars")
    def truncate_chars_filter(text, length=200):
        if len(text) <= length:
            return text
        # Cut at the last space before the limit to avoid splitting a word.
        return text[:length].rsplit(" ", 1)[0] + "..."


def _register_context_processors(app):
    from datetime import datetime

    @app.context_processor
    def inject_globals():
        return {
            "now": datetime.utcnow,  # a callable; templates use now()
            "site_name": app.config.get("SITE_NAME", "FlaskBlog"),
        }
26.6 Data Model Layer
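
Slugs derived from titles come up repeatedly in this section. The standalone sketch below uses the common normalize-then-strip approach and shows one consequence worth knowing: text with no ASCII equivalent is dropped entirely, so a fallback is needed for such titles. The function name `slugify` and its `fallback` parameter are assumptions made for this example.

```python
import re
import unicodedata


def slugify(title: str, fallback: str = "post", max_length: int = 200) -> str:
    """Fold a title to a lowercase ASCII slug, or return `fallback` if empty."""
    text = unicodedata.normalize("NFKD", title)
    # Accents become separate combining marks, which the ASCII step discards.
    text = text.encode("ascii", "ignore").decode("ascii")
    text = re.sub(r"[^\w\s-]", "", text).strip().lower()
    text = re.sub(r"[-\s]+", "-", text).strip("-")
    return text[:max_length] or fallback


print(slugify("Hello, World!"))   # hello-world
print(slugify("Café au lait"))    # cafe-au-lait
print(slugify("你好,世界"))        # post (nothing survives ASCII folding)
```

For sites with mostly non-Latin titles, a transliteration step or an ID-based slug is usually a better fit than a fixed fallback string.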
26.6.1 Common Mixins
app/models/mixins.py:
from datetime import datetime

from app.extensions import db


class TimestampMixin:
    # Naive UTC timestamps. Note that datetime.utcnow is deprecated as of
    # Python 3.12; it is kept here to match the naive DateTime columns.
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(
        db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
    )


class SoftDeleteMixin:
    # Rows are flagged instead of removed; queries must filter on is_deleted.
    is_deleted = db.Column(db.Boolean, default=False, nullable=False, index=True)
    deleted_at = db.Column(db.DateTime, nullable=True)

    def soft_delete(self):
        self.is_deleted = True
        self.deleted_at = datetime.utcnow()

    def restore(self):
        self.is_deleted = False
        self.deleted_at = None


class SlugMixin:
    slug = db.Column(db.String(200), unique=True, nullable=False, index=True)

    @staticmethod
    def generate_slug(title: str) -> str:
        import re
        import unicodedata
        import uuid
        slug = unicodedata.normalize("NFKD", title)
        slug = slug.encode("ascii", "ignore").decode("ascii")
        slug = re.sub(r"[^\w\s-]", "", slug).strip().lower()
        slug = re.sub(r"[-\s]+", "-", slug)
        # Titles without any ASCII letters (for example all-CJK titles) fold
        # to an empty string, so fall back to a short random identifier.
        return slug[:200] or uuid.uuid4().hex[:12]
26.6.2 User Model
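
The user model stores only a salted password hash, never the password itself. As a rough illustration of what a PBKDF2-based scheme does internally, the sketch below uses only the standard library. The storage format, the helper names `hash_password` and `verify_password`, and the iteration count are assumptions chosen for this example; they are not Werkzeug's actual format.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 600_000  # assumed work factor for this sketch; tune for your hardware


def hash_password(password: str, iterations: int = ITERATIONS) -> str:
    """Return 'iterations$salt$hex_digest' using PBKDF2-HMAC-SHA256."""
    salt = secrets.token_hex(16)  # fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), iterations)
    return f"{iterations}${salt}${digest.hex()}"


def verify_password(stored: str, candidate: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    iterations, salt, expected = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode(), salt.encode(), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), expected)
```

Storing the iteration count next to the hash lets the work factor be raised later without invalidating existing passwords.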
app/models/user.py:
from datetime import datetime, timedelta

from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash

from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin


class UserRole:
    READER = "reader"
    AUTHOR = "author"
    EDITOR = "editor"
    ADMIN = "admin"


# Self-referential many-to-many association table: who follows whom.
user_follows = db.Table(
    "user_follows",
    db.Column("follower_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
    db.Column("followed_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
    db.Column("created_at", db.DateTime, default=datetime.utcnow),
)


class User(UserMixin, TimestampMixin, SoftDeleteMixin, db.Model):
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256), nullable=False)
    role = db.Column(db.String(20), default=UserRole.READER, nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    email_verified = db.Column(db.Boolean, default=False, nullable=False)
    email_verified_at = db.Column(db.DateTime, nullable=True)
    avatar = db.Column(db.String(200), nullable=True)
    bio = db.Column(db.Text, nullable=True)
    website = db.Column(db.String(200), nullable=True)
    location = db.Column(db.String(100), nullable=True)
    last_login_at = db.Column(db.DateTime, nullable=True)
    last_login_ip = db.Column(db.String(45), nullable=True)  # long enough for IPv6
    login_count = db.Column(db.Integer, default=0)
    posts = db.relationship("Post", backref="author", lazy="dynamic")
    comments = db.relationship("Comment", backref="author", lazy="dynamic")
    following = db.relationship(
        "User",
        secondary=user_follows,
        primaryjoin=(user_follows.c.follower_id == id),
        secondaryjoin=(user_follows.c.followed_id == id),
        backref=db.backref("followers", lazy="dynamic"),
        lazy="dynamic",
    )
    __table_args__ = (
        db.Index("ix_users_username_email", "username", "email"),
    )

    def set_password(self, password: str) -> None:
        self.password_hash = generate_password_hash(password, method="pbkdf2:sha256", salt_length=16)

    def check_password(self, password: str) -> bool:
        return check_password_hash(self.password_hash, password)

    # Role checks are cumulative: an admin is also an editor and an author.
    @property
    def is_admin(self) -> bool:
        return self.role == UserRole.ADMIN

    @property
    def is_editor(self) -> bool:
        return self.role in (UserRole.EDITOR, UserRole.ADMIN)

    @property
    def is_author(self) -> bool:
        return self.role in (UserRole.AUTHOR, UserRole.EDITOR, UserRole.ADMIN)

    def follow(self, user: "User") -> None:
        if not self.is_following(user):
            self.following.append(user)

    def unfollow(self, user: "User") -> None:
        if self.is_following(user):
            self.following.remove(user)

    def is_following(self, user: "User") -> bool:
        return self.following.filter(user_follows.c.followed_id == user.id).count() > 0

    def update_login_info(self, ip_address: str) -> None:
        self.last_login_at = datetime.utcnow()
        self.last_login_ip = ip_address
        self.login_count += 1

    def get_reset_password_token(self, expires_in: int = 3600) -> str:
        import jwt
        from flask import current_app
        return jwt.encode(
            {"reset_password": self.id, "exp": datetime.utcnow() + timedelta(seconds=expires_in)},
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )

    @staticmethod
    def verify_reset_password_token(token: str) -> "User | None":
        import jwt
        from flask import current_app
        try:
            data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=["HS256"])
            return db.session.get(User, data.get("reset_password"))
        except jwt.PyJWTError:
            # Covers expired, malformed, and wrongly signed tokens alike.
            return None

    def to_dict(self, include_email: bool = False) -> dict:
        data = {
            "id": self.id,
            "username": self.username,
            "role": self.role,
            "bio": self.bio,
            "avatar": self.avatar,
            "website": self.website,
            "location": self.location,
            "post_count": self.posts.filter_by(is_deleted=False).count(),
            "follower_count": self.followers.count(),
            "following_count": self.following.count(),
            "created_at": self.created_at.isoformat(),
        }
        if include_email:
            data["email"] = self.email
        return data

    def __repr__(self):
        return f"<User {self.username!r}>"
26.6.3 Post Model
app/models/post.py:
from datetime import datetime

from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin, SlugMixin

# Many-to-many association table between posts and tags.
post_tags = db.Table(
    "post_tags",
    db.Column("post_id", db.Integer, db.ForeignKey("posts.id"), primary_key=True),
    db.Column("tag_id", db.Integer, db.ForeignKey("tags.id"), primary_key=True),
)


class Post(TimestampMixin, SoftDeleteMixin, SlugMixin, db.Model):
    __tablename__ = "posts"
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    summary = db.Column(db.String(500), nullable=True)
    cover_image = db.Column(db.String(200), nullable=True)
    status = db.Column(db.String(20), default="draft", nullable=False, index=True)
    is_pinned = db.Column(db.Boolean, default=False, nullable=False)
    view_count = db.Column(db.Integer, default=0, nullable=False)
    like_count = db.Column(db.Integer, default=0, nullable=False)
    comment_count = db.Column(db.Integer, default=0, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
    category_id = db.Column(db.Integer, db.ForeignKey("categories.id"), nullable=True)
    published_at = db.Column(db.DateTime, nullable=True, index=True)
    tags = db.relationship("Tag", secondary=post_tags, backref=db.backref("posts", lazy="dynamic"))
    comments = db.relationship(
        "Comment", backref="post", lazy="dynamic",
        order_by="Comment.created_at.desc()",
    )
    # Composite indexes for the two most common listing queries.
    __table_args__ = (
        db.Index("ix_posts_status_published", "status", "published_at"),
        db.Index("ix_posts_user_status", "user_id", "status"),
    )
    STATUS_DRAFT = "draft"
    STATUS_PUBLISHED = "published"
    STATUS_ARCHIVED = "archived"

    def publish(self) -> None:
        # published_at is set only on the first transition to "published".
        if self.status != self.STATUS_PUBLISHED:
            self.status = self.STATUS_PUBLISHED
            self.published_at = datetime.utcnow()

    def archive(self) -> None:
        self.status = self.STATUS_ARCHIVED

    def increment_view(self) -> None:
        self.view_count += 1

    def to_dict(self, include_content: bool = True) -> dict:
        data = {
            "id": self.id,
            "slug": self.slug,
            "title": self.title,
            "summary": self.summary,
            "status": self.status,
            "is_pinned": self.is_pinned,
            "view_count": self.view_count,
            "like_count": self.like_count,
            "comment_count": self.comment_count,
            "author": self.author.to_dict() if self.author else None,
            "category": self.category.to_dict() if self.category else None,
            "tags": [tag.to_dict() for tag in self.tags],
            "published_at": self.published_at.isoformat() if self.published_at else None,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }
        if include_content:
            data["content"] = self.content
        return data

    def __repr__(self):
        return f"<Post {self.title!r}>"


class Category(TimestampMixin, db.Model):
    __tablename__ = "categories"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False, index=True)
    description = db.Column(db.String(200), nullable=True)
    sort_order = db.Column(db.Integer, default=0, nullable=False)
    posts = db.relationship("Post", backref="category", lazy="dynamic")

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "slug": self.slug,
            "description": self.description,
            "post_count": self.posts.filter_by(status="published", is_deleted=False).count(),
        }


class Tag(TimestampMixin, db.Model):
    __tablename__ = "tags"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    slug = db.Column(db.String(50), unique=True, nullable=False, index=True)

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "name": self.name,
            "slug": self.slug,
            # `posts` is the dynamic backref defined on Post.tags.
            "post_count": self.posts.filter_by(status="published", is_deleted=False).count(),
        }
26.6.4 Comment Model
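
Nested replies are stored as an adjacency list: each comment carries a `parent_id` that points at the comment it answers. Loading every level of replies through separate queries can get expensive for deep threads, so one option is to fetch a post's comments once and assemble the tree in memory. The sketch below shows that assembly on plain dictionaries; `build_comment_tree` is a hypothetical helper, not part of the project.

```python
from typing import Dict, List


def build_comment_tree(rows: List[dict]) -> List[dict]:
    """Turn flat rows with `id` and `parent_id` into nested `replies` lists."""
    # First pass: one node per row, each with an empty replies list.
    nodes: Dict[int, dict] = {row["id"]: {**row, "replies": []} for row in rows}
    roots: List[dict] = []
    # Second pass: attach each node to its parent, preserving input order.
    for row in rows:
        node = nodes[row["id"]]
        parent = nodes.get(row["parent_id"]) if row["parent_id"] is not None else None
        if parent is None:
            # Top-level comment, or a reply whose parent was filtered out.
            roots.append(node)
        else:
            parent["replies"].append(node)
    return roots


rows = [
    {"id": 1, "parent_id": None, "content": "First"},
    {"id": 2, "parent_id": 1, "content": "Reply to first"},
    {"id": 3, "parent_id": 2, "content": "Reply to reply"},
    {"id": 4, "parent_id": None, "content": "Second"},
]
tree = build_comment_tree(rows)
```

The two passes keep the work linear in the number of comments, and the input rows are left unmodified because each node is a copy.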
app/models/comment.py:
from app.extensions import db
from app.models.mixins import TimestampMixin, SoftDeleteMixin


class Comment(TimestampMixin, SoftDeleteMixin, db.Model):
    __tablename__ = "comments"
    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    is_approved = db.Column(db.Boolean, default=True, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
    post_id = db.Column(db.Integer, db.ForeignKey("posts.id"), nullable=False, index=True)
    # A NULL parent_id marks a top-level comment; otherwise it is a reply.
    parent_id = db.Column(db.Integer, db.ForeignKey("comments.id"), nullable=True, index=True)
    replies = db.relationship(
        "Comment",
        backref=db.backref("parent", remote_side=[id]),
        lazy="dynamic",
        order_by="Comment.created_at.asc()",
    )
    __table_args__ = (
        db.Index("ix_comments_post_approved", "post_id", "is_approved"),
    )

    @property
    def is_reply(self) -> bool:
        return self.parent_id is not None

    @property
    def reply_count(self) -> int:
        return self.replies.filter_by(is_deleted=False, is_approved=True).count()

    def to_dict(self, include_replies: bool = False) -> dict:
        data = {
            "id": self.id,
            "content": self.content,
            "is_approved": self.is_approved,
            "author": self.author.to_dict() if self.author else None,
            "post_id": self.post_id,
            "parent_id": self.parent_id,
            "is_reply": self.is_reply,
            "reply_count": self.reply_count,
            "created_at": self.created_at.isoformat(),
        }
        if include_replies:
            # Only direct replies are included; deeper levels are not expanded.
            data["replies"] = [
                reply.to_dict() for reply in self.replies.filter_by(
                    is_deleted=False, is_approved=True
                ).all()
            ]
        return data
26.6.5 Model Registration
app/models/__init__.py:
from app.models.user import User, UserRole, user_follows
from app.models.post import Post, Category, Tag, post_tags
from app.models.comment import Comment

__all__ = [
    "User", "UserRole", "user_follows",
    "Post", "Category", "Tag", "post_tags",
    "Comment",
]
26.7 Service Layer
The service layer encapsulates business logic, which keeps the view layer thin and makes the logic easier to test and reuse.
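
To make the testability point concrete, here is a minimal sketch of a service that receives its collaborators as arguments, so a test can exercise it without a web request or a database. `RegistrationService`, `FakeUserStore`, and the hashing callable are hypothetical; the services in this chapter use static methods and the global `db` session instead.

```python
from typing import Callable, Dict


class FakeUserStore:
    """In-memory stand-in for the data access layer."""

    def __init__(self) -> None:
        self.users: Dict[str, dict] = {}

    def exists(self, username: str) -> bool:
        return username in self.users

    def add(self, username: str, password_hash: str) -> None:
        self.users[username] = {"username": username, "password_hash": password_hash}


class RegistrationService:
    """Business rule: usernames are unique and passwords are stored hashed."""

    def __init__(self, store: FakeUserStore, hasher: Callable[[str], str]) -> None:
        self._store = store
        self._hasher = hasher

    def register(self, username: str, password: str) -> dict:
        if self._store.exists(username):
            raise ValueError(f"Username {username!r} already exists")
        self._store.add(username, self._hasher(password))
        return self._store.users[username]


store = FakeUserStore()
service = RegistrationService(store, hasher=lambda pw: "hashed:" + pw)
user = service.register("alice", "pw12345678")
```

The same service class could be wired to a real repository and a real password hasher in production, while tests keep using the fakes.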
26.7.1 Authentication Service
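
An HS256 token consists of three base64url segments: a header, a payload, and an HMAC-SHA256 signature over the first two. The standard-library sketch below shows that structure together with the expiry and token-type checks a verifier performs. It is for illustration only: the helper names are made up, and a real application should rely on a maintained library such as PyJWT, as this chapter does.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that the compact token form strips.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_token(token: str, secret: str, token_type: str = "access") -> Optional[dict]:
    """Return the payload if signature, expiry, and type all check out."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
        ).digest()
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        return None  # wrong segment count, bad base64, or bad JSON
    if payload.get("exp", 0) < time.time() or payload.get("type") != token_type:
        return None
    return payload
```

The payload is only encoded, not encrypted, so anyone holding a token can read claims such as the username or role; only tampering is prevented.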
app/services/auth.py:
from datetime import datetime, timedelta

import jwt
from flask import current_app

from app.extensions import db
from app.models.user import User, UserRole


class AuthService:
    @staticmethod
    def register(username: str, email: str, password: str) -> User:
        if User.query.filter_by(username=username).first():
            raise ValueError(f"Username {username!r} already exists")
        if User.query.filter_by(email=email).first():
            raise ValueError(f"Email {email!r} already registered")
        user = User(
            username=username,
            email=email,
            role=UserRole.READER,
        )
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        return user

    @staticmethod
    def authenticate(username: str, password: str) -> User | None:
        # Accept either the username or the email address as the login name.
        user = User.query.filter(
            (User.username == username) | (User.email == username)
        ).first()
        if user and user.check_password(password) and user.is_active:
            return user
        return None

    @staticmethod
    def generate_tokens(user: User) -> dict:
        now = datetime.utcnow()
        access_payload = {
            # The JWT spec defines "sub" as a string, and recent PyJWT
            # releases reject non-string subjects when decoding.
            "sub": str(user.id),
            "username": user.username,
            "role": user.role,
            "iat": now,
            "exp": now + current_app.config["JWT_ACCESS_TOKEN_EXPIRES"],
            "type": "access",
        }
        refresh_payload = {
            "sub": str(user.id),
            "iat": now,
            "exp": now + current_app.config["JWT_REFRESH_TOKEN_EXPIRES"],
            "type": "refresh",
        }
        access_token = jwt.encode(
            access_payload,
            current_app.config["JWT_SECRET_KEY"],
            algorithm="HS256",
        )
        refresh_token = jwt.encode(
            refresh_payload,
            current_app.config["JWT_SECRET_KEY"],
            algorithm="HS256",
        )
        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "expires_in": int(current_app.config["JWT_ACCESS_TOKEN_EXPIRES"].total_seconds()),
        }

    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> dict | None:
        try:
            payload = jwt.decode(
                token,
                current_app.config["JWT_SECRET_KEY"],
                algorithms=["HS256"],
            )
            # A refresh token must not be accepted where an access token
            # is expected, and vice versa.
            if payload.get("type") != token_type:
                return None
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

    @staticmethod
    def refresh_access_token(refresh_token: str) -> dict | None:
        payload = AuthService.verify_token(refresh_token, token_type="refresh")
        if payload is None:
            return None
        user = db.session.get(User, int(payload["sub"]))
        if user is None or not user.is_active:
            return None
        return AuthService.generate_tokens(user)

    @staticmethod
    def change_password(user: User, old_password: str, new_password: str) -> bool:
        if not user.check_password(old_password):
            return False
        user.set_password(new_password)
        db.session.commit()
        return True
26.7.2 Post Service
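
When two posts share a title, their slugs collide. One common approach is to append an increasing numeric suffix until a free slug is found. The sketch below takes the existence check as a callable so it stays independent of the ORM; `unique_slug` and its parameters are hypothetical names.

```python
from typing import Callable


def unique_slug(base: str, exists: Callable[[str], bool], max_tries: int = 1000) -> str:
    """Return `base`, or `base-2`, `base-3`, ... for the first slug not taken."""
    if not exists(base):
        return base
    for n in range(2, max_tries + 2):
        candidate = f"{base}-{n}"
        if not exists(candidate):
            return candidate
    raise RuntimeError(f"no free slug found for {base!r}")


taken = {"hello-world", "hello-world-2"}
print(unique_slug("hello-world", taken.__contains__))  # hello-world-3
print(unique_slug("fresh-title", taken.__contains__))  # fresh-title
```

Two concurrent requests can still pick the same candidate, so the unique constraint on the slug column remains the final safeguard and the caller should be ready to retry on an integrity error.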
app/services/post.py:
from flask import current_app
from sqlalchemy import or_

from app.extensions import db
from app.models.post import Post, Category, Tag
from app.models.mixins import SlugMixin


class PostService:
    @staticmethod
    def create_post(user_id: int, title: str, content: str, **kwargs) -> Post:
        slug = SlugMixin.generate_slug(title)
        existing = Post.query.filter_by(slug=slug).first()
        if existing:
            # Disambiguate duplicate titles with a numeric suffix.
            slug = f"{slug}-{Post.query.count() + 1}"
        post = Post(
            title=title,
            slug=slug,
            content=content,
            user_id=user_id,
            summary=kwargs.get("summary", content[:200] if content else ""),
            cover_image=kwargs.get("cover_image"),
            status=kwargs.get("status", Post.STATUS_DRAFT),
            is_pinned=kwargs.get("is_pinned", False),
        )
        if kwargs.get("category_name"):
            post.category = PostService._get_or_create_category(kwargs["category_name"])
        if kwargs.get("tag_names"):
            for tag_name in kwargs["tag_names"]:
                tag = PostService._get_or_create_tag(tag_name)
                post.tags.append(tag)
        if post.status == Post.STATUS_PUBLISHED:
            post.publish()
        db.session.add(post)
        db.session.commit()
        return post

    @staticmethod
    def update_post(post: Post, **kwargs) -> Post:
        if "title" in kwargs and kwargs["title"] != post.title:
            post.title = kwargs["title"]
            post.slug = SlugMixin.generate_slug(kwargs["title"])
        for field in ("content", "summary", "cover_image", "is_pinned"):
            if field in kwargs:
                setattr(post, field, kwargs[field])
        if "status" in kwargs:
            if kwargs["status"] == Post.STATUS_PUBLISHED:
                post.publish()
            elif kwargs["status"] == Post.STATUS_ARCHIVED:
                post.archive()
            else:
                post.status = kwargs["status"]
        if "category_name" in kwargs:
            post.category = PostService._get_or_create_category(kwargs["category_name"])
        if "tag_names" in kwargs:
            # Replace the whole tag set rather than merging.
            post.tags = []
            for tag_name in kwargs["tag_names"]:
                tag = PostService._get_or_create_tag(tag_name)
                post.tags.append(tag)
        db.session.commit()
        return post

    @staticmethod
    def get_published_posts(page: int = 1, per_page: int | None = None):
        per_page = per_page or current_app.config["POSTS_PER_PAGE"]
        # Pinned posts first, then newest first.
        return Post.query.filter_by(
            status=Post.STATUS_PUBLISHED, is_deleted=False
        ).order_by(
            Post.is_pinned.desc(), Post.published_at.desc()
        ).paginate(page=page, per_page=per_page, error_out=False)

    @staticmethod
    def get_post_by_slug(slug: str) -> Post | None:
        return Post.query.filter_by(slug=slug, is_deleted=False).first()

    @staticmethod
    def get_user_posts(user_id: int, page: int = 1, status: str | None = None):
        query = Post.query.filter_by(user_id=user_id, is_deleted=False)
        if status:
            query = query.filter_by(status=status)
        return query.order_by(Post.created_at.desc()).paginate(
            page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
        )

    @staticmethod
    def search_posts(query: str, page: int = 1):
        # Simple substring (LIKE) search across title, content, and summary.
        search_filter = or_(
            Post.title.contains(query),
            Post.content.contains(query),
            Post.summary.contains(query),
        )
        return Post.query.filter(
            search_filter, Post.status == Post.STATUS_PUBLISHED, Post.is_deleted == False  # noqa: E712
        ).order_by(Post.published_at.desc()).paginate(
            page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False
        )

    @staticmethod
    def _get_or_create_category(name: str) -> Category:
        category = Category.query.filter_by(name=name).first()
        if not category:
            category = Category(name=name, slug=SlugMixin.generate_slug(name))
            db.session.add(category)
        return category

    @staticmethod
    def _get_or_create_tag(name: str) -> Tag:
        tag = Tag.query.filter_by(name=name).first()
        if not tag:
            tag = Tag(name=name, slug=SlugMixin.generate_slug(name))
            db.session.add(tag)
        return tag
26.7.3 Email Service
app/services/email.py:
from threading import Thread

from flask import current_app, render_template
from flask_mail import Message

from app.extensions import mail


class EmailService:
    @staticmethod
    def send_async_email(app, msg):
        # The worker thread has no application context of its own.
        with app.app_context():
            mail.send(msg)

    @staticmethod
    def send_email(to: str | list[str], subject: str, template: str, **kwargs):
        # current_app is a proxy; pass the real app object to the thread.
        app = current_app._get_current_object()
        msg = Message(
            subject=app.config.get("MAIL_PREFIX", "[FlaskBlog] ") + subject,
            sender=app.config["MAIL_DEFAULT_SENDER"],
            recipients=[to] if isinstance(to, str) else to,
        )
        msg.body = render_template(f"emails/{template}.txt", **kwargs)
        msg.html = render_template(f"emails/{template}.html", **kwargs)
        # A plain thread keeps this example simple; the Celery worker listed
        # in the technology table is the sturdier option for production.
        thread = Thread(target=EmailService.send_async_email, args=(app, msg))
        thread.start()
        return thread

    @staticmethod
    def send_verification_email(user, token):
        EmailService.send_email(
            to=user.email, subject="Verify Your Email",
            template="verify_email", user=user, token=token,
        )

    @staticmethod
    def send_password_reset_email(user, token):
        EmailService.send_email(
            to=user.email, subject="Reset Your Password",
            template="reset_password", user=user, token=token,
        )

    @staticmethod
    def send_new_comment_notification(post_author, comment):
        EmailService.send_email(
            to=post_author.email,
            subject=f"New Comment on '{comment.post.title}'",
            template="new_comment", author=post_author, comment=comment,
        )
26.8 Form Validation
app/forms/auth.py:
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Email, EqualTo, Length, ValidationError

from app.models.user import User


class LoginForm(FlaskForm):
    username = StringField("Username", validators=[DataRequired(), Length(1, 64)])
    password = PasswordField("Password", validators=[DataRequired()])
    remember_me = BooleanField("Remember Me")
    submit = SubmitField("Sign In")


class RegistrationForm(FlaskForm):
    username = StringField("Username", validators=[DataRequired(), Length(3, 64)])
    email = StringField("Email", validators=[DataRequired(), Email(), Length(1, 120)])
    password = PasswordField("Password", validators=[DataRequired(), Length(8, 128)])
    password2 = PasswordField(
        "Repeat Password",
        validators=[DataRequired(), EqualTo("password", message="Passwords must match.")],
    )
    submit = SubmitField("Register")

    # WTForms calls validate_<fieldname> methods automatically as inline validators.
    def validate_username(self, field):
        if User.query.filter_by(username=field.data).first():
            raise ValidationError("Username already in use.")

    def validate_email(self, field):
        if User.query.filter_by(email=field.data).first():
            raise ValidationError("Email already registered.")


class ChangePasswordForm(FlaskForm):
    old_password = PasswordField("Old Password", validators=[DataRequired()])
    new_password = PasswordField("New Password", validators=[DataRequired(), Length(8, 128)])
    new_password2 = PasswordField(
        "Repeat New Password",
        validators=[DataRequired(), EqualTo("new_password")],
    )
    submit = SubmitField("Update Password")


class PasswordResetRequestForm(FlaskForm):
    email = StringField("Email", validators=[DataRequired(), Email()])
    submit = SubmitField("Reset Password")


class PasswordResetForm(FlaskForm):
    password = PasswordField("New Password", validators=[DataRequired(), Length(8, 128)])
    password2 = PasswordField(
        "Repeat Password", validators=[DataRequired(), EqualTo("password")],
    )
    submit = SubmitField("Reset Password")
app/forms/post.py:
from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, SelectField, SubmitField, BooleanField
from wtforms.validators import DataRequired, Length, Optional


class PostForm(FlaskForm):
    title = StringField("Title", validators=[DataRequired(), Length(1, 200)])
    content = TextAreaField("Content", validators=[DataRequired()])
    summary = TextAreaField("Summary", validators=[Optional(), Length(0, 500)])
    category = StringField("Category", validators=[Optional(), Length(0, 50)])
    tags = StringField("Tags (comma-separated)", validators=[Optional()])
    status = SelectField(
        "Status",
        choices=[("draft", "Draft"), ("published", "Published")],
        default="draft",
    )
    is_pinned = BooleanField("Pin this post")
    submit = SubmitField("Save")


class CommentForm(FlaskForm):
    content = TextAreaField("Comment", validators=[DataRequired(), Length(1, 2000)])
    submit = SubmitField("Submit Comment")
26.9 Web View Layer
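
Views that redirect to a user-supplied `next` parameter after login need to make sure the target stays on the same site; otherwise the login page can be abused as an open redirect. A conservative check, sketched below with a hypothetical helper named `is_safe_redirect_target`, accepts only local absolute paths.

```python
from urllib.parse import urlsplit


def is_safe_redirect_target(target: str) -> bool:
    """Accept only same-site paths such as `/posts/1?page=2`."""
    if not target or not target.startswith("/"):
        return False
    # Browsers treat `//host` and `/\host` as pointing at another origin.
    if target.startswith("//") or "\\" in target:
        return False
    parts = urlsplit(target)
    return parts.scheme == "" and parts.netloc == ""


print(is_safe_redirect_target("/posts/1?page=2"))       # True
print(is_safe_redirect_target("https://evil.example"))  # False
print(is_safe_redirect_target("//evil.example/login"))  # False
```

A login view would fall back to a fixed default page whenever the check returns `False`.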
26.9.1 认证视图
app/views/auth.py:
from urllib.parse import urlsplit

from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, current_user
from app.extensions import db
from app.models.user import User
from app.forms.auth import (
    LoginForm, RegistrationForm, PasswordResetRequestForm, PasswordResetForm,
)
from app.services.auth import AuthService
from app.services.email import EmailService
from app.utils.decorators import anonymous_required

auth_bp = Blueprint("auth", __name__)


@auth_bp.route("/register", methods=["GET", "POST"])
@anonymous_required
def register():
    form = RegistrationForm()
    if form.validate_on_submit():
        try:
            AuthService.register(
                username=form.username.data,
                email=form.email.data,
                password=form.password.data,
            )
            flash("Registration successful! Please log in.", "success")
            return redirect(url_for("auth.login"))
        except ValueError as e:
            flash(str(e), "danger")
    return render_template("auth/register.html", form=form)


@auth_bp.route("/login", methods=["GET", "POST"])
@anonymous_required
def login():
    form = LoginForm()
    if form.validate_on_submit():
        user = AuthService.authenticate(form.username.data, form.password.data)
        if user:
            login_user(user, remember=form.remember_me.data)
            user.update_login_info(request.remote_addr)
            db.session.commit()
            next_page = request.args.get("next")
            # Follow only same-site relative paths; an absolute or
            # scheme-relative "next" value would be an open redirect.
            if not next_page or not next_page.startswith("/") or urlsplit(next_page).netloc:
                next_page = url_for("posts.index")
            flash("Login successful!", "success")
            return redirect(next_page)
        flash("Invalid username or password.", "danger")
    return render_template("auth/login.html", form=form)


@auth_bp.route("/logout")
def logout():
    logout_user()
    flash("You have been logged out.", "info")
    return redirect(url_for("posts.index"))


@auth_bp.route("/reset-password", methods=["GET", "POST"])
def reset_password_request():
    if current_user.is_authenticated:
        return redirect(url_for("posts.index"))
    form = PasswordResetRequestForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            token = user.get_reset_password_token()
            EmailService.send_password_reset_email(user, token)
        # Show the same message whether or not the address exists, so the
        # form cannot be used to probe for registered emails.
        flash("Check your email for instructions to reset your password.", "info")
        return redirect(url_for("auth.login"))
    return render_template("auth/reset_password_request.html", form=form)


@auth_bp.route("/reset-password/<token>", methods=["GET", "POST"])
def reset_password(token):
    if current_user.is_authenticated:
        return redirect(url_for("posts.index"))
    user = User.verify_reset_password_token(token)
    if not user:
        return redirect(url_for("posts.index"))
    form = PasswordResetForm()
    if form.validate_on_submit():
        user.set_password(form.password.data)
        db.session.commit()
        flash("Your password has been reset.", "success")
        return redirect(url_for("auth.login"))
    return render_template("auth/reset_password.html", form=form)

26.9.2 Post Views
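The `create` and `edit` views in this section both turn the comma-separated `tags` field into a list with the same inline comprehension. A minimal sketch of one way to factor that step out follows. The helper name `parse_tag_names` is hypothetical (it is not part of the project code), and dropping case-insensitive duplicates is an extra assumption of this sketch:

```python
def parse_tag_names(raw):
    """Split a comma-separated tag string into clean, de-duplicated names.

    Hypothetical helper for illustration; the views in this chapter
    inline the split-and-strip step instead.
    """
    if not raw:
        return []
    seen = set()
    names = []
    for part in raw.split(","):
        name = part.strip()
        key = name.lower()
        # Skip empty fragments (e.g. a trailing comma) and repeated tags.
        if name and key not in seen:
            seen.add(key)
            names.append(name)
    return names


print(parse_tag_names(" flask, Python ,, flask,"))  # ['flask', 'Python']
```

Keeping the parsing in one place means the web views and any other caller treat stray commas and whitespace the same way.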
app/views/posts.py:
from flask import Blueprint, render_template, redirect, url_for, flash, request, abort, current_app
from flask_login import login_required, current_user
from app.extensions import db, cache
from app.models.post import Post, Category, Tag
from app.models.comment import Comment
from app.forms.post import PostForm, CommentForm
from app.services.post import PostService

posts_bp = Blueprint("posts", __name__)


@posts_bp.route("/")
@posts_bp.route("/index")
@cache.cached(timeout=60, query_string=True)
def index():
    page = request.args.get("page", 1, type=int)
    pagination = PostService.get_published_posts(page=page)
    categories = Category.query.order_by(Category.sort_order).all()
    return render_template(
        "posts/index.html",
        posts=pagination.items,
        pagination=pagination,
        categories=categories,
    )


@posts_bp.route("/post/<slug>")
def detail(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    # Drafts are visible only to their author; everyone else gets a 404.
    if post.status != Post.STATUS_PUBLISHED and post.author != current_user:
        abort(404)
    post.increment_view()
    db.session.commit()
    comment_form = CommentForm()
    page = request.args.get("page", 1, type=int)
    comments = Comment.query.filter_by(
        post_id=post.id, is_deleted=False, is_approved=True, parent_id=None,
    ).order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=current_app.config.get("COMMENTS_PER_PAGE", 20),
        error_out=False,
    )
    return render_template(
        "posts/detail.html", post=post, comments=comments, comment_form=comment_form,
    )


@posts_bp.route("/create", methods=["GET", "POST"])
@login_required
def create():
    form = PostForm()
    if form.validate_on_submit():
        tag_names = [t.strip() for t in form.tags.data.split(",") if t.strip()] if form.tags.data else []
        post = PostService.create_post(
            user_id=current_user.id, title=form.title.data,
            content=form.content.data, summary=form.summary.data,
            category_name=form.category.data or None, tag_names=tag_names,
            status=form.status.data, is_pinned=form.is_pinned.data,
        )
        flash("Post created successfully!", "success")
        return redirect(url_for("posts.detail", slug=post.slug))
    return render_template("posts/create.html", form=form)


@posts_bp.route("/edit/<slug>", methods=["GET", "POST"])
@login_required
def edit(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    if post.author != current_user and not current_user.is_editor:
        abort(403)
    form = PostForm()
    if form.validate_on_submit():
        tag_names = [t.strip() for t in form.tags.data.split(",") if t.strip()] if form.tags.data else []
        PostService.update_post(
            post, title=form.title.data, content=form.content.data,
            summary=form.summary.data, category_name=form.category.data or None,
            tag_names=tag_names, status=form.status.data, is_pinned=form.is_pinned.data,
        )
        flash("Post updated successfully!", "success")
        return redirect(url_for("posts.detail", slug=post.slug))
    if request.method == "GET":
        # Pre-fill only on the initial GET, so a POST that fails
        # validation keeps the user's edits instead of resetting them.
        form.title.data = post.title
        form.content.data = post.content
        form.summary.data = post.summary
        form.category.data = post.category.name if post.category else ""
        form.tags.data = ", ".join(tag.name for tag in post.tags)
        form.status.data = post.status
        form.is_pinned.data = post.is_pinned
    return render_template("posts/edit.html", form=form, post=post)


@posts_bp.route("/delete/<slug>", methods=["POST"])
@login_required
def delete(slug):
    post = PostService.get_post_by_slug(slug)
    if post is None:
        abort(404)
    if post.author != current_user and not current_user.is_admin:
        abort(403)
    post.soft_delete()
    db.session.commit()
    flash("Post deleted.", "success")
    return redirect(url_for("posts.index"))


@posts_bp.route("/search")
def search():
    query = request.args.get("q", "").strip()
    page = request.args.get("page", 1, type=int)
    if not query:
        return render_template("posts/search.html", posts=[], query="", pagination=None)
    pagination = PostService.search_posts(query, page=page)
    return render_template("posts/search.html", posts=pagination.items, pagination=pagination, query=query)


@posts_bp.route("/category/<slug>")
def category(slug):
    category = Category.query.filter_by(slug=slug).first_or_404()
    page = request.args.get("page", 1, type=int)
    pagination = Post.query.filter_by(
        category=category, status=Post.STATUS_PUBLISHED, is_deleted=False,
    ).order_by(Post.published_at.desc()).paginate(
        page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False,
    )
    return render_template("posts/category.html", category=category, posts=pagination.items, pagination=pagination)


@posts_bp.route("/tag/<slug>")
def tag(slug):
    tag = Tag.query.filter_by(slug=slug).first_or_404()
    page = request.args.get("page", 1, type=int)
    pagination = tag.posts.filter_by(
        status=Post.STATUS_PUBLISHED, is_deleted=False,
    ).order_by(Post.published_at.desc()).paginate(
        page=page, per_page=current_app.config["POSTS_PER_PAGE"], error_out=False,
    )
    return render_template("posts/tag.html", tag=tag, posts=pagination.items, pagination=pagination)


@posts_bp.route("/post/<int:post_id>/comment", methods=["POST"])
@login_required
def add_comment(post_id):
    post = Post.query.get_or_404(post_id)
    form = CommentForm()
    if form.validate_on_submit():
        comment = Comment(
            content=form.content.data, post_id=post.id,
            user_id=current_user.id, parent_id=request.form.get("parent_id", type=int),
        )
        db.session.add(comment)
        post.comment_count += 1
        db.session.commit()
        flash("Comment added!", "success")
    return redirect(url_for("posts.detail", slug=post.slug))

26.9.3 Admin Views
app/views/admin.py:
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_required, current_user
from app.extensions import db
from app.models.user import User
from app.models.post import Post
from app.models.comment import Comment
from app.utils.decorators import admin_required

admin_bp = Blueprint("admin", __name__)


@admin_bp.before_request
@login_required
@admin_required
def check_admin():
    # Runs before every view on this blueprint. The decorators do the
    # checking, so the body is intentionally empty.
    pass


@admin_bp.route("/dashboard")
def dashboard():
    stats = {
        "total_users": User.query.filter_by(is_deleted=False).count(),
        "total_posts": Post.query.filter_by(is_deleted=False).count(),
        "published_posts": Post.query.filter_by(status="published", is_deleted=False).count(),
        "draft_posts": Post.query.filter_by(status="draft", is_deleted=False).count(),
        "total_comments": Comment.query.filter_by(is_deleted=False).count(),
        "pending_comments": Comment.query.filter_by(is_approved=False, is_deleted=False).count(),
    }
    recent_posts = Post.query.order_by(Post.created_at.desc()).limit(5).all()
    recent_comments = Comment.query.order_by(Comment.created_at.desc()).limit(5).all()
    return render_template(
        "admin/dashboard.html", stats=stats, recent_posts=recent_posts, recent_comments=recent_comments,
    )


@admin_bp.route("/users")
def users():
    page = request.args.get("page", 1, type=int)
    pagination = User.query.filter_by(is_deleted=False).order_by(User.created_at.desc()).paginate(
        page=page, per_page=20, error_out=False,
    )
    return render_template("admin/users.html", users=pagination.items, pagination=pagination)


@admin_bp.route("/users/<int:user_id>/toggle-active", methods=["POST"])
def toggle_user_active(user_id):
    user = db.session.get(User, user_id)
    if user is None:
        flash("User not found.", "danger")
        return redirect(url_for("admin.users"))
    if user.id == current_user.id:
        flash("You cannot deactivate yourself.", "warning")
        return redirect(url_for("admin.users"))
    user.is_active = not user.is_active
    db.session.commit()
    status = "activated" if user.is_active else "deactivated"
    flash(f"User {user.username} has been {status}.", "success")
    return redirect(url_for("admin.users"))


@admin_bp.route("/comments")
def comments():
    page = request.args.get("page", 1, type=int)
    status_filter = request.args.get("status", "pending")
    query = Comment.query.filter_by(is_deleted=False)
    if status_filter == "pending":
        query = query.filter_by(is_approved=False)
    elif status_filter == "approved":
        query = query.filter_by(is_approved=True)
    pagination = query.order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=20, error_out=False,
    )
    return render_template("admin/comments.html", comments=pagination.items, pagination=pagination, status_filter=status_filter)


@admin_bp.route("/comments/<int:comment_id>/approve", methods=["POST"])
def approve_comment(comment_id):
    comment = db.session.get(Comment, comment_id)
    if comment:
        comment.is_approved = True
        db.session.commit()
        flash("Comment approved.", "success")
    return redirect(url_for("admin.comments"))


@admin_bp.route("/comments/<int:comment_id>/reject", methods=["POST"])
def reject_comment(comment_id):
    comment = db.session.get(Comment, comment_id)
    if comment:
        comment.soft_delete()
        db.session.commit()
        flash("Comment rejected and removed.", "success")
    return redirect(url_for("admin.comments"))

26.9.4 Error Page Views
app/views/errors.py:
from flask import Blueprint, render_template

errors_bp = Blueprint("errors", __name__)


@errors_bp.app_errorhandler(404)
def page_not_found(e):
    return render_template("errors/404.html"), 404


@errors_bp.app_errorhandler(500)
def internal_error(e):
    from app.extensions import db
    # Roll back so a failed transaction does not leak into later requests.
    db.session.rollback()
    return render_template("errors/500.html"), 500


@errors_bp.app_errorhandler(403)
def forbidden(e):
    return render_template("errors/403.html"), 403

26.10 RESTful API Layer
26.10.1 API Error Handling
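The handlers registered in this section rely on Flask choosing the most specific handler for a raised exception: it walks the exception class's method resolution order and uses the first class that has a handler. The following is a simplified, framework-free sketch of that lookup, not Flask's actual implementation. `find_handler` and the stand-in exception classes are hypothetical names used only for illustration:

```python
class APIError(Exception):
    """Stand-in for the project's base API exception."""


class NotFoundError(APIError):
    """Stand-in for a more specific API exception."""


def find_handler(handlers, exc):
    """Return the handler registered for the closest ancestor of exc's class."""
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None


handlers = {
    APIError: lambda e: "api-error handler",
    Exception: lambda e: "catch-all handler",
}

# NotFoundError has no handler of its own, so its parent APIError wins
# over the broader Exception handler.
print(find_handler(handlers, NotFoundError())(None))      # api-error handler
# A plain ValueError only matches the catch-all.
print(find_handler(handlers, ValueError("boom"))(None))   # catch-all handler
```

This is why a catch-all `Exception` handler can coexist with the `APIError` handler without shadowing it.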
app/api/errors.py:
from flask import current_app, jsonify
from werkzeug.exceptions import HTTPException


class APIError(Exception):
    def __init__(self, message: str, status_code: int = 400, payload: dict = None):
        # Pass the message up so str(error) is meaningful in logs.
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.payload = payload or {}

    def to_dict(self) -> dict:
        result = dict(self.payload)
        result["error"] = self.message
        result["status"] = self.status_code
        return result


class NotFoundError(APIError):
    def __init__(self, message: str = "Resource not found"):
        super().__init__(message, status_code=404)


class UnauthorizedError(APIError):
    def __init__(self, message: str = "Authentication required"):
        super().__init__(message, status_code=401)


class ForbiddenError(APIError):
    def __init__(self, message: str = "Permission denied"):
        super().__init__(message, status_code=403)


class ValidationError(APIError):
    def __init__(self, message: str = "Validation error", errors: dict = None):
        payload = {"errors": errors} if errors else {}
        super().__init__(message, status_code=422, payload=payload)


def register_error_handlers(bp):
    @bp.errorhandler(APIError)
    def handle_api_error(e):
        return jsonify(e.to_dict()), e.status_code

    @bp.errorhandler(HTTPException)
    def handle_http_error(e):
        return jsonify({"error": e.description, "status": e.code}), e.code

    @bp.errorhandler(Exception)
    def handle_unexpected_error(e):
        # Log the traceback, but never return internal details to the client.
        current_app.logger.exception("Unexpected error occurred")
        return jsonify({"error": "Internal server error", "status": 500}), 500

26.10.2 API Authentication Decorators
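The `jwt_required` decorator in this section depends on pulling a bearer token out of the `Authorization` header. As a framework-free sketch of that step, the hypothetical function `extract_bearer_token` below takes the raw header value as a string. Unlike the project's `_extract_token`, it matches the scheme name case-insensitively and rejects an empty token; both are assumptions of this sketch, not behavior of the original code:

```python
def extract_bearer_token(header_value):
    """Return the token from a 'Bearer ...' Authorization value, or None."""
    if not header_value:
        return None
    # partition() splits on the first space only, so the token itself
    # is returned untouched.
    scheme, _, token = header_value.strip().partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwdw=="))  # None
```

Returning `None` for every malformed case lets the caller map all of them to one 401 response.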
app/utils/decorators.py:
from functools import wraps
from flask import request, redirect, url_for, abort
from flask_login import current_user
from app.services.auth import AuthService
from app.api.errors import UnauthorizedError, ForbiddenError


def anonymous_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if current_user.is_authenticated:
            return redirect(url_for("posts.index"))
        return f(*args, **kwargs)
    return decorated


def admin_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if not current_user.is_authenticated or not current_user.is_admin:
            abort(403)
        return f(*args, **kwargs)
    return decorated


def jwt_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = _extract_token()
        if token is None:
            raise UnauthorizedError("Missing authentication token")
        payload = AuthService.verify_token(token, token_type="access")
        if payload is None:
            raise UnauthorizedError("Invalid or expired token")
        from app.models.user import User
        from app.extensions import db
        # Named "user" so it does not shadow Flask-Login's current_user proxy.
        user = db.session.get(User, payload["sub"])
        if user is None or not user.is_active:
            raise UnauthorizedError("User not found or inactive")
        request.current_user = user
        return f(*args, **kwargs)
    return decorated


def role_required(*roles):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            # Expects jwt_required to have run first and set request.current_user.
            user = getattr(request, "current_user", None)
            if user is None:
                raise UnauthorizedError("Authentication required")
            if user.role not in roles:
                raise ForbiddenError("Insufficient permissions")
            return f(*args, **kwargs)
        return decorated
    return decorator


def _extract_token():
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        return auth_header[7:]
    return None

26.10.3 API Serialization and Pagination
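The `paginate_query` helper in this section builds its `meta` block from Flask-SQLAlchemy's pagination object. The arithmetic behind those fields is small enough to sketch without a database. `page_meta` below is a hypothetical stand-alone function, and the clamping bounds (a minimum of 1 and `max_per_page=100`) are assumptions chosen to mirror the cap used in the helper:

```python
import math


def page_meta(total, page, per_page, max_per_page=100):
    """Compute pagination metadata for `total` items."""
    per_page = max(1, min(per_page, max_per_page))
    page = max(1, page)
    pages = math.ceil(total / per_page) if total else 0
    return {
        "page": page,
        "per_page": per_page,
        "total": total,
        "pages": pages,
        "has_next": page < pages,
        "has_prev": page > 1,
    }


print(page_meta(total=45, page=2, per_page=20))
# {'page': 2, 'per_page': 20, 'total': 45, 'pages': 3, 'has_next': True, 'has_prev': True}
```

Clamping `per_page` on the server keeps a client from requesting an unbounded page size.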
app/utils/serializers.py:
from flask import request, url_for, current_app


def paginate_query(query, serializer_func, endpoint=None, **kwargs):
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get(
        "per_page", current_app.config.get("POSTS_PER_PAGE", 10), type=int,
    )
    # Clamp to a sane range so clients cannot request zero, negative,
    # or very large pages.
    per_page = max(1, min(per_page, 100))
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    result = {
        "items": [serializer_func(item) for item in pagination.items],
        "meta": {
            "page": pagination.page,
            "per_page": pagination.per_page,
            "total": pagination.total,
            "pages": pagination.pages,
            "has_next": pagination.has_next,
            "has_prev": pagination.has_prev,
        },
    }
    links = {}
    if endpoint:
        if pagination.has_next:
            links["next"] = url_for(endpoint, page=pagination.next_num, per_page=per_page, **kwargs, _external=True)
        if pagination.has_prev:
            links["prev"] = url_for(endpoint, page=pagination.prev_num, per_page=per_page, **kwargs, _external=True)
        links["self"] = url_for(endpoint, page=pagination.page, per_page=per_page, **kwargs, _external=True)
    result["_links"] = links
    return result

26.10.4 Authentication API
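The endpoints in this section hand out an access token and a refresh token and later verify them through `AuthService`, which the project builds on PyJWT. To make the moving parts visible, here is a standard-library-only sketch of HS256 signing and verification with an expiry and a token `type` claim. It is a teaching aid, not a substitute for PyJWT. The function names, the `type` claim name, and the secret are assumptions of this sketch:

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def encode_token(claims: dict, secret: str) -> str:
    """Build header.payload.signature (this sketch always uses HS256)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def decode_token(token: str, secret: str, expected_type: str, now=None):
    """Return the claims if signature, expiry and type all check out, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _sign(f"{header_b64}.{payload_b64}", secret)
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        return None
    if not isinstance(claims, dict):
        return None
    now = time.time() if now is None else now
    if claims.get("exp", 0) <= now or claims.get("type") != expected_type:
        return None
    return claims


secret = "dev-only-secret"  # hypothetical; load real secrets from configuration
now = 1_700_000_000
access = encode_token({"sub": "42", "type": "access", "exp": now + 900}, secret)
print(decode_token(access, secret, "access", now=now))    # the claims dict
print(decode_token(access, secret, "refresh", now=now))   # None: wrong token type
print(decode_token(access, secret, "access", now=now + 901))  # None: expired
```

Checking the `type` claim is what stops a long-lived refresh token from being accepted where an access token is expected.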
app/api/v1/auth.py:
from flask import request, jsonify
from app.api.v1 import api_v1_bp
from app.services.auth import AuthService
from app.api.errors import UnauthorizedError, ValidationError
from app.utils.decorators import jwt_required
from app.extensions import db


@api_v1_bp.route("/auth/register", methods=["POST"])
def api_register():
    data = request.get_json()
    if not data:
        raise ValidationError("Request body is required")
    required_fields = ["username", "email", "password"]
    missing = [f for f in required_fields if not data.get(f)]
    if missing:
        raise ValidationError(
            "Missing required fields",
            errors={f: "This field is required" for f in missing},
        )
    try:
        user = AuthService.register(
            username=data["username"], email=data["email"], password=data["password"],
        )
    except ValueError as e:
        raise ValidationError(str(e)) from e
    tokens = AuthService.generate_tokens(user)
    return jsonify({"user": user.to_dict(include_email=True), **tokens}), 201


@api_v1_bp.route("/auth/login", methods=["POST"])
def api_login():
    data = request.get_json()
    if not data or not data.get("username") or not data.get("password"):
        raise ValidationError("Username and password are required")
    user = AuthService.authenticate(data["username"], data["password"])
    if user is None:
        raise UnauthorizedError("Invalid credentials")
    user.update_login_info(request.remote_addr)
    db.session.commit()
    tokens = AuthService.generate_tokens(user)
    return jsonify({"user": user.to_dict(include_email=True), **tokens})


@api_v1_bp.route("/auth/refresh", methods=["POST"])
def api_refresh():
    data = request.get_json()
    if not data or not data.get("refresh_token"):
        raise ValidationError("Refresh token is required")
    result = AuthService.refresh_access_token(data["refresh_token"])
    if result is None:
        raise UnauthorizedError("Invalid or expired refresh token")
    return jsonify(result)


@api_v1_bp.route("/auth/me", methods=["GET"])
@jwt_required
def api_me():
    return jsonify(request.current_user.to_dict(include_email=True))


@api_v1_bp.route("/auth/change-password", methods=["POST"])
@jwt_required
def api_change_password():
    data = request.get_json()
    if not data or not data.get("old_password") or not data.get("new_password"):
        raise ValidationError("Old password and new password are required")
    success = AuthService.change_password(
        request.current_user, data["old_password"], data["new_password"],
    )
    if not success:
        raise ValidationError("Old password is incorrect")
    return jsonify({"message": "Password changed successfully"})

26.10.5 Posts API
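The list endpoint in this section feeds the `q` parameter into `contains()`, which becomes a SQL `LIKE` match with wildcards on both sides. In a `LIKE` pattern, `%` and `_` are themselves wildcards, so a search for `100%` can match more than intended. SQLAlchemy's `contains()` accepts `autoescape=True` to handle this. The sketch below shows the equivalent escaping by hand against an in-memory SQLite table. `escape_like` and `contains_pattern` are hypothetical helpers, and using `/` as the escape character is an assumption of this sketch:

```python
import sqlite3


def escape_like(term, escape="/"):
    """Escape LIKE wildcards so `term` is matched literally."""
    return (
        term.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def contains_pattern(term, escape="/"):
    """Build the pattern for: column LIKE :pattern ESCAPE '/'."""
    return f"%{escape_like(term, escape)}%"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE post (title TEXT)")
conn.executemany(
    "INSERT INTO post (title) VALUES (?)",
    [("100% done",), ("1000 days",)],
)

query = "SELECT title FROM post WHERE title LIKE ? ESCAPE '/' ORDER BY title"
naive = conn.execute(query, ("%100%%",)).fetchall()
escaped = conn.execute(query, (contains_pattern("100%"),)).fetchall()
print(naive)    # both rows: the user's % acted as a wildcard
print(escaped)  # only the row that literally contains "100%"
```

The value is still passed as a bound parameter in both queries; escaping here is about match semantics, not about SQL injection.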
app/api/v1/posts.py:
from flask import request, jsonify
from sqlalchemy import or_
from app.api.v1 import api_v1_bp
from app.models.post import Post, Category, Tag
from app.services.post import PostService
from app.api.errors import NotFoundError, ForbiddenError, ValidationError
from app.utils.decorators import jwt_required
from app.utils.serializers import paginate_query
from app.extensions import db


@api_v1_bp.route("/posts", methods=["GET"])
def api_list_posts():
    query = Post.query.filter_by(status=Post.STATUS_PUBLISHED, is_deleted=False)
    category_slug = request.args.get("category")
    if category_slug:
        category = Category.query.filter_by(slug=category_slug).first()
        if category:
            query = query.filter_by(category_id=category.id)
    tag_slug = request.args.get("tag")
    if tag_slug:
        tag = Tag.query.filter_by(slug=tag_slug).first()
        if tag:
            query = query.filter(Post.tags.contains(tag))
    search = request.args.get("q")
    if search:
        # autoescape=True treats % and _ in the search term as literal characters.
        query = query.filter(or_(
            Post.title.contains(search, autoescape=True),
            Post.content.contains(search, autoescape=True),
        ))
    query = query.order_by(Post.is_pinned.desc(), Post.published_at.desc())
    return jsonify(paginate_query(
        query,
        lambda p: p.to_dict(include_content=False),
        endpoint="api_v1.api_list_posts",
    ))


@api_v1_bp.route("/posts/<slug>", methods=["GET"])
def api_get_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.status != Post.STATUS_PUBLISHED:
        # request.current_user is set only by jwt_required, which this
        # route does not apply, so as written drafts always read as 404 here.
        user = getattr(request, "current_user", None)
        if user is None or (post.author != user and not user.is_editor):
            raise NotFoundError("Post not found")
    return jsonify(post.to_dict())


@api_v1_bp.route("/posts", methods=["POST"])
@jwt_required
def api_create_post():
    data = request.get_json()
    if not data or not data.get("title") or not data.get("content"):
        raise ValidationError("Title and content are required")
    post = PostService.create_post(
        user_id=request.current_user.id, title=data["title"], content=data["content"],
        summary=data.get("summary"), category_name=data.get("category"),
        tag_names=data.get("tags", []), status=data.get("status", "draft"),
    )
    return jsonify(post.to_dict()), 201


@api_v1_bp.route("/posts/<slug>", methods=["PUT"])
@jwt_required
def api_update_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.author != request.current_user and not request.current_user.is_editor:
        raise ForbiddenError("You can only edit your own posts")
    data = request.get_json()
    if not data:
        raise ValidationError("Request body is required")
    post = PostService.update_post(
        post, title=data.get("title"), content=data.get("content"),
        summary=data.get("summary"), category_name=data.get("category"),
        tag_names=data.get("tags"), status=data.get("status"),
    )
    return jsonify(post.to_dict())


@api_v1_bp.route("/posts/<slug>", methods=["DELETE"])
@jwt_required
def api_delete_post(slug):
    post = Post.query.filter_by(slug=slug, is_deleted=False).first()
    if post is None:
        raise NotFoundError("Post not found")
    if post.author != request.current_user and not request.current_user.is_admin:
        raise ForbiddenError("You can only delete your own posts")
    post.soft_delete()
    db.session.commit()
    return "", 204


@api_v1_bp.route("/categories", methods=["GET"])
def api_list_categories():
    categories = Category.query.order_by(Category.sort_order).all()
    return jsonify([c.to_dict() for c in categories])


@api_v1_bp.route("/tags", methods=["GET"])
def api_list_tags():
    tags = Tag.query.order_by(Tag.name).all()
    return jsonify([t.to_dict() for t in tags])

26.10.6 Users and Comments API
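The comments endpoint in this section returns top-level comments (those whose `parent_id` is `None`) with their replies nested inside. One way to picture that shape is to build it from a flat list of rows. The sketch below uses plain dictionaries and a hypothetical `build_comment_tree` function; the real `Comment.to_dict(include_replies=True)` is defined elsewhere in the project and may differ:

```python
def build_comment_tree(rows):
    """Nest flat comment rows under their parents, preserving input order."""
    nodes = {row["id"]: {**row, "replies": []} for row in rows}
    roots = []
    for row in rows:
        node = nodes[row["id"]]
        parent = nodes.get(row["parent_id"])
        if parent is None:
            # Top-level comment, or a reply whose parent is not in this batch.
            roots.append(node)
        else:
            parent["replies"].append(node)
    return roots


rows = [
    {"id": 1, "parent_id": None, "content": "First"},
    {"id": 2, "parent_id": 1, "content": "Reply to first"},
    {"id": 3, "parent_id": None, "content": "Second"},
    {"id": 4, "parent_id": 2, "content": "Nested reply"},
]
tree = build_comment_tree(rows)
print([c["id"] for c in tree])                # [1, 3]
print([r["id"] for r in tree[0]["replies"]])  # [2]
```

Building the tree in one pass over already-loaded rows avoids issuing a separate query per comment to fetch its replies.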
app/api/v1/users.py:
from flask import jsonify
from app.api.v1 import api_v1_bp
from app.models.user import User
from app.api.errors import NotFoundError
from app.utils.serializers import paginate_query


@api_v1_bp.route("/users", methods=["GET"])
def api_list_users():
    query = User.query.filter_by(is_deleted=False).order_by(User.created_at.desc())
    return jsonify(paginate_query(query, lambda u: u.to_dict(), endpoint="api_v1.api_list_users"))


@api_v1_bp.route("/users/<username>", methods=["GET"])
def api_get_user(username):
    user = User.query.filter_by(username=username, is_deleted=False).first()
    if user is None:
        raise NotFoundError("User not found")
    return jsonify(user.to_dict())

app/api/v1/comments.py:
from flask import request, jsonify
from app.api.v1 import api_v1_bp
from app.models.comment import Comment
from app.models.post import Post
from app.api.errors import ValidationError
from app.utils.decorators import jwt_required
from app.extensions import db


@api_v1_bp.route("/posts/<int:post_id>/comments", methods=["GET"])
def api_list_comments(post_id):
    # Only checks that the post exists; the result itself is not needed.
    Post.query.get_or_404(post_id)
    page = request.args.get("page", 1, type=int)
    per_page = min(request.args.get("per_page", 20, type=int), 50)
    pagination = Comment.query.filter_by(
        post_id=post_id, is_deleted=False, is_approved=True, parent_id=None,
    ).order_by(Comment.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False,
    )
    return jsonify({
        "items": [c.to_dict(include_replies=True) for c in pagination.items],
        "meta": {
            "page": pagination.page, "per_page": pagination.per_page,
            "total": pagination.total, "pages": pagination.pages,
        },
    })


@api_v1_bp.route("/posts/<int:post_id>/comments", methods=["POST"])
@jwt_required
def api_create_comment(post_id):
    post = Post.query.get_or_404(post_id)
    data = request.get_json()
    if not data or not data.get("content"):
        raise ValidationError("Comment content is required")
    comment = Comment(
        content=data["content"], post_id=post.id,
        user_id=request.current_user.id,
        parent_id=data.get("parent_id"),
    )
    db.session.add(comment)
    post.comment_count += 1
    db.session.commit()
    return jsonify(comment.to_dict()), 201

26.10.7 API Blueprint Registration
app/api/v1/__init__.py:
from flask import Blueprint
from app.api.errors import register_error_handlers

api_v1_bp = Blueprint("api_v1", __name__)

# Imported after the blueprint exists because these modules import api_v1_bp.
from app.api.v1 import auth, posts, users, comments  # noqa: E402, F401

register_error_handlers(api_v1_bp)

26.11 Security Practices
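26.11.1 OWASP Top 10 Protection
| Threat | Mitigation | Implementation in this project |
|---|---|---|
| A01 Broken Access Control | Role-based access control, principle of least privilege | role_required decorator |
| A02 Cryptographic Failures | pbkdf2:sha256 hashing, enforced HTTPS | set_password method |
| A03 Injection | Parameterized queries, input validation, bleach sanitization | SQLAlchemy ORM + WTForms |
| A04 Insecure Design | Threat modeling, secure default configuration | ProductionConfig security settings |
| A05 Security Misconfiguration | Security headers, error pages that do not leak information | Nginx security headers |
| A06 Vulnerable and Outdated Components | Regular dependency updates, security advisory monitoring | pip-audit |
| A07 Identification and Authentication Failures | Rate limiting, strong password policy, JWT expiry | flask_limiter + JWT |
| A08 Software and Data Integrity Failures | CSRF protection, CORS restrictions | flask_wtf.csrf |
| A09 Security Logging and Monitoring Failures | Structured logging, exception tracking | RotatingFileHandler |
| A10 Server-Side Request Forgery | URL allowlist, network isolation | Internal service validation |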
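The A02 row refers to `pbkdf2:sha256` password hashing, which the project delegates to its `set_password` method. To show what such a scheme involves (a per-password random salt, many iterations, and a constant-time comparison), here is a standard-library sketch. The function names, the storage format, and the default iteration count of 600,000 are assumptions of this sketch, not the project's actual parameters:

```python
import hashlib
import hmac
import secrets


def hash_password(password, iterations=600_000):
    """Return 'pbkdf2:sha256:<iterations>$<salt>$<hex digest>'."""
    salt = secrets.token_hex(16)  # fresh random salt for every password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt.encode("ascii"), iterations,
    )
    return f"pbkdf2:sha256:{iterations}${salt}${digest.hex()}"


def check_password(stored, candidate):
    """Re-derive the hash with the stored salt and compare in constant time."""
    try:
        method, salt, expected_hex = stored.split("$")
        iterations = int(method.rsplit(":", 1)[1])
        expected = bytes.fromhex(expected_hex)
    except (ValueError, IndexError):
        return False
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode("utf-8"), salt.encode("ascii"), iterations,
    )
    return hmac.compare_digest(digest, expected)


# A low iteration count is used here only to keep the demo fast.
stored = hash_password("s3cret-pass", iterations=1_000)
print(check_password(stored, "s3cret-pass"))  # True
print(check_password(stored, "wrong"))        # False
```

Because the salt and iteration count are stored alongside the digest, the work factor can be raised later without invalidating existing hashes.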
26.11.2 Security Header Configuration
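The headers in this section are set at the Nginx layer. When the application may also be served without that proxy (for example in a staging container), the same headers can be added in Python. The sketch below is a plain WSGI middleware using only the standard library. `SecurityHeadersMiddleware`, `hello_app`, and the chosen header subset are assumptions of this sketch; in a Flask project an `after_request` hook would be a common alternative:

```python
SECURITY_HEADERS = [
    ("X-Content-Type-Options", "nosniff"),
    ("X-Frame-Options", "SAMEORIGIN"),
    ("Referrer-Policy", "strict-origin-when-cross-origin"),
]


class SecurityHeadersMiddleware:
    """Wrap a WSGI app and append security headers to every response."""

    def __init__(self, app, headers=SECURITY_HEADERS):
        self.app = app
        self.headers = list(headers)

    def __call__(self, environ, start_response):
        def patched_start_response(status, response_headers, exc_info=None):
            present = {name.lower() for name, _ in response_headers}
            # Do not duplicate a header the wrapped app already set itself.
            extra = [(n, v) for n, v in self.headers if n.lower() not in present]
            return start_response(status, list(response_headers) + extra, exc_info)

        return self.app(environ, patched_start_response)


def hello_app(environ, start_response):
    """Tiny WSGI app used only to exercise the middleware."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello"]


captured = {}


def fake_start_response(status, headers, exc_info=None):
    captured["status"] = status
    captured["headers"] = dict(headers)


body = SecurityHeadersMiddleware(hello_app)({}, fake_start_response)
print(captured["headers"]["X-Frame-Options"])  # SAMEORIGIN
```

With Flask, a middleware like this is typically attached with `app.wsgi_app = SecurityHeadersMiddleware(app.wsgi_app)`.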
Nginx security headers:
add_header X-Content-Type-Options "nosniff" always;
add_header X-Frame-Options "SAMEORIGIN" always;
# X-XSS-Protection is deprecated; current major browsers ignore it.
add_header X-XSS-Protection "1; mode=block" always;
# 'unsafe-inline' weakens the XSS protection that CSP provides; prefer nonces or hashes where the templates allow it.
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:;" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
add_header Permissions-Policy "camera=(), microphone=(), geolocation=()" always;

26.12 Testing Strategy
26.12.1 Test Fixtures
tests/conftest.py:
import pytest
from app import create_app
from app.extensions import db as _db
from app.models.user import User, UserRole


class TestUser:
    def test_user_creation(self, app, db):
        with app.app_context():
            user = User(username="testuser", email="test@example.com")
            user.set_password("TestPass123!")
            db.session.add(user)
            db.session.commit()
            assert user.id is not None
            assert user.check_password("TestPass123!")


class TestPost:
    def test_post_creation(self, app, db, sample_user):
        with app.app_context():
            from app.models.post import Post
            # user_id matches the column name used by the other Post tests in this chapter.
            post = Post(title="Test Post", content="Content", user_id=sample_user.id)
            db.session.add(post)
            db.session.commit()
            assert post.id is not None
            assert post.slug is not None

26.13 Chapter Summary
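This chapter built a complete blog system and, along the way, put the following core topics into practice:
- Project architecture: layered architecture design, blueprint-based modularization, the factory pattern
- Data models: user system, post system, comment system, categories and tags
- Authentication and authorization: Flask-Login session authentication, JWT API authentication, permission decorators
- View layer: template inheritance, form handling, pagination, search
- RESTful API: serialization, pagination, error handling, authentication
- Security: OWASP Top 10 protection, security headers, input validation
- Testing strategy: model tests, view tests, API tests
- Deployment and operations: Gunicorn, Nginx, Docker, monitoring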
26.14 Further Reading
26.14.1 Advanced Flask Topics
- Flask official documentation (https://flask.palletsprojects.com/): the authoritative guide to Flask
- Flask Web Development (Miguel Grinberg): a classic on Flask development
- Architecture Patterns with Python: architectural patterns in Python
26.14.2 Security and Performance
- OWASP Top 10 (https://owasp.org/www-project-top-ten/): web security threats
- Flask Security (https://flask-security.readthedocs.io/): security extension
- Flask Limiter (https://flask-limiter.readthedocs.io/): rate limiting
26.14.3 Deployment and Operations
- Gunicorn (https://gunicorn.org/): WSGI server
- Nginx documentation (https://nginx.org/en/docs/): reverse proxy configuration
- Docker documentation (https://docs.docker.com/): containerized deployment
- Prometheus (https://prometheus.io/docs/): monitoring system
26.14.4 Testing and Quality
- pytest-flask (https://pytest-flask.readthedocs.io/): Flask testing utilities
- coverage.py (https://coverage.readthedocs.io/): code coverage
- locust (https://docs.locust.io/): performance testing
tests/conftest.py (continued):
from config import TestingConfig


@pytest.fixture(scope="session")
def app():
    app = create_app(TestingConfig)
    yield app


@pytest.fixture(scope="function")
def db(app):
    with app.app_context():
        _db.create_all()
        yield _db
        _db.session.rollback()
        _db.drop_all()


@pytest.fixture(scope="function")
def client(app, db):
    return app.test_client()


@pytest.fixture(scope="function")
def authenticated_client(client, db):
    user = User(username="testuser", email="test@example.com", role=UserRole.AUTHOR)
    user.set_password("password123")
    _db.session.add(user)
    _db.session.commit()
    with client.session_transaction() as session:
        session["_user_id"] = user.id
    return client


@pytest.fixture
def admin_client(client, db):
    admin = User(username="admin", email="admin@example.com", role=UserRole.ADMIN)
    admin.set_password("adminpass123")
    _db.session.add(admin)
    _db.session.commit()
    with client.session_transaction() as session:
        session["_user_id"] = admin.id
    return client


@pytest.fixture
def auth_headers(app, db):
    from app.services.auth import AuthService

    user = User(username="apiuser", email="api@example.com", role=UserRole.AUTHOR)
    user.set_password("password123")
    _db.session.add(user)
    _db.session.commit()
    tokens = AuthService.generate_tokens(user)
    return {"Authorization": f"Bearer {tokens['access_token']}"}

26.12.2 Test Factories
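Factory Boy's `Sequence` and `LazyAttribute`, used by the factories in this section, boil down to two ideas: a counter that makes each generated value unique, and fields computed from other fields at build time. A dependency-free sketch of those two ideas follows. `make_user_factory` is a hypothetical name, and the sketch builds plain dictionaries instead of SQLAlchemy models:

```python
import itertools


def make_user_factory():
    """Return a build() function that produces unique user dicts."""
    counter = itertools.count()

    def build(**overrides):
        n = next(counter)  # like factory.Sequence: 0, 1, 2, ...
        user = {"username": f"user{n}", "role": "reader", "is_active": True}
        user.update(overrides)
        # Like factory.LazyAttribute: derived from the final username
        # unless the caller supplied an email explicitly.
        user.setdefault("email", f"{user['username']}@example.com")
        return user

    return build


build_user = make_user_factory()
print(build_user())
# {'username': 'user0', 'role': 'reader', 'is_active': True, 'email': 'user0@example.com'}
print(build_user(username="alice", role="admin"))
# {'username': 'alice', 'role': 'admin', 'is_active': True, 'email': 'alice@example.com'}
```

Tests then state only the fields they care about and let the factory fill in the rest.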
tests/factories.py:
import factory
from app.models.user import User, UserRole
from app.models.post import Post, Category, Tag
from app.models.comment import Comment
from app.extensions import db


class UserFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = User
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    username = factory.Sequence(lambda n: f"user{n}")
    email = factory.LazyAttribute(lambda obj: f"{obj.username}@example.com")
    role = UserRole.READER
    is_active = True

    @factory.post_generation
    def password(obj, create, extracted, **kwargs):
        # Use the password passed to the factory, or a default one.
        if extracted:
            obj.set_password(extracted)
        else:
            obj.set_password("defaultpassword")


class CategoryFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Category
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    name = factory.Sequence(lambda n: f"Category {n}")
    slug = factory.LazyAttribute(lambda obj: obj.name.lower().replace(" ", "-"))


class PostFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Post
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    title = factory.Sequence(lambda n: f"Post Title {n}")
    slug = factory.LazyAttribute(lambda obj: obj.title.lower().replace(" ", "-"))
    content = factory.Faker("text")
    status = Post.STATUS_PUBLISHED
    author = factory.SubFactory(UserFactory, role=UserRole.AUTHOR)
    category = factory.SubFactory(CategoryFactory)


class CommentFactory(factory.alchemy.SQLAlchemyModelFactory):
    class Meta:
        model = Comment
        sqlalchemy_session = db.session
        sqlalchemy_session_persistence = "commit"

    content = factory.Faker("text")
    author = factory.SubFactory(UserFactory)
    post = factory.SubFactory(PostFactory)
    is_approved = True

26.12.3 Unit Tests
tests/unit/test_models.py:
import pytest
from app.models.user import User, UserRole
from app.models.post import Post
from app.models.comment import Comment


class TestUserModel:
    def test_set_and_check_password(self, app, db):
        with app.app_context():
            user = User(username="test", email="test@example.com")
            user.set_password("securepassword")
            assert user.check_password("securepassword") is True
            assert user.check_password("wrongpassword") is False

    def test_role_properties(self, app, db):
        with app.app_context():
            admin = User(username="admin", email="admin@example.com", role=UserRole.ADMIN)
            assert admin.is_admin is True
            assert admin.is_editor is True
            assert admin.is_author is True
            reader = User(username="reader", email="reader@example.com", role=UserRole.READER)
            assert reader.is_admin is False
            assert reader.is_editor is False
            assert reader.is_author is False

    def test_follow_unfollow(self, app, db):
        with app.app_context():
            user1 = User(username="user1", email="user1@example.com")
            user1.set_password("password")
            user2 = User(username="user2", email="user2@example.com")
            user2.set_password("password")
            db.session.add_all([user1, user2])
            db.session.commit()
            user1.follow(user2)
            assert user1.is_following(user2) is True
            assert user2.is_following(user1) is False
            user1.unfollow(user2)
            assert user1.is_following(user2) is False


class TestPostModel:
    def test_publish(self, app, db):
        with app.app_context():
            user = User(username="author", email="author@example.com", role=UserRole.AUTHOR)
            user.set_password("password")
            db.session.add(user)
            db.session.commit()
            post = Post(title="Test", slug="test", content="Content", user_id=user.id)
            db.session.add(post)
            db.session.commit()
            assert post.status == Post.STATUS_DRAFT
            assert post.published_at is None
            post.publish()
            assert post.status == Post.STATUS_PUBLISHED
            assert post.published_at is not None

    def test_soft_delete(self, app, db):
        with app.app_context():
            user = User(username="author2", email="author2@example.com", role=UserRole.AUTHOR)
            user.set_password("password")
            db.session.add(user)
            db.session.commit()
            post = Post(title="Delete Test", slug="delete-test", content="Content", user_id=user.id)
            db.session.add(post)
            db.session.commit()
            post.soft_delete()
            assert post.is_deleted is True
            assert post.deleted_at is not None

26.12.4 API Integration Tests
tests/integration/test_api.py:
import json
import pytest


class TestAuthAPI:
    def test_register(self, client, app):
        with app.app_context():
            response = client.post("/api/v1/auth/register", json={
                "username": "newuser",
                "email": "new@example.com",
                "password": "securepass123",
            })
            assert response.status_code == 201
            data = response.get_json()
            assert "access_token" in data
            assert "refresh_token" in data
            assert data["user"]["username"] == "newuser"

    def test_register_duplicate_username(self, client, app, db):
        with app.app_context():
            from app.models.user import User
            user = User(username="existing", email="existing@example.com")
            user.set_password("password")
            db.session.add(user)
            db.session.commit()
            response = client.post("/api/v1/auth/register", json={
                "username": "existing",
                "email": "another@example.com",
                "password": "securepass123",
            })
            assert response.status_code == 422

    def test_login(self, client, app, db):
        with app.app_context():
            from app.models.user import User
            user = User(username="loginuser", email="login@example.com")
            user.set_password("password123")
            db.session.add(user)
            db.session.commit()
            response = client.post("/api/v1/auth/login", json={
                "username": "loginuser",
                "password": "password123",
            })
            assert response.status_code == 200
            data = response.get_json()
            assert "access_token" in data

    def test_login_invalid_credentials(self, client, app, db):
        with app.app_context():
            response = client.post("/api/v1/auth/login", json={
                "username": "nonexistent",
                "password": "wrongpassword",
            })
            assert response.status_code == 401
class TestPostsAPI:
    def test_list_posts(self, client, app, db):
        with app.app_context():
            response = client.get("/api/v1/posts")
            assert response.status_code == 200
            data = response.get_json()
            assert "items" in data
            assert "meta" in data

    def test_create_post(self, client, app, db, auth_headers):
        with app.app_context():
            response = client.post("/api/v1/posts", json={
                "title": "API Test Post",
                "content": "This is a test post created via API.",
                "status": "published",
            }, headers=auth_headers)
            assert response.status_code == 201
            data = response.get_json()
            assert data["title"] == "API Test Post"

    def test_create_post_unauthorized(self, client, app, db):
        with app.app_context():
            response = client.post("/api/v1/posts", json={
                "title": "Unauthorized Post",
                "content": "Should fail.",
            })
            assert response.status_code == 401

    def test_delete_post(self, client, app, db, auth_headers):
        with app.app_context():
            # Create a post first, then delete it by its slug
            create_resp = client.post("/api/v1/posts", json={
                "title": "To Delete", "content": "Will be deleted", "status": "published",
            }, headers=auth_headers)
            slug = create_resp.get_json()["slug"]
            response = client.delete(f"/api/v1/posts/{slug}", headers=auth_headers)
            assert response.status_code == 204
26.13 Production Deployment
26.13.1 Docker Containerization
Dockerfile:
# Stage 1: install dependencies with Poetry
FROM python:3.12-slim AS builder
WORKDIR /app
RUN pip install --no-cache-dir poetry
COPY pyproject.toml poetry.lock ./
# --only main replaces the deprecated --no-dev flag; --no-root skips installing the project itself,
# whose source has not been copied into this stage
RUN poetry config virtualenvs.create false && poetry install --only main --no-root --no-interaction --no-ansi
# Stage 2: runtime image that runs as a non-root user
FROM python:3.12-slim
WORKDIR /app
RUN addgroup --system appgroup && adduser --system --ingroup appgroup appuser
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY . .
RUN mkdir -p instance app/static/uploads && chown -R appuser:appgroup /app
USER appuser
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "--threads", "2", "--timeout", "120", "--access-logfile", "-", "app:create_app()"]
26.13.2 Docker Compose Orchestration
docker-compose.yml:
version: "3.8"
services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=postgresql://flaskblog:${DB_PASSWORD}@db:5432/flaskblog
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    restart: unless-stopped
  db:
    image: postgres:16-alpine
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=flaskblog
      - POSTGRES_USER=flaskblog
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U flaskblog"]
      interval: 5s
      timeout: 5s
      retries: 5
    restart: unless-stopped
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped
  celery_worker:
    build: .
    command: celery -A celery_app.celery worker --loglevel=info --concurrency=2
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=${SECRET_KEY}
      - DATABASE_URL=postgresql://flaskblog:${DB_PASSWORD}@db:5432/flaskblog
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    restart: unless-stopped
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
      - ./app/static:/usr/share/nginx/static:ro
      # The TLS certificate and key that nginx.conf reads from /etc/nginx/ssl
      # must also be mounted here; the host path depends on your setup.
    depends_on:
      - web
    restart: unless-stopped
volumes:
  postgres_data:
  redis_data:
26.13.3 Nginx Reverse Proxy
nginx.conf:
upstream flask_app {
    server web:8000;
}
# Redirect all plain HTTP traffic to HTTPS
server {
    listen 80;
    server_name example.com;
    return 301 https://$server_name$request_uri;
}
server {
    listen 443 ssl http2;
    server_name example.com;
    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/key.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;
    client_max_body_size 16M;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Referrer-Policy "strict-origin-when-cross-origin" always;
    location /static/ {
        alias /usr/share/nginx/static/;
        expires 30d;
        # An add_header inside a location replaces the server-level add_header
        # directives for that location, so the security headers above are not
        # sent with static files.
        add_header Cache-Control "public, immutable";
    }
    location / {
        proxy_pass http://flask_app;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_redirect off;
    }
}
26.13.4 Celery Asynchronous Tasks
celery_app.py:
from celery import Celery


def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config["CELERY_BROKER_URL"],
        backend=app.config["CELERY_RESULT_BACKEND"],
    )
    celery.config_from_object({
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        "timezone": "UTC",
        "enable_utc": True,
        "task_track_started": True,
        "worker_prefetch_multiplier": 1,
        "task_acks_late": True,
    })

    class ContextTask(celery.Task):
        # Run every task inside the Flask application context
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


from app import create_app  # noqa: E402

flask_app = create_app()
celery = make_celery(flask_app)


@celery.task(bind=True, max_retries=3)
def send_email_task(self, to, subject, template, **kwargs):
    from app.services.email import EmailService
    try:
        EmailService.send_email(to=to, subject=subject, template=template, **kwargs)
    except Exception as exc:
        # Retry after 60 seconds, up to max_retries times
        raise self.retry(exc=exc, countdown=60)


@celery.task
def update_post_stats():
    from app.extensions import db
    from app.models.post import Post
    with flask_app.app_context():
        posts = Post.query.filter_by(status="published", is_deleted=False).all()
        for post in posts:
            post.comment_count = post.comments.filter_by(is_deleted=False, is_approved=True).count()
        db.session.commit()
26.14 Performance Optimization
26.14.1 Database Query Optimization
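| Optimization strategy | Implementation | Effect |
|---|---|---|
| Index optimization | Composite indexes covering high-frequency queries | Fewer full table scans |
| Query optimization | Eager loading with joinedload/subqueryload | Eliminates N+1 queries |
| Pagination optimization | Cursor-based pagination instead of OFFSET | Stable performance on large datasets |
| Connection pooling | pool_size + pool_pre_ping | Lower connection setup overhead |
| Slow query logging | SQLALCHEMY_RECORD_QUERIES | Locates performance bottlenecks |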
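The cursor-based pagination row in the table can be illustrated with a minimal sketch. It is not taken from the flaskblog code: it uses the standard-library sqlite3 module instead of SQLAlchemy so that it runs on its own, and the `posts` table, the `fetch_page` helper, and the `after_id` parameter are all hypothetical names chosen for illustration. Instead of skipping rows with OFFSET, each page filters on the last id already seen, so the database can seek directly through the primary-key index.

```python
import sqlite3


def fetch_page(conn, after_id=None, limit=3):
    """Return (rows, next_cursor) using keyset pagination on a descending id.

    Hypothetical helper: `after_id` is the id of the last row of the previous
    page, or None for the first page.
    """
    if after_id is None:
        rows = conn.execute(
            "SELECT id, title FROM posts ORDER BY id DESC LIMIT ?",
            (limit,),
        ).fetchall()
    else:
        # Seek past the cursor instead of counting and discarding OFFSET rows
        rows = conn.execute(
            "SELECT id, title FROM posts WHERE id < ? ORDER BY id DESC LIMIT ?",
            (after_id, limit),
        ).fetchall()
    # A short page means there is nothing left to fetch
    next_cursor = rows[-1][0] if len(rows) == limit else None
    return rows, next_cursor


# Demo data: seven posts in an in-memory database (ids 1 through 7)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO posts (title) VALUES (?)",
    [(f"Post {i}",) for i in range(1, 8)],
)

page1, cursor1 = fetch_page(conn)
page2, cursor2 = fetch_page(conn, after_id=cursor1)
page3, cursor3 = fetch_page(conn, after_id=cursor2)
```

In an API, the cursor value would typically be returned to the client alongside the items and sent back with the next request. The trade-off is that clients can only move to the next page, not jump to an arbitrary page number.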
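Eager-loading example:
from sqlalchemy.orm import joinedload

# Load the author, category, and tags together with the posts to avoid N+1 queries
posts = Post.query.options(
    joinedload(Post.author),
    joinedload(Post.category),
    joinedload(Post.tags),
).filter_by(status="published").all()
26.14.2 Caching Strategy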
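from app.extensions import cache

# PostService is assumed to be imported from the application's service layer.


# memoize() includes the arguments in the cache key, so each page is cached
# separately; cached() with a fixed key_prefix would return the same entry
# for every page.
@cache.memoize(timeout=300)
def get_cached_posts(page=1):
    return PostService.get_published_posts(page=page)


@cache.memoize(timeout=600)
def get_cached_post(slug):
    return PostService.get_post_by_slug(slug)


def invalidate_post_cache(slug=None):
    # Drop every cached page of the post list
    cache.delete_memoized(get_cached_posts)
    if slug:
        cache.delete_memoized(get_cached_post, slug)
26.14.3 Gunicorn Tuning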
gunicorn \
    --bind 0.0.0.0:8000 \
    --workers 4 \
    --threads 2 \
    --worker-class gthread \
    --timeout 120 \
    --graceful-timeout 30 \
    --max-requests 1000 \
    --max-requests-jitter 50 \
    --access-logfile - \
    --error-logfile - \
    "app:create_app()"
Recommended number of workers: (2 × number of CPU cores) + 1
26.15 Emerging Technology Trends
26.15.1 Asynchronous Web Frameworks
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


@app.post("/items/")
async def create_item(item: Item):
    # The request body is validated against the Item model
    return {"item": item, "status": "created"}


@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}
26.15.2 Modern Frontend Integration
import os

from flask import Flask, send_from_directory

app = Flask(__name__, static_folder='../frontend/dist')


@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def serve_spa(path):
    # Serve the requested file if it exists in the frontend build output
    if path != "" and os.path.exists(os.path.join(app.static_folder, path)):
        return send_from_directory(app.static_folder, path)
    # Otherwise fall back to index.html so the SPA router can handle the route
    return send_from_directory(app.static_folder, 'index.html')
26.15.3 Containerized Deployment
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "app:create_app()"]
26.15.4 Modern ORM Practices
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


# Declarative base class required by the SQLAlchemy 2.x typed mapping style
class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)
26.16 Chapter Summary
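This chapter built a production-grade blog platform from scratch, covering the following core practices:
- Architecture design: a layered architecture (presentation → business logic → data access → infrastructure), with DDD concepts used to organize the domain model
- Configuration management: multi-environment configuration (development/testing/production), secrets injected through environment variables, and secure cookies enforced in production
- Data models: SQLAlchemy mixins provide shared capabilities such as timestamps, soft deletion, and slugs, and the models support complex relationships (many-to-many, self-referential)
- Authentication and authorization: a dual-track scheme (sessions for the web, JWT for the API), role-based access control, and a password-reset token mechanism
- RESTful API: a versioned API (/api/v1), unified error handling, HATEOAS pagination links, and JWT Bearer authentication
- Security: CSRF protection, XSS sanitization (bleach), CORS restrictions, rate limiting, and security headers
- Testing strategy: the test pyramid (unit → integration → end-to-end), Factory Boy data factories, and pytest fixtures
- Production deployment: Docker multi-stage builds, Docker Compose orchestration (Web + DB + Redis + Celery + Nginx), and SSL termination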
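26.17 Extended Exercises
Basic exercises
- Add avatar upload to the user model, with image cropping and thumbnail generation
- Implement automatic draft saving for posts (periodic saves from the frontend plus a backend API)
- Add a post "like" feature that keeps its counters in a Redis cache
Intermediate exercises
- Implement OAuth2 third-party login (GitHub/Google) using the Authlib library
- Build a real-time WebSocket notification system (Flask-SocketIO) that pushes new comments as they arrive
- Implement search based on PostgreSQL full-text search, with support for Chinese word segmentation
Advanced exercises
- Design and implement a sliding-window algorithm for API rate limiting to replace the fixed window
- Build a complete CI/CD pipeline (GitHub Actions) with automated testing, building, and deployment
- Implement a Redis-based distributed lock to keep data consistent under concurrency
- Design a microservice decomposition in which the user, post, and notification services are deployed independently
Next chapter: Chapter 27, Hands-On: Data Analysis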