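Chapter 29 Web Crawlers
Learning Objectives
After completing this chapter, you will be able to:
- Understand how web crawlers work: the HTTP protocol, web page structure, and the crawler workflow
- Use the requests library: sending HTTP requests, handling responses, and managing sessions
- Parse HTML documents: BeautifulSoup, XPath, and CSS selectors
- Build a crawler framework: URL management, request scheduling, and a data extraction pipeline
- Use the Scrapy framework: spider development, middleware, and pipeline components
- Handle anti-crawling measures: User-Agent rotation, proxy IPs, and CAPTCHA recognition
- Implement asynchronous crawlers: aiohttp, asynchronous I/O, and concurrency control
- Follow crawler ethics: robots.txt, request rate control, and data compliance
29.1 Web Crawler Basics
29.1.1 HTTP Protocol Review
HTTP (HyperText Transfer Protocol) is the foundational protocol of web communication: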
┌─────────────────────────────────────────────────────────────────────┐
│                     HTTP request/response flow                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Client                               Server                        │
│  ┌─────────────┐                      ┌─────────────┐               │
│  │             │ ──── request ────►   │             │               │
│  │  Browser    │                      │  Web App    │               │
│  │  / Crawler  │ ◄─── response ───    │  / API      │               │
│  │             │                      │             │               │
│  └─────────────┘                      └─────────────┘               │
│                                                                     │
│  Request:                                                           │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │ GET /api/users HTTP/1.1                                     │    │
│  │ Host: example.com                                           │    │
│  │ User-Agent: Mozilla/5.0 ...                                 │    │
│  │ Accept: application/json                                    │    │
│  │ Cookie: session_id=abc123                                   │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                                                                     │
│  Response:                                                          │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │ HTTP/1.1 200 OK                                             │    │
│  │ Content-Type: application/json                              │    │
│  │ Set-Cookie: session_id=xyz789                               │    │
│  │                                                             │    │
│  │ {"users": [{"id": 1, "name": "Alice"}]}                     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
29.1.2 Common HTTP Status Codes
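| Status code | Category | Description |
|---|---|---|
| 200 | Success | The request succeeded |
| 301/302 | Redirection | The resource has moved |
| 400 | Client error | The request is malformed |
| 401 | Authentication error | Login is required |
| 403 | Forbidden | Access is not permitted |
| 404 | Not found | The resource does not exist |
| 429 | Too many requests | Rate limiting was triggered |
| 500 | Server error | Internal server error |
| 503 | Service unavailable | The server is overloaded |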
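A crawler usually turns each status code into a decision: parse the body, follow a redirect, retry later, or give up. The sketch below shows one possible mapping for the codes in the table. The `Action` enum and the `decide` function are hypothetical names introduced for illustration, and the choice of which codes count as retryable is an assumption that a real project may want to tune.

```python
from enum import Enum
from http import HTTPStatus


class Action(Enum):
    PROCESS = "process"   # parse the response body
    FOLLOW = "follow"     # follow the Location header
    RETRY = "retry"       # try again later, after a delay
    GIVE_UP = "give_up"   # record the failure and move on


# Codes that usually signal a temporary condition (an assumption of this sketch).
RETRYABLE = {429, 500, 503}


def decide(status_code: int) -> Action:
    """Map an HTTP status code to a crawler action."""
    if 200 <= status_code < 300:
        return Action.PROCESS
    if status_code in (301, 302):
        return Action.FOLLOW
    if status_code in RETRYABLE:
        return Action.RETRY
    return Action.GIVE_UP


for code in (200, 302, 404, 429, 503):
    print(code, HTTPStatus(code).phrase, "->", decide(code).value)
```

Client errors such as 401, 403 and 404 fall through to `GIVE_UP` here, because repeating the same request rarely changes the outcome.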
29.1.3 Crawler Workflow
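At its core, a crawler repeats one loop: take a URL from a queue, download the page, parse it, extract data, and enqueue any newly discovered links. The following is a minimal sketch of that loop using only the standard library. To stay runnable offline it "downloads" from a hypothetical in-memory dictionary (`FAKE_SITE`) instead of the network; `PageParser` and `crawl` are illustrative names, not part of any library.

```python
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urljoin

# A tiny in-memory "website" so the sketch runs without network access.
FAKE_SITE = {
    "https://example.com/": '<title>Home</title><a href="/a">A</a><a href="/b">B</a>',
    "https://example.com/a": '<title>Page A</title><a href="/b">B</a>',
    "https://example.com/b": '<title>Page B</title><a href="/">Home</a>',
}


class PageParser(HTMLParser):
    """Collects the <title> text and every href found on a page."""

    def __init__(self) -> None:
        super().__init__()
        self.title = ""
        self.links = []
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self._in_title = True
        elif tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title += data


def crawl(start_url: str) -> list:
    pending = deque([start_url])   # 1. URL queue
    seen = {start_url}             #    de-duplication set
    items = []
    while pending:
        url = pending.popleft()
        html = FAKE_SITE.get(url)  # 2. "download" the page
        if html is None:
            continue
        parser = PageParser()      # 3. parse the HTML
        parser.feed(html)
        items.append({"url": url, "title": parser.title})  # 4. extract data
        for href in parser.links:  # 5. enqueue newly discovered URLs
            absolute = urljoin(url, href)
            if absolute not in seen:
                seen.add(absolute)
                pending.append(absolute)
    return items


for item in crawl("https://example.com/"):
    print(item)
```

Each page is visited exactly once even though the pages link to each other, because a URL is added to `seen` at the moment it is queued.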
python
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
from datetime import datetime
import hashlib


class RequestStatus(Enum):
    """Lifecycle states of a crawl request."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class Request:
    """A unit of work for the crawler: what to fetch and how."""
    url: str
    method: str = "GET"
    headers: Dict[str, str] = field(default_factory=dict)
    params: Dict[str, Any] = field(default_factory=dict)
    data: Optional[Dict[str, Any]] = None
    cookies: Dict[str, str] = field(default_factory=dict)
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3
    status: RequestStatus = RequestStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)

    @property
    def fingerprint(self) -> str:
        """Stable hash of method, URL and sorted params, used for de-duplication."""
        unique_str = f"{self.method}:{self.url}:{sorted(self.params.items())}"
        # MD5 serves only as a fast fingerprint here, not as a security measure.
        return hashlib.md5(unique_str.encode()).hexdigest()


@dataclass
class Response:
    """The downloaded result for a Request."""
    url: str
    status_code: int
    headers: Dict[str, str]
    content: bytes
    text: str
    elapsed: float
    request: Request

    @property
    def ok(self) -> bool:
        return 200 <= self.status_code < 300

    @property
    def is_redirect(self) -> bool:
        return self.status_code in (301, 302, 303, 307, 308)


@dataclass
class Item:
    """A structured record extracted from a page."""
    data: Dict[str, Any]
    url: str
    extracted_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)
29.2 The requests Library in Detail
29.2.1 Basic Request Operations
python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Dict, Any, Union
import time
import logging

logger = logging.getLogger(__name__)


class HttpClient:
    """Thin wrapper around requests.Session with retries, timeouts and logging."""

    def __init__(
        self,
        base_url: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
        backoff_factor: float = 0.5,
        user_agent: Optional[str] = None
    ):
        self.base_url = base_url.rstrip("/") if base_url else None
        self.timeout = timeout
        self.session = requests.Session()
        # Note: POST, PUT and DELETE are retried as well. Remove them from
        # allowed_methods if repeating such a request could cause side effects.
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        if user_agent:
            self.session.headers["User-Agent"] = user_agent

    def _build_url(self, endpoint: str) -> str:
        """Prefix the endpoint with base_url when one is configured."""
        if self.base_url:
            return f"{self.base_url}/{endpoint.lstrip('/')}"
        return endpoint

    def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Any] = None,
        headers: Optional[Dict[str, str]] = None,
        cookies: Optional[Dict[str, str]] = None,
        **kwargs
    ) -> requests.Response:
        url = self._build_url(url)
        start_time = time.time()
        try:
            response = self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                data=data,
                json=json,
                headers=headers,
                cookies=cookies,
                timeout=kwargs.pop("timeout", self.timeout),
                **kwargs
            )
            elapsed = time.time() - start_time
            logger.info(
                f"{method.upper()} {url} - {response.status_code} - {elapsed:.2f}s"
            )
            return response
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {method.upper()} {url} - {e}")
            raise

    def get(self, url: str, **kwargs) -> requests.Response:
        return self.request("GET", url, **kwargs)

    def post(self, url: str, **kwargs) -> requests.Response:
        return self.request("POST", url, **kwargs)

    def put(self, url: str, **kwargs) -> requests.Response:
        return self.request("PUT", url, **kwargs)

    def delete(self, url: str, **kwargs) -> requests.Response:
        return self.request("DELETE", url, **kwargs)

    def close(self) -> None:
        self.session.close()

    def __enter__(self) -> "HttpClient":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


class SessionManager:
    """Tracks login state on top of an HttpClient; cookies live in its session."""

    def __init__(self, client: HttpClient):
        self.client = client
        self._logged_in = False

    def login(
        self,
        login_url: str,
        username: str,
        password: str,
        username_field: str = "username",
        password_field: str = "password"
    ) -> bool:
        response = self.client.post(
            login_url,
            data={
                username_field: username,
                password_field: password
            }
        )
        # A 200 status is treated as success here. Many sites also answer 200
        # for a failed login, so check the body or cookies when that matters.
        if response.status_code == 200:
            self._logged_in = True
            logger.info("Login successful")
            return True
        logger.warning(f"Login failed: {response.status_code}")
        return False

    def logout(self, logout_url: str) -> bool:
        if not self._logged_in:
            return True
        response = self.client.get(logout_url)
        self._logged_in = False
        return response.status_code == 200

    @property
    def cookies(self) -> Dict[str, str]:
        return dict(self.client.session.cookies)

    @property
    def is_logged_in(self) -> bool:
        return self._logged_in
29.2.2 Advanced Request Features
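The client in section 29.2.1 hands retry timing to urllib3 through `backoff_factor`. The idea behind that parameter, waiting longer after each failed attempt, can be sketched independently of any library. The function below is a generic exponential backoff schedule with an optional "full jitter" variant; it is an illustration under simple assumptions and not necessarily the exact schedule urllib3 applies, which varies between versions. `backoff_delays`, `base` and `cap` are names chosen for this sketch.

```python
import random


def backoff_delays(max_retries: int, base: float = 0.5, cap: float = 30.0,
                   jitter: bool = False) -> list:
    """Return the delay in seconds to wait before each retry attempt."""
    delays = []
    for attempt in range(max_retries):
        # Double the delay on every attempt, but never exceed the cap.
        delay = min(cap, base * (2 ** attempt))
        if jitter:
            # Full jitter: pick a random point in [0, delay] so that many
            # clients do not retry at the same instant.
            delay = random.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays(4))  # [0.5, 1.0, 2.0, 4.0]
print(backoff_delays(8))  # the last two values are capped at 30.0
```

Jitter matters most when many workers hit the same server: without it, all of them would retry in lockstep and recreate the original load spike.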
python
# Builds on HttpClient, logger, requests and time from section 29.2.1.
import os
from typing import Any, Callable, Dict, Generator, List, Optional
from contextlib import contextmanager


class AdvancedHttpClient(HttpClient):
    def download_file(
        self,
        url: str,
        save_path: str,
        chunk_size: int = 8192,
        progress_callback: Optional[Callable[[int, int], None]] = None
    ) -> str:
        """Stream a file to disk in chunks so it never sits fully in memory."""
        response = self.get(url, stream=True)
        response.raise_for_status()
        total_size = int(response.headers.get("content-length", 0))
        downloaded = 0
        with open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if progress_callback and total_size:
                        progress_callback(downloaded, total_size)
        logger.info(f"Downloaded: {save_path}")
        return save_path

    def upload_file(
        self,
        url: str,
        file_path: str,
        file_field: str = "file",
        additional_data: Optional[Dict[str, Any]] = None
    ) -> requests.Response:
        """Upload a file as multipart/form-data."""
        with open(file_path, "rb") as f:
            # os.path.basename also handles Windows path separators.
            files = {file_field: (os.path.basename(file_path), f)}
            data = additional_data or {}
            return self.post(url, files=files, data=data)

    def paginated_request(
        self,
        url: str,
        page_param: str = "page",
        start_page: int = 1,
        max_pages: Optional[int] = None,
        stop_condition: Optional[Callable[[requests.Response], bool]] = None,
        **kwargs
    ) -> Generator[requests.Response, None, None]:
        """Yield one response per page until max_pages or stop_condition is hit."""
        # Read the caller's params once. Popping them inside the loop would
        # silently drop them from every page after the first.
        base_params = dict(kwargs.pop("params", None) or {})
        page = start_page
        while True:
            if max_pages and page > max_pages:
                break
            params = {**base_params, page_param: page}
            response = self.get(url, params=params, **kwargs)
            if stop_condition and stop_condition(response):
                break
            yield response
            page += 1

    def json_api_request(
        self,
        method: str,
        url: str,
        data: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None
    ) -> Dict[str, Any]:
        headers = {"Content-Type": "application/json", "Accept": "application/json"}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        response = self.request(method, url, json=data, headers=headers)
        response.raise_for_status()
        return response.json()

    @contextmanager
    def rate_limit(self, requests_per_second: float):
        """Temporarily throttle every request made through this client."""
        min_interval = 1.0 / requests_per_second
        last_request_time = [0.0]
        original_request = self.request

        def rate_limited_request(*args, **kwargs):
            elapsed = time.time() - last_request_time[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            result = original_request(*args, **kwargs)
            last_request_time[0] = time.time()
            return result

        # get()/post()/... all go through self.request, so patching it is enough.
        self.request = rate_limited_request
        try:
            yield self
        finally:
            self.request = original_request


class ProxyManager:
    """Round-robin proxy selection that skips proxies marked as failed."""

    def __init__(self, proxies: Optional[List[str]] = None):
        self.proxies = proxies or []
        self._current_index = 0
        self._failed_proxies = set()

    def add_proxy(self, proxy: str) -> None:
        if proxy not in self.proxies:
            self.proxies.append(proxy)

    def get_proxy(self) -> Optional[Dict[str, str]]:
        if not self.proxies:
            return None
        attempts = 0
        while attempts < len(self.proxies):
            proxy = self.proxies[self._current_index]
            self._current_index = (self._current_index + 1) % len(self.proxies)
            if proxy not in self._failed_proxies:
                return {
                    "http": proxy,
                    "https": proxy
                }
            attempts += 1
        return None

    def mark_failed(self, proxy: str) -> None:
        self._failed_proxies.add(proxy)

    def rotate_proxy(self) -> None:
        if self.proxies:
            self._current_index = (self._current_index + 1) % len(self.proxies)


class UserAgentRotator:
    DEFAULT_USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    ]

    def __init__(self, user_agents: Optional[List[str]] = None):
        self.user_agents = user_agents or self.DEFAULT_USER_AGENTS
        self._current_index = 0

    def get_user_agent(self) -> str:
        user_agent = self.user_agents[self._current_index]
        self._current_index = (self._current_index + 1) % len(self.user_agents)
        return user_agent

    def get_headers(self) -> Dict[str, str]:
        return {
            "User-Agent": self.get_user_agent(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            # "br" is left out: requests only decodes Brotli when an optional
            # brotli package is installed.
            "Accept-Encoding": "gzip, deflate",
            "Connection": "keep-alive",
        }
29.3 HTML Parsing
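Before reaching for a third-party parser, it can help to see what HTML parsing involves at its simplest: walking a stream of tags and text and keeping only what is wanted. The sketch below uses the standard library's `html.parser` to pull the visible text out of a page while skipping `<script>` and `<style>` content. `TextExtractor` is a hypothetical class written for this illustration, and the approach is far less tolerant and less convenient than BeautifulSoup or lxml.

```python
from html.parser import HTMLParser


class TextExtractor(HTMLParser):
    """Collects text nodes, ignoring anything inside <script> or <style>."""

    SKIP = {"script", "style"}

    def __init__(self) -> None:
        super().__init__()
        self._skip_depth = 0
        self._chunks = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIP and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth:
            self._chunks.append(data)

    def text(self) -> str:
        # Collapse runs of whitespace, including newlines, into single spaces.
        return " ".join(" ".join(self._chunks).split())


html = """
<html><head><style>p { color: red; }</style></head>
<body><h1>Hello</h1><script>var x = 1;</script><p>World  of
crawlers</p></body></html>
"""
extractor = TextExtractor()
extractor.feed(html)
print(extractor.text())  # Hello World of crawlers
```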
29.3.1 BeautifulSoup in Detail
python
from bs4 import BeautifulSoup, Tag, NavigableString
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urljoin
import re


class HTMLParser:
    """Convenience wrapper around a BeautifulSoup document."""

    def __init__(self, html: str, parser: str = "html.parser"):
        self.soup = BeautifulSoup(html, parser)
        self.html = html

    def find(
        self,
        name: Optional[str] = None,
        attrs: Optional[Dict[str, Union[str, re.Pattern]]] = None,
        **kwargs
    ) -> Optional[Tag]:
        # Default to an empty dict so that a missing attrs argument is never
        # interpreted as a filter.
        return self.soup.find(name, attrs or {}, **kwargs)

    def find_all(
        self,
        name: Optional[str] = None,
        attrs: Optional[Dict[str, Union[str, re.Pattern]]] = None,
        limit: Optional[int] = None,
        **kwargs
    ) -> List[Tag]:
        return self.soup.find_all(name, attrs or {}, limit=limit, **kwargs)

    def select(self, selector: str) -> List[Tag]:
        """Return all elements matching a CSS selector."""
        return self.soup.select(selector)

    def select_one(self, selector: str) -> Optional[Tag]:
        return self.soup.select_one(selector)

    def get_text(
        self,
        element: Optional[Tag] = None,
        separator: str = " ",
        strip: bool = True
    ) -> str:
        target = element or self.soup
        text = target.get_text(separator=separator, strip=strip)
        # Collapse runs of whitespace into single spaces.
        return " ".join(text.split())

    def get_attribute(
        self,
        element: Tag,
        attribute: str,
        default: Optional[str] = None
    ) -> Optional[str]:
        return element.get(attribute, default)

    def find_by_text(
        self,
        text: Union[str, re.Pattern],
        tag: Optional[str] = None
    ) -> List[Tag]:
        return self.soup.find_all(tag, string=text)

    def find_by_regex(
        self,
        pattern: str,
        attribute: Optional[str] = None,
        tag: Optional[str] = None
    ) -> List[Tag]:
        regex = re.compile(pattern)
        if attribute:
            return self.soup.find_all(tag, attrs={attribute: regex})
        return self.soup.find_all(tag, string=regex)

    def extract_links(self, base_url: Optional[str] = None) -> List[Dict[str, str]]:
        links = []
        for a in self.find_all("a"):
            href = a.get("href")
            if href:
                if base_url:
                    # urljoin resolves relative, root-relative and
                    # protocol-relative links correctly.
                    href = urljoin(base_url, href)
                links.append({
                    "url": href,
                    "text": self.get_text(a),
                    "title": a.get("title", "")
                })
        return links

    def extract_images(self, base_url: Optional[str] = None) -> List[Dict[str, str]]:
        images = []
        for img in self.find_all("img"):
            # data-src is a common convention for lazily loaded images.
            src = img.get("src") or img.get("data-src")
            if src:
                if base_url:
                    src = urljoin(base_url, src)
                images.append({
                    "url": src,
                    "alt": img.get("alt", ""),
                    "title": img.get("title", "")
                })
        return images

    def extract_tables(self) -> List[List[List[str]]]:
        """Return every table as a list of rows, each row a list of cell texts."""
        tables = []
        for table in self.find_all("table"):
            table_data = []
            for row in table.find_all("tr"):
                row_data = []
                for cell in row.find_all(["td", "th"]):
                    row_data.append(self.get_text(cell))
                if row_data:
                    table_data.append(row_data)
            if table_data:
                tables.append(table_data)
        return tables

    def extract_meta(self) -> Dict[str, str]:
        meta_data = {}
        for meta in self.find_all("meta"):
            name = meta.get("name") or meta.get("property")
            content = meta.get("content")
            if name and content:
                meta_data[name] = content
        return meta_data


class ArticleExtractor:
    """Heuristic article extraction: try common selectors in order of specificity."""

    def __init__(self, html: str):
        self.parser = HTMLParser(html)

    def extract(self) -> Dict[str, Any]:
        return {
            "title": self.extract_title(),
            "author": self.extract_author(),
            "publish_date": self.extract_publish_date(),
            "content": self.extract_content(),
            "summary": self.extract_summary(),
            "tags": self.extract_tags(),
            "images": self.parser.extract_images(),
        }

    def extract_title(self) -> str:
        title_selectors = [
            "h1.article-title",
            "h1.title",
            "h1",
            "title",
            'meta[property="og:title"]',
        ]
        for selector in title_selectors:
            element = self.parser.select_one(selector)
            if element:
                if element.name == "meta":
                    return element.get("content", "")
                return self.parser.get_text(element)
        return ""

    def extract_author(self) -> str:
        author_selectors = [
            '.author',
            '[rel="author"]',
            '.byline',
            'meta[name="author"]',
        ]
        for selector in author_selectors:
            element = self.parser.select_one(selector)
            if element:
                if element.name == "meta":
                    return element.get("content", "")
                return self.parser.get_text(element)
        return ""

    def extract_publish_date(self) -> str:
        date_selectors = [
            'time',
            '.publish-date',
            '.date',
            'meta[property="article:published_time"]',
            'meta[name="publish-date"]',
        ]
        for selector in date_selectors:
            element = self.parser.select_one(selector)
            if element:
                if element.name == "meta":
                    return element.get("content", "")
                if element.name == "time":
                    return element.get("datetime") or self.parser.get_text(element)
                return self.parser.get_text(element)
        return ""

    def extract_content(self) -> str:
        content_selectors = [
            'article',
            '.article-content',
            '.content',
            '.post-content',
            '.entry-content',
            'main',
        ]
        for selector in content_selectors:
            element = self.parser.select_one(selector)
            if element:
                # Note: decompose() removes these nodes from the parsed tree.
                for unwanted in element.find_all(['script', 'style', 'nav', 'footer', 'aside']):
                    unwanted.decompose()
                return self.parser.get_text(element)
        return ""

    def extract_summary(self) -> str:
        summary = self.parser.select_one('meta[property="og:description"]')
        if summary:
            return summary.get("content", "")
        summary = self.parser.select_one('meta[name="description"]')
        if summary:
            return summary.get("content", "")
        # Fall back to the first 200 characters of the body text.
        content = self.extract_content()
        return (content[:200] + "...") if len(content) > 200 else content

    def extract_tags(self) -> List[str]:
        tags = []
        tag_elements = self.parser.select('.tags a, .tag, [rel="tag"]')
        for tag in tag_elements:
            tag_text = self.parser.get_text(tag)
            if tag_text:
                tags.append(tag_text)
        meta_keywords = self.parser.select_one('meta[name="keywords"]')
        if meta_keywords:
            keywords = meta_keywords.get("content", "")
            tags.extend(k.strip() for k in keywords.split(",") if k.strip())
        # De-duplicate while keeping the original order.
        return list(dict.fromkeys(tags))
29.3.2 XPath Parsing
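XPath selects nodes by describing a path through the document tree, with optional predicates in square brackets. As a small, dependency-free illustration, the sketch below runs a few XPath-style queries with the standard library's `xml.etree.ElementTree`. Two assumptions apply: ElementTree supports only a limited subset of XPath, and it requires well-formed markup, so real-world HTML generally needs lxml as used in this section. The sample document and its values are made up for the example.

```python
import xml.etree.ElementTree as ET

DOC = """
<div>
  <div class="news-item">
    <h2><a href="/news/1">First story</a></h2>
    <span class="date">2024-01-01</span>
  </div>
  <div class="news-item">
    <h2><a href="/news/2">Second story</a></h2>
    <span class="date">2024-01-02</span>
  </div>
</div>
""".strip()

root = ET.fromstring(DOC)
rows = []
# ".//" searches all descendants; "[@class='news-item']" filters by attribute.
for item in root.findall(".//div[@class='news-item']"):
    link = item.find(".//h2/a")                  # relative to the current item
    date = item.find(".//span[@class='date']")
    rows.append({
        "title": "".join(link.itertext()).strip(),
        "url": link.get("href"),
        "date": date.text,
    })

for row in rows:
    print(row)
```

The leading dot in `.//h2/a` is what keeps the query scoped to the current item; an expression starting with `//` would search the whole document in a full XPath engine.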
python
# NewsParser below reuses HTMLParser from section 29.3.1.
from lxml import etree
from typing import List as ListType, Any, Dict


class XPathParser:
    """Convenience wrapper around an lxml HTML tree."""

    def __init__(self, html: str):
        self.tree = etree.HTML(html)
        self.html = html

    def xpath(self, expression: str) -> ListType[Any]:
        return self.tree.xpath(expression)

    def xpath_string(self, expression: str) -> str:
        """Return the first match as text, or an empty string if nothing matches."""
        result = self.xpath(expression)
        if result:
            if isinstance(result[0], etree._Element):
                return "".join(result[0].itertext())
            return str(result[0])
        return ""

    def xpath_strings(self, expression: str) -> ListType[str]:
        results = self.xpath(expression)
        strings = []
        for result in results:
            if isinstance(result, etree._Element):
                strings.append("".join(result.itertext()))
            else:
                strings.append(str(result))
        return strings

    def xpath_attribute(self, expression: str, attribute: str) -> ListType[str]:
        """Return the non-empty values of an attribute across all matched elements."""
        elements = self.xpath(expression)
        return [el.get(attribute) for el in elements if el.get(attribute)]

    def extract_links(self) -> ListType[Dict[str, str]]:
        links = []
        for a in self.xpath("//a[@href]"):
            links.append({
                "url": a.get("href"),
                "text": "".join(a.itertext()).strip(),
                "title": a.get("title", "")
            })
        return links

    def extract_table_data(self, table_xpath: str = "//table") -> ListType[ListType[ListType[str]]]:
        tables = []
        for table in self.xpath(table_xpath):
            table_data = []
            for row in table.xpath(".//tr"):
                row_data = []
                for cell in row.xpath(".//td | .//th"):
                    row_data.append("".join(cell.itertext()).strip())
                if row_data:
                    table_data.append(row_data)
            if table_data:
                tables.append(table_data)
        return tables


class NewsParser:
    """Config-driven list-page parser: XPath expressions are supplied per site."""

    def __init__(self, html: str):
        self.xpath_parser = XPathParser(html)
        self.bs_parser = HTMLParser(html)

    def parse_news_list(self, config: Dict[str, str]) -> ListType[Dict[str, str]]:
        news_list = []
        items = self.xpath_parser.xpath(config.get("item_xpath", "//div[@class='news-item']"))
        for item in items:
            news = {}
            title_xpath = config.get("title_xpath", ".//h2/a")
            title_el = item.xpath(title_xpath)
            if title_el:
                news["title"] = "".join(title_el[0].itertext()).strip()
                news["url"] = title_el[0].get("href", "")
            date_xpath = config.get("date_xpath", ".//span[@class='date']")
            date_el = item.xpath(date_xpath)
            if date_el:
                news["date"] = "".join(date_el[0].itertext()).strip()
            summary_xpath = config.get("summary_xpath", ".//p[@class='summary']")
            summary_el = item.xpath(summary_xpath)
            if summary_el:
                news["summary"] = "".join(summary_el[0].itertext()).strip()
            # Entries without a title are skipped.
            if news.get("title"):
                news_list.append(news)
        return news_list
29.4 Crawler Framework Design
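A recurring design question in a crawler framework is how to recognise that two differently written URLs point to the same page. One common answer is to normalise a URL before hashing it. The sketch below shows a modest normalisation, assuming that parameter order and the fragment do not matter for the target site; `normalize_url` and `fingerprint` are hypothetical helpers, and SHA-1 is used purely as a fingerprint.

```python
import hashlib
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def normalize_url(url: str) -> str:
    """Return a canonical form of the URL suitable for de-duplication."""
    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    netloc = parts.netloc.lower()
    path = parts.path or "/"
    # Sort query parameters so that ?a=1&b=2 and ?b=2&a=1 compare equal.
    query = urlencode(sorted(parse_qsl(parts.query, keep_blank_values=True)))
    # Drop the fragment: it is never sent to the server.
    return urlunsplit((scheme, netloc, path, query, ""))


def fingerprint(method: str, url: str) -> str:
    key = f"{method.upper()}:{normalize_url(url)}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()


a = "HTTPS://Example.com/list?b=2&a=1#top"
b = "https://example.com/list?a=1&b=2"
print(normalize_url(a))                              # https://example.com/list?a=1&b=2
print(fingerprint("get", a) == fingerprint("GET", b))  # True
```

How aggressive normalisation should be depends on the site: some servers treat parameter order or a trailing slash as significant, in which case the corresponding step should be removed.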
29.4.1 URL Manager
python
from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode
from collections import deque
from typing import Set, Optional, List, Dict, Any
import hashlib
import threading


class URL:
    """A URL plus the request details and crawl metadata attached to it."""

    def __init__(
        self,
        url: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        cookies: Optional[Dict[str, str]] = None,
        priority: int = 0,
        depth: int = 0,
        parent_url: Optional[str] = None,
        meta: Optional[Dict[str, Any]] = None
    ):
        self.raw_url = url
        self.parsed = urlparse(url)
        self.method = method.upper()
        self.params = params or {}
        self.data = data or {}
        self.headers = headers or {}
        self.cookies = cookies or {}
        self.priority = priority
        self.depth = depth
        self.parent_url = parent_url
        self.meta = meta or {}

    @property
    def url(self) -> str:
        """The full URL, with self.params merged into the query string."""
        if self.params:
            query_dict = parse_qs(self.parsed.query)
            query_dict.update(self.params)
            query = urlencode(query_dict, doseq=True)
            return urlunparse((
                self.parsed.scheme,
                self.parsed.netloc,
                self.parsed.path,
                self.parsed.params,
                query,
                self.parsed.fragment
            ))
        return self.raw_url

    @property
    def fingerprint(self) -> str:
        unique_str = f"{self.method}:{self.url}"
        # MD5 serves only as a fast fingerprint here, not as a security measure.
        return hashlib.md5(unique_str.encode()).hexdigest()

    @property
    def domain(self) -> str:
        return self.parsed.netloc

    @property
    def scheme(self) -> str:
        return self.parsed.scheme

    def is_valid(self) -> bool:
        return bool(self.parsed.scheme and self.parsed.netloc)

    def is_same_domain(self, other_url: str) -> bool:
        other_parsed = urlparse(other_url)
        return self.parsed.netloc == other_parsed.netloc

    def join(self, relative_url: str) -> "URL":
        """Resolve a link found on this page into a child URL one level deeper."""
        absolute = urljoin(self.raw_url, relative_url)
        return URL(absolute, depth=self.depth + 1, parent_url=self.raw_url)

    def __repr__(self) -> str:
        return f"URL({self.url})"

    def __eq__(self, other) -> bool:
        if isinstance(other, URL):
            return self.fingerprint == other.fingerprint
        return False

    def __hash__(self) -> int:
        return hash(self.fingerprint)


class URLManager:
    """Thread-safe FIFO queue of pending URLs with de-duplication and limits."""

    def __init__(self, max_depth: int = 3, max_urls: int = 10000):
        self.max_depth = max_depth
        self.max_urls = max_urls
        self._pending: deque = deque()
        self._seen: Set[str] = set()
        self._lock = threading.Lock()
        self._total_count = 0

    def add_url(self, url: URL) -> bool:
        """Queue a URL; return False if it is invalid, too deep, seen, or over the limit."""
        if not url.is_valid():
            return False
        if url.depth > self.max_depth:
            return False
        with self._lock:
            if url.fingerprint in self._seen:
                return False
            if self._total_count >= self.max_urls:
                return False
            self._pending.append(url)
            self._seen.add(url.fingerprint)
            self._total_count += 1
            return True

    def add_urls(self, urls: List[URL]) -> int:
        added = 0
        for url in urls:
            if self.add_url(url):
                added += 1
        return added

    def get_url(self) -> Optional[URL]:
        with self._lock:
            if self._pending:
                return self._pending.popleft()
            return None

    def has_pending(self) -> bool:
        return len(self._pending) > 0

    @property
    def pending_count(self) -> int:
        return len(self._pending)

    @property
    def seen_count(self) -> int:
        return len(self._seen)

    @property
    def total_count(self) -> int:
        return self._total_count

    def clear(self) -> None:
        with self._lock:
            self._pending.clear()
            self._seen.clear()
            self._total_count = 0

    def is_seen(self, url: URL) -> bool:
        return url.fingerprint in self._seen
29.4.2 Request Scheduler
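The `URL` objects in section 29.4.1 carry a `priority` field, while a plain FIFO queue hands URLs out strictly in arrival order. If a scheduler should prefer some URLs over others, for example detail pages over list pages, a heap is a natural fit. The following is a minimal sketch of such a priority queue built on `heapq`; `PriorityFrontier` is a hypothetical class for illustration, not a drop-in replacement for the chapter's `URLManager`, and it is not thread-safe.

```python
import heapq
import itertools
from typing import Optional


class PriorityFrontier:
    """Hands out URLs by descending priority, FIFO among equal priorities."""

    def __init__(self) -> None:
        self._heap = []
        self._counter = itertools.count()  # tie-breaker that preserves arrival order
        self._seen = set()

    def push(self, url: str, priority: int = 0) -> bool:
        if url in self._seen:
            return False
        self._seen.add(url)
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), url))
        return True

    def pop(self) -> Optional[str]:
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


frontier = PriorityFrontier()
frontier.push("https://example.com/list", priority=0)
frontier.push("https://example.com/detail/1", priority=5)
frontier.push("https://example.com/detail/2", priority=5)
frontier.push("https://example.com/about", priority=-1)
frontier.push("https://example.com/list", priority=10)  # duplicate, ignored

order = []
while len(frontier):
    order.append(frontier.pop())
print(order)
```

The two detail pages come out first in the order they were added, followed by the list page and finally the low-priority about page.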
python
# Builds on HttpClient (29.2.1) and URL / URLManager (29.4.1).
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, List
import time
import logging

import requests

logger = logging.getLogger(__name__)


@dataclass
class CrawlResult:
    """Outcome of fetching one URL: either a response or an error."""
    url: URL
    response: Optional[requests.Response] = None
    error: Optional[Exception] = None
    items: List[Any] = field(default_factory=list)
    new_urls: List[URL] = field(default_factory=list)
    elapsed: float = 0.0

    @property
    def success(self) -> bool:
        return self.error is None and self.response is not None


class RequestScheduler:
    """Fetches URLs from a URLManager with a pool of worker threads."""

    def __init__(
        self,
        http_client: HttpClient,
        url_manager: URLManager,
        max_workers: int = 5,
        request_delay: float = 0.5,
        timeout: int = 30
    ):
        self.http_client = http_client
        self.url_manager = url_manager
        self.max_workers = max_workers
        self.request_delay = request_delay
        self.timeout = timeout
        self._running = False
        self._results: queue.Queue = queue.Queue()
        self._callbacks: List[Callable] = []

    def add_callback(self, callback: Callable) -> None:
        self._callbacks.append(callback)

    def _fetch(self, url: URL) -> CrawlResult:
        start_time = time.time()
        try:
            # Per-worker delay; with N workers the overall rate is roughly
            # N / request_delay requests per second.
            time.sleep(self.request_delay)
            response = self.http_client.request(
                method=url.method,
                # url.url already contains url.params in its query string, so
                # they are not passed again (that would duplicate them).
                url=url.url,
                headers=url.headers,
                cookies=url.cookies,
                data=url.data if url.method == "POST" else None,
                timeout=self.timeout
            )
            result = CrawlResult(
                url=url,
                response=response,
                elapsed=time.time() - start_time
            )
            for callback in self._callbacks:
                try:
                    callback(result)
                except Exception as e:
                    logger.error(f"Callback error: {e}")
            return result
        except Exception as e:
            logger.error(f"Fetch error: {url.url} - {e}")
            return CrawlResult(
                url=url,
                error=e,
                elapsed=time.time() - start_time
            )

    def crawl(
        self,
        start_urls: List[URL],
        parser: Callable[[CrawlResult], tuple],
        max_requests: Optional[int] = None
    ) -> List[Any]:
        """Crawl from start_urls; parser returns (items, new_urls) for each result."""
        self.url_manager.add_urls(start_urls)
        all_items = []
        request_count = 0
        self._running = True
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {}
            while self._running and (self.url_manager.has_pending() or futures):
                # Top up the pool until every worker is busy.
                while self.url_manager.has_pending() and len(futures) < self.max_workers:
                    if max_requests and request_count >= max_requests:
                        break
                    url = self.url_manager.get_url()
                    if url:
                        future = executor.submit(self._fetch, url)
                        futures[future] = url
                        request_count += 1
                if not futures:
                    break
                # wait() returns whatever finished within the timeout. Unlike
                # as_completed(..., timeout=...), it does not raise when some
                # futures are still running.
                done_futures, _ = wait(
                    list(futures), timeout=1.0, return_when=FIRST_COMPLETED
                )
                for future in done_futures:
                    url = futures.pop(future)
                    try:
                        result = future.result()
                        self._results.put(result)
                        if result.success and parser:
                            items, new_urls = parser(result)
                            all_items.extend(items)
                            for new_url in new_urls:
                                new_url.depth = url.depth + 1
                                self.url_manager.add_url(new_url)
                    except Exception as e:
                        logger.error(f"Future error: {e}")
        self._running = False
        return all_items

    def stop(self) -> None:
        """Ask crawl() to stop after the requests currently in flight."""
        self._running = False


class RateLimiter:
    """Enforces a minimum interval between requests across all threads."""

    def __init__(self, requests_per_second: float):
        self.min_interval = 1.0 / requests_per_second
        self._last_request_time = 0.0
        self._lock = threading.Lock()

    def wait(self) -> None:
        # Sleeping while holding the lock is intentional: it serialises callers
        # so that requests are spaced at least min_interval apart.
        with self._lock:
            now = time.time()
            elapsed = now - self._last_request_time
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self._last_request_time = time.time()
29.4.3 Data Pipeline
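A data pipeline passes each extracted item through a chain of small stages, any of which may transform the item or discard it. Before looking at file and database writers, the sketch below shows the chaining idea on its own with plain functions. The stage names (`strip_whitespace`, `require_title`, `make_dedupe`), the `run_pipeline` helper and the local `DropItem` exception are all assumptions made for this example.

```python
from typing import Any, Callable, Dict, List


class DropItem(Exception):
    """Raised by a stage to discard the current item."""


Stage = Callable[[Dict[str, Any]], Dict[str, Any]]


def strip_whitespace(item: Dict[str, Any]) -> Dict[str, Any]:
    return {k: v.strip() if isinstance(v, str) else v for k, v in item.items()}


def require_title(item: Dict[str, Any]) -> Dict[str, Any]:
    if not item.get("title"):
        raise DropItem("missing title")
    return item


def make_dedupe(key: str) -> Stage:
    seen = set()

    def dedupe(item: Dict[str, Any]) -> Dict[str, Any]:
        value = item.get(key)
        if value in seen:
            raise DropItem(f"duplicate {key}: {value}")
        seen.add(value)
        return item

    return dedupe


def run_pipeline(items: List[Dict[str, Any]], stages: List[Stage]) -> List[Dict[str, Any]]:
    kept = []
    for item in items:
        try:
            for stage in stages:
                item = stage(item)
        except DropItem:
            continue  # a stage rejected the item; skip the remaining stages
        kept.append(item)
    return kept


raw_items = [
    {"url": "/1", "title": "  First "},
    {"url": "/1", "title": "First again"},  # duplicate URL
    {"url": "/2", "title": ""},             # no title
    {"url": "/3", "title": "Third"},
]
clean = run_pipeline(raw_items, [strip_whitespace, require_title, make_dedupe("url")])
print(clean)
```

Stage order matters: validation runs before de-duplication here, so an item rejected for a missing title does not mark its URL as seen.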
python
import json
import csv
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Protocol
import sqlite3
from datetime import datetime

logger = logging.getLogger(__name__)


class DropItem(Exception):
    """Raised by a pipeline to discard an item."""
    pass


class Pipeline(Protocol):
    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]: ...
    def close(self) -> None: ...


class JSONPipeline:
    """Buffers items in memory and writes them as one JSON array on close()."""

    def __init__(self, output_file: str = "output.json", encoding: str = "utf-8"):
        self.output_file = Path(output_file)
        self.encoding = encoding
        self.items: List[Dict[str, Any]] = []

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        item["scraped_at"] = datetime.now().isoformat()
        self.items.append(item)
        return item

    def close(self) -> None:
        with open(self.output_file, "w", encoding=self.encoding) as f:
            json.dump(self.items, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved {len(self.items)} items to {self.output_file}")


class CSVPipeline:
    """Writes items row by row; the header comes from the first item by default."""

    def __init__(
        self,
        output_file: str = "output.csv",
        encoding: str = "utf-8",
        fieldnames: Optional[List[str]] = None
    ):
        self.output_file = Path(output_file)
        self.encoding = encoding
        self.fieldnames = fieldnames
        self._file = None
        self._writer = None
        self._initialized = False

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        if not self._initialized:
            self._file = open(self.output_file, "w", encoding=self.encoding, newline="")
            if self.fieldnames is None:
                self.fieldnames = list(item.keys())
            # extrasaction="ignore" drops keys missing from the header instead
            # of raising ValueError when a later item has extra fields.
            self._writer = csv.DictWriter(
                self._file, fieldnames=self.fieldnames, extrasaction="ignore"
            )
            self._writer.writeheader()
            self._initialized = True
        self._writer.writerow(item)
        return item

    def close(self) -> None:
        if self._file:
            self._file.close()
            logger.info(f"Saved items to {self.output_file}")


class SQLitePipeline:
    """Stores items in a SQLite table whose columns come from the first item."""

    def __init__(
        self,
        db_path: str = "crawl.db",
        table_name: str = "items"
    ):
        self.db_path = db_path
        self.table_name = table_name
        self._conn = None
        self._cursor = None
        self._initialized = False

    @staticmethod
    def _quote(identifier: str) -> str:
        # Identifiers cannot be bound as parameters, so quote them explicitly.
        return '"' + identifier.replace('"', '""') + '"'

    @staticmethod
    def _to_sql_value(value: Any) -> Any:
        # sqlite3 can bind only None, int, float, str and bytes; lists and
        # dicts (for example tags) are stored as JSON text.
        if value is None or isinstance(value, (int, float, str, bytes)):
            return value
        return json.dumps(value, ensure_ascii=False, default=str)

    def _init_db(self, item: Dict[str, Any]) -> None:
        self._conn = sqlite3.connect(self.db_path)
        self._cursor = self._conn.cursor()
        columns = []
        for key, value in item.items():
            if isinstance(value, int):
                col_type = "INTEGER"
            elif isinstance(value, float):
                col_type = "REAL"
            else:
                col_type = "TEXT"
            columns.append(f"{self._quote(key)} {col_type}")
        columns_sql = ", ".join(columns)
        self._cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {self._quote(self.table_name)} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                {columns_sql}
            )
        """)
        self._conn.commit()
        self._initialized = True

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        if not self._initialized:
            self._init_db(item)
        columns = ", ".join(self._quote(key) for key in item.keys())
        placeholders = ", ".join(["?" for _ in item])
        values = tuple(self._to_sql_value(v) for v in item.values())
        self._cursor.execute(
            f"INSERT INTO {self._quote(self.table_name)} ({columns}) VALUES ({placeholders})",
            values
        )
        self._conn.commit()
        return item

    def close(self) -> None:
        if self._conn:
            self._conn.close()
            logger.info(f"Saved items to {self.db_path}")


class DuplicateFilterPipeline:
    """Drops any item whose key field has been seen before."""

    def __init__(self, key_field: str = "url"):
        self.key_field = key_field
        self._seen: set = set()

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        key = item.get(self.key_field)
        if key in self._seen:
            raise DropItem(f"Duplicate item: {key}")
        self._seen.add(key)
        return item

    def close(self) -> None:
        # Nothing to release; present so the class satisfies the Pipeline protocol.
        pass


class PipelineManager:
    """Runs an item through each pipeline in order."""

    def __init__(self, pipelines: List[Pipeline]):
        self.pipelines = pipelines

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        for pipeline in self.pipelines:
            try:
                item = pipeline.process_item(item)
            except DropItem:
                # Propagate so the caller knows the item was discarded.
                raise
            except Exception as e:
                logger.error(f"Pipeline error: {e}")
        return item

    def close(self) -> None:
        for pipeline in self.pipelines:
            try:
                pipeline.close()
            except Exception as e:
                logger.error(f"Pipeline close error: {e}")
29.5 The Scrapy Framework
29.5.1 Spider Development
python
import scrapy
from scrapy import Spider, Request
from scrapy.http import Response
from scrapy.item import Item, Field
from typing import List, Dict, Any, Generator
import re


class NewsItem(scrapy.Item):
    title = Field()
    url = Field()
    author = Field()
    publish_date = Field()
    content = Field()
    summary = Field()
    tags = Field()
    images = Field()


class BaseSpider(Spider):
    """Shared settings and simple counters for the spiders in this chapter."""

    custom_settings = {
        "CONCURRENT_REQUESTS": 8,
        "DOWNLOAD_DELAY": 0.5,
        "RANDOMIZE_DOWNLOAD_DELAY": True,
        "COOKIES_ENABLED": False,
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stats = {
            "pages_crawled": 0,
            "items_scraped": 0,
            "errors": 0
        }

    def closed(self, reason: str) -> None:
        # Scrapy calls closed() once when the spider finishes.
        self.logger.info(f"Spider closed: {reason}")
        self.logger.info(f"Stats: {self.stats}")


class NewsSpider(BaseSpider):
    name = "news"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/news"]

    def parse(self, response: Response) -> Generator:
        """Parse a list page: follow each article link, then the next page."""
        self.stats["pages_crawled"] += 1
        for article in response.css("article.news-item"):
            url = article.css("a::attr(href)").get()
            if url:
                # Scrapy's depth middleware maintains meta["depth"] itself,
                # so it is not set by hand here.
                yield response.follow(url, callback=self.parse_article)
        next_page = response.css("a.next-page::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_article(self, response: Response) -> Generator:
        """Parse an article page into a NewsItem."""
        self.stats["pages_crawled"] += 1
        item = NewsItem()
        item["url"] = response.url
        item["title"] = response.css("h1.title::text").get(default="").strip()
        item["author"] = response.css(".author::text").get(default="").strip()
        item["publish_date"] = response.css("time::attr(datetime)").get(default="")
        item["content"] = " ".join(response.css(".content p::text").getall())
        item["summary"] = response.css("meta[name=description]::attr(content)").get(default="")
        item["tags"] = response.css(".tags a::text").getall()
        item["images"] = response.css(".content img::attr(src)").getall()
        self.stats["items_scraped"] += 1
        yield item
29.5.2 Middleware Development
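Downloader middlewares take effect only once they are registered in the project settings, and the number assigned to each one controls where it runs relative to Scrapy's built-in middlewares. The fragment below is a sketch of a `settings.py` for the middleware classes in this section. The module path `myproject.middlewares` is hypothetical, `USER_AGENTS` and `PROXIES` are custom setting names read by this chapter's classes rather than built-in Scrapy settings, and the order values are one reasonable choice, not the only one.

```python
# settings.py (sketch; "myproject.middlewares" is a hypothetical module path)

ROBOTSTXT_OBEY = True          # respect robots.txt
DOWNLOAD_DELAY = 0.5           # seconds between requests to the same site
AUTOTHROTTLE_ENABLED = True    # let Scrapy adapt the delay to server latency

# Custom settings, read via crawler.settings.getlist(...) in from_crawler().
USER_AGENTS = [
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
]
PROXIES = []                   # e.g. "http://127.0.0.1:8080"; empty disables proxying

DOWNLOADER_MIDDLEWARES = {
    # Setting a built-in middleware to None disables it, so the custom
    # replacement does not compete with it.
    "scrapy.downloadermiddlewares.useragent.UserAgentMiddleware": None,
    "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
    "myproject.middlewares.UserAgentMiddleware": 500,
    "myproject.middlewares.RetryMiddleware": 550,
    # Must run before the built-in HttpProxyMiddleware (750), which reads
    # request.meta["proxy"].
    "myproject.middlewares.ProxyMiddleware": 740,
}
```

Lower numbers sit closer to the engine, so their `process_request` runs earlier and their `process_response` runs later.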
python
from typing import List, Optional

from scrapy import signals
from scrapy.http import Request, Response
from scrapy.exceptions import IgnoreRequest
import random
import time
import logging

logger = logging.getLogger(__name__)


class UserAgentMiddleware:
    """Downloader middleware that sets a random User-Agent on every request."""

    def __init__(self, user_agents: List[str]):
        self.user_agents = user_agents

    @classmethod
    def from_crawler(cls, crawler):
        # USER_AGENTS is a custom setting defined in the project's settings.py.
        user_agents = crawler.settings.getlist("USER_AGENTS", [])
        return cls(user_agents)

    def process_request(self, request: Request, spider) -> None:
        if self.user_agents:
            request.headers["User-Agent"] = random.choice(self.user_agents)


class ProxyMiddleware:
    """Downloader middleware that assigns a proxy to every request."""

    def __init__(self, proxies: List[str], rotate_enabled: bool = True):
        self.proxies = proxies
        self.rotate_enabled = rotate_enabled
        self.current_index = 0

    @classmethod
    def from_crawler(cls, crawler):
        # PROXIES and PROXY_ROTATE_ENABLED are custom project settings.
        proxies = crawler.settings.getlist("PROXIES", [])
        rotate_enabled = crawler.settings.getbool("PROXY_ROTATE_ENABLED", True)
        return cls(proxies, rotate_enabled)

    def process_request(self, request: Request, spider) -> None:
        if not self.proxies:
            return
        proxy = self._get_proxy()
        request.meta["proxy"] = proxy
        logger.debug(f"Using proxy: {proxy}")

    def _get_proxy(self) -> str:
        # Round-robin when rotation is enabled, otherwise a random pick.
        if self.rotate_enabled:
            # Wrap the index first: the list may have shrunk since the last call.
            self.current_index %= len(self.proxies)
            proxy = self.proxies[self.current_index]
            self.current_index += 1
            return proxy
        return random.choice(self.proxies)

    def process_exception(self, request: Request, exception, spider):
        # Any download exception drops the proxy from the pool. A production
        # pool would usually count failures before removing a proxy.
        if "proxy" in request.meta:
            proxy = request.meta["proxy"]
            if proxy in self.proxies:
                logger.warning(f"Removing failed proxy: {proxy}")
                self.proxies.remove(proxy)


class RetryMiddleware:
    """Retries failed requests with a linearly growing delay."""

    def __init__(
        self,
        max_retry_times: int = 3,
        retry_http_codes: Optional[List[int]] = None,
        retry_delay: float = 1.0
    ):
        self.max_retry_times = max_retry_times
        self.retry_http_codes = retry_http_codes or [500, 502, 503, 504, 408, 429]
        self.retry_delay = retry_delay

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            max_retry_times=crawler.settings.getint("RETRY_TIMES", 3),
            # Settings given on the command line arrive as strings, so convert to int.
            retry_http_codes=[int(c) for c in crawler.settings.getlist("RETRY_HTTP_CODES")],
            # RETRY_DELAY is a custom setting, not a built-in Scrapy one.
            retry_delay=crawler.settings.getfloat("RETRY_DELAY", 1.0)
        )

    def process_response(self, request: Request, response: Response, spider):
        if request.meta.get("dont_retry", False):
            return response
        if response.status in self.retry_http_codes:
            # Fall back to the original response once retries are exhausted.
            return self._retry(request, spider) or response
        return response

    def process_exception(self, request: Request, exception, spider):
        if request.meta.get("dont_retry", False):
            return None
        return self._retry(request, spider)

    def _retry(self, request: Request, spider) -> Optional[Request]:
        retry_times = request.meta.get("retry_times", 0) + 1
        if retry_times > self.max_retry_times:
            logger.error(f"Gave up retrying {request.url} (failed {retry_times} times)")
            return None
        logger.warning(f"Retrying {request.url} (attempt {retry_times}/{self.max_retry_times})")
        retryreq = request.copy()
        retryreq.meta["retry_times"] = retry_times
        retryreq.dont_filter = True  # bypass the duplicate filter
        retryreq.priority = request.priority - 10
        # NOTE: time.sleep() blocks Scrapy's Twisted reactor, so every in-flight
        # request pauses too. Acceptable in a demo; in production prefer
        # DOWNLOAD_DELAY or the AutoThrottle extension.
        time.sleep(self.retry_delay * retry_times)
        return retryreq
29.6 Asynchronous Crawlers
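The core idea of an asynchronous crawler can be seen with the standard library alone: many network waits overlap, while a semaphore caps how many requests are in flight at once. The following is a minimal sketch under stated assumptions, not a real downloader: `fake_fetch` and `crawl_bounded` are hypothetical names, `asyncio.sleep` stands in for network latency, and the status code 200 is made up.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_fetch(url: str, latency: float, sem: asyncio.Semaphore,
                     in_flight: List[int]) -> Tuple[str, int]:
    """Pretend to download `url`; asyncio.sleep stands in for network I/O."""
    async with sem:  # at most `max_concurrent` coroutines pass this point
        in_flight[0] += 1
        in_flight[1] = max(in_flight[1], in_flight[0])  # record the peak
        await asyncio.sleep(latency)
        in_flight[0] -= 1
        return url, 200  # hypothetical status code


async def crawl_bounded(urls: List[str], max_concurrent: int = 2,
                        latency: float = 0.05) -> Tuple[List[Tuple[str, int]], int]:
    """Fetch all URLs concurrently; return the results and the peak concurrency."""
    sem = asyncio.Semaphore(max_concurrent)
    in_flight = [0, 0]  # [current, peak]
    results = await asyncio.gather(
        *(fake_fetch(u, latency, sem, in_flight) for u in urls)
    )
    return list(results), in_flight[1]


if __name__ == "__main__":
    demo_urls = [f"https://example.com/page/{i}" for i in range(6)]
    start = time.perf_counter()
    pages, peak = asyncio.run(crawl_bounded(demo_urls))
    print(f"{len(pages)} pages, peak concurrency {peak}, "
          f"{time.perf_counter() - start:.2f}s")
```

With six URLs and a limit of two, the total time is roughly three latencies rather than six, and `asyncio.gather` returns the results in the same order as the input URLs.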
29.6.1 Asynchronous Requests with aiohttp
python
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
import time
import logging

logger = logging.getLogger(__name__)


@dataclass
class AsyncRequest:
    url: str
    method: str = "GET"
    headers: Optional[Dict[str, str]] = None
    params: Optional[Dict[str, Any]] = None
    data: Optional[Dict[str, Any]] = None
    timeout: int = 30

    def __post_init__(self):
        # Replace None with fresh dicts so instances never share a mutable default.
        self.headers = self.headers or {}
        self.params = self.params or {}
        self.data = self.data or {}


@dataclass
class AsyncResponse:
    url: str
    status: int
    headers: Dict[str, str]
    text: str
    json_data: Any  # parsed body, or None when the body is not JSON
    elapsed: float


class AsyncHttpClient:
    """Thin wrapper around aiohttp.ClientSession; use it with 'async with'."""

    def __init__(
        self,
        max_connections: int = 100,
        max_per_host: int = 10,
        timeout: int = 30,
        user_agent: Optional[str] = None
    ):
        self.max_connections = max_connections
        self.max_per_host = max_per_host
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = user_agent or "AsyncCrawler/1.0"
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "AsyncHttpClient":
        # The connector limits total connections and connections per host.
        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_per_host
        )
        headers = {"User-Agent": self.user_agent}
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=headers
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._session:
            await self._session.close()

    async def request(self, req: AsyncRequest) -> AsyncResponse:
        if self._session is None:
            raise RuntimeError("AsyncHttpClient must be used inside 'async with'")
        start_time = time.perf_counter()
        try:
            async with self._session.request(
                method=req.method,
                url=req.url,
                headers=req.headers,
                params=req.params,
                data=req.data or None  # send no body when there is no form data
            ) as response:
                text = await response.text()
                try:
                    json_data = await response.json()
                except (aiohttp.ContentTypeError, ValueError):
                    # Not a JSON content type, or the body is not valid JSON.
                    json_data = None
                elapsed = time.perf_counter() - start_time
                logger.info(f"{req.method} {req.url} - {response.status} - {elapsed:.2f}s")
                return AsyncResponse(
                    url=str(response.url),
                    status=response.status,
                    headers=dict(response.headers),
                    text=text,
                    json_data=json_data,
                    elapsed=elapsed
                )
        except asyncio.TimeoutError:
            logger.error(f"Timeout: {req.url}")
            raise
        except aiohttp.ClientError as e:
            logger.error(f"Request error: {req.url} - {e}")
            raise

    async def get(self, url: str, **kwargs) -> AsyncResponse:
        return await self.request(AsyncRequest(url=url, method="GET", **kwargs))

    async def post(self, url: str, **kwargs) -> AsyncResponse:
        return await self.request(AsyncRequest(url=url, method="POST", **kwargs))


class AsyncCrawler:
    """Crawls a list of URLs concurrently, bounded by a semaphore."""

    def __init__(
        self,
        max_concurrent: int = 10,
        request_delay: float = 0.1
    ):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self._semaphore: Optional[asyncio.Semaphore] = None

    async def crawl(
        self,
        urls: List[str],
        parser: Callable[[AsyncResponse], Any],
        max_concurrent: Optional[int] = None
    ) -> List[Any]:
        self._semaphore = asyncio.Semaphore(max_concurrent or self.max_concurrent)
        async with AsyncHttpClient() as client:
            tasks = [self._crawl_one(client, url, parser) for url in urls]
            # return_exceptions=True keeps one failure from cancelling the rest.
            results = await asyncio.gather(*tasks, return_exceptions=True)
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                logger.warning(f"Skipping failed URL {url}: {result!r}")
        return [r for r in results if not isinstance(r, Exception)]

    async def _crawl_one(
        self,
        client: AsyncHttpClient,
        url: str,
        parser: Callable[[AsyncResponse], Any]
    ) -> Any:
        async with self._semaphore:
            await asyncio.sleep(self.request_delay)
            response = await client.get(url)
            return parser(response)


async def async_crawl_example():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
    ]

    def parser(response: AsyncResponse) -> Dict[str, Any]:
        return {
            "url": response.url,
            "status": response.status,
            "elapsed": response.elapsed
        }

    crawler = AsyncCrawler(max_concurrent=5)
    results = await crawler.crawl(urls, parser)
    for result in results:
        print(result)


if __name__ == "__main__":
    asyncio.run(async_crawl_example())
29.7 Handling Anti-Crawling Mechanisms
29.7.1 Common Anti-Crawling Strategies
python
import random
import time
from typing import List, Dict, Optional
import hashlib


class AntiDetection:
    @staticmethod
    def generate_browser_headers() -> Dict[str, str]:
        """Return headers that resemble a browser's top-level page request."""
        accept_languages = [
            "zh-CN,zh;q=0.9,en;q=0.8",
            "en-US,en;q=0.9",
            "zh-TW,zh;q=0.9,en;q=0.8",
        ]
        # "br" (Brotli) is only decoded by requests when a Brotli package is installed.
        accept_encodings = ["gzip, deflate, br", "gzip, deflate"]
        return {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "Accept-Language": random.choice(accept_languages),
            "Accept-Encoding": random.choice(accept_encodings),
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }

    @staticmethod
    def random_delay(min_delay: float = 0.5, max_delay: float = 2.0) -> None:
        """Sleep for a random interval so requests are not evenly spaced."""
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)

    @staticmethod
    def generate_fingerprint() -> Dict[str, str]:
        # Random placeholder values for illustration only; these are not
        # real canvas or WebGL fingerprints.
        canvas_hash = hashlib.md5(str(random.random()).encode()).hexdigest()
        webgl_hash = hashlib.md5(str(random.random()).encode()).hexdigest()
        return {
            "canvas": canvas_hash,
            "webgl": webgl_hash,
            "timezone": str(random.randint(-12, 12)),
            "screen_resolution": f"{random.randint(1024, 1920)}x{random.randint(768, 1080)}",
        }


class CookieManager:
    """Stores cookies per domain."""

    def __init__(self):
        self._cookies: Dict[str, Dict[str, str]] = {}

    def add_cookies(self, domain: str, cookies: Dict[str, str]) -> None:
        if domain not in self._cookies:
            self._cookies[domain] = {}
        self._cookies[domain].update(cookies)

    def get_cookies(self, domain: str) -> Dict[str, str]:
        return self._cookies.get(domain, {})

    def clear_cookies(self, domain: Optional[str] = None) -> None:
        # Clear one domain, or everything when no domain is given.
        if domain:
            self._cookies.pop(domain, None)
        else:
            self._cookies.clear()


class SessionPool:
    """Round-robin pool of HTTP sessions, each with its own User-Agent.

    HttpClient and UserAgentRotator are assumed to be the classes defined
    earlier in this chapter.
    """

    def __init__(self, pool_size: int = 5):
        self.pool_size = pool_size
        self._sessions: List[HttpClient] = []
        self._current_index = 0

    def initialize(self) -> None:
        for _ in range(self.pool_size):
            client = HttpClient(user_agent=UserAgentRotator().get_user_agent())
            self._sessions.append(client)

    def get_session(self) -> HttpClient:
        if not self._sessions:
            raise RuntimeError("Call initialize() before get_session()")
        session = self._sessions[self._current_index % len(self._sessions)]
        self._current_index = (self._current_index + 1) % len(self._sessions)
        return session

    def close_all(self) -> None:
        for session in self._sessions:
            session.close()
        self._sessions.clear()
29.8 Crawler Ethics and Compliance
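robots.txt rules can be explored without touching the network, because `RobotFileParser.parse()` accepts the file's lines directly. The sketch below is illustrative: the robots.txt content (`SAMPLE_ROBOTS`), the helper name `load_rules`, and the user agents are all made up for the example.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, inlined so the example needs no network.
SAMPLE_ROBOTS = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2

User-agent: BadBot
Disallow: /
"""


def load_rules(robots_txt: str) -> RobotFileParser:
    """Build a parser from robots.txt text instead of fetching a URL."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser


rules = load_rules(SAMPLE_ROBOTS)
print(rules.can_fetch("MyCrawler/1.0", "https://example.com/articles/1"))    # True
print(rules.can_fetch("MyCrawler/1.0", "https://example.com/private/data"))  # False
print(rules.can_fetch("BadBot", "https://example.com/articles/1"))           # False
print(rules.crawl_delay("MyCrawler/1.0"))                                    # 2
```

A crawler that has no dedicated group falls back to the `*` group, which is why `MyCrawler/1.0` inherits both the `/private/` rule and the two-second crawl delay.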
29.8.1 Parsing robots.txt
python
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse
from typing import Dict, List, Optional
import time

import requests


class RobotsParser:
    """Caches one RobotFileParser per site origin (scheme + host)."""

    def __init__(self, user_agent: str = "*"):
        self.user_agent = user_agent
        self._parsers: Dict[str, RobotFileParser] = {}

    @staticmethod
    def _origin(url: str) -> str:
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"

    def can_fetch(self, url: str) -> bool:
        origin = self._origin(url)
        if origin not in self._parsers:
            rp = RobotFileParser()
            rp.set_url(f"{origin}/robots.txt")
            try:
                rp.read()
            except Exception:
                # robots.txt could not be fetched: allow, and try again next time.
                return True
            self._parsers[origin] = rp
        return self._parsers[origin].can_fetch(self.user_agent, url)

    def get_crawl_delay(self, url: str) -> float:
        # Only sites already loaded by can_fetch() have a cached parser.
        origin = self._origin(url)
        if origin in self._parsers:
            delay = self._parsers[origin].crawl_delay(self.user_agent)
            if delay is not None:
                return float(delay)
        return 0.0

    def get_sitemaps(self, url: str) -> List[str]:
        origin = self._origin(url)
        if origin in self._parsers:
            # site_maps() requires Python 3.8+ and returns None when there are none.
            return list(self._parsers[origin].site_maps() or [])
        return []


class CrawlDelay:
    """Enforces a minimum interval between requests to the same domain."""

    def __init__(self, default_delay: float = 1.0):
        self.default_delay = default_delay
        self._domain_delays: Dict[str, float] = {}
        self._last_request: Dict[str, float] = {}

    def set_delay(self, domain: str, delay: float) -> None:
        self._domain_delays[domain] = delay

    def wait(self, domain: str) -> None:
        delay = self._domain_delays.get(domain, self.default_delay)
        if domain in self._last_request:
            elapsed = time.time() - self._last_request[domain]
            if elapsed < delay:
                time.sleep(delay - elapsed)
        self._last_request[domain] = time.time()


class EthicalCrawler:
    """Checks robots.txt and rate-limits per domain before each request."""

    def __init__(
        self,
        user_agent: str = "MyCrawler/1.0",
        default_delay: float = 1.0,
        respect_robots: bool = True
    ):
        self.user_agent = user_agent
        self.robots_parser = RobotsParser(user_agent)
        self.crawl_delay = CrawlDelay(default_delay)
        self.respect_robots = respect_robots

    def can_crawl(self, url: str) -> bool:
        if not self.respect_robots:
            return True
        return self.robots_parser.can_fetch(url)

    def crawl(self, url: str) -> Optional[requests.Response]:
        if not self.can_crawl(url):
            print(f"Blocked by robots.txt: {url}")
            return None
        domain = urlparse(url).netloc
        self.crawl_delay.wait(domain)
        try:
            # A timeout keeps a stalled server from hanging the crawler.
            response = requests.get(
                url, headers={"User-Agent": self.user_agent}, timeout=10
            )
            return response
        except requests.RequestException as e:
            print(f"Crawl error: {e}")
            return None
29.9 Knowledge Map
29.9.1 Crawler Technology Overview
Python crawler technology overview
┌──────────────────────────────────────────────────────────────────────┐
│ Crawler architecture layers                                          │
├──────────────────────────────────────────────────────────────────────┤
│ URL manager → Request scheduler → Downloader → Parser → Data storage │
└──────────────────────────────────────────────────────────────────────┘
Core components:
┌──────────────────────────────────────────────────┐
│ URL manager   Deduplication, queueing, priority  │
│ Downloader    HTTP requests, retries, proxies    │
│ Parser        HTML/XML/JSON parsing              │
│ Data storage  Files, databases, message queues   │
└──────────────────────────────────────────────────┘
Technology stack:
┌──────────────────────────────────────────────────┐
│ HTTP clients:  requests, aiohttp, httpx          │
│ HTML parsing:  BeautifulSoup, lxml               │
│ Frameworks:    Scrapy, PySpider                  │
│ Async:         asyncio, uvloop                   │
│ Storage:       MongoDB, Redis, MySQL             │
└──────────────────────────────────────────────────┘
29.9.2 Scrapy Architecture
Scrapy framework architecture
┌──────────────────────────────────────────────────────────────┐
│ Scrapy Engine                                                │
│ Core engine that coordinates all components                  │
└──────────────────────────────────────────────────────────────┘
        │
        ├──▶ Scheduler
        │      Manages the request queue
        │
        ├──▶ Downloader
        │      Performs HTTP requests
        │
        ├──▶ Spider
        │      Parsing logic
        │
        ├──▶ Item Pipeline
        │      Item processing
        │
        └──▶ Middleware
               Request/response processing
29.9.3 Anti-Crawling Strategies and Countermeasures
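Anti-crawling strategies and countermeasures
┌──────────────────────┬────────────────────────────────┐
│ Strategy             │ Countermeasure                 │
├──────────────────────┼────────────────────────────────┤
│ User-Agent checks    │ User-Agent pool rotation       │
│ IP restrictions      │ Proxy pool                     │
│ Rate limiting        │ Request delays                 │
│ Cookie validation    │ Session management             │
│ CAPTCHA              │ OCR / CAPTCHA-solving services │
│ JavaScript rendering │ Selenium / Playwright          │
│ Font obfuscation     │ Font file parsing              │
│ Dynamic tokens       │ Reverse engineering            │
└──────────────────────┴────────────────────────────────┘
29.10 Technology Selection Guide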
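29.10.1 Choosing a Crawler Framework
| Scenario | Recommended approach | Rationale |
|---|---|---|
| Simple crawler | requests + BeautifulSoup | Simple and direct |
| Mid-sized project | Scrapy | Full-featured |
| High concurrency | Scrapy + aiohttp | Efficient asynchronous I/O |
| Dynamic pages | Playwright | Renders JavaScript |
| Distributed crawling | Scrapy-Redis | Supports distributed crawling |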
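29.10.2 Choosing a Parsing Tool
| Scenario | Recommended tool | Rationale |
|---|---|---|
| Simple parsing | BeautifulSoup | Easy to use |
| High performance | lxml | Fast |
| Complex selection | XPath | Expressive |
| JSON API | jsonpath | Purpose-built |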
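To give a feel for why XPath is listed for complex selection, the sketch below runs two path expressions with the standard library's `xml.etree.ElementTree`. This is a swapped-in stand-in, not the recommended tool: ElementTree supports only a subset of XPath and requires well-formed markup, whereas lxml offers full XPath 1.0 and tolerates real-world HTML. The sample document is made up.

```python
import xml.etree.ElementTree as ET

# Hypothetical, well-formed markup; real pages usually need lxml or BeautifulSoup.
SAMPLE = """
<html><body>
  <div class="article"><h1>First</h1><a href="/a/1">read</a></div>
  <div class="article"><h1>Second</h1><a href="/a/2">read</a></div>
  <div class="ad"><a href="/ads/9">buy</a></div>
</body></html>
"""

root = ET.fromstring(SAMPLE)

# An attribute predicate plus a child step selects links inside article
# blocks only, skipping the link in the "ad" block.
article_links = [a.get("href") for a in root.findall(".//div[@class='article']/a")]
titles = [h1.text for h1 in root.findall(".//div[@class='article']/h1")]

print(article_links)  # ['/a/1', '/a/2']
print(titles)         # ['First', 'Second']
```

The same two expressions work unchanged with lxml's `xpath()` method, which is where the performance and robustness noted in the table come from.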
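29.10.3 Choosing a Storage Backend
| Data volume | Recommended approach | Rationale |
|---|---|---|
| Small | JSON/CSV | Simple and direct |
| Medium | SQLite | Lightweight database |
| Large | MongoDB | Flexible schema |
| Distributed | Redis + MySQL | High performance |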
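For the medium-volume row, a minimal sketch of SQLite storage using the standard library's `sqlite3` module might look like this. The table layout and the helper names `open_store` and `save_items` are assumptions made for the example; using the URL as the primary key means a page crawled twice is stored only once.

```python
import sqlite3
from typing import Dict, Iterable


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the database and make sure the table exists."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS pages (
               url   TEXT PRIMARY KEY,
               title TEXT NOT NULL
           )"""
    )
    return conn


def save_items(conn: sqlite3.Connection, items: Iterable[Dict[str, str]]) -> int:
    """Insert items, skipping URLs already stored; return the number of new rows."""
    before = conn.total_changes
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR IGNORE INTO pages (url, title) VALUES (:url, :title)",
            items,
        )
    return conn.total_changes - before


conn = open_store()
batch = [
    {"url": "https://example.com/a", "title": "A"},
    {"url": "https://example.com/b", "title": "B"},
    {"url": "https://example.com/a", "title": "A (duplicate)"},
]
print(save_items(conn, batch))  # 2: the duplicate URL is ignored
print(save_items(conn, batch))  # 0: everything is already stored
```

Passing a file path instead of `":memory:"` persists the data between runs, which also gives a simple basis for skipping pages that were crawled before.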
29.11 Common Problems and Solutions
29.11.1 Encoding Problems
python
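# Problem: the page is decoded with the wrong encoding, so response.text is garbled
import requests

url = "https://example.com"  # placeholder URL
response = requests.get(url, timeout=10)

# Solution 1: let requests guess the encoding from the response body
response.encoding = response.apparent_encoding
text = response.text

# Solution 2: detect the encoding with chardet
import chardet

# detect() can report None as the encoding, so fall back to UTF-8
encoding = chardet.detect(response.content)["encoding"] or "utf-8"
text = response.content.decode(encoding, errors="replace")
29.11.2 Getting Blocked by Anti-Crawling Measures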
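A 429 or 503 response usually signals rate limiting rather than a permanent ban, so slowing down is often worth trying before changing IP addresses. The helper below is a sketch under stated assumptions: `backoff_delay` is a hypothetical name, the defaults are arbitrary, and only the integer-seconds form of the `Retry-After` header is handled (the HTTP-date form is ignored).

```python
import random
from typing import Optional


def backoff_delay(attempt: int, retry_after: Optional[str] = None,
                  base: float = 1.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (1-based)."""
    # Honor a numeric Retry-After header if the server sent one.
    if retry_after is not None and retry_after.strip().isdigit():
        return min(float(retry_after), cap)
    # Otherwise grow exponentially up to the cap, then pick a random point
    # below that ceiling so many clients do not retry in lockstep.
    ceiling = min(cap, base * 2 ** (attempt - 1))
    return random.uniform(0, ceiling)


print(backoff_delay(3, retry_after="5"))  # 5.0
print(backoff_delay(4) <= 8.0)            # True
```

A caller would typically sleep for `backoff_delay(attempt, response.headers.get("Retry-After"))` between attempts. If waiting does not help, routing requests through a proxy pool is the next step.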
python
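# Problem: the IP address has been blocked
# Solution: route requests through a proxy pool
import random

import requests

# Placeholder proxy addresses. requests picks a proxy by URL scheme,
# so both "http" and "https" keys are needed to cover HTTPS pages.
proxies_pool = [
    {"http": "http://proxy1:8080", "https": "http://proxy1:8080"},
    {"http": "http://proxy2:8080", "https": "http://proxy2:8080"},
]


def get_with_proxy(url):
    proxy = random.choice(proxies_pool)
    return requests.get(url, proxies=proxy, timeout=10)
29.11.3 Dynamic Content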
python
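# Problem: content rendered by JavaScript is missing from the raw HTML
# Solution: render the page with Playwright
from playwright.sync_api import sync_playwright


def get_dynamic_content(url):
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            page.goto(url)  # waits for the "load" event by default
            return page.content()
        finally:
            browser.close()  # always release the browser, even on errors
29.12 Chapter Summary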
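This chapter covered the core concepts and practice of web crawling in Python:
- HTTP basics: the request/response model, status codes, and headers
- The requests library: session management, retries, and proxy support
- HTML parsing: BeautifulSoup and XPath
- Crawler framework: URL management, request scheduling, and data pipelines
- Scrapy: Spider, Middleware, and Pipeline components
- Asynchronous crawling: high-concurrency crawling with aiohttp
- Anti-crawling countermeasures: User-Agent rotation, proxies, and CAPTCHA handling
- Crawler ethics: honoring robots.txt and limiting request rates
Exercises
- Write a crawler that collects headlines and article text from a news site
- Implement an e-commerce product crawler with Scrapy
- Build a crawler system that supports incremental crawling
- Write an asynchronous crawler that fetches several API endpoints concurrently
- Design a distributed crawler architecture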