第39章 性能测试与优化
学习目标
完成本章学习后,你将能够:
- 理解性能指标:响应时间、吞吐量、并发数、资源利用率
- 使用性能测试工具:Locust、JMeter、Apache Benchmark
- 进行代码性能分析:cProfile、line_profiler、tracemalloc(内存分析)
- 优化Python代码:算法优化、数据结构选择、缓存策略
- 优化数据库性能:查询优化、索引设计、连接池配置
- 实现缓存机制:Redis缓存、内存缓存、缓存策略
- 进行并发优化:多线程、多进程、异步IO
- 监控系统性能:性能指标收集、告警机制、性能报告
39.1 性能测试基础
39.1.1 性能指标
python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from datetime import datetime
import statistics
import time
@dataclass
class PerformanceMetric:
    """One named measurement together with its unit, capture time and tags."""

    name: str
    value: float
    unit: str
    # Filled with datetime.now() at construction when not supplied.
    timestamp: datetime = field(default_factory=datetime.now)
    # Free-form labels; each instance gets its own dict.
    tags: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Return the metric as a plain dict; the timestamp becomes an ISO-8601 string."""
        payload = dict(name=self.name, value=self.value, unit=self.unit)
        payload["timestamp"] = self.timestamp.isoformat()
        # The tags dict is returned by reference, not copied.
        payload["tags"] = self.tags
        return payload
@dataclass
class ResponseTimeStats:
    """Summary statistics for a set of response-time samples.

    Every field is expressed in the same unit as the samples.
    """

    min: float
    max: float
    mean: float
    median: float
    p90: float
    p95: float
    p99: float
    std_dev: float

    @classmethod
    def from_samples(cls, samples: List[float]) -> "ResponseTimeStats":
        """Compute statistics from raw samples.

        Percentiles use the nearest-rank definition: the p-th percentile is
        the sorted sample at 1-based rank ceil(p / 100 * n).

        Args:
            samples: Observed response times; may be empty.

        Returns:
            The populated statistics. All fields are 0 for an empty input,
            and std_dev is 0 when there is only one sample.
        """
        if not samples:
            return cls(0, 0, 0, 0, 0, 0, 0, 0)

        ordered = sorted(samples)
        n = len(ordered)

        def percentile(pct: int) -> float:
            # Integer ceiling division: avoids float rounding in n * 0.9 and
            # the off-by-one of int(n * p), which returned the maximum as the
            # p90 of ten samples.
            rank = (n * pct + 99) // 100
            return ordered[rank - 1]

        return cls(
            min=ordered[0],
            max=ordered[-1],
            mean=statistics.mean(ordered),
            median=statistics.median(ordered),
            p90=percentile(90),
            p95=percentile(95),
            p99=percentile(99),
            std_dev=statistics.stdev(ordered) if n > 1 else 0,
        )
@dataclass
class LoadTestResult:
    """Aggregate outcome of a single load-test run."""

    total_requests: int
    successful_requests: int
    failed_requests: int
    total_duration: float
    requests_per_second: float
    # Quoted so the annotation is not evaluated when the class is created.
    response_times: "ResponseTimeStats"
    # Failure counts keyed by an error label.
    errors: Dict[str, int] = field(default_factory=dict)

    @property
    def success_rate(self) -> float:
        """Return successful_requests / total_requests, or 0.0 when nothing was sent."""
        if self.total_requests == 0:
            return 0.0
        return self.successful_requests / self.total_requests

    @property
    def error_rate(self) -> float:
        """Return 1 - success_rate, or 0.0 when nothing was sent."""
        if self.total_requests == 0:
            # Without this guard an empty run would be reported as 100% errors.
            return 0.0
        return 1.0 - self.success_rate

    def to_dict(self) -> Dict:
        """Return a report-friendly dict; rates and timings are pre-formatted strings."""
        times = self.response_times
        return {
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "total_duration": self.total_duration,
            "requests_per_second": self.requests_per_second,
            "success_rate": f"{self.success_rate:.2%}",
            "error_rate": f"{self.error_rate:.2%}",
            "response_times": {
                "min": f"{times.min:.3f}s",
                "max": f"{times.max:.3f}s",
                "mean": f"{times.mean:.3f}s",
                "median": f"{times.median:.3f}s",
                "p90": f"{times.p90:.3f}s",
                "p95": f"{times.p95:.3f}s",
                "p99": f"{times.p99:.3f}s",
            },
            "errors": self.errors,
        }
class PerformanceCollector:
    """In-memory store for metrics, response-time samples and error counts."""

    def __init__(self):
        self._metrics: List[PerformanceMetric] = []
        self._response_times: List[float] = []
        self._errors: Dict[str, int] = {}

    def record_response_time(self, duration: float) -> None:
        """Append one response-time sample."""
        self._response_times.append(duration)

    def record_error(self, error_type: str) -> None:
        """Increase the counter kept for error_type by one."""
        seen_so_far = self._errors.get(error_type, 0)
        self._errors[error_type] = seen_so_far + 1

    def record_metric(self, name: str, value: float, unit: str, tags: Dict = None) -> None:
        """Store a new PerformanceMetric; a falsy tags argument becomes an empty dict."""
        metric = PerformanceMetric(name=name, value=value, unit=unit, tags=tags or {})
        self._metrics.append(metric)

    def get_response_time_stats(self) -> "ResponseTimeStats":
        """Summarise all recorded response times."""
        return ResponseTimeStats.from_samples(self._response_times)

    def get_metrics(self, name: str = None) -> "List[PerformanceMetric]":
        """Return the metrics called name, or a copy of all metrics when name is falsy."""
        if not name:
            return list(self._metrics)
        return [metric for metric in self._metrics if metric.name == name]

    def clear(self) -> None:
        """Drop every recorded metric, sample and error count."""
        for store in (self._metrics, self._response_times, self._errors):
            store.clear()
class SimpleLoadTester:
    """Calls a function repeatedly from several threads and summarises the outcome."""

    def __init__(self, target_func: callable, concurrency: int = 10):
        """
        Args:
            target_func: Zero-argument callable invoked in a loop by every worker.
            concurrency: Number of worker threads to start.
        """
        self.target_func = target_func
        self.concurrency = concurrency
        self._collector = PerformanceCollector()

    def run(self, duration: float = 60, ramp_up: float = 0) -> LoadTestResult:
        """Run the load test and return its aggregated result.

        Args:
            duration: Seconds to keep the workers running once all are started.
            ramp_up: Total seconds spread evenly across worker start-up; 0
                starts all workers immediately.

        Returns:
            A LoadTestResult. total_duration includes the ramp-up phase and
            the time spent waiting for workers to stop.
        """
        import queue
        import threading

        results_queue = queue.Queue()
        stop_event = threading.Event()

        def worker():
            while not stop_event.is_set():
                # perf_counter is monotonic, so wall-clock adjustments cannot
                # distort the measured interval.
                request_start = time.perf_counter()
                try:
                    self.target_func()
                except Exception as exc:
                    results_queue.put(("error", type(exc).__name__))
                else:
                    results_queue.put(("success", time.perf_counter() - request_start))

        start_time = time.perf_counter()
        workers = []
        for _ in range(self.concurrency):
            if ramp_up > 0:
                time.sleep(ramp_up / self.concurrency)
            # Daemon threads: a target that never returns must not keep the
            # interpreter alive after join() below has timed out.
            thread = threading.Thread(target=worker, daemon=True)
            thread.start()
            workers.append(thread)

        time.sleep(duration)
        stop_event.set()
        for thread in workers:
            thread.join(timeout=1)
        total_duration = time.perf_counter() - start_time

        response_times = []
        errors = {}
        while True:
            try:
                status, value = results_queue.get_nowait()
            except queue.Empty:
                break
            if status == "success":
                response_times.append(value)
            else:
                errors[value] = errors.get(value, 0) + 1

        successful = len(response_times)
        failed = sum(errors.values())
        total_requests = successful + failed
        return LoadTestResult(
            total_requests=total_requests,
            successful_requests=successful,
            failed_requests=failed,
            total_duration=total_duration,
            # Guard against a zero elapsed time (e.g. duration=0 on a coarse clock).
            requests_per_second=total_requests / total_duration if total_duration > 0 else 0.0,
            response_times=ResponseTimeStats.from_samples(response_times),
            errors=errors,
        )
### 39.1.2 基准测试
python
import timeit
import functools
from typing import Callable, List, Dict, Any, Tuple
from dataclasses import dataclass
@dataclass
class BenchmarkResult:
    """Timing summary for one benchmarked callable; time fields are in seconds."""

    name: str
    iterations: int
    total_time: float
    avg_time: float
    min_time: float
    max_time: float
    ops_per_second: float

    def __str__(self) -> str:
        """Render a multi-line report; per-call times are shown in milliseconds."""
        ms_per_second = 1000
        rows = [
            f"{self.name}:",
            f" Iterations: {self.iterations}",
            f" Total time: {self.total_time:.6f}s",
            f" Average: {self.avg_time * ms_per_second:.4f}ms",
            f" Min: {self.min_time * ms_per_second:.4f}ms",
            f" Max: {self.max_time * ms_per_second:.4f}ms",
            f" Ops/sec: {self.ops_per_second:.2f}",
        ]
        return "\n".join(rows)
class Benchmark:
    """Times a callable over repeated invocations and remembers every result."""

    def __init__(self, name: str = None, iterations: int = 1000, warmup: int = 10):
        """
        Args:
            name: Label for results produced by run(); defaults to the
                callable's own name.
            iterations: Number of timed calls; must be positive when measuring.
            warmup: Number of untimed calls made before measuring.
        """
        self.name = name
        self.iterations = iterations
        self.warmup = warmup
        self._results: List[BenchmarkResult] = []

    @staticmethod
    def _label(func: Callable) -> str:
        """Return a display name, also for callables without __name__ (e.g. functools.partial)."""
        return getattr(func, "__name__", None) or repr(func)

    def _measure(self, name: str, func: Callable, args: tuple, kwargs: dict) -> "BenchmarkResult":
        """Warm up, time self.iterations calls of func and record the result under name."""
        if self.iterations <= 0:
            # Fail with a clear message instead of an opaque ZeroDivisionError below.
            raise ValueError("iterations must be a positive integer")

        for _ in range(self.warmup):
            func(*args, **kwargs)

        clock = timeit.default_timer
        times = []
        for _ in range(self.iterations):
            start = clock()
            func(*args, **kwargs)
            times.append(clock() - start)

        total_time = sum(times)
        result = BenchmarkResult(
            name=name,
            iterations=self.iterations,
            total_time=total_time,
            avg_time=total_time / self.iterations,
            min_time=min(times),
            max_time=max(times),
            ops_per_second=self.iterations / total_time if total_time > 0 else 0,
        )
        self._results.append(result)
        return result

    def run(self, func: Callable, *args, **kwargs) -> "BenchmarkResult":
        """Benchmark func(*args, **kwargs).

        Raises:
            ValueError: If iterations is not positive.
        """
        return self._measure(self.name or self._label(func), func, args, kwargs)

    def compare(self, *funcs: Callable) -> "Dict[str, BenchmarkResult]":
        """Benchmark each zero-argument callable; results are keyed and named per function."""
        results = {}
        for func in funcs:
            label = self._label(func)
            # Name each result after its function; using self.name here would
            # give every entry the same label in print_summary().
            results[label] = self._measure(label, func, (), {})
        return results

    def get_results(self) -> "List[BenchmarkResult]":
        """Return a copy of all results recorded so far."""
        return self._results.copy()

    def print_summary(self) -> None:
        """Print every recorded result followed by a blank line."""
        for result in self._results:
            print(result)
            print()
def benchmark(iterations: int = 1000, warmup: int = 10):
    """Decorator factory that turns a call into a benchmark run.

    Calling the decorated function benchmarks it with the given settings,
    prints the BenchmarkResult and returns that result (not the wrapped
    function's own return value).
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            runner = Benchmark(func.__name__, iterations, warmup)
            outcome = runner.run(func, *args, **kwargs)
            print(outcome)
            return outcome

        return wrapper

    return decorator
class ComparisonBenchmark:
    """Benchmarks several named callables with identical settings and prints a ranking."""

    def __init__(self, name: str = None):
        self.name = name
        self._functions: List[Tuple[str, Callable]] = []

    def add_function(self, name: str, func: Callable) -> None:
        """Register a zero-argument callable under name."""
        self._functions.append((name, func))

    def run(self, iterations: int = 1000, warmup: int = 10) -> "Dict[str, BenchmarkResult]":
        """Benchmark every registered callable and return the results keyed by name."""
        results = {}
        for name, func in self._functions:
            results[name] = Benchmark(name, iterations, warmup).run(func)
        return results

    def print_comparison(self, results: "Dict[str, BenchmarkResult]") -> None:
        """Print a table sorted by average time, fastest first.

        The "Relative" column is each average divided by the fastest one. An
        empty results dict prints only the table frame.
        """
        ranked = sorted(results.items(), key=lambda item: item[1].avg_time)
        rule = "=" * 60
        print(f"\n{rule}")
        print(f"Benchmark: {self.name or 'Comparison'}")
        print(rule)
        print(f"{'Name':<20} {'Avg Time':<15} {'Ops/sec':<15} {'Relative':<10}")
        print("-" * 60)
        # No rows at all: there is no baseline to look up.
        baseline = ranked[0][1].avg_time if ranked else 0
        for name, result in ranked:
            if baseline > 0:
                relative = result.avg_time / baseline
            elif result.avg_time == 0:
                # Fastest entry measured as 0 s: equal to the baseline.
                relative = 1.0
            else:
                relative = float("inf")
            print(f"{name:<20} {result.avg_time*1000:>10.4f}ms {result.ops_per_second:>14.2f} {relative:>9.2f}x")
        print(f"{rule}\n")
def benchmark_example():
    """Compare three ways of building a list of 1000 doubled integers and print the ranking."""

    def list_comprehension():
        return [x * 2 for x in range(1000)]

    def map_function():
        return list(map(lambda x: x * 2, range(1000)))

    def for_loop():
        result = []
        for x in range(1000):
            result.append(x * 2)
        return result

    comparison = ComparisonBenchmark("List Creation Methods")
    # Each candidate is registered under its own function name.
    for candidate in (list_comprehension, map_function, for_loop):
        comparison.add_function(candidate.__name__, candidate)

    comparison.print_comparison(comparison.run(iterations=10000))
## 39.2 Locust负载测试
39.2.1 Locust基础
python
from locust import HttpUser, task, between, events
from locust.runners import MasterRunner, WorkerRunner
from locust.stats import stats_printer, stats_history
from typing import Dict, List, Optional
import json
import time
class ApiUser(HttpUser):
    """Simulated API client: logs in on start, then lists, reads and creates users (weights 3:2:1)."""

    wait_time = between(1, 3)
    host = "http://localhost:8000"

    def _auth_headers(self):
        """Build the bearer header; the token is '' when login did not store one."""
        return {"Authorization": f"Bearer {getattr(self, 'token', '')}"}

    def on_start(self):
        """Authenticate once before any task runs."""
        self.login()

    def login(self):
        """POST fixed test credentials and keep the returned token on a 200 response."""
        credentials = {
            "username": "testuser",
            "password": "testpass"
        }
        response = self.client.post("/api/login", json=credentials)
        if response.status_code != 200:
            return
        self.token = response.json().get("token")

    @task(3)
    def get_users(self):
        """Fetch the user list."""
        self.client.get("/api/users", headers=self._auth_headers())

    @task(2)
    def get_user_detail(self):
        """Fetch a single, fixed user."""
        user_id = 1
        self.client.get(f"/api/users/{user_id}", headers=self._auth_headers())

    @task(1)
    def create_user(self):
        """Create a user from a fixed payload."""
        new_user = {
            "name": "Test User",
            "email": "test@example.com"
        }
        self.client.post("/api/users", json=new_user, headers=self._auth_headers())
class WebsiteUser(HttpUser):
    """Simulated site visitor; task weights favour the landing page (10:5:3:2:1)."""

    wait_time = between(2, 5)

    @task(10)
    def index(self):
        """Load the landing page."""
        self.client.get("/")

    @task(5)
    def about(self):
        """Load the about page."""
        self.client.get("/about")

    @task(3)
    def products(self):
        """Load the product listing."""
        self.client.get("/products")

    @task(2)
    def product_detail(self):
        """Load the detail page of one fixed product."""
        product_id = 1
        detail_url = f"/products/{product_id}"
        self.client.get(detail_url)

    @task(1)
    def search(self):
        """Run a fixed search query."""
        self.client.get("/search?q=test")
class LocustConfig:
    """Namespace holding default settings for a Locust run."""

    class Config:
        """Plain constants; the names mirror Locust command-line options.

        Nothing in the visible code reads these values yet.
        """

        # What to run and against which target.
        locustfile = "locustfile.py"
        host = "http://localhost:8000"

        # Load shape.
        users = 100
        spawn_rate = 10
        run_time = "5m"

        # Execution mode and reporting.
        headless = True
        html = "report.html"
        csv = "results"
        only_summary = True
class CustomLoadTest(HttpUser):
    """User that cycles through three endpoints and classifies each response itself."""

    wait_time = between(0.5, 2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._request_count = 0
        self._error_count = 0

    @task
    def api_request(self):
        """Request the next endpoint in round-robin order and mark it success or failure."""
        endpoints = ["/api/data", "/api/status", "/api/health"]
        target = endpoints[self._request_count % len(endpoints)]
        with self.client.get(target, catch_response=True) as response:
            self._request_count += 1
            status = response.status_code
            if status == 200:
                response.success()
                return
            # Anything else is a failure; 429 gets its own label.
            reason = "Rate limited" if status == 429 else f"Unexpected status: {status}"
            response.failure(reason)
            self._error_count += 1
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    """Listener registered on events.test_start; prints a start banner."""
    print("Load test starting...")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    """Listener registered on events.test_stop; prints a completion banner."""
    print("Load test completed.")
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
    """Listener registered on events.request; prints a line for requests that carry an exception."""
    if not exception:
        return
    print(f"Request failed: {name} - {exception}")
class LocustRunner:
    """Placeholder runner built around a LocustConfig; it does not start Locust itself."""

    def __init__(self, config: "LocustConfig" = None):
        self.config = config if config else LocustConfig()

    def run_headless(self) -> Dict:
        """Return a zero-filled result summary.

        This implementation only imports locust.main; no load test is run.
        """
        # Unused, but kept so behaviour (ImportError without Locust) is unchanged.
        from locust import main
        summary_keys = (
            "total_requests",
            "total_failures",
            "avg_response_time",
            "requests_per_second",
        )
        return dict.fromkeys(summary_keys, 0)

    def generate_report(self, output_file: str = "report.html") -> None:
        """Print a confirmation message; this method does not write any file."""
        message = f"Report saved to {output_file}"
        print(message)
### 39.2.2 高级测试场景
python
from locust import SequentialTaskSet, task, tag
import random
import string
class UserJourney(SequentialTaskSet):
    """Ordered shopping flow: register, log in, browse, add to cart, check out."""

    # Password used for both registration and the following login.
    _PASSWORD = "password123"

    def on_start(self):
        """Reset per-journey state."""
        self.user_id = None
        self.cart_id = None
        self.username = None
        self.token = None

    @task
    def register(self):
        """Register a random user and remember its id and name on a 201 response."""
        username = ''.join(random.choices(string.ascii_lowercase, k=8))
        response = self.client.post("/api/register", json={
            "username": username,
            "password": self._PASSWORD,
            "email": f"{username}@example.com"
        })
        if response.status_code == 201:
            self.user_id = response.json().get("user_id")
            # Kept so login() authenticates as the account just created,
            # not as an unrelated hard-coded user.
            self.username = username

    @task
    def login(self):
        """Log in as the registered user; leave the task set if registration failed."""
        if not self.user_id:
            # interrupt() hands control back to the parent by raising, so
            # nothing below runs in this case.
            self.interrupt()
        response = self.client.post("/api/login", json={
            "username": self.username,
            "password": self._PASSWORD
        })
        if response.status_code == 200:
            self.token = response.json().get("token")

    @task
    def browse_products(self):
        """Load the product list one to five times."""
        for _ in range(random.randint(1, 5)):
            self.client.get("/api/products")

    @task
    def add_to_cart(self):
        """Add a random quantity (1-3) of a random product (1-100) to the cart."""
        product_id = random.randint(1, 100)
        quantity = random.randint(1, 3)
        self.client.post("/api/cart/add", json={
            "product_id": product_id,
            "quantity": quantity
        })

    @task
    def checkout(self):
        """Submit the checkout request."""
        self.client.post("/api/checkout")

    def on_stop(self):
        """Delete the user created by this journey, if any."""
        if self.user_id:
            self.client.delete(f"/api/users/{self.user_id}")
class ECommerceUser(HttpUser):
    """Locust user whose only task is the UserJourney task set."""

    wait_time = between(1, 3)
    tasks = [UserJourney]
class StressTestUser(HttpUser):
    """User for stress scenarios: short waits and bursts of health-check requests."""

    wait_time = between(0.1, 0.5)

    @task
    def rapid_requests(self):
        """Send ten health-check requests back to back."""
        burst_size = 10
        for _ in range(burst_size):
            self.client.get("/api/health")
class SpikeTestUser(HttpUser):
    """User for spike scenarios: near-zero wait between single data requests."""

    wait_time = between(0, 0.5)

    @task
    def spike_request(self):
        """Request the data endpoint once."""
        data_endpoint = "/api/data"
        self.client.get(data_endpoint)
class SoakTestUser(HttpUser):
    """User for soak scenarios: long waits between dashboard requests."""

    wait_time = between(5, 10)

    @task
    def normal_usage(self):
        """Request the dashboard once."""
        dashboard_endpoint = "/api/dashboard"
        self.client.get(dashboard_endpoint)
class TestDataGenerator:
    """Factories for random user, product and order payloads."""

    @staticmethod
    def generate_user_data() -> Dict:
        """Return a user dict with random username, email, age (18-65) and country."""
        letters = string.ascii_lowercase
        username = ''.join(random.choices(letters, k=8))
        mailbox = ''.join(random.choices(letters, k=6))
        age = random.randint(18, 65)
        country = random.choice(["US", "UK", "CN", "JP", "DE"])
        return {
            "username": username,
            "email": f"{mailbox}@example.com",
            "age": age,
            "country": country,
        }

    @staticmethod
    def generate_product_data() -> Dict:
        """Return a product dict with random name, price (10-1000, 2 decimals), category and stock."""
        serial = random.randint(1000, 9999)
        price = round(random.uniform(10, 1000), 2)
        category = random.choice(["electronics", "clothing", "books", "home"])
        stock = random.randint(0, 100)
        return {
            "name": f"Product {serial}",
            "price": price,
            "category": category,
            "stock": stock,
        }

    @staticmethod
    def generate_order_data(user_id: int) -> Dict:
        """Return an order for user_id with one to five random line items."""
        line_count = random.randint(1, 5)
        items = [
            {
                "product_id": random.randint(1, 100),
                "quantity": random.randint(1, 3),
            }
            for _ in range(line_count)
        ]
        return {
            "user_id": user_id,
            "items": items,
            "shipping_address": "123 Main St",
        }
## 39.3 代码性能分析
39.3.1 cProfile分析
python
import cProfile
import pstats
import io
from pstats import SortKey
from typing import Callable, Dict, List, Any
from dataclasses import dataclass
import functools
@dataclass
class ProfileResult:
    """Aggregated profiling figures plus a list of per-function entries."""

    total_calls: int
    primitive_calls: int
    total_time: float
    cum_time: float
    # Each entry is read via the keys 'name', 'cum_time' and 'calls'.
    functions: List[Dict]

    def print_summary(self) -> None:
        """Print the totals followed by the first ten entries of functions."""
        lines = [
            f"Total calls: {self.total_calls}",
            f"Primitive calls: {self.primitive_calls}",
            f"Total time: {self.total_time:.4f}s",
            f"Cumulative time: {self.cum_time:.4f}s",
            "\nTop functions by cumulative time:",
        ]
        lines.extend(
            f" {entry['name']}: {entry['cum_time']:.4f}s ({entry['calls']} calls)"
            for entry in self.functions[:10]
        )
        print("\n".join(lines))
class Profiler:
    """Thin wrapper around cProfile/pstats for profiling a single call."""

    def __init__(self, sort_by: str = "cumulative"):
        """
        Args:
            sort_by: Sort key handed to pstats.Stats.sort_stats().
        """
        self.sort_by = sort_by
        self._profile = None
        self._stats = None

    def profile(self, func: Callable, *args, **kwargs) -> tuple:
        """Run func(*args, **kwargs) under cProfile.

        Returns:
            A (result, stats) tuple: func's return value and the sorted
            pstats.Stats for the call.

        Raises:
            Whatever func raises; the profiler is disabled first.
        """
        self._profile = cProfile.Profile()
        self._profile.enable()
        try:
            result = func(*args, **kwargs)
        finally:
            # Always switch profiling off: a profiler left enabled keeps
            # tracing and can prevent a later profiler from starting.
            self._profile.disable()
        # No private stream: output bound to a throwaway StringIO would be
        # invisible to whoever calls print_stats().
        self._stats = pstats.Stats(self._profile)
        self._stats.sort_stats(self.sort_by)
        return result, self._stats

    def print_stats(self, limit: int = 20) -> None:
        """Print up to limit rows of the last profile to the current sys.stdout."""
        if self._stats is None:
            return
        import sys
        # Rebind at print time so a stdout redirected after profile() is honoured.
        self._stats.stream = sys.stdout
        self._stats.print_stats(limit)

    def get_stats_dict(self) -> Dict:
        """Return the raw statistics keyed by "file:line:function".

        Returns an empty dict when nothing has been profiled yet.
        """
        if self._stats is None:
            return {}
        stats_dict = {}
        for (filename, line, name), (cc, nc, tt, ct, _callers) in self._stats.stats.items():
            # cProfile stores (primitive calls, total calls, own time,
            # cumulative time, callers); the key names below are kept as-is
            # for compatibility, so "native_calls" holds the total call count.
            stats_dict[f"{filename}:{line}:{name}"] = {
                "call_count": cc,
                "native_calls": nc,
                "total_time": tt,
                "cumulative_time": ct,
            }
        return stats_dict

    def save_to_file(self, filename: str) -> None:
        """Dump the last profile to filename in pstats format; no-op before the first profile."""
        if self._stats is not None:
            self._stats.dump_stats(filename)
def profile(sort_by: str = "cumulative", limit: int = 20):
    """Decorator factory: profile every call of the decorated function.

    Each call runs under a fresh Profiler sorted by sort_by, prints up to
    limit rows of statistics and returns the function's own result.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            session = Profiler(sort_by)
            outcome, _ = session.profile(func, *args, **kwargs)
            session.print_stats(limit)
            return outcome

        return wrapper

    return decorator
class LineProfiler:
    """Optional wrapper around the third-party line_profiler package."""

    def __init__(self):
        self._functions: List[Callable] = []
        self._results: Dict[str, Dict] = {}

    def add_function(self, func: Callable) -> None:
        """Register a function whose lines should be timed."""
        self._functions.append(func)

    def run(self, func: Callable, *args, **kwargs) -> Any:
        """Call func(*args, **kwargs) with line profiling and print the statistics.

        When line_profiler is not installed, an install hint is printed and
        func is simply called once without profiling.

        Returns:
            func's return value.
        """
        # Only the import is guarded: an ImportError raised by func itself
        # must propagate instead of re-running func with a misleading hint.
        try:
            from line_profiler import LineProfiler as LP
        except ImportError:
            print("line_profiler not installed. Run: pip install line_profiler")
            return func(*args, **kwargs)

        lp = LP()
        for registered in self._functions:
            lp.add_function(registered)
        lp.enable_by_count()
        try:
            result = func(*args, **kwargs)
        finally:
            # Switch profiling off even when func raises.
            lp.disable_by_count()
        lp.print_stats()
        return result
class MemoryProfiler:
    """Collects tracemalloc snapshots and reports differences between them."""

    def __init__(self):
        self._snapshots: List[Any] = []

    def take_snapshot(self) -> None:
        """Store a tracemalloc snapshot, starting tracing first if necessary."""
        try:
            import tracemalloc
        except ImportError:
            print("tracemalloc not available")
            return
        if not tracemalloc.is_tracing():
            tracemalloc.start()
        self._snapshots.append(tracemalloc.take_snapshot())

    def compare_snapshots(self, snapshot1_idx: int = -2, snapshot2_idx: int = -1) -> List:
        """Return the first ten line-level differences between two stored snapshots.

        Args:
            snapshot1_idx: Index of the older snapshot (default: second to last).
            snapshot2_idx: Index of the newer snapshot (default: last).

        Returns:
            A list of dicts with 'file' (the statistic rendered as text),
            'size_diff' and 'count_diff'; empty when fewer than two
            snapshots exist.
        """
        if len(self._snapshots) < 2:
            return []
        older = self._snapshots[snapshot1_idx]
        newer = self._snapshots[snapshot2_idx]
        differences = newer.compare_to(older, "lineno")
        return [
            {
                "file": str(entry),
                "size_diff": entry.size_diff,
                "count_diff": entry.count_diff,
            }
            for entry in differences[:10]
        ]

    def get_memory_usage(self) -> Dict:
        """Return current and peak traced memory in MiB, starting tracing if necessary."""
        try:
            import tracemalloc
        except ImportError:
            return {}
        if not tracemalloc.is_tracing():
            tracemalloc.start()
        current, peak = tracemalloc.get_traced_memory()
        bytes_per_mb = 1024 * 1024
        return {
            "current_mb": current / 1024 / 1024,
            "peak_mb": peak / 1024 / 1024,
        } if bytes_per_mb else {}

    def stop(self) -> None:
        """Stop tracemalloc tracing."""
        try:
            import tracemalloc
        except ImportError:
            return
        tracemalloc.stop()
def profile_memory(func: Callable) -> Callable:
    """Decorator that prints current and peak traced memory after each call.

    Tracing is started for the call when it is not already active, and is
    stopped afterwards only in that case. If tracing was already running,
    the printed figures include allocations made before the call.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        import tracemalloc

        # Remember whether tracing belongs to us, so a trace started by
        # someone else is not torn down on exit.
        started_here = not tracemalloc.is_tracing()
        if started_here:
            tracemalloc.start()
        try:
            result = func(*args, **kwargs)
            current, peak = tracemalloc.get_traced_memory()
        finally:
            # Runs on errors too, so a failing call does not leave tracing on.
            if started_here:
                tracemalloc.stop()
        print(f"Memory usage: {current / 1024 / 1024:.2f} MB (peak: {peak / 1024 / 1024:.2f} MB)")
        return result

    return wrapper
### 39.3.2 性能瓶颈识别
python
from typing import List, Tuple, Optional
import time
import sys
class PerformanceAnalyzer:
    """Static, AST-based heuristics for spotting likely performance problems."""

    def __init__(self):
        self._bottlenecks: List[Dict] = []

    def analyze_function(self, func: Callable, *args, **kwargs) -> Dict:
        """Return basic static facts about func.

        func is inspected, never called; *args and **kwargs are accepted but
        unused.

        Returns:
            A dict with 'name', 'is_generator', 'is_coroutine',
            'bytecode_instructions' and 'complexity_estimate'.
        """
        import dis
        import inspect

        return {
            "name": func.__name__,
            "is_generator": inspect.isgeneratorfunction(func),
            "is_coroutine": inspect.iscoroutinefunction(func),
            "bytecode_instructions": len(list(dis.get_instructions(func))),
            "complexity_estimate": self._estimate_complexity(func),
        }

    def _estimate_complexity(self, func: Callable) -> str:
        """Guess a complexity class from the deepest loop nesting in func's source.

        Returns "O(1)" without loops, "O(n)" for un-nested loops,
        "O(n^k)" for k nested loops, and "Unknown" when the source cannot
        be read or parsed.
        """
        import ast
        import inspect
        import textwrap

        try:
            # Methods and nested functions come back indented; dedent so
            # they parse as a top-level definition.
            source = textwrap.dedent(inspect.getsource(func))
            tree = ast.parse(source)
        except (OSError, TypeError, SyntaxError, ValueError):
            return "Unknown"

        def deepest_loop(node, depth=0):
            if isinstance(node, (ast.For, ast.AsyncFor, ast.While)):
                depth += 1
            children = [deepest_loop(child, depth) for child in ast.iter_child_nodes(node)]
            return max([depth] + children)

        # Nesting depth, not loop count: two loops in sequence are still O(n).
        depth = deepest_loop(tree)
        if depth == 0:
            return "O(1)"
        if depth == 1:
            return "O(n)"
        return f"O(n^{depth})"

    def identify_bottlenecks(self, code: str) -> List[Dict]:
        """Scan source code for patterns that are often slow.

        Flags `for ... in range(...)` loops, calls to .append/.extend/.insert,
        and `+`/`*` expressions with a string literal operand.

        Returns:
            Dicts with 'type', 'line' and 'description'; an empty list when
            code does not parse.
        """
        import ast

        try:
            tree = ast.parse(code)
        except SyntaxError:
            return []

        def is_str_literal(node) -> bool:
            # ast.Str was removed in Python 3.12; string literals are
            # ast.Constant nodes holding a str.
            return isinstance(node, ast.Constant) and isinstance(node.value, str)

        bottlenecks = []
        for node in ast.walk(tree):
            if isinstance(node, ast.For):
                target = node.iter
                if isinstance(target, ast.Call) and getattr(target.func, "id", None) == "range":
                    bottlenecks.append({
                        "type": "loop",
                        "line": node.lineno,
                        "description": "Potential O(n) loop"
                    })
            elif isinstance(node, ast.Call):
                method = getattr(node.func, "attr", None)
                if method in ("append", "extend", "insert"):
                    bottlenecks.append({
                        "type": "list_operation",
                        "line": node.lineno,
                        "description": f"List {method} may be slow in loops"
                    })
            elif isinstance(node, ast.BinOp):
                if isinstance(node.op, (ast.Add, ast.Mult)) and (
                    is_str_literal(node.left) or is_str_literal(node.right)
                ):
                    bottlenecks.append({
                        "type": "string_concat",
                        "line": node.lineno,
                        "description": "String concatenation in loop may be slow"
                    })
        return bottlenecks

    def suggest_optimizations(self, bottlenecks: List[Dict]) -> List[str]:
        """Turn bottleneck records into one-line suggestions; unknown types are skipped."""
        advice = {
            "loop": "Consider using list comprehension or map()",
            "list_operation": "Pre-allocate list size if possible",
            "string_concat": "Use ''.join() or f-strings instead of +",
        }
        suggestions = []
        for bottleneck in bottlenecks:
            hint = advice.get(bottleneck["type"])
            if hint is not None:
                suggestions.append(f"Line {bottleneck['line']}: {hint}")
        return suggestions
class CodeOptimizer:
    """Small reference implementations of common Python speed-ups."""

    @staticmethod
    def optimize_list_creation(size: int) -> List[int]:
        """Return a list of size zeros built by sequence repetition."""
        seed = [0]
        return seed * size

    @staticmethod
    def optimize_string_concat(strings: List[str]) -> str:
        """Concatenate strings with a single join call."""
        separator = ''
        return separator.join(strings)

    @staticmethod
    def optimize_dict_lookup(data: Dict, keys: List[str]) -> List[Any]:
        """Look up each key in data; missing keys yield None."""
        return list(map(data.get, keys))

    @staticmethod
    def optimize_set_operations(list1: List, list2: List) -> set:
        """Return the elements present in both lists, as a set."""
        common = set(list1)
        common &= set(list2)
        return common

    @staticmethod
    def optimize_loop_with_local(func: Callable, data: List) -> List:
        """Apply func to every item of data and return the results as a list."""
        return list(map(func, data))
def compare_implementations():
    """Time string building with += against str.join and print both timings and the ratio."""
    import timeit

    def slow_string_concat(n):
        text = ""
        for k in range(n):
            text += str(k)
        return text

    def fast_string_concat(n):
        return ''.join(str(k) for k in range(n))

    n = 10000
    repeats = 10
    slow_time = timeit.Timer(lambda: slow_string_concat(n)).timeit(number=repeats)
    fast_time = timeit.Timer(lambda: fast_string_concat(n)).timeit(number=repeats)

    report = (
        f"Slow method: {slow_time:.4f}s",
        f"Fast method: {fast_time:.4f}s",
        f"Speedup: {slow_time / fast_time:.2f}x",
    )
    for line in report:
        print(line)
## 39.4 性能优化策略
39.4.1 算法优化
python
from functools import lru_cache
from typing import List, Dict, Set, Tuple, Optional
import heapq
from collections import defaultdict, Counter, deque
class AlgorithmOptimizations:
    """Reference implementations of a few classic algorithmic improvements."""

    @staticmethod
    @lru_cache(maxsize=128)
    def fibonacci_cached(n: int) -> int:
        """Return the n-th Fibonacci number recursively, memoised with lru_cache."""
        if n >= 2:
            fib = AlgorithmOptimizations.fibonacci_cached
            return fib(n - 1) + fib(n - 2)
        return n

    @staticmethod
    def fibonacci_iterative(n: int) -> int:
        """Return the n-th Fibonacci number with a constant-space loop."""
        if n < 2:
            return n
        previous, current = 0, 1
        for _ in range(n - 1):
            previous, current = current, previous + current
        return current

    @staticmethod
    def binary_search(arr: List[int], target: int) -> int:
        """Return an index of target in the sorted list arr, or -1 when absent."""
        lo = 0
        hi = len(arr) - 1
        while lo <= hi:
            mid = (lo + hi) // 2
            candidate = arr[mid]
            if candidate == target:
                return mid
            if candidate < target:
                lo = mid + 1
            else:
                hi = mid - 1
        return -1

    @staticmethod
    def two_sum_optimized(nums: List[int], target: int) -> Tuple[int, int]:
        """Return indices (i, j), i < j, with nums[i] + nums[j] == target, or (-1, -1)."""
        index_of = {}
        for position, number in enumerate(nums):
            partner = target - number
            if partner in index_of:
                return index_of[partner], position
            index_of[number] = position
        return -1, -1

    @staticmethod
    def merge_sorted_arrays(arr1: List[int], arr2: List[int]) -> List[int]:
        """Merge two sorted lists into a new sorted list; ties take from arr1 first."""
        merged = []
        i = j = 0
        len1, len2 = len(arr1), len(arr2)
        while i < len1 and j < len2:
            left, right = arr1[i], arr2[j]
            if left <= right:
                merged.append(left)
                i += 1
            else:
                merged.append(right)
                j += 1
        # At most one of the two tails is non-empty here.
        merged += arr1[i:]
        merged += arr2[j:]
        return merged

    @staticmethod
    def find_top_k(nums: List[int], k: int) -> List[int]:
        """Return the k largest values in descending order."""
        top = heapq.nlargest(k, nums)
        return top

    @staticmethod
    def group_by_frequency(items: List[str]) -> Dict[str, int]:
        """Return a Counter mapping each item to its number of occurrences."""
        tally = Counter()
        tally.update(items)
        return tally
class DataStructureOptimizations:
    """Examples of picking a container that matches the access pattern."""

    def __init__(self):
        self._cache: Dict[str, Any] = {}
        self._lru_cache: deque = deque(maxlen=100)

    def use_set_for_membership(self, items: List[int], target: int) -> bool:
        """Return whether target occurs in items, using a set for the test.

        Building the set visits every item, so this pays off only when the
        set is reused for many lookups.
        """
        item_set = set(items)
        return target in item_set

    def use_dict_for_counting(self, items: List[str]) -> Dict[str, int]:
        """Return how often each item occurs, as a plain dict."""
        counts = defaultdict(int)
        for item in items:
            counts[item] += 1
        return dict(counts)

    def use_heap_for_priority(self, items: List[Tuple[int, str]]) -> List[str]:
        """Return the values of (priority, value) pairs, lowest priority first.

        The input list is left untouched.
        """
        # Work on a copy: heapify/heappop would otherwise reorder and then
        # empty the caller's list.
        heap = list(items)
        heapq.heapify(heap)
        ordered = []
        while heap:
            _, value = heapq.heappop(heap)
            ordered.append(value)
        return ordered

    def use_deque_for_queue(self, items: List[int]) -> List[int]:
        """Drain items in FIFO order through a deque and return them as a list."""
        queue = deque(items)
        drained = []
        while queue:
            drained.append(queue.popleft())
        return drained
class CachingStrategies:
    """Bounded in-memory cache that evicts the least recently used entry."""

    def __init__(self, max_size: int = 100):
        """
        Args:
            max_size: Maximum number of entries; with a value <= 0 nothing
                is stored.
        """
        self.max_size = max_size
        # dict keeps insertion order, so the first key is always the least
        # recently used one. This replaces a separate order list whose
        # remove()/pop(0) calls were O(n) per access.
        self._cache: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value and mark it most recently used; None when absent."""
        try:
            value = self._cache.pop(key)
        except KeyError:
            return None
        # Re-insert to move the key to the most-recent end.
        self._cache[key] = value
        return value

    def set(self, key: str, value: Any) -> None:
        """Store value under key, evicting the least recently used entry when full."""
        if self.max_size <= 0:
            # A zero-capacity cache holds nothing (this used to raise IndexError).
            return
        if key in self._cache:
            del self._cache[key]
        elif len(self._cache) >= self.max_size:
            del self._cache[next(iter(self._cache))]
        self._cache[key] = value

    def clear(self) -> None:
        """Remove every entry."""
        self._cache.clear()
class LRUCache:
    """Fixed-capacity cache with least-recently-used eviction."""

    def __init__(self, capacity: int):
        """
        Args:
            capacity: Maximum number of entries; with a value <= 0 nothing
                is stored.
        """
        self.capacity = capacity
        # dict keeps insertion order, so the first key is always the least
        # recently used one. This replaces a separate deque whose remove()
        # was O(n) per access.
        self._cache: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value and mark it most recently used; None when absent."""
        try:
            value = self._cache.pop(key)
        except KeyError:
            return None
        # Re-insert to move the key to the most-recent end.
        self._cache[key] = value
        return value

    def put(self, key: str, value: Any) -> None:
        """Store value under key, evicting the least recently used entry when full."""
        if self.capacity <= 0:
            # A zero-capacity cache holds nothing (this used to raise IndexError).
            return
        if key in self._cache:
            del self._cache[key]
        elif len(self._cache) >= self.capacity:
            del self._cache[next(iter(self._cache))]
        self._cache[key] = value
### 39.4.2 并发优化
python
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from typing import Callable, List, Any, Tuple
import asyncio
class ConcurrencyOptimizer:
    """Helpers that fan work out to threads, processes or coroutines."""

    @staticmethod
    def parallel_map(func: Callable, items: List[Any], max_workers: int = None) -> List[Any]:
        """Apply func to items on a thread pool; results are in input order."""
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(func, items))

    @staticmethod
    def parallel_map_process(func: Callable, items: List[Any], max_workers: int = None) -> List[Any]:
        """Apply func to items on a process pool; results are in input order."""
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(func, items))

    @staticmethod
    def parallel_submit(
        func: Callable,
        items: List[Tuple],
        max_workers: int = None
    ) -> List[Any]:
        """Call func(*args) for each args tuple on a thread pool.

        Results are collected as the calls finish, so their order follows
        completion, not the order of items.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            pending = [pool.submit(func, *call_args) for call_args in items]
            return [done.result() for done in as_completed(pending)]

    @staticmethod
    async def async_map(func: Callable, items: List[Any]) -> List[Any]:
        """Await func(item) for every item concurrently; results are in input order."""
        coroutines = [func(item) for item in items]
        gathered = await asyncio.gather(*coroutines)
        return gathered
class ThreadSafeCache:
    """Size-bounded cache whose operations are serialised by a lock.

    When full, the entry that was inserted first is evicted.
    """

    def __init__(self, max_size: int = 100):
        """
        Args:
            max_size: Maximum number of entries; with a value <= 0 nothing
                is stored.
        """
        self.max_size = max_size
        self._cache: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under key, or None when absent."""
        with self._lock:
            return self._cache.get(key)

    def set(self, key: str, value: Any) -> None:
        """Store value under key, evicting the oldest inserted entry when full."""
        if self.max_size <= 0:
            # Nothing to evict in an empty zero-capacity cache; this used
            # to raise StopIteration.
            return
        with self._lock:
            if len(self._cache) >= self.max_size and key not in self._cache:
                self._cache.pop(next(iter(self._cache)))
            self._cache[key] = value

    def delete(self, key: str) -> None:
        """Remove key if present; missing keys are ignored."""
        with self._lock:
            self._cache.pop(key, None)
