第51章 金融科技
学习目标
完成本章学习后,你将能够:
- 获取金融数据:股票行情、财务数据、宏观经济数据
- 进行数据分析:技术指标、统计分析、时间序列分析
- 构建交易策略:策略开发、回测框架、参数优化
- 实现风险管理:风险度量、仓位管理、止损止盈
- 开发交易系统:订单管理、交易接口、实时监控
- 进行量化研究:因子分析、组合优化、绩效评估
- 处理高频数据:Tick数据、订单簿、实时计算
- 构建风控系统:实时监控、预警机制、合规检查
51.1 金融数据获取
51.1.1 数据源接口
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, date
from enum import Enum
import json
import time
class AssetType(Enum):
    """Enumeration of asset classes.

    Every member's value is the lowercase string tag for that class.
    """

    STOCK = "stock"
    BOND = "bond"
    FUTURE = "future"
    OPTION = "option"
    FOREX = "forex"
    CRYPTO = "crypto"
    FUND = "fund"
class Market(Enum):
    """Enumeration of trading venues.

    Member names carry a region prefix (``CN_``, ``US_``) or a bare region
    code (``HK``); each value is a short lowercase string code.
    """

    CN_SH = "sh"
    CN_SZ = "sz"
    US_NYSE = "nyse"
    US_NASDAQ = "nasdaq"
    HK = "hk"
@dataclass
class Bar:
    """One OHLCV bar for a single symbol.

    ``amount`` is optional and defaults to 0.0.
    """

    symbol: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: int
    amount: float = 0.0

    @property
    def range(self) -> float:
        """Distance between the bar's high and low."""
        top, bottom = self.high, self.low
        return top - bottom

    @property
    def body(self) -> float:
        """Absolute distance between open and close."""
        return abs(self.open - self.close)

    @property
    def is_bullish(self) -> bool:
        """True when the bar closed strictly above its open."""
        return self.open < self.close
@dataclass
class Tick:
    """A single trade print, optionally carrying a top-of-book quote.

    ``bid``/``ask`` default to 0.0, which the properties below treat as
    "no quote available".
    """

    symbol: str
    timestamp: datetime
    price: float
    volume: int
    bid: float = 0.0
    ask: float = 0.0
    bid_volume: int = 0
    ask_volume: int = 0

    @property
    def spread(self) -> float:
        """Ask minus bid, or 0.0 when either side of the quote is missing."""
        if not (self.ask and self.bid):
            return 0.0
        return self.ask - self.bid

    @property
    def mid_price(self) -> float:
        """Midpoint of bid and ask; falls back to the trade price without a quote."""
        if not (self.bid and self.ask):
            return self.price
        return (self.bid + self.ask) / 2
@dataclass
class OrderBook:
    """Snapshot of resting bids and asks for one symbol.

    Each level is a ``(price, volume)`` tuple. The first entry of each list is
    treated as the best level; the lists are assumed to be ordered best-first
    (not verified here).
    """

    symbol: str
    timestamp: datetime
    bids: List[Tuple[float, int]]
    asks: List[Tuple[float, int]]

    @property
    def best_bid(self) -> Tuple[float, int]:
        """First bid level, or ``(0.0, 0)`` when there are no bids."""
        return self.bids[0] if self.bids else (0.0, 0)

    @property
    def best_ask(self) -> Tuple[float, int]:
        """First ask level, or ``(0.0, 0)`` when there are no asks."""
        return self.asks[0] if self.asks else (0.0, 0)

    @property
    def spread(self) -> float:
        """Best ask minus best bid; 0.0 unless both sides are present.

        Without this guard a one-sided book would report the placeholder
        price 0.0 for the missing side and yield a meaningless spread.
        """
        if not self.bids or not self.asks:
            return 0.0
        return self.best_ask[0] - self.best_bid[0]

    @property
    def mid_price(self) -> float:
        """Midpoint of the best bid and ask.

        With only one side present the price of that side is returned, and an
        empty book yields 0.0, so the placeholder 0.0 never drags the midpoint
        to half of the real price.
        """
        if self.bids and self.asks:
            return (self.best_bid[0] + self.best_ask[0]) / 2
        if self.bids:
            return self.best_bid[0]
        if self.asks:
            return self.best_ask[0]
        return 0.0
@dataclass
class FinancialData:
    """Fundamental figures for one symbol as of a given date.

    Only ``symbol`` and ``date`` are required; every numeric field defaults
    to 0.0.
    """

    # Identification
    symbol: str
    date: date

    # Income figures
    revenue: float = 0.0
    net_income: float = 0.0
    eps: float = 0.0

    # Valuation ratios
    pe_ratio: float = 0.0
    pb_ratio: float = 0.0

    # Profitability, leverage, liquidity and payout ratios
    roe: float = 0.0
    debt_ratio: float = 0.0
    current_ratio: float = 0.0
    dividend_yield: float = 0.0
