Skip to content

第39章 性能测试与优化

学习目标

完成本章学习后,你将能够:

  1. 理解性能指标:响应时间、吞吐量、并发数、资源利用率
  2. 使用性能测试工具:Locust、JMeter、Apache Benchmark
  3. 进行代码性能分析:cProfile、line_profiler、memory_profiler
  4. 优化Python代码:算法优化、数据结构选择、缓存策略
  5. 优化数据库性能:查询优化、索引设计、连接池配置
  6. 实现缓存机制:Redis缓存、内存缓存、缓存策略
  7. 进行并发优化:多线程、多进程、异步IO
  8. 监控系统性能:性能指标收集、告警机制、性能报告

39.1 性能测试基础

39.1.1 性能指标

python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from datetime import datetime
import statistics
import time


@dataclass
class PerformanceMetric:
    name: str
    value: float
    unit: str
    timestamp: datetime = field(default_factory=datetime.now)
    tags: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        return {
            "name": self.name,
            "value": self.value,
            "unit": self.unit,
            "timestamp": self.timestamp.isoformat(),
            "tags": self.tags
        }


@dataclass
class ResponseTimeStats:
    min: float
    max: float
    mean: float
    median: float
    p90: float
    p95: float
    p99: float
    std_dev: float

    @classmethod
    def from_samples(cls, samples: List[float]) -> "ResponseTimeStats":
        if not samples:
            return cls(0, 0, 0, 0, 0, 0, 0, 0)

        sorted_samples = sorted(samples)
        n = len(sorted_samples)

        return cls(
            min=sorted_samples[0],
            max=sorted_samples[-1],
            mean=statistics.mean(samples),
            median=statistics.median(samples),
            p90=sorted_samples[int(n * 0.9)] if n > 0 else 0,
            p95=sorted_samples[int(n * 0.95)] if n > 0 else 0,
            p99=sorted_samples[int(n * 0.99)] if n > 0 else 0,
            std_dev=statistics.stdev(samples) if n > 1 else 0
        )


@dataclass
class LoadTestResult:
    total_requests: int
    successful_requests: int
    failed_requests: int
    total_duration: float
    requests_per_second: float
    response_times: ResponseTimeStats
    errors: Dict[str, int] = field(default_factory=dict)

    @property
    def success_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.successful_requests / self.total_requests

    @property
    def error_rate(self) -> float:
        return 1.0 - self.success_rate

    def to_dict(self) -> Dict:
        return {
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "total_duration": self.total_duration,
            "requests_per_second": self.requests_per_second,
            "success_rate": f"{self.success_rate:.2%}",
            "error_rate": f"{self.error_rate:.2%}",
            "response_times": {
                "min": f"{self.response_times.min:.3f}s",
                "max": f"{self.response_times.max:.3f}s",
                "mean": f"{self.response_times.mean:.3f}s",
                "median": f"{self.response_times.median:.3f}s",
                "p90": f"{self.response_times.p90:.3f}s",
                "p95": f"{self.response_times.p95:.3f}s",
                "p99": f"{self.response_times.p99:.3f}s"
            },
            "errors": self.errors
        }


class PerformanceCollector:
    def __init__(self):
        self._metrics: List[PerformanceMetric] = []
        self._response_times: List[float] = []
        self._errors: Dict[str, int] = {}

    def record_response_time(self, duration: float) -> None:
        self._response_times.append(duration)

    def record_error(self, error_type: str) -> None:
        self._errors[error_type] = self._errors.get(error_type, 0) + 1

    def record_metric(self, name: str, value: float, unit: str, tags: Dict = None) -> None:
        self._metrics.append(PerformanceMetric(
            name=name,
            value=value,
            unit=unit,
            tags=tags or {}
        ))

    def get_response_time_stats(self) -> ResponseTimeStats:
        return ResponseTimeStats.from_samples(self._response_times)

    def get_metrics(self, name: str = None) -> List[PerformanceMetric]:
        if name:
            return [m for m in self._metrics if m.name == name]
        return self._metrics.copy()

    def clear(self) -> None:
        self._metrics.clear()
        self._response_times.clear()
        self._errors.clear()


