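Chapter 30: Introduction to Machine Learning

Learning Objectives

After completing this chapter, you will be able to:

  1. Understand core machine learning concepts: supervised learning, unsupervised learning, and reinforcement learning
  2. Apply data preprocessing techniques: data cleaning, feature engineering, and standardization
  3. Use scikit-learn for classification tasks: decision trees, random forests, and SVMs
  4. Implement regression analysis: linear regression, polynomial regression, and regularization
  5. Apply clustering algorithms: K-Means, hierarchical clustering, and DBSCAN
  6. Evaluate models: cross-validation, confusion matrices, and ROC curves
  7. Tune models: grid search, random search, and Bayesian optimization
  8. Build machine learning pipelines: feature transformation, model training, and prediction deployment

30.1 Machine Learning Basics

30.1.1 What Is Machine Learning

Machine learning is a branch of artificial intelligence that enables computer systems to learn from data and improve their performance without being explicitly programmed.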

┌──────────────────────────────────────────────────────────────────────┐
│                      Types of Machine Learning                       │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                      Supervised Learning                       │  │
│  │  ┌──────────────────────────┐    ┌──────────────────────────┐  │  │
│  │  │      Classification      │    │        Regression        │  │  │
│  │  │                          │    │                          │  │  │
│  │  │ • Email classification   │    │ • House price prediction │  │  │
│  │  │ • Image recognition      │    │ • Stock prediction       │  │  │
│  │  │ • Disease diagnosis      │    │ • Sales forecasting      │  │  │
│  │  └──────────────────────────┘    └──────────────────────────┘  │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                     Unsupervised Learning                      │  │
│  │  ┌──────────────────────────┐    ┌──────────────────────────┐  │  │
│  │  │        Clustering        │    │ Dimensionality Reduction │  │  │
│  │  │                          │    │                          │  │  │
│  │  │ • Customer segmentation  │    │ • PCA                    │  │  │
│  │  │ • Anomaly detection      │    │ • t-SNE                  │  │  │
│  │  │ • Image segmentation     │    │ • LSA                    │  │  │
│  │  └──────────────────────────┘    └──────────────────────────┘  │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                     Reinforcement Learning                     │  │
│  │  • Game AI    • Robot control    • Autonomous driving          │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
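
The difference between the first two categories in the diagram can be sketched in a few lines. This is only an illustration and not part of the chapter's own code: the Iris dataset, `LogisticRegression`, and `KMeans` are choices made for this example.

```python
from sklearn.cluster import KMeans
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression

X, y = load_iris(return_X_y=True)

# Supervised: the model is given both the features X and the labels y
classifier = LogisticRegression(max_iter=1000)
classifier.fit(X, y)
predicted_labels = classifier.predict(X[:5])

# Unsupervised: the model is given only X and groups similar samples itself
clusterer = KMeans(n_clusters=3, n_init=10, random_state=42)
cluster_ids = clusterer.fit_predict(X)

print(predicted_labels)   # class labels learned from y
print(cluster_ids[:5])    # cluster indices; their numbering is arbitrary
```

Cluster indices carry no meaning beyond group membership, so they should not be compared directly with the class labels.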

30.1.2 The Machine Learning Workflow

python
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Union
from enum import Enum
import numpy as np
import pandas as pd


class TaskType(Enum):
    CLASSIFICATION = "classification"
    REGRESSION = "regression"
    CLUSTERING = "clustering"
    DIMENSIONALITY_REDUCTION = "dimensionality_reduction"


class DataType(Enum):
    NUMERICAL = "numerical"
    CATEGORICAL = "categorical"
    TEXT = "text"
    IMAGE = "image"
    TIME_SERIES = "time_series"


@dataclass
class Dataset:
    X: np.ndarray
    y: Optional[np.ndarray] = None
    feature_names: List[str] = field(default_factory=list)
    target_name: str = "target"
    task_type: TaskType = TaskType.CLASSIFICATION

    @property
    def n_samples(self) -> int:
        return self.X.shape[0]

    @property
    def n_features(self) -> int:
        return self.X.shape[1] if len(self.X.shape) > 1 else 1

    @property
    def n_classes(self) -> int:
        if self.y is None:
            return 0
        return len(np.unique(self.y))

    def split(
        self,
        test_size: float = 0.2,
        random_state: int = 42,
        stratify: bool = False
    ) -> tuple:
        from sklearn.model_selection import train_test_split

        if self.y is None:
            X_train, X_test = train_test_split(
                self.X,
                test_size=test_size,
                random_state=random_state
            )
            return (
                Dataset(X_train, feature_names=self.feature_names),
                Dataset(X_test, feature_names=self.feature_names)
            )

        stratify_param = self.y if stratify else None
        X_train, X_test, y_train, y_test = train_test_split(
            self.X, self.y,
            test_size=test_size,
            random_state=random_state,
            stratify=stratify_param
        )

        return (
            Dataset(X_train, y_train, self.feature_names, self.target_name, self.task_type),
            Dataset(X_test, y_test, self.feature_names, self.target_name, self.task_type)
        )


@dataclass
class ModelMetrics:
    accuracy: Optional[float] = None
    precision: Optional[float] = None
    recall: Optional[float] = None
    f1_score: Optional[float] = None
    r2_score: Optional[float] = None
    mse: Optional[float] = None
    mae: Optional[float] = None
    custom_metrics: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, float]:
        result = {}
        for key, value in self.__dict__.items():
            if key != "custom_metrics" and value is not None:
                result[key] = value
        result.update(self.custom_metrics)
        return result

    def __repr__(self) -> str:
        metrics_str = ", ".join(
            f"{k}={v:.4f}" for k, v in self.to_dict().items()
        )
        return f"ModelMetrics({metrics_str})"
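
The split, fit, and evaluate steps that these containers are built around can be sketched directly with scikit-learn. This is a minimal example under stated assumptions: the Iris dataset, a depth-3 decision tree, and the two metrics shown are illustrative choices, and a plain dictionary stands in for the `ModelMetrics` class.

```python
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

X, y = load_iris(return_X_y=True)

# 1. Hold out a test set; stratify keeps class proportions equal in both parts
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# 2. Fit on the training part only
model = DecisionTreeClassifier(max_depth=3, random_state=42)
model.fit(X_train, y_train)

# 3. Evaluate on data the model has not seen
y_pred = model.predict(X_test)
metrics = {
    "accuracy": accuracy_score(y_test, y_pred),
    "f1_macro": f1_score(y_test, y_pred, average="macro"),
}
print({name: round(value, 4) for name, value in metrics.items()})
```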

30.1.3 Common Datasets

python
from sklearn.datasets import (
    load_iris,
    load_wine,
    load_breast_cancer,
    load_diabetes,
    # load_boston is omitted: it was removed in scikit-learn 1.2
    make_classification,
    make_regression,
    make_blobs
)
from typing import Tuple


class DatasetLoader:
    @staticmethod
    def load_iris_dataset() -> Dataset:
        data = load_iris()
        return Dataset(
            X=data.data,
            y=data.target,
            feature_names=list(data.feature_names),
            target_name="species",
            task_type=TaskType.CLASSIFICATION
        )

    @staticmethod
    def load_wine_dataset() -> Dataset:
        data = load_wine()
        return Dataset(
            X=data.data,
            y=data.target,
            feature_names=list(data.feature_names),
            target_name="wine_class",
            task_type=TaskType.CLASSIFICATION
        )

    @staticmethod
    def load_breast_cancer_dataset() -> Dataset:
        data = load_breast_cancer()
        return Dataset(
            X=data.data,
            y=data.target,
            feature_names=list(data.feature_names),
            target_name="cancer_type",
            task_type=TaskType.CLASSIFICATION
        )

    @staticmethod
    def load_diabetes_dataset() -> Dataset:
        data = load_diabetes()
        return Dataset(
            X=data.data,
            y=data.target,
            feature_names=list(data.feature_names),
            target_name="progression",
            task_type=TaskType.REGRESSION
        )

    @staticmethod
    def generate_classification(
        n_samples: int = 1000,
        n_features: int = 20,
        n_classes: int = 2,
        random_state: int = 42
    ) -> Dataset:
        X, y = make_classification(
            n_samples=n_samples,
            n_features=n_features,
            n_classes=n_classes,
            random_state=random_state
        )
        feature_names = [f"feature_{i}" for i in range(n_features)]
        return Dataset(
            X=X,
            y=y,
            feature_names=feature_names,
            task_type=TaskType.CLASSIFICATION
        )

    @staticmethod
    def generate_regression(
        n_samples: int = 1000,
        n_features: int = 20,
        random_state: int = 42
    ) -> Dataset:
        X, y = make_regression(
            n_samples=n_samples,
            n_features=n_features,
            random_state=random_state
        )
        feature_names = [f"feature_{i}" for i in range(n_features)]
        return Dataset(
            X=X,
            y=y,
            feature_names=feature_names,
            task_type=TaskType.REGRESSION
        )

    @staticmethod
    def generate_clustering(
        n_samples: int = 1000,
        n_features: int = 2,
        n_clusters: int = 3,
        random_state: int = 42
    ) -> Dataset:
        X, y = make_blobs(
            n_samples=n_samples,
            n_features=n_features,
            centers=n_clusters,
            random_state=random_state
        )
        feature_names = [f"feature_{i}" for i in range(n_features)]
        return Dataset(
            X=X,
            y=y,
            feature_names=feature_names,
            task_type=TaskType.CLUSTERING
        )
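
For orientation, here is a short sketch of what the underlying scikit-learn loaders and generators return when called directly. The sample counts passed to `make_blobs` are arbitrary values chosen for this example.

```python
import numpy as np
from sklearn.datasets import load_iris, make_blobs

# A real dataset: load_* returns a Bunch with data, target and feature names
iris = load_iris()
print(iris.data.shape)            # (150, 4)
print(list(iris.feature_names))
print(np.unique(iris.target))     # [0 1 2]

# A synthetic dataset: make_blobs draws points around `centers` cluster centres
X_blobs, y_blobs = make_blobs(
    n_samples=300, n_features=2, centers=3, random_state=42
)
print(X_blobs.shape, np.bincount(y_blobs))
```

Synthetic generators are convenient for testing code paths because the sample count, dimensionality, and number of groups are all under your control.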

30.2 Data Preprocessing

30.2.1 Data Cleaning

python
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import (
    StandardScaler,
    MinMaxScaler,
    RobustScaler,
    LabelEncoder,
    OneHotEncoder,
    OrdinalEncoder
)
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import pandas as pd
import numpy as np
from typing import Any, Dict, List, Optional, Tuple


class DataCleaner:
    def __init__(self):
        self.missing_values_summary: Dict[str, Any] = {}

    def analyze_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        missing = df.isnull().sum()
        percentage = (missing / len(df)) * 100
        summary = pd.DataFrame({
            "missing_count": missing,
            "missing_percentage": percentage,
            "dtype": df.dtypes
        })
        self.missing_values_summary = summary.to_dict()
        return summary[summary["missing_count"] > 0].sort_values(
            "missing_percentage", ascending=False
        )

    def handle_missing_values(
        self,
        df: pd.DataFrame,
        strategy: str = "mean",
        columns: Optional[List[str]] = None,
        fill_value: Optional[Any] = None
    ) -> pd.DataFrame:
        df = df.copy()

        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns.tolist()

        if strategy == "drop":
            return df.dropna(subset=columns)

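        if strategy == "fill":
            # A constant fill needs an explicit value; fail early otherwise,
            # because "fill" is not a valid SimpleImputer strategy
            if fill_value is None:
                raise ValueError("fill_value is required when strategy='fill'")
            df[columns] = df[columns].fillna(fill_value)
            return df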

        imputer = SimpleImputer(strategy=strategy)
        df[columns] = imputer.fit_transform(df[columns])

        return df

    def remove_duplicates(self, df: pd.DataFrame, subset: Optional[List[str]] = None) -> pd.DataFrame:
        return df.drop_duplicates(subset=subset, keep="first")

    def remove_outliers(
        self,
        df: pd.DataFrame,
        columns: Optional[List[str]] = None,
        method: str = "iqr",
        threshold: float = 1.5
    ) -> pd.DataFrame:
        df = df.copy()

        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns.tolist()

        for col in columns:
            if method == "iqr":
                Q1 = df[col].quantile(0.25)
                Q3 = df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR
                df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]

            elif method == "zscore":
                z_scores = np.abs((df[col] - df[col].mean()) / df[col].std())
                df = df[z_scores <= threshold]

        return df


class FeatureEncoder:
    def __init__(self):
        self.label_encoders: Dict[str, LabelEncoder] = {}
        self.one_hot_encoder: Optional[OneHotEncoder] = None
        self.ordinal_encoder: Optional[OrdinalEncoder] = None

    def label_encode(
        self,
        df: pd.DataFrame,
        columns: List[str]
    ) -> pd.DataFrame:
        df = df.copy()

        for col in columns:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            self.label_encoders[col] = le

        return df

    def one_hot_encode(
        self,
        df: pd.DataFrame,
        columns: List[str],
        drop: str = "first"
    ) -> pd.DataFrame:
        df = df.copy()

        self.one_hot_encoder = OneHotEncoder(
            sparse_output=False,
            drop=drop,
            handle_unknown="ignore"
        )

        encoded = self.one_hot_encoder.fit_transform(df[columns])
        encoded_df = pd.DataFrame(
            encoded,
            columns=self.one_hot_encoder.get_feature_names_out(columns),
            index=df.index
        )

        df = df.drop(columns=columns)
        df = pd.concat([df, encoded_df], axis=1)

        return df

    def ordinal_encode(
        self,
        df: pd.DataFrame,
        columns: List[str],
        categories: Optional[Dict[str, List[str]]] = None
    ) -> pd.DataFrame:
        df = df.copy()

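        if categories:
            # OrdinalEncoder needs an explicit list for every column, so columns
            # without a user-supplied order fall back to their sorted values
            cat_list = [
                categories.get(col, sorted(df[col].dropna().unique()))
                for col in columns
            ]
        else:
            cat_list = "auto"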

        self.ordinal_encoder = OrdinalEncoder(categories=cat_list)
        df[columns] = self.ordinal_encoder.fit_transform(df[columns])

        return df


class FeatureScaler:
    def __init__(self):
        self.scalers: Dict[str, Any] = {}

    def standard_scale(
        self,
        X: np.ndarray,
        feature_names: Optional[List[str]] = None
    ) -> np.ndarray:
        scaler = StandardScaler()
        scaled = scaler.fit_transform(X)
        self.scalers["standard"] = scaler
        return scaled

    def minmax_scale(
        self,
        X: np.ndarray,
        feature_range: Tuple[float, float] = (0, 1)
    ) -> np.ndarray:
        scaler = MinMaxScaler(feature_range=feature_range)
        scaled = scaler.fit_transform(X)
        self.scalers["minmax"] = scaler
        return scaled

    def robust_scale(self, X: np.ndarray) -> np.ndarray:
        scaler = RobustScaler()
        scaled = scaler.fit_transform(X)
        self.scalers["robust"] = scaler
        return scaled

    def log_transform(self, X: np.ndarray) -> np.ndarray:
        return np.log1p(X)

    def box_cox_transform(self, X: np.ndarray) -> np.ndarray:
        from scipy import stats
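        # Box-Cox requires strictly positive input; the +1 shift assumes X >= 0.
        # A float buffer avoids truncating results when X has an integer dtype.
        transformed = np.zeros_like(X, dtype=float)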
        for i in range(X.shape[1]):
            transformed[:, i], _ = stats.boxcox(X[:, i] + 1)
        return transformed

30.2.2 Feature Engineering

python
from sklearn.feature_selection import (
    SelectKBest,
    f_classif,
    f_regression,
    mutual_info_classif,
    RFE
)
from sklearn.decomposition import PCA
from sklearn.preprocessing import PolynomialFeatures
import itertools


class FeatureEngineer:
    def __init__(self):
        self.poly_transformer: Optional[PolynomialFeatures] = None
        self.pca: Optional[PCA] = None

    def create_polynomial_features(
        self,
        X: np.ndarray,
        degree: int = 2,
        interaction_only: bool = False,
        include_bias: bool = False
    ) -> np.ndarray:
        self.poly_transformer = PolynomialFeatures(
            degree=degree,
            interaction_only=interaction_only,
            include_bias=include_bias
        )
        return self.poly_transformer.fit_transform(X)

    def create_interaction_features(
        self,
        df: pd.DataFrame,
        columns: List[str]
    ) -> pd.DataFrame:
        df = df.copy()

        for col1, col2 in itertools.combinations(columns, 2):
            df[f"{col1}_x_{col2}"] = df[col1] * df[col2]

        return df

    def create_ratio_features(
        self,
        df: pd.DataFrame,
        numerator: str,
        denominator: str,
        new_name: Optional[str] = None
    ) -> pd.DataFrame:
        df = df.copy()
        name = new_name or f"{numerator}_div_{denominator}"
        df[name] = df[numerator] / (df[denominator] + 1e-10)
        return df

    def bin_numerical_feature(
        self,
        df: pd.DataFrame,
        column: str,
        bins: int = 5,
        labels: Optional[List[str]] = None
    ) -> pd.DataFrame:
        df = df.copy()
        new_col = f"{column}_binned"
        df[new_col] = pd.cut(df[column], bins=bins, labels=labels)
        return df

    def apply_pca(
        self,
        X: np.ndarray,
        n_components: Union[int, float] = 0.95
    ) -> np.ndarray:
        self.pca = PCA(n_components=n_components)
        return self.pca.fit_transform(X)

    def select_features(
        self,
        X: np.ndarray,
        y: np.ndarray,
        k: int = 10,
        task_type: TaskType = TaskType.CLASSIFICATION
    ) -> Tuple[np.ndarray, np.ndarray]:
        score_func = f_classif if task_type == TaskType.CLASSIFICATION else f_regression

        selector = SelectKBest(score_func=score_func, k=k)
        X_selected = selector.fit_transform(X, y)

        # get_support(indices=True) returns the column indices that were kept
        return X_selected, selector.get_support(indices=True)


class DateTimeFeatures:
    @staticmethod
    def extract_features(
        df: pd.DataFrame,
        column: str,
        drop_original: bool = True
    ) -> pd.DataFrame:
        df = df.copy()
        dt_col = pd.to_datetime(df[column])

        df[f"{column}_year"] = dt_col.dt.year
        df[f"{column}_month"] = dt_col.dt.month
        df[f"{column}_day"] = dt_col.dt.day
        df[f"{column}_dayofweek"] = dt_col.dt.dayofweek
        df[f"{column}_dayofyear"] = dt_col.dt.dayofyear
        df[f"{column}_weekofyear"] = dt_col.dt.isocalendar().week
        df[f"{column}_quarter"] = dt_col.dt.quarter
        df[f"{column}_hour"] = dt_col.dt.hour
        df[f"{column}_minute"] = dt_col.dt.minute
        df[f"{column}_is_weekend"] = (dt_col.dt.dayofweek >= 5).astype(int)

        if drop_original:
            df = df.drop(columns=[column])

        return df


class TextFeatures:
    @staticmethod
    def extract_basic_features(
        df: pd.DataFrame,
        column: str,
        drop_original: bool = True
    ) -> pd.DataFrame:
        df = df.copy()

        df[f"{column}_length"] = df[column].str.len()
        df[f"{column}_word_count"] = df[column].str.split().str.len()
        df[f"{column}_char_count"] = df[column].str.len()
        df[f"{column}_avg_word_length"] = (
            df[f"{column}_char_count"] / (df[f"{column}_word_count"] + 1)
        )

        if drop_original:
            df = df.drop(columns=[column])

        return df

30.3 Classification Algorithms

30.3.1 Decision Trees

python
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.ensemble import (
    RandomForestClassifier,
    RandomForestRegressor,
    GradientBoostingClassifier,
    GradientBoostingRegressor,
    AdaBoostClassifier
)
from sklearn.svm import SVC, SVR
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression, Ridge, Lasso
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.neural_network import MLPClassifier
import matplotlib.pyplot as plt
from typing import Tuple, Optional


class DecisionTreeModel:
    def __init__(
        self,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        criterion: str = "gini",
        random_state: int = 42
    ):
        self.model = DecisionTreeClassifier(
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            criterion=criterion,
            random_state=random_state
        )
        self.feature_importances_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> "DecisionTreeModel":
        self.model.fit(X, y)
        self.feature_importances_ = self.model.feature_importances_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)

    def get_feature_importance(
        self,
        feature_names: List[str],
        top_n: int = 10
    ) -> pd.DataFrame:
        importance_df = pd.DataFrame({
            "feature": feature_names,
            "importance": self.feature_importances_
        })
        return importance_df.nlargest(top_n, "importance")

    def plot_feature_importance(
        self,
        feature_names: List[str],
        top_n: int = 10,
        figsize: Tuple[int, int] = (10, 6)
    ) -> None:
        importance_df = self.get_feature_importance(feature_names, top_n)

        plt.figure(figsize=figsize)
        plt.barh(importance_df["feature"], importance_df["importance"])
        plt.xlabel("Feature Importance")
        plt.ylabel("Feature")
        plt.title("Feature Importance (Decision Tree)")
        plt.tight_layout()
        plt.show()


class RandomForestModel:
    def __init__(
        self,
        n_estimators: int = 100,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        max_features: str = "sqrt",
        random_state: int = 42,
        n_jobs: int = -1
    ):
        self.model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            max_features=max_features,
            random_state=random_state,
            n_jobs=n_jobs
        )
        self.feature_importances_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> "RandomForestModel":
        self.model.fit(X, y)
        self.feature_importances_ = self.model.feature_importances_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)

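    def get_oob_score(self) -> Optional[float]:
        # oob_score_ exists only when the forest is created with oob_score=True,
        # which this wrapper does not enable, so None is returned here
        return getattr(self.model, "oob_score_", None)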


class GradientBoostingModel:
    def __init__(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        subsample: float = 1.0,
        random_state: int = 42
    ):
        self.model = GradientBoostingClassifier(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            subsample=subsample,
            random_state=random_state
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "GradientBoostingModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)

    def get_feature_importance(self, feature_names: List[str]) -> pd.DataFrame:
        return pd.DataFrame({
            "feature": feature_names,
            "importance": self.model.feature_importances_
        }).sort_values("importance", ascending=False)
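
A random forest can report an out-of-bag (OOB) score, but only when it is requested at construction time. The sketch below calls scikit-learn directly rather than the wrapper classes above; the dataset and the number of trees are assumptions made for the example.

```python
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import RandomForestClassifier

X, y = load_breast_cancer(return_X_y=True)

# oob_score=True scores each sample using only the trees whose bootstrap
# sample left it out, giving a validation estimate without a hold-out set
forest = RandomForestClassifier(n_estimators=200, oob_score=True, random_state=42)
forest.fit(X, y)

print(f"OOB accuracy: {forest.oob_score_:.3f}")
# Impurity-based importances are normalised to sum to 1
print(forest.feature_importances_.sum())
```

The OOB estimate is a convenient sanity check during exploration, although cross-validation remains the more general tool.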

30.3.2 Support Vector Machines

python
class SVMModel:
    def __init__(
        self,
        kernel: str = "rbf",
        C: float = 1.0,
        gamma: str = "scale",
        degree: int = 3,
        random_state: int = 42
    ):
        self.model = SVC(
            kernel=kernel,
            C=C,
            gamma=gamma,
            degree=degree,
            probability=True,
            random_state=random_state
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "SVMModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)

    def get_support_vectors(self) -> np.ndarray:
        return self.model.support_vectors_


class KNNModel:
    def __init__(
        self,
        n_neighbors: int = 5,
        weights: str = "uniform",
        algorithm: str = "auto",
        metric: str = "minkowski",
        p: int = 2
    ):
        self.model = KNeighborsClassifier(
            n_neighbors=n_neighbors,
            weights=weights,
            algorithm=algorithm,
            metric=metric,
            p=p
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "KNNModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)

    def kneighbors(self, X: np.ndarray, n_neighbors: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
        return self.model.kneighbors(X, n_neighbors=n_neighbors)


class LogisticRegressionModel:
    def __init__(
        self,
        penalty: str = "l2",
        C: float = 1.0,
        solver: str = "lbfgs",
        max_iter: int = 1000,
        random_state: int = 42
    ):
        self.model = LogisticRegression(
            penalty=penalty,
            C=C,
            solver=solver,
            max_iter=max_iter,
            random_state=random_state
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LogisticRegressionModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)

    def get_coefficients(self, feature_names: List[str]) -> pd.DataFrame:
        return pd.DataFrame({
            "feature": feature_names,
            "coefficient": self.model.coef_[0]
        }).sort_values("coefficient", key=abs, ascending=False)
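
SVMs and k-nearest neighbours rely on distances between samples, so features on very different scales can dominate the result. The following sketch compares an RBF SVM with and without standardization; the Wine dataset and 5-fold cross-validation are choices made for this illustration.

```python
from sklearn.datasets import load_wine
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_wine(return_X_y=True)

raw_svm = SVC(kernel="rbf", C=1.0, gamma="scale")
# The pipeline refits the scaler inside every CV fold, so no statistics
# from the held-out fold leak into training
scaled_svm = make_pipeline(StandardScaler(), SVC(kernel="rbf", C=1.0, gamma="scale"))

raw_scores = cross_val_score(raw_svm, X, y, cv=5)
scaled_scores = cross_val_score(scaled_svm, X, y, cv=5)

print(f"unscaled: {raw_scores.mean():.3f}")
print(f"scaled:   {scaled_scores.mean():.3f}")
```

Wrapping the scaler and the model in one pipeline also means the same preprocessing is applied automatically at prediction time.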

30.3.3 Naive Bayes

python
class NaiveBayesModel:
    def __init__(self, model_type: str = "gaussian"):
        if model_type == "gaussian":
            self.model = GaussianNB()
        elif model_type == "multinomial":
            self.model = MultinomialNB()
        else:
            raise ValueError(f"Unknown model type: {model_type}")

    def fit(self, X: np.ndarray, y: np.ndarray) -> "NaiveBayesModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)


class MultiLayerPerceptron:
    def __init__(
        self,
        hidden_layer_sizes: Tuple[int, ...] = (100,),
        activation: str = "relu",
        solver: str = "adam",
        alpha: float = 0.0001,
        learning_rate: str = "constant",
        learning_rate_init: float = 0.001,
        max_iter: int = 200,
        random_state: int = 42
    ):
        self.model = MLPClassifier(
            hidden_layer_sizes=hidden_layer_sizes,
            activation=activation,
            solver=solver,
            alpha=alpha,
            learning_rate=learning_rate,
            learning_rate_init=learning_rate_init,
            max_iter=max_iter,
            random_state=random_state
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "MultiLayerPerceptron":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict_proba(X)

    def get_loss_curve(self) -> List[float]:
        return self.model.loss_curve_
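
As a quick illustration of the probabilistic output these classifiers share, here is a minimal Gaussian Naive Bayes sketch. The dataset and the 70/30 split are assumptions made for the example.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)

# GaussianNB models each feature as an independent per-class normal distribution
nb = GaussianNB()
nb.fit(X_train, y_train)

# One row per sample, one column per class; each row sums to 1
proba = nb.predict_proba(X_test)
accuracy = nb.score(X_test, y_test)

print(proba[:3].round(3))
print(f"accuracy: {accuracy:.3f}")
```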

30.4 Regression Algorithms

30.4.1 Linear Regression

python
from sklearn.linear_model import (
    LinearRegression,
    Ridge,
    Lasso,
    ElasticNet,
    BayesianRidge
)
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline


class LinearRegressionModel:
    def __init__(self, fit_intercept: bool = True):
        self.model = LinearRegression(fit_intercept=fit_intercept)
        self.coefficients_: Optional[np.ndarray] = None
        self.intercept_: Optional[float] = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LinearRegressionModel":
        self.model.fit(X, y)
        self.coefficients_ = self.model.coef_
        self.intercept_ = self.model.intercept_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def get_equation(self, feature_names: List[str]) -> str:
        terms = []
        for name, coef in zip(feature_names, self.coefficients_):
            if coef != 0:
                terms.append(f"{coef:.4f} * {name}")

        equation = " + ".join(terms)
        if self.intercept_ != 0:
            equation += f" + {self.intercept_:.4f}"

        return f"y = {equation}"

    def get_coefficients_df(self, feature_names: List[str]) -> pd.DataFrame:
        return pd.DataFrame({
            "feature": ["intercept"] + list(feature_names),
            "coefficient": [self.intercept_] + list(self.coefficients_)
        })


class RidgeRegression:
    def __init__(
        self,
        alpha: float = 1.0,
        fit_intercept: bool = True,
        solver: str = "auto"
    ):
        self.model = Ridge(
            alpha=alpha,
            fit_intercept=fit_intercept,
            solver=solver
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "RidgeRegression":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)


class LassoRegression:
    def __init__(
        self,
        alpha: float = 1.0,
        fit_intercept: bool = True,
        max_iter: int = 1000
    ):
        self.model = Lasso(
            alpha=alpha,
            fit_intercept=fit_intercept,
            max_iter=max_iter
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LassoRegression":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def get_selected_features(self, feature_names: List[str]) -> List[str]:
        return [
            name for name, coef in zip(feature_names, self.model.coef_)
            if coef != 0
        ]


class PolynomialRegression:
    def __init__(self, degree: int = 2, include_bias: bool = True):
        self.degree = degree
        self.pipeline = Pipeline([
            ("poly", PolynomialFeatures(degree=degree, include_bias=include_bias)),
            ("linear", LinearRegression())
        ])

    def fit(self, X: np.ndarray, y: np.ndarray) -> "PolynomialRegression":
        self.pipeline.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.pipeline.predict(X)

    def get_polynomial_features(self) -> List[str]:
        # get_feature_names_out returns an ndarray; convert it to match the annotation
        return list(self.pipeline.named_steps["poly"].get_feature_names_out())
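
The practical difference between the L2 penalty (Ridge) and the L1 penalty (Lasso) shows up in the fitted coefficients. The sketch below uses synthetic data in which only 5 of 20 features are informative; the noise level and the `alpha` values are arbitrary assumptions.

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.linear_model import Lasso, LinearRegression, Ridge

# Synthetic data: only 5 of the 20 features actually influence y
X, y = make_regression(
    n_samples=200, n_features=20, n_informative=5, noise=10.0, random_state=42
)

models = {
    "ols": LinearRegression(),
    "ridge": Ridge(alpha=10.0),   # L2 penalty shrinks coefficients
    "lasso": Lasso(alpha=1.0),    # L1 penalty can set coefficients exactly to zero
}
nonzero = {}
for name, model in models.items():
    model.fit(X, y)
    nonzero[name] = int(np.sum(model.coef_ != 0))
    print(f"{name}: {nonzero[name]} non-zero coefficients, "
          f"L2 norm = {np.linalg.norm(model.coef_):.1f}")
```

Because Lasso can zero out coefficients, it doubles as a simple feature selector, which is what `get_selected_features` above relies on.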

30.4.2 Decision Tree Regression

python
class DecisionTreeRegressorModel:
    def __init__(
        self,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        random_state: int = 42
    ):
        self.model = DecisionTreeRegressor(
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=random_state
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "DecisionTreeRegressorModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)


class RandomForestRegressorModel:
    def __init__(
        self,
        n_estimators: int = 100,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        random_state: int = 42,
        n_jobs: int = -1
    ):
        self.model = RandomForestRegressor(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=random_state,
            n_jobs=n_jobs
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "RandomForestRegressorModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)


class GradientBoostingRegressorModel:
    def __init__(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        subsample: float = 1.0,
        random_state: int = 42
    ):
        self.model = GradientBoostingRegressor(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            subsample=subsample,
            random_state=random_state
        )

    def fit(self, X: np.ndarray, y: np.ndarray) -> "GradientBoostingRegressorModel":
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def get_training_score(self) -> List[float]:
        # train_score_ holds the training loss after each boosting stage (lower is better)
        return list(self.model.train_score_)
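
To see how the three tree-based regressors behave side by side, here is a minimal sketch that trains the underlying scikit-learn estimators on a synthetic nonlinear dataset and compares their held-out R^2. The dataset (`make_friedman1`) and the hyperparameter values are assumptions chosen for illustration, not recommendations.

```python
from sklearn.datasets import make_friedman1
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

# Synthetic nonlinear regression data (an assumption for illustration).
X, y = make_friedman1(n_samples=500, noise=1.0, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42
)

models = {
    "decision_tree": DecisionTreeRegressor(max_depth=5, random_state=42),
    "random_forest": RandomForestRegressor(n_estimators=100, random_state=42),
    "gradient_boosting": GradientBoostingRegressor(
        n_estimators=100, learning_rate=0.1, random_state=42
    ),
}

# score() returns R^2 on the held-out test split.
test_r2 = {}
for name, model in models.items():
    model.fit(X_train, y_train)
    test_r2[name] = model.score(X_test, y_test)
    print(f"{name}: R^2 = {test_r2[name]:.3f}")

# train_score_ stores the training loss after each boosting stage.
training_loss = models["gradient_boosting"].train_score_
print(f"training loss: {training_loss[0]:.2f} -> {training_loss[-1]:.2f}")
```

A steadily falling training loss only shows that boosting is fitting the training data; the test R^2 is what indicates whether the extra stages generalize.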

30.5 Clustering Algorithms

30.5.1 K-Means Clustering

python
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from scipy.cluster.hierarchy import dendrogram, linkage
import matplotlib.pyplot as plt


class KMeansClustering:
    def __init__(
        self,
        n_clusters: int = 8,
        init: str = "k-means++",
        n_init: int = 10,
        max_iter: int = 300,
        random_state: int = 42
    ):
        self.model = KMeans(
            n_clusters=n_clusters,
            init=init,
            n_init=n_init,
            max_iter=max_iter,
            random_state=random_state
        )
        self.cluster_centers_: Optional[np.ndarray] = None
        self.labels_: Optional[np.ndarray] = None
        self.inertia_: Optional[float] = None

    def fit(self, X: np.ndarray) -> "KMeansClustering":
        self.model.fit(X)
        self.cluster_centers_ = self.model.cluster_centers_
        self.labels_ = self.model.labels_
        self.inertia_ = self.model.inertia_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        return self.model.predict(X)

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        self.fit(X)
        return self.labels_

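    def find_optimal_k(
        self,
        X: np.ndarray,
        k_range: range = range(2, 11),
        method: str = "elbow"
    ) -> Tuple[List[int], List[float]]:
        if method not in ("elbow", "silhouette"):
            raise ValueError(f"Unknown method: {method!r}; expected 'elbow' or 'silhouette'")

        scores = []
        k_values = list(k_range)

        for k in k_values:
            # Set n_init explicitly: its default changed from 10 to "auto" in scikit-learn 1.4
            kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
            kmeans.fit(X)

            if method == "elbow":
                # Inertia: within-cluster sum of squared distances (lower means tighter clusters)
                scores.append(kmeans.inertia_)
            else:
                # Mean silhouette coefficient in [-1, 1] (higher is better)
                scores.append(silhouette_score(X, kmeans.labels_))

        return k_values, scores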

    def plot_elbow_curve(
        self,
        X: np.ndarray,
        k_range: range = range(2, 11),
        figsize: Tuple[int, int] = (10, 6)
    ) -> None:
        k_values, inertias = self.find_optimal_k(X, k_range, method="elbow")

        plt.figure(figsize=figsize)
        plt.plot(k_values, inertias, "bo-")
        plt.xlabel("Number of Clusters (k)")
        plt.ylabel("Inertia")
        plt.title("Elbow Method for Optimal k")
        plt.grid(True)
        plt.show()

    def plot_silhouette_curve(
        self,
        X: np.ndarray,
        k_range: range = range(2, 11),
        figsize: Tuple[int, int] = (10, 6)
    ) -> None:
        k_values, scores = self.find_optimal_k(X, k_range, method="silhouette")

        plt.figure(figsize=figsize)
        plt.plot(k_values, scores, "ro-")
        plt.xlabel("Number of Clusters (k)")
        plt.ylabel("Silhouette Score")
        plt.title("Silhouette Score for Optimal k")
        plt.grid(True)
        plt.show()


class HierarchicalClustering:
    def __init__(
        self,
        n_clusters: int = 2,
        linkage: str = "ward",
        metric: str = "euclidean"
    ):
        # scikit-learn 1.2 renamed `affinity` to `metric`; `affinity` was removed in 1.4.
        # Ward linkage only supports Euclidean distances.
        self.model = AgglomerativeClustering(
            n_clusters=n_clusters,
            linkage=linkage,
            metric=metric if linkage != "ward" else "euclidean"
        )
        self.labels_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray) -> "HierarchicalClustering":
        self.model.fit(X)
        self.labels_ = self.model.labels_
        return self

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        self.fit(X)
        return self.labels_

    def plot_dendrogram(
        self,
        X: np.ndarray,
        method: str = "ward",
        figsize: Tuple[int, int] = (12, 8)
    ) -> None:
        linked = linkage(X, method=method)

        plt.figure(figsize=figsize)
        dendrogram(linked)
        plt.title("Hierarchical Clustering Dendrogram")
        plt.xlabel("Sample Index")
        plt.ylabel("Distance")
        plt.show()


class DBSCANClustering:
    def __init__(
        self,
        eps: float = 0.5,
        min_samples: int = 5,
        metric: str = "euclidean"
    ):
        self.model = DBSCAN(
            eps=eps,
            min_samples=min_samples,
            metric=metric
        )
        self.labels_: Optional[np.ndarray] = None
        self.core_sample_indices_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray) -> "DBSCANClustering":
        self.model.fit(X)
        self.labels_ = self.model.labels_
        self.core_sample_indices_ = self.model.core_sample_indices_
        return self

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        self.fit(X)
        return self.labels_

    def get_cluster_stats(self) -> Dict[str, Any]:
        unique_labels = set(self.labels_)
        n_clusters = len(unique_labels) - (1 if -1 in self.labels_ else 0)
        n_noise = list(self.labels_).count(-1)

        return {
            "n_clusters": n_clusters,
            "n_noise_points": n_noise,
            "noise_percentage": n_noise / len(self.labels_) * 100
        }
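
The following sketch exercises the two ideas behind these wrappers: choosing k for K-Means by silhouette score, and letting DBSCAN flag noise points. The synthetic blobs, the appended outlier, and the `eps` and `min_samples` values are assumptions chosen so that the structure is easy to see.

```python
import numpy as np
from sklearn.cluster import DBSCAN, KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_score

# Three well-separated synthetic blobs (an assumption for illustration).
centers = np.array([[0.0, 0.0], [10.0, 10.0], [-10.0, 10.0]])
X, _ = make_blobs(n_samples=300, centers=centers, cluster_std=0.4, random_state=42)

# Silhouette method: keep the k with the highest mean silhouette score.
silhouette_by_k = {}
for k in range(2, 7):
    labels = KMeans(n_clusters=k, n_init=10, random_state=42).fit_predict(X)
    silhouette_by_k[k] = silhouette_score(X, labels)
best_k = max(silhouette_by_k, key=silhouette_by_k.get)
print("best k by silhouette:", best_k)

# DBSCAN needs no k; append one far-away point to see how noise is labelled.
X_with_outlier = np.vstack([X, [[50.0, 50.0]]])
db_labels = DBSCAN(eps=1.0, min_samples=5).fit_predict(X_with_outlier)
n_clusters = len(set(db_labels)) - (1 if -1 in db_labels else 0)
n_noise = int(np.sum(db_labels == -1))
print("DBSCAN clusters:", n_clusters, "noise points:", n_noise)
```

DBSCAN assigns the label -1 to points that belong to no dense region, which is why the cluster count subtracts one when -1 is present.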

30.6 Model Evaluation

30.6.1 Classification Evaluation Metrics

python
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    classification_report,
    roc_curve,
    auc,
    roc_auc_score,
    precision_recall_curve,
    average_precision_score
)
from sklearn.model_selection import cross_val_score, cross_validate
import seaborn as sns


class ClassificationEvaluator:
    def __init__(self, y_true: np.ndarray, y_pred: np.ndarray, y_proba: Optional[np.ndarray] = None):
        self.y_true = y_true
        self.y_pred = y_pred
        self.y_proba = y_proba
        self._metrics: Optional[ModelMetrics] = None

    def compute_metrics(self, average: str = "weighted") -> ModelMetrics:
        self._metrics = ModelMetrics(
            accuracy=accuracy_score(self.y_true, self.y_pred),
            precision=precision_score(self.y_true, self.y_pred, average=average),
            recall=recall_score(self.y_true, self.y_pred, average=average),
            f1_score=f1_score(self.y_true, self.y_pred, average=average)
        )
        return self._metrics

    def get_confusion_matrix(self) -> np.ndarray:
        return confusion_matrix(self.y_true, self.y_pred)

    def plot_confusion_matrix(
        self,
        labels: Optional[List[str]] = None,
        figsize: Tuple[int, int] = (8, 6),
        cmap: str = "Blues"
    ) -> None:
        cm = self.get_confusion_matrix()

        plt.figure(figsize=figsize)
        sns.heatmap(
            cm,
            annot=True,
            fmt="d",
            cmap=cmap,
            xticklabels=labels,
            yticklabels=labels
        )
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.title("Confusion Matrix")
        plt.tight_layout()
        plt.show()

    def get_classification_report(self, target_names: Optional[List[str]] = None) -> str:
        return classification_report(self.y_true, self.y_pred, target_names=target_names)

    def plot_roc_curve(self, figsize: Tuple[int, int] = (8, 6)) -> None:
        if self.y_proba is None:
            raise ValueError("Probability predictions required for ROC curve")

        if len(self.y_proba.shape) > 1 and self.y_proba.shape[1] > 2:
            raise ValueError("ROC curve for multi-class requires one-vs-rest approach")

        if len(self.y_proba.shape) > 1:
            y_score = self.y_proba[:, 1]
        else:
            y_score = self.y_proba

        fpr, tpr, _ = roc_curve(self.y_true, y_score)
        roc_auc = auc(fpr, tpr)

        plt.figure(figsize=figsize)
        plt.plot(fpr, tpr, color="darkorange", lw=2, label=f"ROC curve (AUC = {roc_auc:.2f})")
        plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("Receiver Operating Characteristic (ROC) Curve")
        plt.legend(loc="lower right")
        plt.grid(True)
        plt.show()

    def plot_precision_recall_curve(self, figsize: Tuple[int, int] = (8, 6)) -> None:
        if self.y_proba is None:
            raise ValueError("Probability predictions required for PR curve")

        if len(self.y_proba.shape) > 1:
            y_score = self.y_proba[:, 1]
        else:
            y_score = self.y_proba

        precision, recall, _ = precision_recall_curve(self.y_true, y_score)
        ap = average_precision_score(self.y_true, y_score)

        plt.figure(figsize=figsize)
        plt.plot(recall, precision, color="blue", lw=2, label=f"PR curve (AP = {ap:.2f})")
        plt.xlabel("Recall")
        plt.ylabel("Precision")
        plt.title("Precision-Recall Curve")
        plt.legend(loc="lower left")
        plt.grid(True)
        plt.show()


class RegressionEvaluator:
    def __init__(self, y_true: np.ndarray, y_pred: np.ndarray):
        self.y_true = y_true
        self.y_pred = y_pred
        self._metrics: Optional[ModelMetrics] = None

    def compute_metrics(self) -> ModelMetrics:
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

        self._metrics = ModelMetrics(
            r2_score=r2_score(self.y_true, self.y_pred),
            mse=mean_squared_error(self.y_true, self.y_pred),
            mae=mean_absolute_error(self.y_true, self.y_pred)
        )
        self._metrics.custom_metrics["rmse"] = np.sqrt(self._metrics.mse)
        return self._metrics

    def plot_residuals(self, figsize: Tuple[int, int] = (10, 4)) -> None:
        residuals = self.y_true - self.y_pred

        fig, axes = plt.subplots(1, 2, figsize=figsize)

        axes[0].scatter(self.y_pred, residuals, alpha=0.5)
        axes[0].axhline(y=0, color="r", linestyle="--")
        axes[0].set_xlabel("Predicted Values")
        axes[0].set_ylabel("Residuals")
        axes[0].set_title("Residuals vs Predicted")

        axes[1].hist(residuals, bins=30, edgecolor="black")
        axes[1].set_xlabel("Residuals")
        axes[1].set_ylabel("Frequency")
        axes[1].set_title("Distribution of Residuals")

        plt.tight_layout()
        plt.show()

    def plot_predictions(self, figsize: Tuple[int, int] = (8, 8)) -> None:
        plt.figure(figsize=figsize)
        plt.scatter(self.y_true, self.y_pred, alpha=0.5)
        min_val = min(self.y_true.min(), self.y_pred.min())
        max_val = max(self.y_true.max(), self.y_pred.max())
        plt.plot([min_val, max_val], [min_val, max_val], "r--", lw=2)
        plt.xlabel("Actual Values")
        plt.ylabel("Predicted Values")
        plt.title("Actual vs Predicted Values")
        plt.grid(True)
        plt.show()


class ClusteringEvaluator:
    def __init__(self, X: np.ndarray, labels: np.ndarray):
        self.X = X
        self.labels = labels

    def compute_metrics(self) -> Dict[str, float]:
        return {
            "silhouette_score": silhouette_score(self.X, self.labels),
            "calinski_harabasz_score": calinski_harabasz_score(self.X, self.labels),
            "davies_bouldin_score": davies_bouldin_score(self.X, self.labels)
        }

    def plot_clusters(self, figsize: Tuple[int, int] = (10, 8)) -> None:
        if self.X.shape[1] > 2:
            from sklearn.decomposition import PCA
            pca = PCA(n_components=2)
            X_2d = pca.fit_transform(self.X)
        else:
            X_2d = self.X

        plt.figure(figsize=figsize)
        scatter = plt.scatter(X_2d[:, 0], X_2d[:, 1], c=self.labels, cmap="viridis", alpha=0.6)
        plt.colorbar(scatter, label="Cluster")
        plt.xlabel("Component 1")
        plt.ylabel("Component 2")
        plt.title("Cluster Visualization")
        plt.show()
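
The evaluators above take predictions and, for ROC analysis, predicted probabilities. The sketch below shows where those inputs might come from and computes the same quantities without plotting. The synthetic dataset and the choice of `LogisticRegression` are assumptions for illustration.

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import auc, confusion_matrix, roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split

# Synthetic binary classification data (an assumption for illustration).
X, y = make_classification(
    n_samples=400, n_features=10, n_informative=5, random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, stratify=y, random_state=42
)

clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)
y_pred = clf.predict(X_test)
y_proba = clf.predict_proba(X_test)  # shape (n_samples, 2)

# Rows are actual classes, columns are predicted classes.
cm = confusion_matrix(y_test, y_pred)

# ROC analysis uses the positive-class column of the probabilities.
y_score = y_proba[:, 1]
fpr, tpr, thresholds = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

print(cm)
print(f"AUC = {roc_auc:.3f}")
```

Integrating the ROC curve with `auc(fpr, tpr)` gives the same value as calling `roc_auc_score` directly on the scores.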

30.6.2 Cross-Validation

python
from sklearn.model_selection import (
    KFold,
    StratifiedKFold,
    LeaveOneOut,
    cross_val_predict
)


class CrossValidator:
    def __init__(
        self,
        cv: int = 5,
        stratified: bool = True,
        shuffle: bool = True,
        random_state: int = 42
    ):
        self.cv = cv
        self.stratified = stratified
        self.shuffle = shuffle
        self.random_state = random_state
        self._cv_splitter = None

    def get_splitter(self) -> Union[KFold, StratifiedKFold]:
        # scikit-learn raises ValueError if random_state is set while shuffle=False
        random_state = self.random_state if self.shuffle else None
        splitter_cls = StratifiedKFold if self.stratified else KFold
        return splitter_cls(
            n_splits=self.cv,
            shuffle=self.shuffle,
            random_state=random_state
        )

    def cross_validate_model(
        self,
        model,
        X: np.ndarray,
        y: np.ndarray,
        scoring: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        if scoring is None:
            scoring = ["accuracy", "precision_weighted", "recall_weighted", "f1_weighted"]

        cv_splitter = self.get_splitter()

        results = cross_validate(
            model,
            X,
            y,
            cv=cv_splitter,
            scoring=scoring,
            return_train_score=True
        )

        summary = {}
        for metric in scoring:
            test_key = f"test_{metric}"
            train_key = f"train_{metric}"
            if test_key in results:
                summary[metric] = {
                    "test_mean": results[test_key].mean(),
                    "test_std": results[test_key].std(),
                    "train_mean": results[train_key].mean(),
                    "train_std": results[train_key].std()
                }

        return summary

    def get_cross_val_predictions(
        self,
        model,
        X: np.ndarray,
        y: np.ndarray
    ) -> np.ndarray:
        cv_splitter = self.get_splitter()
        return cross_val_predict(model, X, y, cv=cv_splitter)
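
A minimal sketch of stratified cross-validation on the bundled Iris dataset follows. It checks that each test fold keeps the class proportions and summarizes test and training scores in the same way as `cross_validate_model`. The estimator and metric choices are assumptions for illustration.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_validate

X, y = load_iris(return_X_y=True)

# random_state is only valid together with shuffle=True.
splitter = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Each stratified test fold keeps the class proportions of the full dataset.
fold_class_counts = [np.bincount(y[test_idx]) for _, test_idx in splitter.split(X, y)]

results = cross_validate(
    LogisticRegression(max_iter=1000),
    X,
    y,
    cv=splitter,
    scoring=["accuracy", "f1_weighted"],
    return_train_score=True,
)
for metric in ("accuracy", "f1_weighted"):
    test_scores = results[f"test_{metric}"]
    train_scores = results[f"train_{metric}"]
    print(
        f"{metric}: test {test_scores.mean():.3f} +/- {test_scores.std():.3f}, "
        f"train {train_scores.mean():.3f}"
    )
```

A training score that sits far above the test score across folds is a hint that the model is overfitting.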

30.7 Model Tuning

30.7.1 Grid Search

python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from scipy.stats import uniform, randint


class HyperparameterTuner:
    def __init__(
        self,
        model,
        param_grid: Dict[str, Any],
        cv: int = 5,
        scoring: str = "accuracy",
        n_jobs: int = -1,
        verbose: int = 1
    ):
        self.model = model
        self.param_grid = param_grid
        self.cv = cv
        self.scoring = scoring
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.best_model = None
        self.best_params = None
        self.best_score = None
        # Fitted GridSearchCV / RandomizedSearchCV object; it owns cv_results_
        self.search_ = None

    def grid_search(
        self,
        X: np.ndarray,
        y: np.ndarray
    ) -> Any:
        grid_search = GridSearchCV(
            self.model,
            self.param_grid,
            cv=self.cv,
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            return_train_score=True
        )

        grid_search.fit(X, y)

        self.search_ = grid_search
        self.best_model = grid_search.best_estimator_
        self.best_params = grid_search.best_params_
        self.best_score = grid_search.best_score_

        return self.best_model

    def random_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        n_iter: int = 50
    ) -> Any:
        random_search = RandomizedSearchCV(
            self.model,
            self.param_grid,
            n_iter=n_iter,
            cv=self.cv,
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            random_state=42,
            return_train_score=True
        )

        random_search.fit(X, y)

        self.search_ = random_search
        self.best_model = random_search.best_estimator_
        self.best_params = random_search.best_params_
        self.best_score = random_search.best_score_

        return self.best_model

    def get_results_df(self) -> pd.DataFrame:
        if self.search_ is None:
            raise ValueError("No search has been performed yet")

        # cv_results_ lives on the search object, not on best_estimator_
        return pd.DataFrame(self.search_.cv_results_)


class RandomForestTuner:
    PARAM_GRID = {
        "n_estimators": [50, 100, 200],
        "max_depth": [None, 10, 20, 30],
        "min_samples_split": [2, 5, 10],
        "min_samples_leaf": [1, 2, 4],
        "max_features": ["sqrt", "log2"]
    }

    RANDOM_PARAM_GRID = {
        "n_estimators": randint(50, 300),
        "max_depth": randint(5, 50),
        "min_samples_split": randint(2, 20),
        "min_samples_leaf": randint(1, 10),
        "max_features": ["sqrt", "log2", None]
    }

    @classmethod
    def tune(
        cls,
        X: np.ndarray,
        y: np.ndarray,
        method: str = "grid",
        n_iter: int = 50
    ) -> Tuple[Any, Dict[str, Any]]:
        from sklearn.ensemble import RandomForestClassifier

        param_grid = cls.PARAM_GRID if method == "grid" else cls.RANDOM_PARAM_GRID

        tuner = HyperparameterTuner(
            RandomForestClassifier(random_state=42),
            param_grid
        )

        if method == "grid":
            best_model = tuner.grid_search(X, y)
        else:
            best_model = tuner.random_search(X, y, n_iter=n_iter)

        return best_model, tuner.best_params


class SVMTuner:
    PARAM_GRID = {
        "C": [0.1, 1, 10, 100],
        "kernel": ["rbf", "linear", "poly"],
        "gamma": ["scale", "auto", 0.1, 0.01]
    }

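    # scipy's uniform(loc, scale) samples from [loc, loc + scale]
    RANDOM_PARAM_GRID = {
        "C": uniform(0.1, 100),        # C in [0.1, 100.1]
        "kernel": ["rbf", "linear", "poly"],
        "gamma": uniform(0.001, 1)     # gamma in [0.001, 1.001]
    }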

    @classmethod
    def tune(
        cls,
        X: np.ndarray,
        y: np.ndarray,
        method: str = "random",
        n_iter: int = 30
    ) -> Tuple[Any, Dict[str, Any]]:
        param_grid = cls.PARAM_GRID if method == "grid" else cls.RANDOM_PARAM_GRID

        tuner = HyperparameterTuner(
            SVC(random_state=42),
            param_grid
        )

        if method == "grid":
            best_model = tuner.grid_search(X, y)
        else:
            best_model = tuner.random_search(X, y, n_iter=n_iter)

        return best_model, tuner.best_params
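
Two details of randomized search are easy to get wrong: how scipy parameterizes its distributions, and where the per-candidate results are stored. The sketch below illustrates both on the Iris dataset. The log-uniform distributions are an assumption introduced here (they are not the distributions used by `SVMTuner`); they are a common choice for scale-like parameters such as `C` and `gamma`.

```python
from scipy.stats import loguniform, uniform
from sklearn.datasets import load_iris
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

# scipy's uniform(loc, scale) covers [loc, loc + scale], not [low, high].
c_samples = uniform(0.1, 100).rvs(size=1000, random_state=0)
print(f"uniform(0.1, 100) range: {c_samples.min():.2f} to {c_samples.max():.2f}")

X, y = load_iris(return_X_y=True)

# Log-uniform sampling (an assumption, see above) spreads trials evenly
# across orders of magnitude.
param_distributions = {
    "C": loguniform(1e-2, 1e2),
    "gamma": loguniform(1e-4, 1e0),
}
search = RandomizedSearchCV(
    SVC(kernel="rbf"),
    param_distributions,
    n_iter=10,
    cv=3,
    scoring="accuracy",
    random_state=42,
)
search.fit(X, y)

# cv_results_ is an attribute of the search object, not of best_estimator_.
print(search.best_params_, f"{search.best_score_:.3f}")
print(len(search.cv_results_["params"]), "candidates evaluated")
```

With `n_iter=10` and `cv=3`, the search fits 30 models plus one final refit, which is far cheaper than an exhaustive grid of comparable resolution.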

30.8 Machine Learning Pipelines

30.8.1 Scikit-learn Pipeline

python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif
import joblib
from pathlib import Path


class MLPipeline:
    def __init__(
        self,
        numerical_features: List[str],
        categorical_features: List[str],
        model,
        feature_selector: Optional[Any] = None
    ):
        self.numerical_features = numerical_features
        self.categorical_features = categorical_features
        self.model = model
        self.feature_selector = feature_selector
        self.pipeline: Optional[Pipeline] = None

    def build_pipeline(self) -> Pipeline:
        numerical_transformer = Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ])

        categorical_transformer = Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
        ])

        preprocessor = ColumnTransformer(
            transformers=[
                ("num", numerical_transformer, self.numerical_features),
                ("cat", categorical_transformer, self.categorical_features)
            ]
        )

        steps = [("preprocessor", preprocessor)]

        if self.feature_selector:
            steps.append(("selector", self.feature_selector))

        steps.append(("model", self.model))

        self.pipeline = Pipeline(steps=steps)
        return self.pipeline

    def fit(self, X: pd.DataFrame, y: np.ndarray) -> "MLPipeline":
        if self.pipeline is None:
            self.build_pipeline()

        self.pipeline.fit(X, y)
        return self

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        return self.pipeline.predict(X)

    def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
        return self.pipeline.predict_proba(X)

    def save(self, filepath: str) -> None:
        joblib.dump(self.pipeline, filepath)

    @classmethod
    def load(cls, filepath: str) -> Pipeline:
        return joblib.load(filepath)


class AutoMLPipeline:
    def __init__(
        self,
        task_type: TaskType = TaskType.CLASSIFICATION,
        cv: int = 5,
        scoring: Optional[str] = None
    ):
        self.task_type = task_type
        self.cv = cv
        self.scoring = scoring or ("accuracy" if task_type == TaskType.CLASSIFICATION else "r2")
        self.best_model = None
        self.best_params = None
        self.results = []

    def fit(
        self,
        X: pd.DataFrame,
        y: np.ndarray,
        models: Optional[Dict[str, Any]] = None,
        param_grids: Optional[Dict[str, Dict]] = None
    ) -> "AutoMLPipeline":
        if models is None:
            models = self._get_default_models()

        if param_grids is None:
            param_grids = self._get_default_param_grids()

        for name, model in models.items():
            print(f"Training {name}...")

            tuner = HyperparameterTuner(
                model,
                param_grids.get(name, {}),
                cv=self.cv,
                scoring=self.scoring
            )

            tuner.random_search(X, y, n_iter=20)

            self.results.append({
                "model_name": name,
                "best_score": tuner.best_score,
                "best_params": tuner.best_params,
                "model": tuner.best_model
            })

        self.results.sort(key=lambda x: x["best_score"], reverse=True)
        self.best_model = self.results[0]["model"]
        self.best_params = self.results[0]["best_params"]

        return self

    def _get_default_models(self) -> Dict[str, Any]:
        if self.task_type == TaskType.CLASSIFICATION:
            return {
                "RandomForest": RandomForestClassifier(random_state=42),
                "SVM": SVC(random_state=42),
                "LogisticRegression": LogisticRegression(random_state=42, max_iter=1000),
                "KNN": KNeighborsClassifier()
            }
        else:
            return {
                "RandomForest": RandomForestRegressor(random_state=42),
                "SVR": SVR(),
                "LinearRegression": LinearRegression(),
                "Ridge": Ridge(random_state=42)
            }

    def _get_default_param_grids(self) -> Dict[str, Dict]:
        if self.task_type == TaskType.CLASSIFICATION:
            return {
                "RandomForest": {"n_estimators": [50, 100, 200], "max_depth": [None, 10, 20]},
                "SVM": {"C": [0.1, 1, 10], "kernel": ["rbf", "linear"]},
                "LogisticRegression": {"C": [0.1, 1, 10]},
                "KNN": {"n_neighbors": [3, 5, 7, 9]}
            }
        else:
            return {
                "RandomForest": {"n_estimators": [50, 100, 200], "max_depth": [None, 10, 20]},
                "SVR": {"C": [0.1, 1, 10], "kernel": ["rbf", "linear"]},
                "Ridge": {"alpha": [0.1, 1, 10]}
            }

    def get_results_df(self) -> pd.DataFrame:
        return pd.DataFrame([
            {"model": r["model_name"], "score": r["best_score"]}
            for r in self.results
        ]).sort_values("score", ascending=False)

    def predict(self, X: np.ndarray) -> np.ndarray:
        if self.best_model is None:
            raise ValueError("Model not fitted yet")
        return self.best_model.predict(X)
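
To make the preprocessing layout of `MLPipeline` concrete, here is a small sketch that builds the same kind of `ColumnTransformer` pipeline on a toy DataFrame with missing values and then predicts on rows containing a missing number and an unseen category. The column names, values, and labels are all hypothetical.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Toy data with gaps (hypothetical columns and values).
df = pd.DataFrame({
    "age": [25, 32, np.nan, 51, 46, 29, 38, 60],
    "income": [30.0, 45.0, 52.0, np.nan, 80.0, 33.0, 61.0, 95.0],
    "city": ["A", "B", "A", np.nan, "B", "A", "C", "B"],
})
y = np.array([0, 0, 0, 1, 1, 0, 1, 1])

numerical = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
])
categorical = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore")),
])
preprocessor = ColumnTransformer([
    ("num", numerical, ["age", "income"]),
    ("cat", categorical, ["city"]),
])
pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("model", LogisticRegression(max_iter=1000)),
])
pipeline.fit(df, y)

# New rows: one missing age and one city ("Z") never seen during training.
new_rows = pd.DataFrame({
    "age": [np.nan, 40],
    "income": [70.0, 35.0],
    "city": ["Z", "A"],
})
preds = pipeline.predict(new_rows)
n_output_columns = pipeline.named_steps["preprocessor"].transform(new_rows).shape[1]
print(preds, n_output_columns)
```

Because imputation and encoding are fitted inside the pipeline, the statistics learned from the training data are reused at prediction time, and `handle_unknown="ignore"` encodes the unseen category as all zeros instead of raising an error.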

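30.9 Knowledge Map

30.9.1 Machine Learning Workflow

Standard machine learning workflow

┌──────────────────────────────────────────────────────────────────────────────────────────┐
│ Collect data → Clean data → Engineer features → Train model → Evaluate and tune → Deploy │
└──────────────────────────────────────────────────────────────────────────────────────────┘

Detailed steps:
┌───────────────────────────────────────────────────────────────────┐
│ 1. Data collection        Acquire raw data                        │
│ 2. Data cleaning          Handle missing values and outliers      │
│ 3. Feature engineering    Extract, select, and transform features │
│ 4. Data splitting         Training / validation / test sets       │
│ 5. Model selection        Choose a suitable algorithm             │
│ 6. Model training         Fit the model to the data               │
│ 7. Model evaluation       Compute performance metrics             │
│ 8. Hyperparameter tuning  Grid search, random search              │
│ 9. Model deployment       Use in production                       │
└───────────────────────────────────────────────────────────────────┘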
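
The steps in the workflow above can be sketched end to end in a few lines. This is a minimal illustration, assuming a bundled and already clean dataset (`load_wine`) in place of real data collection and cleaning, and a hand-written loop over a few values of `C` in place of a full search.

```python
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Steps 1-3: a bundled dataset stands in for collection and cleaning.
X, y = load_wine(return_X_y=True)

# Step 4: 60% training, 20% validation, 20% test, stratified by class.
X_train, X_rest, y_train, y_rest = train_test_split(
    X, y, test_size=0.4, stratify=y, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_rest, y_rest, test_size=0.5, stratify=y_rest, random_state=42
)

# Steps 5-8: keep the regularization strength with the best validation accuracy.
best_c, best_val_acc, best_model = None, -1.0, None
for c in (0.01, 0.1, 1.0, 10.0):
    model = make_pipeline(StandardScaler(), LogisticRegression(C=c, max_iter=1000))
    model.fit(X_train, y_train)
    val_acc = accuracy_score(y_val, model.predict(X_val))
    if val_acc > best_val_acc:
        best_c, best_val_acc, best_model = c, val_acc, model

# Final check on the untouched test set before step 9 (deployment).
test_acc = accuracy_score(y_test, best_model.predict(X_test))
print(f"best C = {best_c}, validation = {best_val_acc:.3f}, test = {test_acc:.3f}")
```

The test set is used exactly once, after all tuning decisions have been made on the validation set, so the reported test accuracy is not biased by the tuning loop.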

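30.9.2 Algorithm Taxonomy

Taxonomy of machine learning algorithms

Supervised learning:
┌──────────────────────────────────────────────────────────────────────┐
│ Classification: decision trees, SVM, random forests, neural networks │
│ Regression: linear regression, ridge regression, LASSO               │
└──────────────────────────────────────────────────────────────────────┘

Unsupervised learning:
┌──────────────────────────────────────────────────────────────────────┐
│ Clustering: K-Means, DBSCAN, hierarchical clustering                 │
│ Dimensionality reduction: PCA, t-SNE, UMAP                           │
└──────────────────────────────────────────────────────────────────────┘

Reinforcement learning:
┌──────────────────────────────────────────────────────────────────────┐
│ Q-Learning, DQN, PPO, A3C                                            │
└──────────────────────────────────────────────────────────────────────┘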

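30.9.3 Model Evaluation Metrics

Classification metrics:

┌──────────────────────────────────────────────────────────────────┐
│ Accuracy   Proportion of correct predictions                     │
│ Precision  Share of predicted positives that are truly positive  │
│ Recall     Share of actual positives that are predicted positive │
│ F1 score   Harmonic mean of precision and recall                 │
│ AUC-ROC    Metric independent of the classification threshold    │
└──────────────────────────────────────────────────────────────────┘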
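
The definitions in the box above can be verified by hand from the four cells of a binary confusion matrix. The label vectors below are made up for illustration.

```python
import numpy as np
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score

# Tiny hand-made label vectors (an assumption for illustration).
y_true = np.array([1, 0, 1, 1, 0, 1, 0, 0])
y_pred = np.array([1, 0, 0, 1, 1, 1, 0, 0])

# For binary labels, ravel() yields the counts in the order TN, FP, FN, TP.
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

accuracy = (tp + tn) / (tp + tn + fp + fn)   # (3 + 3) / 8
precision = tp / (tp + fp)                   # 3 / (3 + 1)
recall = tp / (tp + fn)                      # 3 / (3 + 1)
f1 = 2 * precision * recall / (precision + recall)

print(accuracy, precision, recall, f1)  # 0.75 0.75 0.75 0.75
```

The hand-computed values match `precision_score`, `recall_score`, and `f1_score` called with their default binary settings.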

Regression metrics:
┌─────────────────────────────────────┐
│ MSE    Mean squared error           │
│ RMSE   Root mean squared error      │
│ MAE    Mean absolute error          │
│ R²     Coefficient of determination │
└─────────────────────────────────────┘
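
The four regression metrics follow directly from the residuals. A worked example on four made-up values:

```python
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Made-up targets and predictions (an assumption for illustration).
y_true = np.array([3.0, 5.0, 7.0, 9.0])
y_pred = np.array([2.0, 5.0, 8.0, 11.0])

residuals = y_true - y_pred                      # [ 1.  0. -1. -2.]
mse = np.mean(residuals ** 2)                    # (1 + 0 + 1 + 4) / 4 = 1.5
rmse = np.sqrt(mse)                              # sqrt(1.5), about 1.2247
mae = np.mean(np.abs(residuals))                 # (1 + 0 + 1 + 2) / 4 = 1.0
ss_res = np.sum(residuals ** 2)                  # 6.0
ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # 9 + 1 + 1 + 9 = 20.0
r2 = 1 - ss_res / ss_tot                         # 1 - 6 / 20 = 0.7

print(mse, rmse, mae, r2)
```

RMSE is expressed in the same units as the target, which usually makes it easier to interpret than MSE.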

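30.10 Technology Selection Guide

30.10.1 Choosing an Algorithm

┌─────────────┬──────────────────┬───────────────────────┬────────────────────────────┐
│ Data volume │ Feature count    │ Recommended algorithm │ Reason                     │
├─────────────┼──────────────────┼───────────────────────┼────────────────────────────┤
│             │                  │ Decision tree         │ Easy to interpret          │
│             │                  │ Random forest         │ Robust                     │
│             │                  │ Gradient boosting     │ Often the best performance │
│             │ High-dimensional │ Neural network        │ Automatic feature learning │
└─────────────┴──────────────────┴───────────────────────┴────────────────────────────┘

30.10.2 Choosing a Framework

┌──────────────────┬───────────────────────┬───────────────────────────┐
│ Scenario         │ Recommended framework │ Reason                    │
├──────────────────┼───────────────────────┼───────────────────────────┤
│ Traditional ML   │ scikit-learn          │ Comprehensive feature set │
│ Deep learning    │ PyTorch/TensorFlow    │ Flexible and powerful     │
│ AutoML           │ AutoGluon/H2O         │ Automated                 │
│ Large-scale data │ Spark MLlib           │ Distributed               │
└──────────────────┴───────────────────────┴───────────────────────────┘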

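30.11 Common Problems and Solutions

30.11.1 Overfitting

python
# Problem: the model performs well on the training set but poorly on the test set
# Remedies: regularization, cross-validation, more training data

from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score

# X, y: feature matrix and target vector, assumed to be prepared earlier

# Regularization: alpha controls the strength of the L2 penalty
model = Ridge(alpha=1.0)

# 5-fold cross-validation; for regressors the default score is R^2
scores = cross_val_score(model, X, y, cv=5)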
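
To make the symptom visible, the sketch below compares training and test R^2 for ordinary least squares and for Ridge in a setting with as many features as training samples. The synthetic dataset and `alpha=10.0` are assumptions chosen to provoke overfitting.

```python
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.model_selection import train_test_split

# Few samples, many features: a setting that invites overfitting
# (synthetic data, an assumption for illustration).
X, y = make_regression(
    n_samples=120, n_features=60, n_informative=5, noise=20.0, random_state=0
)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.5, random_state=0
)

scores = {}
for name, model in {"ols": LinearRegression(), "ridge": Ridge(alpha=10.0)}.items():
    model.fit(X_train, y_train)
    scores[name] = (model.score(X_train, y_train), model.score(X_test, y_test))
    print(f"{name}: train R^2 = {scores[name][0]:.3f}, test R^2 = {scores[name][1]:.3f}")
```

With 60 training samples and 60 features, least squares can interpolate the training data, so its training R^2 is essentially 1 while the test R^2 is much lower. Ridge gives up some training fit in exchange for smaller coefficients, which usually narrows that gap.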

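30.11.2 Class Imbalance

python
# Problem: the class distribution is skewed
# Remedies: oversampling, undersampling, class weights

import numpy as np
from imblearn.over_sampling import SMOTE  # third-party package: imbalanced-learn
from sklearn.utils.class_weight import compute_class_weight

# X, y: feature matrix and class labels, assumed to be prepared earlier

# Oversampling: synthesize new minority-class samples (apply to the training split only)
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# Class weights: inversely proportional to class frequencies
class_weights = compute_class_weight('balanced', classes=np.unique(y), y=y)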
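
The `'balanced'` heuristic assigns each class the weight n_samples / (n_classes * class_count). The sketch below checks that formula on a 90/10 split and then passes `class_weight="balanced"` to an estimator. The synthetic data and the choice of `LogisticRegression` are assumptions for illustration, and only scikit-learn is required.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import recall_score
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

# 90 negatives and 10 positives: weights are 100 / (2 * 90) and 100 / (2 * 10).
y_demo = np.array([0] * 90 + [1] * 10)
weights = compute_class_weight("balanced", classes=np.unique(y_demo), y=y_demo)
print(weights)

# Imbalanced synthetic data (an assumption for illustration).
X, y = make_classification(
    n_samples=1000, n_features=10, weights=[0.9, 0.1], random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42
)

# Compare minority-class recall with and without class weighting.
minority_recall = {}
for name, class_weight in {"unweighted": None, "balanced": "balanced"}.items():
    clf = LogisticRegression(class_weight=class_weight, max_iter=1000)
    clf.fit(X_train, y_train)
    minority_recall[name] = recall_score(y_test, clf.predict(X_test))
    print(f"{name}: minority recall = {minority_recall[name]:.3f}")
```

Class weighting typically raises recall on the minority class at the cost of more false positives, so precision should be checked alongside it.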

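30.11.3 Feature Selection

python
# Problem: too many features make the model overly complex
# Remedy: feature selection

from sklearn.feature_selection import SelectKBest, f_classif

# X, y: feature matrix and class labels, assumed to be prepared earlier

# Keep the K highest-scoring features (ANOVA F-test, for classification targets)
selector = SelectKBest(f_classif, k=10)
X_selected = selector.fit_transform(X, y)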
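
After selection it is often useful to know which columns survived. A minimal sketch on the Iris dataset, using `get_support()` to map the selection back to feature names (`k=2` is an arbitrary choice for illustration):

```python
from sklearn.datasets import load_iris
from sklearn.feature_selection import SelectKBest, f_classif

iris = load_iris()
X, y = iris.data, iris.target

# Keep the two features with the highest ANOVA F-scores.
selector = SelectKBest(f_classif, k=2)
X_selected = selector.fit_transform(X, y)

# get_support() returns a boolean mask over the original columns.
mask = selector.get_support()
selected = [name for name, keep in zip(iris.feature_names, mask) if keep]
print(selected)  # ['petal length (cm)', 'petal width (cm)']
```

To avoid leaking information from the test data, the selector should be fitted on the training split only, for example as a step inside a `Pipeline`.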

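30.12 Chapter Summary

This chapter covered the fundamental concepts and practice of machine learning in Python:

  1. Machine learning fundamentals: supervised learning, unsupervised learning, the standard workflow
  2. Data preprocessing: data cleaning, feature encoding, feature scaling
  3. Feature engineering: polynomial features, interaction features, dimensionality reduction
  4. Classification algorithms: decision trees, random forests, SVM, naive Bayes
  5. Regression algorithms: linear regression, regularized regression, polynomial regression
  6. Clustering algorithms: K-Means, hierarchical clustering, DBSCAN
  7. Model evaluation: confusion matrix, ROC curve, cross-validation
  8. Model tuning: grid search, random search, automated pipelines

Exercises

  1. Classify the Iris dataset with a decision tree and visualize the tree
  2. Build a house-price prediction model and compare the performance of several regression algorithms
  3. Use K-Means to run a cluster analysis on customer data
  4. Build a complete machine learning pipeline that includes data preprocessing, feature selection, and model training
  5. Implement a simple AutoML tool that automatically selects the best model and parameters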

Further Reading

Python Technical Series - 江苏省宿城中等专业学校