class MarketDataAPI:
    """Placeholder market-data client that returns fixed, synthetic data.

    None of the methods here performs a network request; ``api_key`` and
    ``base_url`` are only stored. ``_get_cache``/``_set_cache`` implement a
    small time-based cache keyed by string.
    """

    # Seconds between consecutive synthetic bars for each frequency code.
    _BAR_SECONDS = {
        "1m": 60,
        "5m": 300,
        "15m": 900,
        "30m": 1800,
        "1h": 3600,
        "1d": 86400,
        "1w": 604800,
    }
    # Number of synthetic bars returned by get_bars.
    _BAR_COUNT = 100

    def __init__(self, api_key: str = "", base_url: str = ""):
        self.api_key = api_key
        self.base_url = base_url
        self._cache: Dict[str, Any] = {}
        self._cache_time: Dict[str, float] = {}
        self.cache_ttl = 60  # seconds an entry stays valid

    def _get_cache(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None if absent or expired."""
        if key not in self._cache:
            return None
        age = time.time() - self._cache_time.get(key, 0)
        if age < self.cache_ttl:
            return self._cache[key]
        # Expired: evict so stale entries do not accumulate.
        self._cache.pop(key, None)
        self._cache_time.pop(key, None)
        return None

    def _set_cache(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` and stamp it with the current time."""
        self._cache[key] = value
        self._cache_time[key] = time.time()

    def get_bars(
        self,
        symbol: str,
        start_date: date,
        end_date: date,
        frequency: str = "1d"
    ) -> List[Bar]:
        """Return 100 synthetic, steadily rising bars starting at ``start_date``.

        Consecutive bars are spaced by ``frequency`` (unknown codes fall back
        to one day), so every bar has a distinct timestamp. Identical
        timestamps would collapse into one row under a ``(symbol, timestamp)``
        key and into a single step of a timestamp-driven backtest.

        NOTE: ``end_date`` is accepted for interface compatibility but is not
        applied; the series always contains 100 bars.
        """
        from datetime import timedelta

        step = timedelta(seconds=self._BAR_SECONDS.get(frequency, 86400))
        first = datetime.combine(start_date, datetime.min.time())
        bars = []
        for i in range(self._BAR_COUNT):
            drift = i * 0.5
            bars.append(Bar(
                symbol=symbol,
                timestamp=first + i * step,
                open=100.0 + drift,
                high=101.0 + drift,
                low=99.0 + drift,
                close=100.5 + drift,
                volume=1000000 + i * 10000
            ))
        return bars

    def get_ticks(
        self,
        symbol: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Tick]:
        """Return one identical synthetic tick per second in ``[start_time, end_time)``."""
        from datetime import timedelta

        one_second = timedelta(seconds=1)
        ticks = []
        current = start_time
        while current < end_time:
            ticks.append(Tick(
                symbol=symbol,
                timestamp=current,
                price=100.0,
                volume=100,
                bid=99.9,
                ask=100.1
            ))
            # timedelta arithmetic keeps tzinfo intact; rebuilding the value
            # through a POSIX timestamp would return a naive local datetime.
            current += one_second
        return ticks

    def get_order_book(self, symbol: str) -> OrderBook:
        """Return a fixed three-level order book stamped with the current time."""
        return OrderBook(
            symbol=symbol,
            timestamp=datetime.now(),
            bids=[
                (100.0, 1000),
                (99.9, 2000),
                (99.8, 3000)
            ],
            asks=[
                (100.1, 1000),
                (100.2, 2000),
                (100.3, 3000)
            ]
        )

    def get_quote(self, symbol: str) -> Dict:
        """Return a fixed quote dictionary stamped with the current time."""
        return {
            "symbol": symbol,
            "price": 100.0,
            "change": 1.0,
            "change_percent": 1.0,
            "volume": 1000000,
            "turnover": 100000000,
            "high": 101.0,
            "low": 99.0,
            "open": 99.5,
            "prev_close": 99.0,
            "timestamp": datetime.now()
        }

    def get_financial_data(self, symbol: str) -> FinancialData:
        """Return fixed fundamentals for ``symbol`` dated today."""
        return FinancialData(
            symbol=symbol,
            date=date.today(),
            revenue=1000000000,
            net_income=100000000,
            eps=1.0,
            pe_ratio=20.0,
            pb_ratio=2.0,
            roe=0.15,
            debt_ratio=0.3,
            current_ratio=1.5,
            dividend_yield=0.02
        )

    def get_symbols(self, market: Market) -> List[str]:
        """Return a fixed list of tickers; ``market`` is not consulted."""
        return ["AAPL", "GOOGL", "MSFT", "AMZN", "META"]


# Section heading: 51.1.2 数据存储 (Data Storage)
python
import sqlite3
from typing import List, Optional
from datetime import datetime, date
class DataStorage:
    """SQLite persistence for bars and ticks.

    The ``bars`` and ``ticks`` tables are created on construction if missing.
    Both use ``(symbol, timestamp)`` as primary key, and timestamps are
    written as ISO-8601 text.
    """

    def __init__(self, db_path: str = "market_data.db"):
        self.db_path = db_path
        self._init_db()

    def _connection(self):
        """Return a context manager that yields a connection and closes it on exit.

        Using a ``sqlite3.Connection`` directly in ``with`` only commits or
        rolls back; it does not close the connection, so it is wrapped here.
        """
        from contextlib import closing

        return closing(sqlite3.connect(self.db_path))

    @staticmethod
    def _date_bounds(start_date: date, end_date: date) -> tuple:
        """Return inclusive ``(low, high)`` ISO strings for a timestamp filter.

        Stored timestamps look like ``2024-01-31T09:30:00`` and compare
        greater than the bare date string ``2024-01-31``, so a plain date end
        bound would exclude every bar on the end date. A ``date`` end bound is
        therefore widened to the last instant of that day; a ``datetime`` end
        bound is used as given.
        """
        low = start_date.isoformat()
        if isinstance(end_date, datetime):
            high = end_date.isoformat()
        else:
            high = datetime.combine(end_date, datetime.max.time()).isoformat()
        return low, high

    def _init_db(self) -> None:
        """Create the ``bars`` and ``ticks`` tables when they do not exist."""
        with self._connection() as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS bars (
                    symbol TEXT,
                    timestamp DATETIME,
                    open REAL,
                    high REAL,
                    low REAL,
                    close REAL,
                    volume INTEGER,
                    amount REAL,
                    PRIMARY KEY (symbol, timestamp)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS ticks (
                    symbol TEXT,
                    timestamp DATETIME,
                    price REAL,
                    volume INTEGER,
                    bid REAL,
                    ask REAL,
                    PRIMARY KEY (symbol, timestamp)
                )
            """)

    def save_bars(self, bars: List["Bar"]) -> None:
        """Insert ``bars``, replacing rows with the same symbol and timestamp."""
        rows = (
            (
                bar.symbol,
                bar.timestamp.isoformat(),
                bar.open,
                bar.high,
                bar.low,
                bar.close,
                bar.volume,
                bar.amount,
            )
            for bar in bars
        )
        with self._connection() as conn, conn:
            conn.executemany("""
                INSERT OR REPLACE INTO bars
                (symbol, timestamp, open, high, low, close, volume, amount)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, rows)

    def load_bars(
        self,
        symbol: str,
        start_date: date,
        end_date: date
    ) -> List["Bar"]:
        """Return the stored bars of ``symbol`` from ``start_date`` through ``end_date``.

        Both bounds are inclusive and the result is ordered by timestamp.
        """
        low, high = self._date_bounds(start_date, end_date)
        with self._connection() as conn:
            # Rows are materialised before the connection is closed.
            rows = conn.execute("""
                SELECT symbol, timestamp, open, high, low, close, volume, amount
                FROM bars
                WHERE symbol = ? AND timestamp >= ? AND timestamp <= ?
                ORDER BY timestamp
            """, (symbol, low, high)).fetchall()
        return [
            Bar(
                symbol=row[0],
                timestamp=datetime.fromisoformat(row[1]),
                open=row[2],
                high=row[3],
                low=row[4],
                close=row[5],
                volume=row[6],
                amount=row[7]
            )
            for row in rows
        ]

    def save_ticks(self, ticks: List["Tick"]) -> None:
        """Insert ``ticks``, replacing rows with the same symbol and timestamp.

        Only price, volume, bid and ask are stored.
        """
        rows = (
            (
                tick.symbol,
                tick.timestamp.isoformat(),
                tick.price,
                tick.volume,
                tick.bid,
                tick.ask,
            )
            for tick in ticks
        )
        with self._connection() as conn, conn:
            conn.executemany("""
                INSERT OR REPLACE INTO ticks
                (symbol, timestamp, price, volume, bid, ask)
                VALUES (?, ?, ?, ?, ?, ?)
            """, rows)


# Section heading: 51.2 技术分析 (Technical Analysis)
51.2.1 技术指标
python
from typing import List, Tuple
import math
class TechnicalIndicators:
@staticmethod
def sma(prices: List[float], period: int) -> List[float]:
result = []
for i in range(len(prices)):
if i < period - 1:
result.append(None)
else:
avg = sum(prices[i - period + 1:i + 1]) / period
result.append(avg)
return result
@staticmethod
def ema(prices: List[float], period: int) -> List[float]:
result = []
multiplier = 2 / (period + 1)
for i in range(len(prices)):
if i == 0:
result.append(prices[0])
else:
ema = (prices[i] - result[-1]) * multiplier + result[-1]
result.append(ema)
return result
@staticmethod
def rsi(prices: List[float], period: int = 14) -> List[float]:
result = []
gains = []
losses = []
for i in range(1, len(prices)):
change = prices[i] - prices[i - 1]
gains.append(max(0, change))
losses.append(max(0, -change))
for i in range(len(prices)):
if i < period:
result.append(None)
else:
avg_gain = sum(gains[i - period:i]) / period
avg_loss = sum(losses[i - period:i]) / period
if avg_loss == 0:
result.append(100)
else:
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
result.append(rsi)
return result
@staticmethod
def macd(
prices: List[float],
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9
) -> Tuple[List[float], List[float], List[float]]:
fast_ema = TechnicalIndicators.ema(prices, fast_period)
slow_ema = TechnicalIndicators.ema(prices, slow_period)
macd_line = [
fast - slow if fast and slow else None
for fast, slow in zip(fast_ema, slow_ema)
]
valid_macd = [m for m in macd_line if m is not None]
signal_line = [None] * (len(prices) - len(valid_macd))
signal_line.extend(TechnicalIndicators.ema(valid_macd, signal_period))
histogram = [
macd - signal if macd and signal else None
for macd, signal in zip(macd_line, signal_line)
]
return macd_line, signal_line, histogram
@staticmethod
def bollinger_bands(
prices: List[float],
period: int = 20,
std_dev: float = 2.0
) -> Tuple[List[float], List[float], List[float]]:
middle = TechnicalIndicators.sma(prices, period)
upper = []
lower = []
for i in range(len(prices)):
if i < period - 1:
upper.append(None)
lower.append(None)
else:
window = prices[i - period + 1:i + 1]
mean = middle[i]
variance = sum((p - mean) ** 2 for p in window) / period
std = math.sqrt(variance)
upper.append(mean + std_dev * std)
lower.append(mean - std_dev * std)
return upper, middle, lower
@staticmethod
def atr(
highs: List[float],
lows: List[float],
closes: List[float],
period: int = 14
) -> List[float]:
true_ranges = []
for i in range(len(highs)):
if i == 0:
tr = highs[i] - lows[i]
else:
tr = max(
highs[i] - lows[i],
abs(highs[i] - closes[i - 1]),
abs(lows[i] - closes[i - 1])
)
true_ranges.append(tr)
atr = TechnicalIndicators.sma(true_ranges, period)
return atr
@staticmethod
def stochastic(
highs: List[float],
lows: List[float],
closes: List[float],
k_period: int = 14,
d_period: int = 3
) -> Tuple[List[float], List[float]]:
k_values = []
for i in range(len(closes)):
if i < k_period - 1:
k_values.append(None)
else:
highest = max(highs[i - k_period + 1:i + 1])
lowest = min(lows[i - k_period + 1:i + 1])
if highest == lowest:
k_values.append(50)
else:
k = 100 * (closes[i] - lowest) / (highest - lowest)
k_values.append(k)
d_values = TechnicalIndicators.sma(
[k for k in k_values if k is not None],
d_period
)
d_full = [None] * (k_period - 1)
d_full.extend(d_values)
return k_values, d_full
@staticmethod
def obv(closes: List[float], volumes: List[int]) -> List[float]:
obv = [0]
for i in range(1, len(closes)):
if closes[i] > closes[i - 1]:
obv.append(obv[-1] + volumes[i])
elif closes[i] < closes[i - 1]:
obv.append(obv[-1] - volumes[i])
else:
obv.append(obv[-1])
return obv
@staticmethod
def vwap(
highs: List[float],
lows: List[float],
closes: List[float],
volumes: List[int]
) -> List[float]:
vwap_values = []
cumulative_tp_volume = 0
cumulative_volume = 0
for i in range(len(closes)):
tp = (highs[i] + lows[i] + closes[i]) / 3
cumulative_tp_volume += tp * volumes[i]
cumulative_volume += volumes[i]
if cumulative_volume > 0:
vwap_values.append(cumulative_tp_volume / cumulative_volume)
else:
vwap_values.append(tp)
return vwap_values51.2.2 K线形态识别
python
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class Pattern:
    """A detected candlestick pattern."""

    name: str          # pattern label, e.g. "Doji"
    index: int         # position in the bar list at which it was detected
    direction: str     # "bullish", "bearish" or "neutral"
    confidence: float  # fixed score assigned by the detector
class CandlestickPatterns:
    """Rule-based candlestick pattern detectors over a list of bars.

    Every detector takes the index of the bar to examine (the last bar of a
    multi-bar pattern) and returns a ``Pattern`` or ``None``.
    """

    def __init__(self, bars: List["Bar"]):
        self.bars = bars

    def _body_size(self, bar: "Bar") -> float:
        """Absolute open-to-close distance."""
        return abs(bar.open - bar.close)

    def _upper_shadow(self, bar: "Bar") -> float:
        """Distance from the top of the body to the high."""
        body_top = max(bar.open, bar.close)
        return bar.high - body_top

    def _lower_shadow(self, bar: "Bar") -> float:
        """Distance from the low to the bottom of the body."""
        body_bottom = min(bar.open, bar.close)
        return body_bottom - bar.low

    def _is_bullish(self, bar: "Bar") -> bool:
        return bar.open < bar.close

    def _is_bearish(self, bar: "Bar") -> bool:
        return bar.open > bar.close

    def doji(self, index: int) -> Optional["Pattern"]:
        """Body smaller than 10% of a non-zero high-low range."""
        candle = self.bars[index]
        body = self._body_size(candle)
        span = candle.range
        if not span > 0:
            return None
        if not body / span < 0.1:
            return None
        return Pattern(name="Doji", index=index, direction="neutral", confidence=0.7)

    def hammer(self, index: int) -> Optional["Pattern"]:
        """Bullish bar with a long lower shadow and a short upper shadow."""
        candle = self.bars[index]
        body = self._body_size(candle)
        tail = self._lower_shadow(candle)
        wick = self._upper_shadow(candle)
        matches = tail > body * 2 and wick < body * 0.5 and self._is_bullish(candle)
        if not matches:
            return None
        return Pattern(name="Hammer", index=index, direction="bullish", confidence=0.8)

    def shooting_star(self, index: int) -> Optional["Pattern"]:
        """Bearish bar with a long upper shadow and a short lower shadow."""
        candle = self.bars[index]
        body = self._body_size(candle)
        tail = self._lower_shadow(candle)
        wick = self._upper_shadow(candle)
        matches = wick > body * 2 and tail < body * 0.5 and self._is_bearish(candle)
        if not matches:
            return None
        return Pattern(name="Shooting Star", index=index, direction="bearish", confidence=0.8)

    def engulfing(self, index: int) -> Optional["Pattern"]:
        """Current body engulfs the previous, opposite-coloured body."""
        if index < 1:
            return None
        current = self.bars[index]
        previous = self.bars[index - 1]
        bullish = (
            self._is_bullish(current)
            and self._is_bearish(previous)
            and current.open < previous.close
            and current.close > previous.open
        )
        if bullish:
            return Pattern(
                name="Bullish Engulfing", index=index, direction="bullish", confidence=0.85
            )
        bearish = (
            self._is_bearish(current)
            and self._is_bullish(previous)
            and current.open > previous.close
            and current.close < previous.open
        )
        if bearish:
            return Pattern(
                name="Bearish Engulfing", index=index, direction="bearish", confidence=0.85
            )
        return None

    def morning_star(self, index: int) -> Optional["Pattern"]:
        """Bearish bar, small-bodied bar, then a bullish close above the first bar's midpoint."""
        if index < 2:
            return None
        first = self.bars[index - 2]
        middle = self.bars[index - 1]
        last = self.bars[index]
        matches = (
            self._is_bearish(first)
            and self._body_size(middle) < self._body_size(first) * 0.3
            and self._is_bullish(last)
            and last.close > (first.open + first.close) / 2
        )
        if not matches:
            return None
        return Pattern(name="Morning Star", index=index, direction="bullish", confidence=0.9)

    def evening_star(self, index: int) -> Optional["Pattern"]:
        """Bullish bar, small-bodied bar, then a bearish close below the first bar's midpoint."""
        if index < 2:
            return None
        first = self.bars[index - 2]
        middle = self.bars[index - 1]
        last = self.bars[index]
        matches = (
            self._is_bullish(first)
            and self._body_size(middle) < self._body_size(first) * 0.3
            and self._is_bearish(last)
            and last.close < (first.open + first.close) / 2
        )
        if not matches:
            return None
        return Pattern(name="Evening Star", index=index, direction="bearish", confidence=0.9)

    def detect_all(self) -> List["Pattern"]:
        """Run every detector on every bar, in bar order then detector order."""
        detectors = (
            self.doji,
            self.hammer,
            self.shooting_star,
            self.engulfing,
            self.morning_star,
            self.evening_star,
        )
        found = []
        for position in range(len(self.bars)):
            for detect in detectors:
                hit = detect(position)
                if hit:
                    found.append(hit)
        return found


# Section heading: 51.3 交易策略 (Trading Strategies)
51.3.1 策略框架
python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
class OrderSide(Enum):
    """Direction of an order: buy or sell."""

    BUY = "buy"
    SELL = "sell"
class OrderType(Enum):
    """Supported order types; each value is the lowercase string tag."""

    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"
    STOP_LIMIT = "stop_limit"
class OrderStatus(Enum):
    """Lifecycle states of an order; each value is the lowercase string tag."""

    PENDING = "pending"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"
@dataclass
class Order:
    """An instruction to buy or sell a quantity of one symbol.

    New orders start as ``PENDING`` with nothing filled, and ``created_at``
    is taken from ``datetime.now`` at construction.
    """

    symbol: str
    side: OrderSide
    quantity: int
    order_type: OrderType
    price: float = 0.0
    stop_price: float = 0.0
    status: OrderStatus = OrderStatus.PENDING
    filled_quantity: int = 0
    filled_price: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)
    filled_at: Optional[datetime] = None

    @property
    def is_filled(self) -> bool:
        """True once the status is ``FILLED``."""
        return OrderStatus.FILLED == self.status

    @property
    def remaining_quantity(self) -> int:
        """Quantity still to be filled."""
        outstanding = self.quantity - self.filled_quantity
        return outstanding
