第42章 数据分析进阶
学习目标
完成本章学习后,你将能够:
- 掌握Pandas高级特性:MultiIndex、分层索引、高级分组
- 进行数据清洗:缺失值处理、异常值检测、数据转换
- 实现数据转换:透视表、交叉表、数据重塑
- 分析时间序列:日期处理、时间窗口、滚动计算
- 处理大数据:分块处理、内存优化、Dask并行计算
- 实现数据聚合:复杂聚合、自定义函数、链式操作
- 进行数据可视化:统计图表、交互式可视化、报告生成
- 构建数据处理管道:ETL流程、数据验证、自动化处理
42.1 Pandas高级特性
42.1.1 MultiIndex与分层索引
python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
class MultiIndexDemo:
    """Self-contained demos of MultiIndex construction, stacking and pivoting."""

    @staticmethod
    def create_multiindex_dataframe() -> pd.DataFrame:
        """Return a 6x4 frame of random normals on a (group, category, id) index.

        Values are drawn from the global ``np.random`` state, so call
        ``np.random.seed`` first if reproducible output is needed.
        """
        arrays = [
            ['A', 'A', 'A', 'B', 'B', 'B'],
            ['X', 'Y', 'Z', 'X', 'Y', 'Z'],
            [1, 2, 3, 1, 2, 3],
        ]
        index = pd.MultiIndex.from_arrays(arrays, names=['group', 'category', 'id'])
        data = np.random.randn(6, 4)
        return pd.DataFrame(data, index=index, columns=['col1', 'col2', 'col3', 'col4'])

    @staticmethod
    def create_multiindex_from_product() -> pd.DataFrame:
        """Return an 8-row frame indexed by the Cartesian product of three levels.

        The single ``value`` column holds 0..7 in index order.
        """
        index = pd.MultiIndex.from_product(
            [['A', 'B'], ['X', 'Y'], [1, 2]],
            names=['group', 'category', 'id'],
        )
        data = np.arange(8).reshape(8, 1)
        return pd.DataFrame(data, index=index, columns=['value'])

    @staticmethod
    def stack_unstack_demo() -> Tuple[pd.Series, pd.DataFrame]:
        """Stack a 3x3 frame into a long Series, then unstack it back.

        Returns:
            ``(stacked, unstacked)``. ``stacked`` is a Series indexed by
            (row label, column label). ``unstacked`` is the wide 3x3 frame
            rebuilt from it.
        """
        df = pd.DataFrame({
            'A': [1, 2, 3],
            'B': [4, 5, 6],
            'C': [7, 8, 9],
        }, index=['x', 'y', 'z'])
        stacked = df.stack()
        unstacked = stacked.unstack()
        return stacked, unstacked

    @staticmethod
    def pivot_table_demo() -> pd.DataFrame:
        """Pivot 12 days of random sales into a date x (category, region) table.

        Each date occurs in exactly one row, so each pivoted row has a single
        non-NaN cell. ``aggfunc='sum'`` would only matter if dates repeated.
        """
        df = pd.DataFrame({
            'date': pd.date_range('2024-01-01', periods=12, freq='D'),
            'category': ['A', 'B', 'C'] * 4,
            'region': ['North', 'South'] * 6,
            'sales': np.random.randint(100, 1000, 12),
        })
        return df.pivot_table(
            values='sales',
            index='date',
            columns=['category', 'region'],
            aggfunc='sum',
        )
class AdvancedIndexing:
    """Helpers for expression filtering, multi-column masks, updates and lookups."""

    @staticmethod
    def query_operations(df: pd.DataFrame, conditions: str) -> pd.DataFrame:
        """Filter ``df`` with a ``DataFrame.query`` expression string.

        ``conditions`` goes through pandas' expression evaluator. Do not pass
        untrusted user input here.
        """
        return df.query(conditions)

    @staticmethod
    def multi_condition_filter(df: pd.DataFrame, conditions: Dict[str, Any]) -> pd.DataFrame:
        """Keep rows that satisfy every ``{column: condition}`` entry (logical AND).

        A condition may be:
            * a callable, applied element-wise to the column;
            * a list, tuple, set or frozenset, meaning membership (``isin``);
            * any other value, meaning equality.
        """
        mask = pd.Series(True, index=df.index)
        for col, condition in conditions.items():
            if callable(condition):
                mask &= df[col].apply(condition)
            elif isinstance(condition, (list, tuple, set, frozenset)):
                # Sets used to fall through to `==`, which compares every cell
                # against the whole set object instead of testing membership.
                mask &= df[col].isin(condition)
            else:
                mask &= df[col] == condition
        return df[mask]

    @staticmethod
    def conditional_update(df: pd.DataFrame, condition: pd.Series, updates: Dict[str, Any]) -> pd.DataFrame:
        """Return a copy of ``df`` where rows matching ``condition`` get ``updates`` assigned."""
        df = df.copy()
        for col, value in updates.items():
            df.loc[condition, col] = value
        return df

    @staticmethod
    def lookup_values(df: pd.DataFrame, lookup_df: pd.DataFrame, key_col: str, value_col: str) -> pd.Series:
        """Map ``df[key_col]`` to ``lookup_df[value_col]`` via the shared key column.

        Keys absent from ``lookup_df`` map to NaN. If ``lookup_df`` repeats a
        key, the last occurrence wins because ``to_dict`` overwrites earlier ones.
        """
        lookup_dict = lookup_df.set_index(key_col)[value_col].to_dict()
        return df[key_col].map(lookup_dict)
class GroupByAdvanced:
    """Thin wrappers over the core ``DataFrame.groupby`` idioms."""

    @staticmethod
    def multiple_aggregations(df: pd.DataFrame, group_cols: List[str], agg_dict: Dict) -> pd.DataFrame:
        """Aggregate each group with a ``{column: func or [funcs]}`` spec."""
        grouped = df.groupby(group_cols)
        return grouped.agg(agg_dict)

    @staticmethod
    def custom_aggregation(df: pd.DataFrame, group_col: str, value_col: str, func: Callable) -> pd.Series:
        """Reduce ``value_col`` within each group using ``func``."""
        per_group = df.groupby(group_col)[value_col]
        return per_group.agg(func)

    @staticmethod
    def transform_operations(df: pd.DataFrame, group_col: str, value_col: str, func: Callable) -> pd.Series:
        """Broadcast a per-group ``func`` result back onto the original rows."""
        per_group = df.groupby(group_col)[value_col]
        return per_group.transform(func)

    @staticmethod
    def filter_groups(df: pd.DataFrame, group_col: str, condition: Callable) -> pd.DataFrame:
        """Keep the rows of groups for which ``condition(group_frame)`` is truthy."""
        grouped = df.groupby(group_col)
        return grouped.filter(condition)

    @staticmethod
    def rolling_groupby(df: pd.DataFrame, group_col: str, value_col: str, window: int) -> pd.DataFrame:
        """Per-group rolling mean of ``value_col``, flattened back to columns."""
        windowed = df.groupby(group_col)[value_col].rolling(window)
        means = windowed.mean()
        return means.reset_index()
def pandas_advanced_example():
    """Print a short tour: a MultiIndex frame, a pivot table and a multi-aggregation."""
    multi_df = MultiIndexDemo.create_multiindex_dataframe()
    print("MultiIndex DataFrame:")
    print(multi_df)

    pivot_result = MultiIndexDemo.pivot_table_demo()
    print("\nPivot Table:")
    print(pivot_result)

    sample = pd.DataFrame({'group': ['A', 'A', 'B', 'B'], 'value': [10, 20, 30, 40]})
    summary = GroupByAdvanced.multiple_aggregations(
        sample, ['group'], {'value': ['sum', 'mean', 'std', 'count']}
    )
    print("\nMultiple Aggregations:")
    print(summary)


