Skip to content

第51章 金融科技

学习目标

完成本章学习后,你将能够:

  1. 获取金融数据:股票行情、财务数据、宏观经济数据
  2. 进行数据分析:技术指标、统计分析、时间序列分析
  3. 构建交易策略:策略开发、回测框架、参数优化
  4. 实现风险管理:风险度量、仓位管理、止损止盈
  5. 开发交易系统:订单管理、交易接口、实时监控
  6. 进行量化研究:因子分析、组合优化、绩效评估
  7. 处理高频数据:Tick数据、订单簿、实时计算
  8. 构建风控系统:实时监控、预警机制、合规检查

51.1 金融数据获取

51.1.1 数据源接口

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, date
from enum import Enum
import json
import time


class AssetType(Enum):
    STOCK = "stock"
    BOND = "bond"
    FUTURE = "future"
    OPTION = "option"
    FOREX = "forex"
    CRYPTO = "crypto"
    FUND = "fund"


class Market(Enum):
    CN_SH = "sh"
    CN_SZ = "sz"
    US_NYSE = "nyse"
    US_NASDAQ = "nasdaq"
    HK = "hk"


@dataclass
class Bar:
    symbol: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: int
    amount: float = 0.0

    @property
    def range(self) -> float:
        return self.high - self.low

    @property
    def body(self) -> float:
        return abs(self.close - self.open)

    @property
    def is_bullish(self) -> bool:
        return self.close > self.open


@dataclass
class Tick:
    symbol: str
    timestamp: datetime
    price: float
    volume: int
    bid: float = 0.0
    ask: float = 0.0
    bid_volume: int = 0
    ask_volume: int = 0

    @property
    def spread(self) -> float:
        return self.ask - self.bid if self.ask and self.bid else 0.0

    @property
    def mid_price(self) -> float:
        return (self.bid + self.ask) / 2 if self.bid and self.ask else self.price


@dataclass
class OrderBook:
    symbol: str
    timestamp: datetime
    bids: List[Tuple[float, int]]
    asks: List[Tuple[float, int]]

    @property
    def best_bid(self) -> Tuple[float, int]:
        return self.bids[0] if self.bids else (0.0, 0)

    @property
    def best_ask(self) -> Tuple[float, int]:
        return self.asks[0] if self.asks else (0.0, 0)

    @property
    def spread(self) -> float:
        return self.best_ask[0] - self.best_bid[0]

    @property
    def mid_price(self) -> float:
        return (self.best_bid[0] + self.best_ask[0]) / 2


@dataclass
class FinancialData:
    symbol: str
    date: date
    revenue: float = 0.0
    net_income: float = 0.0
    eps: float = 0.0
    pe_ratio: float = 0.0
    pb_ratio: float = 0.0
    roe: float = 0.0
    debt_ratio: float = 0.0
    current_ratio: float = 0.0
    dividend_yield: float = 0.0


class MarketDataAPI:
    def __init__(self, api_key: str = "", base_url: str = ""):
        self.api_key = api_key
        self.base_url = base_url
        self._cache: Dict[str, Any] = {}
        self._cache_time: Dict[str, float] = {}
        self.cache_ttl = 60

    def _get_cache(self, key: str) -> Optional[Any]:
        if key in self._cache:
            if time.time() - self._cache_time.get(key, 0) < self.cache_ttl:
                return self._cache[key]
        return None

    def _set_cache(self, key: str, value: Any) -> None:
        self._cache[key] = value
        self._cache_time[key] = time.time()

    def get_bars(
        self,
        symbol: str,
        start_date: date,
        end_date: date,
        frequency: str = "1d"
    ) -> List[Bar]:
        bars = []
        current = datetime.combine(start_date, datetime.min.time())

        for i in range(100):
            bar = Bar(
                symbol=symbol,
                timestamp=current,
                open=100.0 + i * 0.5,
                high=101.0 + i * 0.5,
                low=99.0 + i * 0.5,
                close=100.5 + i * 0.5,
                volume=1000000 + i * 10000
            )
            bars.append(bar)

        return bars

    def get_ticks(
        self,
        symbol: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[Tick]:
        ticks = []
        current = start_time

        while current < end_time:
            tick = Tick(
                symbol=symbol,
                timestamp=current,
                price=100.0,
                volume=100,
                bid=99.9,
                ask=100.1
            )
            ticks.append(tick)
            current = datetime.fromtimestamp(current.timestamp() + 1)

        return ticks

    def get_order_book(self, symbol: str) -> OrderBook:
        return OrderBook(
            symbol=symbol,
            timestamp=datetime.now(),
            bids=[
                (100.0, 1000),
                (99.9, 2000),
                (99.8, 3000)
            ],
            asks=[
                (100.1, 1000),
                (100.2, 2000),
                (100.3, 3000)
            ]
        )

    def get_quote(self, symbol: str) -> Dict:
        return {
            "symbol": symbol,
            "price": 100.0,
            "change": 1.0,
            "change_percent": 1.0,
            "volume": 1000000,
            "turnover": 100000000,
            "high": 101.0,
            "low": 99.0,
            "open": 99.5,
            "prev_close": 99.0,
            "timestamp": datetime.now()
        }

    def get_financial_data(self, symbol: str) -> FinancialData:
        return FinancialData(
            symbol=symbol,
            date=date.today(),
            revenue=1000000000,
            net_income=100000000,
            eps=1.0,
            pe_ratio=20.0,
            pb_ratio=2.0,
            roe=0.15,
            debt_ratio=0.3,
            current_ratio=1.5,
            dividend_yield=0.02
        )

    def get_symbols(self, market: Market) -> List[str]:
        return ["AAPL", "GOOGL", "MSFT", "AMZN", "META"]

51.1.2 数据存储

python
import sqlite3
from typing import List, Optional
from datetime import datetime, date


class DataStorage:
    def __init__(self, db_path: str = "market_data.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS bars (
                    symbol TEXT,
                    timestamp DATETIME,
                    open REAL,
                    high REAL,
                    low REAL,
                    close REAL,
                    volume INTEGER,
                    amount REAL,
                    PRIMARY KEY (symbol, timestamp)
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS ticks (
                    symbol TEXT,
                    timestamp DATETIME,
                    price REAL,
                    volume INTEGER,
                    bid REAL,
                    ask REAL,
                    PRIMARY KEY (symbol, timestamp)
                )
            """)

    def save_bars(self, bars: List[Bar]) -> None:
        with sqlite3.connect(self.db_path) as conn:
            for bar in bars:
                conn.execute("""
                    INSERT OR REPLACE INTO bars
                    (symbol, timestamp, open, high, low, close, volume, amount)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    bar.symbol,
                    bar.timestamp.isoformat(),
                    bar.open,
                    bar.high,
                    bar.low,
                    bar.close,
                    bar.volume,
                    bar.amount
                ))

    def load_bars(
        self,
        symbol: str,
        start_date: date,
        end_date: date
    ) -> List[Bar]:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT symbol, timestamp, open, high, low, close, volume, amount
                FROM bars
                WHERE symbol = ? AND timestamp >= ? AND timestamp <= ?
                ORDER BY timestamp
            """, (
                symbol,
                start_date.isoformat(),
                end_date.isoformat()
            ))

            bars = []
            for row in cursor:
                bars.append(Bar(
                    symbol=row[0],
                    timestamp=datetime.fromisoformat(row[1]),
                    open=row[2],
                    high=row[3],
                    low=row[4],
                    close=row[5],
                    volume=row[6],
                    amount=row[7]
                ))

            return bars

    def save_ticks(self, ticks: List[Tick]) -> None:
        with sqlite3.connect(self.db_path) as conn:
            for tick in ticks:
                conn.execute("""
                    INSERT OR REPLACE INTO ticks
                    (symbol, timestamp, price, volume, bid, ask)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    tick.symbol,
                    tick.timestamp.isoformat(),
                    tick.price,
                    tick.volume,
                    tick.bid,
                    tick.ask
                ))