@dataclass
class Position:
    """A holding in one symbol, with its average cost and latest price."""

    symbol: str
    quantity: int
    avg_cost: float
    current_price: float = 0.0

    @property
    def market_value(self) -> float:
        """Quantity valued at ``current_price``."""
        return self.quantity * self.current_price

    @property
    def cost_basis(self) -> float:
        """Quantity valued at ``avg_cost``."""
        return self.quantity * self.avg_cost

    @property
    def unrealized_pnl(self) -> float:
        """Market value minus cost basis."""
        return self.market_value - self.cost_basis

    @property
    def unrealized_pnl_percent(self) -> float:
        """Unrealized P&L as a percentage of cost basis (0.0 when the basis is zero)."""
        basis = self.cost_basis
        if basis == 0:
            return 0.0
        return self.unrealized_pnl / basis * 100
@dataclass
class Portfolio:
    """Cash plus a mapping of symbol to open position."""

    cash: float
    positions: Dict[str, "Position"] = field(default_factory=dict)

    @property
    def total_value(self) -> float:
        """Cash plus the market value of every position."""
        holdings = sum(held.market_value for held in self.positions.values())
        return self.cash + holdings

    def get_position(self, symbol: str) -> Optional["Position"]:
        """Return the position for ``symbol``, or None when there is none."""
        return self.positions.get(symbol)

    def has_position(self, symbol: str) -> bool:
        """True when ``symbol`` is held with a quantity above zero."""
        if symbol not in self.positions:
            return False
        return self.positions[symbol].quantity > 0
