第30章 机器学习入门
学习目标
完成本章学习后,你将能够:
- 理解机器学习基础概念:监督学习、无监督学习、强化学习
- 掌握数据预处理技术:数据清洗、特征工程、数据标准化
- 使用scikit-learn进行分类任务:决策树、随机森林、SVM
- 实现回归分析:线性回归、多项式回归、正则化
- 应用聚类算法:K-Means、层次聚类、DBSCAN
- 进行模型评估:交叉验证、混淆矩阵、ROC曲线
- 实现模型调优:网格搜索、随机搜索、贝叶斯优化
- 构建机器学习流水线:特征转换、模型训练、预测部署
30.1 机器学习基础
30.1.1 什么是机器学习
机器学习(Machine Learning)是人工智能的一个分支,它使计算机系统能够从数据中学习并改进,而无需进行明确的编程。
┌─────────────────────────────────────────────────────────────────────┐
│ 机器学习分类 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 监督学习 (Supervised Learning) │ │
│ │ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │ 分类 │ │ 回归 │ │ │
│ │ │ Classification│ │ Regression │ │ │
│ │ │ │ │ │ │ │
│ │ │ • 邮件分类 │ │ • 房价预测 │ │ │
│ │ │ • 图像识别 │ │ • 股票预测 │ │ │
│ │ │ • 疾病诊断 │ │ • 销量预测 │ │ │
│ │ └───────────────┘ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 无监督学习 (Unsupervised Learning) │ │
│ │ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │ 聚类 │ │ 降维 │ │ │
│ │ │ Clustering │ │ Dimensionality│ │ │
│ │ │ │ │ Reduction │ │ │
│ │ │ • 客户分群 │ │ • PCA │ │ │
│ │ │ • 异常检测 │ │ • t-SNE │ │ │
│ │ │ • 图像分割 │ │ • LSA │ │ │
│ │ └───────────────┘ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 强化学习 (Reinforcement Learning) │ │
│ │ • 游戏AI • 机器人控制 • 自动驾驶 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘30.1.2 机器学习工作流程
python
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Union
from enum import Enum
import numpy as np
import pandas as pd
class TaskType(Enum):
    """Kinds of machine-learning task a dataset or model can target."""

    # Supervised tasks.
    CLASSIFICATION = "classification"
    REGRESSION = "regression"

    # Unsupervised tasks.
    CLUSTERING = "clustering"
    DIMENSIONALITY_REDUCTION = "dimensionality_reduction"
class DataType(Enum):
    """Broad categories of raw data a feature can hold."""

    # Tabular value types.
    NUMERICAL = "numerical"
    CATEGORICAL = "categorical"

    # Unstructured / sequential content.
    TEXT = "text"
    IMAGE = "image"
    TIME_SERIES = "time_series"
@dataclass
class Dataset:
    """Container pairing a feature array with optional targets and metadata.

    Attributes:
        X: Feature array; axis 0 indexes samples.
        y: Target array, or ``None`` when the data is unlabeled.
        feature_names: Names of the feature columns.
        target_name: Name of the target variable.
        task_type: Task the dataset is intended for.
    """

    X: np.ndarray
    y: Optional[np.ndarray] = None
    feature_names: List[str] = field(default_factory=list)
    target_name: str = "target"
    task_type: TaskType = TaskType.CLASSIFICATION

    @property
    def n_samples(self) -> int:
        """Number of samples (length of the first axis of ``X``)."""
        return self.X.shape[0]

    @property
    def n_features(self) -> int:
        """Number of feature columns; a 1-D ``X`` counts as one feature."""
        return self.X.shape[1] if len(self.X.shape) > 1 else 1

    @property
    def n_classes(self) -> int:
        """Number of distinct values in ``y``, or 0 when ``y`` is ``None``."""
        if self.y is None:
            return 0
        return len(np.unique(self.y))

    def _derive(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> "Dataset":
        """Build a sibling dataset that reuses this dataset's metadata."""
        return Dataset(X, y, self.feature_names, self.target_name, self.task_type)

    def split(
        self,
        test_size: float = 0.2,
        random_state: int = 42,
        stratify: bool = False
    ) -> tuple:
        """Split into a ``(train, test)`` pair of datasets.

        Args:
            test_size: Forwarded to ``train_test_split``.
            random_state: Seed forwarded to ``train_test_split``.
            stratify: When true and ``y`` is present, stratify on ``y``.
                Ignored for unlabeled data.

        Returns:
            Tuple ``(train, test)``; both parts keep ``feature_names``,
            ``target_name`` and ``task_type`` of this dataset.
        """
        # Function-scope import: scikit-learn is only required by this method.
        from sklearn.model_selection import train_test_split

        if self.y is None:
            X_train, X_test = train_test_split(
                self.X,
                test_size=test_size,
                random_state=random_state
            )
            # Carry the full metadata over; previously target_name and
            # task_type were silently reset to their defaults here.
            return self._derive(X_train), self._derive(X_test)

        X_train, X_test, y_train, y_test = train_test_split(
            self.X, self.y,
            test_size=test_size,
            random_state=random_state,
            stratify=self.y if stratify else None
        )
        return self._derive(X_train, y_train), self._derive(X_test, y_test)
@dataclass
class ModelMetrics:
    """Bag of optional evaluation scores plus free-form custom metrics.

    Every built-in score defaults to ``None`` (not computed); extra scores
    go into ``custom_metrics`` keyed by name.
    """

    # Classification scores.
    accuracy: Optional[float] = None
    precision: Optional[float] = None
    recall: Optional[float] = None
    f1_score: Optional[float] = None
    # Regression scores.
    r2_score: Optional[float] = None
    mse: Optional[float] = None
    mae: Optional[float] = None
    custom_metrics: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, float]:
        """Return the populated scores, then the custom metrics, as one dict.

        Custom metrics are merged last, so a custom entry overrides a
        built-in score with the same name.
        """
        populated = {
            name: score
            for name, score in vars(self).items()
            if name != "custom_metrics" and score is not None
        }
        populated.update(self.custom_metrics)
        return populated

    def __repr__(self) -> str:
        """Show every populated metric with four decimal places."""
        parts = [f"{name}={score:.4f}" for name, score in self.to_dict().items()]
        return "ModelMetrics(" + ", ".join(parts) + ")"


# --- 30.1.3 常用数据集 (Common Datasets) ---
python
from sklearn.datasets import (
load_iris,
load_wine,
load_breast_cancer,
load_diabetes,
load_boston,
make_classification,
make_regression,
make_blobs
)
from typing import Tuple
class DatasetLoader:
    """Static factories that wrap scikit-learn datasets in ``Dataset`` objects."""

    @staticmethod
    def _from_bunch(data: Any, target_name: str, task_type: TaskType) -> Dataset:
        """Convert a loaded scikit-learn dataset object into a ``Dataset``."""
        return Dataset(
            X=data.data,
            y=data.target,
            feature_names=list(data.feature_names),
            target_name=target_name,
            task_type=task_type
        )

    @staticmethod
    def _indexed_names(n_features: int) -> List[str]:
        """Return ``feature_0 .. feature_{n-1}`` names for synthetic data."""
        return [f"feature_{i}" for i in range(n_features)]

    @staticmethod
    def load_iris_dataset() -> Dataset:
        """Load the iris data as a classification dataset (target ``species``)."""
        return DatasetLoader._from_bunch(
            load_iris(), "species", TaskType.CLASSIFICATION
        )

    @staticmethod
    def load_wine_dataset() -> Dataset:
        """Load the wine data as a classification dataset (target ``wine_class``)."""
        return DatasetLoader._from_bunch(
            load_wine(), "wine_class", TaskType.CLASSIFICATION
        )

    @staticmethod
    def load_breast_cancer_dataset() -> Dataset:
        """Load the breast-cancer data as a classification dataset."""
        return DatasetLoader._from_bunch(
            load_breast_cancer(), "cancer_type", TaskType.CLASSIFICATION
        )

    @staticmethod
    def load_diabetes_dataset() -> Dataset:
        """Load the diabetes data as a regression dataset (target ``progression``)."""
        return DatasetLoader._from_bunch(
            load_diabetes(), "progression", TaskType.REGRESSION
        )

    @staticmethod
    def generate_classification(
        n_samples: int = 1000,
        n_features: int = 20,
        n_classes: int = 2,
        random_state: int = 42
    ) -> Dataset:
        """Generate a synthetic classification dataset.

        ``make_classification`` requires
        ``n_classes * n_clusters_per_class <= 2 ** n_informative``. With its
        defaults (2 clusters per class, 2 informative features) any
        ``n_classes > 2`` is rejected, so the number of informative features
        is raised just enough to satisfy that constraint. For ``n_classes``
        of 1 or 2 the value stays at the library default of 2.

        Raises:
            ValueError: Propagated from scikit-learn when ``n_features`` is
                too small for the required informative features.
        """
        clusters_per_class = 2  # make_classification default
        # Smallest b with 2 ** b >= n_classes * clusters_per_class.
        needed_bits = (n_classes * clusters_per_class - 1).bit_length()
        X, y = make_classification(
            n_samples=n_samples,
            n_features=n_features,
            n_informative=max(2, needed_bits),
            n_classes=n_classes,
            random_state=random_state
        )
        return Dataset(
            X=X,
            y=y,
            feature_names=DatasetLoader._indexed_names(n_features),
            task_type=TaskType.CLASSIFICATION
        )

    @staticmethod
    def generate_regression(
        n_samples: int = 1000,
        n_features: int = 20,
        random_state: int = 42
    ) -> Dataset:
        """Generate a synthetic regression dataset via ``make_regression``."""
        X, y = make_regression(
            n_samples=n_samples,
            n_features=n_features,
            random_state=random_state
        )
        return Dataset(
            X=X,
            y=y,
            feature_names=DatasetLoader._indexed_names(n_features),
            task_type=TaskType.REGRESSION
        )

    @staticmethod
    def generate_clustering(
        n_samples: int = 1000,
        n_features: int = 2,
        n_clusters: int = 3,
        random_state: int = 42
    ) -> Dataset:
        """Generate blob data for clustering.

        ``y`` holds the blob label returned by ``make_blobs`` for each sample.
        """
        X, y = make_blobs(
            n_samples=n_samples,
            n_features=n_features,
            centers=n_clusters,
            random_state=random_state
        )
        return Dataset(
            X=X,
            y=y,
            feature_names=DatasetLoader._indexed_names(n_features),
            task_type=TaskType.CLUSTERING
        )