class SimpleLoadTester:
    def __init__(self, target_func: callable, concurrency: int = 10):
        self.target_func = target_func
        self.concurrency = concurrency
        self._collector = PerformanceCollector()

    def run(self, duration: float = 60, ramp_up: float = 0) -> LoadTestResult:
        import threading
        import queue

        start_time = time.time()
        results_queue = queue.Queue()
        stop_event = threading.Event()

        def worker():
            while not stop_event.is_set():
                request_start = time.time()
                try:
                    result = self.target_func()
                    duration = time.time() - request_start
                    results_queue.put(("success", duration))
                except Exception as e:
                    results_queue.put(("error", str(type(e).__name__)))

        workers = []
        for i in range(self.concurrency):
            if ramp_up > 0:
                time.sleep(ramp_up / self.concurrency)
            t = threading.Thread(target=worker)
            t.start()
            workers.append(t)

        time.sleep(duration)
        stop_event.set()

        for t in workers:
            t.join(timeout=1)

        total_duration = time.time() - start_time
        response_times = []
        successful = 0
        failed = 0
        errors = {}

        while not results_queue.empty():
            status, value = results_queue.get()
            if status == "success":
                successful += 1
                response_times.append(value)
            else:
                failed += 1
                errors[value] = errors.get(value, 0) + 1

        total_requests = successful + failed

        return LoadTestResult(
            total_requests=total_requests,
            successful_requests=successful,
            failed_requests=failed,
            total_duration=total_duration,
            requests_per_second=total_requests / total_duration,
            response_times=ResponseTimeStats.from_samples(response_times),
            errors=errors
        )

39.1.2 基准测试

python
import timeit
import functools
from typing import Callable, List, Dict, Any, Tuple
from dataclasses import dataclass


@dataclass
class BenchmarkResult:
    name: str
    iterations: int
    total_time: float
    avg_time: float
    min_time: float
    max_time: float
    ops_per_second: float

    def __str__(self) -> str:
        return (
            f"{self.name}:\n"
            f"  Iterations: {self.iterations}\n"
            f"  Total time: {self.total_time:.6f}s\n"
            f"  Average: {self.avg_time * 1000:.4f}ms\n"
            f"  Min: {self.min_time * 1000:.4f}ms\n"
            f"  Max: {self.max_time * 1000:.4f}ms\n"
            f"  Ops/sec: {self.ops_per_second:.2f}"
        )


class Benchmark:
    def __init__(self, name: str = None, iterations: int = 1000, warmup: int = 10):
        self.name = name
        self.iterations = iterations
        self.warmup = warmup
        self._results: List[BenchmarkResult] = []

    def run(self, func: Callable, *args, **kwargs) -> BenchmarkResult:
        name = self.name or func.__name__

        for _ in range(self.warmup):
            func(*args, **kwargs)

        times = []
        for _ in range(self.iterations):
            start = timeit.default_timer()
            func(*args, **kwargs)
            end = timeit.default_timer()
            times.append(end - start)

        total_time = sum(times)
        avg_time = total_time / self.iterations
        min_time = min(times)
        max_time = max(times)
        ops_per_second = self.iterations / total_time if total_time > 0 else 0

        result = BenchmarkResult(
            name=name,
            iterations=self.iterations,
            total_time=total_time,
            avg_time=avg_time,
            min_time=min_time,
            max_time=max_time,
            ops_per_second=ops_per_second
        )

        self._results.append(result)
        return result

    def compare(self, *funcs: Tuple[Callable, ...]) -> Dict[str, BenchmarkResult]:
        results = {}
        for func in funcs:
            result = self.run(func)
            results[func.__name__] = result
        return results

    def get_results(self) -> List[BenchmarkResult]:
        return self._results.copy()

    def print_summary(self) -> None:
        for result in self._results:
            print(result)
            print()