class Strategy(ABC):
    """Base class for strategies driven by bar and tick callbacks.

    Subclasses implement ``on_bar`` and ``on_tick`` and return the orders they
    want placed. ``buy``/``sell`` only build ``Order`` objects; they do not
    submit them anywhere.
    """

    def __init__(self, name: str):
        self.name = name
        self.portfolio: Optional[Portfolio] = None
        self.data: Dict[str, List[Bar]] = {}

    def set_portfolio(self, portfolio: Portfolio) -> None:
        """Attach the portfolio the strategy reads from."""
        self.portfolio = portfolio

    def set_data(self, symbol: str, bars: List[Bar]) -> None:
        """Replace the bar history kept for ``symbol``."""
        self.data[symbol] = bars

    @abstractmethod
    def on_bar(self, bar: Bar) -> List[Order]:
        """Handle a new bar and return the orders to place."""

    @abstractmethod
    def on_tick(self, tick: Tick) -> List[Order]:
        """Handle a new tick and return the orders to place."""

    def _make_order(
        self,
        side: OrderSide,
        symbol: str,
        quantity: int,
        order_type: OrderType,
        price: float
    ) -> Order:
        """Build an order for the given side; shared by ``buy`` and ``sell``."""
        return Order(
            symbol=symbol,
            side=side,
            quantity=quantity,
            order_type=order_type,
            price=price
        )

    def buy(
        self,
        symbol: str,
        quantity: int,
        order_type: OrderType = OrderType.MARKET,
        price: float = 0.0
    ) -> Order:
        """Build a buy order (market order by default)."""
        return self._make_order(OrderSide.BUY, symbol, quantity, order_type, price)

    def sell(
        self,
        symbol: str,
        quantity: int,
        order_type: OrderType = OrderType.MARKET,
        price: float = 0.0
    ) -> Order:
        """Build a sell order (market order by default)."""
        return self._make_order(OrderSide.SELL, symbol, quantity, order_type, price)
class MovingAverageCrossover(Strategy):
    """Buy when the fast SMA crosses above the slow SMA, sell on the reverse cross.

    The previous pair of averages is remembered per symbol, so bars of
    different symbols fed to the same strategy instance cannot produce
    spurious crossovers against each other.
    """

    def __init__(
        self,
        fast_period: int = 10,
        slow_period: int = 30
    ):
        super().__init__("MA_Crossover")
        self.fast_period = fast_period
        self.slow_period = slow_period
        # symbol -> (fast_ma, slow_ma) computed on that symbol's previous bar
        self._prev: Dict[str, tuple] = {}

    def on_bar(self, bar: Bar) -> List[Order]:
        """Return a buy or sell order when a crossover completes on this bar.

        Buys spend 95% of available cash at the bar's close, and only when the
        symbol is not already held; sells close the whole position. Orders
        that would be for zero shares are not emitted.
        """
        orders = []
        bars = self.data.get(bar.symbol, [])
        if len(bars) < self.slow_period:
            return orders
        closes = [b.close for b in bars]
        # Only the most recent window is needed for the latest average.
        fast_ma = TechnicalIndicators.sma(closes[-self.fast_period:], self.fast_period)[-1]
        slow_ma = TechnicalIndicators.sma(closes[-self.slow_period:], self.slow_period)[-1]
        if fast_ma is None or slow_ma is None:
            # Not enough history for one of the averages (fast_period > slow_period).
            return orders
        previous = self._prev.get(bar.symbol)
        if previous is not None:
            prev_fast, prev_slow = previous
            if prev_fast <= prev_slow and fast_ma > slow_ma:
                if not self.portfolio.has_position(bar.symbol) and bar.close > 0:
                    quantity = int(self.portfolio.cash * 0.95 / bar.close)
                    if quantity > 0:
                        orders.append(self.buy(bar.symbol, quantity))
            elif prev_fast >= prev_slow and fast_ma < slow_ma:
                position = self.portfolio.get_position(bar.symbol)
                if position and position.quantity > 0:
                    orders.append(self.sell(bar.symbol, position.quantity))
        self._prev[bar.symbol] = (fast_ma, slow_ma)
        return orders

    def on_tick(self, tick: Tick) -> List[Order]:
        """Ticks are ignored by this strategy."""
        return []
class RSIStrategy(Strategy):
    """Buy when RSI drops below ``oversold``, sell when it rises above ``overbought``."""

    def __init__(
        self,
        period: int = 14,
        oversold: float = 30,
        overbought: float = 70
    ):
        super().__init__("RSI_Strategy")
        self.period = period
        self.oversold = oversold
        self.overbought = overbought

    def on_bar(self, bar: Bar) -> List[Order]:
        """Return a buy or sell order based on the latest RSI value.

        Buys spend 95% of available cash at the bar's close, and only when the
        symbol is not already held; sells close the whole position. Orders
        that would be for zero shares are not emitted.
        """
        orders = []
        bars = self.data.get(bar.symbol, [])
        if len(bars) < self.period + 1:
            return orders
        # The latest RSI depends only on the last period + 1 closes.
        closes = [b.close for b in bars[-(self.period + 1):]]
        current_rsi = TechnicalIndicators.rsi(closes, self.period)[-1]
        if current_rsi is None:
            return orders
        if current_rsi < self.oversold:
            if not self.portfolio.has_position(bar.symbol) and bar.close > 0:
                quantity = int(self.portfolio.cash * 0.95 / bar.close)
                if quantity > 0:
                    orders.append(self.buy(bar.symbol, quantity))
        elif current_rsi > self.overbought:
            position = self.portfolio.get_position(bar.symbol)
            if position and position.quantity > 0:
                orders.append(self.sell(bar.symbol, position.quantity))
        return orders

    def on_tick(self, tick: Tick) -> List[Order]:
        """Ticks are ignored by this strategy."""
        return []


# Section heading: 51.3.2 回测引擎 (Backtest Engine)
python
from dataclasses import dataclass
from typing import List, Dict, Optional
import math
@dataclass
class Trade:
    """One executed fill."""

    symbol: str
    side: "OrderSide"
    quantity: int
    price: float
    timestamp: datetime
    commission: float = 0.0

    @property
    def value(self) -> float:
        """Notional value of the fill: price times quantity."""
        return self.price * self.quantity
@dataclass
class BacktestResult:
    """Summary statistics and the trade list produced by a backtest."""

    # Capital at the start and end of the run
    initial_capital: float
    final_capital: float

    # Return figures
    total_return: float
    annual_return: float

    # Risk figures
    max_drawdown: float
    sharpe_ratio: float

    # Trade statistics
    win_rate: float
    total_trades: int
    trades: List["Trade"]