# 42.1.2 数据合并与连接
python
class DataMerge:
    """Wrappers for the pandas combination primitives: merge, join and concat."""

    @staticmethod
    def merge_dataframes(
        left: pd.DataFrame,
        right: pd.DataFrame,
        on: Union[str, List[str]],
        how: str = 'inner'
    ) -> pd.DataFrame:
        """SQL-style merge of ``left`` with ``right`` on the key column(s) ``on``."""
        return left.merge(right, on=on, how=how)

    @staticmethod
    def join_dataframes(
        left: pd.DataFrame,
        right: pd.DataFrame,
        on: str = None,
        how: str = 'left'
    ) -> pd.DataFrame:
        """Join ``right`` (matched on its index) onto ``left``'s index or ``on`` column."""
        joined = left.join(right, how=how, on=on)
        return joined

    @staticmethod
    def concat_dataframes(
        dfs: List[pd.DataFrame],
        axis: int = 0,
        ignore_index: bool = True
    ) -> pd.DataFrame:
        """Stack ``dfs`` along ``axis``; by default the result gets a fresh RangeIndex."""
        return pd.concat(objs=dfs, axis=axis, ignore_index=ignore_index)

    @staticmethod
    def append_rows(df: pd.DataFrame, new_rows: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` with ``new_rows`` appended underneath, renumbering the index."""
        return DataMerge.concat_dataframes([df, new_rows])
class DataReshape:
    """Wide <-> long reshaping helpers."""

    @staticmethod
    def melt_dataframe(
        df: pd.DataFrame,
        id_vars: List[str],
        value_vars: List[str],
        var_name: str = 'variable',
        value_name: str = 'value'
    ) -> pd.DataFrame:
        """Unpivot ``value_vars`` into (``var_name``, ``value_name``) rows, keeping ``id_vars``."""
        return pd.melt(
            df,
            id_vars=id_vars,
            value_vars=value_vars,
            var_name=var_name,
            value_name=value_name,
        )

    @staticmethod
    def pivot_dataframe(
        df: pd.DataFrame,
        index: str,
        columns: str,
        values: str
    ) -> pd.DataFrame:
        """Spread ``values`` into one column per distinct ``columns`` value (no aggregation)."""
        return pd.pivot(df, index=index, columns=columns, values=values)

    @staticmethod
    def wide_to_long(
        df: pd.DataFrame,
        stubnames: List[str],
        i: List[str],
        j: str,
        sep: str = ''
    ) -> pd.DataFrame:
        """Turn ``stub<sep><suffix>`` columns into rows indexed by (``i``, ``j``)."""
        reshaped = pd.wide_to_long(df, stubnames=stubnames, i=i, j=j, sep=sep)
        return reshaped

    @staticmethod
    def explode_column(df: pd.DataFrame, column: str) -> pd.DataFrame:
        """Give each element of the list-like cells in ``column`` its own row."""
        exploded = df.explode(column)
        return exploded
class CrossTable:
    """Frequency / aggregated cross-tabulations built on ``pd.crosstab``."""

    @staticmethod
    def create_crosstab(
        df: pd.DataFrame,
        index: str,
        columns: str,
        values: Optional[str] = None,
        aggfunc: str = 'count'
    ) -> pd.DataFrame:
        """Cross-tabulate ``df[index]`` against ``df[columns]``.

        Without ``values`` the result is a plain frequency table and
        ``aggfunc`` is ignored. ``pd.crosstab`` raises "aggfunc cannot be used
        without values", so it must not be forwarded in that case. With
        ``values``, ``df[values]`` is aggregated per cell with ``aggfunc``.
        """
        if values:
            return pd.crosstab(
                df[index],
                df[columns],
                values=df[values],
                aggfunc=aggfunc,
            )
        return pd.crosstab(df[index], df[columns])

    @staticmethod
    def crosstab_with_margins(
        df: pd.DataFrame,
        index: str,
        columns: str,
        margins: bool = True,
        normalize: Optional[str] = None
    ) -> pd.DataFrame:
        """Frequency table with optional "All" margins and normalization.

        ``normalize`` accepts what ``pd.crosstab`` accepts (``True``, ``'all'``,
        ``'index'``, ``'columns'``, 0, 1). ``None`` means "no normalization".
        It is mapped to ``False`` because ``pd.crosstab`` rejects ``None`` as
        an invalid normalize argument.
        """
        return pd.crosstab(
            df[index],
            df[columns],
            margins=margins,
            normalize=False if normalize is None else normalize,
        )


# 42.2 数据清洗
42.2.1 缺失值处理
python
from typing import Union, List, Callable, Optional
import numpy as np
class MissingValueHandler:
    """Inspect and treat missing values. Every drop/fill method returns a new frame."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def identify_missing(self) -> pd.DataFrame:
        """Per-column count of missing values, as a one-column ``missing_count`` frame."""
        return self.df.isnull().sum().to_frame('missing_count')

    def missing_percentage(self) -> pd.DataFrame:
        """Per-column missing count plus missing percentage rounded to 2 decimals."""
        total = len(self.df)
        missing = self.df.isnull().sum()
        percentage = (missing / total * 100).round(2)
        return pd.DataFrame({
            'missing_count': missing,
            'missing_percentage': percentage,
        })

    def drop_missing_rows(self, subset: Optional[List[str]] = None, how: str = 'any') -> pd.DataFrame:
        """Drop rows with missing values (``how='any'`` or ``'all'``) in ``subset``."""
        return self.df.dropna(subset=subset, how=how)

    def drop_missing_columns(self, threshold: float = 0.5) -> pd.DataFrame:
        """Keep only columns whose fraction of non-missing values is >= ``threshold``.

        The fractions are compared directly. Converting ``threshold`` to a
        row count with ``int(len(df) * threshold)`` truncates, and that would
        let, say, a column with 1 valid value out of 3 pass a 0.5 threshold.
        A frame with no rows keeps all of its columns.
        """
        if len(self.df) == 0:
            return self.df.copy()
        valid_fraction = self.df.notna().mean()
        # Positional boolean mask: safe even with duplicate column labels.
        return self.df.loc[:, (valid_fraction >= threshold).to_numpy()]

    def fill_with_constant(self, value: Any, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill missing cells with ``value`` (only in ``columns`` if given)."""
        df = self.df.copy()
        if columns:
            df[columns] = df[columns].fillna(value)
        else:
            df = df.fillna(value)
        return df

    def fill_with_forward(self, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Propagate the last valid value forward (only in ``columns`` if given)."""
        df = self.df.copy()
        if columns:
            df[columns] = df[columns].ffill()
        else:
            df = df.ffill()
        return df

    def fill_with_backward(self, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Propagate the next valid value backward (only in ``columns`` if given)."""
        df = self.df.copy()
        if columns:
            df[columns] = df[columns].bfill()
        else:
            df = df.bfill()
        return df

    def fill_with_mean(self, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill with the column mean. Without ``columns``, only numeric columns are filled."""
        df = self.df.copy()
        if columns:
            for col in columns:
                df[col] = df[col].fillna(df[col].mean())
        else:
            df = df.fillna(df.mean(numeric_only=True))
        return df

    def fill_with_median(self, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill with the column median. Without ``columns``, only numeric columns are filled."""
        df = self.df.copy()
        if columns:
            for col in columns:
                df[col] = df[col].fillna(df[col].median())
        else:
            df = df.fillna(df.median(numeric_only=True))
        return df

    def fill_with_mode(self, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Fill with each column's most frequent value (the smallest one on ties).

        Columns that have no non-missing values (empty mode) are left as-is.
        """
        df = self.df.copy()
        for col in (columns if columns else df.columns):
            mode_val = df[col].mode()
            if not mode_val.empty:
                # Positional access: do not depend on the mode Series' labels.
                df[col] = df[col].fillna(mode_val.iloc[0])
        return df

    def fill_with_interpolation(self, method: str = 'linear', columns: Optional[List[str]] = None) -> pd.DataFrame:
        """Interpolate missing values with ``method`` (only in ``columns`` if given)."""
        df = self.df.copy()
        if columns:
            df[columns] = df[columns].interpolate(method=method)
        else:
            df = df.interpolate(method=method)
        return df

    def fill_with_group_mean(self, group_col: str, value_col: str) -> pd.DataFrame:
        """Fill ``value_col`` with the mean of its own ``group_col`` group."""
        df = self.df.copy()
        df[value_col] = df.groupby(group_col)[value_col].transform(
            lambda x: x.fillna(x.mean())
        )
        return df
class MissingPatternAnalyzer:
    """Describe how missing values are distributed across rows and columns."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def analyze_patterns(self) -> pd.DataFrame:
        """Count how many rows share each missingness pattern.

        Returns:
            A frame with two columns. ``pattern`` is a tuple of booleans, one
            per column of ``df`` in column order, True meaning missing.
            ``count`` is the number of rows with that pattern. Most common
            patterns come first.
        """
        counts = self.df.isnull().value_counts()
        # value_counts on a frame returns a Series keyed by a MultiIndex with
        # one level per column. Renaming its reset_index() result to just two
        # columns only worked for single-column frames.
        return pd.DataFrame({
            'pattern': list(counts.index),
            'count': counts.to_numpy(),
        })

    def get_missing_correlation(self) -> pd.DataFrame:
        """Correlation between the per-column missing indicators.

        Columns with no missing values have zero variance and give NaN.
        """
        return self.df.isnull().corr()

    def find_missing_blocks(self, min_size: int = 2) -> List[Dict]:
        """Find runs of at least ``min_size`` consecutive missing values per column.

        ``start``/``end`` are inclusive 0-based row positions, not index labels.
        """
        blocks = []
        for col in self.df.columns:
            is_missing = self.df[col].isnull()
            block_start = None
            block_size = 0
            for i, val in enumerate(is_missing):
                if val:
                    if block_start is None:
                        block_start = i
                    block_size += 1
                else:
                    if block_size >= min_size:
                        blocks.append({
                            'column': col,
                            'start': block_start,
                            'end': block_start + block_size - 1,
                            'size': block_size,
                        })
                    block_start = None
                    block_size = 0
            # A run that reaches the last row is never closed by a non-missing value.
            if block_size >= min_size:
                blocks.append({
                    'column': col,
                    'start': block_start,
                    'end': block_start + block_size - 1,
                    'size': block_size,
                })
        return blocks


# 42.2.2 异常值检测与处理
python
from scipy import stats
from typing import Tuple
class OutlierDetector:
    """Boolean outlier masks for a single column, one method per statistic.

    Every ``*_method`` returns a bool Series aligned to ``df.index``. Missing
    values are never flagged.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def zscore_method(self, column: str, threshold: float = 3.0) -> pd.Series:
        """Flag values whose population z-score (ddof=0) exceeds ``threshold``.

        Computing on the full column keeps the mask aligned to ``df.index``.
        z-scoring a dropna'd copy would return a shorter result that cannot
        be used to index the frame. Mean and std skip NaN. A zero-variance
        column gives NaN scores, so nothing is flagged.
        """
        values = self.df[column]
        # ddof=0 matches scipy.stats.zscore's default.
        z_scores = (values - values.mean()) / values.std(ddof=0)
        return z_scores.abs() > threshold

    def iqr_method(self, column: str, multiplier: float = 1.5) -> pd.Series:
        """Flag values outside [Q1 - k*IQR, Q3 + k*IQR] with k = ``multiplier``."""
        Q1 = self.df[column].quantile(0.25)
        Q3 = self.df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - multiplier * IQR
        upper_bound = Q3 + multiplier * IQR
        return (self.df[column] < lower_bound) | (self.df[column] > upper_bound)

    def percentile_method(self, column: str, lower: float = 0.01, upper: float = 0.99) -> pd.Series:
        """Flag values below the ``lower`` or above the ``upper`` quantile."""
        lower_bound = self.df[column].quantile(lower)
        upper_bound = self.df[column].quantile(upper)
        return (self.df[column] < lower_bound) | (self.df[column] > upper_bound)

    def mad_method(self, column: str, threshold: float = 3.0) -> pd.Series:
        """Flag values whose modified z-score, 0.6745*(x - median)/MAD, exceeds ``threshold``.

        The MAD is computed with pandas' NaN-skipping median. ``np.median``
        returns NaN as soon as the column has a missing value, which
        silently disabled detection. If MAD is 0, every value different
        from the median gets an infinite score and is flagged.
        """
        values = self.df[column]
        median = values.median()
        mad = (values - median).abs().median()
        modified_z_scores = 0.6745 * (values - median) / mad
        return modified_z_scores.abs() > threshold

    def isolation_forest(self, columns: List[str], contamination: float = 0.1) -> pd.Series:
        """Flag rows that scikit-learn's IsolationForest labels -1 (anomalous).

        Only rows complete in ``columns`` are scored. Other rows stay False.
        """
        from sklearn.ensemble import IsolationForest
        complete_rows = self.df[columns].dropna()
        clf = IsolationForest(contamination=contamination, random_state=42)
        predictions = clf.fit_predict(complete_rows)
        result = pd.Series(False, index=self.df.index)
        result.loc[complete_rows.index] = predictions == -1
        return result

    def get_outlier_summary(self, column: str, methods: Optional[List[str]] = None) -> pd.DataFrame:
        """Number of outliers found by each requested method.

        Supported names are ``'zscore'``, ``'iqr'``, ``'percentile'`` and
        ``'mad'``. The default is the first three. Unknown names are ignored.
        """
        methods = methods or ['zscore', 'iqr', 'percentile']
        detectors = {
            'zscore': self.zscore_method,
            'iqr': self.iqr_method,
            'percentile': self.percentile_method,
            'mad': self.mad_method,
        }
        results = {
            name: int(detect(column).sum())
            for name, detect in detectors.items()
            if name in methods
        }
        return pd.DataFrame.from_dict(results, orient='index', columns=['outlier_count'])
class OutlierHandler:
    """Ways to neutralize outliers once they are known. Methods never mutate ``self.df``."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def remove_outliers(self, column: str, mask: pd.Series) -> pd.DataFrame:
        """Return the rows where ``mask`` is False (``column`` is accepted but unused)."""
        keep = ~mask
        return self.df[keep]

    def cap_outliers(
        self,
        column: str,
        lower_percentile: float = 0.01,
        upper_percentile: float = 0.99
    ) -> pd.DataFrame:
        """Clip ``column`` to its [lower, upper] percentile values."""
        capped = self.df.copy()
        series = capped[column]
        low_value = series.quantile(lower_percentile)
        high_value = series.quantile(upper_percentile)
        capped[column] = series.clip(lower=low_value, upper=high_value)
        return capped

    def replace_with_median(self, column: str, mask: pd.Series) -> pd.DataFrame:
        """Overwrite masked cells with the median of the unmasked cells."""
        result = self.df.copy()
        fill_value = result.loc[~mask, column].median()
        result.loc[mask, column] = fill_value
        return result

    def winsorize(self, column: str, limits: Tuple[float, float] = (0.05, 0.05)) -> pd.DataFrame:
        """Replace the lowest/highest ``limits`` fractions with the nearest kept values."""
        from scipy.stats import mstats
        result = self.df.copy()
        result[column] = mstats.winsorize(result[column], limits=limits)
        return result

    def log_transform(self, column: str) -> pd.DataFrame:
        """Apply ``log1p(x - min + 1)``, i.e. ``log(x - min + 2)``, so the minimum maps to log(2)."""
        result = self.df.copy()
        shifted = result[column] - result[column].min() + 1
        result[column] = np.log1p(shifted)
        return result

    def boxcox_transform(self, column: str) -> Tuple[pd.DataFrame, float]:
        """Box-Cox transform after shifting the minimum to 1. Returns (frame, fitted lambda)."""
        from scipy.stats import boxcox
        result = self.df.copy()
        transformed, fitted_lambda = boxcox(result[column] - result[column].min() + 1)
        result[column] = transformed
        return result, fitted_lambda


# 42.3 时间序列分析
42.3.1 时间处理
python
from datetime import datetime, timedelta
from typing import Union
class TimeSeriesHandler:
    """Datetime conversion, resampling, windowing and feature helpers for one time column.

    Works on a private copy of the input. ``convert_to_datetime`` and
    ``set_index`` update that copy in place; the other methods return new
    frames.
    """

    def __init__(self, df: pd.DataFrame, time_col: str):
        self.df = df.copy()
        self.time_col = time_col

    def _time_indexed(self) -> pd.DataFrame:
        """Return the frame indexed by time, moving ``time_col`` to the index if needed."""
        if self.time_col in self.df.columns:
            return self.df.set_index(self.time_col)
        return self.df

    def convert_to_datetime(self, format: Optional[str] = None) -> pd.DataFrame:
        """Parse ``time_col`` with ``pd.to_datetime`` (optionally with ``format``)."""
        self.df[self.time_col] = pd.to_datetime(self.df[self.time_col], format=format)
        return self.df

    def set_index(self) -> pd.DataFrame:
        """Move ``time_col`` into the index of the held frame."""
        self.df = self.df.set_index(self.time_col)
        return self.df

    def resample(self, freq: str, agg_func: str = 'mean') -> pd.DataFrame:
        """Resample to ``freq`` and aggregate each bin with ``agg_func``."""
        return self._time_indexed().resample(freq).agg(agg_func)

    def rolling_window(self, window: int, agg_func: str = 'mean') -> pd.DataFrame:
        """Aggregate over a trailing window of ``window`` rows."""
        return self._time_indexed().rolling(window=window).agg(agg_func)

    def expanding_window(self, agg_func: str = 'mean') -> pd.DataFrame:
        """Aggregate over all rows up to and including each row."""
        return self._time_indexed().expanding().agg(agg_func)

    def shift(self, periods: int = 1) -> pd.DataFrame:
        """Shift every column of the held frame by ``periods`` rows."""
        return self.df.shift(periods=periods)

    def diff(self, periods: int = 1) -> pd.DataFrame:
        """Row-wise difference against ``periods`` rows earlier (all columns)."""
        return self.df.diff(periods=periods)

    def pct_change(self, periods: int = 1) -> pd.DataFrame:
        """Relative change against ``periods`` rows earlier.

        NOTE(review): this applies to every column, so it fails if
        ``time_col`` is still a datetime column. Call ``set_index`` first.
        """
        return self.df.pct_change(periods=periods)

    def add_time_features(self) -> pd.DataFrame:
        """Add calendar columns (year, month, day, weekday, ISO week, quarter, weekend flag).

        Dates come from ``time_col`` if it is still a column, otherwise from
        the index (expected to be a DatetimeIndex).
        """
        df = self.df.copy()
        if self.time_col in df.columns:
            # A Series exposes datetime fields only through the `.dt` accessor.
            # The plain Series has no `.year`, so the column path used to raise.
            dt = pd.to_datetime(df[self.time_col]).dt
        else:
            dt = df.index
        df['year'] = dt.year
        df['month'] = dt.month
        df['day'] = dt.day
        df['day_of_week'] = dt.dayofweek
        df['day_of_year'] = dt.dayofyear
        # `.array` assigns positionally. For the index path this avoids
        # re-aligning on (possibly duplicated) timestamps.
        df['week_of_year'] = dt.isocalendar().week.array
        df['quarter'] = dt.quarter
        df['is_weekend'] = dt.dayofweek >= 5
        return df

    def create_lag_features(self, columns: List[str], lags: List[int]) -> pd.DataFrame:
        """Add ``{col}_lag_{k}`` columns shifted by each ``k`` in ``lags``."""
        df = self.df.copy()
        for col in columns:
            for lag in lags:
                df[f'{col}_lag_{lag}'] = df[col].shift(lag)
        return df

    def create_rolling_features(
        self,
        columns: List[str],
        windows: List[int],
        functions: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Add ``{col}_rolling_{w}_{func}`` columns.

        Args:
            columns: columns to roll over.
            windows: window sizes in rows.
            functions: rolling aggregations; defaults to ``['mean', 'std']``.
                A ``None`` default replaces the former shared mutable list.
        """
        if functions is None:
            functions = ['mean', 'std']
        df = self.df.copy()
        for col in columns:
            for window in windows:
                rolling = df[col].rolling(window)
                for func in functions:
                    df[f'{col}_rolling_{window}_{func}'] = rolling.agg(func)
        return df
class DateRangeGenerator:
    """Factories for common ``DatetimeIndex`` shapes."""

    @staticmethod
    def generate_range(
        start: str,
        end: str,
        freq: str = 'D',
        include_end: bool = True
    ) -> pd.DatetimeIndex:
        """Dates from ``start`` to ``end`` at ``freq``; ``end`` is dropped if ``include_end`` is False."""
        inclusive = 'both' if include_end else 'left'
        return pd.date_range(start=start, end=end, freq=freq, inclusive=inclusive)

    @staticmethod
    def generate_periods(start: str, periods: int, freq: str = 'D') -> pd.DatetimeIndex:
        """``periods`` consecutive dates beginning at ``start``."""
        return pd.date_range(start=start, freq=freq, periods=periods)

    @staticmethod
    def business_days(start: str, end: str) -> pd.DatetimeIndex:
        """Weekdays (Mon-Fri) between ``start`` and ``end``."""
        return pd.bdate_range(start, end)

    @staticmethod
    def date_range_by_offset(start: str, offsets: List[str]) -> pd.DatetimeIndex:
        """``start`` followed by the running sum of each offset alias in ``offsets``."""
        current = pd.Timestamp(start)
        stamps = [current]
        for step in offsets:
            current = current + pd.tseries.frequencies.to_offset(step)
            stamps.append(current)
        return pd.DatetimeIndex(stamps)


# 42.3.2 时间序列分析
python
class TimeSeriesAnalysis:
    """Classical diagnostics (decomposition, ADF, ACF/PACF) and smoothers for one series.

    Missing values are dropped once at construction time.
    """

    def __init__(self, series: pd.Series):
        self.series = series.dropna()

    def decompose(self, model: str = 'additive', period: int = None) -> Dict:
        """statsmodels seasonal decomposition, returned as a dict of component Series."""
        from statsmodels.tsa.seasonal import seasonal_decompose
        parts = seasonal_decompose(self.series, model=model, period=period)
        return {
            'trend': parts.trend,
            'seasonal': parts.seasonal,
            'residual': parts.resid,
            'observed': parts.observed,
        }

    def test_stationarity(self) -> Dict:
        """Augmented Dickey-Fuller test. "Stationary" means p-value < 0.05."""
        from statsmodels.tsa.stattools import adfuller
        statistic, p_value, _used_lag, _n_obs, critical, *_extra = adfuller(self.series)
        return {
            'test_statistic': statistic,
            'p_value': p_value,
            'critical_values': critical,
            'is_stationary': p_value < 0.05,
        }

    def autocorrelation(self, nlags: int = 40) -> Dict:
        """ACF and PACF arrays up to ``nlags``."""
        from statsmodels.tsa.stattools import acf, pacf
        acf_values = acf(self.series, nlags=nlags)
        pacf_values = pacf(self.series, nlags=nlags)
        return {'acf': acf_values, 'pacf': pacf_values}

    def moving_average(self, window: int) -> pd.Series:
        """Simple trailing mean over ``window`` observations."""
        rolling = self.series.rolling(window=window)
        return rolling.mean()

    def exponential_smoothing(self, alpha: float = 0.3) -> pd.Series:
        """Exponentially weighted mean with smoothing factor ``alpha``."""
        weighted = self.series.ewm(alpha=alpha)
        return weighted.mean()

    def seasonal_decomposition_plot_data(self, period: int = 12) -> pd.DataFrame:
        """Additive decomposition laid out as columns: observed, trend, seasonal, residual."""
        components = self.decompose(period=period)
        order = ('observed', 'trend', 'seasonal', 'residual')
        return pd.DataFrame({name: components[name] for name in order})
class TimeSeriesFeatureEngineering:
    """One-call feature builder: calendar, lag, rolling-window and difference features."""

    def __init__(self, df: pd.DataFrame, time_col: str, target_col: str):
        self.df = df.copy()
        self.time_col = time_col
        self.target_col = target_col

    def create_all_features(
        self,
        lags: Optional[List[int]] = None,
        windows: Optional[List[int]] = None
    ) -> pd.DataFrame:
        """Return a time-sorted copy of the data with engineered feature columns.

        Args:
            lags: shift distances for ``lag_{k}`` columns. Defaults to
                ``[1, 7, 14, 30]``.
            windows: window sizes for ``rolling_{mean,std,min,max}_{w}``
                columns. Defaults to ``[7, 14, 30]``.

        Lag, rolling and diff features are positional (counted in rows after
        sorting by time). They equal calendar lags only if there is one row
        per time step. TODO confirm for irregular data.
        """
        lags = [1, 7, 14, 30] if lags is None else lags
        windows = [7, 14, 30] if windows is None else windows

        df = self.df.copy()
        df[self.time_col] = pd.to_datetime(df[self.time_col])
        df = df.sort_values(self.time_col)

        stamps = df[self.time_col].dt
        df['year'] = stamps.year
        df['month'] = stamps.month
        df['day'] = stamps.day
        df['day_of_week'] = stamps.dayofweek
        df['day_of_year'] = stamps.dayofyear
        df['week_of_year'] = stamps.isocalendar().week
        df['quarter'] = stamps.quarter
        df['is_weekend'] = stamps.dayofweek >= 5

        target = df[self.target_col]
        for lag in lags:
            df[f'lag_{lag}'] = target.shift(lag)
        for window in windows:
            # Build the rolling view once and reuse it for all four statistics.
            rolling = target.rolling(window)
            df[f'rolling_mean_{window}'] = rolling.mean()
            df[f'rolling_std_{window}'] = rolling.std()
            df[f'rolling_min_{window}'] = rolling.min()
            df[f'rolling_max_{window}'] = rolling.max()

        df['diff_1'] = target.diff(1)
        df['diff_7'] = target.diff(7)
        df['pct_change'] = target.pct_change()
        return df


# 42.4 大数据处理
42.4.1 分块处理
python
from typing import Iterator, Callable
import gc
class ChunkProcessor:
    """Stream a CSV file in fixed-size chunks so it never has to fit in memory at once."""

    # How per-chunk partial statistics are merged into the final value.
    # 'mean' is not listed: it is rebuilt from the merged 'sum' and 'count'.
    _COMBINERS = {'sum': 'sum', 'count': 'sum', 'min': 'min', 'max': 'max'}

    def __init__(self, file_path: str, chunk_size: int = 10000):
        self.file_path = file_path
        self.chunk_size = chunk_size

    @staticmethod
    def _concat_results(results: List[pd.DataFrame]) -> pd.DataFrame:
        """Concatenate per-chunk outputs. A file that yields no chunks gives an empty frame."""
        if not results:
            return pd.DataFrame()
        return pd.concat(results, ignore_index=True)

    def process_chunks(
        self,
        process_func: Callable[[pd.DataFrame], pd.DataFrame]
    ) -> pd.DataFrame:
        """Apply ``process_func`` to every chunk and concatenate the outputs.

        Each raw chunk is released as soon as the loop moves on, so no
        explicit ``gc.collect()`` per chunk is needed. A full collection per
        chunk only cost time.
        """
        return self._concat_results([process_func(chunk) for chunk in self.iterate_chunks()])

    def aggregate_chunks(
        self,
        group_cols: List[str],
        agg_dict: Dict
    ) -> pd.DataFrame:
        """Group-by aggregation over the whole file, computed chunk by chunk.

        Each chunk is reduced to per-group partial sums, counts, mins and
        maxes, and the partials are merged afterwards. The result therefore
        equals ``groupby(group_cols).agg(agg_dict)`` on the full file.
        Re-applying ``agg_dict`` to per-chunk results would instead produce a
        mean of means and a count of counts.

        Args:
            group_cols: columns to group by.
            agg_dict: ``{column: func}`` or ``{column: [funcs]}``. Each func is
                one of ``'sum'``, ``'count'``, ``'min'``, ``'max'``, ``'mean'``.

        Returns:
            Group keys as columns, then one column per aggregate. When any
            func is given as a list, the columns form a ``(column, func)``
            MultiIndex, as in pandas.

        Raises:
            ValueError: if ``agg_dict`` is empty, or if it uses a function that
                cannot be merged exactly across chunks (e.g. ``'std'``).
        """
        outputs = []
        for col, funcs in agg_dict.items():
            for func in ([funcs] if isinstance(funcs, str) else funcs):
                if func != 'mean' and func not in self._COMBINERS:
                    raise ValueError(
                        f"aggregate_chunks cannot merge {func!r} across chunks; "
                        "use one of 'sum', 'count', 'min', 'max', 'mean'"
                    )
                outputs.append((col, func))
        if not outputs:
            raise ValueError("agg_dict must name at least one aggregation")

        # Partial statistics every chunk must provide, deduplicated in first-seen order.
        partial_keys = []
        for col, func in outputs:
            for stat in (('sum', 'count') if func == 'mean' else (func,)):
                if (col, stat) not in partial_keys:
                    partial_keys.append((col, stat))

        partials = []
        for chunk in self.iterate_chunks():
            grouped = chunk.groupby(group_cols)
            partials.append(pd.DataFrame({
                f'_p{j}': grouped[col].agg(stat)
                for j, (col, stat) in enumerate(partial_keys)
            }))
        if not partials:
            return pd.DataFrame()

        combined = pd.concat(partials)
        merged = combined.groupby(level=list(range(combined.index.nlevels)))
        totals = {
            key: merged[f'_p{j}'].agg(self._COMBINERS[key[1]])
            for j, key in enumerate(partial_keys)
        }
        result = pd.DataFrame({
            (col, func): (
                totals[(col, 'sum')] / totals[(col, 'count')]
                if func == 'mean'
                else totals[(col, func)]
            )
            for col, func in outputs
        })
        if all(isinstance(funcs, str) for funcs in agg_dict.values()):
            # Scalar-only specs get flat column names, as in pandas.
            result.columns = [col for col, _ in outputs]
        return result.reset_index()

    def filter_chunks(self, filter_func: Callable[[pd.DataFrame], pd.DataFrame]) -> pd.DataFrame:
        """Keep, from every chunk, the rows returned by ``filter_func``, then concatenate."""
        return self._concat_results([filter_func(chunk) for chunk in self.iterate_chunks()])

    def iterate_chunks(self) -> Iterator[pd.DataFrame]:
        """Yield successive ``chunk_size``-row frames of the file.

        The reader is used as a context manager, so the file handle is
        closed on exhaustion, on an exception or when the generator is
        discarded early.
        """
        with pd.read_csv(self.file_path, chunksize=self.chunk_size) as reader:
            yield from reader
class MemoryOptimizer:
    """Shrink a DataFrame's memory footprint by choosing narrower dtypes. Inputs are not mutated."""

    @staticmethod
    def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with narrower dtypes.

        * ``int64`` columns become the smallest ``uint8/16/32`` (if non-negative)
          or ``int8/16/32`` whose full range, bounds included, holds the
          column's min and max. Otherwise they stay ``int64``.
        * ``float64`` becomes ``float32``. This is lossy beyond about 7
          significant digits.
        * ``object`` columns whose distinct-value ratio is below 0.5 become
          ``category``.
        """
        df = df.copy()
        n_rows = len(df)
        for col in df.columns:
            col_type = df[col].dtype
            if col_type == 'int64':
                col_min, col_max = df[col].min(), df[col].max()
                candidates = (
                    ('uint8', 'uint16', 'uint32') if col_min >= 0
                    else ('int8', 'int16', 'int32')
                )
                for candidate in candidates:
                    # Inclusive bounds: e.g. 255 still fits uint8 and -128 fits int8.
                    bounds = np.iinfo(candidate)
                    if bounds.min <= col_min and col_max <= bounds.max:
                        df[col] = df[col].astype(candidate)
                        break
            elif col_type == 'float64':
                df[col] = df[col].astype('float32')
            elif col_type == 'object' and n_rows > 0:
                # An empty frame used to raise ZeroDivisionError here.
                if df[col].nunique() / n_rows < 0.5:
                    df[col] = df[col].astype('category')
        return df

    @staticmethod
    def downcast_numeric(df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy with every numeric column downcast by ``pd.to_numeric``.

        Signed integers go to the smallest signed type, unsigned integers to
        the smallest unsigned type, and floats to at most ``float32`` when
        the values survive the cast. Integer and float columns are never
        converted into each other.
        """
        df = df.copy()
        for col in df.select_dtypes(include=[np.signedinteger]).columns:
            df[col] = pd.to_numeric(df[col], downcast='integer')
        for col in df.select_dtypes(include=[np.unsignedinteger]).columns:
            df[col] = pd.to_numeric(df[col], downcast='unsigned')
        for col in df.select_dtypes(include=[np.floating]).columns:
            df[col] = pd.to_numeric(df[col], downcast='float')
        return df

    @staticmethod
    def get_memory_usage(df: pd.DataFrame) -> pd.DataFrame:
        """Deep memory usage per column (the first row is the index) in bytes and MiB."""
        usage = df.memory_usage(deep=True)
        return pd.DataFrame({
            'column': usage.index,
            'memory_bytes': usage.values,
            'memory_mb': usage.values / 1024 / 1024,
        })


# 42.4.2 Dask并行计算
python
class DaskProcessor:
def __init__(self):
try:
import dask.dataframe as dd
self.dd = dd
self.available = True
except ImportError:
self.available = False
def read_csv(self, file_path: str) -> Any:
if self.available:
return self.dd.read_csv(file_path)
return pd.read_csv(file_path)
def read_parquet(self, file_path: str) -> Any:
if self.available:
return self.dd.read_parquet(file_path)
return pd.read_parquet(file_path)
def parallel_groupby(
self,
df: Any,
group_col: str,
agg_dict: Dict
) -> pd.DataFrame:
if self.available and hasattr(df, 'groupby'):
result = df.groupby(group_col).agg(agg_dict).compute()
return result
return df.groupby(group_col).agg(agg_dict)
def parallel_apply(
self,
df: Any,
func: Callable,
axis: int = 1
) -> Any:
if self.available and hasattr(df, 'apply'):
return df.apply(func, axis=axis, meta=(None, 'object')).compute()
return df.apply(func, axis=axis)
class ParallelProcessor:
    """Fan row-wise or group-wise pandas work out over a thread pool."""

    def __init__(self, n_workers: int = 4):
        self.n_workers = n_workers

    def _row_chunks(self, df: pd.DataFrame) -> List[pd.DataFrame]:
        """Split ``df`` into at most ``n_workers`` contiguous, non-empty row slices."""
        # Split row positions rather than the frame itself: np.array_split on a
        # DataFrame goes through DataFrame.swapaxes, which newer pandas deprecates.
        positions = np.array_split(np.arange(len(df)), max(1, self.n_workers))
        return [df.iloc[pos[0]:pos[-1] + 1] for pos in positions if len(pos)]

    def parallel_apply(
        self,
        df: pd.DataFrame,
        func: Callable,
        column: str = None
    ) -> pd.Series:
        """Apply ``func`` to ``df[column]`` element-wise, or to each row if no column is given.

        Results keep the original row order and index. An empty ``df`` yields
        an empty object Series on the same index.
        """
        from concurrent.futures import ThreadPoolExecutor

        chunks = self._row_chunks(df)
        if not chunks:
            return pd.Series(index=df.index, dtype=object)

        def work(chunk: pd.DataFrame) -> pd.Series:
            if column:
                return chunk[column].apply(func)
            return chunk.apply(func, axis=1)

        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            results = list(executor.map(work, chunks))
        return pd.concat(results)

    def parallel_groupby_agg(
        self,
        df: pd.DataFrame,
        group_col: str,
        value_col: str,
        agg_func: str
    ) -> pd.Series:
        """Aggregate ``value_col`` per ``group_col``, with disjoint batches of groups run in parallel.

        Uses threads. The former ProcessPoolExecutor could never work because
        it had to pickle a nested closure. The result is sorted by group key,
        like a plain ``groupby(...).agg(...)``.
        """
        from concurrent.futures import ThreadPoolExecutor

        unique_groups = df[group_col].unique()
        if len(unique_groups) == 0:
            return df.groupby(group_col)[value_col].agg(agg_func)

        chunk_size = max(1, len(unique_groups) // self.n_workers)
        group_chunks = [
            unique_groups[i:i + chunk_size]
            for i in range(0, len(unique_groups), chunk_size)
        ]

        def aggregate(groups) -> pd.Series:
            subset = df[df[group_col].isin(groups)]
            return subset.groupby(group_col)[value_col].agg(agg_func)

        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            results = list(executor.map(aggregate, group_chunks))
        return pd.concat(results).sort_index()


# 42.5 知识图谱
42.5.1 数据分析技术栈
┌─────────────────────────────────────────────────────────────────────┐
│ Python数据分析技术栈 │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 数据可视化层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Matplotlib│ │ Seaborn │ │ Plotly │ │Bokeh │ │ │
│ │ │基础绘图 │ │统计绘图 │ │交互图表 │ │Web可视化 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 数据分析层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Pandas │ │ NumPy │ │ SciPy │ │Statsmodel│ │ │
│ │ │数据处理 │ │数值计算 │ │科学计算 │ │统计分析 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │时间序列 │ │分组聚合 │ │数据重塑 │ │ │
│ │ │TimeSeries│ │GroupBy │ │Reshape │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 大数据处理层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Dask │ │ Vaex │ │ Polars │ │ Modin │ │ │
│ │ │并行计算 │ │内存映射 │ │高性能DF │ │分布式 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 数据存储层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ CSV │ │ Parquet │ │ HDF5 │ │Feather │ │ │
│ │ │文本格式 │ │列式存储 │ │科学数据 │ │快速I/O │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
42.5.2 数据分析流程
┌─────────────────────────────────────────────────────────────────────┐
│ 数据分析工作流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ 数据获取 │ ─── API / 数据库 / 文件 / 爬虫 │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ 数据探索 │ ─── 描述统计 / 分布分析 / 相关性分析 │
│ │ EDA │ │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 数据清洗 │───▶│ 缺失处理 │───▶│ 异常处理 │ │
│ └──────────┘ └──────────┘ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 数据转换 │───▶│ 特征工程 │───▶│ 数据重塑 │ │
│ └──────────┘ └──────────┘ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 数据分析 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 聚合分析 │ │ 时间序列 │ │ 统计建模 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 可视化 │───▶│ 报告生成 │───▶│ 洞察输出 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
42.6 技术选型指南
42.6.1 数据处理框架选型
| 框架 | 数据规模 | 性能特点 | 内存效率 | 学习曲线 | 推荐指数 |
|---|---|---|---|---|---|
| Pandas | <10GB | 灵活、功能丰富 | 中等 | 低 | ★★★★★ |
| Polars | <100GB | 极高性能 | 高 | 中 | ★★★★★ |
| Dask | >100GB | 并行计算、分布式 | 高 | 中 | ★★★★☆ |
| Vaex | >100GB | 内存映射、零拷贝 | 极高 | 中 | ★★★★☆ |
| Modin | 任意 | Pandas兼容、自动并行 | 高 | 低 | ★★★★☆ |
42.6.2 数据存储格式选型
| 格式 | 读取速度 | 写入速度 | 压缩率 | 随机访问 | 推荐场景 |
|---|---|---|---|---|---|
| Parquet | 快 | 中 | 高 | 支持 | 大数据、列式分析 |
| Feather | 极快 | 极快 | 低 | 支持 | 临时存储、快速I/O |
| HDF5 | 快 | 快 | 中 | 支持 | 科学计算、层次数据 |
| CSV | 慢 | 快 | 无 | 不支持 | 数据交换、人类可读 |
| Pickle | 中 | 中 | 中 | 不支持 | Python对象序列化 |
42.6.3 时间序列分析工具选型
| 工具 | 功能特点 | 适用场景 | 推荐指数 |
|---|---|---|---|
| Pandas TimeSeries | 基础时间序列 | 日常分析 | ★★★★★ |
| Statsmodels | 统计模型、ARIMA | 统计预测 | ★★★★☆ |
| Prophet | 自动预测、可解释 | 业务预测 | ★★★★★ |
| pmdarima | 自动ARIMA | 时间序列预测 | ★★★★☆ |
42.7 常见问题与解决方案
42.7.1 内存不足问题
python
import pandas as pd
import numpy as np
from typing import Iterator, Any
class MemoryEfficientProcessor:
    """Memory-efficient processing helpers: chunked CSV reading and dtype shrinking."""

    def __init__(self, chunk_size: int = 10000):
        self.chunk_size = chunk_size

    def process_large_csv(
        self,
        filepath: str,
        process_func: Callable[[pd.DataFrame], pd.DataFrame],
        output_path: Optional[str] = None
    ) -> pd.DataFrame:
        """Process a large CSV file in chunks and return the concatenated result.

        Args:
            filepath: CSV file to read, ``chunk_size`` rows at a time.
            process_func: applied to each chunk.
            output_path: if given, the combined result is also written there
                as CSV without the index. This argument used to be ignored.

        Returns:
            The processed chunks concatenated with a fresh index. A file
            yielding no chunks gives an empty frame.
        """
        chunks = []
        # The context manager closes the file even if process_func raises.
        with pd.read_csv(filepath, chunksize=self.chunk_size) as reader:
            for chunk in reader:
                chunks.append(process_func(chunk))
        result = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
        if output_path is not None:
            result.to_csv(output_path, index=False)
        return result

    def optimize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Shrink dtypes to reduce memory. Modifies ``df`` in place and returns it.

        ``int64`` columns move to the smallest unsigned/signed 8/16/32-bit
        type whose full range holds the data. Columns that fit none of them
        stay ``int64``. The old code cast them to ``uint32``/``int32``
        unconditionally, which silently wrapped large values.
        ``float64`` becomes ``float32``. This is lossy beyond about 7
        significant digits. ``object`` columns with a distinct-value ratio
        below 0.5 become ``category``.
        """
        for col in df.columns:
            col_type = df[col].dtype
            if col_type == 'int64':
                col_min, col_max = df[col].min(), df[col].max()
                candidates = (
                    ('uint8', 'uint16', 'uint32') if col_min >= 0
                    else ('int8', 'int16', 'int32')
                )
                for candidate in candidates:
                    bounds = np.iinfo(candidate)
                    if bounds.min <= col_min and col_max <= bounds.max:
                        df[col] = df[col].astype(candidate)
                        break
            elif col_type == 'float64':
                df[col] = df[col].astype('float32')
            elif col_type == 'object' and len(df) > 0:
                # Guard: an empty frame used to raise ZeroDivisionError here.
                if df[col].nunique() / len(df) < 0.5:
                    df[col] = df[col].astype('category')
        return df

    def downcast_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Downcast numeric columns in place and return ``df``.

        Integer columns are downcast as integers and float columns as floats.
        Running both downcasts on every column, as before, turned integral
        floats into ints and could turn int columns into float32.
        """
        int_types = ['int16', 'int32', 'int64']
        float_types = ['float16', 'float32', 'float64']
        for col in df.select_dtypes(include=int_types).columns:
            df[col] = pd.to_numeric(df[col], downcast='integer')
        for col in df.select_dtypes(include=float_types).columns:
            df[col] = pd.to_numeric(df[col], downcast='float')
        return df
class LazyDataLoader:
    """Read only what is needed from a parquet or CSV file.

    ``format='parquet'`` selects pyarrow/parquet readers. Any other value
    is treated as CSV, matching ``read_columns``.
    """

    def __init__(self, filepath: str, format: str = 'parquet'):
        self.filepath = filepath
        self.format = format
        self._metadata = None

    def get_columns(self) -> list:
        """Return the column names without loading any data rows.

        Parquet reads only the schema. CSV parses just the header
        (``nrows=0``); it used to return None instead.
        """
        if self.format == 'parquet':
            import pyarrow.parquet as pq
            schema = pq.read_schema(self.filepath)
            return schema.names
        return pd.read_csv(self.filepath, nrows=0).columns.tolist()

    def read_columns(self, columns: list) -> pd.DataFrame:
        """Load only ``columns``."""
        if self.format == 'parquet':
            return pd.read_parquet(self.filepath, columns=columns)
        return pd.read_csv(self.filepath, usecols=columns)

    def _iter_chunks(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        """Yield the file as frames of at most ``chunk_size`` rows in its own format."""
        if self.format == 'parquet':
            import pyarrow.parquet as pq
            # NOTE(review): each batch converts with its own index. Confirm
            # whether callers need row labels that are unique across batches.
            for batch in pq.ParquetFile(self.filepath).iter_batches(batch_size=chunk_size):
                yield batch.to_pandas()
        else:
            with pd.read_csv(self.filepath, chunksize=chunk_size) as reader:
                yield from reader

    def read_filtered(
        self,
        filter_func: Callable[[pd.DataFrame], pd.DataFrame],
        chunk_size: int = 10000
    ) -> pd.DataFrame:
        """Stream the file in chunks and keep only the rows ``filter_func`` returns.

        Parquet files are read batch-wise via pyarrow. Previously every
        format was parsed as CSV. Chunks that filter down to nothing are
        dropped. No matches at all gives an empty frame.
        """
        chunks = []
        for chunk in self._iter_chunks(chunk_size):
            filtered = filter_func(chunk)
            if len(filtered) > 0:
                chunks.append(filtered)
        return pd.concat(chunks) if chunks else pd.DataFrame()


# 42.7.2 数据质量问题
python
from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np
class DataQualityChecker:
    """Data-quality checks on a frame (missing values, duplicates, outliers, types).

    Counts and percentages in the returned dicts are plain Python ``int`` /
    ``float``, so the reports serialize directly, e.g. with ``json.dumps``.
    """

    # Outlier rules understood by check_outliers.
    _OUTLIER_METHODS = ('iqr', 'zscore')

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.report = {}

    @staticmethod
    def _percentage(count: int, total: int) -> float:
        """``count / total`` as a percentage. An empty denominator gives 0.0."""
        return float(count) / total * 100 if total else 0.0

    def check_missing(self) -> Dict:
        """Per-column missing counts and percentages, plus the columns that have any."""
        missing = self.df.isnull().sum()
        missing_pct = missing / len(self.df) * 100
        return {
            "missing_counts": missing.to_dict(),
            "missing_percentages": missing_pct.to_dict(),
            "columns_with_missing": missing[missing > 0].index.tolist()
        }

    def check_duplicates(self, subset: Optional[List[str]] = None) -> Dict:
        """Rows duplicated on ``subset`` (all columns by default), counting every occurrence."""
        duplicates = self.df.duplicated(subset=subset, keep=False)
        count = int(duplicates.sum())
        return {
            "duplicate_count": count,
            "duplicate_percentage": self._percentage(count, len(self.df)),
            "duplicate_indices": self.df[duplicates].index.tolist()
        }

    def check_outliers(
        self,
        columns: Optional[List[str]] = None,
        method: str = 'iqr'
    ) -> Dict:
        """Count outliers per column, ignoring missing values.

        Args:
            columns: columns to inspect; defaults to all numeric columns.
            method: ``'iqr'`` (outside Q1 - 1.5*IQR .. Q3 + 1.5*IQR) or
                ``'zscore'`` (|z| > 3 with sample std).

        Raises:
            ValueError: for any other ``method``. An unknown method used to
                fail with UnboundLocalError or silently reuse the previous
                column's mask.
        """
        if method not in self._OUTLIER_METHODS:
            raise ValueError(
                f"Unknown outlier method {method!r}; expected one of {self._OUTLIER_METHODS}"
            )
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        outliers = {}
        for col in columns:
            data = self.df[col].dropna()
            if method == 'iqr':
                Q1, Q3 = data.quantile([0.25, 0.75])
                IQR = Q3 - Q1
                lower = Q1 - 1.5 * IQR
                upper = Q3 + 1.5 * IQR
                outlier_mask = (data < lower) | (data > upper)
            else:
                z_scores = np.abs((data - data.mean()) / data.std())
                outlier_mask = z_scores > 3
            count = int(outlier_mask.sum())
            outliers[col] = {
                "count": count,
                "percentage": self._percentage(count, len(data)),
                "indices": data[outlier_mask].index.tolist()
            }
        return outliers

    def check_data_types(self) -> Dict:
        """Per-column dtype name, distinct-value count and the first five values."""
        type_report = {}
        for col in self.df.columns:
            type_report[col] = {
                "dtype": str(self.df[col].dtype),
                "unique_count": int(self.df[col].nunique()),
                "sample_values": self.df[col].head(5).tolist()
            }
        return type_report

    def generate_report(self) -> Dict:
        """Run every check with default settings and bundle the results with shape and memory."""
        return {
            "shape": self.df.shape,
            "missing": self.check_missing(),
            "duplicates": self.check_duplicates(),
            "outliers": self.check_outliers(),
            "data_types": self.check_data_types(),
            "memory_usage": {
                key: int(value)
                for key, value in self.df.memory_usage(deep=True).to_dict().items()
            }
        }
class DataCleaner:
"""数据清洗器"""
def __init__(self, df: pd.DataFrame):
    # Clean a private copy so the caller's frame is never modified,
    # and start an empty, human-readable record of each cleaning step.
    self.df: pd.DataFrame = df.copy()
    self.cleaning_log: List[str] = []
def handle_missing(
self,
strategy: Dict[str, str] = None,
default_strategy: str = 'drop'
) -> "DataCleaner":
"""处理缺失值"""
strategy = strategy or {}
for col in self.df.columns:
col_strategy = strategy.get(col, default_strategy)
if self.df[col].isnull().sum() == 0:
continue
if col_strategy == 'drop':
self.df = self.df.dropna(subset=[col])
self.cleaning_log.append(f"Dropped rows with missing {col}")
elif col_strategy == 'mean':
self.df[col] = self.df[col].fillna(self.df[col].mean())
self.cleaning_log.append(f"Filled missing {col} with mean")
elif col_strategy == 'median':
self.df[col] = self.df[col].fillna(self.df[col].median())
self.cleaning_log.append(f"Filled missing {col} with median")
elif col_strategy == 'mode':
self.df[col] = self.df[col].fillna(self.df[col].mode()[0])
self.cleaning_log.append(f"Filled missing {col} with mode")
elif col_strategy == 'ffill':
self.df[col] = self.df[col].ffill()
self.cleaning_log.append(f"Forward filled missing {col}")
elif col_strategy == 'bfill':
self.df[col] = self.df[col].bfill()
self.cleaning_log.append(f"Backward filled missing {col}")
return self
def handle_outliers(
self,
columns: List[str] = None,
method: str = 'clip',
threshold: float = 3
) -> "DataCleaner":
"""处理异常值"""
if columns is None:
columns = self.df.select_dtypes(include=[np.number]).columns
for col in columns:
if method == 'clip':
lower = self.df[col].quantile(0.01)
upper = self.df[col].quantile(0.99)
self.df[col] = self.df[col].clip(lower, upper)
self.cleaning_log.append(f"Clipped outliers in {col}")
elif method == 'remove':
z_scores = np.abs(
(self.df[col] - self.df[col].mean()) / self.df[col].std()
)
self.df = self.df[z_scores <= threshold]
self.cleaning_log.append(f"Removed outliers in {col}")
return self
def remove_duplicates(
self,
subset: List[str] = None,
keep: str = 'first'
) -> "DataCleaner":
"""删除重复值"""
before = len(self.df)
self.df = self.df.drop_duplicates(subset=subset, keep=keep)
after = len(self.df)
self.cleaning_log.append(f"Removed {before - after} duplicate rows")
return self
def get_cleaned_data(self) -> pd.DataFrame:
"""获取清洗后的数据"""
return self.df
def get_cleaning_log(self) -> List[str]:
"""获取清洗日志"""
return self.cleaning_log42.7.3 性能优化问题
python
import pandas as pd
import numpy as np
from typing import Callable, Any
class DataFrameOptimizer:
    """Small catalogue of pandas performance idioms."""

    @staticmethod
    def use_vectorization(df: pd.DataFrame) -> pd.DataFrame:
        """Placeholder for the "vectorize instead of looping" tip.

        Currently returns ``df`` unchanged (the same object, not a copy).
        """
        return df

    @staticmethod
    def avoid_chained_indexing(df: pd.DataFrame) -> dict:
        """Return a bad/good example pair for assigning through a row filter.

        ``df`` is not used. Per the pandas "view versus copy" docs, the chained
        form may assign into a temporary copy, while a single ``.loc[mask, col]``
        assignment updates the frame directly.
        """
        return {
            "bad": "df[df['A'] > 0]['B'] = 1",
            "good": "df.loc[df['A'] > 0, 'B'] = 1"
        }

    @staticmethod
    def use_query_for_filtering(df: pd.DataFrame, condition: str) -> pd.DataFrame:
        """Filter rows with ``DataFrame.query``.

        ``condition`` is an expression string evaluated by pandas; do not pass
        untrusted input.
        """
        return df.query(condition)

    @staticmethod
    def use_eval_for_expressions(df: pd.DataFrame, expr: str) -> pd.Series:
        """Evaluate a column expression string with ``DataFrame.eval``."""
        return df.eval(expr)

    @staticmethod
    def optimize_merge(
        left: pd.DataFrame,
        right: pd.DataFrame,
        on: str,
        how: str = 'inner'
    ) -> pd.DataFrame:
        """Merge ``left`` with ``right`` on the key column ``on``.

        The operands are never swapped. ``right.merge(left)`` would put the
        right frame's columns first and give the ``_x`` suffix to the right
        frame's overlapping columns. It can also order rows differently. The
        output layout would then depend on which input happens to be larger.

        Args:
            how: join type passed to ``DataFrame.merge`` (default ``'inner'``).
        """
        return left.merge(right, on=on, how=how)
class GroupByOptimizer:
    """Thin wrappers illustrating efficient ``groupby`` patterns."""

    @staticmethod
    def multi_agg_optimized(
        df: pd.DataFrame,
        group_col: str,
        agg_dict: dict
    ) -> pd.DataFrame:
        """Apply several aggregations in one ``groupby(...).agg`` call.

        ``agg_dict`` maps column names to aggregation spec(s) understood by
        ``DataFrameGroupBy.agg``.
        """
        grouped = df.groupby(group_col)
        summary = grouped.agg(agg_dict)
        return summary

    @staticmethod
    def transform_instead_of_apply(
        df: pd.DataFrame,
        group_col: str,
        value_col: str,
        func: Callable
    ) -> pd.Series:
        """Compute ``func`` per group and return a Series aligned to ``df``'s rows."""
        column_groups = df.groupby(group_col)[value_col]
        aligned = column_groups.transform(func)
        return aligned

    @staticmethod
    def filter_groups(df: pd.DataFrame, group_col: str, condition: Callable) -> pd.DataFrame:
        """Keep only the rows of groups for which ``condition(group)`` is truthy."""
        grouped = df.groupby(group_col)
        kept_rows = grouped.filter(condition)
        return kept_rows
class TimeSeriesOptimizer:
"""时间序列优化器"""
@staticmethod
def use_dt_accessor(df: pd.DataFrame, time_col: str) -> pd.DataFrame:
"""使用dt访问器"""
df[time_col] = pd.to_datetime(df[time_col])
df['year'] = df[time_col].dt.year
df['month'] = df[time_col].dt.month
df['day'] = df[time_col].dt.day
return df
@staticmethod
def resample_optimized(
df: pd.DataFrame,
time_col: str,
value_col: str,
freq: str = 'D',
agg: str = 'mean'
) -> pd.DataFrame:
"""优化重采样"""
df = df.set_index(time_col)
return df[value_col].resample(freq).agg(agg)
@staticmethod
def rolling_optimized(
df: pd.DataFrame,
value_col: str,
window: int,
min_periods: int = 1
) -> pd.DataFrame:
"""优化滚动计算"""
df[f'rolling_mean_{window}'] = df[value_col].rolling(
window=window, min_periods=min_periods
).mean()
return df42.8 本章小结
本章详细介绍了Python数据分析进阶的核心概念和实践:
- Pandas高级特性:MultiIndex、分层索引、高级分组
- 数据清洗:缺失值处理、异常值检测、数据转换
- 数据重塑:透视表、交叉表、数据合并
- 时间序列:日期处理、滚动计算、特征工程
- 大数据处理:分块处理、内存优化、并行计算
练习题
- 实现一个数据清洗管道,自动处理缺失值和异常值
- 开发一个时间序列预测系统,支持多种预测模型
- 实现一个大数据处理框架,支持分块和并行计算
- 开发一个数据质量报告生成器,自动生成数据质量报告
- 实现一个ETL流程,支持数据抽取、转换和加载