51.2 技术分析

51.2.1 技术指标

python
from typing import List, Tuple
import math


class TechnicalIndicators:
    @staticmethod
    def sma(prices: List[float], period: int) -> List[float]:
        result = []

        for i in range(len(prices)):
            if i < period - 1:
                result.append(None)
            else:
                avg = sum(prices[i - period + 1:i + 1]) / period
                result.append(avg)

        return result

    @staticmethod
    def ema(prices: List[float], period: int) -> List[float]:
        result = []
        multiplier = 2 / (period + 1)

        for i in range(len(prices)):
            if i == 0:
                result.append(prices[0])
            else:
                ema = (prices[i] - result[-1]) * multiplier + result[-1]
                result.append(ema)

        return result

    @staticmethod
    def rsi(prices: List[float], period: int = 14) -> List[float]:
        result = []

        gains = []
        losses = []

        for i in range(1, len(prices)):
            change = prices[i] - prices[i - 1]
            gains.append(max(0, change))
            losses.append(max(0, -change))

        for i in range(len(prices)):
            if i < period:
                result.append(None)
            else:
                avg_gain = sum(gains[i - period:i]) / period
                avg_loss = sum(losses[i - period:i]) / period

                if avg_loss == 0:
                    result.append(100)
                else:
                    rs = avg_gain / avg_loss
                    rsi = 100 - (100 / (1 + rs))
                    result.append(rsi)

        return result

    @staticmethod
    def macd(
        prices: List[float],
        fast_period: int = 12,
        slow_period: int = 26,
        signal_period: int = 9
    ) -> Tuple[List[float], List[float], List[float]]:
        fast_ema = TechnicalIndicators.ema(prices, fast_period)
        slow_ema = TechnicalIndicators.ema(prices, slow_period)

        macd_line = [
            fast - slow if fast and slow else None
            for fast, slow in zip(fast_ema, slow_ema)
        ]

        valid_macd = [m for m in macd_line if m is not None]
        signal_line = [None] * (len(prices) - len(valid_macd))
        signal_line.extend(TechnicalIndicators.ema(valid_macd, signal_period))

        histogram = [
            macd - signal if macd and signal else None
            for macd, signal in zip(macd_line, signal_line)
        ]

        return macd_line, signal_line, histogram

    @staticmethod
    def bollinger_bands(
        prices: List[float],
        period: int = 20,
        std_dev: float = 2.0
    ) -> Tuple[List[float], List[float], List[float]]:
        middle = TechnicalIndicators.sma(prices, period)

        upper = []
        lower = []

        for i in range(len(prices)):
            if i < period - 1:
                upper.append(None)
                lower.append(None)
            else:
                window = prices[i - period + 1:i + 1]
                mean = middle[i]
                variance = sum((p - mean) ** 2 for p in window) / period
                std = math.sqrt(variance)

                upper.append(mean + std_dev * std)
                lower.append(mean - std_dev * std)

        return upper, middle, lower

    @staticmethod
    def atr(
        highs: List[float],
        lows: List[float],
        closes: List[float],
        period: int = 14
    ) -> List[float]:
        true_ranges = []

        for i in range(len(highs)):
            if i == 0:
                tr = highs[i] - lows[i]
            else:
                tr = max(
                    highs[i] - lows[i],
                    abs(highs[i] - closes[i - 1]),
                    abs(lows[i] - closes[i - 1])
                )
            true_ranges.append(tr)

        atr = TechnicalIndicators.sma(true_ranges, period)
        return atr

    @staticmethod
    def stochastic(
        highs: List[float],
        lows: List[float],
        closes: List[float],
        k_period: int = 14,
        d_period: int = 3
    ) -> Tuple[List[float], List[float]]:
        k_values = []

        for i in range(len(closes)):
            if i < k_period - 1:
                k_values.append(None)
            else:
                highest = max(highs[i - k_period + 1:i + 1])
                lowest = min(lows[i - k_period + 1:i + 1])

                if highest == lowest:
                    k_values.append(50)
                else:
                    k = 100 * (closes[i] - lowest) / (highest - lowest)
                    k_values.append(k)

        d_values = TechnicalIndicators.sma(
            [k for k in k_values if k is not None],
            d_period
        )

        d_full = [None] * (k_period - 1)
        d_full.extend(d_values)

        return k_values, d_full

    @staticmethod
    def obv(closes: List[float], volumes: List[int]) -> List[float]:
        obv = [0]

        for i in range(1, len(closes)):
            if closes[i] > closes[i - 1]:
                obv.append(obv[-1] + volumes[i])
            elif closes[i] < closes[i - 1]:
                obv.append(obv[-1] - volumes[i])
            else:
                obv.append(obv[-1])

        return obv

    @staticmethod
    def vwap(
        highs: List[float],
        lows: List[float],
        closes: List[float],
        volumes: List[int]
    ) -> List[float]:
        vwap_values = []
        cumulative_tp_volume = 0
        cumulative_volume = 0

        for i in range(len(closes)):
            tp = (highs[i] + lows[i] + closes[i]) / 3
            cumulative_tp_volume += tp * volumes[i]
            cumulative_volume += volumes[i]

            if cumulative_volume > 0:
                vwap_values.append(cumulative_tp_volume / cumulative_volume)
            else:
                vwap_values.append(tp)

        return vwap_values