class BacktestEngine:
    """Bar-by-bar backtester that fills every order at the triggering bar's close.

    NOTE: the portfolio, trade list and equity curve live on the instance and
    are not reset by ``run``; use a fresh engine for each backtest.
    """

    def __init__(
        self,
        initial_capital: float = 100000,
        commission_rate: float = 0.001
    ):
        self.initial_capital = initial_capital
        self.commission_rate = commission_rate
        self.portfolio = Portfolio(cash=initial_capital)
        self.trades: List[Trade] = []
        self.equity_curve: List[float] = []

    def run(
        self,
        strategy: Strategy,
        data: Dict[str, List[Bar]]
    ) -> BacktestResult:
        """Replay ``data`` through ``strategy`` and return the resulting statistics.

        For each timestamp, in chronological order, every symbol with a bar at
        that time has its history up to and including that time handed to the
        strategy, and the returned orders are executed. Positions are then
        re-priced and one equity-curve point is appended.
        """
        from bisect import bisect_right

        strategy.set_portfolio(self.portfolio)
        # Index each symbol once so the replay loop does not rescan the whole
        # bar list for every timestamp.
        indexed = {}
        for symbol, bars in data.items():
            ordered = sorted(bars, key=lambda b: b.timestamp)
            stamps = [b.timestamp for b in ordered]
            first_bar = {}
            for b in ordered:
                # Keep the first bar seen for a timestamp if duplicates exist.
                first_bar.setdefault(b.timestamp, b)
            indexed[symbol] = (ordered, stamps, first_bar)
        timeline = sorted({ts for _, stamps, _ in indexed.values() for ts in stamps})
        for timestamp in timeline:
            for symbol, (ordered, stamps, first_bar) in indexed.items():
                bar = first_bar.get(timestamp)
                if bar is None:
                    continue
                history = ordered[:bisect_right(stamps, timestamp)]
                strategy.set_data(symbol, history)
                for order in strategy.on_bar(bar):
                    self._execute_order(order, bar)
            self._update_portfolio_prices(data, timestamp)
            self.equity_curve.append(self.portfolio.total_value)
        return self._calculate_result()

    def _execute_order(self, order: Order, bar: Bar) -> None:
        """Fill ``order`` in full at ``bar.close``, or reject it.

        Rejected: non-positive quantities, buys exceeding available cash
        (including commission), and sells exceeding the held quantity. The
        order's status and fill fields are updated to reflect the outcome.
        """
        if order.quantity <= 0:
            # A negative buy would otherwise credit cash to the account.
            order.status = OrderStatus.REJECTED
            return
        fill_price = bar.close
        notional = order.quantity * fill_price
        commission = notional * self.commission_rate
        if order.side == OrderSide.BUY:
            if self.portfolio.cash < notional + commission:
                order.status = OrderStatus.REJECTED
                return
            self.portfolio.cash -= (notional + commission)
            position = self.portfolio.get_position(order.symbol)
            if position:
                total_cost = position.cost_basis + notional
                total_quantity = position.quantity + order.quantity
                position.avg_cost = total_cost / total_quantity
                position.quantity = total_quantity
            else:
                self.portfolio.positions[order.symbol] = Position(
                    symbol=order.symbol,
                    quantity=order.quantity,
                    avg_cost=fill_price,
                    current_price=fill_price
                )
        elif order.side == OrderSide.SELL:
            position = self.portfolio.get_position(order.symbol)
            if position is None or position.quantity < order.quantity:
                order.status = OrderStatus.REJECTED
                return
            self.portfolio.cash += (notional - commission)
            position.quantity -= order.quantity
            if position.quantity == 0:
                del self.portfolio.positions[order.symbol]
        else:
            return
        order.status = OrderStatus.FILLED
        order.filled_quantity = order.quantity
        order.filled_price = fill_price
        order.filled_at = bar.timestamp
        self.trades.append(Trade(
            symbol=order.symbol,
            side=order.side,
            quantity=order.quantity,
            price=fill_price,
            timestamp=bar.timestamp,
            commission=commission
        ))

    def _update_portfolio_prices(
        self,
        data: Dict[str, List[Bar]],
        timestamp: datetime
    ) -> None:
        """Set each position's price to the close of its bar at ``timestamp``, if any."""
        for symbol, position in self.portfolio.positions.items():
            bars = data.get(symbol, [])
            bar = next(
                (b for b in bars if b.timestamp == timestamp),
                None
            )
            if bar:
                position.current_price = bar.close

    def _calculate_result(self) -> BacktestResult:
        """Assemble the result from the current portfolio, trades and equity curve.

        Annualisation assumes 252 equity-curve points per year.
        """
        final_capital = self.portfolio.total_value
        if self.initial_capital:
            total_return = (final_capital - self.initial_capital) / self.initial_capital
        else:
            total_return = 0.0
        trading_days = len(self.equity_curve)
        growth = 1 + total_return
        if trading_days <= 0:
            annual_return = 0
        elif growth <= 0:
            # Equity wiped out; a fractional power of a non-positive base is
            # not a real number.
            annual_return = -1.0
        else:
            annual_return = growth ** (252 / trading_days) - 1
        return BacktestResult(
            initial_capital=self.initial_capital,
            final_capital=final_capital,
            total_return=total_return,
            annual_return=annual_return,
            max_drawdown=self._calculate_max_drawdown(),
            sharpe_ratio=self._calculate_sharpe_ratio(),
            win_rate=self._calculate_win_rate(),
            total_trades=len(self.trades),
            trades=self.trades
        )

    def _calculate_max_drawdown(self) -> float:
        """Largest peak-to-trough decline of the equity curve, as a fraction of the peak."""
        if not self.equity_curve:
            return 0.0
        peak = self.equity_curve[0]
        max_dd = 0.0
        for value in self.equity_curve:
            if value > peak:
                peak = value
            if peak <= 0:
                # No meaningful drawdown relative to a non-positive peak.
                continue
            dd = (peak - value) / peak
            if dd > max_dd:
                max_dd = dd
        return max_dd

    def _calculate_sharpe_ratio(self, risk_free_rate: float = 0.02) -> float:
        """Annualised Sharpe ratio of the per-step equity returns (252 steps per year)."""
        if len(self.equity_curve) < 2:
            return 0.0
        returns = []
        for previous, current in zip(self.equity_curve, self.equity_curve[1:]):
            if previous == 0:
                # A return relative to zero equity is undefined; skip the step.
                continue
            returns.append((current - previous) / previous)
        if not returns:
            return 0.0
        avg_return = sum(returns) / len(returns)
        variance = sum((r - avg_return) ** 2 for r in returns) / len(returns)
        std_return = math.sqrt(variance)
        if std_return == 0:
            return 0.0
        daily_rf = risk_free_rate / 252
        excess_return = avg_return - daily_rf
        return excess_return / std_return * math.sqrt(252)

    def _calculate_win_rate(self) -> float:
        """Fraction of sells priced above the buy they are paired with.

        Sells are paired first-in-first-out with earlier buys of the same
        symbol, one trade to one trade; quantities are not taken into account.
        """
        from collections import defaultdict, deque

        if not self.trades:
            return 0.0
        wins = 0
        total_pairs = 0
        open_buys = defaultdict(deque)
        for trade in self.trades:
            if trade.side == OrderSide.BUY:
                open_buys[trade.symbol].append(trade)
                continue
            queue = open_buys.get(trade.symbol)
            if queue:
                buy = queue.popleft()
                if trade.price > buy.price:
                    wins += 1
                total_pairs += 1
        return wins / total_pairs if total_pairs > 0 else 0.0


# Section heading: 51.4 风险管理 (Risk Management)
51.4.1 风险度量
python
from dataclasses import dataclass
from typing import List, Dict, Optional
import math
@dataclass
class RiskMetrics:
    """Bundle of portfolio risk figures."""

    # Value-at-risk figures
    var_95: float
    var_99: float
    cvar_95: float

    # Path and dispersion figures
    max_drawdown: float
    volatility: float

    # Sensitivity to market returns
    beta: float