def benchmark(iterations: int = 1000, warmup: int = 10):
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bench = Benchmark(func.__name__, iterations, warmup)
            result = bench.run(func, *args, **kwargs)
            print(result)
            return result
        return wrapper
    return decorator


class ComparisonBenchmark:
    def __init__(self, name: str = None):
        self.name = name
        self._functions: List[Tuple[str, Callable]] = []

    def add_function(self, name: str, func: Callable) -> None:
        self._functions.append((name, func))

    def run(self, iterations: int = 1000, warmup: int = 10) -> Dict[str, BenchmarkResult]:
        results = {}

        for name, func in self._functions:
            bench = Benchmark(name, iterations, warmup)
            result = bench.run(func)
            results[name] = result

        return results

    def print_comparison(self, results: Dict[str, BenchmarkResult]) -> None:
        sorted_results = sorted(results.items(), key=lambda x: x[1].avg_time)

        print(f"\n{'='*60}")
        print(f"Benchmark: {self.name or 'Comparison'}")
        print(f"{'='*60}")
        print(f"{'Name':<20} {'Avg Time':<15} {'Ops/sec':<15} {'Relative':<10}")
        print(f"{'-'*60}")

        baseline = sorted_results[0][1].avg_time

        for name, result in sorted_results:
            relative = result.avg_time / baseline
            print(f"{name:<20} {result.avg_time*1000:>10.4f}ms {result.ops_per_second:>14.2f} {relative:>9.2f}x")

        print(f"{'='*60}\n")


def benchmark_example():
    def list_comprehension():
        return [x * 2 for x in range(1000)]

    def map_function():
        return list(map(lambda x: x * 2, range(1000)))

    def for_loop():
        result = []
        for x in range(1000):
            result.append(x * 2)
        return result

    comparison = ComparisonBenchmark("List Creation Methods")
    comparison.add_function("list_comprehension", list_comprehension)
    comparison.add_function("map_function", map_function)
    comparison.add_function("for_loop", for_loop)

    results = comparison.run(iterations=10000)
    comparison.print_comparison(results)

39.2 Locust负载测试

39.2.1 Locust基础

python
from locust import HttpUser, task, between, events
from locust.runners import MasterRunner, WorkerRunner
from locust.stats import stats_printer, stats_history
from typing import Dict, List, Optional
import json
import time


class ApiUser(HttpUser):
    wait_time = between(1, 3)

    host = "http://localhost:8000"

    def on_start(self):
        self.login()

    def login(self):
        response = self.client.post("/api/login", json={
            "username": "testuser",
            "password": "testpass"
        })
        if response.status_code == 200:
            self.token = response.json().get("token")

    @task(3)
    def get_users(self):
        self.client.get(
            "/api/users",
            headers={"Authorization": f"Bearer {getattr(self, 'token', '')}"}
        )

    @task(2)
    def get_user_detail(self):
        user_id = 1
        self.client.get(
            f"/api/users/{user_id}",
            headers={"Authorization": f"Bearer {getattr(self, 'token', '')}"}
        )

    @task(1)
    def create_user(self):
        self.client.post(
            "/api/users",
            json={
                "name": "Test User",
                "email": "test@example.com"
            },
            headers={"Authorization": f"Bearer {getattr(self, 'token', '')}"}
        )


class WebsiteUser(HttpUser):
    wait_time = between(2, 5)

    @task(10)
    def index(self):
        self.client.get("/")

    @task(5)
    def about(self):
        self.client.get("/about")

    @task(3)
    def products(self):
        self.client.get("/products")

    @task(2)
    def product_detail(self):
        product_id = 1
        self.client.get(f"/products/{product_id}")

    @task(1)
    def search(self):
        self.client.get("/search?q=test")


class LocustConfig:
    class Config:
        locustfile = "locustfile.py"
        host = "http://localhost:8000"
        users = 100
        spawn_rate = 10
        run_time = "5m"
        headless = True
        html = "report.html"
        csv = "results"
        only_summary = True