51.2.2 K线形态识别

python
from typing import List, Optional
from dataclasses import dataclass


@dataclass
class Pattern:
    name: str
    index: int
    direction: str
    confidence: float


class CandlestickPatterns:
    def __init__(self, bars: List[Bar]):
        self.bars = bars

    def _body_size(self, bar: Bar) -> float:
        return abs(bar.close - bar.open)

    def _upper_shadow(self, bar: Bar) -> float:
        return bar.high - max(bar.open, bar.close)

    def _lower_shadow(self, bar: Bar) -> float:
        return min(bar.open, bar.close) - bar.low

    def _is_bullish(self, bar: Bar) -> bool:
        return bar.close > bar.open

    def _is_bearish(self, bar: Bar) -> bool:
        return bar.close < bar.open

    def doji(self, index: int) -> Optional[Pattern]:
        bar = self.bars[index]
        body = self._body_size(bar)
        total_range = bar.range

        if total_range > 0 and body / total_range < 0.1:
            return Pattern(
                name="Doji",
                index=index,
                direction="neutral",
                confidence=0.7
            )
        return None

    def hammer(self, index: int) -> Optional[Pattern]:
        bar = self.bars[index]
        body = self._body_size(bar)
        lower_shadow = self._lower_shadow(bar)
        upper_shadow = self._upper_shadow(bar)

        if (
            lower_shadow > body * 2 and
            upper_shadow < body * 0.5 and
            self._is_bullish(bar)
        ):
            return Pattern(
                name="Hammer",
                index=index,
                direction="bullish",
                confidence=0.8
            )
        return None

    def shooting_star(self, index: int) -> Optional[Pattern]:
        bar = self.bars[index]
        body = self._body_size(bar)
        lower_shadow = self._lower_shadow(bar)
        upper_shadow = self._upper_shadow(bar)

        if (
            upper_shadow > body * 2 and
            lower_shadow < body * 0.5 and
            self._is_bearish(bar)
        ):
            return Pattern(
                name="Shooting Star",
                index=index,
                direction="bearish",
                confidence=0.8
            )
        return None

    def engulfing(self, index: int) -> Optional[Pattern]:
        if index < 1:
            return None

        current = self.bars[index]
        previous = self.bars[index - 1]

        if (
            self._is_bullish(current) and
            self._is_bearish(previous) and
            current.open < previous.close and
            current.close > previous.open
        ):
            return Pattern(
                name="Bullish Engulfing",
                index=index,
                direction="bullish",
                confidence=0.85
            )

        if (
            self._is_bearish(current) and
            self._is_bullish(previous) and
            current.open > previous.close and
            current.close < previous.open
        ):
            return Pattern(
                name="Bearish Engulfing",
                index=index,
                direction="bearish",
                confidence=0.85
            )

        return None

    def morning_star(self, index: int) -> Optional[Pattern]:
        if index < 2:
            return None

        first = self.bars[index - 2]
        second = self.bars[index - 1]
        third = self.bars[index]

        if (
            self._is_bearish(first) and
            self._body_size(second) < self._body_size(first) * 0.3 and
            self._is_bullish(third) and
            third.close > (first.open + first.close) / 2
        ):
            return Pattern(
                name="Morning Star",
                index=index,
                direction="bullish",
                confidence=0.9
            )
        return None

    def evening_star(self, index: int) -> Optional[Pattern]:
        if index < 2:
            return None

        first = self.bars[index - 2]
        second = self.bars[index - 1]
        third = self.bars[index]

        if (
            self._is_bullish(first) and
            self._body_size(second) < self._body_size(first) * 0.3 and
            self._is_bearish(third) and
            third.close < (first.open + first.close) / 2
        ):
            return Pattern(
                name="Evening Star",
                index=index,
                direction="bearish",
                confidence=0.9
            )
        return None

    def detect_all(self) -> List[Pattern]:
        patterns = []

        for i in range(len(self.bars)):
            for detector in [
                self.doji,
                self.hammer,
                self.shooting_star,
                self.engulfing,
                self.morning_star,
                self.evening_star
            ]:
                pattern = detector(i)
                if pattern:
                    patterns.append(pattern)

        return patterns