class RiskManager:
    """Historical risk measures computed from a recorded series of returns.

    Returns are appended through ``update_returns``. VaR and CVaR are scaled
    by the portfolio's current ``total_value``.
    """

    def __init__(self, portfolio: "Portfolio"):
        self.portfolio = portfolio
        self.returns_history: List[float] = []

    def update_returns(self, returns: float) -> None:
        """Append one period's return to the history."""
        self.returns_history.append(returns)

    def _tail_index(self, confidence: float) -> int:
        """Index of the VaR observation in the ascending-sorted history.

        The index is clamped into the valid range so that confidence levels
        at or outside ``[0, 1]`` cannot index past either end.
        """
        count = len(self.returns_history)
        raw = int((1 - confidence) * count)
        return max(0, min(raw, count - 1))

    def calculate_var(self, confidence: float = 0.95) -> float:
        """Historical value at risk, as a positive loss amount in currency units."""
        if not self.returns_history:
            return 0.0
        sorted_returns = sorted(self.returns_history)
        cutoff = sorted_returns[self._tail_index(confidence)]
        return -cutoff * self.portfolio.total_value

    def calculate_cvar(self, confidence: float = 0.95) -> float:
        """Historical conditional VaR: the mean loss over the tail up to the VaR point.

        The VaR observation itself is part of the tail, so CVaR is never
        smaller than VaR and stays defined for short histories.
        """
        if not self.returns_history:
            return 0.0
        sorted_returns = sorted(self.returns_history)
        tail_returns = sorted_returns[:self._tail_index(confidence) + 1]
        mean_tail = sum(tail_returns) / len(tail_returns)
        return -mean_tail * self.portfolio.total_value

    def calculate_volatility(self, annualize: bool = True) -> float:
        """Population standard deviation of returns, optionally scaled by sqrt(252)."""
        if len(self.returns_history) < 2:
            return 0.0
        count = len(self.returns_history)
        mean = sum(self.returns_history) / count
        variance = sum((r - mean) ** 2 for r in self.returns_history) / count
        volatility = math.sqrt(variance)
        if annualize:
            volatility *= math.sqrt(252)
        return volatility

    def calculate_beta(
        self,
        market_returns: List[float]
    ) -> float:
        """Beta against ``market_returns`` over the most recent overlapping window.

        Returns 1.0 when either series has fewer than two observations or the
        market variance is zero.
        """
        if len(self.returns_history) < 2 or len(market_returns) < 2:
            return 1.0
        min_len = min(len(self.returns_history), len(market_returns))
        portfolio_returns = self.returns_history[-min_len:]
        market = market_returns[-min_len:]
        portfolio_mean = sum(portfolio_returns) / min_len
        market_mean = sum(market) / min_len
        covariance = sum(
            (p - portfolio_mean) * (m - market_mean)
            for p, m in zip(portfolio_returns, market)
        ) / min_len
        market_variance = sum((m - market_mean) ** 2 for m in market) / min_len
        if market_variance == 0:
            return 1.0
        return covariance / market_variance

    def get_risk_metrics(
        self,
        market_returns: List[float] = None
    ) -> "RiskMetrics":
        """Compute every metric; beta is 1.0 when no market returns are supplied."""
        return RiskMetrics(
            var_95=self.calculate_var(0.95),
            var_99=self.calculate_var(0.99),
            cvar_95=self.calculate_cvar(0.95),
            max_drawdown=self._calculate_max_drawdown(),
            volatility=self.calculate_volatility(),
            beta=self.calculate_beta(market_returns) if market_returns else 1.0
        )

    def _calculate_max_drawdown(self) -> float:
        """Largest peak-to-trough decline of the compounded return series."""
        if not self.returns_history:
            return 0.0
        cumulative = 1.0
        peak = 1.0
        max_dd = 0.0
        for r in self.returns_history:
            cumulative *= (1 + r)
            if cumulative > peak:
                peak = cumulative
            dd = (peak - cumulative) / peak
            if dd > max_dd:
                max_dd = dd
        return max_dd
class PositionSizer:
@staticmethod
def fixed_amount(
capital: float,
amount: float
) -> int:
return int(amount)
@staticmethod
def fixed_fraction(
capital: float,
fraction: float,
price: float
) -> int:
return int(capital * fraction / price)
@staticmethod
def kelly_criterion(
win_rate: float,
avg_win: float,
avg_loss: float,
capital: float,
price: float
) -> int:
if avg_loss == 0:
return 0
b = avg_win / avg_loss
q = 1 - win_rate
kelly = (win_rate * b - q) / b
kelly = max(0, min(kelly, 0.25))
return int(capital * kelly / price)
@staticmethod
def risk_parity(
volatilities: Dict[str, float],
capital: float,
prices: Dict[str, float]
) -> Dict[str, int]:
inv_vols = {
symbol: 1 / vol if vol > 0 else 0
for symbol, vol in volatilities.items()
}
total_inv_vol = sum(inv_vols.values())
if total_inv_vol == 0:
return {symbol: 0 for symbol in volatilities}
weights = {
symbol: inv_vol / total_inv_vol
for symbol, inv_vol in inv_vols.items()
}
quantities = {}
for symbol, weight in weights.items():
price = prices.get(symbol, 1)
quantities[symbol] = int(capital * weight / price)
return quantities
class StopLossManager:
    """Tracks one stop per symbol and reports when a price update triggers it.

    Stops trigger when the price falls to or below the stop level. Once a
    stop has triggered it stays triggered until it is set again.
    """

    def __init__(self):
        self.stops: Dict[str, Dict] = {}

    def set_stop_loss(
        self,
        symbol: str,
        stop_price: float,
        stop_type: str = "fixed"
    ) -> None:
        """Set a stop at ``stop_price``, replacing any existing stop for ``symbol``."""
        self.stops[symbol] = {
            "stop_price": stop_price,
            "stop_type": stop_type,
            "triggered": False
        }

    def set_trailing_stop(
        self,
        symbol: str,
        trail_percent: float
    ) -> None:
        """Set a trailing stop, replacing any existing stop for ``symbol``.

        ``trail_percent`` is used as a fraction: the stop sits at
        ``highest_price * (1 - trail_percent)``. The stop level stays None
        until the first price above zero has been observed.
        """
        self.stops[symbol] = {
            "trail_percent": trail_percent,
            "highest_price": 0.0,
            "stop_price": None,
            "stop_type": "trailing",
            "triggered": False
        }

    def update_price(self, symbol: str, current_price: float) -> bool:
        """Feed a new price; return True when the stop is, or already was, triggered.

        Returns False for symbols without a stop.
        """
        stop = self.stops.get(symbol)
        if stop is None:
            return False
        if stop["triggered"]:
            return True
        if stop["stop_type"] == "trailing" and current_price > stop["highest_price"]:
            # New high: ratchet the stop level up behind it.
            stop["highest_price"] = current_price
            stop["stop_price"] = current_price * (1 - stop["trail_percent"])
        stop_price = stop.get("stop_price")
        if stop_price is not None and current_price <= stop_price:
            stop["triggered"] = True
            return True
        return False

    def is_triggered(self, symbol: str) -> bool:
        """True when ``symbol`` has a stop that has triggered."""
        return self.stops.get(symbol, {}).get("triggered", False)


# Section heading: 51.5 知识图谱 (Knowledge Map)
51.5.1 金融科技技术架构
┌─────────────────────────────────────────────────────────────────────┐
│ 金融科技技术架构 │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 交易层 (Trading) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 策略引擎 │ │ 订单管理 │ │ 风控系统 │ │ 执行引擎 │ │ │
│ │ │ Strategy │ │ Order │ │ Risk │ │ Executor│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 分析层 (Analysis) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 技术分析 │ │ 量化模型 │ │ 回测框架 │ │ 因子分析 │ │ │
│ │ │ TA-Lib │ │ Model │ │ Backtest│ │ Factor │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 数据层 (Data) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 行情数据 │ │ 财务数据 │ │ 新闻数据 │ │ 另类数据 │ │ │
│ │ │ Market │ │ Finance │ │ News │ │ Alt │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 基础设施层 (Infrastructure) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 数据库 │ │ 消息队列 │ │ 缓存 │ │ 监控 │ │ │
│ │ │ Timescale│ │ Kafka │ │ Redis │ │ Grafana │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