class CustomLoadTest(HttpUser):
    wait_time = between(0.5, 2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._request_count = 0
        self._error_count = 0

    @task
    def api_request(self):
        endpoints = ["/api/data", "/api/status", "/api/health"]
        endpoint = endpoints[self._request_count % len(endpoints)]

        with self.client.get(endpoint, catch_response=True) as response:
            self._request_count += 1

            if response.status_code == 200:
                response.success()
            elif response.status_code == 429:
                response.failure("Rate limited")
                self._error_count += 1
            else:
                response.failure(f"Unexpected status: {response.status_code}")
                self._error_count += 1


@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    print("Load test starting...")


@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    print("Load test completed.")


@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
    if exception:
        print(f"Request failed: {name} - {exception}")


class LocustRunner:
    def __init__(self, config: LocustConfig = None):
        self.config = config or LocustConfig()

    def run_headless(self) -> Dict:
        from locust import main

        results = {
            "total_requests": 0,
            "total_failures": 0,
            "avg_response_time": 0,
            "requests_per_second": 0
        }

        return results

    def generate_report(self, output_file: str = "report.html") -> None:
        print(f"Report saved to {output_file}")

39.2.2 高级测试场景

python
from locust import SequentialTaskSet, task, tag
import random
import string


class UserJourney(SequentialTaskSet):
    def on_start(self):
        self.user_id = None
        self.cart_id = None

    @task
    def register(self):
        username = ''.join(random.choices(string.ascii_lowercase, k=8))
        response = self.client.post("/api/register", json={
            "username": username,
            "password": "password123",
            "email": f"{username}@example.com"
        })
        if response.status_code == 201:
            self.user_id = response.json().get("user_id")

    @task
    def login(self):
        if not self.user_id:
            self.interrupt()
        response = self.client.post("/api/login", json={
            "username": "testuser",
            "password": "password123"
        })
        if response.status_code == 200:
            self.token = response.json().get("token")

    @task
    def browse_products(self):
        for _ in range(random.randint(1, 5)):
            self.client.get("/api/products")

    @task
    def add_to_cart(self):
        product_id = random.randint(1, 100)
        quantity = random.randint(1, 3)
        self.client.post("/api/cart/add", json={
            "product_id": product_id,
            "quantity": quantity
        })

    @task
    def checkout(self):
        self.client.post("/api/checkout")

    def on_stop(self):
        if self.user_id:
            self.client.delete(f"/api/users/{self.user_id}")


class ECommerceUser(HttpUser):
    tasks = [UserJourney]
    wait_time = between(1, 3)


class StressTestUser(HttpUser):
    wait_time = between(0.1, 0.5)

    @task
    def rapid_requests(self):
        for _ in range(10):
            self.client.get("/api/health")


class SpikeTestUser(HttpUser):
    wait_time = between(0, 0.5)

    @task
    def spike_request(self):
        self.client.get("/api/data")


class SoakTestUser(HttpUser):
    wait_time = between(5, 10)

    @task
    def normal_usage(self):
        self.client.get("/api/dashboard")


class TestDataGenerator:
    @staticmethod
    def generate_user_data() -> Dict:
        return {
            "username": ''.join(random.choices(string.ascii_lowercase, k=8)),
            "email": f"{''.join(random.choices(string.ascii_lowercase, k=6))}@example.com",
            "age": random.randint(18, 65),
            "country": random.choice(["US", "UK", "CN", "JP", "DE"])
        }

    @staticmethod
    def generate_product_data() -> Dict:
        return {
            "name": f"Product {random.randint(1000, 9999)}",
            "price": round(random.uniform(10, 1000), 2),
            "category": random.choice(["electronics", "clothing", "books", "home"]),
            "stock": random.randint(0, 100)
        }

    @staticmethod
    def generate_order_data(user_id: int) -> Dict:
        items = []
        for _ in range(random.randint(1, 5)):
            items.append({
                "product_id": random.randint(1, 100),
                "quantity": random.randint(1, 3)
            })
        return {
            "user_id": user_id,
            "items": items,
            "shipping_address": "123 Main St"
        }

39.3 代码性能分析

39.3.1 cProfile分析

python
import cProfile
import pstats
import io
from pstats import SortKey
from typing import Callable, Dict, List, Any
from dataclasses import dataclass
import functools


@dataclass
class ProfileResult:
    total_calls: int
    primitive_calls: int
    total_time: float
    cum_time: float
    functions: List[Dict]

    def print_summary(self) -> None:
        print(f"Total calls: {self.total_calls}")
        print(f"Primitive calls: {self.primitive_calls}")
        print(f"Total time: {self.total_time:.4f}s")
        print(f"Cumulative time: {self.cum_time:.4f}s")
        print("\nTop functions by cumulative time:")
        for func in self.functions[:10]:
            print(f"  {func['name']}: {func['cum_time']:.4f}s ({func['calls']} calls)")


class Profiler:
    def __init__(self, sort_by: str = "cumulative"):
        self.sort_by = sort_by
        self._profile = None
        self._stats = None

    def profile(self, func: Callable, *args, **kwargs) -> tuple:
        self._profile = cProfile.Profile()
        self._profile.enable()

        result = func(*args, **kwargs)

        self._profile.disable()

        stream = io.StringIO()
        self._stats = pstats.Stats(self._profile, stream=stream)
        self._stats.sort_stats(self.sort_by)

        return result, self._stats

    def print_stats(self, limit: int = 20) -> None:
        if self._stats:
            self._stats.print_stats(limit)

    def get_stats_dict(self) -> Dict:
        if not self._stats:
            return {}

        stats_dict = {}
        for func, (cc, nc, tt, ct, callers) in self._stats.stats.items():
            filename, line, name = func
            stats_dict[f"{filename}:{line}:{name}"] = {
                "call_count": cc,
                "native_calls": nc,
                "total_time": tt,
                "cumulative_time": ct
            }

        return stats_dict

    def save_to_file(self, filename: str) -> None:
        if self._stats:
            self._stats.dump_stats(filename)


def profile(sort_by: str = "cumulative", limit: int = 20):
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profiler = Profiler(sort_by)
            result, stats = profiler.profile(func, *args, **kwargs)
            profiler.print_stats(limit)
            return result
        return wrapper
    return decorator


class LineProfiler:
    def __init__(self):
        self._functions: List[Callable] = []
        self._results: Dict[str, Dict] = {}

    def add_function(self, func: Callable) -> None:
        self._functions.append(func)

    def run(self, func: Callable, *args, **kwargs) -> Any:
        try:
            from line_profiler import LineProfiler as LP
            lp = LP()

            for f in self._functions:
                lp.add_function(f)

            lp.enable_by_count()
            result = func(*args, **kwargs)
            lp.disable_by_count()

            lp.print_stats()
            return result
        except ImportError:
            print("line_profiler not installed. Run: pip install line_profiler")
            return func(*args, **kwargs)


class MemoryProfiler:
    def __init__(self):
        self._snapshots: List[Any] = []

    def take_snapshot(self) -> None:
        try:
            import tracemalloc
            if not tracemalloc.is_tracing():
                tracemalloc.start()
            self._snapshots.append(tracemalloc.take_snapshot())
        except ImportError:
            print("tracemalloc not available")

    def compare_snapshots(self, snapshot1_idx: int = -2, snapshot2_idx: int = -1) -> List:
        if len(self._snapshots) < 2:
            return []

        snapshot1 = self._snapshots[snapshot1_idx]
        snapshot2 = self._snapshots[snapshot2_idx]

        top_stats = snapshot2.compare_to(snapshot1, "lineno")

        results = []
        for stat in top_stats[:10]:
            results.append({
                "file": str(stat),
                "size_diff": stat.size_diff,
                "count_diff": stat.count_diff
            })

        return results

    def get_memory_usage(self) -> Dict:
        try:
            import tracemalloc
            if not tracemalloc.is_tracing():
                tracemalloc.start()

            current, peak = tracemalloc.get_traced_memory()
            return {
                "current_mb": current / 1024 / 1024,
                "peak_mb": peak / 1024 / 1024
            }
        except ImportError:
            return {}

    def stop(self) -> None:
        try:
            import tracemalloc
            tracemalloc.stop()
        except ImportError:
            pass


def profile_memory(func: Callable) -> Callable:
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        import tracemalloc
        tracemalloc.start()

        result = func(*args, **kwargs)

        current, peak = tracemalloc.get_traced_memory()
        print(f"Memory usage: {current / 1024 / 1024:.2f} MB (peak: {peak / 1024 / 1024:.2f} MB)")

        tracemalloc.stop()
        return result
    return wrapper

39.3.2 性能瓶颈识别

python
from typing import List, Tuple, Optional
import time
import sys


class PerformanceAnalyzer:
    def __init__(self):
        self._bottlenecks: List[Dict] = []

    def analyze_function(self, func: Callable, *args, **kwargs) -> Dict:
        import dis
        import inspect

        analysis = {
            "name": func.__name__,
            "is_generator": inspect.isgeneratorfunction(func),
            "is_coroutine": inspect.iscoroutinefunction(func),
            "bytecode_instructions": len(list(dis.get_instructions(func))),
            "complexity_estimate": self._estimate_complexity(func)
        }

        return analysis

    def _estimate_complexity(self, func: Callable) -> str:
        import ast
        import inspect

        try:
            source = inspect.getsource(func)
            tree = ast.parse(source)

            loop_count = 0
            for node in ast.walk(tree):
                if isinstance(node, (ast.For, ast.While)):
                    loop_count += 1

            if loop_count == 0:
                return "O(1)"
            elif loop_count == 1:
                return "O(n)"
            else:
                return f"O(n^{loop_count})"
        except:
            return "Unknown"

    def identify_bottlenecks(self, code: str) -> List[Dict]:
        import ast

        bottlenecks = []

        try:
            tree = ast.parse(code)

            for node in ast.walk(tree):
                if isinstance(node, ast.For):
                    if isinstance(node.iter, ast.Call):
                        if hasattr(node.iter.func, 'id'):
                            if node.iter.func.id == 'range':
                                bottlenecks.append({
                                    "type": "loop",
                                    "line": node.lineno,
                                    "description": "Potential O(n) loop"
                                })

                elif isinstance(node, ast.Call):
                    if hasattr(node.func, 'attr'):
                        if node.func.attr in ['append', 'extend', 'insert']:
                            bottlenecks.append({
                                "type": "list_operation",
                                "line": node.lineno,
                                "description": f"List {node.func.attr} may be slow in loops"
                            })

                elif isinstance(node, ast.BinOp):
                    if isinstance(node.op, (ast.Add, ast.Mult)):
                        if isinstance(node.left, ast.Str) or isinstance(node.right, ast.Str):
                            bottlenecks.append({
                                "type": "string_concat",
                                "line": node.lineno,
                                "description": "String concatenation in loop may be slow"
                            })

        except SyntaxError:
            pass

        return bottlenecks

    def suggest_optimizations(self, bottlenecks: List[Dict]) -> List[str]:
        suggestions = []

        for bottleneck in bottlenecks:
            if bottleneck["type"] == "loop":
                suggestions.append(
                    f"Line {bottleneck['line']}: Consider using list comprehension or map()"
                )
            elif bottleneck["type"] == "list_operation":
                suggestions.append(
                    f"Line {bottleneck['line']}: Pre-allocate list size if possible"
                )
            elif bottleneck["type"] == "string_concat":
                suggestions.append(
                    f"Line {bottleneck['line']}: Use ''.join() or f-strings instead of +"
                )

        return suggestions


class CodeOptimizer:
    @staticmethod
    def optimize_list_creation(size: int) -> List[int]:
        return [0] * size

    @staticmethod
    def optimize_string_concat(strings: List[str]) -> str:
        return ''.join(strings)

    @staticmethod
    def optimize_dict_lookup(data: Dict, keys: List[str]) -> List[Any]:
        return [data.get(key) for key in keys]

    @staticmethod
    def optimize_set_operations(list1: List, list2: List) -> set:
        return set(list1) & set(list2)

    @staticmethod
    def optimize_loop_with_local(func: Callable, data: List) -> List:
        local_func = func
        return [local_func(item) for item in data]


def compare_implementations():
    import timeit

    def slow_string_concat(n):
        result = ""
        for i in range(n):
            result += str(i)
        return result

    def fast_string_concat(n):
        return ''.join(str(i) for i in range(n))

    n = 10000

    slow_time = timeit.timeit(lambda: slow_string_concat(n), number=10)
    fast_time = timeit.timeit(lambda: fast_string_concat(n), number=10)

    print(f"Slow method: {slow_time:.4f}s")
    print(f"Fast method: {fast_time:.4f}s")
    print(f"Speedup: {slow_time / fast_time:.2f}x")

39.4 性能优化策略

39.4.1 算法优化

python
from functools import lru_cache
from typing import List, Dict, Set, Tuple, Optional
import heapq
from collections import defaultdict, Counter, deque


class AlgorithmOptimizations:
    @staticmethod
    @lru_cache(maxsize=128)
    def fibonacci_cached(n: int) -> int:
        if n < 2:
            return n
        return AlgorithmOptimizations.fibonacci_cached(n - 1) + AlgorithmOptimizations.fibonacci_cached(n - 2)

    @staticmethod
    def fibonacci_iterative(n: int) -> int:
        if n < 2:
            return n
        a, b = 0, 1
        for _ in range(2, n + 1):
            a, b = b, a + b
        return b

    @staticmethod
    def binary_search(arr: List[int], target: int) -> int:
        left, right = 0, len(arr) - 1

        while left <= right:
            mid = (left + right) // 2
            if arr[mid] == target:
                return mid
            elif arr[mid] < target:
                left = mid + 1
            else:
                right = mid - 1

        return -1

    @staticmethod
    def two_sum_optimized(nums: List[int], target: int) -> Tuple[int, int]:
        seen = {}
        for i, num in enumerate(nums):
            complement = target - num
            if complement in seen:
                return seen[complement], i
            seen[num] = i
        return -1, -1

    @staticmethod
    def merge_sorted_arrays(arr1: List[int], arr2: List[int]) -> List[int]:
        result = []
        i, j = 0, 0

        while i < len(arr1) and j < len(arr2):
            if arr1[i] <= arr2[j]:
                result.append(arr1[i])
                i += 1
            else:
                result.append(arr2[j])
                j += 1

        result.extend(arr1[i:])
        result.extend(arr2[j:])
        return result

    @staticmethod
    def find_top_k(nums: List[int], k: int) -> List[int]:
        return heapq.nlargest(k, nums)

    @staticmethod
    def group_by_frequency(items: List[str]) -> Dict[str, int]:
        return Counter(items)


class DataStructureOptimizations:
    def __init__(self):
        self._cache: Dict[str, Any] = {}
        self._lru_cache: deque = deque(maxlen=100)

    def use_set_for_membership(self, items: List[int], target: int) -> bool:
        item_set = set(items)
        return target in item_set

    def use_dict_for_counting(self, items: List[str]) -> Dict[str, int]:
        counts = defaultdict(int)
        for item in items:
            counts[item] += 1
        return dict(counts)

    def use_heap_for_priority(self, items: List[Tuple[int, str]]) -> List[str]:
        heapq.heapify(items)
        result = []
        while items:
            _, value = heapq.heappop(items)
            result.append(value)
        return result

    def use_deque_for_queue(self, items: List[int]) -> List[int]:
        queue = deque(items)
        result = []
        while queue:
            result.append(queue.popleft())
        return result


class CachingStrategies:
    def __init__(self, max_size: int = 100):
        self.max_size = max_size
        self._cache: Dict[str, Any] = {}
        self._access_order: List[str] = []

    def get(self, key: str) -> Optional[Any]:
        if key in self._cache:
            self._access_order.remove(key)
            self._access_order.append(key)
            return self._cache[key]
        return None

    def set(self, key: str, value: Any) -> None:
        if key in self._cache:
            self._access_order.remove(key)
        elif len(self._cache) >= self.max_size:
            oldest = self._access_order.pop(0)
            del self._cache[oldest]

        self._cache[key] = value
        self._access_order.append(key)

    def clear(self) -> None:
        self._cache.clear()
        self._access_order.clear()


class LRUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self._cache: Dict[str, Any] = {}
        self._order: deque = deque()

    def get(self, key: str) -> Optional[Any]:
        if key in self._cache:
            self._order.remove(key)
            self._order.append(key)
            return self._cache[key]
        return None

    def put(self, key: str, value: Any) -> None:
        if key in self._cache:
            self._order.remove(key)
        elif len(self._cache) >= self.capacity:
            oldest = self._order.popleft()
            del self._cache[oldest]

        self._cache[key] = value
        self._order.append(key)

39.4.2 并发优化

python
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from typing import Callable, List, Any, Tuple
import asyncio


class ConcurrencyOptimizer:
    @staticmethod
    def parallel_map(func: Callable, items: List[Any], max_workers: int = None) -> List[Any]:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(func, items))
        return results

    @staticmethod
    def parallel_map_process(func: Callable, items: List[Any], max_workers: int = None) -> List[Any]:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(func, items))
        return results

    @staticmethod
    def parallel_submit(
        func: Callable,
        items: List[Tuple],
        max_workers: int = None
    ) -> List[Any]:
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(func, *args) for args in items]
            for future in as_completed(futures):
                results.append(future.result())
        return results

    @staticmethod
    async def async_map(func: Callable, items: List[Any]) -> List[Any]:
        tasks = [func(item) for item in items]
        return await asyncio.gather(*tasks)