51.3 交易策略

51.3.1 策略框架

python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum


class OrderSide(Enum):
    BUY = "buy"
    SELL = "sell"


class OrderType(Enum):
    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"
    STOP_LIMIT = "stop_limit"


class OrderStatus(Enum):
    PENDING = "pending"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"


@dataclass
class Order:
    symbol: str
    side: OrderSide
    quantity: int
    order_type: OrderType
    price: float = 0.0
    stop_price: float = 0.0
    status: OrderStatus = OrderStatus.PENDING
    filled_quantity: int = 0
    filled_price: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)
    filled_at: Optional[datetime] = None

    @property
    def is_filled(self) -> bool:
        return self.status == OrderStatus.FILLED

    @property
    def remaining_quantity(self) -> int:
        return self.quantity - self.filled_quantity


@dataclass
class Position:
    symbol: str
    quantity: int
    avg_cost: float
    current_price: float = 0.0

    @property
    def market_value(self) -> float:
        return self.quantity * self.current_price

    @property
    def cost_basis(self) -> float:
        return self.quantity * self.avg_cost

    @property
    def unrealized_pnl(self) -> float:
        return self.market_value - self.cost_basis

    @property
    def unrealized_pnl_percent(self) -> float:
        if self.cost_basis == 0:
            return 0.0
        return self.unrealized_pnl / self.cost_basis * 100


@dataclass
class Portfolio:
    cash: float
    positions: Dict[str, Position] = field(default_factory=dict)

    @property
    def total_value(self) -> float:
        return self.cash + sum(p.market_value for p in self.positions.values())

    def get_position(self, symbol: str) -> Optional[Position]:
        return self.positions.get(symbol)

    def has_position(self, symbol: str) -> bool:
        return symbol in self.positions and self.positions[symbol].quantity > 0


class Strategy(ABC):
    def __init__(self, name: str):
        self.name = name
        self.portfolio: Optional[Portfolio] = None
        self.data: Dict[str, List[Bar]] = {}

    def set_portfolio(self, portfolio: Portfolio) -> None:
        self.portfolio = portfolio

    def set_data(self, symbol: str, bars: List[Bar]) -> None:
        self.data[symbol] = bars

    @abstractmethod
    def on_bar(self, bar: Bar) -> List[Order]:
        pass

    @abstractmethod
    def on_tick(self, tick: Tick) -> List[Order]:
        pass

    def buy(
        self,
        symbol: str,
        quantity: int,
        order_type: OrderType = OrderType.MARKET,
        price: float = 0.0
    ) -> Order:
        return Order(
            symbol=symbol,
            side=OrderSide.BUY,
            quantity=quantity,
            order_type=order_type,
            price=price
        )

    def sell(
        self,
        symbol: str,
        quantity: int,
        order_type: OrderType = OrderType.MARKET,
        price: float = 0.0
    ) -> Order:
        return Order(
            symbol=symbol,
            side=OrderSide.SELL,
            quantity=quantity,
            order_type=order_type,
            price=price
        )


class MovingAverageCrossover(Strategy):
    def __init__(
        self,
        fast_period: int = 10,
        slow_period: int = 30
    ):
        super().__init__("MA_Crossover")
        self.fast_period = fast_period
        self.slow_period = slow_period
        self._prev_fast = None
        self._prev_slow = None

    def on_bar(self, bar: Bar) -> List[Order]:
        orders = []

        bars = self.data.get(bar.symbol, [])
        if len(bars) < self.slow_period:
            return orders

        closes = [b.close for b in bars]
        fast_ma = TechnicalIndicators.sma(closes, self.fast_period)[-1]
        slow_ma = TechnicalIndicators.sma(closes, self.slow_period)[-1]

        if self._prev_fast and self._prev_slow:
            if self._prev_fast <= self._prev_slow and fast_ma > slow_ma:
                if not self.portfolio.has_position(bar.symbol):
                    quantity = int(self.portfolio.cash * 0.95 / bar.close)
                    orders.append(self.buy(bar.symbol, quantity))

            elif self._prev_fast >= self._prev_slow and fast_ma < slow_ma:
                position = self.portfolio.get_position(bar.symbol)
                if position and position.quantity > 0:
                    orders.append(self.sell(bar.symbol, position.quantity))

        self._prev_fast = fast_ma
        self._prev_slow = slow_ma

        return orders

    def on_tick(self, tick: Tick) -> List[Order]:
        return []


class RSIStrategy(Strategy):
    def __init__(
        self,
        period: int = 14,
        oversold: float = 30,
        overbought: float = 70
    ):
        super().__init__("RSI_Strategy")
        self.period = period
        self.oversold = oversold
        self.overbought = overbought

    def on_bar(self, bar: Bar) -> List[Order]:
        orders = []

        bars = self.data.get(bar.symbol, [])
        if len(bars) < self.period + 1:
            return orders

        closes = [b.close for b in bars]
        rsi_values = TechnicalIndicators.rsi(closes, self.period)
        current_rsi = rsi_values[-1]

        if current_rsi < self.oversold:
            if not self.portfolio.has_position(bar.symbol):
                quantity = int(self.portfolio.cash * 0.95 / bar.close)
                orders.append(self.buy(bar.symbol, quantity))

        elif current_rsi > self.overbought:
            position = self.portfolio.get_position(bar.symbol)
            if position and position.quantity > 0:
                orders.append(self.sell(bar.symbol, position.quantity))

        return orders

    def on_tick(self, tick: Tick) -> List[Order]:
        return []