# --- 30.2 数据预处理 (Data Preprocessing) ---
30.2.1 数据清洗
python
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import (
StandardScaler,
MinMaxScaler,
RobustScaler,
LabelEncoder,
OneHotEncoder,
OrdinalEncoder
)
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import pandas as pd
import numpy as np
from typing import List, Optional, Dict, Any
class DataCleaner:
    """Helpers for missing values, duplicates and outliers in DataFrames.

    Every method returns a new DataFrame; the input frame is not modified.
    """

    def __init__(self):
        # Dict form of the last summary built by analyze_missing_values().
        self.missing_values_summary: Dict[str, Any] = {}

    def analyze_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Summarise missing values per column.

        The full summary (all columns) is stored as a dict in
        ``missing_values_summary``.

        Returns:
            Rows for columns with at least one missing value, sorted by
            ``missing_percentage`` descending.
        """
        missing = df.isnull().sum()
        percentage = (missing / len(df)) * 100
        summary = pd.DataFrame({
            "missing_count": missing,
            "missing_percentage": percentage,
            "dtype": df.dtypes
        })
        self.missing_values_summary = summary.to_dict()
        return summary[summary["missing_count"] > 0].sort_values(
            "missing_percentage", ascending=False
        )

    def handle_missing_values(
        self,
        df: pd.DataFrame,
        strategy: str = "mean",
        columns: Optional[List[str]] = None,
        fill_value: Optional[Any] = None
    ) -> pd.DataFrame:
        """Drop or fill missing values in the selected columns.

        Args:
            df: Input frame.
            strategy: ``"drop"`` removes rows with a missing value in
                ``columns``; ``"fill"`` replaces missing values with
                ``fill_value``; any other value is passed to
                ``SimpleImputer(strategy=...)``.
            columns: Columns to process; defaults to all numeric columns.
            fill_value: Replacement used by the ``"fill"`` strategy.

        Raises:
            ValueError: If ``strategy="fill"`` is used without ``fill_value``.
        """
        df = df.copy()
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns.tolist()

        if strategy == "drop":
            return df.dropna(subset=columns)

        if strategy == "fill":
            # "fill" is not a SimpleImputer strategy, so it must not fall
            # through to the imputer when the replacement is missing.
            if fill_value is None:
                raise ValueError("fill_value is required when strategy='fill'")
            df[columns] = df[columns].fillna(fill_value)
            return df

        if not columns:
            # Nothing to impute (e.g. no numeric columns).
            return df

        imputer = SimpleImputer(strategy=strategy)
        df[columns] = imputer.fit_transform(df[columns])
        return df

    def remove_duplicates(self, df: pd.DataFrame, subset: Optional[List[str]] = None) -> pd.DataFrame:
        """Drop duplicate rows, keeping the first occurrence."""
        return df.drop_duplicates(subset=subset, keep="first")

    def remove_outliers(
        self,
        df: pd.DataFrame,
        columns: Optional[List[str]] = None,
        method: str = "iqr",
        threshold: float = 1.5
    ) -> pd.DataFrame:
        """Drop rows whose values fall outside the accepted range.

        Columns are filtered one after another, so the statistics of later
        columns are computed on the rows that survived earlier ones. Rows
        with a missing value in a filtered column are dropped too, because
        the comparisons are false for NaN.

        Args:
            df: Input frame.
            columns: Columns to check; defaults to all numeric columns.
            method: ``"iqr"`` keeps values within
                ``[Q1 - threshold*IQR, Q3 + threshold*IQR]``; ``"zscore"``
                keeps values whose absolute z-score is ``<= threshold``.
            threshold: IQR multiplier or z-score cut-off.

        Raises:
            ValueError: If ``method`` is neither ``"iqr"`` nor ``"zscore"``.
        """
        if method not in ("iqr", "zscore"):
            raise ValueError(f"Unknown outlier method: {method}")

        df = df.copy()
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns.tolist()

        for col in columns:
            if method == "iqr":
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                lower_bound = q1 - threshold * iqr
                upper_bound = q3 + threshold * iqr
                df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
            else:
                std = df[col].std()
                # A zero/undefined spread makes every z-score NaN, which
                # would discard all rows; such a column has no outliers.
                if not std or np.isnan(std):
                    continue
                z_scores = np.abs((df[col] - df[col].mean()) / std)
                df = df[z_scores <= threshold]
        return df
class FeatureEncoder:
    """Categorical encoders that remember the fitted scikit-learn objects.

    Every method works on a copy of the input frame.
    """

    def __init__(self):
        # One fitted LabelEncoder per column processed by label_encode().
        self.label_encoders: Dict[str, LabelEncoder] = {}
        # Encoders fitted by the most recent one_hot_encode()/ordinal_encode().
        self.one_hot_encoder: Optional[OneHotEncoder] = None
        self.ordinal_encoder: Optional[OrdinalEncoder] = None

    def label_encode(
        self,
        df: pd.DataFrame,
        columns: List[str]
    ) -> pd.DataFrame:
        """Replace each listed column with integer labels.

        Values are cast to ``str`` before encoding; the fitted encoder for
        each column is stored in ``label_encoders``.
        """
        df = df.copy()
        for col in columns:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            self.label_encoders[col] = le
        return df

    def one_hot_encode(
        self,
        df: pd.DataFrame,
        columns: List[str],
        drop: str = "first"
    ) -> pd.DataFrame:
        """Replace the listed columns with one-hot indicator columns.

        The indicator columns are appended after the remaining columns and
        named by the encoder's ``get_feature_names_out``.
        """
        df = df.copy()
        self.one_hot_encoder = OneHotEncoder(
            sparse_output=False,
            drop=drop,
            handle_unknown="ignore"
        )
        encoded = self.one_hot_encoder.fit_transform(df[columns])
        encoded_df = pd.DataFrame(
            encoded,
            columns=self.one_hot_encoder.get_feature_names_out(columns),
            index=df.index
        )
        df = df.drop(columns=columns)
        return pd.concat([df, encoded_df], axis=1)

    def ordinal_encode(
        self,
        df: pd.DataFrame,
        columns: List[str],
        categories: Optional[Dict[str, List[str]]] = None
    ) -> pd.DataFrame:
        """Replace the listed columns with ordinal codes.

        Args:
            df: Input frame.
            columns: Columns to encode.
            categories: Optional explicit category order per column. Columns
                without an entry use the categories scikit-learn infers
                from the data.
        """
        df = df.copy()
        if categories:
            # OrdinalEncoder accepts either "auto" or one category list per
            # column, never a mix, so infer the lists that were not given.
            unspecified = [col for col in columns if col not in categories]
            inferred: Dict[str, Any] = {}
            if unspecified:
                probe = OrdinalEncoder().fit(df[unspecified])
                inferred = dict(zip(unspecified, probe.categories_))
            cat_list = [
                list(categories[col]) if col in categories else inferred[col]
                for col in columns
            ]
        else:
            cat_list = "auto"
        self.ordinal_encoder = OrdinalEncoder(categories=cat_list)
        df[columns] = self.ordinal_encoder.fit_transform(df[columns])
        return df
class FeatureScaler:
    """Numeric scaling and transformation helpers.

    The scikit-learn scalers fitted by the ``*_scale`` methods are kept in
    ``scalers`` under the keys ``"standard"``, ``"minmax"`` and ``"robust"``.
    """

    def __init__(self):
        self.scalers: Dict[str, Any] = {}

    def standard_scale(
        self,
        X: np.ndarray,
        feature_names: Optional[List[str]] = None
    ) -> np.ndarray:
        """Fit a ``StandardScaler`` on ``X`` and return the scaled data.

        ``feature_names`` is accepted but not used.
        """
        scaler = StandardScaler()
        scaled = scaler.fit_transform(X)
        self.scalers["standard"] = scaler
        return scaled

    def minmax_scale(
        self,
        X: np.ndarray,
        feature_range: Tuple[float, float] = (0, 1)
    ) -> np.ndarray:
        """Fit a ``MinMaxScaler`` with ``feature_range`` and return the scaled data."""
        scaler = MinMaxScaler(feature_range=feature_range)
        scaled = scaler.fit_transform(X)
        self.scalers["minmax"] = scaler
        return scaled

    def robust_scale(self, X: np.ndarray) -> np.ndarray:
        """Fit a ``RobustScaler`` on ``X`` and return the scaled data."""
        scaler = RobustScaler()
        scaled = scaler.fit_transform(X)
        self.scalers["robust"] = scaler
        return scaled

    def log_transform(self, X: np.ndarray) -> np.ndarray:
        """Return ``log(1 + X)`` element-wise."""
        return np.log1p(X)

    def box_cox_transform(self, X: np.ndarray) -> np.ndarray:
        """Apply a Box-Cox transform to each column of ``X + 1``.

        Args:
            X: 2-D array. Every column is shifted by 1 before the transform.

        Returns:
            Float array with the same shape as ``X``. The fitted lambda of
            each column is discarded.
        """
        from scipy import stats

        # Always allocate floats: with zeros_like(X) an integer input would
        # silently truncate the transformed values.
        transformed = np.zeros(X.shape, dtype=float)
        for i in range(X.shape[1]):
            shifted = X[:, i].astype(float) + 1
            transformed[:, i], _ = stats.boxcox(shifted)
        return transformed


# --- 30.2.2 特征工程 (Feature Engineering) ---
python
from sklearn.feature_selection import (
SelectKBest,
f_classif,
f_regression,
mutual_info_classif,
RFE
)
from sklearn.decomposition import PCA
from sklearn.preprocessing import PolynomialFeatures
import itertools
class FeatureEngineer:
    """Feature construction, reduction and selection helpers.

    The last fitted ``PolynomialFeatures`` and ``PCA`` objects are kept in
    ``poly_transformer`` and ``pca``.
    """

    def __init__(self):
        self.poly_transformer: Optional[PolynomialFeatures] = None
        self.pca: Optional[PCA] = None

    def create_polynomial_features(
        self,
        X: np.ndarray,
        degree: int = 2,
        interaction_only: bool = False,
        include_bias: bool = False
    ) -> np.ndarray:
        """Fit ``PolynomialFeatures`` on ``X`` and return the expanded array."""
        self.poly_transformer = PolynomialFeatures(
            degree=degree,
            interaction_only=interaction_only,
            include_bias=include_bias
        )
        return self.poly_transformer.fit_transform(X)

    def create_interaction_features(
        self,
        df: pd.DataFrame,
        columns: List[str]
    ) -> pd.DataFrame:
        """Add a ``{a}_x_{b}`` product column for every pair of ``columns``."""
        df = df.copy()
        for col1, col2 in itertools.combinations(columns, 2):
            df[f"{col1}_x_{col2}"] = df[col1] * df[col2]
        return df

    def create_ratio_features(
        self,
        df: pd.DataFrame,
        numerator: str,
        denominator: str,
        new_name: Optional[str] = None
    ) -> pd.DataFrame:
        """Add a ratio column, named ``{numerator}_div_{denominator}`` by default.

        ``1e-10`` is added to the denominator before dividing.
        """
        df = df.copy()
        name = new_name or f"{numerator}_div_{denominator}"
        df[name] = df[numerator] / (df[denominator] + 1e-10)
        return df

    def bin_numerical_feature(
        self,
        df: pd.DataFrame,
        column: str,
        bins: int = 5,
        labels: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Add ``{column}_binned`` holding the ``pd.cut`` bin of each value."""
        df = df.copy()
        df[f"{column}_binned"] = pd.cut(df[column], bins=bins, labels=labels)
        return df

    def apply_pca(
        self,
        X: np.ndarray,
        n_components: Union[int, float] = 0.95
    ) -> np.ndarray:
        """Fit ``PCA(n_components=...)`` on ``X`` and return the projection."""
        self.pca = PCA(n_components=n_components)
        return self.pca.fit_transform(X)

    def select_features(
        self,
        X: np.ndarray,
        y: np.ndarray,
        k: int = 10,
        task_type: TaskType = TaskType.CLASSIFICATION
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Keep the ``k`` best-scoring features.

        Args:
            X: 2-D feature array.
            y: Target values.
            k: Number of features to keep; capped at the number of columns
                of ``X`` so the default works on narrow data.
            task_type: ``CLASSIFICATION`` scores with ``f_classif``; any
                other value scores with ``f_regression``.

        Returns:
            ``(X_selected, indices)`` where ``indices`` are the column
            indices of the retained features.
        """
        score_func = f_classif if task_type == TaskType.CLASSIFICATION else f_regression
        if isinstance(k, int):
            k = min(k, X.shape[1])
        selector = SelectKBest(score_func=score_func, k=k)
        X_selected = selector.fit_transform(X, y)
        # SelectKBest has no get_support_indices(); the selected indices
        # come from get_support(indices=True).
        return X_selected, selector.get_support(indices=True)
class DateTimeFeatures:
    """Derive calendar and clock features from a datetime-like column."""

    @staticmethod
    def extract_features(
        df: pd.DataFrame,
        column: str,
        drop_original: bool = True
    ) -> pd.DataFrame:
        """Expand ``column`` into ``{column}_<part>`` feature columns.

        The column is parsed with ``pd.to_datetime``. Added parts, in order:
        year, month, day, dayofweek, dayofyear, weekofyear (ISO week),
        quarter, hour, minute and is_weekend (1 when dayofweek >= 5).

        Args:
            df: Input frame (not modified).
            column: Name of the column to expand.
            drop_original: Remove ``column`` from the result when true.
        """
        result = df.copy()
        stamps = pd.to_datetime(result[column])
        accessor = stamps.dt

        parts = {
            "year": accessor.year,
            "month": accessor.month,
            "day": accessor.day,
            "dayofweek": accessor.dayofweek,
            "dayofyear": accessor.dayofyear,
            "weekofyear": accessor.isocalendar().week,
            "quarter": accessor.quarter,
            "hour": accessor.hour,
            "minute": accessor.minute,
            "is_weekend": (accessor.dayofweek >= 5).astype(int),
        }
        for suffix, values in parts.items():
            result[f"{column}_{suffix}"] = values

        return result.drop(columns=[column]) if drop_original else result
class TextFeatures:
    """Simple length-based features for a text column."""

    @staticmethod
    def extract_basic_features(
        df: pd.DataFrame,
        column: str,
        drop_original: bool = True
    ) -> pd.DataFrame:
        """Add length, word-count and average-word-length features.

        Added columns:
            ``{column}_length``: characters in the text.
            ``{column}_word_count``: whitespace-separated words.
            ``{column}_char_count``: same value as ``{column}_length``.
            ``{column}_avg_word_length``: characters inside words divided
                by the word count; ``0.0`` for texts without words.

        Args:
            df: Input frame (not modified).
            column: Name of the text column.
            drop_original: Remove ``column`` from the result when true.
        """
        df = df.copy()
        text = df[column]
        words = text.str.split()
        word_count = words.str.len()

        df[f"{column}_length"] = text.str.len()
        df[f"{column}_word_count"] = word_count
        df[f"{column}_char_count"] = text.str.len()

        # The earlier formula, char_count / (word_count + 1), counted
        # whitespace and inflated the divisor, so "hello world" scored 3.67
        # instead of 5. Count only characters inside words and divide by
        # the true word count, guarding the zero-word case explicitly.
        word_chars = words.str.join("").str.len()
        average = word_chars / word_count.where(word_count != 0)
        df[f"{column}_avg_word_length"] = average.mask(word_count == 0, 0.0)

        if drop_original:
            df = df.drop(columns=[column])
        return df


# --- 30.3 分类算法 (Classification Algorithms) ---
30.3.1 决策树
python
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.ensemble import (
RandomForestClassifier,
RandomForestRegressor,
GradientBoostingClassifier,
GradientBoostingRegressor,
AdaBoostClassifier
)
from sklearn.svm import SVC, SVR
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression, Ridge, Lasso
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.neural_network import MLPClassifier
import matplotlib.pyplot as plt
from typing import Tuple, Optional
class DecisionTreeModel:
    """Wrapper around ``DecisionTreeClassifier`` with importance helpers."""

    def __init__(
        self,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        criterion: str = "gini",
        random_state: int = 42
    ):
        """Create the underlying classifier from the given hyper-parameters."""
        hyperparams = dict(
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            criterion=criterion,
            random_state=random_state,
        )
        self.model = DecisionTreeClassifier(**hyperparams)
        # Filled in by fit().
        self.feature_importances_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> "DecisionTreeModel":
        """Fit the tree, cache its feature importances and return ``self``."""
        tree = self.model
        tree.fit(X, y)
        self.feature_importances_ = tree.feature_importances_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)

    def get_feature_importance(
        self,
        feature_names: List[str],
        top_n: int = 10
    ) -> pd.DataFrame:
        """Return the ``top_n`` features with the largest cached importance.

        The frame has the columns ``feature`` and ``importance``.
        """
        table = pd.DataFrame(
            {"feature": feature_names, "importance": self.feature_importances_}
        )
        return table.nlargest(n=top_n, columns="importance")

    def plot_feature_importance(
        self,
        feature_names: List[str],
        top_n: int = 10,
        figsize: Tuple[int, int] = (10, 6)
    ) -> None:
        """Draw a horizontal bar chart of the ``top_n`` importances."""
        ranked = self.get_feature_importance(feature_names, top_n)
        plt.figure(figsize=figsize)
        plt.barh(ranked["feature"], ranked["importance"])
        plt.xlabel("Feature Importance")
        plt.ylabel("Feature")
        plt.title("Feature Importance (Decision Tree)")
        plt.tight_layout()
        plt.show()
class RandomForestModel:
    """Wrapper around ``RandomForestClassifier``."""

    def __init__(
        self,
        n_estimators: int = 100,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        max_features: str = "sqrt",
        random_state: int = 42,
        n_jobs: int = -1,
        oob_score: bool = False
    ):
        """Create the underlying forest.

        Args:
            n_estimators, max_depth, min_samples_split, min_samples_leaf,
            max_features, random_state, n_jobs: Forwarded unchanged to
                ``RandomForestClassifier``.
            oob_score: Forwarded to ``RandomForestClassifier``. Must be true
                for ``get_oob_score`` to return a value; it was previously
                never enabled, so that method could only return ``None``.
        """
        self.model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            max_features=max_features,
            random_state=random_state,
            n_jobs=n_jobs,
            oob_score=oob_score
        )
        # Filled in by fit().
        self.feature_importances_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> "RandomForestModel":
        """Fit the forest, cache its feature importances and return ``self``."""
        self.model.fit(X, y)
        self.feature_importances_ = self.model.feature_importances_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)

    def get_oob_score(self) -> Optional[float]:
        """Return the model's ``oob_score_``, or ``None`` if it has none."""
        return getattr(self.model, "oob_score_", None)
class GradientBoostingModel:
    """Wrapper around ``GradientBoostingClassifier``."""

    def __init__(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        subsample: float = 1.0,
        random_state: int = 42
    ):
        """Create the underlying classifier from the given hyper-parameters."""
        hyperparams = dict(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            subsample=subsample,
            random_state=random_state,
        )
        self.model = GradientBoostingClassifier(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "GradientBoostingModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)

    def get_feature_importance(self, feature_names: List[str]) -> pd.DataFrame:
        """Return all features with their importance, largest first."""
        table = pd.DataFrame(
            {"feature": feature_names, "importance": self.model.feature_importances_}
        )
        return table.sort_values("importance", ascending=False)


# --- 30.3.2 支持向量机 (Support Vector Machines) ---
python
class SVMModel:
    """Wrapper around ``SVC`` created with ``probability=True``."""

    def __init__(
        self,
        kernel: str = "rbf",
        C: float = 1.0,
        gamma: str = "scale",
        degree: int = 3,
        random_state: int = 42
    ):
        """Create the underlying classifier from the given hyper-parameters."""
        hyperparams = dict(
            kernel=kernel,
            C=C,
            gamma=gamma,
            degree=degree,
            probability=True,  # required by predict_proba()
            random_state=random_state,
        )
        self.model = SVC(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "SVMModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)

    def get_support_vectors(self) -> np.ndarray:
        """Return the wrapped model's ``support_vectors_``."""
        return self.model.support_vectors_
class KNNModel:
    """Wrapper around ``KNeighborsClassifier``."""

    def __init__(
        self,
        n_neighbors: int = 5,
        weights: str = "uniform",
        algorithm: str = "auto",
        metric: str = "minkowski",
        p: int = 2
    ):
        """Create the underlying classifier from the given hyper-parameters."""
        hyperparams = dict(
            n_neighbors=n_neighbors,
            weights=weights,
            algorithm=algorithm,
            metric=metric,
            p=p,
        )
        self.model = KNeighborsClassifier(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "KNNModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)

    def kneighbors(self, X: np.ndarray, n_neighbors: int = None) -> Tuple[np.ndarray, np.ndarray]:
        """Delegate to the wrapped model's ``kneighbors`` and return its result.

        ``n_neighbors`` is passed through unchanged, including ``None``.
        """
        neighbours = self.model.kneighbors(X, n_neighbors=n_neighbors)
        return neighbours
class LogisticRegressionModel:
    """Wrapper around ``LogisticRegression``."""

    def __init__(
        self,
        penalty: str = "l2",
        C: float = 1.0,
        solver: str = "lbfgs",
        max_iter: int = 1000,
        random_state: int = 42
    ):
        """Create the underlying classifier from the given hyper-parameters."""
        hyperparams = dict(
            penalty=penalty,
            C=C,
            solver=solver,
            max_iter=max_iter,
            random_state=random_state,
        )
        self.model = LogisticRegression(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LogisticRegressionModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)

    def get_coefficients(self, feature_names: List[str]) -> pd.DataFrame:
        """Return features ranked by absolute coefficient, largest first.

        Only the first row of the model's ``coef_`` is reported.
        """
        first_row = self.model.coef_[0]
        table = pd.DataFrame({"feature": feature_names, "coefficient": first_row})
        return table.sort_values("coefficient", key=np.abs, ascending=False)


# --- 30.3.3 朴素贝叶斯 (Naive Bayes) ---
python
class NaiveBayesModel:
    """Wrapper around ``GaussianNB`` or ``MultinomialNB``."""

    def __init__(self, model_type: str = "gaussian"):
        """Create the underlying classifier.

        Args:
            model_type: ``"gaussian"`` or ``"multinomial"``.

        Raises:
            ValueError: For any other ``model_type``.
        """
        # Reject unsupported variants up front, then pick the estimator.
        if model_type not in ("gaussian", "multinomial"):
            raise ValueError(f"Unknown model type: {model_type}")
        self.model = GaussianNB() if model_type == "gaussian" else MultinomialNB()

    def fit(self, X: np.ndarray, y: np.ndarray) -> "NaiveBayesModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)
class MultiLayerPerceptron:
    """Wrapper around ``MLPClassifier``."""

    def __init__(
        self,
        hidden_layer_sizes: Tuple[int, ...] = (100,),
        activation: str = "relu",
        solver: str = "adam",
        alpha: float = 0.0001,
        learning_rate: str = "constant",
        learning_rate_init: float = 0.001,
        max_iter: int = 200,
        random_state: int = 42
    ):
        """Create the underlying classifier from the given hyper-parameters."""
        hyperparams = dict(
            hidden_layer_sizes=hidden_layer_sizes,
            activation=activation,
            solver=solver,
            alpha=alpha,
            learning_rate=learning_rate,
            learning_rate_init=learning_rate_init,
            max_iter=max_iter,
            random_state=random_state,
        )
        self.model = MLPClassifier(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "MultiLayerPerceptron":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's probability estimates for ``X``."""
        return self.model.predict_proba(X)

    def get_loss_curve(self) -> List[float]:
        """Return the wrapped model's ``loss_curve_``."""
        history = self.model.loss_curve_
        return history


# --- 30.4 回归算法 (Regression Algorithms) ---
30.4.1 线性回归
python
from sklearn.linear_model import (
LinearRegression,
Ridge,
Lasso,
ElasticNet,
BayesianRidge
)
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import Pipeline
class LinearRegressionModel:
    """Wrapper around ``LinearRegression`` with reporting helpers."""

    def __init__(self, fit_intercept: bool = True):
        """Create the underlying regressor."""
        self.model = LinearRegression(fit_intercept=fit_intercept)
        # Filled in by fit().
        self.coefficients_: Optional[np.ndarray] = None
        self.intercept_: Optional[float] = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LinearRegressionModel":
        """Fit the model, cache ``coef_`` / ``intercept_`` and return ``self``."""
        self.model.fit(X, y)
        self.coefficients_ = self.model.coef_
        self.intercept_ = self.model.intercept_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def get_equation(self, feature_names: List[str]) -> str:
        """Render the fitted model as ``y = c1 * x1 + c2 * x2 + b``.

        Zero coefficients are omitted. Negative terms are joined with
        `` - `` rather than `` + -``. The intercept is shown when it is
        non-zero or when no other term is left, so the right-hand side is
        never empty.

        Args:
            feature_names: One name per cached coefficient.

        Returns:
            The equation with four decimal places per number.
        """
        # (signed value, unsigned text) so the sign can become the operator.
        pieces = [
            (coef, f"{abs(coef):.4f} * {name}")
            for name, coef in zip(feature_names, self.coefficients_)
            if coef != 0
        ]
        if self.intercept_ != 0 or not pieces:
            pieces.append((self.intercept_, f"{abs(self.intercept_):.4f}"))

        rendered = []
        for position, (signed, text) in enumerate(pieces):
            if position == 0:
                rendered.append(f"-{text}" if signed < 0 else text)
            else:
                rendered.append(f" - {text}" if signed < 0 else f" + {text}")
        return "y = " + "".join(rendered)

    def get_coefficients_df(self, feature_names: List[str]) -> pd.DataFrame:
        """Return a frame listing the intercept followed by each coefficient."""
        return pd.DataFrame({
            "feature": ["intercept"] + list(feature_names),
            "coefficient": [self.intercept_] + list(self.coefficients_)
        })
class RidgeRegression:
    """Wrapper around ``Ridge``."""

    def __init__(
        self,
        alpha: float = 1.0,
        fit_intercept: bool = True,
        solver: str = "auto"
    ):
        """Create the underlying regressor from the given hyper-parameters."""
        hyperparams = dict(alpha=alpha, fit_intercept=fit_intercept, solver=solver)
        self.model = Ridge(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "RidgeRegression":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)
class LassoRegression:
    """Wrapper around ``Lasso``."""

    def __init__(
        self,
        alpha: float = 1.0,
        fit_intercept: bool = True,
        max_iter: int = 1000
    ):
        """Create the underlying regressor from the given hyper-parameters."""
        hyperparams = dict(alpha=alpha, fit_intercept=fit_intercept, max_iter=max_iter)
        self.model = Lasso(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "LassoRegression":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def get_selected_features(self, feature_names: List[str]) -> List[str]:
        """Return the names whose fitted coefficient is non-zero."""
        selected = []
        for name, weight in zip(feature_names, self.model.coef_):
            if weight != 0:
                selected.append(name)
        return selected
class PolynomialRegression:
    """Pipeline of ``PolynomialFeatures`` followed by ``LinearRegression``."""

    def __init__(self, degree: int = 2, include_bias: bool = True):
        """Build the two-step pipeline (steps ``"poly"`` and ``"linear"``)."""
        self.degree = degree
        self.pipeline = Pipeline([
            ("poly", PolynomialFeatures(degree=degree, include_bias=include_bias)),
            ("linear", LinearRegression())
        ])

    def fit(self, X: np.ndarray, y: np.ndarray) -> "PolynomialRegression":
        """Fit the pipeline and return ``self``."""
        self.pipeline.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the pipeline's predictions for ``X``."""
        return self.pipeline.predict(X)

    def get_polynomial_features(self) -> List[str]:
        """Return the names of the generated polynomial features as a list.

        The names come from the ``"poly"`` step's ``get_feature_names_out``;
        the result is converted to a ``list`` to match the declared return
        type.
        """
        names = self.pipeline.named_steps["poly"].get_feature_names_out()
        return list(names)


# --- 30.4.2 决策树回归 (Decision Tree Regression) ---
python
class DecisionTreeRegressorModel:
    """Wrapper around ``DecisionTreeRegressor``."""

    def __init__(
        self,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        random_state: int = 42
    ):
        """Create the underlying regressor from the given hyper-parameters."""
        hyperparams = dict(
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=random_state,
        )
        self.model = DecisionTreeRegressor(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "DecisionTreeRegressorModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)
class RandomForestRegressorModel:
    """Wrapper around ``RandomForestRegressor``."""

    def __init__(
        self,
        n_estimators: int = 100,
        max_depth: Optional[int] = None,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        random_state: int = 42,
        n_jobs: int = -1
    ):
        """Create the underlying regressor from the given hyper-parameters."""
        hyperparams = dict(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            random_state=random_state,
            n_jobs=n_jobs,
        )
        self.model = RandomForestRegressor(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "RandomForestRegressorModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)
class GradientBoostingRegressorModel:
    """Wrapper around ``GradientBoostingRegressor``."""

    def __init__(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        subsample: float = 1.0,
        random_state: int = 42
    ):
        """Create the underlying regressor from the given hyper-parameters."""
        hyperparams = dict(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            subsample=subsample,
            random_state=random_state,
        )
        self.model = GradientBoostingRegressor(**hyperparams)

    def fit(self, X: np.ndarray, y: np.ndarray) -> "GradientBoostingRegressorModel":
        """Fit the wrapped model and return ``self``."""
        self.model.fit(X, y)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's predictions for ``X``."""
        return self.model.predict(X)

    def get_training_score(self) -> List[float]:
        """Return the wrapped model's ``train_score_`` as a list."""
        return [*self.model.train_score_]


# --- 30.5 聚类算法 (Clustering Algorithms) ---
30.5.1 K-Means聚类
python
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score
from scipy.cluster.hierarchy import dendrogram, linkage
import matplotlib.pyplot as plt
class KMeansClustering:
    """Wrapper around ``KMeans`` with helpers for choosing ``k``."""

    def __init__(
        self,
        n_clusters: int = 8,
        init: str = "k-means++",
        n_init: int = 10,
        max_iter: int = 300,
        random_state: int = 42
    ):
        """Create the underlying estimator from the given hyper-parameters."""
        self.model = KMeans(
            n_clusters=n_clusters,
            init=init,
            n_init=n_init,
            max_iter=max_iter,
            random_state=random_state
        )
        # Filled in by fit().
        self.cluster_centers_: Optional[np.ndarray] = None
        self.labels_: Optional[np.ndarray] = None
        self.inertia_: Optional[float] = None

    def fit(self, X: np.ndarray) -> "KMeansClustering":
        """Fit the model, cache centers / labels / inertia and return ``self``."""
        self.model.fit(X)
        self.cluster_centers_ = self.model.cluster_centers_
        self.labels_ = self.model.labels_
        self.inertia_ = self.model.inertia_
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the wrapped model's cluster assignment for ``X``."""
        return self.model.predict(X)

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Fit on ``X`` and return the cached training labels."""
        self.fit(X)
        return self.labels_

    def find_optimal_k(
        self,
        X: np.ndarray,
        k_range: range = range(2, 11),
        method: str = "elbow"
    ) -> Tuple[List[int], List[float]]:
        """Score a fresh K-Means fit for every ``k`` in ``k_range``.

        Each candidate reuses this wrapper's hyper-parameters (as reported
        by the wrapped model's ``get_params``) with only ``n_clusters``
        replaced. Previously a hard-coded ``random_state=42`` and library
        defaults were used regardless of how the wrapper was configured.
        The wrapped model itself is not refitted.

        Args:
            X: Data to cluster.
            k_range: Candidate cluster counts.
            method: ``"elbow"`` collects inertia; ``"silhouette"`` collects
                the silhouette score.

        Returns:
            ``(k_values, scores)`` lists of equal length.

        Raises:
            ValueError: If ``method`` is not one of the two supported values.
        """
        if method not in ("elbow", "silhouette"):
            raise ValueError(f"Unknown method: {method}")

        base_params = self.model.get_params()
        k_values = list(k_range)
        scores = []
        for k in k_values:
            candidate = KMeans(**{**base_params, "n_clusters": k})
            candidate.fit(X)
            if method == "elbow":
                scores.append(candidate.inertia_)
            else:
                scores.append(silhouette_score(X, candidate.labels_))
        return k_values, scores

    def plot_elbow_curve(
        self,
        X: np.ndarray,
        k_range: range = range(2, 11),
        figsize: Tuple[int, int] = (10, 6)
    ) -> None:
        """Plot inertia against ``k`` for every ``k`` in ``k_range``."""
        k_values, inertias = self.find_optimal_k(X, k_range, method="elbow")
        plt.figure(figsize=figsize)
        plt.plot(k_values, inertias, "bo-")
        plt.xlabel("Number of Clusters (k)")
        plt.ylabel("Inertia")
        plt.title("Elbow Method for Optimal k")
        plt.grid(True)
        plt.show()

    def plot_silhouette_curve(
        self,
        X: np.ndarray,
        k_range: range = range(2, 11),
        figsize: Tuple[int, int] = (10, 6)
    ) -> None:
        """Plot the silhouette score against ``k`` for every ``k`` in ``k_range``."""
        k_values, scores = self.find_optimal_k(X, k_range, method="silhouette")
        plt.figure(figsize=figsize)
        plt.plot(k_values, scores, "ro-")
        plt.xlabel("Number of Clusters (k)")
        plt.ylabel("Silhouette Score")
        plt.title("Silhouette Score for Optimal k")
        plt.grid(True)
        plt.show()
class HierarchicalClustering:
    """Wrapper around ``AgglomerativeClustering`` plus a dendrogram plot."""

    def __init__(
        self,
        n_clusters: int = 2,
        linkage: str = "ward",
        affinity: str = "euclidean"
    ):
        """Create the underlying estimator.

        Args:
            n_clusters: Number of clusters to find.
            linkage: Linkage criterion passed to the estimator.
            affinity: Distance metric. Forced to ``"euclidean"`` when
                ``linkage`` is ``"ward"``. The parameter keeps its original
                name for callers, but is handed to the estimator as
                ``metric``, the name current scikit-learn releases use.
        """
        distance = affinity if linkage != "ward" else "euclidean"
        try:
            self.model = AgglomerativeClustering(
                n_clusters=n_clusters,
                linkage=linkage,
                metric=distance
            )
        except TypeError:
            # Older scikit-learn releases only know the `affinity` keyword.
            self.model = AgglomerativeClustering(
                n_clusters=n_clusters,
                linkage=linkage,
                affinity=distance
            )
        # Filled in by fit().
        self.labels_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray) -> "HierarchicalClustering":
        """Fit the model, cache its labels and return ``self``."""
        self.model.fit(X)
        self.labels_ = self.model.labels_
        return self

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Fit on ``X`` and return the cached labels."""
        self.fit(X)
        return self.labels_

    def plot_dendrogram(
        self,
        X: np.ndarray,
        method: str = "ward",
        figsize: Tuple[int, int] = (12, 8)
    ) -> None:
        """Plot a SciPy dendrogram of ``X``.

        The linkage matrix is computed here from ``X`` with SciPy's
        ``linkage`` using ``method``; the fitted model is not consulted.
        """
        linked = linkage(X, method=method)
        plt.figure(figsize=figsize)
        dendrogram(linked)
        plt.title("Hierarchical Clustering Dendrogram")
        plt.xlabel("Sample Index")
        plt.ylabel("Distance")
        plt.show()
class DBSCANClustering:
    """Wrapper around ``DBSCAN`` with a small summary helper."""

    def __init__(
        self,
        eps: float = 0.5,
        min_samples: int = 5,
        metric: str = "euclidean"
    ):
        """Create the underlying estimator from the given hyper-parameters."""
        hyperparams = dict(eps=eps, min_samples=min_samples, metric=metric)
        self.model = DBSCAN(**hyperparams)
        # Filled in by fit().
        self.labels_: Optional[np.ndarray] = None
        self.core_sample_indices_: Optional[np.ndarray] = None

    def fit(self, X: np.ndarray) -> "DBSCANClustering":
        """Fit the model, cache labels and core-sample indices, return ``self``."""
        estimator = self.model
        estimator.fit(X)
        self.labels_ = estimator.labels_
        self.core_sample_indices_ = estimator.core_sample_indices_
        return self

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Fit on ``X`` and return the cached labels."""
        self.fit(X)
        return self.labels_

    def get_cluster_stats(self) -> Dict[str, Any]:
        """Summarise the cached labels.

        Label ``-1`` is counted as noise and excluded from the cluster count.

        Returns:
            Dict with ``n_clusters``, ``n_noise_points`` and
            ``noise_percentage`` (noise share of all labels, in percent).
        """
        labels = self.labels_
        distinct = set(labels)
        cluster_total = len(distinct)
        if -1 in labels:
            cluster_total -= 1
        noise_total = sum(1 for label in labels if label == -1)
        stats = {
            "n_clusters": cluster_total,
            "n_noise_points": noise_total,
            "noise_percentage": noise_total / len(labels) * 100,
        }
        return stats


# --- 30.6 模型评估 (Model Evaluation) ---
30.6.1 分类评估指标
python
from sklearn.metrics import (
accuracy_score,
precision_score,
recall_score,
f1_score,
confusion_matrix,
classification_report,
roc_curve,
auc,
roc_auc_score,
precision_recall_curve,
average_precision_score
)
from sklearn.model_selection import cross_val_score, cross_validate
import seaborn as sns
class ClassificationEvaluator:
def __init__(self, y_true: np.ndarray, y_pred: np.ndarray, y_proba: Optional[np.ndarray] = None):
self.y_true = y_true
self.y_pred = y_pred
self.y_proba = y_proba
self._metrics: Optional[ModelMetrics] = None
def compute_metrics(self, average: str = "weighted") -> ModelMetrics:
self._metrics = ModelMetrics(
accuracy=accuracy_score(self.y_true, self.y_pred),
precision=precision_score(self.y_true, self.y_pred, average=average),
recall=recall_score(self.y_true, self.y_pred, average=average),
f1_score=f1_score(self.y_true, self.y_pred, average=average)
)
return self._metrics
def get_confusion_matrix(self) -> np.ndarray:
return confusion_matrix(self.y_true, self.y_pred)
def plot_confusion_matrix(
self,
labels: Optional[List[str]] = None,
figsize: Tuple[int, int] = (8, 6),
cmap: str = "Blues"
) -> None:
cm = self.get_confusion_matrix()
plt.figure(figsize=figsize)
sns.heatmap(
cm,
annot=True,
fmt="d",
cmap=cmap,
xticklabels=labels,
yticklabels=labels
)
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix")
plt.tight_layout()
plt.show()
def get_classification_report(self, target_names: Optional[List[str]] = None) -> str:
return classification_report(self.y_true, self.y_pred, target_names=target_names)
def plot_roc_curve(self, figsize: Tuple[int, int] = (8, 6)) -> None:
if self.y_proba is None:
raise ValueError("Probability predictions required for ROC curve")
if len(self.y_proba.shape) > 1 and self.y_proba.shape[1] > 2:
raise ValueError("ROC curve for multi-class requires one-vs-rest approach")
if len(self.y_proba.shape) > 1:
y_score = self.y_proba[:, 1]
else:
y_score = self.y_proba
fpr, tpr, _ = roc_curve(self.y_true, y_score)
roc_auc = auc(fpr, tpr)
plt.figure(figsize=figsize)
plt.plot(fpr, tpr, color="darkorange", lw=2, label=f"ROC curve (AUC = {roc_auc:.2f})")
plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver Operating Characteristic (ROC) Curve")
plt.legend(loc="lower right")
plt.grid(True)
plt.show()
def plot_precision_recall_curve(self, figsize: Tuple[int, int] = (8, 6)) -> None:
    """Plot the precision-recall curve of the stored scores and show it.

    Args:
        figsize: Figure size passed to ``plt.figure``.

    Raises:
        ValueError: If no probabilities were supplied, or if the
            probability array has more than two columns.
    """
    proba = self.y_proba
    if proba is None:
        raise ValueError("Probability predictions required for PR curve")
    has_columns = len(proba.shape) > 1
    # Same guard as plot_roc_curve: with more than two columns, silently
    # picking column 1 would plot a curve for one arbitrary class.
    if has_columns and proba.shape[1] > 2:
        raise ValueError("PR curve for multi-class requires one-vs-rest approach")
    # With a 2-D array, column 1 is taken as the score; a 1-D array is used as is.
    y_score = proba[:, 1] if has_columns else proba

    precision, recall, _ = precision_recall_curve(self.y_true, y_score)
    ap = average_precision_score(self.y_true, y_score)

    plt.figure(figsize=figsize)
    plt.plot(recall, precision, color="blue", lw=2, label=f"PR curve (AP = {ap:.2f})")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall Curve")
    plt.legend(loc="lower left")
    plt.grid(True)
    plt.show()
class RegressionEvaluator:
    """Metrics and diagnostic plots for a set of regression predictions."""

    def __init__(self, y_true: np.ndarray, y_pred: np.ndarray):
        """Store the observed and predicted target values.

        Args:
            y_true: Observed target values.
            y_pred: Predicted target values.
        """
        self.y_true, self.y_pred = y_true, y_pred
        # Filled in by compute_metrics(); stays None until then.
        self._metrics: Optional[ModelMetrics] = None

    def compute_metrics(self) -> ModelMetrics:
        """Compute R², MSE and MAE, and record RMSE as a custom metric.

        Returns:
            The newly built ModelMetrics (also kept on ``self._metrics``),
            with ``custom_metrics["rmse"]`` set to the square root of MSE.
        """
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

        observed, predicted = self.y_true, self.y_pred
        metrics = ModelMetrics(
            r2_score=r2_score(observed, predicted),
            mse=mean_squared_error(observed, predicted),
            mae=mean_absolute_error(observed, predicted),
        )
        self._metrics = metrics
        metrics.custom_metrics["rmse"] = np.sqrt(metrics.mse)
        return metrics

    def plot_residuals(self, figsize: Tuple[int, int] = (10, 4)) -> None:
        """Show residuals against predictions next to a residual histogram.

        Args:
            figsize: Figure size passed to ``plt.subplots``.
        """
        residuals = self.y_true - self.y_pred
        _, (scatter_ax, hist_ax) = plt.subplots(1, 2, figsize=figsize)

        # Left panel: residual versus predicted value, with a zero line.
        scatter_ax.scatter(self.y_pred, residuals, alpha=0.5)
        scatter_ax.axhline(y=0, color="r", linestyle="--")
        scatter_ax.set_xlabel("Predicted Values")
        scatter_ax.set_ylabel("Residuals")
        scatter_ax.set_title("Residuals vs Predicted")

        # Right panel: distribution of the residuals.
        hist_ax.hist(residuals, bins=30, edgecolor="black")
        hist_ax.set_xlabel("Residuals")
        hist_ax.set_ylabel("Frequency")
        hist_ax.set_title("Distribution of Residuals")

        plt.tight_layout()
        plt.show()

    def plot_predictions(self, figsize: Tuple[int, int] = (8, 8)) -> None:
        """Scatter predicted against actual values with a y = x reference line.

        Args:
            figsize: Figure size passed to ``plt.figure``.
        """
        plt.figure(figsize=figsize)
        plt.scatter(self.y_true, self.y_pred, alpha=0.5)
        # The reference line spans the combined range of both arrays.
        lower = min(self.y_true.min(), self.y_pred.min())
        upper = max(self.y_true.max(), self.y_pred.max())
        plt.plot([lower, upper], [lower, upper], "r--", lw=2)
        plt.xlabel("Actual Values")
        plt.ylabel("Predicted Values")
        plt.title("Actual vs Predicted Values")
        plt.grid(True)
        plt.show()
class ClusteringEvaluator:
def __init__(self, X: np.ndarray, labels: np.ndarray):
self.X = X
self.labels = labels
def compute_metrics(self) -> Dict[str, float]:
return {
"silhouette_score": silhouette_score(self.X, self.labels),
"calinski_harabasz_score": calinski_harabasz_score(self.X, self.labels),
"davies_bouldin_score": davies_bouldin_score(self.X, self.labels)
}
def plot_clusters(self, figsize: Tuple[int, int] = (10, 8)) -> None:
if self.X.shape[1] > 2:
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
X_2d = pca.fit_transform(self.X)
else:
X_2d = self.X
plt.figure(figsize=figsize)
scatter = plt.scatter(X_2d[:, 0], X_2d[:, 1], c=self.labels, cmap="viridis", alpha=0.6)
plt.colorbar(scatter, label="Cluster")
plt.xlabel("Component 1")
plt.ylabel("Component 2")
plt.title("Cluster Visualization")
plt.show()30.6.2 交叉验证
python
from sklearn.model_selection import (
KFold,
StratifiedKFold,
LeaveOneOut,
cross_val_predict
)
class CrossValidator:
def __init__(
self,
cv: int = 5,
stratified: bool = True,
shuffle: bool = True,
random_state: int = 42
):
self.cv = cv
self.stratified = stratified
self.shuffle = shuffle
self.random_state = random_state
self._cv_splitter = None
def get_splitter(self) -> Union[KFold, StratifiedKFold]:
if self.stratified:
return StratifiedKFold(
n_splits=self.cv,
shuffle=self.shuffle,
random_state=self.random_state
)
return KFold(
n_splits=self.cv,
shuffle=self.shuffle,
random_state=self.random_state
)
def cross_validate_model(
self,
model,
X: np.ndarray,
y: np.ndarray,
scoring: List[str] = None
) -> Dict[str, Any]:
if scoring is None:
scoring = ["accuracy", "precision_weighted", "recall_weighted", "f1_weighted"]
cv_splitter = self.get_splitter()
results = cross_validate(
model,
X,
y,
cv=cv_splitter,
scoring=scoring,
return_train_score=True
)
summary = {}
for metric in scoring:
test_key = f"test_{metric}"
train_key = f"train_{metric}"
if test_key in results:
summary[metric] = {
"test_mean": results[test_key].mean(),
"test_std": results[test_key].std(),
"train_mean": results[train_key].mean(),
"train_std": results[train_key].std()
}
return summary
def get_cross_val_predictions(
self,
model,
X: np.ndarray,
y: np.ndarray
) -> np.ndarray:
cv_splitter = self.get_splitter()
return cross_val_predict(model, X, y, cv=cv_splitter)30.7 模型调优
30.7.1 网格搜索
python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from scipy.stats import uniform, randint
class HyperparameterTuner:
    """Tune an estimator with grid or randomized search and keep the outcome.

    After a search, ``best_model``, ``best_params`` and ``best_score`` hold
    the search's ``best_estimator_``, ``best_params_`` and ``best_score_``.

    Args:
        model: Estimator to tune.
        param_grid: Parameter grid (grid search) or parameter distributions
            (randomized search).
        cv: Cross-validation setting passed to the search.
        scoring: Scorer name passed to the search.
        n_jobs: Parallelism setting passed to the search.
        verbose: Verbosity level passed to the search.
    """

    def __init__(
        self,
        model,
        param_grid: Dict[str, Any],
        cv: int = 5,
        scoring: str = "accuracy",
        n_jobs: int = -1,
        verbose: int = 1
    ):
        self.model = model
        self.param_grid = param_grid
        self.cv = cv
        self.scoring = scoring
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.best_model = None
        self.best_params = None
        self.best_score = None
        # The fitted search object; kept because cv_results_ lives on it,
        # not on the best estimator.
        self._search = None

    def _run_search(self, search, X: np.ndarray, y: np.ndarray) -> Any:
        """Fit ``search`` and record its best estimator, params and score."""
        search.fit(X, y)
        self._search = search
        self.best_model = search.best_estimator_
        self.best_params = search.best_params_
        self.best_score = search.best_score_
        return self.best_model

    def grid_search(
        self,
        X: np.ndarray,
        y: np.ndarray
    ) -> Any:
        """Run an exhaustive GridSearchCV over ``param_grid``.

        Returns:
            The best estimator found.
        """
        search = GridSearchCV(
            self.model,
            self.param_grid,
            cv=self.cv,
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            return_train_score=True,
        )
        return self._run_search(search, X, y)

    def random_search(
        self,
        X: np.ndarray,
        y: np.ndarray,
        n_iter: int = 50
    ) -> Any:
        """Run a RandomizedSearchCV that samples ``n_iter`` candidates.

        Returns:
            The best estimator found.
        """
        search = RandomizedSearchCV(
            self.model,
            self.param_grid,
            n_iter=n_iter,
            cv=self.cv,
            scoring=self.scoring,
            n_jobs=self.n_jobs,
            verbose=self.verbose,
            random_state=42,
            return_train_score=True,
        )
        return self._run_search(search, X, y)

    def get_results_df(self) -> pd.DataFrame:
        """Return the last search's ``cv_results_`` as a DataFrame.

        Raises:
            ValueError: If neither search method has been run yet.
        """
        search = getattr(self, "_search", None)
        if search is None:
            raise ValueError("No search has been performed yet")
        return pd.DataFrame(search.cv_results_)
class RandomForestTuner:
    """Preset search spaces and a one-call tuner for RandomForestClassifier."""

    # Discrete grid, used when method == "grid".
    PARAM_GRID = dict(
        n_estimators=[50, 100, 200],
        max_depth=[None, 10, 20, 30],
        min_samples_split=[2, 5, 10],
        min_samples_leaf=[1, 2, 4],
        max_features=["sqrt", "log2"],
    )
    # Sampling distributions, used for every other method value.
    RANDOM_PARAM_GRID = dict(
        n_estimators=randint(50, 300),
        max_depth=randint(5, 50),
        min_samples_split=randint(2, 20),
        min_samples_leaf=randint(1, 10),
        max_features=["sqrt", "log2", None],
    )

    @classmethod
    def tune(
        cls,
        X: np.ndarray,
        y: np.ndarray,
        method: str = "grid",
        n_iter: int = 50
    ) -> Tuple[Any, Dict[str, Any]]:
        """Tune a RandomForestClassifier on ``X`` and ``y``.

        Args:
            X: Feature matrix.
            y: Target labels.
            method: "grid" runs a grid search over PARAM_GRID; any other
                value runs a randomized search over RANDOM_PARAM_GRID.
            n_iter: Number of sampled candidates for the randomized search.

        Returns:
            A ``(best_model, best_params)`` tuple.
        """
        from sklearn.ensemble import RandomForestClassifier

        use_grid = method == "grid"
        search_space = cls.PARAM_GRID if use_grid else cls.RANDOM_PARAM_GRID
        tuner = HyperparameterTuner(RandomForestClassifier(random_state=42), search_space)
        if use_grid:
            fitted = tuner.grid_search(X, y)
        else:
            fitted = tuner.random_search(X, y, n_iter=n_iter)
        return fitted, tuner.best_params
class SVMTuner:
PARAM_GRID = {
"C": [0.1, 1, 10, 100],
"kernel": ["rbf", "linear", "poly"],
"gamma": ["scale", "auto", 0.1, 0.01]
}
RANDOM_PARAM_GRID = {
"C": uniform(0.1, 100),
"kernel": ["rbf", "linear", "poly"],
"gamma": uniform(0.001, 1)
}
@classmethod
def tune(
cls,
X: np.ndarray,
y: np.ndarray,
method: str = "random",
n_iter: int = 30
) -> Tuple[Any, Dict[str, Any]]:
param_grid = cls.PARAM_GRID if method == "grid" else cls.RANDOM_PARAM_GRID
tuner = HyperparameterTuner(
SVC(random_state=42),
param_grid
)
if method == "grid":
best_model = tuner.grid_search(X, y)
else:
best_model = tuner.random_search(X, y, n_iter=n_iter)
return best_model, tuner.best_params30.8 机器学习流水线
30.8.1 Scikit-learn Pipeline
python
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_classif
import joblib
from pathlib import Path
class MLPipeline:
def __init__(
self,
numerical_features: List[str],
categorical_features: List[str],
model,
feature_selector: Optional[Any] = None
):
self.numerical_features = numerical_features
self.categorical_features = categorical_features
self.model = model
self.feature_selector = feature_selector
self.pipeline: Optional[Pipeline] = None
def build_pipeline(self) -> Pipeline:
numerical_transformer = Pipeline(steps=[
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler())
])
categorical_transformer = Pipeline(steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])
preprocessor = ColumnTransformer(
transformers=[
("num", numerical_transformer, self.numerical_features),
("cat", categorical_transformer, self.categorical_features)
]
)
steps = [("preprocessor", preprocessor)]
if self.feature_selector:
steps.append(("selector", self.feature_selector))
steps.append(("model", self.model))
self.pipeline = Pipeline(steps=steps)
return self.pipeline
def fit(self, X: pd.DataFrame, y: np.ndarray) -> "MLPipeline":
if self.pipeline is None:
self.build_pipeline()
self.pipeline.fit(X, y)
return self
def predict(self, X: pd.DataFrame) -> np.ndarray:
return self.pipeline.predict(X)
def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
return self.pipeline.predict_proba(X)
def save(self, filepath: str) -> None:
joblib.dump(self.pipeline, filepath)
@classmethod
def load(cls, filepath: str) -> Pipeline:
return joblib.load(filepath)
class AutoMLPipeline:
def __init__(
self,
task_type: TaskType = TaskType.CLASSIFICATION,
cv: int = 5,
scoring: str = None
):
self.task_type = task_type
self.cv = cv
self.scoring = scoring or ("accuracy" if task_type == TaskType.CLASSIFICATION else "r2")
self.best_model = None
self.best_params = None
self.results = []
def fit(
self,
X: pd.DataFrame,
y: np.ndarray,
models: Optional[List[Any]] = None,
param_grids: Optional[Dict[str, Dict]] = None
) -> "AutoMLPipeline":
if models is None:
models = self._get_default_models()
if param_grids is None:
param_grids = self._get_default_param_grids()
for name, model in models.items():
print(f"Training {name}...")
tuner = HyperparameterTuner(
model,
param_grids.get(name, {}),
cv=self.cv,
scoring=self.scoring
)
tuner.random_search(X, y, n_iter=20)
self.results.append({
"model_name": name,
"best_score": tuner.best_score,
"best_params": tuner.best_params,
"model": tuner.best_model
})
self.results.sort(key=lambda x: x["best_score"], reverse=True)
self.best_model = self.results[0]["model"]
self.best_params = self.results[0]["best_params"]
return self
def _get_default_models(self) -> Dict[str, Any]:
if self.task_type == TaskType.CLASSIFICATION:
return {
"RandomForest": RandomForestClassifier(random_state=42),
"SVM": SVC(random_state=42),
"LogisticRegression": LogisticRegression(random_state=42, max_iter=1000),
"KNN": KNeighborsClassifier()
}
else:
return {
"RandomForest": RandomForestRegressor(random_state=42),
"SVR": SVR(),
"LinearRegression": LinearRegression(),
"Ridge": Ridge(random_state=42)
}
def _get_default_param_grids(self) -> Dict[str, Dict]:
if self.task_type == TaskType.CLASSIFICATION:
return {
"RandomForest": {"n_estimators": [50, 100, 200], "max_depth": [None, 10, 20]},
"SVM": {"C": [0.1, 1, 10], "kernel": ["rbf", "linear"]},
"LogisticRegression": {"C": [0.1, 1, 10]},
"KNN": {"n_neighbors": [3, 5, 7, 9]}
}
else:
return {
"RandomForest": {"n_estimators": [50, 100, 200], "max_depth": [None, 10, 20]},
"SVR": {"C": [0.1, 1, 10], "kernel": ["rbf", "linear"]},
"Ridge": {"alpha": [0.1, 1, 10]}
}
def get_results_df(self) -> pd.DataFrame:
return pd.DataFrame([
{"model": r["model_name"], "score": r["best_score"]}
for r in self.results
]).sort_values("score", ascending=False)
def predict(self, X: np.ndarray) -> np.ndarray:
if self.best_model is None:
raise ValueError("Model not fitted yet")
return self.best_model.predict(X)30.9 知识图谱
30.9.1 机器学习工作流程
机器学习标准工作流程
┌─────────────────────────────────────────────────────────────┐
│ 数据收集 → 数据清洗 → 特征工程 → 模型训练 → 评估优化 → 部署│
└─────────────────────────────────────────────────────────────┘
详细步骤:
┌─────────────────────────────────────────┐
│ 1. 数据收集 获取原始数据 │
│ 2. 数据清洗 处理缺失值、异常值 │
│ 3. 特征工程 特征提取、选择、转换 │
│ 4. 数据分割 训练集/验证集/测试集 │
│ 5. 模型选择 选择合适算法 │
│ 6. 模型训练 拟合数据 │
│ 7. 模型评估 性能指标计算 │
│ 8. 超参调优 网格搜索、随机搜索 │
│ 9. 模型部署 生产环境应用 │
└─────────────────────────────────────────┘

30.9.2 算法分类
机器学习算法分类
监督学习:
┌─────────────────────────────────────────┐
│ 分类: 决策树、SVM、随机森林、神经网络 │
│ 回归: 线性回归、岭回归、LASSO │
└─────────────────────────────────────────┘
无监督学习:
┌─────────────────────────────────────────┐
│ 聚类: K-Means、DBSCAN、层次聚类 │
│ 降维: PCA、t-SNE、UMAP │
└─────────────────────────────────────────┘
强化学习:
┌─────────────────────────────────────────┐
│ Q-Learning、DQN、PPO、A3C │
└─────────────────────────────────────────┘

30.9.3 模型评估指标
分类评估指标:
┌─────────────────────────────────────────┐
│ 准确率(Accuracy) 正确预测比例 │
│ 精确率(Precision) 预测为正中真正为正 │
│ 召回率(Recall) 真正为正中被预测为正│
│ F1分数 精确率和召回率调和平均│
│ AUC-ROC 分类阈值无关指标 │
└─────────────────────────────────────────┘
回归评估指标:
┌─────────────────────────────────────────┐
│ MSE 均方误差 │
│ RMSE 均方根误差 │
│ MAE 平均绝对误差 │
│ R² 决定系数 │
└─────────────────────────────────────────┘

30.10 技术选型指南
30.10.1 算法选型
| 数据量 | 特征数 | 推荐算法 | 原因 |
|---|---|---|---|
| 小 | 少 | 决策树 | 可解释性好 |
| 中 | 中 | 随机森林 | 鲁棒性强 |
| 大 | 多 | 梯度提升 | 性能最优 |
| 大 | 高维 | 神经网络 | 自动特征 |
30.10.2 框架选型
| 场景 | 推荐框架 | 原因 |
|---|---|---|
| 传统ML | scikit-learn | 功能完整 |
| 深度学习 | PyTorch/TensorFlow | 灵活强大 |
| AutoML | AutoGluon/H2O | 自动化 |
| 大规模 | Spark MLlib | 分布式 |
30.11 常见问题与解决方案
30.11.1 过拟合问题
python
# 问题:模型在训练集表现好,测试集差
# 解决方案:正则化、交叉验证、增加数据
from sklearn.linear_model import Ridge
from sklearn.model_selection import cross_val_score
# 使用正则化
model = Ridge(alpha=1.0)
# 交叉验证
scores = cross_val_score(model, X, y, cv=5)30.11.2 数据不平衡
python
# 问题:类别分布不均衡
# 解决方案:过采样、欠采样、类别权重
from imblearn.over_sampling import SMOTE
from sklearn.utils.class_weight import compute_class_weight
# 过采样
smote = SMOTE()
X_resampled, y_resampled = smote.fit_resample(X, y)
# 类别权重
class_weights = compute_class_weight('balanced', classes=np.unique(y), y=y)30.11.3 特征选择
python
# 问题:特征过多,模型复杂
# 解决方案:特征选择
from sklearn.feature_selection import SelectKBest, f_classif
# 选择最好的K个特征
selector = SelectKBest(f_classif, k=10)
X_selected = selector.fit_transform(X, y)30.12 本章小结
本章详细介绍了Python机器学习的基础概念和实践:
- 机器学习基础:监督学习、无监督学习、工作流程
- 数据预处理:数据清洗、特征编码、特征缩放
- 特征工程:多项式特征、交互特征、降维
- 分类算法:决策树、随机森林、SVM、朴素贝叶斯
- 回归算法:线性回归、正则化回归、多项式回归
- 聚类算法:K-Means、层次聚类、DBSCAN
- 模型评估:混淆矩阵、ROC曲线、交叉验证
- 模型调优:网格搜索、随机搜索、自动化流水线
练习题
- 使用决策树对鸢尾花数据集进行分类,并可视化决策树
- 实现一个房价预测模型,使用多种回归算法比较性能
- 使用K-Means对客户数据进行聚类分析
- 构建一个完整的机器学习流水线,包含数据预处理、特征选择和模型训练
- 实现一个简单的AutoML工具,自动选择最佳模型和参数