class ThreadSafeCache:
    def __init__(self, max_size: int = 100):
        self.max_size = max_size
        self._cache: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            return self._cache.get(key)

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            if len(self._cache) >= self.max_size and key not in self._cache:
                self._cache.pop(next(iter(self._cache)))
            self._cache[key] = value

    def delete(self, key: str) -> None:
        with self._lock:
            self._cache.pop(key, None)


class BatchProcessor:
    def __init__(self, batch_size: int = 100, max_workers: int = 4):
        self.batch_size = batch_size
        self.max_workers = max_workers

    def process_in_batches(
        self,
        items: List[Any],
        process_func: Callable[[List[Any]], List[Any]]
    ) -> List[Any]:
        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(process_func, batch) for batch in batches]
            for future in as_completed(futures):
                results.extend(future.result())

        return results


class ConnectionPool:
    def __init__(self, create_connection: Callable, max_connections: int = 10):
        self._create_connection = create_connection
        self._max_connections = max_connections
        self._pool: List[Any] = []
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_connections)

    def get_connection(self) -> Any:
        self._semaphore.acquire()
        with self._lock:
            if self._pool:
                return self._pool.pop()
            return self._create_connection()

    def return_connection(self, conn: Any) -> None:
        with self._lock:
            self._pool.append(conn)
        self._semaphore.release()

    @property
    def available_connections(self) -> int:
        with self._lock:
            return len(self._pool)

39.5 本章小结

本章详细介绍了Python性能测试与优化的核心概念和实践:

  1. 性能指标:响应时间、吞吐量、并发数、资源利用率
  2. 性能测试工具:Locust负载测试、基准测试
  3. 代码分析:cProfile、line_profiler、memory_profiler
  4. 算法优化:时间复杂度、空间复杂度、缓存策略
  5. 并发优化:多线程、多进程、异步IO
  6. 性能监控:指标收集、瓶颈识别、优化建议

练习题

  1. 实现一个性能测试框架,支持多种负载模式
  2. 开发一个代码性能分析工具,自动识别性能瓶颈
  3. 实现一个智能缓存系统,支持多种缓存策略
  4. 开发一个数据库查询优化器,自动优化SQL查询
  5. 实现一个分布式性能监控系统,支持实时监控和告警

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校