51.3.2 回测引擎

python
from dataclasses import dataclass
from typing import List, Dict, Optional
import math


@dataclass
class Trade:
    symbol: str
    side: OrderSide
    quantity: int
    price: float
    timestamp: datetime
    commission: float = 0.0

    @property
    def value(self) -> float:
        return self.quantity * self.price


@dataclass
class BacktestResult:
    initial_capital: float
    final_capital: float
    total_return: float
    annual_return: float
    max_drawdown: float
    sharpe_ratio: float
    win_rate: float
    total_trades: int
    trades: List[Trade]


class BacktestEngine:
    def __init__(
        self,
        initial_capital: float = 100000,
        commission_rate: float = 0.001
    ):
        self.initial_capital = initial_capital
        self.commission_rate = commission_rate
        self.portfolio = Portfolio(cash=initial_capital)
        self.trades: List[Trade] = []
        self.equity_curve: List[float] = []

    def run(
        self,
        strategy: Strategy,
        data: Dict[str, List[Bar]]
    ) -> BacktestResult:
        strategy.set_portfolio(self.portfolio)

        symbols = list(data.keys())
        all_timestamps = set()

        for symbol, bars in data.items():
            for bar in bars:
                all_timestamps.add(bar.timestamp)

        sorted_timestamps = sorted(all_timestamps)

        for timestamp in sorted_timestamps:
            for symbol in symbols:
                bars = data.get(symbol, [])
                bar_at_time = next(
                    (b for b in bars if b.timestamp == timestamp),
                    None
                )

                if bar_at_time:
                    strategy.set_data(symbol, [b for b in bars if b.timestamp <= timestamp])
                    orders = strategy.on_bar(bar_at_time)

                    for order in orders:
                        self._execute_order(order, bar_at_time)

            self._update_portfolio_prices(data, timestamp)
            self.equity_curve.append(self.portfolio.total_value)

        return self._calculate_result()

    def _execute_order(self, order: Order, bar: Bar) -> None:
        if order.side == OrderSide.BUY:
            cost = order.quantity * bar.close
            commission = cost * self.commission_rate

            if self.portfolio.cash >= cost + commission:
                self.portfolio.cash -= (cost + commission)

                position = self.portfolio.get_position(order.symbol)
                if position:
                    total_cost = position.cost_basis + cost
                    total_quantity = position.quantity + order.quantity
                    position.avg_cost = total_cost / total_quantity
                    position.quantity = total_quantity
                else:
                    self.portfolio.positions[order.symbol] = Position(
                        symbol=order.symbol,
                        quantity=order.quantity,
                        avg_cost=bar.close
                    )

                self.trades.append(Trade(
                    symbol=order.symbol,
                    side=order.side,
                    quantity=order.quantity,
                    price=bar.close,
                    timestamp=bar.timestamp,
                    commission=commission
                ))

        elif order.side == OrderSide.SELL:
            position = self.portfolio.get_position(order.symbol)

            if position and position.quantity >= order.quantity:
                revenue = order.quantity * bar.close
                commission = revenue * self.commission_rate

                self.portfolio.cash += (revenue - commission)
                position.quantity -= order.quantity

                if position.quantity == 0:
                    del self.portfolio.positions[order.symbol]

                self.trades.append(Trade(
                    symbol=order.symbol,
                    side=order.side,
                    quantity=order.quantity,
                    price=bar.close,
                    timestamp=bar.timestamp,
                    commission=commission
                ))

    def _update_portfolio_prices(
        self,
        data: Dict[str, List[Bar]],
        timestamp: datetime
    ) -> None:
        for symbol, position in self.portfolio.positions.items():
            bars = data.get(symbol, [])
            bar = next(
                (b for b in bars if b.timestamp == timestamp),
                None
            )
            if bar:
                position.current_price = bar.close

    def _calculate_result(self) -> BacktestResult:
        final_capital = self.portfolio.total_value
        total_return = (final_capital - self.initial_capital) / self.initial_capital

        trading_days = len(self.equity_curve)
        annual_return = (1 + total_return) ** (252 / trading_days) - 1 if trading_days > 0 else 0

        max_drawdown = self._calculate_max_drawdown()
        sharpe_ratio = self._calculate_sharpe_ratio()
        win_rate = self._calculate_win_rate()

        return BacktestResult(
            initial_capital=self.initial_capital,
            final_capital=final_capital,
            total_return=total_return,
            annual_return=annual_return,
            max_drawdown=max_drawdown,
            sharpe_ratio=sharpe_ratio,
            win_rate=win_rate,
            total_trades=len(self.trades),
            trades=self.trades
        )

    def _calculate_max_drawdown(self) -> float:
        if not self.equity_curve:
            return 0.0

        peak = self.equity_curve[0]
        max_dd = 0.0

        for value in self.equity_curve:
            if value > peak:
                peak = value

            dd = (peak - value) / peak
            if dd > max_dd:
                max_dd = dd

        return max_dd

    def _calculate_sharpe_ratio(self, risk_free_rate: float = 0.02) -> float:
        if len(self.equity_curve) < 2:
            return 0.0

        returns = []
        for i in range(1, len(self.equity_curve)):
            r = (self.equity_curve[i] - self.equity_curve[i - 1]) / self.equity_curve[i - 1]
            returns.append(r)

        if not returns:
            return 0.0

        avg_return = sum(returns) / len(returns)
        variance = sum((r - avg_return) ** 2 for r in returns) / len(returns)
        std_return = math.sqrt(variance)

        if std_return == 0:
            return 0.0

        daily_rf = risk_free_rate / 252
        excess_return = avg_return - daily_rf

        return excess_return / std_return * math.sqrt(252)

    def _calculate_win_rate(self) -> float:
        if not self.trades:
            return 0.0

        wins = 0
        total_pairs = 0

        buy_trades: Dict[str, List[Trade]] = {}

        for trade in self.trades:
            if trade.side == OrderSide.BUY:
                if trade.symbol not in buy_trades:
                    buy_trades[trade.symbol] = []
                buy_trades[trade.symbol].append(trade)
            else:
                buys = buy_trades.get(trade.symbol, [])
                if buys:
                    buy = buys.pop(0)
                    if trade.price > buy.price:
                        wins += 1
                    total_pairs += 1

        return wins / total_pairs if total_pairs > 0 else 0.0

