Skip to content

第42章 数据分析进阶

学习目标

完成本章学习后,你将能够:

  1. 掌握Pandas高级特性:MultiIndex、分层索引、高级分组
  2. 进行数据清洗:缺失值处理、异常值检测、数据转换
  3. 实现数据转换:透视表、交叉表、数据重塑
  4. 分析时间序列:日期处理、时间窗口、滚动计算
  5. 处理大数据:分块处理、内存优化、Dask并行计算
  6. 实现数据聚合:复杂聚合、自定义函数、链式操作
  7. 进行数据可视化:统计图表、交互式可视化、报告生成
  8. 构建数据处理管道:ETL流程、数据验证、自动化处理

42.1 Pandas高级特性

42.1.1 MultiIndex与分层索引

python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd


class MultiIndexDemo:
    """Self-contained demos of pandas hierarchical-index (MultiIndex) features."""

    @staticmethod
    def create_multiindex_dataframe() -> pd.DataFrame:
        """Return a 6x4 frame of unseeded standard-normal values with a 3-level row index.

        The index is built row by row from parallel arrays (``MultiIndex.from_arrays``)
        with levels ``group`` (A/B), ``category`` (X/Y/Z) and ``id`` (1-3); the
        columns are ``col1``..``col4``.
        """
        arrays = [
            ['A', 'A', 'A', 'B', 'B', 'B'],
            ['X', 'Y', 'Z', 'X', 'Y', 'Z'],
            [1, 2, 3, 1, 2, 3]
        ]
        index = pd.MultiIndex.from_arrays(arrays, names=['group', 'category', 'id'])
        data = np.random.randn(6, 4)
        df = pd.DataFrame(data, index=index, columns=['col1', 'col2', 'col3', 'col4'])
        return df

    @staticmethod
    def create_multiindex_from_product() -> pd.DataFrame:
        """Return an 8-row frame indexed by the Cartesian product of three levels.

        ``MultiIndex.from_product`` expands ['A', 'B'] x ['X', 'Y'] x [1, 2]; the single
        ``value`` column holds 0..7 in that row order.
        """
        index = pd.MultiIndex.from_product(
            [['A', 'B'], ['X', 'Y'], [1, 2]],
            names=['group', 'category', 'id']
        )
        data = np.arange(8).reshape(8, 1)
        df = pd.DataFrame(data, index=index, columns=['value'])
        return df

    @staticmethod
    def stack_unstack_demo() -> Tuple[pd.Series, pd.DataFrame]:
        """Round-trip a 3x3 frame through ``stack`` and ``unstack``.

        Returns:
            ``(stacked, unstacked)``: ``stacked`` is a Series indexed by
            (row label, column name); ``unstacked`` pivots it back into a frame.
        """
        df = pd.DataFrame({
            'A': [1, 2, 3],
            'B': [4, 5, 6],
            'C': [7, 8, 9]
        }, index=['x', 'y', 'z'])

        stacked = df.stack()
        unstacked = stacked.unstack()

        # A tuple, not a single DataFrame, as the previous annotation claimed.
        return stacked, unstacked

    @staticmethod
    def pivot_table_demo() -> pd.DataFrame:
        """Return 12 days of random sales pivoted into date x (category, region).

        Sales are summed per cell. Each date carries exactly one (category, region)
        pair, so most cells of the result are NaN.
        """
        df = pd.DataFrame({
            'date': pd.date_range('2024-01-01', periods=12, freq='D'),
            'category': ['A', 'B', 'C'] * 4,
            'region': ['North', 'South'] * 6,
            'sales': np.random.randint(100, 1000, 12)
        })

        pivot = df.pivot_table(
            values='sales',
            index='date',
            columns=['category', 'region'],
            aggfunc='sum'
        )

        return pivot


class AdvancedIndexing:
    """Row selection, conditional update and lookup helpers for DataFrames."""

    @staticmethod
    def query_operations(df: pd.DataFrame, conditions: str) -> pd.DataFrame:
        """Filter *df* with a ``DataFrame.query`` expression string."""
        return df.query(conditions)

    @staticmethod
    def multi_condition_filter(df: pd.DataFrame, conditions: Dict[str, Any]) -> pd.DataFrame:
        """Return the rows of *df* that satisfy every per-column condition (logical AND).

        Each value in *conditions* is interpreted by its type:

        - callable: applied element-wise to the column; its results form the mask;
        - list / tuple / set / frozenset: membership test via ``Series.isin``;
        - anything else: equality comparison with the column.

        An empty *conditions* mapping returns all rows.
        """
        mask = pd.Series(True, index=df.index)
        for col, condition in conditions.items():
            if callable(condition):
                mask &= df[col].apply(condition)
            elif isinstance(condition, (list, tuple, set, frozenset)):
                # Sets are collections of allowed values too; comparing a column to a
                # set with ``==`` does not perform a membership test.
                mask &= df[col].isin(condition)
            else:
                mask &= df[col] == condition
        return df[mask]

    @staticmethod
    def conditional_update(df: pd.DataFrame, condition: pd.Series, updates: Dict[str, Any]) -> pd.DataFrame:
        """Return a copy of *df* where rows matching *condition* get ``updates[col]`` per column."""
        df = df.copy()
        for col, value in updates.items():
            df.loc[condition, col] = value
        return df

    @staticmethod
    def lookup_values(df: pd.DataFrame, lookup_df: pd.DataFrame, key_col: str, value_col: str) -> pd.Series:
        """Map ``df[key_col]`` to ``value_col`` values looked up in *lookup_df*.

        Keys absent from *lookup_df* map to NaN. If *lookup_df* repeats a key, the
        last occurrence wins (it goes through a plain dict).
        """
        lookup_dict = lookup_df.set_index(key_col)[value_col].to_dict()
        return df[key_col].map(lookup_dict)


class GroupByAdvanced:
    """Thin wrappers around the common ``DataFrame.groupby`` patterns."""

    @staticmethod
    def multiple_aggregations(df: pd.DataFrame, group_cols: List[str], agg_dict: Dict) -> pd.DataFrame:
        """Group by *group_cols* and apply the column -> function(s) mapping *agg_dict*."""
        grouped = df.groupby(group_cols)
        return grouped.agg(agg_dict)

    @staticmethod
    def custom_aggregation(df: pd.DataFrame, group_col: str, value_col: str, func: Callable) -> pd.Series:
        """Reduce *value_col* within each *group_col* group with *func* (one value per group)."""
        column_groups = df.groupby(group_col)[value_col]
        return column_groups.agg(func)

    @staticmethod
    def transform_operations(df: pd.DataFrame, group_col: str, value_col: str, func: Callable) -> pd.Series:
        """Apply *func* per group, returning a Series aligned row-for-row with *df*."""
        column_groups = df.groupby(group_col)[value_col]
        return column_groups.transform(func)

    @staticmethod
    def filter_groups(df: pd.DataFrame, group_col: str, condition: Callable) -> pd.DataFrame:
        """Keep only the rows of groups whose sub-frame makes *condition* truthy."""
        grouped = df.groupby(group_col)
        return grouped.filter(condition)

    @staticmethod
    def rolling_groupby(df: pd.DataFrame, group_col: str, value_col: str, window: int) -> pd.DataFrame:
        """Per-group rolling mean of *value_col* over *window* rows.

        The (group key, original row label) index is flattened into columns with
        ``reset_index``.
        """
        per_group_means = (
            df.groupby(group_col)[value_col]
            .rolling(window)
            .mean()
        )
        return per_group_means.reset_index()


def pandas_advanced_example():
    """Print the MultiIndex, pivot-table and multi-aggregation demos to stdout.

    Each section is built lazily and printed right after its title, in order.
    """
    sales = pd.DataFrame({
        'group': ['A', 'A', 'B', 'B'],
        'value': [10, 20, 30, 40]
    })

    sections = (
        ("MultiIndex DataFrame:", MultiIndexDemo.create_multiindex_dataframe),
        ("\nPivot Table:", MultiIndexDemo.pivot_table_demo),
        (
            "\nMultiple Aggregations:",
            lambda: GroupByAdvanced.multiple_aggregations(
                sales,
                ['group'],
                {'value': ['sum', 'mean', 'std', 'count']}
            ),
        ),
    )

    for title, build in sections:
        frame = build()
        print(title)
        print(frame)

42.1.2 数据合并与连接

python
class DataMerge:
    """Wrappers for combining DataFrames: key-based merge/join and concatenation."""

    @staticmethod
    def merge_dataframes(
        left: pd.DataFrame,
        right: pd.DataFrame,
        on: Union[str, List[str]],
        how: str = 'inner'
    ) -> pd.DataFrame:
        """SQL-style merge of *left* and *right* on the shared key column(s) *on*."""
        return left.merge(right, how=how, on=on)

    @staticmethod
    def join_dataframes(
        left: pd.DataFrame,
        right: pd.DataFrame,
        on: str = None,
        how: str = 'left'
    ) -> pd.DataFrame:
        """Join *right* onto *left* by index, or by *left*'s column *on* when given."""
        return left.join(right, how=how, on=on)

    @staticmethod
    def concat_dataframes(
        dfs: List[pd.DataFrame],
        axis: int = 0,
        ignore_index: bool = True
    ) -> pd.DataFrame:
        """Concatenate *dfs* along *axis*; by default the result gets a fresh RangeIndex."""
        return pd.concat(objs=dfs, ignore_index=ignore_index, axis=axis)

    @staticmethod
    def append_rows(df: pd.DataFrame, new_rows: pd.DataFrame) -> pd.DataFrame:
        """Stack *new_rows* below *df* and renumber the index from 0."""
        return DataMerge.concat_dataframes([df, new_rows], axis=0, ignore_index=True)


class DataReshape:
    """Long/wide reshaping helpers: melt, pivot, wide_to_long and explode."""

    @staticmethod
    def melt_dataframe(
        df: pd.DataFrame,
        id_vars: List[str],
        value_vars: List[str],
        var_name: str = 'variable',
        value_name: str = 'value'
    ) -> pd.DataFrame:
        """Unpivot *value_vars* into (var_name, value_name) rows, keeping *id_vars*."""
        return pd.melt(
            df,
            id_vars=id_vars,
            value_vars=value_vars,
            var_name=var_name,
            value_name=value_name,
        )

    @staticmethod
    def pivot_dataframe(
        df: pd.DataFrame,
        index: str,
        columns: str,
        values: str
    ) -> pd.DataFrame:
        """Reshape long data to wide (no aggregation; index/columns pairs must be unique)."""
        return pd.pivot(df, index=index, columns=columns, values=values)

    @staticmethod
    def wide_to_long(
        df: pd.DataFrame,
        stubnames: List[str],
        i: List[str],
        j: str,
        sep: str = ''
    ) -> pd.DataFrame:
        """Gather columns named ``<stub><sep><suffix>`` into rows indexed by (*i*, *j*)."""
        return pd.wide_to_long(df, stubnames, i, j, sep=sep)

    @staticmethod
    def explode_column(df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Emit one row per element of the list-like cells in *column*."""
        return df.explode(column=column)


class CrossTable:
    """Frequency and aggregated cross-tabulation helpers built on ``pd.crosstab``."""

    @staticmethod
    def create_crosstab(
        df: pd.DataFrame,
        index: str,
        columns: str,
        values: str = None,
        aggfunc: str = 'count'
    ) -> pd.DataFrame:
        """Cross-tabulate column *index* against column *columns*.

        Without *values*, each cell is the number of rows with that (index, columns)
        pair (``pd.crosstab``'s frequency mode; *aggfunc* is not used). With *values*,
        each cell is *aggfunc* applied to that column within the pair.
        """
        if values is None:
            # pd.crosstab raises ValueError when aggfunc is passed without values, so
            # the default call (values=None, aggfunc='count') must not forward it.
            # ``is None`` also keeps falsy column labels such as 0 usable as *values*.
            return pd.crosstab(df[index], df[columns])
        return pd.crosstab(
            df[index],
            df[columns],
            values=df[values],
            aggfunc=aggfunc
        )

    @staticmethod
    def crosstab_with_margins(
        df: pd.DataFrame,
        index: str,
        columns: str,
        margins: bool = True,
        normalize: str = None
    ) -> pd.DataFrame:
        """Frequency cross-tab with optional ``All`` margins and normalization.

        *normalize* accepts what ``pd.crosstab`` accepts (``'all'``, ``'index'``,
        ``'columns'``, True, 0, 1); None means "no normalization".
        """
        return pd.crosstab(
            df[index],
            df[columns],
            margins=margins,
            # pd.crosstab rejects None ("Not a valid normalize argument"); its own
            # "off" value is False.
            normalize=False if normalize is None else normalize
        )

42.2 数据清洗

42.2.1 缺失值处理

python
from typing import Union, List, Callable, Optional
import numpy as np


class MissingValueHandler:
    """Inspect and impute missing values of a DataFrame.

    The ``drop_*`` and ``fill_*`` methods return new frames and leave ``self.df``
    untouched. Where a method takes an optional *columns* list, omitting it (or
    passing an empty list) means "all columns".
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def _apply_fill(self, columns, fill):
        """Run *fill* (DataFrame -> DataFrame) on *columns* of a copy, or on the whole copy."""
        frame = self.df.copy()
        if not columns:
            return fill(frame)
        frame[columns] = fill(frame[columns])
        return frame

    def identify_missing(self) -> pd.DataFrame:
        """Count the missing cells per column as a one-column ``missing_count`` frame."""
        per_column = self.df.isnull().sum()
        return per_column.to_frame(name='missing_count')

    def missing_percentage(self) -> pd.DataFrame:
        """Missing count and percentage of rows (rounded to 2 decimals) per column."""
        per_column = self.df.isnull().sum()
        share = per_column / len(self.df) * 100
        return pd.DataFrame({
            'missing_count': per_column,
            'missing_percentage': share.round(2),
        })

    def drop_missing_rows(self, subset: List[str] = None, how: str = 'any') -> pd.DataFrame:
        """Drop rows with missing values in *subset* (any/all per *how*)."""
        return self.df.dropna(how=how, subset=subset)

    def drop_missing_columns(self, threshold: float = 0.5) -> pd.DataFrame:
        """Keep columns with at least ``int(n_rows * threshold)`` non-missing values."""
        required_valid = int(len(self.df) * threshold)
        return self.df.dropna(axis=1, thresh=required_valid)

    def fill_with_constant(self, value: Any, columns: List[str] = None) -> pd.DataFrame:
        """Replace missing values with *value*."""
        return self._apply_fill(columns, lambda part: part.fillna(value))

    def fill_with_forward(self, columns: List[str] = None) -> pd.DataFrame:
        """Propagate the last valid value downwards (forward fill)."""
        return self._apply_fill(columns, lambda part: part.ffill())

    def fill_with_backward(self, columns: List[str] = None) -> pd.DataFrame:
        """Propagate the next valid value upwards (backward fill)."""
        return self._apply_fill(columns, lambda part: part.bfill())

    def fill_with_mean(self, columns: List[str] = None) -> pd.DataFrame:
        """Fill with the column mean (numeric columns only when *columns* is omitted)."""
        frame = self.df.copy()
        if not columns:
            return frame.fillna(frame.mean(numeric_only=True))
        for name in columns:
            frame[name] = frame[name].fillna(frame[name].mean())
        return frame

    def fill_with_median(self, columns: List[str] = None) -> pd.DataFrame:
        """Fill with the column median (numeric columns only when *columns* is omitted)."""
        frame = self.df.copy()
        if not columns:
            return frame.fillna(frame.median(numeric_only=True))
        for name in columns:
            frame[name] = frame[name].fillna(frame[name].median())
        return frame

    def fill_with_mode(self, columns: List[str] = None) -> pd.DataFrame:
        """Fill with each column's first mode; columns without a mode are left as-is."""
        frame = self.df.copy()
        for name in (columns if columns else frame.columns):
            modes = frame[name].mode()
            if not modes.empty:
                frame[name] = frame[name].fillna(modes.iloc[0])
        return frame

    def fill_with_interpolation(self, method: str = 'linear', columns: List[str] = None) -> pd.DataFrame:
        """Fill by ``DataFrame.interpolate`` with the given *method*."""
        return self._apply_fill(columns, lambda part: part.interpolate(method=method))

    def fill_with_group_mean(self, group_col: str, value_col: str) -> pd.DataFrame:
        """Fill *value_col* with the mean of its own *group_col* group."""
        frame = self.df.copy()

        def _fill_from_group(values):
            return values.fillna(values.mean())

        frame[value_col] = frame.groupby(group_col)[value_col].transform(_fill_from_group)
        return frame


class MissingPatternAnalyzer:
    """Describe where values are missing: row patterns, correlations and runs."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def analyze_patterns(self) -> pd.DataFrame:
        """Count how many rows share each combination of missing / present columns.

        Returns:
            A frame with columns ``pattern`` and ``count``, most frequent first. For a
            multi-column frame each ``pattern`` is a tuple of booleans (True = missing)
            in ``self.df.columns`` order; for a single-column frame it is a bool.
        """
        counts = self.df.isnull().value_counts()
        index = counts.index
        # DataFrame.value_counts() yields one index level per column, so renaming the
        # reset_index() output to two names only worked for one-column frames.
        if index.nlevels == 1:
            patterns = list(index.get_level_values(0))
        else:
            patterns = list(index)
        return pd.DataFrame({'pattern': patterns, 'count': counts.to_numpy()})

    def get_missing_correlation(self) -> pd.DataFrame:
        """Correlation matrix of the columns' missing-value indicators."""
        return self.df.isnull().corr()

    def find_missing_blocks(self, min_size: int = 2) -> List[Dict]:
        """Find runs of consecutive missing values per column.

        Args:
            min_size: shortest run to report; values below 1 are treated as 1.

        Returns:
            One dict per run, column by column in order, with keys ``column``,
            ``start`` / ``end`` (0-based row positions, inclusive) and ``size``.
        """
        # A zero-length "run" has no start; reporting it used to crash on None + int.
        min_size = max(min_size, 1)
        blocks = []
        for col in self.df.columns:
            run_start = None
            run_length = 0
            # A trailing sentinel closes a run that reaches the last row.
            for position, missing in enumerate([*self.df[col].isnull(), False]):
                if missing:
                    if run_start is None:
                        run_start = position
                    run_length += 1
                    continue
                if run_length >= min_size:
                    blocks.append({
                        'column': col,
                        'start': run_start,
                        'end': run_start + run_length - 1,
                        'size': run_length
                    })
                run_start = None
                run_length = 0

        return blocks

42.2.2 异常值检测与处理

python
from scipy import stats
from typing import Tuple


class OutlierDetector:
    """Flag outliers in one column with several classic rules.

    Every ``*_method`` returns a boolean Series aligned to ``self.df.index``
    (True = outlier), so it can index ``self.df`` directly. Missing values are
    never flagged.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def zscore_method(self, column: str, threshold: float = 3.0) -> pd.Series:
        """Flag values whose absolute z-score exceeds *threshold*.

        Z-scores come from ``scipy.stats.zscore`` (population std, ddof=0) on the
        non-missing values only, then are re-aligned to the full index.
        """
        valid = self.df[column].dropna()
        z_scores = pd.Series(np.abs(stats.zscore(valid.to_numpy())), index=valid.index)
        # Without re-alignment the mask was shorter than df whenever NaNs existed.
        return (z_scores > threshold).reindex(self.df.index, fill_value=False)

    def iqr_method(self, column: str, multiplier: float = 1.5) -> pd.Series:
        """Flag values outside [Q1 - multiplier*IQR, Q3 + multiplier*IQR]."""
        Q1 = self.df[column].quantile(0.25)
        Q3 = self.df[column].quantile(0.75)
        IQR = Q3 - Q1

        lower_bound = Q1 - multiplier * IQR
        upper_bound = Q3 + multiplier * IQR

        return (self.df[column] < lower_bound) | (self.df[column] > upper_bound)

    def percentile_method(self, column: str, lower: float = 0.01, upper: float = 0.99) -> pd.Series:
        """Flag values below the *lower* quantile or above the *upper* quantile."""
        lower_bound = self.df[column].quantile(lower)
        upper_bound = self.df[column].quantile(upper)

        return (self.df[column] < lower_bound) | (self.df[column] > upper_bound)

    def mad_method(self, column: str, threshold: float = 3.0) -> pd.Series:
        """Flag values whose modified z-score ``0.6745 * (x - median) / MAD`` exceeds *threshold*.

        MAD is the median absolute deviation from the median. If MAD is 0, every
        value different from the median gets an infinite score and is flagged.
        """
        values = self.df[column]
        median = values.median()
        # Series.median skips NaN; np.median on the raw Series returned NaN as soon as
        # one value was missing, which silently disabled detection.
        mad = (values - median).abs().median()

        modified_z_scores = 0.6745 * (values - median) / mad

        return modified_z_scores.abs() > threshold

    def isolation_forest(self, columns: List[str], contamination: float = 0.1) -> pd.Series:
        """Flag rows scikit-learn's IsolationForest labels anomalous (-1).

        Rows with a missing value in *columns* are not scored and stay False.
        Requires scikit-learn (imported lazily).
        """
        from sklearn.ensemble import IsolationForest

        clf = IsolationForest(contamination=contamination, random_state=42)
        predictions = clf.fit_predict(self.df[columns].dropna())

        result = pd.Series(False, index=self.df.index)
        valid_idx = self.df[columns].dropna().index
        result.loc[valid_idx] = predictions == -1

        return result

    def get_outlier_summary(self, column: str, methods: List[str] = None) -> pd.DataFrame:
        """Count the outliers each requested method finds in *column*.

        Args:
            methods: any of ``'zscore'``, ``'iqr'``, ``'percentile'``, ``'mad'``;
                defaults to the first three. Unknown names are ignored.

        Returns:
            One row per method (in the order listed above) with column ``outlier_count``.
        """
        methods = methods or ['zscore', 'iqr', 'percentile']
        detectors = {
            'zscore': self.zscore_method,
            'iqr': self.iqr_method,
            'percentile': self.percentile_method,
            'mad': self.mad_method,
        }

        results = {
            name: detector(column).sum()
            for name, detector in detectors.items()
            if name in methods
        }

        return pd.DataFrame.from_dict(results, orient='index', columns=['outlier_count'])


class OutlierHandler:
    """Produce cleaned versions of a DataFrame from outlier masks or value tails.

    No method modifies ``self.df``; results are copies or filtered selections.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def remove_outliers(self, column: str, mask: pd.Series) -> pd.DataFrame:
        """Drop the rows flagged True in *mask* (*column* is accepted but not used)."""
        keep = ~mask
        return self.df[keep]

    def cap_outliers(
        self,
        column: str,
        lower_percentile: float = 0.01,
        upper_percentile: float = 0.99
    ) -> pd.DataFrame:
        """Clip *column* into its [lower_percentile, upper_percentile] quantile range."""
        result = self.df.copy()
        values = result[column]
        floor_value = values.quantile(lower_percentile)
        ceiling_value = values.quantile(upper_percentile)
        result[column] = values.clip(lower=floor_value, upper=ceiling_value)
        return result

    def replace_with_median(self, column: str, mask: pd.Series) -> pd.DataFrame:
        """Overwrite flagged values with the median of the unflagged ones."""
        result = self.df.copy()
        inlier_median = result.loc[~mask, column].median()
        result.loc[mask, column] = inlier_median
        return result

    def winsorize(self, column: str, limits: Tuple[float, float] = (0.05, 0.05)) -> pd.DataFrame:
        """Replace the lowest/highest *limits* fractions with the nearest kept values."""
        from scipy.stats import mstats

        result = self.df.copy()
        result[column] = mstats.winsorize(result[column], limits=limits)
        return result

    def log_transform(self, column: str) -> pd.DataFrame:
        """Apply ``log1p`` after shifting the column so its minimum becomes 1.

        The minimum therefore maps to log(2), not 0.
        """
        result = self.df.copy()
        shifted = result[column] - result[column].min() + 1
        result[column] = np.log1p(shifted)
        return result

    def boxcox_transform(self, column: str) -> Tuple[pd.DataFrame, float]:
        """Box-Cox transform *column* (shifted so its minimum is 1); return (frame, lambda)."""
        from scipy.stats import boxcox

        result = self.df.copy()
        # Box-Cox requires strictly positive input.
        positive = result[column] - result[column].min() + 1
        transformed, fitted_lambda = boxcox(positive)
        result[column] = transformed
        return result, fitted_lambda

42.3 时间序列分析

42.3.1 时间处理

python
from datetime import datetime, timedelta
from typing import Union


class TimeSeriesHandler:
    def __init__(self, df: pd.DataFrame, time_col: str):
        self.df = df.copy()
        self.time_col = time_col

    def convert_to_datetime(self, format: str = None) -> pd.DataFrame:
        self.df[self.time_col] = pd.to_datetime(self.df[self.time_col], format=format)
        return self.df

    def set_index(self) -> pd.DataFrame:
        self.df = self.df.set_index(self.time_col)
        return self.df

    def resample(self, freq: str, agg_func: str = 'mean') -> pd.DataFrame:
        if self.time_col in self.df.columns:
            df = self.df.set_index(self.time_col)
        else:
            df = self.df

        return df.resample(freq).agg(agg_func)

    def rolling_window(self, window: int, agg_func: str = 'mean') -> pd.DataFrame:
        if self.time_col in self.df.columns:
            df = self.df.set_index(self.time_col)
        else:
            df = self.df

        return df.rolling(window=window).agg(agg_func)

    def expanding_window(self, agg_func: str = 'mean') -> pd.DataFrame:
        if self.time_col in self.df.columns:
            df = self.df.set_index(self.time_col)
        else:
            df = self.df

        return df.expanding().agg(agg_func)

    def shift(self, periods: int = 1) -> pd.DataFrame:
        return self.df.shift(periods=periods)

    def diff(self, periods: int = 1) -> pd.DataFrame:
        return self.df.diff(periods=periods)

    def pct_change(self, periods: int = 1) -> pd.DataFrame:
        return self.df.pct_change(periods=periods)

    def add_time_features(self) -> pd.DataFrame:
        df = self.df.copy()

        if self.time_col in df.columns:
            dt = pd.to_datetime(df[self.time_col])
        else:
            dt = df.index

        df['year'] = dt.year
        df['month'] = dt.month
        df['day'] = dt.day
        df['day_of_week'] = dt.dayofweek
        df['day_of_year'] = dt.dayofyear
        df['week_of_year'] = dt.isocalendar().week
        df['quarter'] = dt.quarter
        df['is_weekend'] = dt.dayofweek >= 5

        return df

    def create_lag_features(self, columns: List[str], lags: List[int]) -> pd.DataFrame:
        df = self.df.copy()

        for col in columns:
            for lag in lags:
                df[f'{col}_lag_{lag}'] = df[col].shift(lag)

        return df

    def create_rolling_features(
        self,
        columns: List[str],
        windows: List[int],
        functions: List[str] = ['mean', 'std']
    ) -> pd.DataFrame:
        df = self.df.copy()

        for col in columns:
            for window in windows:
                for func in functions:
                    df[f'{col}_rolling_{window}_{func}'] = df[col].rolling(window).agg(func)

        return df


class DateRangeGenerator:
    """Factories for common ``DatetimeIndex`` shapes."""

    @staticmethod
    def generate_range(
        start: str,
        end: str,
        freq: str = 'D',
        include_end: bool = True
    ) -> pd.DatetimeIndex:
        """Timestamps from *start* to *end* at *freq*; *end* is dropped unless include_end."""
        inclusive = 'both' if include_end else 'left'
        return pd.date_range(start=start, end=end, freq=freq, inclusive=inclusive)

    @staticmethod
    def generate_periods(start: str, periods: int, freq: str = 'D') -> pd.DatetimeIndex:
        """*periods* timestamps beginning at *start*, spaced by *freq*."""
        return pd.date_range(start, periods=periods, freq=freq)

    @staticmethod
    def business_days(start: str, end: str) -> pd.DatetimeIndex:
        """Weekdays (Mon-Fri) between *start* and *end*."""
        return pd.bdate_range(start, end)

    @staticmethod
    def date_range_by_offset(start: str, offsets: List[str]) -> pd.DatetimeIndex:
        """Start at *start* and apply each offset string cumulatively (len(offsets)+1 stamps)."""
        current = pd.Timestamp(start)
        stamps = [current]
        for step in offsets:
            current = current + pd.tseries.frequencies.to_offset(step)
            stamps.append(current)
        return pd.DatetimeIndex(stamps)

42.3.2 时间序列分析

python
class TimeSeriesAnalysis:
    """Classical diagnostics for a univariate series (missing values dropped on entry).

    The decomposition, stationarity and autocorrelation methods import statsmodels lazily.
    """

    def __init__(self, series: pd.Series):
        self.series = series.dropna()

    def decompose(self, model: str = 'additive', period: int = None) -> Dict:
        """Split the series into trend, seasonal and residual parts (``seasonal_decompose``)."""
        from statsmodels.tsa.seasonal import seasonal_decompose

        parts = seasonal_decompose(self.series, model=model, period=period)
        return dict(
            trend=parts.trend,
            seasonal=parts.seasonal,
            residual=parts.resid,
            observed=parts.observed,
        )

    def test_stationarity(self) -> Dict:
        """Augmented Dickey-Fuller test; ``is_stationary`` means p-value < 0.05."""
        from statsmodels.tsa.stattools import adfuller

        adf = adfuller(self.series)
        p_value = adf[1]
        return {
            'test_statistic': adf[0],
            'p_value': p_value,
            'critical_values': adf[4],
            'is_stationary': p_value < 0.05,
        }

    def autocorrelation(self, nlags: int = 40) -> Dict:
        """ACF and PACF values for lags 0..*nlags*."""
        from statsmodels.tsa.stattools import acf, pacf

        acf_values = acf(self.series, nlags=nlags)
        pacf_values = pacf(self.series, nlags=nlags)
        return {'acf': acf_values, 'pacf': pacf_values}

    def moving_average(self, window: int) -> pd.Series:
        """Trailing simple moving average over *window* observations."""
        return self.series.rolling(window=window).mean()

    def exponential_smoothing(self, alpha: float = 0.3) -> pd.Series:
        """Exponentially weighted moving average with smoothing factor *alpha*."""
        return self.series.ewm(alpha=alpha).mean()

    def seasonal_decomposition_plot_data(self, period: int = 12) -> pd.DataFrame:
        """Additive decomposition components side by side, ready for plotting."""
        components = self.decompose(period=period)
        ordered = ('observed', 'trend', 'seasonal', 'residual')
        return pd.DataFrame({name: components[name] for name in ordered})


class TimeSeriesFeatureEngineering:
    """Build a time-ordered feature table (calendar, lag, rolling, change) for one target."""

    def __init__(self, df: pd.DataFrame, time_col: str, target_col: str):
        self.df = df.copy()
        self.time_col = time_col
        self.target_col = target_col

    def create_all_features(
        self,
        lags: Optional[List[int]] = None,
        windows: Optional[List[int]] = None
    ) -> pd.DataFrame:
        """Return a copy sorted by ``time_col`` with engineered features added.

        Row-based features (lags, rolling stats, diffs) follow the sorted time order;
        the original index labels are kept.

        Args:
            lags: offsets k for ``lag_{k}`` columns; defaults to [1, 7, 14, 30].
            windows: sizes w for ``rolling_{mean,std,min,max}_{w}`` columns;
                defaults to [7, 14, 30].
        """
        lags = [1, 7, 14, 30] if lags is None else lags
        windows = [7, 14, 30] if windows is None else windows

        df = self.df.copy()

        df[self.time_col] = pd.to_datetime(df[self.time_col])
        df = df.sort_values(self.time_col)

        stamps = df[self.time_col].dt
        df['year'] = stamps.year
        df['month'] = stamps.month
        df['day'] = stamps.day
        df['day_of_week'] = stamps.dayofweek
        df['day_of_year'] = stamps.dayofyear
        df['week_of_year'] = stamps.isocalendar().week
        df['quarter'] = stamps.quarter
        df['is_weekend'] = stamps.dayofweek >= 5

        target = df[self.target_col]

        for lag in lags:
            df[f'lag_{lag}'] = target.shift(lag)

        for window in windows:
            rolling = target.rolling(window)
            df[f'rolling_mean_{window}'] = rolling.mean()
            df[f'rolling_std_{window}'] = rolling.std()
            df[f'rolling_min_{window}'] = rolling.min()
            df[f'rolling_max_{window}'] = rolling.max()

        df['diff_1'] = target.diff(1)
        df['diff_7'] = target.diff(7)
        df['pct_change'] = target.pct_change()

        return df

42.4 大数据处理

42.4.1 分块处理

python
from typing import Iterator, Callable
import gc


class ChunkProcessor:
    """Stream a CSV file in ``chunk_size``-row pieces instead of loading it at once."""

    # Aggregations whose per-chunk partial results can be merged exactly.
    _DECOMPOSABLE_AGGS = frozenset({'sum', 'count', 'min', 'max', 'mean'})
    # How a partial statistic from each chunk is merged into the file-wide value.
    _PARTIAL_MERGE = {'sum': 'sum', 'count': 'sum', 'min': 'min', 'max': 'max'}

    def __init__(self, file_path: str, chunk_size: int = 10000):
        self.file_path = file_path
        self.chunk_size = chunk_size

    def _chunks(self) -> Iterator[pd.DataFrame]:
        """Yield successive chunks; the reader is closed even if iteration stops early."""
        with pd.read_csv(self.file_path, chunksize=self.chunk_size) as reader:
            yield from reader

    def process_chunks(
        self,
        process_func: Callable[[pd.DataFrame], pd.DataFrame]
    ) -> pd.DataFrame:
        """Apply *process_func* to every chunk and concatenate the outputs (fresh RangeIndex)."""
        results = [process_func(chunk) for chunk in self._chunks()]
        return pd.concat(results, ignore_index=True)

    def aggregate_chunks(
        self,
        group_cols: List[str],
        agg_dict: Dict
    ) -> pd.DataFrame:
        """Group-by aggregation over the whole file, computed chunk by chunk.

        Returns the same frame as
        ``pd.read_csv(file).groupby(group_cols).agg(agg_dict).reset_index()``.

        When every requested function is one of sum/count/min/max/mean (given as
        strings), only small per-chunk partial aggregates are kept and merged exactly
        (count is summed; mean is total sum / total count). Other functions (median,
        std, nunique, callables, ...) cannot be rebuilt from partials, so the needed
        columns are gathered from all chunks and aggregated once.
        """
        func_lists = {
            col: list(funcs) if isinstance(funcs, (list, tuple)) else [funcs]
            for col, funcs in agg_dict.items()
        }
        decomposable = all(
            isinstance(func, str) and func in self._DECOMPOSABLE_AGGS
            for funcs in func_lists.values()
            for func in funcs
        )
        if decomposable:
            return self._aggregate_partials(group_cols, agg_dict, func_lists)
        return self._aggregate_exact(group_cols, agg_dict)

    def _aggregate_partials(self, group_cols, agg_dict, func_lists) -> pd.DataFrame:
        """Two-phase exact aggregation for sum/count/min/max/mean."""
        # Partial statistics each column needs; mean is rebuilt from sum and count.
        partial_specs = {}
        for col, funcs in func_lists.items():
            needed = set()
            for func in funcs:
                needed.update(('sum', 'count') if func == 'mean' else (func,))
            partial_specs[col] = sorted(needed)

        partials = pd.concat(
            [chunk.groupby(group_cols).agg(partial_specs) for chunk in self._chunks()]
        )
        # partials is indexed by the group keys, one row per (chunk, group).
        totals = {
            key: partials[key].groupby(level=group_cols).agg(self._PARTIAL_MERGE[key[1]])
            for key in partials.columns
        }

        # Mirror pandas' layout: flat names for {col: 'func'} specs, (col, func)
        # MultiIndex columns as soon as any spec is a list.
        nested = any(isinstance(funcs, (list, tuple)) for funcs in agg_dict.values())
        output = {}
        for col, funcs in func_lists.items():
            for func in funcs:
                if func == 'mean':
                    values = totals[(col, 'sum')] / totals[(col, 'count')]
                else:
                    values = totals[(col, func)]
                output[(col, func) if nested else col] = values
        return pd.DataFrame(output).reset_index()

    def _aggregate_exact(self, group_cols, agg_dict) -> pd.DataFrame:
        """Gather only the key and value columns from every chunk, then aggregate once."""
        keys = [group_cols] if isinstance(group_cols, str) else list(group_cols)
        needed = list(dict.fromkeys([*keys, *agg_dict]))
        combined = pd.concat([chunk[needed] for chunk in self._chunks()], ignore_index=True)
        return combined.groupby(group_cols).agg(agg_dict).reset_index()

    def filter_chunks(self, filter_func: Callable[[pd.DataFrame], pd.DataFrame]) -> pd.DataFrame:
        """Keep what *filter_func* returns for each chunk; concatenate with a fresh RangeIndex."""
        results = [filter_func(chunk) for chunk in self._chunks()]
        return pd.concat(results, ignore_index=True)

    def iterate_chunks(self) -> Iterator[pd.DataFrame]:
        """Yield the raw chunks one at a time."""
        yield from self._chunks()


class MemoryOptimizer:
    """Reduce DataFrame memory by narrowing column dtypes."""

    @staticmethod
    def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *df* with narrower dtypes where the data allows.

        - int64: the smallest of uint8/16/32 (non-negative data) or int8/16/32 whose
          full inclusive range (``np.iinfo``) holds the column's min and max;
          columns that fit none stay int64.
        - float64: float32 (lossy, about 7 significant digits).
        - object: category when fewer than 50% of the values are distinct.
        """
        df = df.copy()
        n_rows = len(df)

        for col in df.columns:
            col_type = df[col].dtype

            if col_type == 'int64':
                if n_rows == 0:
                    continue
                lo, hi = df[col].min(), df[col].max()
                candidates = ('uint8', 'uint16', 'uint32') if lo >= 0 else ('int8', 'int16', 'int32')
                for candidate in candidates:
                    bounds = np.iinfo(candidate)
                    # Inclusive bounds: 255 fits uint8 and -128 fits int8.
                    if bounds.min <= lo and hi <= bounds.max:
                        df[col] = df[col].astype(candidate)
                        break

            elif col_type == 'float64':
                df[col] = df[col].astype('float32')

            elif col_type == 'object' and n_rows:
                # n_rows guard: the unique ratio is undefined for an empty frame.
                if df[col].nunique() / n_rows < 0.5:
                    df[col] = df[col].astype('category')

        return df

    @staticmethod
    def downcast_numeric(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with int/float columns downcast via ``pd.to_numeric(downcast=...)``."""
        df = df.copy()

        for col in df.select_dtypes(include=['int']).columns:
            df[col] = pd.to_numeric(df[col], downcast='integer')

        for col in df.select_dtypes(include=['float']).columns:
            df[col] = pd.to_numeric(df[col], downcast='float')

        return df

    @staticmethod
    def get_memory_usage(df: pd.DataFrame) -> pd.DataFrame:
        """Per-column memory (including the index row) in bytes and MiB, object contents included."""
        usage = df.memory_usage(deep=True)
        return pd.DataFrame({
            'column': usage.index,
            'memory_bytes': usage.values,
            'memory_mb': usage.values / 1024 / 1024
        })

42.4.2 Dask并行计算

python
class DaskProcessor:
    """Use Dask collections when Dask is installed; fall back to pandas otherwise."""

    def __init__(self):
        try:
            import dask.dataframe as dd
            self.dd = dd
            self.available = True
        except ImportError:
            self.available = False

    @staticmethod
    def _is_lazy(obj: Any) -> bool:
        """True for lazy (Dask-style) objects that must be materialized with ``.compute()``."""
        return hasattr(obj, 'compute')

    def read_csv(self, file_path: str) -> Any:
        """Read CSV lazily with Dask if available, else eagerly with pandas."""
        if self.available:
            return self.dd.read_csv(file_path)
        return pd.read_csv(file_path)

    def read_parquet(self, file_path: str) -> Any:
        """Read Parquet lazily with Dask if available, else eagerly with pandas."""
        if self.available:
            return self.dd.read_parquet(file_path)
        return pd.read_parquet(file_path)

    def parallel_groupby(
        self,
        df: Any,
        group_col: str,
        agg_dict: Dict
    ) -> pd.DataFrame:
        """Group by *group_col* and aggregate; Dask results are computed into pandas.

        Dispatches on the object actually passed: pandas frames also have ``groupby``,
        so checking for it sent pandas input down the Dask path (``.compute()`` on a
        pandas result) whenever Dask happened to be installed.
        """
        result = df.groupby(group_col).agg(agg_dict)
        if self._is_lazy(result):
            return result.compute()
        return result

    def parallel_apply(
        self,
        df: Any,
        func: Callable,
        axis: int = 1
    ) -> Any:
        """Apply *func* along *axis*; Dask input is computed into a pandas object."""
        if self._is_lazy(df):
            # Dask needs ``meta`` to describe the output. pandas' apply would instead
            # forward the unknown ``meta=`` keyword into *func* itself.
            return df.apply(func, axis=axis, meta=(None, 'object')).compute()
        return df.apply(func, axis=axis)


class ParallelProcessor:
    """Fan DataFrame work out over a pool of ``n_workers`` workers."""

    def __init__(self, n_workers: int = 4):
        self.n_workers = n_workers

    def _row_chunks(self, df: pd.DataFrame) -> List[pd.DataFrame]:
        """Split *df* positionally into at most ``n_workers`` ordered slices.

        Slices are non-empty whenever *df* is; sizes differ by at most one row.
        """
        n_chunks = max(1, min(self.n_workers, len(df)))
        base, extra = divmod(len(df), n_chunks)
        chunks, start = [], 0
        for i in range(n_chunks):
            stop = start + base + (1 if i < extra else 0)
            chunks.append(df.iloc[start:stop])
            start = stop
        return chunks

    def parallel_apply(
        self,
        df: pd.DataFrame,
        func: Callable,
        column: str = None
    ) -> pd.Series:
        """Apply *func* in a thread pool and concatenate the results in row order.

        With *column*, *func* receives each value of that column; otherwise it
        receives each row (``axis=1``). Threads only overlap work that releases the
        GIL (I/O, some C extensions); pure-Python *func* gains little.
        """
        from concurrent.futures import ThreadPoolExecutor

        # Never hand out empty slices: an empty frame's apply returns a DataFrame,
        # which used to corrupt the concatenated Series when n_workers > len(df).
        chunks = self._row_chunks(df)

        def work(chunk):
            if column:
                return chunk[column].apply(func)
            return chunk.apply(func, axis=1)

        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            results = list(executor.map(work, chunks))

        return pd.concat(results)

    @staticmethod
    def _aggregate_subset(subset: pd.DataFrame, group_col: str, value_col: str, agg_func: str) -> pd.Series:
        """Worker task: aggregate one pre-sliced subset (picklable by qualified name)."""
        return subset.groupby(group_col)[value_col].agg(agg_func)

    def parallel_groupby_agg(
        self,
        df: pd.DataFrame,
        group_col: str,
        value_col: str,
        agg_func: str
    ) -> pd.DataFrame:
        """Aggregate *value_col* per *group_col* in worker processes.

        Groups are partitioned into batches and each worker receives only its rows,
        so every group is aggregated exactly once. Results are concatenated in the
        order groups first appear (sorted within each batch). Scripts using this on
        spawn/forkserver platforms need an ``if __name__ == "__main__":`` guard.
        """
        from concurrent.futures import ProcessPoolExecutor

        unique_groups = df[group_col].unique()
        if len(unique_groups) == 0:
            # Nothing to distribute; pd.concat([]) would raise.
            return df.groupby(group_col)[value_col].agg(agg_func)

        chunk_size = max(1, len(unique_groups) // self.n_workers)
        subsets = [
            df[df[group_col].isin(unique_groups[i:i + chunk_size])]
            for i in range(0, len(unique_groups), chunk_size)
        ]
        n_tasks = len(subsets)

        # Workers need a picklable callable: the previous local closure could not be
        # sent to another process, so every call failed.
        with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
            results = list(executor.map(
                ParallelProcessor._aggregate_subset,
                subsets,
                [group_col] * n_tasks,
                [value_col] * n_tasks,
                [agg_func] * n_tasks,
            ))

        return pd.concat(results)

42.5 知识图谱

42.5.1 数据分析技术栈

┌─────────────────────────────────────────────────────────────────────┐
│                      Python数据分析技术栈                            │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      数据可视化层                             │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │Matplotlib│ │ Seaborn  │ │ Plotly   │ │Bokeh     │       │   │
│  │  │基础绘图  │ │统计绘图  │ │交互图表  │ │Web可视化 │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      数据分析层                               │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ Pandas   │ │ NumPy    │ │ SciPy    │ │Statsmodel│       │   │
│  │  │数据处理  │ │数值计算  │ │科学计算  │ │统计分析  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐                    │   │
│  │  │时间序列  │ │分组聚合  │ │数据重塑  │                    │   │
│  │  │TimeSeries│ │GroupBy   │ │Reshape   │                    │   │
│  │  └──────────┘ └──────────┘ └──────────┘                    │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      大数据处理层                             │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │  Dask    │ │  Vaex    │ │  Polars  │ │ Modin    │       │   │
│  │  │并行计算  │ │内存映射  │ │高性能DF │ │分布式   │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      数据存储层                               │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │  CSV     │ │ Parquet  │ │  HDF5    │ │Feather   │       │   │
│  │  │文本格式  │ │列式存储  │ │科学数据  │ │快速I/O  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

42.5.2 数据分析流程

┌─────────────────────────────────────────────────────────────────────┐
│                      数据分析工作流程                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────┐                                                      │
│   │ 数据获取  │ ─── API / 数据库 / 文件 / 爬虫                      │
│   └────┬─────┘                                                      │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────┐                                                      │
│   │ 数据探索  │ ─── 描述统计 / 分布分析 / 相关性分析                 │
│   │  EDA    │                                                      │
│   └────┬─────┘                                                      │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐                    │
│   │ 数据清洗  │───▶│ 缺失处理  │───▶│ 异常处理  │                    │
│   └──────────┘    └──────────┘    └────┬─────┘                    │
│                                        │                           │
│                                        ▼                           │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐                    │
│   │ 数据转换  │───▶│ 特征工程  │───▶│ 数据重塑  │                    │
│   └──────────┘    └──────────┘    └────┬─────┘                    │
│                                        │                           │
│                                        ▼                           │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    数据分析                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 聚合分析  │ │ 时间序列  │ │ 统计建模  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│                                        │                           │
│                                        ▼                           │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐                    │
│   │ 可视化   │───▶│ 报告生成  │───▶│ 洞察输出  │                    │
│   └──────────┘    └──────────┘    └──────────┘                    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

42.6 技术选型指南

42.6.1 数据处理框架选型

框架数据规模性能特点内存效率学习曲线推荐指数
Pandas<10GB灵活、功能丰富中等★★★★★
Polars<100GB极高性能★★★★★
Dask>100GB并行计算、分布式★★★★☆
Vaex>100GB内存映射、零拷贝极高★★★★☆
Modin任意Pandas兼容、自动并行★★★★☆

42.6.2 数据存储格式选型

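| 格式 | 读取速度 | 写入速度 | 压缩率 | 随机访问 | 推荐场景 |
|------|----------|----------|--------|----------|----------|
| Parquet |  |  |  | 支持 | 大数据、列式分析 |
| Feather | 极快 | 极快 |  | 支持 | 临时存储、快速I/O |
| HDF5 |  |  |  | 支持 | 科学计算、层次数据 |
| CSV |  |  |  | 不支持 | 数据交换、人类可读 |
| Pickle |  |  |  | 不支持 | Python对象序列化 |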

42.6.3 时间序列分析工具选型

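| 工具 | 功能特点 | 适用场景 | 推荐指数 |
|------|----------|----------|----------|
| Pandas TimeSeries | 基础时间序列 | 日常分析 | ★★★★★ |
| Statsmodels | 统计模型、ARIMA | 统计预测 | ★★★★☆ |
| Prophet | 自动预测、可解释 | 业务预测 | ★★★★★ |
| pmdarima | 自动ARIMA | 时间序列预测 | ★★★★☆ |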

42.7 常见问题与解决方案

42.7.1 内存不足问题

python
import pandas as pd
import numpy as np
from typing import Iterator, Any

class MemoryEfficientProcessor:
    """Memory-efficient helpers: chunked CSV processing and dtype shrinking."""

    def __init__(self, chunk_size: int = 10000):
        # Number of CSV rows read per chunk by process_large_csv.
        self.chunk_size = chunk_size

    def process_large_csv(
        self,
        filepath: str,
        process_func: callable,
        output_path: str = None
    ) -> pd.DataFrame:
        """Process a large CSV file chunk by chunk.

        Args:
            filepath: Path of the CSV file to read.
            process_func: Called with each chunk (a DataFrame of at most
                ``chunk_size`` rows); must return a DataFrame.
            output_path: If given, the combined result is also written to
                this path as CSV (without the index).

        Returns:
            All processed chunks concatenated with a fresh RangeIndex, or an
            empty DataFrame when the file yields no chunks.
        """
        # The context manager closes the file even if process_func raises.
        with pd.read_csv(filepath, chunksize=self.chunk_size) as reader:
            chunks = [process_func(chunk) for chunk in reader]

        # pd.concat raises ValueError on an empty list.
        result = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()

        if output_path is not None:
            result.to_csv(output_path, index=False)
        return result

    @staticmethod
    def _smallest_int_dtype(lo, hi):
        """Return the narrowest 8/16/32-bit int dtype name holding [lo, hi].

        Unsigned types are tried when ``lo >= 0``, signed ones otherwise.
        Returns None when no candidate fits (including NaN bounds from an
        empty column), meaning the column should keep its current dtype.
        """
        candidates = ('uint8', 'uint16', 'uint32') if lo >= 0 else ('int8', 'int16', 'int32')
        for name in candidates:
            info = np.iinfo(name)
            if info.min <= lo and hi <= info.max:
                return name
        return None

    def optimize_dtypes(
        self,
        df: pd.DataFrame,
        category_threshold: float = 0.5
    ) -> pd.DataFrame:
        """Shrink column dtypes to reduce memory usage.

        - int64 columns become the narrowest (u)int8/16/32 that holds their
          full value range; columns that need 64 bits stay int64.
        - float64 columns become float32 (loses precision).
        - object columns whose unique-value ratio is below
          ``category_threshold`` become ``category``.

        Modifies ``df`` in place and also returns it.
        """
        n_rows = len(df)
        for col in df.columns:
            col_type = df[col].dtype

            if col_type == 'int64':
                # Range-checked (inclusive bounds) so values never wrap around.
                target = self._smallest_int_dtype(df[col].min(), df[col].max())
                if target is not None:
                    df[col] = df[col].astype(target)

            elif col_type == 'float64':
                df[col] = df[col].astype('float32')

            elif col_type == 'object' and n_rows > 0:
                # Guarded: an empty frame would divide by zero here.
                unique_ratio = df[col].nunique() / n_rows
                if unique_ratio < category_threshold:
                    df[col] = df[col].astype('category')

        return df

    def downcast_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Downcast numeric columns to the smallest dtype of the same kind.

        Integer columns use ``downcast='integer'`` and float columns use
        ``downcast='float'`` (minimum float32). Keeping the kinds separate
        prevents integral float columns from turning into ints and wide int
        columns from turning into float32. Modifies ``df`` in place and
        returns it.
        """
        int_cols = df.select_dtypes(include=['int16', 'int32', 'int64']).columns
        float_cols = df.select_dtypes(include=['float16', 'float32', 'float64']).columns

        for col in int_cols:
            df[col] = pd.to_numeric(df[col], downcast='integer')
        for col in float_cols:
            df[col] = pd.to_numeric(df[col], downcast='float')

        return df


class LazyDataLoader:
    """Read only the parts of a Parquet or CSV file that are needed.

    ``format == 'parquet'`` selects Parquet handling (via pyarrow); any
    other value is treated as CSV.
    """

    def __init__(self, filepath: str, format: str = 'parquet'):
        self.filepath = filepath
        self.format = format
        # Reserved for cached metadata; not populated by any method here.
        self._metadata = None

    def get_columns(self) -> list:
        """Return the column names without loading any data rows."""
        if self.format == 'parquet':
            import pyarrow.parquet as pq
            schema = pq.read_schema(self.filepath)
            return schema.names
        # nrows=0 parses only the header line.
        return pd.read_csv(self.filepath, nrows=0).columns.tolist()

    def read_columns(self, columns: list) -> pd.DataFrame:
        """Read only the given columns."""
        if self.format == 'parquet':
            return pd.read_parquet(self.filepath, columns=columns)
        return pd.read_csv(self.filepath, usecols=columns)

    def _iter_chunks(self, chunk_size: int):
        """Yield the file as DataFrames of at most ``chunk_size`` rows."""
        if self.format == 'parquet':
            import pyarrow.parquet as pq
            parquet_file = pq.ParquetFile(self.filepath)
            for batch in parquet_file.iter_batches(batch_size=chunk_size):
                yield batch.to_pandas()
        else:
            # The context manager closes the file even if iteration stops early.
            with pd.read_csv(self.filepath, chunksize=chunk_size) as reader:
                yield from reader

    def read_filtered(
        self,
        filter_func: callable,
        chunk_size: int = 10000
    ) -> pd.DataFrame:
        """Read the file chunk by chunk, keeping only rows selected by ``filter_func``.

        Args:
            filter_func: Called with each chunk; must return the (possibly
                empty) DataFrame of rows to keep.
            chunk_size: Maximum number of rows per chunk.

        Returns:
            The concatenated non-empty filtered chunks (original row labels
            kept), or an empty DataFrame if nothing matched.
        """
        chunks = []
        for chunk in self._iter_chunks(chunk_size):
            filtered = filter_func(chunk)
            if len(filtered) > 0:
                chunks.append(filtered)
        return pd.concat(chunks) if chunks else pd.DataFrame()

42.7.2 数据质量问题

python
from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np

class DataQualityChecker:
    """Collect data-quality statistics (missing values, duplicates, outliers, dtypes)."""

    def __init__(self, df: pd.DataFrame):
        # The frame is referenced, not copied; no method here modifies it.
        self.df = df
        # Filled in by generate_report().
        self.report = {}

    @staticmethod
    def _percentage(count, total) -> float:
        """Return ``count / total * 100`` as a float, or 0.0 when ``total`` is 0."""
        return float(count) / total * 100 if total else 0.0

    def check_missing(self) -> Dict:
        """Count missing values per column.

        Returns:
            ``missing_counts`` and ``missing_percentages`` per column
            (percentages are 0.0 for an empty frame), plus
            ``columns_with_missing``.
        """
        missing = self.df.isnull().sum()
        n_rows = len(self.df)
        # Dividing by a zero row count would produce NaN percentages.
        missing_pct = missing / n_rows * 100 if n_rows else missing.astype(float)

        return {
            "missing_counts": missing.to_dict(),
            "missing_percentages": missing_pct.to_dict(),
            "columns_with_missing": missing[missing > 0].index.tolist()
        }

    def check_duplicates(self, subset: List[str] = None) -> Dict:
        """Find duplicated rows (every copy is flagged, via ``keep=False``).

        Args:
            subset: Columns to compare; None compares all columns.
        """
        duplicates = self.df.duplicated(subset=subset, keep=False)
        count = int(duplicates.sum())
        return {
            "duplicate_count": count,
            "duplicate_percentage": self._percentage(count, len(self.df)),
            "duplicate_indices": self.df[duplicates].index.tolist()
        }

    def check_outliers(
        self,
        columns: List[str] = None,
        method: str = 'iqr'
    ) -> Dict:
        """Detect outliers per column, ignoring missing values.

        Args:
            columns: Columns to check; defaults to all numeric columns.
            method: ``'iqr'`` (outside Q1 - 1.5*IQR .. Q3 + 1.5*IQR) or
                ``'zscore'`` (|z| > 3).

        Returns:
            Mapping column -> ``count``, ``percentage`` (of non-missing
            values, 0.0 if there are none) and ``indices``.

        Raises:
            ValueError: If ``method`` is not ``'iqr'`` or ``'zscore'``.
        """
        # Validate up front; an unknown method previously left the mask
        # unbound (or silently reused the previous column's mask).
        if method not in ('iqr', 'zscore'):
            raise ValueError(f"Unknown outlier method {method!r}; expected 'iqr' or 'zscore'")

        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns

        outliers = {}
        for col in columns:
            data = self.df[col].dropna()

            if method == 'iqr':
                q1, q3 = data.quantile([0.25, 0.75])
                iqr = q3 - q1
                lower = q1 - 1.5 * iqr
                upper = q3 + 1.5 * iqr
                outlier_mask = (data < lower) | (data > upper)
            else:
                z_scores = np.abs((data - data.mean()) / data.std())
                outlier_mask = z_scores > 3

            count = int(outlier_mask.sum())
            outliers[col] = {
                "count": count,
                "percentage": self._percentage(count, len(data)),
                "indices": data[outlier_mask].index.tolist()
            }

        return outliers

    def check_data_types(self) -> Dict:
        """Report dtype, distinct-value count and the first five values per column."""
        type_report = {}
        for col in self.df.columns:
            type_report[col] = {
                "dtype": str(self.df[col].dtype),
                "unique_count": self.df[col].nunique(),
                "sample_values": self.df[col].head(5).tolist()
            }
        return type_report

    def generate_report(self) -> Dict:
        """Run every check, store the combined result in ``self.report`` and return it."""
        self.report = {
            "shape": self.df.shape,
            "missing": self.check_missing(),
            "duplicates": self.check_duplicates(),
            "outliers": self.check_outliers(),
            "data_types": self.check_data_types(),
            "memory_usage": self.df.memory_usage(deep=True).to_dict()
        }
        return self.report


class DataCleaner:
    """Chainable cleaning steps applied to a private copy of a DataFrame."""

    def __init__(self, df: pd.DataFrame):
        # Work on a copy so the caller's frame is never modified.
        self.df = df.copy()
        # Human-readable record of every step that was applied.
        self.cleaning_log = []

    def handle_missing(
        self,
        strategy: Dict[str, str] = None,
        default_strategy: str = 'drop'
    ) -> "DataCleaner":
        """Fill or drop missing values column by column.

        Args:
            strategy: Optional mapping column -> strategy name; columns not
                listed use ``default_strategy``.
            default_strategy: One of ``'drop'``, ``'mean'``, ``'median'``,
                ``'mode'``, ``'ffill'``, ``'bfill'``. Any other name leaves
                that column's missing values unchanged.

        Returns:
            self, for chaining.
        """
        strategy = strategy or {}

        for col in self.df.columns:
            col_strategy = strategy.get(col, default_strategy)

            if self.df[col].isnull().sum() == 0:
                continue

            if col_strategy == 'drop':
                self.df = self.df.dropna(subset=[col])
                self.cleaning_log.append(f"Dropped rows with missing {col}")

            elif col_strategy == 'mean':
                self.df[col] = self.df[col].fillna(self.df[col].mean())
                self.cleaning_log.append(f"Filled missing {col} with mean")

            elif col_strategy == 'median':
                self.df[col] = self.df[col].fillna(self.df[col].median())
                self.cleaning_log.append(f"Filled missing {col} with median")

            elif col_strategy == 'mode':
                modes = self.df[col].mode()
                if modes.empty:
                    # All values are missing, so there is no mode to fill with
                    # (indexing the empty result used to raise KeyError).
                    self.cleaning_log.append(f"Skipped mode fill for {col}: no non-missing values")
                    continue
                self.df[col] = self.df[col].fillna(modes.iloc[0])
                self.cleaning_log.append(f"Filled missing {col} with mode")

            elif col_strategy == 'ffill':
                self.df[col] = self.df[col].ffill()
                self.cleaning_log.append(f"Forward filled missing {col}")

            elif col_strategy == 'bfill':
                self.df[col] = self.df[col].bfill()
                self.cleaning_log.append(f"Backward filled missing {col}")

        return self

    def handle_outliers(
        self,
        columns: List[str] = None,
        method: str = 'clip',
        threshold: float = 3
    ) -> "DataCleaner":
        """Clip or remove outliers in numeric columns.

        Args:
            columns: Columns to process; defaults to all numeric columns.
            method: ``'clip'`` clamps values to the column's 1st-99th
                percentile range; ``'remove'`` drops rows whose |z-score|
                exceeds ``threshold``.
            threshold: z-score cutoff, used only by ``'remove'``.

        Returns:
            self, for chaining.

        Raises:
            ValueError: If ``method`` is not ``'clip'`` or ``'remove'``.
        """
        if method not in ('clip', 'remove'):
            raise ValueError(f"Unknown outlier method {method!r}; expected 'clip' or 'remove'")

        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns

        for col in columns:
            if method == 'clip':
                lower = self.df[col].quantile(0.01)
                upper = self.df[col].quantile(0.99)
                self.df[col] = self.df[col].clip(lower, upper)
                self.cleaning_log.append(f"Clipped outliers in {col}")
            else:
                z_scores = np.abs(
                    (self.df[col] - self.df[col].mean()) / self.df[col].std()
                )
                # Drop only rows that are provably outliers. A NaN z-score
                # (missing value, or zero/undefined std for a constant
                # column) keeps the row; `z <= threshold` used to drop them.
                self.df = self.df[~(z_scores > threshold)]
                self.cleaning_log.append(f"Removed outliers in {col}")

        return self

    def remove_duplicates(
        self,
        subset: List[str] = None,
        keep: str = 'first'
    ) -> "DataCleaner":
        """Drop duplicated rows (see ``DataFrame.drop_duplicates``) and log how many."""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        after = len(self.df)
        self.cleaning_log.append(f"Removed {before - after} duplicate rows")
        return self

    def get_cleaned_data(self) -> pd.DataFrame:
        """Return the cleaned DataFrame (the internal copy, not a new one)."""
        return self.df

    def get_cleaning_log(self) -> List[str]:
        """Return the list of applied cleaning steps."""
        return self.cleaning_log

42.7.3 性能优化问题

python
import pandas as pd
import numpy as np
from typing import Callable, Any

class DataFrameOptimizer:
    """Small demonstrations of pandas performance idioms."""

    @staticmethod
    def use_vectorization(df: pd.DataFrame) -> pd.DataFrame:
        """Placeholder for the "vectorize instead of looping" idiom; returns ``df`` unchanged."""
        return df

    @staticmethod
    def avoid_chained_indexing(df: pd.DataFrame) -> dict:
        """Return a bad/good pair of code strings contrasting chained indexing with ``.loc``."""
        return {
            "bad": "df[df['A'] > 0]['B'] = 1",
            "good": "df.loc[df['A'] > 0, 'B'] = 1"
        }

    @staticmethod
    def use_query_for_filtering(df: pd.DataFrame, condition: str) -> pd.DataFrame:
        """Filter rows with ``DataFrame.query``.

        NOTE: ``condition`` is parsed and evaluated as an expression; never
        pass strings built from untrusted input.
        """
        return df.query(condition)

    @staticmethod
    def use_eval_for_expressions(df: pd.DataFrame, expr: str) -> pd.Series:
        """Evaluate a column expression with ``DataFrame.eval`` (same caveat as ``query``)."""
        return df.eval(expr)

    @staticmethod
    def optimize_merge(
        left: pd.DataFrame,
        right: pd.DataFrame,
        on: str,
        how: str = 'inner'
    ) -> pd.DataFrame:
        """Merge ``left`` with ``right`` on the key column(s) ``on``.

        The operands are never swapped. Swapping them by size changed the
        result's column order, which frame's overlapping columns received
        the ``_x``/``_y`` suffix, and the row order (an inner join follows
        the left frame's key order). The output shape then depended on which
        input happened to be larger.

        Args:
            left: Frame whose columns come first in the result.
            right: Frame to join onto ``left``.
            on: Join key column name(s).
            how: Join type passed to ``DataFrame.merge``; defaults to ``'inner'``.
        """
        return left.merge(right, on=on, how=how)


class GroupByOptimizer:
    """Thin wrappers showing efficient groupby patterns."""

    @staticmethod
    def multi_agg_optimized(
        df: pd.DataFrame,
        group_col: str,
        agg_dict: dict
    ) -> pd.DataFrame:
        """Apply several aggregations in a single ``groupby().agg`` call.

        Args:
            df: Source frame.
            group_col: Column (or columns) to group by.
            agg_dict: Mapping column -> aggregation(s), as accepted by ``agg``.
        """
        grouped = df.groupby(group_col)
        result = grouped.agg(agg_dict)
        return result

    @staticmethod
    def transform_instead_of_apply(
        df: pd.DataFrame,
        group_col: str,
        value_col: str,
        func: Callable
    ) -> pd.Series:
        """Return ``func`` applied per group, aligned to the original rows via ``transform``."""
        values_by_group = df.groupby(group_col)[value_col]
        transformed = values_by_group.transform(func)
        return transformed

    @staticmethod
    def filter_groups(df: pd.DataFrame, group_col: str, condition: Callable) -> pd.DataFrame:
        """Keep only the rows of groups for which ``condition(group)`` is true."""
        grouped = df.groupby(group_col)
        kept = grouped.filter(condition)
        return kept


class TimeSeriesOptimizer:
    """Time-series helpers built on pandas' ``.dt``, ``resample`` and ``rolling`` APIs."""

    @staticmethod
    def use_dt_accessor(df: pd.DataFrame, time_col: str) -> pd.DataFrame:
        """Parse ``time_col`` to datetime and add ``year``, ``month`` and ``day`` columns.

        Modifies ``df`` in place and also returns it.
        """
        df[time_col] = pd.to_datetime(df[time_col])
        df['year'] = df[time_col].dt.year
        df['month'] = df[time_col].dt.month
        df['day'] = df[time_col].dt.day
        return df

    @staticmethod
    def resample_optimized(
        df: pd.DataFrame,
        time_col: str,
        value_col: str,
        freq: str = 'D',
        agg: str = 'mean'
    ) -> pd.Series:
        """Resample ``value_col`` by the timestamps in ``time_col``.

        ``time_col`` may hold datetimes or parseable strings. It is converted
        with ``pd.to_datetime``, because ``resample`` needs a DatetimeIndex.
        ``df`` itself is not modified.

        Returns:
            A Series named ``value_col`` with one aggregated value per
            ``freq`` bin; its index is named ``time_col``.
        """
        # set_index with a parsed Series (not the column label) builds the
        # DatetimeIndex without touching the caller's column.
        indexed = df.set_index(pd.to_datetime(df[time_col]))
        return indexed[value_col].resample(freq).agg(agg)

    @staticmethod
    def rolling_optimized(
        df: pd.DataFrame,
        value_col: str,
        window: int,
        min_periods: int = 1
    ) -> pd.DataFrame:
        """Add a ``rolling_mean_{window}`` column holding the rolling mean of ``value_col``.

        Modifies ``df`` in place and also returns it.
        """
        df[f'rolling_mean_{window}'] = df[value_col].rolling(
            window=window, min_periods=min_periods
        ).mean()
        return df

42.8 本章小结

本章详细介绍了Python数据分析进阶的核心概念和实践:

  1. Pandas高级特性:MultiIndex、分层索引、高级分组
  2. 数据清洗:缺失值处理、异常值检测、数据转换
  3. 数据重塑:透视表、交叉表、数据合并
  4. 时间序列:日期处理、滚动计算、特征工程
  5. 大数据处理:分块处理、内存优化、并行计算

练习题

  1. 实现一个数据清洗管道,自动处理缺失值和异常值
  2. 开发一个时间序列预测系统,支持多种预测模型
  3. 实现一个大数据处理框架,支持分块和并行计算
  4. 开发一个数据质量报告生成器,自动生成数据质量报告
  5. 实现一个ETL流程,支持数据抽取、转换和加载

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校