51.5.2 量化交易流程
┌─────────────────────────────────────────────────────────────────────┐
│ 量化交易工作流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ 数据获取 │ ─── 行情 / 财务 / 新闻 / 另类数据 │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 数据处理 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 清洗 │ │ 标准化 │ │ 特征工程 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 策略分析 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 信号生成 │ │ 因子计算 │ │ 模型预测 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 风险控制 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 仓位管理 │ │ 止损止盈 │ │ VaR计算 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 订单执行 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 订单生成 │ │ 撮合执行 │ │ 成交确认 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 绩效分析 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 收益计算 │ │ 风险指标 │ │ 归因分析 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘

51.6 技术选型指南
51.6.1 数据源选型
| 数据源 | 数据类型 | 延迟 | 费用 | 推荐指数 |
|---|---|---|---|---|
| Tushare | A股行情、财务 | 低 | 免费/付费 | ★★★★★ |
| AKShare | 多市场数据 | 中 | 免费 | ★★★★★ |
| Wind | 全市场数据 | 极低 | 高 | ★★★★☆ |
| 聚宽 | 量化数据 | 低 | 中 | ★★★★☆ |
| Yahoo Finance | 美股数据 | 中 | 免费 | ★★★★☆ |
51.6.2 回测框架选型
| 框架 | 性能 | 功能 | 学习曲线 | 推荐指数 |
|---|---|---|---|---|
| Backtrader | 中 | 极高 | 中 | ★★★★★ |
| Zipline | 高 | 高 | 高 | ★★★★☆ |
| VeighNa | 高 | 极高 | 中 | ★★★★★ |
| PyAlgoTrade | 中 | 中 | 低 | ★★★☆☆ |
51.6.3 技术指标库选型
| 库 | 指标数量 | 性能 | 易用性 | 推荐指数 |
|---|---|---|---|---|
| TA-Lib | 150+ | 极高 | 中 | ★★★★★ |
| pandas-ta | 130+ | 高 | 高 | ★★★★★ |
| TA | 30+ | 中 | 高 | ★★★☆☆ |
51.7 常见问题与解决方案
51.7.1 数据质量问题
python
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
class DataQualityChecker:
    """Run basic data-quality checks against a DataFrame.

    The frame passed to the constructor is stored by reference; the checks
    only read from it.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def check_missing(self) -> Dict:
        """Count missing values per column.

        Returns:
            A dict with two entries, each keyed by column name:
            ``"missing_counts"`` holds the number of null cells and
            ``"missing_ratio"`` holds the same count as a percentage of the
            number of rows (the value is multiplied by 100).
        """
        missing = self.df.isnull().sum()
        return {
            "missing_counts": missing.to_dict(),
            "missing_ratio": (missing / len(self.df) * 100).to_dict()
        }

    def check_outliers(self, columns: List[str] = None, method: str = 'iqr') -> Dict:
        """Count outliers per column.

        Args:
            columns: Columns to inspect. When omitted or empty, every numeric
                column of the frame is inspected.
            method: ``'iqr'`` flags values more than 1.5 * IQR outside the
                [Q1, Q3] range; ``'zscore'`` flags values whose absolute
                z-score exceeds 3.

        Returns:
            A dict mapping each inspected column to its outlier count.

        Raises:
            ValueError: If ``method`` is not ``'iqr'`` or ``'zscore'``.
        """
        # Validate up front; otherwise the counter below would be unbound.
        if method not in ('iqr', 'zscore'):
            raise ValueError(f"Unknown outlier method: {method!r}")
        # Explicit emptiness test so a pandas Index can be passed as well.
        if columns is None or len(columns) == 0:
            columns = self.df.select_dtypes(include=[np.number]).columns

        outliers = {}
        for col in columns:
            data = self.df[col].dropna()
            if method == 'iqr':
                q1, q3 = data.quantile([0.25, 0.75])
                iqr = q3 - q1
                flagged = (data < q1 - 1.5 * iqr) | (data > q3 + 1.5 * iqr)
            else:
                z_scores = np.abs((data - data.mean()) / data.std())
                flagged = z_scores > 3
            # Plain int rather than a numpy scalar, so results serialize cleanly.
            outliers[col] = int(flagged.sum())
        return outliers

    def check_duplicates(self, subset: List[str] = None) -> int:
        """Return the number of rows that duplicate an earlier row.

        Args:
            subset: Columns to compare; all columns when omitted.
        """
        return int(self.df.duplicated(subset=subset).sum())

    def check_price_consistency(self, price_cols: List[str] = None) -> Dict:
        """Check that open/high/low/close prices are mutually consistent.

        Args:
            price_cols: Names of the open, high, low and close columns, in
                that order. Defaults to ``['open', 'high', 'low', 'close']``.

        Returns:
            ``{"issues": [...]}`` with one message per violated rule. The
            list is also empty when any of the named columns is absent from
            the frame, in which case no check is performed.

        Raises:
            ValueError: If ``price_cols`` does not contain exactly four names.
        """
        if price_cols is None or len(price_cols) == 0:
            cols = ['open', 'high', 'low', 'close']
        else:
            cols = list(price_cols)
        if len(cols) != 4:
            raise ValueError(
                "price_cols must name the open, high, low and close columns, in that order"
            )
        open_col, high_col, low_col, close_col = cols

        issues = []
        if all(col in self.df.columns for col in cols):
            high = self.df[high_col]
            low = self.df[low_col]
            high_violations = int((high < low).sum())
            if high_violations > 0:
                issues.append(f"High < Low in {high_violations} rows")
            for col in (open_col, close_col):
                prices = self.df[col]
                violations = int(((prices < low) | (prices > high)).sum())
                if violations > 0:
                    issues.append(f"{col} outside [low, high] in {violations} rows")
        return {"issues": issues}
class DataCleaner:
    """Stateless helpers for cleaning OHLCV price frames.

    Every method returns a new DataFrame and leaves its input untouched.
    """

    @staticmethod
    def fill_missing_prices(df: pd.DataFrame, method: str = 'ffill') -> pd.DataFrame:
        """Fill gaps in the open/high/low/close columns.

        Args:
            df: Price frame; any of the four price columns may be absent.
            method: ``'ffill'`` carries the last known value forward;
                ``'interpolate'`` uses ``Series.interpolate()`` defaults.

        Returns:
            A copy of ``df`` with the price columns filled.

        Raises:
            ValueError: If ``method`` is not ``'ffill'`` or ``'interpolate'``.
        """
        # An unknown method used to be a silent no-op; fail loudly instead.
        if method not in ('ffill', 'interpolate'):
            raise ValueError(f"Unknown fill method: {method!r}")
        # Work on a copy so the caller's frame is not modified in place.
        result = df.copy()
        for col in ('open', 'high', 'low', 'close'):
            if col not in result.columns:
                continue
            if method == 'ffill':
                result[col] = result[col].ffill()
            else:
                result[col] = result[col].interpolate()
        return result

    @staticmethod
    def adjust_for_splits(df: pd.DataFrame, split_ratio: float) -> pd.DataFrame:
        """Rescale prices and volume for a stock split.

        Prices are divided by ``split_ratio`` and volume is multiplied by it.

        Args:
            df: Price frame; price and volume columns are optional.
            split_ratio: Split factor; must be positive.

        Returns:
            A copy of ``df`` with the adjusted columns.

        Raises:
            ValueError: If ``split_ratio`` is not greater than zero.
        """
        # Written as "not > 0" so NaN is rejected as well.
        if not split_ratio > 0:
            raise ValueError(f"split_ratio must be positive, got {split_ratio!r}")
        result = df.copy()
        for col in ('open', 'high', 'low', 'close'):
            if col in result.columns:
                result[col] = result[col] / split_ratio
        if 'volume' in result.columns:
            result['volume'] = result['volume'] * split_ratio
        return result

    @staticmethod
    def remove_outliers(df: pd.DataFrame, column: str, method: str = 'iqr') -> pd.DataFrame:
        """Drop the rows whose value in ``column`` is an outlier.

        Rows where ``column`` is null are dropped by both methods.

        Args:
            df: Input frame.
            column: Column to test.
            method: ``'iqr'`` keeps values within 1.5 * IQR of [Q1, Q3];
                ``'zscore'`` keeps values whose absolute z-score is at most 3.

        Returns:
            The subset of ``df`` that passed the test.

        Raises:
            ValueError: If ``method`` is not ``'iqr'`` or ``'zscore'``.
        """
        data = df[column]
        if method == 'iqr':
            q1, q3 = data.quantile([0.25, 0.75])
            iqr = q3 - q1
            mask = (data >= q1 - 1.5 * iqr) & (data <= q3 + 1.5 * iqr)
        elif method == 'zscore':
            std = data.std()
            if pd.isna(std) or std == 0:
                # A constant or single-row column has no spread, so nothing
                # can be an outlier. Dividing by the zero/NaN std would turn
                # every z-score into NaN and discard the whole frame.
                mask = data.notna()
            else:
                mask = np.abs((data - data.mean()) / std) <= 3
        else:
            raise ValueError(f"Unknown outlier method: {method!r}")
        return df[mask]

# 51.7.2 策略回测优化
python
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
import pandas as pd
import numpy as np
@dataclass
class BacktestResult:
    """Summary statistics produced by one backtest run."""
    total_return: float   # (final equity - initial capital) / initial capital
    annual_return: float  # total return compounded to 252 bars
    max_drawdown: float   # most negative peak-to-trough equity change (<= 0)
    sharpe_ratio: float   # sqrt(252) * mean / std of per-bar equity returns
    win_rate: float       # share of closed (sell) trades with positive profit
    trades: List[Dict] = field(default_factory=list)


class BacktestEngine:
    """Long-only, single-position backtester driven by a signal series.

    ``commission`` and ``slippage`` are fractional rates: together they
    increase the cost of a buy and reduce the proceeds of a sell.
    """

    def __init__(
        self,
        initial_capital: float = 100000,
        commission: float = 0.0003,
        slippage: float = 0.0001
    ):
        self.initial_capital = initial_capital
        self.commission = commission
        self.slippage = slippage

    def run(
        self,
        data: pd.DataFrame,
        signals: pd.Series,
        position_size: float = 0.95
    ) -> BacktestResult:
        """Simulate trading ``signals`` over ``data`` and summarise the outcome.

        Signals are matched to bars by position, not by index label. A signal
        of ``1`` opens a position when flat; ``-1`` closes an open position;
        any other value does nothing.

        Args:
            data: Bars with a ``'close'`` column; its index supplies the
                trade dates.
            signals: One signal per bar.
            position_size: Fraction of available capital committed to a buy.

        Returns:
            A ``BacktestResult``. Each trade dict holds ``type``, ``price``,
            ``shares`` and ``date``; sell trades also hold ``profit``
            (proceeds minus the cost of the matching buy, fees included).
            For empty ``data`` all statistics are zero and there are no trades.
        """
        if len(data) == 0:
            # No bars: there is no last close to mark to and no period to
            # annualise over.
            return BacktestResult(
                total_return=0.0,
                annual_return=0.0,
                max_drawdown=0.0,
                sharpe_ratio=0.0,
                win_rate=0.0,
            )

        buy_factor = 1 + self.commission + self.slippage
        sell_factor = 1 - self.commission - self.slippage

        capital = self.initial_capital
        position = 0
        entry_cost = 0.0  # cash paid for the open position, fees included
        trades = []
        equity_curve = [capital]  # starts with the pre-trading equity point

        for i, (timestamp, price) in enumerate(data['close'].items()):
            signal = signals.iloc[i]
            if signal == 1 and position == 0:
                # A non-positive or NaN price cannot be sized; treat as no fill.
                shares = int(capital * position_size / price) if price > 0 else 0
                cost = shares * price * buy_factor
                # Skip zero-share fills: they would log a phantom buy and
                # break the buy/sell pairing used for the win rate.
                if shares > 0 and cost <= capital:
                    position = shares
                    entry_cost = cost
                    capital -= cost
                    trades.append({
                        "type": "buy",
                        "price": price,
                        "shares": shares,
                        "date": timestamp
                    })
            elif signal == -1 and position > 0:
                revenue = position * price * sell_factor
                capital += revenue
                trades.append({
                    "type": "sell",
                    "price": price,
                    "shares": position,
                    "date": timestamp,
                    # Recorded so the win rate can actually be computed.
                    "profit": revenue - entry_cost
                })
                position = 0
                entry_cost = 0.0
            # Mark the open position to the current close.
            equity_curve.append(capital + position * price)

        final_equity = equity_curve[-1]
        total_return = (final_equity - self.initial_capital) / self.initial_capital
        sharpe, max_drawdown = self._equity_metrics(equity_curve)

        closed_trades = [t for t in trades if t["type"] == "sell"]
        wins = sum(1 for t in closed_trades if t["profit"] > 0)
        win_rate = wins / len(closed_trades) if closed_trades else 0

        return BacktestResult(
            total_return=total_return,
            # 252 periods per year -- presumably daily bars; confirm for other
            # bar frequencies.
            annual_return=(1 + total_return) ** (252 / len(data)) - 1,
            max_drawdown=max_drawdown,
            sharpe_ratio=sharpe,
            win_rate=win_rate,
            trades=trades
        )

    @staticmethod
    def _equity_metrics(equity_curve: List[float]) -> Tuple[float, float]:
        """Return ``(sharpe_ratio, max_drawdown)`` for an equity curve."""
        equity_series = pd.Series(equity_curve)
        returns = equity_series.pct_change().dropna()
        spread = returns.std()
        # std is NaN with fewer than two returns and 0 for a flat curve;
        # both yield a Sharpe ratio of 0.
        sharpe = float(np.sqrt(252) * returns.mean() / spread) if spread > 0 else 0
        running_peak = equity_series.cummax()
        drawdown = (equity_series - running_peak) / running_peak
        return sharpe, float(drawdown.min())
class StrategyOptimizer:
    """Exhaustive parameter search for a signal-generating strategy."""

    def __init__(self, data: pd.DataFrame, strategy_class, engine=None):
        """Store the search inputs.

        Args:
            data: Price data handed to both the strategy and the engine.
            strategy_class: Callable taking the grid parameters as keyword
                arguments and returning an object with a
                ``generate_signals(data)`` method.
            engine: Optional backtest engine exposing ``run(data, signals)``.
                When omitted, a ``BacktestEngine`` with default settings is
                created for each search.
        """
        self.data = data
        self.strategy_class = strategy_class
        self.engine = engine

    def grid_search(
        self,
        param_grid: Dict[str, List]
    ) -> List[Dict]:
        """Backtest every parameter combination and rank the results.

        Args:
            param_grid: Maps each parameter name to the values to try.

        Returns:
            One dict per combination with ``params``, ``sharpe``, ``return``
            and ``drawdown``, sorted by Sharpe ratio, best first. Entries
            whose Sharpe ratio is NaN are placed last.
        """
        from itertools import product
        from math import isnan

        # One engine serves the whole search; resolved here so the default is
        # only built when no engine was injected.
        engine = self.engine if self.engine is not None else BacktestEngine()

        param_names = list(param_grid.keys())
        results = []
        for combo in product(*param_grid.values()):
            params = dict(zip(param_names, combo))
            strategy = self.strategy_class(**params)
            signals = strategy.generate_signals(self.data)
            result = engine.run(self.data, signals)
            results.append({
                "params": params,
                "sharpe": result.sharpe_ratio,
                "return": result.total_return,
                "drawdown": result.max_drawdown
            })

        def rank_key(entry):
            # NaN compares false against everything, which leaves the sort
            # order undefined; rank it below every real value instead.
            sharpe = entry['sharpe']
            return float('-inf') if isnan(sharpe) else sharpe

        return sorted(results, key=rank_key, reverse=True)

# 51.8 本章小结
本章详细介绍了Python金融科技的核心概念和实践:
- 金融数据获取:行情数据、Tick数据、订单簿、财务数据
- 技术分析:技术指标、K线形态识别
- 交易策略:策略框架、MA交叉、RSI策略
- 风险管理:VaR、CVaR、仓位管理、止损管理
练习题
- 实现一个完整的数据获取系统,支持多数据源
- 开发一个技术指标计算库,支持常用指标
- 实现一个回测框架,支持多策略组合
- 开发一个风险管理系统,支持实时监控
- 实现一个简单的量化交易平台,支持模拟交易