51.4 风险管理

51.4.1 风险度量

python
from dataclasses import dataclass
from typing import List, Dict, Optional
import math


@dataclass
class RiskMetrics:
    var_95: float
    var_99: float
    cvar_95: float
    max_drawdown: float
    volatility: float
    beta: float


class RiskManager:
    def __init__(self, portfolio: Portfolio):
        self.portfolio = portfolio
        self.returns_history: List[float] = []

    def update_returns(self, returns: float) -> None:
        self.returns_history.append(returns)

    def calculate_var(self, confidence: float = 0.95) -> float:
        if not self.returns_history:
            return 0.0

        sorted_returns = sorted(self.returns_history)
        index = int((1 - confidence) * len(sorted_returns))

        var = -sorted_returns[index] * self.portfolio.total_value
        return var

    def calculate_cvar(self, confidence: float = 0.95) -> float:
        if not self.returns_history:
            return 0.0

        sorted_returns = sorted(self.returns_history)
        index = int((1 - confidence) * len(sorted_returns))

        tail_returns = sorted_returns[:index]
        if not tail_returns:
            return 0.0

        cvar = -sum(tail_returns) / len(tail_returns) * self.portfolio.total_value
        return cvar

    def calculate_volatility(self, annualize: bool = True) -> float:
        if len(self.returns_history) < 2:
            return 0.0

        mean = sum(self.returns_history) / len(self.returns_history)
        variance = sum((r - mean) ** 2 for r in self.returns_history) / len(self.returns_history)
        volatility = math.sqrt(variance)

        if annualize:
            volatility *= math.sqrt(252)

        return volatility

    def calculate_beta(
        self,
        market_returns: List[float]
    ) -> float:
        if len(self.returns_history) < 2 or len(market_returns) < 2:
            return 1.0

        min_len = min(len(self.returns_history), len(market_returns))
        portfolio_returns = self.returns_history[-min_len:]
        market = market_returns[-min_len:]

        portfolio_mean = sum(portfolio_returns) / len(portfolio_returns)
        market_mean = sum(market) / len(market)

        covariance = sum(
            (p - portfolio_mean) * (m - market_mean)
            for p, m in zip(portfolio_returns, market)
        ) / len(portfolio_returns)

        market_variance = sum(
            (m - market_mean) ** 2
            for m in market
        ) / len(market)

        if market_variance == 0:
            return 1.0

        return covariance / market_variance

    def get_risk_metrics(
        self,
        market_returns: List[float] = None
    ) -> RiskMetrics:
        return RiskMetrics(
            var_95=self.calculate_var(0.95),
            var_99=self.calculate_var(0.99),
            cvar_95=self.calculate_cvar(0.95),
            max_drawdown=self._calculate_max_drawdown(),
            volatility=self.calculate_volatility(),
            beta=self.calculate_beta(market_returns) if market_returns else 1.0
        )

    def _calculate_max_drawdown(self) -> float:
        if not self.returns_history:
            return 0.0

        cumulative = 1.0
        peak = 1.0
        max_dd = 0.0

        for r in self.returns_history:
            cumulative *= (1 + r)

            if cumulative > peak:
                peak = cumulative

            dd = (peak - cumulative) / peak
            if dd > max_dd:
                max_dd = dd

        return max_dd


class PositionSizer:
    @staticmethod
    def fixed_amount(
        capital: float,
        amount: float
    ) -> int:
        return int(amount)

    @staticmethod
    def fixed_fraction(
        capital: float,
        fraction: float,
        price: float
    ) -> int:
        return int(capital * fraction / price)

    @staticmethod
    def kelly_criterion(
        win_rate: float,
        avg_win: float,
        avg_loss: float,
        capital: float,
        price: float
    ) -> int:
        if avg_loss == 0:
            return 0

        b = avg_win / avg_loss
        q = 1 - win_rate

        kelly = (win_rate * b - q) / b

        kelly = max(0, min(kelly, 0.25))

        return int(capital * kelly / price)

    @staticmethod
    def risk_parity(
        volatilities: Dict[str, float],
        capital: float,
        prices: Dict[str, float]
    ) -> Dict[str, int]:
        inv_vols = {
            symbol: 1 / vol if vol > 0 else 0
            for symbol, vol in volatilities.items()
        }

        total_inv_vol = sum(inv_vols.values())

        if total_inv_vol == 0:
            return {symbol: 0 for symbol in volatilities}

        weights = {
            symbol: inv_vol / total_inv_vol
            for symbol, inv_vol in inv_vols.items()
        }

        quantities = {}
        for symbol, weight in weights.items():
            price = prices.get(symbol, 1)
            quantities[symbol] = int(capital * weight / price)

        return quantities


