Chapter 27 Hands-On: Data Analytics Engineering
Learning Objectives
After completing this chapter, you will be able to:
- Understand the full lifecycle of data analytics engineering: ingestion, cleaning, transformation, analysis, and visualization
- Master advanced Pandas operations: multi-level indexing, window functions, pipe-style chaining, and performance tuning
- Apply NumPy for efficient numerical computing: broadcasting, vectorized operations, and linear algebra
- Build a reusable analysis framework based on the pipeline pattern
- Implement production-grade data quality assurance: validation, anomaly detection, and lineage tracking
- Master advanced visualization techniques: statistical charts, interactive plots, and visualization best practices
- Understand the statistical foundations of analysis: hypothesis testing, regression, and time-series decomposition
- Build an automated reporting system: programmatic report generation and scheduling
27.1 Overview of Data Analytics Engineering
27.1.1 From Data Analysis to Data Analytics Engineering
Traditional data analysis focuses on exploration and insight discovery. Data analytics engineering brings software-engineering best practices into the analysis workflow, emphasizing the following principles (a minimal sketch follows the list):
- Reproducibility: any analysis result can be reproduced
- Testability: transformation logic is covered by automated tests
- Maintainability: clear, modular, well-documented code
- Scalability: a smooth path from prototype to production
- Observability: the state of data processing can be monitored and traced
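As a minimal sketch of the first two principles (the function names here are illustrative, not part of the chapter's framework), a transform written as a pure function is deterministic and trivially unit-testable:
import pandas as pd

def add_revenue(df: pd.DataFrame) -> pd.DataFrame:
    # Pure function: the same input always yields the same output,
    # so results are reproducible and easy to assert against.
    out = df.copy()
    out["revenue"] = out["quantity"] * out["unit_price"]
    return out

def test_add_revenue():
    df = pd.DataFrame({"quantity": [2, 3], "unit_price": [10.0, 1.5]})
    assert add_revenue(df)["revenue"].tolist() == [20.0, 4.5]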
27.1.2 The Data Analysis Project Lifecycle
┌─────────────────────────────────────────────────────────────────────────────────┐
│                         Data Analysis Project Lifecycle                         │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  ┌───────────────┐   ┌───────────────┐   ┌───────────────┐   ┌───────────────┐  │
│  │    Problem    │──►│      Data     │──►│      Data     │──►│      Data     │  │
│  │   definition  │   │   ingestion   │   │    cleaning   │   │   transform   │  │
│  └───────────────┘   └───────────────┘   └───────────────┘   └───────────────┘  │
│          ▲                                                           │          │
│          │        ┌───────────────┐   ┌───────────────┐              │          │
│          └────────│    Decision   │◄──│ Visualization │◄─────────────┘          │
│                   │    support    │   └───────────────┘                         │
│                   └───────────────┘           ▲                                 │
│                                               │                                 │
│                                        ┌───────────────┐                        │
│                                        │  Statistical  │                        │
│                                        │    modeling   │                        │
│                                        └───────────────┘                        │
└─────────────────────────────────────────────────────────────────────────────────┘
27.1.3 Technology Stack Selection
| Layer | Tools | Purpose |
|---|---|---|
| Numerical computing | NumPy | High-performance array math, linear algebra |
| Data processing | Pandas | Structured data processing, time-series analysis |
| Visualization | Matplotlib / Seaborn / Plotly | Static / statistical / interactive charts |
| Statistical modeling | SciPy / Statsmodels | Hypothesis testing, regression analysis |
| Data validation | Pandera / Pydantic | Schema validation, data contracts |
| Task scheduling | APScheduler / Prefect | Scheduled jobs, workflow orchestration |
| Report generation | Jupyter / WeasyPrint | Interactive analysis / PDF reports |
27.2 Project Architecture
27.2.1 Project Layout
data_analytics/
├── pyproject.toml
├── README.md
├── .env.example
├── configs/
│   ├── base.yaml
│   ├── development.yaml
│   └── production.yaml
├── data/
│   ├── raw/                      # Raw data (read-only)
│   ├── interim/                  # Intermediate data
│   ├── processed/                # Processed data
│   └── external/                 # External data sources
├── src/
│   └── analytics/
│       ├── __init__.py
│       ├── config.py             # Configuration management
│       ├── models/
│       │   ├── __init__.py
│       │   ├── schemas.py        # Data models and validation
│       │   └── enums.py          # Enum definitions
│       ├── ingestion/
│       │   ├── __init__.py
│       │   ├── base.py           # Ingestion base class
│       │   ├── csv_loader.py
│       │   ├── api_client.py
│       │   └── database_loader.py
│       ├── transforms/
│       │   ├── __init__.py
│       │   ├── pipeline.py       # Pipeline framework
│       │   ├── cleaners.py       # Cleaning transforms
│       │   ├── features.py       # Feature engineering
│       │   └── aggregators.py    # Aggregations
│       ├── analysis/
│       │   ├── __init__.py
│       │   ├── descriptive.py    # Descriptive statistics
│       │   ├── inferential.py    # Inferential statistics
│       │   └── time_series.py    # Time-series analysis
│       ├── visualization/
│       │   ├── __init__.py
│       │   ├── base.py           # Chart base class
│       │   ├── statistical.py    # Statistical charts
│       │   ├── timeseries.py     # Time-series charts
│       │   └── themes.py         # Theme configuration
│       └── reporting/
│           ├── __init__.py
│           ├── generator.py      # Report generation
│           └── templates/        # Report templates
├── notebooks/
│   ├── 01-exploration.ipynb
│   └── 02-modeling.ipynb
├── tests/
│   ├── conftest.py
│   ├── test_transforms.py
│   ├── test_analysis.py
│   └── test_visualization.py
└── scripts/
    ├── run_pipeline.py
    └── generate_report.py
27.2.2 Configuration Management
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
@dataclass(frozen=True)
class DatabaseConfig:
host: str = "localhost"
port: int = 5432
name: str = "analytics"
user: str = "analytics"
password: str = ""
@property
def url(self) -> str:
return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.name}"
@dataclass(frozen=True)
class CacheConfig:
backend: str = "redis"
host: str = "localhost"
port: int = 6379
db: int = 0
ttl: int = 3600
@property
def url(self) -> str:
return f"redis://{self.host}:{self.port}/{self.db}"
@dataclass(frozen=True)
class LoggingConfig:
level: str = "INFO"
format: str = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
file: str | None = None
@dataclass(frozen=True)
class AppConfig:
env: str = "development"
base_dir: Path = field(default_factory=lambda: Path(__file__).resolve().parent.parent.parent)
database: DatabaseConfig = field(default_factory=DatabaseConfig)
cache: CacheConfig = field(default_factory=CacheConfig)
logging: LoggingConfig = field(default_factory=LoggingConfig)
data_dir: Path = field(default_factory=lambda: Path("data"))
def __post_init__(self):
object.__setattr__(self, "data_dir", self.base_dir / self.data_dir)
object.__setattr__(self, "data_dir_raw", self.data_dir / "raw")
object.__setattr__(self, "data_dir_interim", self.data_dir / "interim")
object.__setattr__(self, "data_dir_processed", self.data_dir / "processed")
class ConfigLoader:
_instance: AppConfig | None = None
@classmethod
def load(cls, env: str | None = None) -> AppConfig:
if cls._instance is not None:
return cls._instance
env = env or os.environ.get("APP_ENV", "development")
base_dir = Path(__file__).resolve().parent.parent.parent
base_config = cls._load_yaml(base_dir / "configs" / "base.yaml")
env_config = cls._load_yaml(base_dir / "configs" / f"{env}.yaml")
merged = cls._deep_merge(base_config, env_config)
merged = cls._resolve_env_vars(merged)
cls._instance = cls._dict_to_config(merged, env)
return cls._instance
@classmethod
def _load_yaml(cls, path: Path) -> dict:
if not path.exists():
return {}
with open(path, encoding="utf-8") as f:
return yaml.safe_load(f) or {}
@classmethod
def _deep_merge(cls, base: dict, override: dict) -> dict:
result = base.copy()
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = cls._deep_merge(result[key], value)
else:
result[key] = value
return result
@classmethod
def _resolve_env_vars(cls, config: dict) -> dict:
import re
result = {}
for key, value in config.items():
if isinstance(value, dict):
result[key] = cls._resolve_env_vars(value)
elif isinstance(value, str):
result[key] = re.sub(
r"\$\{(\w+)(?::([^}]*))?\}",
lambda m: os.environ.get(m.group(1), m.group(2) or ""),
value,
)
else:
result[key] = value
return result
@classmethod
def _dict_to_config(cls, d: dict, env: str) -> AppConfig:
db_cfg = DatabaseConfig(**d.get("database", {}))
cache_cfg = CacheConfig(**d.get("cache", {}))
log_cfg = LoggingConfig(**d.get("logging", {}))
return AppConfig(
env=env,
database=db_cfg,
cache=cache_cfg,
logging=log_cfg,
        )
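A quick usage sketch (assuming the YAML files described above exist):
config = ConfigLoader.load(env="development")
print(config.database.url)   # postgresql://analytics:...@localhost:5432/analytics
print(config.data_dir_raw)   # <project root>/data/raw
27.3 Data Models and Validation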
27.3.1 Type Enums
from enum import Enum
class ProductCategory(str, Enum):
ELECTRONICS = "Electronics"
CLOTHING = "Clothing"
FOOD = "Food"
BOOKS = "Books"
SPORTS = "Sports"
HOME = "Home"
HEALTH = "Health"
class Region(str, Enum):
NORTH = "North"
SOUTH = "South"
EAST = "East"
WEST = "West"
CENTRAL = "Central"
class PaymentMethod(str, Enum):
CREDIT_CARD = "Credit Card"
DEBIT_CARD = "Debit Card"
CASH = "Cash"
BANK_TRANSFER = "Bank Transfer"
DIGITAL_WALLET = "Digital Wallet"
class OrderStatus(str, Enum):
PENDING = "Pending"
CONFIRMED = "Confirmed"
SHIPPED = "Shipped"
DELIVERED = "Delivered"
CANCELLED = "Cancelled"
RETURNED = "Returned"
class DataQualityLevel(str, Enum):
VALID = "valid"
WARNING = "warning"
ERROR = "error"
    MISSING = "missing"
27.3.2 Schema Validation
Pandera provides runtime data validation, guaranteeing data quality at every step of the pipeline:
from __future__ import annotations
import pandera as pa
from pandera import Column, Check, Index
import pandas as pd
from datetime import datetime
class OrderSchema(pa.SchemaModel):
order_id: pa.typing.Series[str] = pa.Field(
str_matches=r"^ORD-\d{8}-\w{4}$",
description="订单唯一标识符,格式: ORD-YYYYMMDD-XXXX",
)
order_date: pa.typing.Series[datetime] = pa.Field(
ge=datetime(2020, 1, 1),
le=datetime.now(),
description="订单日期",
)
customer_id: pa.typing.Series[str] = pa.Field(
str_matches=r"^CUS-\d{5}$",
description="客户唯一标识符",
)
product_id: pa.typing.Series[str] = pa.Field(
str_matches=r"^PRD-\d{4}$",
description="产品唯一标识符",
)
category: pa.typing.Series[str] = pa.Field(
isin=[e.value for e in ProductCategory],
description="产品类别",
)
quantity: pa.typing.Series[int] = pa.Field(
ge=1,
le=1000,
description="购买数量",
)
unit_price: pa.typing.Series[float] = pa.Field(
ge=0.01,
le=100000.0,
description="单价",
)
total_amount: pa.typing.Series[float] = pa.Field(
ge=0.0,
description="订单总金额 = quantity × unit_price",
)
region: pa.typing.Series[str] = pa.Field(
isin=[e.value for e in Region],
description="销售区域",
)
payment_method: pa.typing.Series[str] = pa.Field(
isin=[e.value for e in PaymentMethod],
description="支付方式",
)
status: pa.typing.Series[str] = pa.Field(
isin=[e.value for e in OrderStatus],
description="订单状态",
)
@pa.check("total_amount")
@classmethod
def validate_total_amount(cls, total_amount: pa.typing.Series[float]) -> pa.typing.Series[bool]:
return total_amount >= 0
class Config:
coerce = True
strict = True
ordered = False
class AggregatedSchema(pa.SchemaModel):
    period: pa.typing.Series[datetime] = pa.Field(description="Reporting period")
    category: pa.typing.Series[str] = pa.Field(description="Product category")
    region: pa.typing.Series[str] = pa.Field(description="Sales region")
    order_count: pa.typing.Series[int] = pa.Field(ge=0, description="Number of orders")
    total_revenue: pa.typing.Series[float] = pa.Field(ge=0, description="Total revenue")
    avg_order_value: pa.typing.Series[float] = pa.Field(ge=0, description="Average order value")
    unique_customers: pa.typing.Series[int] = pa.Field(ge=0, description="Number of unique customers")
class Config:
coerce = True
strict = True
def validate_dataframe(df: pd.DataFrame, schema: type[pa.SchemaModel]) -> pd.DataFrame:
try:
validated = schema.validate(df)
return validated
except pa.errors.SchemaError as e:
failure_cases = e.failure_cases
print(f"数据验证失败: {len(failure_cases)} 条记录不合规")
print(f"失败详情:\n{failure_cases}")
raise27.4 数据采集层
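For example, an order_id that violates the regex pattern is reported and the error re-raised:
import pandas as pd
from datetime import datetime

bad = pd.DataFrame([{
    "order_id": "BAD-ID", "order_date": datetime(2024, 5, 1),
    "customer_id": "CUS-00001", "product_id": "PRD-0001",
    "category": "Electronics", "quantity": 1, "unit_price": 9.99,
    "total_amount": 9.99, "region": "North",
    "payment_method": "Cash", "status": "Pending",
}])
validate_dataframe(bad, OrderSchema)  # prints the failure cases, then raises SchemaError
27.4 Data Ingestion Layer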
27.4.1 Ingestion Base Class
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Any
import pandas as pd
logger = logging.getLogger(__name__)
class IngestionResult:
def __init__(
self,
data: pd.DataFrame,
source: str,
metadata: dict[str, Any] | None = None,
):
self.data = data
self.source = source
self.metadata = metadata or {}
self.ingested_at = datetime.now()
self.row_count = len(data)
self.column_count = len(data.columns)
self.memory_mb = data.memory_usage(deep=True).sum() / (1024 ** 2)
def summary(self) -> dict[str, Any]:
return {
"source": self.source,
"ingested_at": self.ingested_at.isoformat(),
"row_count": self.row_count,
"column_count": self.column_count,
"memory_mb": round(self.memory_mb, 2),
"columns": list(self.data.columns),
"dtypes": {col: str(dtype) for col, dtype in self.data.dtypes.items()},
"null_counts": self.data.isnull().sum().to_dict(),
**self.metadata,
}
class BaseLoader(ABC):
def __init__(self, config: dict[str, Any] | None = None):
self.config = config or {}
self._logger = logging.getLogger(self.__class__.__name__)
@abstractmethod
def load(self, source: str, **kwargs) -> IngestionResult:
...
def _post_load(self, result: IngestionResult) -> IngestionResult:
self._logger.info(
f"Loaded {result.row_count} rows, {result.column_count} columns "
f"from {result.source} ({result.memory_mb:.2f} MB)"
)
        return result
27.4.2 Multi-Source Loaders
class CSVLoader(BaseLoader):
def load(self, source: str, **kwargs) -> IngestionResult:
path = Path(source)
if not path.exists():
raise FileNotFoundError(f"CSV file not found: {path}")
encoding = self.config.get("encoding", "utf-8")
parse_dates = self.config.get("parse_dates", None)
df = pd.read_csv(
source,
encoding=encoding,
parse_dates=parse_dates,
**kwargs,
)
result = IngestionResult(
data=df,
source=str(path),
metadata={"file_size_mb": path.stat().st_size / (1024 ** 2)},
)
return self._post_load(result)
class APILoader(BaseLoader):
def __init__(self, config: dict[str, Any] | None = None):
super().__init__(config)
self._session = None
@property
def session(self):
if self._session is None:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
self._session = requests.Session()
retry_strategy = Retry(
total=self.config.get("max_retries", 3),
backoff_factor=self.config.get("backoff_factor", 1.0),
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self._session.mount("https://", adapter)
self._session.mount("http://", adapter)
if "api_key" in self.config:
self._session.headers["Authorization"] = f"Bearer {self.config['api_key']}"
return self._session
def load(self, source: str, **kwargs) -> IngestionResult:
params = kwargs.pop("params", None)
headers = kwargs.pop("headers", {})
response = self.session.get(source, params=params, headers=headers, timeout=30)
response.raise_for_status()
data = response.json()
if isinstance(data, dict):
records = data.get("data", data.get("results", data.get("items", [data])))
elif isinstance(data, list):
records = data
else:
raise ValueError(f"Unexpected API response structure: {type(data)}")
df = pd.DataFrame(records)
result = IngestionResult(
data=df,
source=source,
metadata={
"status_code": response.status_code,
"response_time_ms": response.elapsed.total_seconds() * 1000,
},
)
return self._post_load(result)
def load_paginated(
self,
base_url: str,
page_param: str = "page",
page_size_param: str = "per_page",
page_size: int = 100,
max_pages: int | None = None,
**kwargs,
) -> IngestionResult:
        all_records = []
        page = 1
        base_params = kwargs.pop("params", {})  # keep caller-supplied params for every page
        while True:
            if max_pages and page > max_pages:
                break
            params = dict(base_params)
            params[page_param] = page
            params[page_size_param] = page_size
response = self.session.get(base_url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
if isinstance(data, dict):
records = data.get("data", data.get("results", data.get("items", [])))
else:
records = data
if not records:
break
all_records.extend(records)
self._logger.info(f"Page {page}: loaded {len(records)} records")
if len(records) < page_size:
break
page += 1
df = pd.DataFrame(all_records)
result = IngestionResult(
data=df,
source=base_url,
metadata={"total_pages": page, "total_records": len(all_records)},
)
return self._post_load(result)
class DatabaseLoader(BaseLoader):
def load(self, source: str, **kwargs) -> IngestionResult:
import sqlalchemy
query = kwargs.pop("query", None)
if query is None:
raise ValueError("SQL query is required for DatabaseLoader")
engine = sqlalchemy.create_engine(source, **self.config.get("engine_options", {}))
df = pd.read_sql(query, engine, **kwargs)
engine.dispose()
result = IngestionResult(data=df, source=source, metadata={"query": query})
        return self._post_load(result)
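For example (the file path and database DSN below are illustrative):
csv_loader = CSVLoader(config={"parse_dates": ["order_date"]})
result = csv_loader.load("data/raw/orders.csv")
print(result.summary())

db_loader = DatabaseLoader(config={"engine_options": {"pool_pre_ping": True}})
orders = db_loader.load(
    "postgresql://analytics:secret@localhost:5432/analytics",  # assumed DSN
    query="SELECT * FROM orders WHERE order_date >= '2024-01-01'",
).data
27.4.3 Synthetic Data Generator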
High-quality synthetic data generation for development and testing:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from dataclasses import dataclass
@dataclass(frozen=True)
class DataGenerationConfig:
n_records: int = 10000
start_date: datetime = datetime(2023, 1, 1)
end_date: datetime = datetime(2025, 12, 31)
n_customers: int = 500
n_products: int = 200
seed: int = 42
missing_rate: float = 0.02
duplicate_rate: float = 0.01
outlier_rate: float = 0.03
class SyntheticDataGenerator:
PRODUCT_PRICES = {
ProductCategory.ELECTRONICS: (50, 3000),
ProductCategory.CLOTHING: (15, 500),
ProductCategory.FOOD: (2, 100),
ProductCategory.BOOKS: (10, 150),
ProductCategory.SPORTS: (20, 800),
ProductCategory.HOME: (30, 1500),
ProductCategory.HEALTH: (10, 500),
}
REGION_WEIGHTS = {
Region.NORTH: 0.25,
Region.SOUTH: 0.20,
Region.EAST: 0.25,
Region.WEST: 0.20,
Region.CENTRAL: 0.10,
}
PAYMENT_WEIGHTS = {
PaymentMethod.CREDIT_CARD: 0.40,
PaymentMethod.DEBIT_CARD: 0.25,
PaymentMethod.DIGITAL_WALLET: 0.20,
PaymentMethod.BANK_TRANSFER: 0.10,
PaymentMethod.CASH: 0.05,
}
def __init__(self, config: DataGenerationConfig | None = None):
self.config = config or DataGenerationConfig()
self._rng = np.random.default_rng(self.config.seed)
def generate(self) -> pd.DataFrame:
dates = self._generate_dates()
customers = self._generate_customer_ids()
products, categories, prices = self._generate_products()
quantities = self._generate_quantities()
regions = self._generate_regions()
payments = self._generate_payments()
statuses = self._generate_statuses(dates)
df = pd.DataFrame({
"order_id": self._generate_order_ids(dates),
"order_date": dates,
"customer_id": customers,
"product_id": products,
"category": categories,
"quantity": quantities,
"unit_price": prices,
"total_amount": np.round(quantities * prices, 2),
"region": regions,
"payment_method": payments,
"status": statuses,
})
df = self._inject_quality_issues(df)
df = df.sort_values("order_date").reset_index(drop=True)
return df
def _generate_dates(self) -> pd.DatetimeIndex:
n_days = (self.config.end_date - self.config.start_date).days
days_offset = self._rng.integers(0, n_days, size=self.config.n_records)
hour_weights = np.zeros(24)
hour_weights[9:18] = 3.0
hour_weights[12:14] = 4.0
hour_weights[20:22] = 2.0
hour_weights = hour_weights / hour_weights.sum()
dates = []
for offset in days_offset:
date = self.config.start_date + timedelta(days=int(offset))
hour = self._rng.choice(24, p=hour_weights)
minute = self._rng.integers(0, 60)
dates.append(date.replace(hour=int(hour), minute=int(minute)))
return pd.DatetimeIndex(dates)
def _generate_order_ids(self, dates: pd.DatetimeIndex) -> list[str]:
return [
f"ORD-{d.strftime('%Y%m%d')}-{self._rng.integers(1000, 9999):04d}"
for d in dates
]
def _generate_customer_ids(self) -> list[str]:
customer_pool = [f"CUS-{i:05d}" for i in range(1, self.config.n_customers + 1)]
zipf_weights = 1.0 / np.arange(1, self.config.n_customers + 1) ** 0.8
zipf_weights /= zipf_weights.sum()
return list(self._rng.choice(customer_pool, size=self.config.n_records, p=zipf_weights))
def _generate_products(self) -> tuple[list, list, list]:
product_pool = [f"PRD-{i:04d}" for i in range(1, self.config.n_products + 1)]
products = list(self._rng.choice(product_pool, size=self.config.n_records))
product_category_map = {}
product_price_map = {}
for pid in product_pool:
cat = self._rng.choice(list(ProductCategory))
product_category_map[pid] = cat.value
low, high = self.PRODUCT_PRICES[cat]
product_price_map[pid] = round(float(self._rng.uniform(low, high)), 2)
categories = [product_category_map[p] for p in products]
prices = [product_price_map[p] for p in products]
return products, categories, prices
def _generate_quantities(self) -> np.ndarray:
quantities = self._rng.lognormal(mean=1.0, sigma=0.6, size=self.config.n_records)
return np.clip(np.round(quantities).astype(int), 1, 100)
def _generate_regions(self) -> list[str]:
regions = list(self.REGION_WEIGHTS.keys())
weights = np.array(list(self.REGION_WEIGHTS.values()))
return list(self._rng.choice(regions, size=self.config.n_records, p=weights))
def _generate_payments(self) -> list[str]:
methods = list(self.PAYMENT_WEIGHTS.keys())
weights = np.array(list(self.PAYMENT_WEIGHTS.values()))
return list(self._rng.choice(methods, size=self.config.n_records, p=weights))
def _generate_statuses(self, dates: pd.DatetimeIndex) -> list[str]:
statuses = []
now = datetime.now()
for d in dates:
days_elapsed = (now - d.to_pydatetime()).days
if days_elapsed < 0:
statuses.append(OrderStatus.PENDING.value)
elif days_elapsed < 1:
statuses.append(self._rng.choice(
[OrderStatus.PENDING.value, OrderStatus.CONFIRMED.value],
p=[0.3, 0.7],
))
elif days_elapsed < 3:
statuses.append(self._rng.choice(
[OrderStatus.CONFIRMED.value, OrderStatus.SHIPPED.value],
p=[0.2, 0.8],
))
elif days_elapsed < 7:
statuses.append(self._rng.choice(
[OrderStatus.SHIPPED.value, OrderStatus.DELIVERED.value],
p=[0.3, 0.7],
))
else:
statuses.append(self._rng.choice(
[OrderStatus.DELIVERED.value, OrderStatus.CANCELLED.value, OrderStatus.RETURNED.value],
p=[0.85, 0.10, 0.05],
))
return statuses
def _inject_quality_issues(self, df: pd.DataFrame) -> pd.DataFrame:
n = len(df)
rng = self._rng
n_missing = int(n * self.config.missing_rate)
missing_indices = rng.choice(n, size=n_missing, replace=False)
for idx in missing_indices:
col = rng.choice(["region", "payment_method", "category"])
df.iloc[idx, df.columns.get_loc(col)] = None
n_duplicates = int(n * self.config.duplicate_rate)
dup_indices = rng.choice(n, size=n_duplicates, replace=False)
dup_rows = df.iloc[dup_indices].copy()
df = pd.concat([df, dup_rows], ignore_index=True)
n_outliers = int(n * self.config.outlier_rate)
outlier_indices = rng.choice(n, size=n_outliers, replace=False)
df.loc[outlier_indices, "unit_price"] = df.loc[outlier_indices, "unit_price"] * rng.uniform(5, 20, size=n_outliers)
df.loc[outlier_indices, "quantity"] = (df.loc[outlier_indices, "quantity"] * rng.integers(10, 50, size=n_outliers)).astype(int)
df.loc[outlier_indices, "total_amount"] = df.loc[outlier_indices, "unit_price"] * df.loc[outlier_indices, "quantity"]
        return df
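Generating a small, fully reproducible sample:
gen = SyntheticDataGenerator(DataGenerationConfig(n_records=2000, seed=7))
df = gen.generate()
print(df.shape)
print(df["status"].value_counts())
print(df.isnull().sum())  # the injected missing values show up here
27.5 Data Transformation Pipeline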
27.5.1 Pipeline Framework
A composable data-transformation framework built on the functional pipeline pattern:
from __future__ import annotations
import logging
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable
import pandas as pd
logger = logging.getLogger(__name__)
@dataclass
class TransformResult:
data: pd.DataFrame
transform_name: str
input_rows: int
output_rows: int
duration_ms: float
metadata: dict[str, Any] = field(default_factory=dict)
@property
def rows_diff(self) -> int:
return self.output_rows - self.input_rows
@property
def rows_diff_pct(self) -> float:
if self.input_rows == 0:
return 0.0
return (self.rows_diff / self.input_rows) * 100
class BaseTransform(ABC):
def __init__(self, name: str | None = None):
self.name = name or self.__class__.__name__
self._logger = logging.getLogger(self.name)
@abstractmethod
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
...
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
return self.transform(df)
def execute(self, df: pd.DataFrame) -> tuple[pd.DataFrame, TransformResult]:
input_rows = len(df)
start = time.perf_counter()
result_df = self.transform(df)
duration_ms = (time.perf_counter() - start) * 1000
result = TransformResult(
data=result_df,
transform_name=self.name,
input_rows=input_rows,
output_rows=len(result_df),
duration_ms=duration_ms,
)
self._logger.info(
f"{self.name}: {input_rows} → {len(result_df)} rows "
f"({result.rows_diff_pct:+.1f}%) in {duration_ms:.1f}ms"
)
return result_df, result
class Pipeline:
def __init__(self, name: str = "Pipeline"):
self.name = name
self._steps: list[BaseTransform | Callable] = []
self._results: list[TransformResult] = []
def add(self, step: BaseTransform | Callable, name: str | None = None) -> "Pipeline":
if isinstance(step, BaseTransform):
self._steps.append(step)
else:
wrapper = _FunctionTransform(step, name or step.__name__)
self._steps.append(wrapper)
return self
def run(self, df: pd.DataFrame) -> pd.DataFrame:
self._results.clear()
current_df = df.copy()
logger.info(f"Pipeline '{self.name}' starting with {len(current_df)} rows")
for step in self._steps:
if isinstance(step, BaseTransform):
current_df, result = step.execute(current_df)
self._results.append(result)
else:
current_df = step(current_df)
logger.info(f"Pipeline '{self.name}' completed with {len(current_df)} rows")
return current_df
@property
def results(self) -> list[TransformResult]:
return list(self._results)
def summary(self) -> pd.DataFrame:
        if not self._results:
return pd.DataFrame()
return pd.DataFrame([
{
"step": r.transform_name,
"input_rows": r.input_rows,
"output_rows": r.output_rows,
"diff": r.rows_diff,
"diff_pct": f"{r.rows_diff_pct:+.1f}%",
"duration_ms": round(r.duration_ms, 1),
}
for r in self._results
])
class _FunctionTransform(BaseTransform):
def __init__(self, func: Callable, name: str):
super().__init__(name)
self._func = func
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        return self._func(df)
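Because Pipeline.add() accepts both BaseTransform instances and plain callables, a minimal sketch looks like this (drop_cancelled is an illustrative helper and df an orders DataFrame):
def drop_cancelled(df: pd.DataFrame) -> pd.DataFrame:
    # A plain function; Pipeline wraps it in a _FunctionTransform.
    return df[df["status"] != "Cancelled"]

pipe = Pipeline(name="demo")
pipe.add(drop_cancelled)
pipe.add(lambda d: d.reset_index(drop=True), name="ResetIndex")
clean = pipe.run(df)
print(pipe.summary())  # per-step row counts and timings
27.5.2 Cleaning Transforms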
class DuplicateRemover(BaseTransform):
def __init__(self, subset: list[str] | None = None, keep: str = "first"):
super().__init__("DuplicateRemover")
self.subset = subset
self.keep = keep
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
return df.drop_duplicates(subset=self.subset, keep=self.keep).reset_index(drop=True)
class MissingValueHandler(BaseTransform):
STRATEGIES = {"drop", "fill_constant", "fill_forward", "fill_backward", "fill_median", "fill_mode"}
def __init__(
self,
strategy: str = "drop",
fill_value: Any = None,
columns: list[str] | None = None,
):
super().__init__("MissingValueHandler")
if strategy not in self.STRATEGIES:
raise ValueError(f"Unknown strategy: {strategy}. Choose from {self.STRATEGIES}")
self.strategy = strategy
self.fill_value = fill_value
self.columns = columns
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
target_cols = self.columns or df.columns.tolist()
if self.strategy == "drop":
return df.dropna(subset=target_cols).reset_index(drop=True)
result = df.copy()
for col in target_cols:
if col not in result.columns:
continue
if self.strategy == "fill_constant":
result[col] = result[col].fillna(self.fill_value)
elif self.strategy == "fill_forward":
result[col] = result[col].ffill()
elif self.strategy == "fill_backward":
result[col] = result[col].bfill()
elif self.strategy == "fill_median":
if pd.api.types.is_numeric_dtype(result[col]):
result[col] = result[col].fillna(result[col].median())
elif self.strategy == "fill_mode":
mode = result[col].mode()
if len(mode) > 0:
result[col] = result[col].fillna(mode.iloc[0])
return result
class OutlierRemover(BaseTransform):
def __init__(
self,
columns: list[str],
method: str = "iqr",
threshold: float = 1.5,
action: str = "remove",
):
super().__init__("OutlierRemover")
self.columns = columns
self.method = method
self.threshold = threshold
self.action = action
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
mask = pd.Series(True, index=df.index)
for col in self.columns:
if col not in df.columns or not pd.api.types.is_numeric_dtype(df[col]):
continue
if self.method == "iqr":
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - self.threshold * IQR
upper = Q3 + self.threshold * IQR
col_mask = (df[col] >= lower) & (df[col] <= upper)
elif self.method == "zscore":
z_scores = (df[col] - df[col].mean()) / df[col].std()
col_mask = z_scores.abs() <= self.threshold
else:
continue
mask &= col_mask
if self.action == "remove":
return df[mask].reset_index(drop=True)
elif self.action == "clip":
result = df.copy()
for col in self.columns:
if col in result.columns and pd.api.types.is_numeric_dtype(result[col]):
Q1 = result[col].quantile(0.25)
Q3 = result[col].quantile(0.75)
IQR = Q3 - Q1
lower = Q1 - self.threshold * IQR
upper = Q3 + self.threshold * IQR
result[col] = result[col].clip(lower, upper)
return result
return df
class TypeConverter(BaseTransform):
def __init__(self, conversions: dict[str, str | type]):
super().__init__("TypeConverter")
self.conversions = conversions
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
result = df.copy()
for col, dtype in self.conversions.items():
if col not in result.columns:
continue
try:
if dtype == "datetime64[ns]" or dtype is pd.Timestamp:
result[col] = pd.to_datetime(result[col], errors="coerce")
elif dtype == "category":
result[col] = result[col].astype("category")
else:
result[col] = result[col].astype(dtype)
except (ValueError, TypeError) as e:
self._logger.warning(f"Failed to convert column '{col}': {e}")
return result
class DateTimeFeatureExtractor(BaseTransform):
def __init__(self, column: str, prefix: str | None = None, features: list[str] | None = None):
super().__init__("DateTimeFeatureExtractor")
self.column = column
self.prefix = prefix or column
self.features = features or ["year", "month", "day", "weekday", "hour", "is_weekend"]
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
result = df.copy()
dt_col = pd.to_datetime(result[self.column])
feature_map = {
"year": dt_col.dt.year,
"month": dt_col.dt.month,
"day": dt_col.dt.day,
"weekday": dt_col.dt.weekday,
"hour": dt_col.dt.hour,
"quarter": dt_col.dt.quarter,
"day_of_year": dt_col.dt.day_of_year,
"week_of_year": dt_col.dt.isocalendar().week.astype(int),
"is_weekend": dt_col.dt.weekday >= 5,
"is_month_start": dt_col.dt.is_month_start,
"is_month_end": dt_col.dt.is_month_end,
}
for feat in self.features:
if feat in feature_map:
result[f"{self.prefix}_{feat}"] = feature_map[feat]
        return result
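Composed into a cleaning stage, the transforms above might be wired up like this (column names follow the order dataset used throughout the chapter):
cleaning = Pipeline(name="cleaning")
cleaning.add(DuplicateRemover(subset=["order_id"]))
cleaning.add(MissingValueHandler(strategy="fill_mode", columns=["region", "payment_method"]))
cleaning.add(OutlierRemover(columns=["unit_price", "quantity"], method="iqr", action="clip"))
cleaning.add(DateTimeFeatureExtractor(column="order_date", features=["month", "weekday", "is_weekend"]))
clean_df = cleaning.run(df)
print(cleaning.summary())
27.5.3 Feature Engineering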
class FeatureEngineer(BaseTransform):
def __init__(self):
super().__init__("FeatureEngineer")
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
result = df.copy()
if "unit_price" in result.columns and "quantity" in result.columns:
result["calculated_total"] = result["unit_price"] * result["quantity"]
result["price_tier"] = pd.cut(
result["unit_price"],
bins=[0, 25, 100, 500, float("inf")],
labels=["Budget", "Mid-Range", "Premium", "Luxury"],
)
if "customer_id" in result.columns:
customer_stats = result.groupby("customer_id").agg(
customer_order_count=("order_id", "count"),
customer_total_spent=("total_amount", "sum"),
customer_avg_order_value=("total_amount", "mean"),
customer_first_order=("order_date", "min"),
customer_last_order=("order_date", "max"),
)
customer_stats["customer_days_active"] = (
customer_stats["customer_last_order"] - customer_stats["customer_first_order"]
).dt.days
customer_stats["customer_avg_days_between_orders"] = (
customer_stats["customer_days_active"] / customer_stats["customer_order_count"].clip(lower=1)
)
result = result.merge(customer_stats, on="customer_id", how="left")
if "category" in result.columns and "order_date" in result.columns:
result["order_month"] = result["order_date"].dt.to_period("M")
category_monthly = result.groupby(["category", "order_month"])["total_amount"].sum().reset_index()
category_monthly.columns = ["category", "order_month", "category_monthly_revenue"]
result = result.merge(category_monthly, on=["category", "order_month"], how="left")
result = result.drop(columns=["order_month"])
if "customer_total_spent" in result.columns:
result["customer_segment"] = pd.qcut(
result["customer_total_spent"],
q=4,
labels=["Bronze", "Silver", "Gold", "Platinum"],
duplicates="drop",
)
return result
class RollingFeatureBuilder(BaseTransform):
def __init__(
self,
date_column: str,
group_column: str,
value_column: str,
windows: list[int] | None = None,
):
super().__init__("RollingFeatureBuilder")
self.date_column = date_column
self.group_column = group_column
self.value_column = value_column
self.windows = windows or [7, 14, 30]
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
result = df.copy()
result = result.sort_values([self.group_column, self.date_column])
for window in self.windows:
rolling = result.groupby(self.group_column)[self.value_column].transform(
lambda s: s.rolling(window=window, min_periods=1).mean()
)
result[f"{self.value_column}_rolling_{window}d_mean"] = rolling
rolling_std = result.groupby(self.group_column)[self.value_column].transform(
lambda s: s.rolling(window=window, min_periods=1).std()
)
result[f"{self.value_column}_rolling_{window}d_std"] = rolling_std
        return result
27.5.4 Aggregations
class Aggregator(BaseTransform):
def __init__(
self,
group_by: list[str],
aggregations: dict[str, list[str | tuple]],
):
super().__init__("Aggregator")
self.group_by = group_by
self.aggregations = aggregations
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
named_aggs = {}
for col, funcs in self.aggregations.items():
for func in funcs:
if isinstance(func, tuple):
name, agg_func = func
named_aggs[f"{col}_{name}"] = pd.NamedAgg(column=col, aggfunc=agg_func)
else:
named_aggs[f"{col}_{func}"] = pd.NamedAgg(column=col, aggfunc=func)
return df.groupby(self.group_by).agg(**named_aggs).reset_index()
class TimeSeriesAggregator(BaseTransform):
def __init__(
self,
date_column: str,
value_columns: list[str],
freq: str = "D",
aggregations: dict[str, str] | None = None,
):
super().__init__("TimeSeriesAggregator")
self.date_column = date_column
self.value_columns = value_columns
self.freq = freq
self.aggregations = aggregations or {"sum": "sum", "mean": "mean", "count": "count"}
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
result = df.copy()
result[self.date_column] = pd.to_datetime(result[self.date_column])
result = result.set_index(self.date_column)
agg_dict = {col: list(self.aggregations.values()) for col in self.value_columns}
resampled = result[self.value_columns].resample(self.freq).agg(agg_dict)
resampled.columns = [
f"{col}_{agg}" for col, agg in resampled.columns
]
        return resampled.reset_index()
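A usage sketch of both aggregators (the tuple form gives a custom name to a lambda aggregation):
agg = Aggregator(
    group_by=["category", "region"],
    aggregations={
        "total_amount": ["sum", "mean", ("p90", lambda s: s.quantile(0.9))],
        "order_id": ["count"],
    },
)
by_segment = agg.transform(df)
daily = TimeSeriesAggregator(
    date_column="order_date",
    value_columns=["total_amount"],
    freq="D",
    aggregations={"sum": "sum", "count": "count"},
).transform(df)
27.6 Statistical Analysis Layer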
27.6.1 Descriptive Statistics
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
import numpy as np
import pandas as pd
from scipy import stats
logger = logging.getLogger(__name__)
@dataclass
class DistributionProfile:
column: str
count: int
mean: float
std: float
min_val: float
q25: float
median: float
q75: float
max_val: float
skewness: float
kurtosis: float
iqr: float
cv: float
def has_outliers_iqr(self, threshold: float = 1.5) -> bool:
lower = self.q25 - threshold * self.iqr
upper = self.q75 + threshold * self.iqr
return self.min_val < lower or self.max_val > upper
def to_dict(self) -> dict[str, Any]:
return {
"column": self.column,
"count": self.count,
"mean": round(self.mean, 4),
"std": round(self.std, 4),
"min": round(self.min_val, 4),
"q25": round(self.q25, 4),
"median": round(self.median, 4),
"q75": round(self.q75, 4),
"max": round(self.max_val, 4),
"skewness": round(self.skewness, 4),
"kurtosis": round(self.kurtosis, 4),
"iqr": round(self.iqr, 4),
"cv": round(self.cv, 4),
}
class DescriptiveAnalyzer:
def __init__(self, df: pd.DataFrame):
self.df = df
def profile_column(self, column: str) -> DistributionProfile:
series = self.df[column].dropna()
q25, q50, q75 = series.quantile([0.25, 0.5, 0.75])
return DistributionProfile(
column=column,
count=len(series),
mean=float(series.mean()),
std=float(series.std()),
min_val=float(series.min()),
q25=float(q25),
median=float(q50),
q75=float(q75),
max_val=float(series.max()),
skewness=float(series.skew()),
kurtosis=float(series.kurtosis()),
iqr=float(q75 - q25),
cv=float(series.std() / series.mean()) if series.mean() != 0 else float("inf"),
)
def profile_all_numeric(self) -> pd.DataFrame:
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
profiles = [self.profile_column(col).to_dict() for col in numeric_cols]
return pd.DataFrame(profiles).set_index("column")
def correlation_analysis(
self,
method: str = "pearson",
threshold: float = 0.0,
) -> pd.DataFrame:
numeric_df = self.df.select_dtypes(include=[np.number])
corr = numeric_df.corr(method=method)
if threshold > 0:
mask = corr.abs() >= threshold
corr = corr.where(mask)
return corr
def frequency_table(self, column: str, normalize: bool = False) -> pd.DataFrame:
counts = self.df[column].value_counts(normalize=normalize)
cum_pct = counts.cumsum() if normalize else (counts / counts.sum()).cumsum()
return pd.DataFrame({
column: counts.index,
"count": self.df[column].value_counts().values if normalize else counts.values,
"percentage": (counts.values * 100) if normalize else (counts.values / counts.sum() * 100),
"cumulative_pct": cum_pct.values * 100,
}).reset_index(drop=True)
def cross_tabulation(
self,
row_column: str,
col_column: str,
normalize: str = "index",
) -> pd.DataFrame:
ct = pd.crosstab(
self.df[row_column],
self.df[col_column],
normalize=normalize,
)
        return (ct * 100).round(2)
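For example:
analyzer = DescriptiveAnalyzer(df)
print(analyzer.profile_all_numeric())                   # one row per numeric column
print(analyzer.correlation_analysis(method="spearman", threshold=0.3))
print(analyzer.cross_tabulation("category", "region"))  # row percentages
27.6.2 Inferential Statistics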
@dataclass
class HypothesisTestResult:
test_name: str
statistic: float
p_value: float
alpha: float
null_hypothesis: str
alternative_hypothesis: str
@property
def is_significant(self) -> bool:
return self.p_value < self.alpha
@property
def conclusion(self) -> str:
if self.is_significant:
return f"拒绝零假设 (p={self.p_value:.4f} < α={self.alpha}),{self.alternative_hypothesis}"
return f"无法拒绝零假设 (p={self.p_value:.4f} ≥ α={self.alpha}),{self.null_hypothesis}"
def to_dict(self) -> dict[str, Any]:
return {
"test": self.test_name,
"statistic": round(self.statistic, 4),
"p_value": round(self.p_value, 6),
"significant": self.is_significant,
"conclusion": self.conclusion,
}
class InferentialAnalyzer:
def __init__(self, df: pd.DataFrame, alpha: float = 0.05):
self.df = df
self.alpha = alpha
def two_sample_ttest(
self,
column: str,
group_column: str,
group_a: str | int,
group_b: str | int,
equal_var: bool = False,
) -> HypothesisTestResult:
sample_a = self.df[self.df[group_column] == group_a][column].dropna()
sample_b = self.df[self.df[group_column] == group_b][column].dropna()
statistic, p_value = stats.ttest_ind(sample_a, sample_b, equal_var=equal_var)
return HypothesisTestResult(
test_name="Welch's t-test" if not equal_var else "Student's t-test",
statistic=float(statistic),
p_value=float(p_value),
alpha=self.alpha,
null_hypothesis=f"{group_a} 和 {group_b} 在 {column} 上的均值无显著差异",
alternative_hypothesis=f"{group_a} 和 {group_b} 在 {column} 上的均值存在显著差异",
)
def anova(
self,
column: str,
group_column: str,
) -> HypothesisTestResult:
groups = self.df[group_column].unique()
samples = [
self.df[self.df[group_column] == g][column].dropna()
for g in groups
]
statistic, p_value = stats.f_oneway(*samples)
return HypothesisTestResult(
test_name="One-way ANOVA",
statistic=float(statistic),
p_value=float(p_value),
alpha=self.alpha,
null_hypothesis=f"所有组在 {column} 上的均值相等",
alternative_hypothesis=f"至少有一组在 {column} 上的均值与其他组不同",
)
def chi_square_test(
self,
column_a: str,
column_b: str,
) -> HypothesisTestResult:
contingency = pd.crosstab(self.df[column_a], self.df[column_b])
statistic, p_value, dof, expected = stats.chi2_contingency(contingency)
return HypothesisTestResult(
test_name="Chi-square test of independence",
statistic=float(statistic),
p_value=float(p_value),
alpha=self.alpha,
null_hypothesis=f"{column_a} 和 {column_b} 相互独立",
alternative_hypothesis=f"{column_a} 和 {column_b} 存在关联",
)
def normality_test(self, column: str) -> HypothesisTestResult:
sample = self.df[column].dropna()
if len(sample) > 5000:
sample = sample.sample(5000, random_state=42)
statistic, p_value = stats.shapiro(sample)
return HypothesisTestResult(
test_name="Shapiro-Wilk normality test",
statistic=float(statistic),
p_value=float(p_value),
alpha=self.alpha,
null_hypothesis=f"{column} 服从正态分布",
alternative_hypothesis=f"{column} 不服从正态分布",
)27.6.3 时间序列分析
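For example, on the order dataset:
inferential = InferentialAnalyzer(df, alpha=0.05)
print(inferential.two_sample_ttest("total_amount", "region", "North", "South").conclusion)
print(inferential.anova("total_amount", "category").to_dict())
print(inferential.chi_square_test("category", "payment_method").conclusion)
27.6.3 Time-Series Analysis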
class TimeSeriesAnalyzer:
def __init__(self, df: pd.DataFrame, date_column: str, value_column: str):
self.df = df.copy()
self.df[date_column] = pd.to_datetime(self.df[date_column])
self.df = self.df.sort_values(date_column).set_index(date_column)
self.series = self.df[value_column]
self.date_column = date_column
self.value_column = value_column
def decompose(
self,
period: int | None = None,
model: str = "additive",
) -> dict[str, pd.Series]:
from statsmodels.tsa.seasonal import seasonal_decompose
if period is None:
freq = pd.infer_freq(self.series.index)
if freq:
period_map = {"D": 7, "W": 52, "M": 12, "Q": 4, "Y": 1}
period = period_map.get(freq, 12)
else:
period = 12
decomposition = seasonal_decompose(
self.series,
period=period,
model=model,
extrapolate_trend="freq",
)
return {
"trend": decomposition.trend,
"seasonal": decomposition.seasonal,
"residual": decomposition.resid,
"observed": decomposition.observed,
}
def test_stationarity(self) -> HypothesisTestResult:
from statsmodels.tsa.stattools import adfuller
result = adfuller(self.series.dropna(), autolag="AIC")
return HypothesisTestResult(
test_name="Augmented Dickey-Fuller test",
statistic=result[0],
p_value=result[1],
alpha=0.05,
null_hypothesis="时间序列存在单位根(非平稳)",
alternative_hypothesis="时间序列不存在单位根(平稳)",
)
def compute_autocorrelation(self, lags: int = 40) -> pd.DataFrame:
from statsmodels.tsa.stattools import acf, pacf
acf_values = acf(self.series.dropna(), nlags=lags, fft=True)
pacf_values = pacf(self.series.dropna(), nlags=lags)
return pd.DataFrame({
"lag": range(lags + 1),
"acf": acf_values,
"pacf": pacf_values,
})
def growth_rate_analysis(self, freq: str = "M") -> pd.DataFrame:
resampled = self.series.resample(freq).sum()
growth = resampled.pct_change()
return pd.DataFrame({
"period": resampled.index,
"value": resampled.values,
"growth_rate": growth.values,
"abs_change": resampled.diff().values,
}).set_index("period")
def moving_averages(self, windows: list[int] | None = None) -> pd.DataFrame:
windows = windows or [7, 14, 30, 90]
result = pd.DataFrame({"actual": self.series})
for w in windows:
result[f"ma_{w}"] = self.series.rolling(window=w, min_periods=1).mean()
result[f"ema_{w}"] = self.series.ewm(span=w, adjust=False).mean()
        return result
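A usage sketch, assuming daily is the daily aggregate produced by TimeSeriesAggregator in 27.5.4 (decomposition requires a regular frequency):
tsa = TimeSeriesAnalyzer(daily, date_column="order_date", value_column="total_amount_sum")
print(tsa.test_stationarity().conclusion)
components = tsa.decompose(period=7, model="additive")  # weekly seasonality in daily data
print(tsa.growth_rate_analysis(freq="M").head())
27.7 Visualization Layer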
27.7.1 Chart Base Class and Themes
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats  # used below for the normal-fit overlay in DistributionChart
@dataclass(frozen=True)
class ChartTheme:
name: str = "custom"
figure_size: tuple[float, float] = (12, 7)
dpi: int = 150
title_size: int = 16
label_size: int = 12
tick_size: int = 10
palette: str = "husl"
grid_alpha: float = 0.3
background_color: str = "#FAFAFA"
font_family: str = "sans-serif"
def apply(self):
plt.rcParams.update({
"figure.figsize": self.figure_size,
"figure.dpi": self.dpi,
"axes.titlesize": self.title_size,
"axes.labelsize": self.label_size,
"xtick.labelsize": self.tick_size,
"ytick.labelsize": self.tick_size,
"axes.facecolor": self.background_color,
"figure.facecolor": self.background_color,
"font.family": self.font_family,
"axes.grid": True,
"grid.alpha": self.grid_alpha,
"axes.spines.top": False,
"axes.spines.right": False,
})
sns.set_palette(self.palette)
THEME_LIGHT = ChartTheme(name="light")
THEME_DARK = ChartTheme(
name="dark",
background_color="#1E1E1E",
palette="dark",
)
THEME_PUBLICATION = ChartTheme(
name="publication",
figure_size=(8, 5),
dpi=300,
title_size=14,
label_size=11,
tick_size=9,
palette="colorblind",
)
class BaseChart(ABC):
def __init__(self, df: pd.DataFrame, theme: ChartTheme | None = None):
self.df = df
self.theme = theme or THEME_LIGHT
self._fig: plt.Figure | None = None
self._ax: plt.Axes | None = None
def _setup(self, title: str = "", xlabel: str = "", ylabel: str = "") -> tuple[plt.Figure, plt.Axes]:
self.theme.apply()
fig, ax = plt.subplots(figsize=self.theme.figure_size, dpi=self.theme.dpi)
ax.set_title(title, fontsize=self.theme.title_size, fontweight="bold", pad=15)
ax.set_xlabel(xlabel, fontsize=self.theme.label_size)
ax.set_ylabel(ylabel, fontsize=self.theme.label_size)
self._fig = fig
self._ax = ax
return fig, ax
@abstractmethod
def plot(self, **kwargs) -> "BaseChart":
...
def save(self, path: str | Path, **kwargs):
if self._fig is None:
raise RuntimeError("Call plot() before save()")
save_kwargs = {"dpi": self.theme.dpi, "bbox_inches": "tight", "facecolor": self.theme.background_color}
save_kwargs.update(kwargs)
self._fig.savefig(path, **save_kwargs)
plt.close(self._fig)
self._fig = None
self._ax = None
def show(self):
if self._fig is None:
raise RuntimeError("Call plot() before show()")
plt.tight_layout()
        plt.show()
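Themes are frozen dataclasses, so deriving a project-specific variant is a one-liner (the values below are arbitrary):
REPORT_THEME = ChartTheme(name="report", figure_size=(10, 6), dpi=200, palette="deep")
REPORT_THEME.apply()  # pushes these settings into matplotlib's global rcParams
27.7.2 Statistical Charts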
class DistributionChart(BaseChart):
def plot(
self,
column: str,
bins: int = 30,
fit_normal: bool = True,
title: str | None = None,
) -> "DistributionChart":
fig, ax = self._setup(
title=title or f"{column} 的分布",
xlabel=column,
ylabel="频率",
)
data = self.df[column].dropna()
n, bin_edges, patches = ax.hist(
data,
bins=bins,
density=True,
alpha=0.7,
edgecolor="white",
linewidth=0.5,
)
if fit_normal:
mu, sigma = data.mean(), data.std()
x = np.linspace(data.min(), data.max(), 200)
            ax.plot(x, stats.norm.pdf(x, mu, sigma), "r-", linewidth=2, label=f"Normal fit (μ={mu:.2f}, σ={sigma:.2f})")
ax.legend(fontsize=self.theme.tick_size)
mean_val = data.mean()
median_val = data.median()
        ax.axvline(mean_val, color="red", linestyle="--", alpha=0.7, label=f"Mean: {mean_val:.2f}")
        ax.axvline(median_val, color="green", linestyle="--", alpha=0.7, label=f"Median: {median_val:.2f}")
ax.legend(fontsize=self.theme.tick_size)
return self
class BoxPlotChart(BaseChart):
def plot(
self,
value_column: str,
group_column: str | None = None,
title: str | None = None,
show_points: bool = True,
) -> "BoxPlotChart":
fig, ax = self._setup(
title=title or f"{value_column} 的箱线图",
xlabel=group_column or "",
ylabel=value_column,
)
if group_column:
sns.boxplot(
data=self.df,
x=group_column,
y=value_column,
ax=ax,
width=0.5,
fliersize=3,
)
if show_points:
sns.stripplot(
data=self.df,
x=group_column,
y=value_column,
ax=ax,
size=3,
alpha=0.3,
color="black",
)
else:
ax.boxplot(self.df[value_column].dropna(), vert=True, widths=0.5)
return self
class CorrelationHeatmapChart(BaseChart):
def plot(
self,
method: str = "pearson",
annot: bool = True,
title: str = "相关性热力图",
columns: list[str] | None = None,
) -> "CorrelationHeatmapChart":
numeric_df = self.df.select_dtypes(include=[np.number])
if columns:
numeric_df = numeric_df[columns]
corr = numeric_df.corr(method=method)
fig, ax = self._setup(title=title)
mask = np.triu(np.ones_like(corr, dtype=bool), k=1)
sns.heatmap(
corr,
mask=mask,
annot=annot,
fmt=".2f",
cmap="RdBu_r",
center=0,
vmin=-1,
vmax=1,
square=True,
linewidths=0.5,
ax=ax,
annot_kws={"size": 8},
)
return self
class GroupComparisonChart(BaseChart):
def plot(
self,
value_column: str,
group_column: str,
chart_type: str = "bar",
aggfunc: str = "mean",
title: str | None = None,
sort_by_value: bool = True,
) -> "GroupComparisonChart":
fig, ax = self._setup(
title=title or f"{value_column} 按 {group_column} 分组 ({aggfunc})",
xlabel=group_column,
ylabel=value_column,
)
if aggfunc == "mean":
grouped = self.df.groupby(group_column)[value_column].mean()
elif aggfunc == "median":
grouped = self.df.groupby(group_column)[value_column].median()
elif aggfunc == "sum":
grouped = self.df.groupby(group_column)[value_column].sum()
else:
grouped = self.df.groupby(group_column)[value_column].agg(aggfunc)
if sort_by_value:
grouped = grouped.sort_values(ascending=False)
if chart_type == "bar":
bars = ax.bar(range(len(grouped)), grouped.values, edgecolor="white", linewidth=0.5)
ax.set_xticks(range(len(grouped)))
ax.set_xticklabels(grouped.index, rotation=45, ha="right")
for bar, val in zip(bars, grouped.values):
ax.text(
bar.get_x() + bar.get_width() / 2,
bar.get_height(),
f"{val:,.2f}",
ha="center",
va="bottom",
fontsize=self.theme.tick_size - 1,
)
elif chart_type == "violin":
sns.violinplot(data=self.df, x=group_column, y=value_column, ax=ax, inner="box")
        return self
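The fluent interface in practice (assuming a charts/ directory already exists):
DistributionChart(df, theme=THEME_PUBLICATION).plot(column="total_amount", bins=50).save("charts/amount.png")
GroupComparisonChart(df).plot(value_column="total_amount", group_column="region", aggfunc="sum").save("charts/by_region.png")
27.7.3 Time-Series Charts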
class TimeSeriesChart(BaseChart):
def plot(
self,
date_column: str,
value_column: str,
group_column: str | None = None,
freq: str = "D",
aggfunc: str = "sum",
show_trend: bool = True,
show_ma: bool = True,
ma_window: int = 7,
title: str | None = None,
) -> "TimeSeriesChart":
df = self.df.copy()
df[date_column] = pd.to_datetime(df[date_column])
if group_column:
grouped = df.groupby([pd.Grouper(key=date_column, freq=freq), group_column])[value_column]
else:
grouped = df.groupby(pd.Grouper(key=date_column, freq=freq))[value_column]
if aggfunc == "sum":
resampled = grouped.sum().reset_index()
elif aggfunc == "mean":
resampled = grouped.mean().reset_index()
else:
resampled = grouped.agg(aggfunc).reset_index()
fig, ax = self._setup(
title=title or f"{value_column} 时间趋势",
xlabel="日期",
ylabel=value_column,
)
if group_column:
for name, group in resampled.groupby(group_column):
ax.plot(group[date_column], group[value_column], label=str(name), marker="o", markersize=3)
ax.legend(fontsize=self.theme.tick_size)
else:
            ax.plot(resampled[date_column], resampled[value_column], linewidth=1.5, label="Actual")
if show_trend:
x_numeric = (resampled[date_column] - resampled[date_column].min()).dt.days
z = np.polyfit(x_numeric, resampled[value_column], 1)
p = np.poly1d(z)
ax.plot(
resampled[date_column],
p(x_numeric),
"r--",
linewidth=2,
alpha=0.7,
label=f"趋势线 (斜率: {z[0]:.2f}/天)",
)
if show_ma:
ma = resampled.set_index(date_column)[value_column].rolling(ma_window, min_periods=1).mean()
ax.plot(
ma.index,
ma.values,
linewidth=2,
alpha=0.8,
label=f"{ma_window}日移动平均",
)
ax.legend(fontsize=self.theme.tick_size)
plt.xticks(rotation=45)
return self
class SeasonalDecompositionChart(BaseChart):
def plot(
self,
date_column: str,
value_column: str,
period: int = 12,
model: str = "additive",
title: str = "时间序列分解",
) -> "SeasonalDecompositionChart":
from statsmodels.tsa.seasonal import seasonal_decompose
df = self.df.copy()
df[date_column] = pd.to_datetime(df[date_column])
df = df.sort_values(date_column).set_index(date_column)
series = df[value_column]
decomposition = seasonal_decompose(series, period=period, model=model, extrapolate_trend="freq")
fig, axes = plt.subplots(4, 1, figsize=self.theme.figure_size, sharex=True)
fig.suptitle(title, fontsize=self.theme.title_size, fontweight="bold")
        components = [
            ("Observed", decomposition.observed),
            ("Trend", decomposition.trend),
            ("Seasonal", decomposition.seasonal),
            ("Residual", decomposition.resid),
        ]
for ax, (label, component) in zip(axes, components):
ax.plot(component, linewidth=1.2)
ax.set_ylabel(label, fontsize=self.theme.label_size)
ax.grid(True, alpha=self.theme.grid_alpha)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
plt.tight_layout()
self._fig = fig
self._ax = axes[0]
        return self
27.8 Report Generation
27.8.1 Report Engine
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any
import pandas as pd
import numpy as np
class AnalysisReport:
def __init__(self, title: str = "数据分析报告"):
self.title = title
self.generated_at = datetime.now()
self._sections: list[dict[str, Any]] = []
def add_markdown(self, content: str, level: int = 2) -> "AnalysisReport":
self._sections.append({"type": "markdown", "content": content, "level": level})
return self
def add_dataframe(
self,
df: pd.DataFrame,
title: str = "",
max_rows: int = 20,
show_index: bool = False,
) -> "AnalysisReport":
self._sections.append({
"type": "dataframe",
"data": df.head(max_rows),
"title": title,
"show_index": show_index,
})
return self
def add_metric_cards(self, metrics: dict[str, Any]) -> "AnalysisReport":
self._sections.append({"type": "metrics", "data": metrics})
return self
def add_chart_reference(self, chart_path: str, title: str = "") -> "AnalysisReport":
self._sections.append({"type": "chart", "path": chart_path, "title": title})
return self
def add_test_results(self, results: list[dict[str, Any]]) -> "AnalysisReport":
self._sections.append({"type": "tests", "data": results})
return self
def to_markdown(self) -> str:
lines = []
lines.append(f"# {self.title}")
lines.append(f"\n> 生成时间: {self.generated_at.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append("")
for section in self._sections:
if section["type"] == "markdown":
prefix = "#" * section.get("level", 2)
lines.append(f"{prefix} {section['content']}")
lines.append("")
elif section["type"] == "dataframe":
if section.get("title"):
lines.append(f"### {section['title']}")
lines.append("")
df = section["data"]
lines.append(df.to_markdown(index=section.get("show_index", False)))
lines.append("")
elif section["type"] == "metrics":
lines.append("### 关键指标")
lines.append("")
for key, value in section["data"].items():
if isinstance(value, float):
lines.append(f"- **{key}**: {value:,.2f}")
elif isinstance(value, int):
lines.append(f"- **{key}**: {value:,}")
else:
lines.append(f"- **{key}**: {value}")
lines.append("")
elif section["type"] == "chart":
if section.get("title"):
lines.append(f"### {section['title']}")
lines.append("")
lines.append(f"")
lines.append("")
elif section["type"] == "tests":
lines.append("### 假设检验结果")
lines.append("")
test_df = pd.DataFrame(section["data"])
lines.append(test_df.to_markdown(index=False))
lines.append("")
return "\n".join(lines)
def save(self, path: str | Path):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
content = self.to_markdown()
with open(path, "w", encoding="utf-8") as f:
f.write(content)
def to_json(self) -> str:
serializable_sections = []
for section in self._sections:
s = section.copy()
if s["type"] == "dataframe":
s["data"] = s["data"].to_dict(orient="records")
serializable_sections.append(s)
return json.dumps({
"title": self.title,
"generated_at": self.generated_at.isoformat(),
"sections": serializable_sections,
        }, ensure_ascii=False, indent=2)
27.8.2 Report Generation in Practice
def generate_sales_report(
df: pd.DataFrame,
output_dir: str = "reports",
) -> AnalysisReport:
report = AnalysisReport(title="销售数据分析报告")
report.add_markdown("数据概览", level=2)
report.add_metric_cards({
"总订单数": len(df),
"总销售额": float(df["total_amount"].sum()),
"平均订单金额": float(df["total_amount"].mean()),
"客户数": int(df["customer_id"].nunique()),
"产品数": int(df["product_id"].nunique()),
})
report.add_markdown("销售额分布", level=2)
report.add_dataframe(
df.groupby("category")["total_amount"].agg(["count", "sum", "mean", "median"]).round(2),
title="各品类销售统计",
)
report.add_markdown("区域分析", level=2)
region_stats = df.groupby("region").agg(
订单数=("order_id", "count"),
总销售额=("total_amount", "sum"),
平均订单额=("total_amount", "mean"),
).round(2)
report.add_dataframe(region_stats, title="各区域销售统计")
report.add_markdown("假设检验", level=2)
from scipy import stats as sp_stats
regions = df["region"].unique().tolist()
groups = [df[df["region"] == r]["total_amount"].values for r in regions]
f_stat, p_value = sp_stats.f_oneway(*groups)
    report.add_test_results([{
        "test": "One-way ANOVA",
        "null_hypothesis": "mean revenue does not differ across regions",
        "F_statistic": round(f_stat, 4),
        "p_value": round(p_value, 6),
        "conclusion": "reject H0" if p_value < 0.05 else "fail to reject H0",
    }])
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
chart_dir = output_path / "charts"
chart_dir.mkdir(exist_ok=True)
dist_chart = DistributionChart(df, theme=THEME_PUBLICATION)
dist_chart.plot(column="total_amount", bins=50)
dist_chart.save(chart_dir / "amount_distribution.png")
report.add_chart_reference("charts/amount_distribution.png", title="订单金额分布")
ts_chart = TimeSeriesChart(df, theme=THEME_PUBLICATION)
ts_chart.plot(date_column="order_date", value_column="total_amount", freq="W", aggfunc="sum")
ts_chart.save(chart_dir / "weekly_trend.png")
report.add_chart_reference("charts/weekly_trend.png", title="周销售额趋势")
corr_chart = CorrelationHeatmapChart(df, theme=THEME_PUBLICATION)
corr_chart.plot(columns=["quantity", "unit_price", "total_amount"])
corr_chart.save(chart_dir / "correlation.png")
report.add_chart_reference("charts/correlation.png", title="变量相关性")
report.save(output_path / "sales_report.md")
json_path = output_path / "sales_report.json"
with open(json_path, "w", encoding="utf-8") as f:
f.write(report.to_json())
    return report
Usage example:
import pandas as pd
import pandera as pa
from analytics.ingestion.csv_loader import CSVLoader
from analytics.models.enums import OrderStatus
from analytics.models.schemas import OrderSchema
from analytics.transforms.cleaners import DuplicateRemover, MissingValueHandler
from analytics.transforms.pipeline import Pipeline

loader = CSVLoader(config={"parse_dates": ["order_date"]})
raw_df = loader.load("data/orders.csv").data
try:
    validated_df = OrderSchema.validate(raw_df)
except pa.errors.SchemaError as e:
    print(f"Validation failed: {e.failure_cases}")
    validated_df = raw_df

def compute_derived(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["total_amount"] = df["quantity"] * df["unit_price"]
    return df

pipeline = Pipeline(name="sales-preprocessing")
pipeline.add(DuplicateRemover(subset=["order_id"]))
pipeline.add(MissingValueHandler(strategy="fill_constant", fill_value=0, columns=["quantity", "unit_price", "total_amount"]))
pipeline.add(compute_derived, name="ComputeDerived")
pipeline.add(
    lambda df: df[df["status"].isin([OrderStatus.SHIPPED.value, OrderStatus.DELIVERED.value])],
    name="FilterFulfilled",
)
clean_df = pipeline.run(validated_df)
report = generate_sales_report(clean_df, output_dir="reports/sales")
print(f"Report generated: {report.title}")
print(f"Sections: {len(report._sections)}")
27.8.3 Automated Reporting Pipeline
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
import logging
import schedule
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
logger = logging.getLogger(__name__)
class ReportFormat(Enum):
MARKDOWN = "markdown"
HTML = "html"
JSON = "json"
PDF = "pdf"
@dataclass
class ReportSchedule:
name: str
cron_expression: str
recipients: list[str] = field(default_factory=list)
format: ReportFormat = ReportFormat.MARKDOWN
enabled: bool = True
class BaseReportGenerator(ABC):
def __init__(self, name: str, config: dict | None = None):
self.name = name
self.config = config or {}
self._report: AnalysisReport | None = None
@abstractmethod
def collect_data(self) -> pd.DataFrame:
...
@abstractmethod
def analyze(self, df: pd.DataFrame) -> AnalysisReport:
...
def generate(self) -> AnalysisReport:
logger.info(f"[{self.name}] 开始生成报告")
df = self.collect_data()
logger.info(f"[{self.name}] 数据采集完成: {len(df)} 行")
self._report = self.analyze(df)
logger.info(f"[{self.name}] 分析完成")
return self._report
def save(self, output_dir: str | Path, fmt: ReportFormat = ReportFormat.MARKDOWN) -> Path:
if self._report is None:
raise RuntimeError("请先调用 generate() 生成报告")
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if fmt == ReportFormat.MARKDOWN:
path = output_dir / f"{self.name}_{timestamp}.md"
self._report.save(path)
elif fmt == ReportFormat.JSON:
path = output_dir / f"{self.name}_{timestamp}.json"
with open(path, "w", encoding="utf-8") as f:
f.write(self._report.to_json())
elif fmt == ReportFormat.HTML:
path = output_dir / f"{self.name}_{timestamp}.html"
md_content = self._report.to_markdown()
html_content = self._markdown_to_html(md_content)
with open(path, "w", encoding="utf-8") as f:
f.write(html_content)
        else:
            # ReportFormat.PDF 为预留格式,可接入 WeasyPrint 等工具实现
            raise ValueError(f"暂不支持的格式: {fmt}")
logger.info(f"[{self.name}] 报告已保存: {path}")
return path
@staticmethod
def _markdown_to_html(md_content: str) -> str:
import markdown as md_lib
html_body = md_lib.markdown(
md_content,
extensions=["tables", "fenced_code", "toc", "nl2br"],
)
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>数据分析报告</title>
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
max-width: 900px;
margin: 0 auto;
padding: 20px;
line-height: 1.6;
color: #333;
}}
table {{ border-collapse: collapse; width: 100%; margin: 16px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px 12px; text-align: left; }}
th {{ background-color: #f5f5f5; font-weight: 600; }}
tr:nth-child(even) {{ background-color: #fafafa; }}
img {{ max-width: 100%; height: auto; }}
blockquote {{ border-left: 4px solid #ddd; padding-left: 16px; color: #666; }}
code {{ background-color: #f5f5f5; padding: 2px 6px; border-radius: 3px; }}
</style>
</head>
<body>
{html_body}
</body>
</html>"""
class SalesReportGenerator(BaseReportGenerator):
    def __init__(
        self,
        acquisition_service: DataAcquisitionService,
        pipeline: Pipeline,
        config: dict | None = None,
    ):
        super().__init__(name="销售分析报告", config=config)
        self.acquisition = acquisition_service
        self.pipeline = pipeline
def collect_data(self) -> pd.DataFrame:
raw_df = self.acquisition.from_csv(self.config.get("data_path", "data/orders.csv"))
return self.pipeline.run(raw_df)
def analyze(self, df: pd.DataFrame) -> AnalysisReport:
return generate_sales_report(df, output_dir=self.config.get("output_dir", "reports"))
class ReportScheduler:
    def __init__(self, config: dict | None = None):
        # config 可包含 smtp 等通知配置,供 _send_notification 使用
        self.config = config or {}
        self._generators: dict[str, BaseReportGenerator] = {}
        self._schedules: dict[str, ReportSchedule] = {}
def register(self, generator: BaseReportGenerator, schedule_config: ReportSchedule):
self._generators[schedule_config.name] = generator
self._schedules[schedule_config.name] = schedule_config
    def run_report(self, name: str) -> Path | None:
        if name not in self._generators:
            logger.error(f"未注册的报告: {name}")
            return None
        generator = self._generators[name]
        sched = self._schedules[name]  # 命名为 sched,避免遮蔽 schedule 模块
        try:
            generator.generate()
            path = generator.save(
                output_dir=f"reports/{name}",
                fmt=sched.format,
            )
            if sched.recipients:
                self._send_notification(sched, path)
            return path
        except Exception as e:
            logger.error(f"报告生成失败 [{name}]: {e}")
            return None
def _send_notification(self, schedule: ReportSchedule, report_path: Path):
smtp_config = self.config.get("smtp", {})
if not smtp_config:
logger.warning("SMTP 配置缺失,跳过邮件通知")
return
msg = MIMEMultipart()
msg["Subject"] = f"数据分析报告: {schedule.name}"
msg["From"] = smtp_config.get("from", "reports@example.com")
msg["To"] = ", ".join(schedule.recipients)
body = f"""
报告名称: {schedule.name}
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
报告路径: {report_path}
此邮件由自动化报告系统发送。
"""
msg.attach(MIMEText(body, "plain", "utf-8"))
with open(report_path, "rb") as f:
part = MIMEBase("application", "octet-stream")
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename={report_path.name}")
msg.attach(part)
with smtplib.SMTP(smtp_config["host"], smtp_config.get("port", 587)) as server:
if smtp_config.get("use_tls", True):
server.starttls()
server.login(smtp_config["user"], smtp_config["password"])
server.send_message(msg)
logger.info(f"通知已发送至: {schedule.recipients}")
    def start(self):
        # 注意:循环变量不能命名为 schedule,否则会遮蔽 schedule 模块
        for name, sched in self._schedules.items():
            if not sched.enabled:
                continue
            # 简化的调度表达式解析:"N" 表示每 N 天执行,"1 monday" 表示每周一执行
            parts = sched.cron_expression.split()
            if len(parts) == 1:
                schedule.every(int(parts[0])).days.do(self.run_report, name=name)
            elif parts[1].lower() == "monday":
                schedule.every().monday.do(self.run_report, name=name)
            elif parts[1].lower() == "friday":
                schedule.every().friday.do(self.run_report, name=name)
        logger.info("报告调度器已启动")
        while True:
            schedule.run_pending()
            time.sleep(60)

使用示例:
acquisition = DataAcquisitionService(config)
pipeline = Pipeline(name="销售数据预处理")
pipeline.add(RemoveDuplicatesTransform(subset=["order_id"]))
pipeline.add(FillMissingTransform(strategy={"quantity": 0, "unit_price": 0.0}))
sales_generator = SalesReportGenerator(
    acquisition,
    pipeline,
    config={
        "data_path": "data/orders.csv",
        "output_dir": "reports/sales",
    },
)
scheduler = ReportScheduler()
scheduler.register(
sales_generator,
ReportSchedule(
name="weekly_sales",
cron_expression="1 monday",
recipients=["team@example.com"],
format=ReportFormat.HTML,
),
)
scheduler.run_report("weekly_sales")  # 立即执行一次
scheduler.start()  # 进入调度循环(阻塞当前进程)

27.9 前沿技术动态
27.9.1 现代数据分析库

Polars 是用 Rust 实现的高性能 DataFrame 库,原生支持多核并行与惰性求值,是 Pandas 在大数据量场景下的现代替代方案:
import polars as pl
df = pl.read_csv("data.csv")
result = df.filter(pl.col("price") > 100).group_by("category").agg(
pl.col("price").mean().alias("avg_price"),
pl.col("quantity").sum().alias("total_qty")
)

27.9.2 交互式可视化

Plotly 提供交互式图表,Streamlit 则能用纯 Python 快速搭建数据应用,两者常配合用于分析结果的交付:
import plotly.express as px
import streamlit as st
df = px.data.iris()
fig = px.scatter(df, x="sepal_width", y="sepal_length", color="species")
st.title("Iris Dataset Analysis")
st.plotly_chart(fig)

27.9.3 大数据处理框架

Dask 以与 Pandas 相近的 API,把计算扩展到超出单机内存的数据集:
import dask.dataframe as dd
ddf = dd.read_parquet("s3://bucket/data/*.parquet")
result = ddf.groupby("category").amount.sum().compute()

27.9.4 机器学习集成

scikit-learn 的 Pipeline 将预处理与模型串联,与本章数据转换管道的思路一脉相承:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier())
])
# 假设 X_train、y_train、X_test 已由上游的数据划分步骤提供
pipeline.fit(X_train, y_train)
predictions = pipeline.predict(X_test)

27.10 本章小结
27.10.1 核心知识点回顾
本章系统阐述了数据分析工程的完整体系,从工程化视角重新审视数据分析流程:
1. 数据分析工程化思维
传统数据分析以探索性分析为主,而数据分析工程将软件工程的最佳实践引入分析流程,强调可重现性、可测试性、可维护性、可扩展性和可观测性。这一思维转变是构建生产级数据分析系统的基石。
2. 项目架构设计
采用分层架构(采集层 → 转换层 → 分析层 → 可视化层 → 报告层),配合配置管理、日志系统和异常处理,构建出可维护、可扩展的数据分析项目。配置的集中管理与环境隔离确保了项目在不同环境下的正确运行。
3. 数据模型与验证
使用 Pandera 等工具实现声明式数据验证,将数据质量保障前置到数据入口。枚举类型约束业务取值范围,Schema 定义列级规则,数据血缘追踪记录转换历史,三者协同构成数据质量保障体系。
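下面用一个极简的 Pandera Schema 示意"声明式"的含义(列名与规则仅为示例,并非本章 OrderSchema 的原貌):

import pandas as pd
import pandera as pa

# 示意用 Schema:列级规则集中声明,在数据入口处统一校验
demo_schema = pa.DataFrameSchema({
    "order_id": pa.Column(str, pa.Check.str_startswith("ORD-")),
    "quantity": pa.Column(int, pa.Check.ge(0)),
    "status": pa.Column(str, pa.Check.isin(["pending", "completed", "delivered"])),
})

df = pd.DataFrame({
    "order_id": ["ORD-001", "ORD-002"],
    "quantity": [2, 5],
    "status": ["completed", "pending"],
})
demo_schema.validate(df)  # 规则不满足时抛出 pa.errors.SchemaError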
4. 数据采集层
统一的数据采集接口屏蔽了不同数据源(CSV、API、数据库)的访问差异,配合缓存策略和重试机制,确保数据采集的可靠性和效率。合成数据生成器为开发和测试提供了可控的数据环境。
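重试机制的核心思路可以用一个极简的包装函数来说明(仅为示意草图,并非本章采集层的原始实现;with_retry 为本例虚构的辅助函数):

import logging
import time

logger = logging.getLogger(__name__)

def with_retry(fn, retries: int = 3, backoff: float = 1.0):
    """调用 fn;失败时按指数退避重试,最后一次仍失败则向上抛出异常。"""
    for attempt in range(1, retries + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == retries:
                raise
            wait = backoff * 2 ** (attempt - 1)
            logger.warning("第 %d 次调用失败: %s,%.1f 秒后重试", attempt, exc, wait)
            time.sleep(wait)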
5. 数据转换管道
基于管道模式(Pipeline Pattern)构建可组合的数据转换流程,每个转换步骤独立可测、可复用。BaseTransform 抽象基类定义了统一接口,TransformResult 记录转换元数据,支持血缘追踪和影响分析。
6. 统计分析层
- 描述性统计:通过 DistributionProfile 系统化刻画数据分布特征,包括集中趋势、离散程度和分布形态
- 推断性统计:假设检验(t 检验、ANOVA、卡方检验)从样本推断总体特征,p 值与效应量结合判断统计显著性与实际意义(见下方示例)
- 时间序列分析:趋势分解、平稳性检验、自相关分析为时序数据建模奠定基础
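以双样本 t 检验为例,"p 值 + 效应量"结合判断的写法大致如下(示例数据为随机生成):

import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
control = rng.normal(100, 15, 200)    # 对照组
treatment = rng.normal(104, 15, 200)  # 实验组

t_stat, p_value = stats.ttest_ind(control, treatment)
# Cohen's d:均值差除以合并标准差,衡量效应的实际大小
pooled_std = np.sqrt((control.std(ddof=1) ** 2 + treatment.std(ddof=1) ** 2) / 2)
cohens_d = (treatment.mean() - control.mean()) / pooled_std
print(f"p = {p_value:.4f}, Cohen's d = {cohens_d:.2f}")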
7. 可视化层
主题系统实现图表风格统一,BaseChart 抽象基类确保接口一致,具体图表类(分布图、箱线图、热力图、时序图)封装专业可视化逻辑。出版级主题配置满足学术发表需求。
8. 报告生成
AnalysisReport 类实现程序化报告构建,支持 Markdown、HTML、JSON 多格式输出。自动化报告管道结合调度器实现定期报告生成与分发,将分析结果及时送达决策者。
27.10.2 设计模式总结
| 设计模式 | 应用场景 | 本章实现 |
|---|---|---|
| 管道模式 (Pipeline) | 数据转换流程 | Pipeline 类组合多个 BaseTransform |
| 策略模式 (Strategy) | 数据采集、缺失值填充 | DataAcquisitionService、FillMissingTransform |
| 模板方法 (Template Method) | 图表绘制流程 | BaseChart._setup() + 子类 plot() |
| 建造者模式 (Builder) | 报告构建 | AnalysisReport 链式调用 |
| 工厂模式 (Factory) | 数据源创建 | DataAcquisitionService.from_* 方法族 |
| 观察者模式 (Observer) | 报告调度通知 | ReportScheduler._send_notification() |
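以建造者模式为例,链式调用让报告构建的代码读起来像一份大纲。下面是一个构建示意(此处假设 AnalysisReport 以 title 构造、存在 add_section(title, content) 方法,且各 add_* 方法均返回 self,与上表"链式调用"的设定一致):

# 链式构建示意(假设各 add_* 方法返回 self)
report = (
    AnalysisReport(title="月度销售报告")
    .add_section("概览", "本月订单量与销售额环比均有上升。")
    .add_chart_reference("charts/weekly_trend.png", title="周销售额趋势")
)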
27.10.3 性能优化要点
- 向量化运算:优先使用 Pandas/NumPy 向量化操作替代 Python 循环,性能可提升 10-100 倍(如下例所示)
- 数据类型优化:使用 category 类型存储低基数字符串列,可减少 50% 以上的内存占用
- 惰性求值:管道模式中各步骤仅在 run() 调用时执行,避免中间结果的冗余计算
- 分块处理:对于超大数据集,使用 chunksize 参数分块读取和处理
- 缓存策略:API 数据采集使用本地缓存,避免重复请求
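下面的小例子用随机数据演示前两条要点,实际收益因数据分布而异:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "status": np.random.choice(["pending", "completed", "refunded"], size=100_000),
    "amount": np.random.uniform(1, 500, size=100_000),
})

# 数据类型优化:低基数字符串列转为 category,对比内存占用
obj_mem = df["status"].memory_usage(deep=True)
cat_mem = df["status"].astype("category").memory_usage(deep=True)
print(f"object 列: {obj_mem / 1024:.0f} KB,category 列: {cat_mem / 1024:.0f} KB")

# 向量化运算:整列一次计算,避免逐行循环或 apply
df["fee"] = df["amount"] * 0.029 + 0.30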
27.11 习题与项目练习
27.11.1 基础练习
练习 1:数据验证框架
为以下销售数据设计 Pandera Schema,要求:
- transaction_id:格式为 TXN- + 8 位数字
- timestamp:介于 2024-01-01 至当前日期之间
- amount:正数,不超过 1,000,000
- currency:仅限 USD、EUR、GBP、CNY、JPY
- status:仅限 pending、completed、refunded、failed
import pandera as pa
class TransactionSchema(pa.SchemaModel):  # 提示:pandera 0.14+ 中 SchemaModel 已更名为 DataFrameModel
    pass

练习 2:数据转换管道
实现以下转换步骤并组合为管道:
- 去除重复交易记录(基于 transaction_id)
- 填充缺失的 currency 为 USD
- 计算手续费列:fee = amount * 0.029 + 0.30(当 currency 为 USD 时)
- 过滤掉 failed 状态的交易
class DeduplicateTransform(BaseTransform):
def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
pass
class FillCurrencyTransform(BaseTransform):
def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
pass
class ComputeFeeTransform(BaseTransform):
def _apply(self, df: pd.DataFrame) -> pd.DataFrame:
        pass

练习 3:描述性统计分析
对给定的数据集,计算以下统计量并生成分布画像:
- 均值、中位数、众数
- 标准差、方差、变异系数
- 偏度、峰度
- 四分位数、IQR
- 异常值比例(基于 1.5 × IQR 规则)
27.11.2 进阶练习
练习 4:多数据源融合分析
实现一个分析系统,从以下三个数据源采集数据并融合分析:
- CSV 文件:产品基础信息(product_id, name, category, price)
- REST API:实时库存数据(product_id, stock, warehouse)
- SQLite 数据库:历史销售记录(product_id, date, quantity, revenue)
要求:
- 设计统一的数据模型和验证规则
- 实现数据融合管道,处理产品ID不一致问题
- 生成库存预警报告(库存低于近30天日均销量的7倍)
练习 5:A/B 测试分析框架
实现一个通用的 A/B 测试分析框架:
@dataclass
class ABTestResult:
test_name: str
control_mean: float
treatment_mean: float
lift: float
p_value: float
confidence_interval: tuple[float, float]
effect_size: float
power: float
recommendation: str
class ABTestAnalyzer:
def __init__(self, alpha: float = 0.05, power: float = 0.8):
self.alpha = alpha
self.power = power
def analyze(
self,
control: np.ndarray,
treatment: np.ndarray,
test_name: str = "A/B Test",
) -> ABTestResult:
pass
def required_sample_size(
self,
baseline_rate: float,
minimum_detectable_effect: float,
) -> int:
        pass

要求:
- 支持连续型指标(t 检验)和比例型指标(Z 检验)
- 计算效应量(Cohen's d / Cohen's h)
- 计算统计功效和所需样本量
- 生成包含置信区间的检验报告
练习 6:时间序列预测管道
基于本章的 TimeSeriesAnalyzer,扩展实现:
- 平稳性检验(ADF 检验、KPSS 检验)
- 自相关分析(ACF、PACF 图)
- 简单预测模型(移动平均、指数平滑)
- 预测精度评估(MAE、RMSE、MAPE)
27.11.3 综合项目
项目:电商数据分析平台
构建一个完整的电商数据分析平台,包含以下模块:
阶段一:数据基础层
- 设计数据模型,覆盖用户、商品、订单、支付、物流五个核心实体
- 实现多数据源采集器(CSV 导入 + API 同步 + 数据库查询)
- 构建数据验证层,定义业务规则约束
- 实现合成数据生成器,生成至少 10,000 条订单数据用于测试
阶段二:分析引擎层
- 构建数据转换管道,实现 ETL 流程
- 实现用户行为分析模块(RFM 模型、用户生命周期、留存分析)
- 实现商品分析模块(关联规则、品类分析、价格弹性)
- 实现运营分析模块(漏斗分析、转化率、A/B 测试)
阶段三:可视化与报告层
- 设计可复用的图表主题系统
- 实现交互式仪表盘(使用 Plotly Dash 或 Streamlit)
- 构建自动化报告生成管道
- 实现报告调度与邮件分发
阶段四:工程质量保障
- 编写单元测试,覆盖核心分析逻辑
- 实现数据质量监控(异常检测、漂移预警)
- 添加日志和性能追踪
- 编写项目文档和 API 文档
交付要求:
- 完整的项目代码,遵循本章架构设计
- 测试覆盖率 ≥ 80%
- 项目文档包含架构图、数据字典、API 说明
- 至少生成 3 种类型的分析报告示例
27.12 延伸阅读
27.12.1 数据分析工程
- 《Data Engineering Cookbook》 — Data Engineering 的实践指南,涵盖数据管道、数据仓库和数据湖架构
- 《Fundamentals of Data Engineering》 (Joe Reis & Matt Housley) — 系统阐述数据工程生命周期,从数据生成到分析全链路
- 《The Data Warehouse Toolkit》 (Ralph Kimball) — 维度建模的经典参考,数据仓库设计的基石
- dbt 文档 (https://docs.getdbt.com/) — 数据转换工具 dbt 的官方文档,现代数据分析工程的事实标准
27.12.2 Python 数据科学生态
- 《Python for Data Analysis》 (Wes McKinney) — Pandas 作者亲著,数据处理与分析的权威参考
- 《Effective Pandas》 (Matt Harrison) — 深入 Pandas 高级用法和性能优化技巧
- NumPy 官方文档 (https://numpy.org/doc/) — 数值计算的完整参考
- Pandas 官方文档 — Scaling to large datasets (https://pandas.pydata.org/docs/user_guide/scale.html) — 大规模数据集处理指南
27.12.3 统计学与可视化
- 《Practical Statistics for Data Scientists》 (Peter Bruce 等) — 面向数据科学家的实用统计方法
- 《Storytelling with Data》 (Cole Nussbaumer Knaflic) — 数据可视化与叙事的实践指南
- 《The Visual Display of Quantitative Information》 (Edward Tufte) — 信息可视化设计的经典著作
- Seaborn 官方教程 (https://seaborn.pydata.org/tutorial.html) — 统计可视化最佳实践
27.12.4 生产级数据分析
- Great Expectations (https://greatexpectations.io/) — 数据验证和文档化平台
- Pandera (https://pandera.readthedocs.io/) — Python 数据验证框架
- Apache Airflow (https://airflow.apache.org/) — 工作流编排平台,用于调度数据管道
- Prefect (https://prefect.io/) — 现代 Python 工作流编排框架
- Polars (https://pola.rs/) — 高性能 DataFrame 库,Pandas 的现代替代方案