class BatchProcessor:
    """Splits a list into batches and processes the batches on a thread pool."""

    def __init__(self, batch_size: int = 100, max_workers: int = 4):
        """
        Args:
            batch_size: Maximum number of items per batch.
            max_workers: Number of threads processing batches.
        """
        self.batch_size = batch_size
        self.max_workers = max_workers

    def process_in_batches(
        self,
        items: List[Any],
        process_func: Callable[[List[Any]], List[Any]]
    ) -> List[Any]:
        """Run process_func on each batch and concatenate the returned lists.

        Batches run concurrently, but their results are joined in input
        order so the output lines up with items.

        Raises:
            ValueError: If batch_size is not positive.
        """
        if self.batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        batches = [
            items[start:start + self.batch_size]
            for start in range(0, len(items), self.batch_size)
        ]
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(process_func, batch) for batch in batches]
            # Submission order, not completion order: otherwise the output
            # order would depend on thread timing.
            for future in futures:
                results.extend(future.result())
        return results
class ConnectionPool:
    """Hands out at most max_connections connections at a time, reusing returned ones."""

    def __init__(self, create_connection: Callable, max_connections: int = 10):
        """
        Args:
            create_connection: Zero-argument factory called when no idle
                connection is available.
            max_connections: Maximum number of connections checked out at once.
        """
        self._create_connection = create_connection
        self._max_connections = max_connections
        self._pool: List[Any] = []
        self._lock = threading.Lock()
        self._semaphore = threading.Semaphore(max_connections)

    def get_connection(self) -> Any:
        """Return an idle connection or create one; blocks while the limit is reached.

        Raises:
            Whatever create_connection raises; the reserved slot is freed first.
        """
        self._semaphore.acquire()
        try:
            with self._lock:
                if self._pool:
                    return self._pool.pop()
            # Created outside the lock so a slow factory does not block
            # other callers from taking idle connections.
            return self._create_connection()
        except BaseException:
            # Give the permit back; otherwise every failed creation would
            # permanently shrink the pool.
            self._semaphore.release()
            raise

    def return_connection(self, conn: Any) -> None:
        """Put conn back into the pool and free its slot."""
        with self._lock:
            self._pool.append(conn)
        self._semaphore.release()

    @property
    def available_connections(self) -> int:
        """Number of idle connections currently held in the pool."""
        with self._lock:
            return len(self._pool)
## 39.5 本章小结
本章详细介绍了Python性能测试与优化的核心概念和实践:
- 性能指标:响应时间、吞吐量、并发数、资源利用率
- 性能测试工具:Locust负载测试、基准测试
- 代码分析:cProfile、line_profiler、tracemalloc(内存分析)
- 算法优化:时间复杂度、空间复杂度、缓存策略
- 并发优化:多线程、多进程、异步IO
- 性能监控:指标收集、瓶颈识别、优化建议
练习题
- 实现一个性能测试框架,支持多种负载模式
- 开发一个代码性能分析工具,自动识别性能瓶颈
- 实现一个智能缓存系统,支持多种缓存策略
- 开发一个数据库查询优化器,自动优化SQL查询
- 实现一个分布式性能监控系统,支持实时监控和告警