class StopLossManager:
    def __init__(self):
        self.stops: Dict[str, Dict] = {}

    def set_stop_loss(
        self,
        symbol: str,
        stop_price: float,
        stop_type: str = "fixed"
    ) -> None:
        self.stops[symbol] = {
            "stop_price": stop_price,
            "stop_type": stop_type,
            "triggered": False
        }

    def set_trailing_stop(
        self,
        symbol: str,
        trail_percent: float
    ) -> None:
        self.stops[symbol] = {
            "trail_percent": trail_percent,
            "highest_price": 0.0,
            "stop_type": "trailing",
            "triggered": False
        }

    def update_price(self, symbol: str, current_price: float) -> bool:
        if symbol not in self.stops:
            return False

        stop = self.stops[symbol]

        if stop["triggered"]:
            return True

        if stop["stop_type"] == "trailing":
            if current_price > stop["highest_price"]:
                stop["highest_price"] = current_price
                stop["stop_price"] = current_price * (1 - stop["trail_percent"])

        if current_price <= stop["stop_price"]:
            stop["triggered"] = True
            return True

        return False

    def is_triggered(self, symbol: str) -> bool:
        return self.stops.get(symbol, {}).get("triggered", False)

51.5 知识图谱

51.5.1 金融科技技术架构

┌─────────────────────────────────────────────────────────────────────┐
│                      金融科技技术架构                                 │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      交易层 (Trading)                         │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 策略引擎  │ │ 订单管理  │ │ 风控系统  │ │ 执行引擎  │       │   │
│  │  │ Strategy │ │ Order    │ │ Risk    │ │ Executor│       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      分析层 (Analysis)                        │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 技术分析  │ │ 量化模型  │ │ 回测框架  │ │ 因子分析  │       │   │
│  │  │ TA-Lib  │ │ Model   │ │ Backtest│ │ Factor  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      数据层 (Data)                            │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 行情数据  │ │ 财务数据  │ │ 新闻数据  │ │ 另类数据  │       │   │
│  │  │ Market  │ │ Finance │ │ News    │ │ Alt     │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      基础设施层 (Infrastructure)              │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 数据库   │ │ 消息队列  │ │ 缓存     │ │ 监控     │       │   │
│  │  │ Timescale│ │ Kafka   │ │ Redis   │ │ Grafana │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

51.5.2 量化交易流程

┌─────────────────────────────────────────────────────────────────────┐
│                      量化交易工作流程                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────┐                                                      │
│   │ 数据获取  │ ─── 行情 / 财务 / 新闻 / 另类数据                    │
│   └────┬─────┘                                                      │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    数据处理                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 清洗     │ │ 标准化   │ │ 特征工程  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    策略分析                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 信号生成  │ │ 因子计算  │ │ 模型预测  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    风险控制                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 仓位管理  │ │ 止损止盈  │ │ VaR计算  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    订单执行                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 订单生成  │ │ 撮合执行  │ │ 成交确认  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    绩效分析                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 收益计算  │ │ 风险指标  │ │ 归因分析  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

51.6 技术选型指南

51.6.1 数据源选型

数据源数据类型延迟费用推荐指数
TushareA股行情、财务免费/付费★★★★★
AKShare多市场数据免费★★★★★
Wind全市场数据极低★★★★☆
聚宽量化数据★★★★☆
Yahoo Finance美股数据免费★★★★☆

51.6.2 回测框架选型

框架性能功能学习曲线推荐指数
Backtrader极高★★★★★
Zipline★★★★☆
VeighNa极高★★★★★
PyAlgoTrade★★★☆☆

51.6.3 技术指标库选型

指标数量性能易用性推荐指数
TA-Lib150+极高★★★★★
pandas-ta130+★★★★★
TA30+★★★☆☆

51.7 常见问题与解决方案

51.7.1 数据质量问题

python
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple

class DataQualityChecker:
    """数据质量检查器"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
    
    def check_missing(self) -> Dict:
        """检查缺失值"""
        missing = self.df.isnull().sum()
        return {
            "missing_counts": missing.to_dict(),
            "missing_ratio": (missing / len(self.df) * 100).to_dict()
        }
    
    def check_outliers(self, columns: List[str] = None, method: str = 'iqr') -> Dict:
        """检查异常值"""
        columns = columns or self.df.select_dtypes(include=[np.number]).columns
        outliers = {}
        
        for col in columns:
            data = self.df[col].dropna()
            if method == 'iqr':
                Q1, Q3 = data.quantile([0.25, 0.75])
                IQR = Q3 - Q1
                outlier_count = ((data < Q1 - 1.5 * IQR) | (data > Q3 + 1.5 * IQR)).sum()
            elif method == 'zscore':
                z_scores = np.abs((data - data.mean()) / data.std())
                outlier_count = (z_scores > 3).sum()
            
            outliers[col] = outlier_count
        
        return outliers
    
    def check_duplicates(self, subset: List[str] = None) -> int:
        """检查重复值"""
        return self.df.duplicated(subset=subset).sum()
    
    def check_price_consistency(self, price_cols: List[str] = None) -> Dict:
        """检查价格一致性"""
        price_cols = price_cols or ['open', 'high', 'low', 'close']
        issues = []
        
        if all(col in self.df.columns for col in price_cols):
            high_violations = (self.df['high'] < self.df['low']).sum()
            if high_violations > 0:
                issues.append(f"High < Low in {high_violations} rows")
            
            for col in ['open', 'close']:
                violations = ((self.df[col] < self.df['low']) | 
                             (self.df[col] > self.df['high'])).sum()
                if violations > 0:
                    issues.append(f"{col} outside [low, high] in {violations} rows")
        
        return {"issues": issues}


class DataCleaner:
    """数据清洗器"""
    
    @staticmethod
    def fill_missing_prices(df: pd.DataFrame, method: str = 'ffill') -> pd.DataFrame:
        """填充缺失价格"""
        price_cols = ['open', 'high', 'low', 'close']
        for col in price_cols:
            if col in df.columns:
                if method == 'ffill':
                    df[col] = df[col].ffill()
                elif method == 'interpolate':
                    df[col] = df[col].interpolate()
        return df
    
    @staticmethod
    def adjust_for_splits(df: pd.DataFrame, split_ratio: float) -> pd.DataFrame:
        """调整股票分割"""
        price_cols = ['open', 'high', 'low', 'close']
        for col in price_cols:
            if col in df.columns:
                df[col] = df[col] / split_ratio
        
        if 'volume' in df.columns:
            df['volume'] = df['volume'] * split_ratio
        
        return df
    
    @staticmethod
    def remove_outliers(df: pd.DataFrame, column: str, method: str = 'iqr') -> pd.DataFrame:
        """移除异常值"""
        data = df[column]
        
        if method == 'iqr':
            Q1, Q3 = data.quantile([0.25, 0.75])
            IQR = Q3 - Q1
            mask = (data >= Q1 - 1.5 * IQR) & (data <= Q3 + 1.5 * IQR)
        elif method == 'zscore':
            z_scores = np.abs((data - data.mean()) / data.std())
            mask = z_scores <= 3
        
        return df[mask]

51.7.2 策略回测优化

python
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
import pandas as pd
import numpy as np

@dataclass
class BacktestResult:
    """回测结果"""
    total_return: float
    annual_return: float
    max_drawdown: float
    sharpe_ratio: float
    win_rate: float
    trades: List[Dict] = field(default_factory=list)


class BacktestEngine:
    """回测引擎"""
    
    def __init__(
        self,
        initial_capital: float = 100000,
        commission: float = 0.0003,
        slippage: float = 0.0001
    ):
        self.initial_capital = initial_capital
        self.commission = commission
        self.slippage = slippage
    
    def run(
        self,
        data: pd.DataFrame,
        signals: pd.Series,
        position_size: float = 0.95
    ) -> BacktestResult:
        """运行回测"""
        capital = self.initial_capital
        position = 0
        trades = []
        equity_curve = [capital]
        
        for i in range(len(data)):
            price = data['close'].iloc[i]
            signal = signals.iloc[i]
            
            if signal == 1 and position == 0:
                shares = int(capital * position_size / price)
                cost = shares * price * (1 + self.commission + self.slippage)
                if cost <= capital:
                    position = shares
                    capital -= cost
                    trades.append({
                        "type": "buy",
                        "price": price,
                        "shares": shares,
                        "date": data.index[i]
                    })
            
            elif signal == -1 and position > 0:
                revenue = position * price * (1 - self.commission - self.slippage)
                capital += revenue
                trades.append({
                    "type": "sell",
                    "price": price,
                    "shares": position,
                    "date": data.index[i]
                })
                position = 0
            
            equity = capital + position * price
            equity_curve.append(equity)
        
        final_equity = capital + position * data['close'].iloc[-1]
        total_return = (final_equity - self.initial_capital) / self.initial_capital
        
        equity_series = pd.Series(equity_curve)
        returns = equity_series.pct_change().dropna()
        
        sharpe = np.sqrt(252) * returns.mean() / returns.std() if returns.std() > 0 else 0
        
        cummax = equity_series.cummax()
        drawdown = (equity_series - cummax) / cummax
        max_drawdown = drawdown.min()
        
        winning_trades = [t for t in trades[1::2] if len(trades) > 1]
        win_rate = len([t for t in winning_trades if t.get('profit', 0) > 0]) / len(winning_trades) if winning_trades else 0
        
        return BacktestResult(
            total_return=total_return,
            annual_return=(1 + total_return) ** (252 / len(data)) - 1,
            max_drawdown=max_drawdown,
            sharpe_ratio=sharpe,
            win_rate=win_rate,
            trades=trades
        )


class StrategyOptimizer:
    """策略优化器"""
    
    def __init__(self, data: pd.DataFrame, strategy_class):
        self.data = data
        self.strategy_class = strategy_class
    
    def grid_search(
        self,
        param_grid: Dict[str, List]
    ) -> List[Dict]:
        """网格搜索优化"""
        from itertools import product
        
        results = []
        param_names = list(param_grid.keys())
        param_values = list(param_grid.values())
        
        for combo in product(*param_values):
            params = dict(zip(param_names, combo))
            
            strategy = self.strategy_class(**params)
            signals = strategy.generate_signals(self.data)
            
            engine = BacktestEngine()
            result = engine.run(self.data, signals)
            
            results.append({
                "params": params,
                "sharpe": result.sharpe_ratio,
                "return": result.total_return,
                "drawdown": result.max_drawdown
            })
        
        return sorted(results, key=lambda x: x['sharpe'], reverse=True)

51.8 本章小结

本章详细介绍了Python金融科技的核心概念和实践:

  1. 金融数据获取:行情数据、Tick数据、订单簿、财务数据
  2. 技术分析:技术指标、K线形态识别
  3. 交易策略:策略框架、MA交叉、RSI策略
  4. 风险管理:VaR、CVaR、仓位管理、止损管理

练习题

  1. 实现一个完整的数据获取系统,支持多数据源
  2. 开发一个技术指标计算库,支持常用指标
  3. 实现一个回测框架,支持多策略组合
  4. 开发一个风险管理系统,支持实时监控
  5. 实现一个简单的量化交易平台,支持模拟交易

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校