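Chapter 29: Web Crawlers

Learning Objectives

After completing this chapter, you will be able to:

  1. Understand how web crawlers work: the HTTP protocol, web page structure, and the crawler workflow
  2. Use the requests library: sending HTTP requests, handling responses, and managing sessions
  3. Parse HTML documents: BeautifulSoup, XPath, and CSS selectors
  4. Build a crawler framework: URL management, request scheduling, and data extraction pipelines
  5. Use the Scrapy framework: Spider development, middleware, and pipeline components
  6. Handle anti-crawling measures: User-Agent rotation, proxy IPs, and CAPTCHA recognition
  7. Implement asynchronous crawlers: aiohttp, asynchronous I/O, and concurrency control
  8. Follow crawler ethics: robots.txt, request rate control, and data compliance

29.1 Web Crawler Basics

29.1.1 HTTP Protocol Review

HTTP (HyperText Transfer Protocol) is the foundational protocol of Web communication. The client sends a request and the server answers with a response: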

┌─────────────────────────────────────────────────────────────────────┐
│                        HTTP Request/Response Flow                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Client                               Server                        │
│  ┌─────────────┐                      ┌─────────────┐               │
│  │             │  ──── request ───►   │             │               │
│  │   Browser   │                      │   Web App   │               │
│  │   / Crawler │  ◄─── response ──    │   / API     │               │
│  │             │                      │             │               │
│  └─────────────┘                      └─────────────┘               │
│                                                                     │
│  Request:                                                           │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │ GET /api/users HTTP/1.1                                     │    │
│  │ Host: example.com                                           │    │
│  │ User-Agent: Mozilla/5.0 ...                                 │    │
│  │ Accept: application/json                                    │    │
│  │ Cookie: session_id=abc123                                   │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                                                                     │
│  Response:                                                          │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │ HTTP/1.1 200 OK                                             │    │
│  │ Content-Type: application/json                              │    │
│  │ Set-Cookie: session_id=xyz789                               │    │
│  │                                                             │    │
│  │ {"users": [{"id": 1, "name": "Alice"}]}                     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
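
The response in the diagram is plain text with a fixed layout: a status line, header lines, a blank line, and then the body. As a rough illustration, the sketch below splits such a message using only the standard library. The helper name `parse_raw_response` and the sample text are assumptions made for this example; in practice a client such as requests does this work for you.

```python
from email.parser import Parser
from typing import Dict, Tuple


def parse_raw_response(raw: str) -> Tuple[int, str, Dict[str, str], str]:
    """Split a raw HTTP/1.1 response into status code, reason, headers, body.

    Hypothetical helper for illustration only; it ignores chunked encoding,
    repeated headers, and other details a real client must handle.
    """
    # The head (status line + headers) and the body are separated by a blank line.
    head, _, body = raw.partition("\r\n\r\n")
    status_line, _, header_block = head.partition("\r\n")

    # Status line layout: "<version> <code> <reason>"
    _version, code, reason = status_line.split(" ", 2)

    # HTTP headers use the same "Name: value" syntax as email headers.
    message = Parser().parsestr(header_block, headersonly=True)
    headers = {name.lower(): value for name, value in message.items()}
    return int(code), reason, headers, body


raw_response = (
    "HTTP/1.1 200 OK\r\n"
    "Content-Type: application/json\r\n"
    "Set-Cookie: session_id=xyz789\r\n"
    "\r\n"
    '{"users": [{"id": 1, "name": "Alice"}]}'
)

status, reason, headers, body = parse_raw_response(raw_response)
print(status, reason, headers["content-type"])
```

Header names are lowercased here because HTTP header names are case-insensitive.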

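29.1.2 Common HTTP Status Codes

Status code   Category               Description
200           Success                Request succeeded
301/302       Redirect               Resource has moved
400           Client error           Malformed request
401           Authentication error   Login required
403           Forbidden              No permission to access
404           Not found              Resource does not exist
429           Too many requests      Rate limit triggered
500           Server error           Internal server error
503           Service unavailable    Server overloaded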
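
A crawler usually turns these codes into a small number of actions. The sketch below shows one possible mapping. The function `decide_action`, the action names, and the choice of which codes count as retryable are all assumptions for illustration, not a standard policy.

```python
from http import HTTPStatus

# Codes that usually signal a temporary condition worth retrying.
# This set is an assumption for the sketch, not a standard.
RETRYABLE = {
    HTTPStatus.TOO_MANY_REQUESTS,      # 429
    HTTPStatus.INTERNAL_SERVER_ERROR,  # 500
    HTTPStatus.SERVICE_UNAVAILABLE,    # 503
}


def decide_action(status_code: int) -> str:
    """Map a status code to a coarse crawler action (hypothetical policy)."""
    if 200 <= status_code < 300:
        return "parse"
    if status_code in (301, 302):
        return "follow_redirect"
    if status_code in RETRYABLE:
        return "retry_later"
    # 400, 401, 403, 404 and anything unrecognized:
    # repeating the same request unchanged will not help.
    return "give_up"


for code in (200, 302, 404, 429, 503):
    print(code, decide_action(code))
```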

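29.1.3 Crawler Workflow

A crawl passes three kinds of objects from stage to stage: a Request describing what to fetch, the Response that comes back, and the Item extracted from it. The data classes below model each of them.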

python
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
from datetime import datetime
import hashlib


class RequestStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class Request:
    url: str
    method: str = "GET"
    headers: Dict[str, str] = field(default_factory=dict)
    params: Dict[str, Any] = field(default_factory=dict)
    data: Optional[Dict[str, Any]] = None
    cookies: Dict[str, str] = field(default_factory=dict)
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3
    status: RequestStatus = RequestStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)

    @property
    def fingerprint(self) -> str:
        unique_str = f"{self.method}:{self.url}:{sorted(self.params.items())}"
        return hashlib.md5(unique_str.encode()).hexdigest()


@dataclass
class Response:
    url: str
    status_code: int
    headers: Dict[str, str]
    content: bytes
    text: str
    elapsed: float
    request: Request

    @property
    def ok(self) -> bool:
        return 200 <= self.status_code < 300

    @property
    def is_redirect(self) -> bool:
        return self.status_code in (301, 302, 303, 307, 308)


@dataclass
class Item:
    data: Dict[str, Any]
    url: str
    extracted_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

29.2 The requests Library in Detail

29.2.1 Basic Request Operations

python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Dict, Any, Union
import time
import logging

logger = logging.getLogger(__name__)


class HttpClient:
    def __init__(
        self,
        base_url: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
        backoff_factor: float = 0.5,
        user_agent: Optional[str] = None
    ):
        self.base_url = base_url.rstrip("/") if base_url else None
        self.timeout = timeout
        self.session = requests.Session()

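        # Retry transient failures with backoff. POST is left out because it
        # is not idempotent: replaying it may submit the same form twice.
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "PUT", "DELETE"]
        )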
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        if user_agent:
            self.session.headers["User-Agent"] = user_agent

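    def _build_url(self, endpoint: str) -> str:
        # Absolute URLs pass through unchanged; only relative endpoints
        # are joined onto base_url.
        if endpoint.startswith(("http://", "https://")):
            return endpoint
        if self.base_url:
            return f"{self.base_url}/{endpoint.lstrip('/')}"
        return endpoint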

    def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Any] = None,
        headers: Optional[Dict[str, str]] = None,
        cookies: Optional[Dict[str, str]] = None,
        **kwargs
    ) -> requests.Response:
        url = self._build_url(url)

        start_time = time.time()
        try:
            response = self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                data=data,
                json=json,
                headers=headers,
                cookies=cookies,
                timeout=kwargs.pop("timeout", self.timeout),
                **kwargs
            )
            elapsed = time.time() - start_time
            logger.info(
                f"{method.upper()} {url} - {response.status_code} - {elapsed:.2f}s"
            )
            return response
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {method.upper()} {url} - {e}")
            raise

    def get(self, url: str, **kwargs) -> requests.Response:
        return self.request("GET", url, **kwargs)

    def post(self, url: str, **kwargs) -> requests.Response:
        return self.request("POST", url, **kwargs)

    def put(self, url: str, **kwargs) -> requests.Response:
        return self.request("PUT", url, **kwargs)

    def delete(self, url: str, **kwargs) -> requests.Response:
        return self.request("DELETE", url, **kwargs)

    def close(self) -> None:
        self.session.close()

    def __enter__(self) -> "HttpClient":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


class SessionManager:
    def __init__(self, client: HttpClient):
        self.client = client
        self._logged_in = False

    def login(
        self,
        login_url: str,
        username: str,
        password: str,
        username_field: str = "username",
        password_field: str = "password"
    ) -> bool:
        response = self.client.post(
            login_url,
            data={
                username_field: username,
                password_field: password
            }
        )

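        # A 200 status is treated as success here. This is only a heuristic:
        # some sites return 200 with an error page when the login fails, so
        # checking the response body or cookies may also be necessary.
        if response.status_code == 200:
            self._logged_in = True
            logger.info("Login successful")
            return True

        logger.warning(f"Login failed: {response.status_code}")
        return False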

    def logout(self, logout_url: str) -> bool:
        if not self._logged_in:
            return True

        response = self.client.get(logout_url)
        self._logged_in = False
        return response.status_code == 200

    @property
    def cookies(self) -> Dict[str, str]:
        return dict(self.client.session.cookies)

    @property
    def is_logged_in(self) -> bool:
        return self._logged_in
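
The `HttpClient` above leaves the waiting between retries to urllib3. To show the general idea of exponential backoff without reproducing urllib3's exact formula, here is a minimal standard-library sketch. The name `backoff_delays`, its parameters, and the use of random jitter are assumptions for illustration.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    max_retries: int = 3,
    backoff_factor: float = 0.5,
    max_delay: float = 30.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield one delay per retry: backoff_factor * 2**attempt, capped, with jitter."""
    rng = rng or random.Random()
    for attempt in range(max_retries):
        base = min(backoff_factor * (2 ** attempt), max_delay)
        # "Full jitter": pick a random point in [0, base] so that many clients
        # retrying at once do not all hit the server at the same instant.
        yield rng.uniform(0, base)


delays = list(backoff_delays(max_retries=4, backoff_factor=0.5, rng=random.Random(42)))
print([round(d, 2) for d in delays])
```

Each delay has an upper bound that doubles with every attempt (0.5, 1, 2, 4 seconds in this call), so short outages are retried quickly while longer ones are given more room.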

29.2.2 Advanced Request Features

python
import json
from typing import Generator, List
from contextlib import contextmanager


class AdvancedHttpClient(HttpClient):
    def download_file(
        self,
        url: str,
        save_path: str,
        chunk_size: int = 8192,
        progress_callback: Optional[callable] = None
    ) -> str:
        response = self.get(url, stream=True)
        response.raise_for_status()

        total_size = int(response.headers.get("content-length", 0))
        downloaded = 0

        with open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if progress_callback and total_size:
                        progress_callback(downloaded, total_size)

        logger.info(f"Downloaded: {save_path}")
        return save_path

    def upload_file(
        self,
        url: str,
        file_path: str,
        file_field: str = "file",
        additional_data: Optional[Dict[str, Any]] = None
    ) -> requests.Response:
        with open(file_path, "rb") as f:
            files = {file_field: (file_path.split("/")[-1], f)}
            data = additional_data or {}
            return self.post(url, files=files, data=data)

    def paginated_request(
        self,
        url: str,
        page_param: str = "page",
        start_page: int = 1,
        max_pages: Optional[int] = None,
        stop_condition: Optional[callable] = None,
        **kwargs
    ) -> Generator[requests.Response, None, None]:
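        page = start_page
        # Take the caller's params once. Popping them inside the loop would
        # drop them from the second page onward.
        base_params = kwargs.pop("params", None) or {}

        while True:
            if max_pages and page > max_pages:
                break

            params = {**base_params, page_param: page}

            response = self.get(url, params=params, **kwargs)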

            if stop_condition and stop_condition(response):
                break

            yield response
            page += 1

    def json_api_request(
        self,
        method: str,
        url: str,
        data: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None
    ) -> Dict[str, Any]:
        headers = {"Content-Type": "application/json", "Accept": "application/json"}

        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"

        response = self.request(method, url, json=data, headers=headers)
        response.raise_for_status()
        return response.json()

    @contextmanager
    def rate_limit(self, requests_per_second: float):
        min_interval = 1.0 / requests_per_second
        last_request_time = [0.0]

        original_request = self.request

        def rate_limited_request(*args, **kwargs):
            elapsed = time.time() - last_request_time[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)

            result = original_request(*args, **kwargs)
            last_request_time[0] = time.time()
            return result

        self.request = rate_limited_request
        try:
            yield self
        finally:
            self.request = original_request


class ProxyManager:
    def __init__(self, proxies: Optional[List[str]] = None):
        self.proxies = proxies or []
        self._current_index = 0
        self._failed_proxies = set()

    def add_proxy(self, proxy: str) -> None:
        if proxy not in self.proxies:
            self.proxies.append(proxy)

    def get_proxy(self) -> Optional[Dict[str, str]]:
        if not self.proxies:
            return None

        attempts = 0
        while attempts < len(self.proxies):
            proxy = self.proxies[self._current_index]
            self._current_index = (self._current_index + 1) % len(self.proxies)

            if proxy not in self._failed_proxies:
                return {
                    "http": proxy,
                    "https": proxy
                }

            attempts += 1

        return None

    def mark_failed(self, proxy: str) -> None:
        self._failed_proxies.add(proxy)

    def rotate_proxy(self) -> None:
        if self.proxies:
            self._current_index = (self._current_index + 1) % len(self.proxies)


class UserAgentRotator:
    DEFAULT_USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    ]

    def __init__(self, user_agents: Optional[List[str]] = None):
        self.user_agents = user_agents or self.DEFAULT_USER_AGENTS
        self._current_index = 0

    def get_user_agent(self) -> str:
        user_agent = self.user_agents[self._current_index]
        self._current_index = (self._current_index + 1) % len(self.user_agents)
        return user_agent

    def get_headers(self) -> Dict[str, str]:
        return {
            "User-Agent": self.get_user_agent(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
        }
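
Pagination is easiest to reason about when the network is taken out of the picture. The sketch below follows the same loop shape as `paginated_request`, but takes a fetch function as an argument so it can be run against fake data. `paginate`, `fake_fetch`, and `FAKE_DATA` are hypothetical names, and stopping on the first empty page is an assumed stop condition.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

Page = List[str]


def paginate(
    fetch: Callable[[Dict[str, Any]], Page],
    base_params: Optional[Dict[str, Any]] = None,
    page_param: str = "page",
    start_page: int = 1,
    max_pages: Optional[int] = None,
) -> Iterator[Page]:
    """Yield pages until an empty page is returned or max_pages is passed."""
    page = start_page
    while max_pages is None or page <= max_pages:
        # Build a fresh dict per request so the caller's filters are sent on
        # every page and the caller's own dict is never mutated.
        params = {**(base_params or {}), page_param: page}
        items = fetch(params)
        if not items:
            break
        yield items
        page += 1


# A stand-in for an HTTP call: three pages of fake results, then nothing.
FAKE_DATA = {1: ["a", "b"], 2: ["c", "d"], 3: ["e"]}
calls: List[Dict[str, Any]] = []


def fake_fetch(params: Dict[str, Any]) -> Page:
    calls.append(params)
    return FAKE_DATA.get(params["page"], [])


pages = list(paginate(fake_fetch, base_params={"category": "news"}))
print(pages)
print(calls)
```

The recorded `calls` show that the `category` filter travels with every page request, including the final one that comes back empty.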

29.3 HTML Parsing

29.3.1 BeautifulSoup in Detail

python
from bs4 import BeautifulSoup, Tag, NavigableString
from typing import Any, Dict, List, Optional, Union
import re


class HTMLParser:
    def __init__(self, html: str, parser: str = "html.parser"):
        self.soup = BeautifulSoup(html, parser)
        self.html = html

    def find(
        self,
        name: Optional[str] = None,
        attrs: Optional[Dict[str, Union[str, re.Pattern]]] = None,
        **kwargs
    ) -> Optional[Tag]:
        return self.soup.find(name, attrs, **kwargs)

    def find_all(
        self,
        name: Optional[str] = None,
        attrs: Optional[Dict[str, Union[str, re.Pattern]]] = None,
        limit: Optional[int] = None,
        **kwargs
    ) -> List[Tag]:
        return self.soup.find_all(name, attrs, limit=limit, **kwargs)

    def select(self, selector: str) -> List[Tag]:
        return self.soup.select(selector)

    def select_one(self, selector: str) -> Optional[Tag]:
        return self.soup.select_one(selector)

    def get_text(
        self,
        element: Optional[Tag] = None,
        separator: str = " ",
        strip: bool = True
    ) -> str:
        target = element or self.soup
        text = target.get_text(separator=separator, strip=strip)
        return " ".join(text.split())

    def get_attribute(
        self,
        element: Tag,
        attribute: str,
        default: Optional[str] = None
    ) -> Optional[str]:
        return element.get(attribute, default)

    def find_by_text(
        self,
        text: Union[str, re.Pattern],
        tag: Optional[str] = None
    ) -> List[Tag]:
        return self.soup.find_all(tag, string=text)

    def find_by_regex(
        self,
        pattern: str,
        attribute: Optional[str] = None,
        tag: Optional[str] = None
    ) -> List[Tag]:
        regex = re.compile(pattern)
        if attribute:
            return self.soup.find_all(tag, attrs={attribute: regex})
        return self.soup.find_all(tag, string=regex)

    def extract_links(self, base_url: Optional[str] = None) -> List[Dict[str, str]]:
        links = []
        for a in self.find_all("a"):
            href = a.get("href")
            if href:
                if base_url and not href.startswith(("http://", "https://")):
                    href = f"{base_url.rstrip('/')}/{href.lstrip('/')}"
                links.append({
                    "url": href,
                    "text": self.get_text(a),
                    "title": a.get("title", "")
                })
        return links

    def extract_images(self, base_url: Optional[str] = None) -> List[Dict[str, str]]:
        images = []
        for img in self.find_all("img"):
            src = img.get("src") or img.get("data-src")
            if src:
                if base_url and not src.startswith(("http://", "https://")):
                    src = f"{base_url.rstrip('/')}/{src.lstrip('/')}"
                images.append({
                    "url": src,
                    "alt": img.get("alt", ""),
                    "title": img.get("title", "")
                })
        return images

    def extract_tables(self) -> List[List[List[str]]]:
        tables = []
        for table in self.find_all("table"):
            table_data = []
            for row in table.find_all("tr"):
                row_data = []
                for cell in row.find_all(["td", "th"]):
                    row_data.append(self.get_text(cell))
                if row_data:
                    table_data.append(row_data)
            if table_data:
                tables.append(table_data)
        return tables

    def extract_meta(self) -> Dict[str, str]:
        meta_data = {}
        for meta in self.find_all("meta"):
            name = meta.get("name") or meta.get("property")
            content = meta.get("content")
            if name and content:
                meta_data[name] = content
        return meta_data


class ArticleExtractor:
    def __init__(self, html: str):
        self.parser = HTMLParser(html)

    def extract(self) -> Dict[str, Any]:
        return {
            "title": self.extract_title(),
            "author": self.extract_author(),
            "publish_date": self.extract_publish_date(),
            "content": self.extract_content(),
            "summary": self.extract_summary(),
            "tags": self.extract_tags(),
            "images": self.parser.extract_images(),
        }

    def extract_title(self) -> str:
        title_selectors = [
            "h1.article-title",
            "h1.title",
            "h1",
            "title",
            'meta[property="og:title"]',
        ]

        for selector in title_selectors:
            element = self.parser.select_one(selector)
            if element:
                if element.name == "meta":
                    return element.get("content", "")
                return self.parser.get_text(element)

        return ""

    def extract_author(self) -> str:
        author_selectors = [
            '.author',
            '[rel="author"]',
            '.byline',
            'meta[name="author"]',
        ]

        for selector in author_selectors:
            element = self.parser.select_one(selector)
            if element:
                if element.name == "meta":
                    return element.get("content", "")
                return self.parser.get_text(element)

        return ""

    def extract_publish_date(self) -> str:
        date_selectors = [
            'time',
            '.publish-date',
            '.date',
            'meta[property="article:published_time"]',
            'meta[name="publish-date"]',
        ]

        for selector in date_selectors:
            element = self.parser.select_one(selector)
            if element:
                if element.name == "meta":
                    return element.get("content", "")
                if element.name == "time":
                    return element.get("datetime") or self.parser.get_text(element)
                return self.parser.get_text(element)

        return ""

    def extract_content(self) -> str:
        content_selectors = [
            'article',
            '.article-content',
            '.content',
            '.post-content',
            '.entry-content',
            'main',
        ]

        for selector in content_selectors:
            element = self.parser.select_one(selector)
            if element:
                for unwanted in element.find_all(['script', 'style', 'nav', 'footer', 'aside']):
                    unwanted.decompose()
                return self.parser.get_text(element)

        return ""

    def extract_summary(self) -> str:
        summary = self.parser.select_one('meta[property="og:description"]')
        if summary:
            return summary.get("content", "")

        summary = self.parser.select_one('meta[name="description"]')
        if summary:
            return summary.get("content", "")

        content = self.extract_content()
        return content[:200] + "..." if len(content) > 200 else content

    def extract_tags(self) -> List[str]:
        tags = []

        tag_elements = self.parser.select('.tags a, .tag, [rel="tag"]')
        for tag in tag_elements:
            tag_text = self.parser.get_text(tag)
            if tag_text:
                tags.append(tag_text)

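        meta_keywords = self.parser.select_one('meta[name="keywords"]')
        if meta_keywords:
            keywords = meta_keywords.get("content", "")
            # Skip empty entries such as those produced by "a,,b" or "".
            tags.extend(k.strip() for k in keywords.split(",") if k.strip())

        # dict.fromkeys removes duplicates while keeping first-seen order.
        return list(dict.fromkeys(tags))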
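
`extract_links` and `extract_images` build absolute URLs by concatenating `base_url` and the link, which covers simple paths only. Links such as `../archive/`, `//cdn.example.com/a.png`, or `mailto:` addresses need the resolution rules implemented by `urllib.parse.urljoin`. The sketch below shows one way to apply them. The helper name `resolve_link` and the decision to discard non-HTTP schemes are assumptions for illustration.

```python
from typing import Optional
from urllib.parse import urljoin, urlparse


def resolve_link(page_url: str, href: str) -> Optional[str]:
    """Resolve href against the page it was found on; None if not crawlable."""
    absolute = urljoin(page_url, href.strip())
    # Drop mailto:, javascript:, tel: and similar non-HTTP schemes.
    if urlparse(absolute).scheme not in ("http", "https"):
        return None
    return absolute


page = "https://example.com/blog/2024/post.html"
hrefs = [
    "/about",                   # root-relative
    "next.html",                # relative to the current directory
    "../archive/",              # parent directory
    "//cdn.example.com/a.png",  # protocol-relative
    "https://other.example/x",  # already absolute
    "mailto:me@example.com",    # not crawlable
]
for href in hrefs:
    print(f"{href!r:28} -> {resolve_link(page, href)}")
```

Note that the first argument is the URL of the page itself, not just the site root, because relative links are resolved against the page's own path.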

29.3.2 XPath Parsing

python
from lxml import etree
from typing import Any, Dict, List as ListType


class XPathParser:
    def __init__(self, html: str):
        self.tree = etree.HTML(html)
        self.html = html

    def xpath(self, expression: str) -> ListType[Any]:
        return self.tree.xpath(expression)

    def xpath_string(self, expression: str) -> str:
        result = self.xpath(expression)
        if result:
            if isinstance(result[0], etree._Element):
                return "".join(result[0].itertext())
            return str(result[0])
        return ""

    def xpath_strings(self, expression: str) -> ListType[str]:
        results = self.xpath(expression)
        strings = []
        for result in results:
            if isinstance(result, etree._Element):
                strings.append("".join(result.itertext()))
            else:
                strings.append(str(result))
        return strings

    def xpath_attribute(self, expression: str, attribute: str) -> ListType[str]:
        elements = self.xpath(expression)
        return [el.get(attribute, "") for el in elements if el.get(attribute)]

    def extract_links(self) -> ListType[Dict[str, str]]:
        links = []
        for a in self.xpath("//a[@href]"):
            links.append({
                "url": a.get("href"),
                "text": "".join(a.itertext()).strip(),
                "title": a.get("title", "")
            })
        return links

    def extract_table_data(self, table_xpath: str = "//table") -> ListType[ListType[ListType[str]]]:
        tables = []
        for table in self.xpath(table_xpath):
            table_data = []
            for row in table.xpath(".//tr"):
                row_data = []
                for cell in row.xpath(".//td | .//th"):
                    row_data.append("".join(cell.itertext()).strip())
                if row_data:
                    table_data.append(row_data)
            if table_data:
                tables.append(table_data)
        return tables


class NewsParser:
    def __init__(self, html: str):
        self.xpath_parser = XPathParser(html)
        self.bs_parser = HTMLParser(html)

    def parse_news_list(self, config: Dict[str, str]) -> ListType[Dict[str, str]]:
        news_list = []
        items = self.xpath_parser.xpath(config.get("item_xpath", "//div[@class='news-item']"))

        for item in items:
            news = {}

            title_xpath = config.get("title_xpath", ".//h2/a")
            title_el = item.xpath(title_xpath)
            if title_el:
                news["title"] = "".join(title_el[0].itertext()).strip()
                news["url"] = title_el[0].get("href", "")

            date_xpath = config.get("date_xpath", ".//span[@class='date']")
            date_el = item.xpath(date_xpath)
            if date_el:
                news["date"] = "".join(date_el[0].itertext()).strip()

            summary_xpath = config.get("summary_xpath", ".//p[@class='summary']")
            summary_el = item.xpath(summary_xpath)
            if summary_el:
                news["summary"] = "".join(summary_el[0].itertext()).strip()

            if news.get("title"):
                news_list.append(news)

        return news_list
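
To try the item-then-field pattern of `parse_news_list` without installing lxml, the sketch below uses the standard library's `xml.etree.ElementTree`. This is a substitution made for illustration: ElementTree supports only a subset of XPath and requires well-formed markup, so it is not a replacement for lxml on real pages. The sample markup and the function name `parse_news` are made up.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List

SAMPLE = """
<html><body>
  <div class="news-item">
    <h2><a href="/news/1">First <b>headline</b></a></h2>
    <span class="date">2024-01-15</span>
  </div>
  <div class="news-item">
    <h2><a href="/news/2">Second headline</a></h2>
  </div>
  <div class="ad">Sponsored</div>
</body></html>
"""


def parse_news(markup: str) -> List[Dict[str, str]]:
    root = ET.fromstring(markup.strip())
    news_list = []
    # Step 1: select each item container by its class attribute.
    for item in root.findall(".//div[@class='news-item']"):
        # Step 2: query fields relative to the item, not the whole document.
        link = item.find("./h2/a")
        if link is None:
            continue
        news = {
            # itertext() walks nested tags, so "<b>headline</b>" is included.
            "title": "".join(link.itertext()).strip(),
            "url": link.get("href", ""),
        }
        date = item.find("./span[@class='date']")
        if date is not None:
            news["date"] = "".join(date.itertext()).strip()
        news_list.append(news)
    return news_list


for news in parse_news(SAMPLE):
    print(news)
```

Fields that are missing from an item (the date of the second entry here) are simply left out of the result, mirroring the `if date_el:` checks above.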

29.4 Crawler Framework Design

29.4.1 URL Manager

python
from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode
from collections import deque
from typing import Set, Optional, List, Dict, Any
import hashlib
import threading


class URL:
    def __init__(
        self,
        url: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        cookies: Optional[Dict[str, str]] = None,
        priority: int = 0,
        depth: int = 0,
        parent_url: Optional[str] = None,
        meta: Optional[Dict[str, Any]] = None
    ):
        self.raw_url = url
        self.parsed = urlparse(url)
        self.method = method.upper()
        self.params = params or {}
        self.data = data or {}
        self.headers = headers or {}
        self.cookies = cookies or {}
        self.priority = priority
        self.depth = depth
        self.parent_url = parent_url
        self.meta = meta or {}

    @property
    def url(self) -> str:
        if self.params:
            query_dict = parse_qs(self.parsed.query)
            query_dict.update(self.params)
            query = urlencode(query_dict, doseq=True)
            return urlunparse((
                self.parsed.scheme,
                self.parsed.netloc,
                self.parsed.path,
                self.parsed.params,
                query,
                self.parsed.fragment
            ))
        return self.raw_url

    @property
    def fingerprint(self) -> str:
        unique_str = f"{self.method}:{self.url}"
        return hashlib.md5(unique_str.encode()).hexdigest()

    @property
    def domain(self) -> str:
        return self.parsed.netloc

    @property
    def scheme(self) -> str:
        return self.parsed.scheme

    def is_valid(self) -> bool:
        return bool(self.parsed.scheme and self.parsed.netloc)

    def is_same_domain(self, other_url: str) -> bool:
        other_parsed = urlparse(other_url)
        return self.parsed.netloc == other_parsed.netloc

    def join(self, relative_url: str) -> "URL":
        absolute = urljoin(self.raw_url, relative_url)
        return URL(absolute, depth=self.depth + 1, parent_url=self.raw_url)

    def __repr__(self) -> str:
        return f"URL({self.url})"

    def __eq__(self, other) -> bool:
        if isinstance(other, URL):
            return self.fingerprint == other.fingerprint
        return False

    def __hash__(self) -> int:
        return hash(self.fingerprint)


class URLManager:
    def __init__(self, max_depth: int = 3, max_urls: int = 10000):
        self.max_depth = max_depth
        self.max_urls = max_urls
        self._pending: deque = deque()
        self._seen: Set[str] = set()
        self._lock = threading.Lock()
        self._total_count = 0

    def add_url(self, url: URL) -> bool:
        if not url.is_valid():
            return False

        if url.depth > self.max_depth:
            return False

        with self._lock:
            if url.fingerprint in self._seen:
                return False

            if self._total_count >= self.max_urls:
                return False

            self._pending.append(url)
            self._seen.add(url.fingerprint)
            self._total_count += 1
            return True

    def add_urls(self, urls: List[URL]) -> int:
        added = 0
        for url in urls:
            if self.add_url(url):
                added += 1
        return added

    def get_url(self) -> Optional[URL]:
        with self._lock:
            if self._pending:
                return self._pending.popleft()
            return None

    def has_pending(self) -> bool:
        return len(self._pending) > 0

    @property
    def pending_count(self) -> int:
        return len(self._pending)

    @property
    def seen_count(self) -> int:
        return len(self._seen)

    @property
    def total_count(self) -> int:
        return self._total_count

    def clear(self) -> None:
        with self._lock:
            self._pending.clear()
            self._seen.clear()
            self._total_count = 0

    def is_seen(self, url: URL) -> bool:
        return url.fingerprint in self._seen
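
Deduplication by fingerprint works only as well as the URL string it hashes: `https://Example.com/a#top` and `https://example.com/a` would count as two different pages. One common remedy is to normalize URLs before fingerprinting them. The sketch below is one possible set of rules, not part of the `URL` class above; `canonicalize` is a hypothetical helper, and it deliberately drops fragments and any user:password part.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def canonicalize(url: str) -> str:
    """Normalize a URL so that trivially different spellings compare equal."""
    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    host = (parts.hostname or "").lower()
    # Keep the port only when it is not the default for the scheme.
    default_port = {"http": 80, "https": 443}.get(scheme)
    if parts.port and parts.port != default_port:
        host = f"{host}:{parts.port}"
    # Sort query parameters; keep blank values so "?a=" is not silently lost.
    query = urlencode(sorted(parse_qsl(parts.query, keep_blank_values=True)))
    path = parts.path or "/"
    # The fragment is never sent to the server, so it is dropped.
    return urlunsplit((scheme, host, path, query, ""))


for raw in (
    "HTTP://Example.com:80/a?b=2&a=1#top",
    "http://example.com/a?a=1&b=2",
    "https://example.com",
    "http://example.com:8080/",
):
    print(raw, "->", canonicalize(raw))
```

The first two inputs normalize to the same string, so they would share one fingerprint.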

29.4.2 Request Scheduler

python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional
import time
import logging

import requests

# URL and URLManager come from section 29.4.1; HttpClient from section 29.2.1.

logger = logging.getLogger(__name__)


@dataclass
class CrawlResult:
    url: URL
    response: Optional[requests.Response] = None
    error: Optional[Exception] = None
    items: List[Any] = field(default_factory=list)
    new_urls: List[URL] = field(default_factory=list)
    elapsed: float = 0.0

    @property
    def success(self) -> bool:
        return self.error is None and self.response is not None


class RequestScheduler:
    def __init__(
        self,
        http_client: HttpClient,
        url_manager: URLManager,
        max_workers: int = 5,
        request_delay: float = 0.5,
        timeout: int = 30
    ):
        self.http_client = http_client
        self.url_manager = url_manager
        self.max_workers = max_workers
        self.request_delay = request_delay
        self.timeout = timeout
        self._running = False
        self._results: queue.Queue = queue.Queue()
        self._callbacks: List[Callable] = []

    def add_callback(self, callback: Callable) -> None:
        self._callbacks.append(callback)

    def _fetch(self, url: URL) -> CrawlResult:
        start_time = time.time()

        try:
            time.sleep(self.request_delay)

            # url.url already has url.params merged into its query string,
            # so they are not passed again (that would duplicate them).
            response = self.http_client.request(
                method=url.method,
                url=url.url,
                headers=url.headers,
                cookies=url.cookies,
                data=url.data if url.method == "POST" else None,
                timeout=self.timeout
            )

            result = CrawlResult(
                url=url,
                response=response,
                elapsed=time.time() - start_time
            )

            for callback in self._callbacks:
                try:
                    callback(result)
                except Exception as e:
                    logger.error(f"Callback error: {e}")

            return result

        except Exception as e:
            logger.error(f"Fetch error: {url.url} - {e}")
            return CrawlResult(
                url=url,
                error=e,
                elapsed=time.time() - start_time
            )

    def crawl(
        self,
        start_urls: List[URL],
        parser: Callable[[CrawlResult], tuple],
        max_requests: Optional[int] = None
    ) -> List[Any]:
        self.url_manager.add_urls(start_urls)
        all_items = []
        request_count = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {}

            while self.url_manager.has_pending() or futures:
                while self.url_manager.has_pending() and len(futures) < self.max_workers:
                    if max_requests and request_count >= max_requests:
                        break

                    url = self.url_manager.get_url()
                    if url:
                        future = executor.submit(self._fetch, url)
                        futures[future] = url
                        request_count += 1

                if not futures:
                    break

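                # Poll for finished futures instead of as_completed(..., timeout=1.0),
                # which raises TimeoutError whenever a request outlives the timeout.
                done_futures = [f for f in futures if f.done()]
                if not done_futures:
                    time.sleep(0.05)
                    continue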

                for future in done_futures:
                    url = futures.pop(future)
                    try:
                        result = future.result()
                        self._results.put(result)

                        if result.success and parser:
                            items, new_urls = parser(result)
                            all_items.extend(items)

                            for new_url in new_urls:
                                new_url.depth = url.depth + 1
                                self.url_manager.add_url(new_url)

                    except Exception as e:
                        logger.error(f"Future error: {e}")

        return all_items

    def stop(self) -> None:
        self._running = False


class RateLimiter:
    def __init__(self, requests_per_second: float):
        self.min_interval = 1.0 / requests_per_second
        self._last_request_time = 0.0
        self._lock = threading.Lock()

    def wait(self) -> None:
        with self._lock:
            now = time.time()
            elapsed = now - self._last_request_time

            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)

            self._last_request_time = time.time()
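
RateLimiter sleeps while holding its lock, which serializes callers by design. A possible variant, sketched here on the assumption that callers only need evenly spaced start times, reserves the next slot under the lock and sleeps outside it. The names `SlotRateLimiter`, `_next_slot`, and `demo_spacing` are illustrative and not part of the chapter's code.

```python
import threading
import time


class SlotRateLimiter:
    """Illustrative variant: reserve a start slot under the lock, sleep outside it."""

    def __init__(self, requests_per_second: float):
        self.min_interval = 1.0 / requests_per_second
        self._next_slot = time.monotonic()
        self._lock = threading.Lock()

    def wait(self) -> float:
        """Block until this caller's slot arrives; return the seconds slept."""
        with self._lock:
            now = time.monotonic()
            slot = max(now, self._next_slot)
            self._next_slot = slot + self.min_interval
        delay = slot - now
        if delay > 0:
            time.sleep(delay)  # other threads can reserve their slots meanwhile
        return delay


def demo_spacing(calls: int = 3, rps: float = 50.0) -> float:
    """Run `calls` sequential waits and return the total elapsed seconds."""
    limiter = SlotRateLimiter(rps)
    start = time.monotonic()
    for _ in range(calls):
        limiter.wait()
    return time.monotonic() - start


print(f"3 calls at 50 req/s took {demo_spacing():.3f}s")
```

The first call passes immediately and every later call is pushed one interval behind the previous slot, so three calls at 50 requests per second take roughly 0.04 seconds in total.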

29.4.3 Data pipelines

python
import csv
import json
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Protocol

logger = logging.getLogger(__name__)


class Pipeline(Protocol):
    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]: ...
    def close(self) -> None: ...


class JSONPipeline:
    def __init__(self, output_file: str = "output.json", encoding: str = "utf-8"):
        self.output_file = Path(output_file)
        self.encoding = encoding
        self.items: List[Dict[str, Any]] = []

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        item["scraped_at"] = datetime.now().isoformat()
        self.items.append(item)
        return item

    def close(self) -> None:
        with open(self.output_file, "w", encoding=self.encoding) as f:
            json.dump(self.items, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved {len(self.items)} items to {self.output_file}")


class CSVPipeline:
    def __init__(
        self,
        output_file: str = "output.csv",
        encoding: str = "utf-8",
        fieldnames: Optional[List[str]] = None
    ):
        self.output_file = Path(output_file)
        self.encoding = encoding
        self.fieldnames = fieldnames
        self._file = None
        self._writer = None
        self._initialized = False

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        if not self._initialized:
            self._file = open(self.output_file, "w", encoding=self.encoding, newline="")
            if self.fieldnames is None:
                self.fieldnames = list(item.keys())
            self._writer = csv.DictWriter(self._file, fieldnames=self.fieldnames)
            self._writer.writeheader()
            self._initialized = True

        self._writer.writerow(item)
        return item

    def close(self) -> None:
        if self._file:
            self._file.close()
            logger.info(f"Saved items to {self.output_file}")


class SQLitePipeline:
    def __init__(
        self,
        db_path: str = "crawl.db",
        table_name: str = "items"
    ):
        self.db_path = db_path
        self.table_name = table_name
        self._conn = None
        self._cursor = None
        self._initialized = False

    def _init_db(self, item: Dict[str, Any]) -> None:
        self._conn = sqlite3.connect(self.db_path)
        self._cursor = self._conn.cursor()

        columns = []
        for key, value in item.items():
            if isinstance(value, int):
                col_type = "INTEGER"
            elif isinstance(value, float):
                col_type = "REAL"
            else:
                col_type = "TEXT"
            columns.append(f"{key} {col_type}")

        columns_sql = ", ".join(columns)
        self._cursor.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.table_name} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                {columns_sql}
            )
        """)
        self._conn.commit()
        self._initialized = True

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        if not self._initialized:
            self._init_db(item)

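        # Column names are interpolated into the SQL text, so item keys must be
        # trusted identifiers; only the values go through "?" placeholders.
        columns = ", ".join(item.keys())
        placeholders = ", ".join(["?" for _ in item])
        # sqlite3 cannot bind lists or dicts, so store them as JSON text.
        values = tuple(
            json.dumps(v, ensure_ascii=False) if isinstance(v, (list, dict)) else v
            for v in item.values()
        )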

        self._cursor.execute(
            f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})",
            values
        )
        self._conn.commit()
        return item

    def close(self) -> None:
        if self._conn:
            self._conn.close()
            logger.info(f"Saved items to {self.db_path}")


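class DropItem(Exception):
    """Raised by a pipeline to discard an item."""


class DuplicateFilterPipeline:
    def __init__(self, key_field: str = "url"):
        self.key_field = key_field
        self._seen: set = set()

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        key = item.get(self.key_field)
        if key in self._seen:
            raise DropItem(f"Duplicate item: {key}")
        self._seen.add(key)
        return item

    def close(self) -> None:
        # Nothing to release; defined so the class satisfies the Pipeline protocol
        # and PipelineManager.close() does not log an AttributeError.
        self._seen.clear()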


class PipelineManager:
    def __init__(self, pipelines: List[Pipeline]):
        self.pipelines = pipelines

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        for pipeline in self.pipelines:
            try:
                item = pipeline.process_item(item)
            except DropItem:
                raise
            except Exception as e:
                logger.error(f"Pipeline error: {e}")
        return item

    def close(self) -> None:
        for pipeline in self.pipelines:
            try:
                pipeline.close()
            except Exception as e:
                logger.error(f"Pipeline close error: {e}")
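
PipelineManager re-raises DropItem, so the caller decides what happens to discarded items, and the order of the stages matters: a filter placed before a storage stage keeps duplicates out of the output. The sketch below uses compact stand-ins (`DedupStage`, `CollectStage`, `run_pipelines` are hypothetical names) rather than the full classes above, and the URLs are placeholders.

```python
from typing import Any, Dict, List


class DropItem(Exception):
    """Signals that an item should be discarded."""


class DedupStage:
    """Stand-in for DuplicateFilterPipeline: drops repeated URLs."""

    def __init__(self) -> None:
        self._seen: set = set()

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        if item["url"] in self._seen:
            raise DropItem(item["url"])
        self._seen.add(item["url"])
        return item


class CollectStage:
    """Stand-in for a storage pipeline: keeps items in memory."""

    def __init__(self) -> None:
        self.items: List[Dict[str, Any]] = []

    def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        self.items.append(item)
        return item


def run_pipelines(items, stages) -> int:
    """Feed each item through every stage in order; return how many were dropped."""
    dropped = 0
    for item in items:
        try:
            for stage in stages:
                item = stage.process_item(item)
        except DropItem:
            dropped += 1  # later stages never see a dropped item
    return dropped


scraped = [
    {"url": "https://example.com/a", "title": "A"},
    {"url": "https://example.com/b", "title": "B"},
    {"url": "https://example.com/a", "title": "A again"},
]
sink = CollectStage()
dropped = run_pipelines(scraped, [DedupStage(), sink])
print(len(sink.items), dropped)  # 2 1
```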

29.5 The Scrapy framework

29.5.1 Writing a Spider

python
import scrapy
from scrapy import Spider, Request
from scrapy.http import Response
from scrapy.item import Item, Field
from typing import List, Dict, Any, Generator
import re


class NewsItem(scrapy.Item):
    title = Field()
    url = Field()
    author = Field()
    publish_date = Field()
    content = Field()
    summary = Field()
    tags = Field()
    images = Field()


class BaseSpider(Spider):
    custom_settings = {
        "CONCURRENT_REQUESTS": 8,
        "DOWNLOAD_DELAY": 0.5,
        "RANDOMIZE_DOWNLOAD_DELAY": True,
        "COOKIES_ENABLED": False,
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stats = {
            "pages_crawled": 0,
            "items_scraped": 0,
            "errors": 0
        }

    def closed(self, reason: str) -> None:
        self.logger.info(f"Spider closed: {reason}")
        self.logger.info(f"Stats: {self.stats}")


class NewsSpider(BaseSpider):
    name = "news"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/news"]

    def parse(self, response: Response) -> Generator:
        self.stats["pages_crawled"] += 1

        for article in response.css("article.news-item"):
            url = article.css("a::attr(href)").get()
            if url:
                # Scrapy's DepthMiddleware maintains response.meta["depth"] itself,
                # so the depth does not need to be passed by hand.
                yield response.follow(url, callback=self.parse_article)

        next_page = response.css("a.next-page::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_article(self, response: Response) -> Generator:
        self.stats["pages_crawled"] += 1

        item = NewsItem()
        item["url"] = response.url
        item["title"] = response.css("h1.title::text").get(default="").strip()
        item["author"] = response.css(".author::text").get(default="").strip()
        item["publish_date"] = response.css("time::attr(datetime)").get(default="")
        item["content"] = " ".join(response.css(".content p::text").getall())
        item["summary"] = response.css("meta[name=description]::attr(content)").get(default="")
        item["tags"] = response.css(".tags a::text").getall()
        item["images"] = response.css(".content img::attr(src)").getall()

        self.stats["items_scraped"] += 1
        yield item
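
`parse_article` joins the raw text nodes returned by `getall()`, and those usually carry the newlines and indentation of the HTML source. One way to tidy them, sketched here with a hypothetical helper `join_text` that is not part of Scrapy, is to collapse whitespace before joining:

```python
import re
from typing import Iterable

_WS = re.compile(r"\s+")


def join_text(fragments: Iterable[str]) -> str:
    """Collapse runs of whitespace in each fragment and join the non-empty ones."""
    cleaned = (_WS.sub(" ", f).strip() for f in fragments)
    return " ".join(c for c in cleaned if c)


raw = ["\n   First paragraph.\n", "   ", "Second\tparagraph,\n  continued. "]
print(join_text(raw))  # First paragraph. Second paragraph, continued.
```

In the spider this could replace the plain `" ".join(...)` call for the `content` field.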

29.5.2 Writing middleware

python
from scrapy import signals
from scrapy.http import Request, Response
from scrapy.exceptions import IgnoreRequest
from typing import List, Optional
import random
import time
import logging

logger = logging.getLogger(__name__)


class UserAgentMiddleware:
    def __init__(self, user_agents: List[str]):
        self.user_agents = user_agents

    @classmethod
    def from_crawler(cls, crawler):
        user_agents = crawler.settings.getlist("USER_AGENTS", [])
        return cls(user_agents)

    def process_request(self, request: Request, spider) -> None:
        if self.user_agents:
            request.headers["User-Agent"] = random.choice(self.user_agents)


class ProxyMiddleware:
    def __init__(self, proxies: List[str], rotate_enabled: bool = True):
        self.proxies = proxies
        self.rotate_enabled = rotate_enabled
        self.current_index = 0

    @classmethod
    def from_crawler(cls, crawler):
        proxies = crawler.settings.getlist("PROXIES", [])
        rotate_enabled = crawler.settings.getbool("PROXY_ROTATE_ENABLED", True)
        return cls(proxies, rotate_enabled)

    def process_request(self, request: Request, spider) -> None:
        if not self.proxies:
            return

        proxy = self._get_proxy()
        request.meta["proxy"] = proxy
        logger.debug(f"Using proxy: {proxy}")

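    def _get_proxy(self) -> str:
        if self.rotate_enabled:
            # process_exception may have shrunk the list, so clamp the index first.
            self.current_index %= len(self.proxies)
            proxy = self.proxies[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.proxies)
            return proxy
        return random.choice(self.proxies)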

    def process_exception(self, request: Request, exception, spider):
        if "proxy" in request.meta:
            proxy = request.meta["proxy"]
            if proxy in self.proxies:
                logger.warning(f"Removing failed proxy: {proxy}")
                self.proxies.remove(proxy)


class RetryMiddleware:
    def __init__(
        self,
        max_retry_times: int = 3,
        retry_http_codes: List[int] = None,
        retry_delay: float = 1.0
    ):
        self.max_retry_times = max_retry_times
        self.retry_http_codes = retry_http_codes or [500, 502, 503, 504, 408, 429]
        self.retry_delay = retry_delay

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            max_retry_times=crawler.settings.getint("RETRY_TIMES", 3),
            # getlist() can return strings (for example when set on the command line)
            retry_http_codes=[int(c) for c in crawler.settings.getlist("RETRY_HTTP_CODES")],
            retry_delay=crawler.settings.getfloat("RETRY_DELAY", 1.0)
        )

    def process_response(self, request: Request, response: Response, spider):
        if request.meta.get("dont_retry", False):
            return response

        if response.status in self.retry_http_codes:
            return self._retry(request, spider) or response

        return response

    def process_exception(self, request: Request, exception, spider):
        if request.meta.get("dont_retry", False):
            return None

        return self._retry(request, spider)

    def _retry(self, request: Request, spider) -> "Request | None":
        retry_times = request.meta.get("retry_times", 0) + 1

        if retry_times > self.max_retry_times:
            logger.error(f"Gave up retrying {request.url} (failed {retry_times} times)")
            return None

        logger.warning(f"Retrying {request.url} (attempt {retry_times}/{self.max_retry_times})")

        retryreq = request.copy()
        retryreq.meta["retry_times"] = retry_times
        retryreq.dont_filter = True
        retryreq.priority = request.priority - 10

        # No time.sleep() here: it would block Scrapy's single-threaded reactor and
        # stall every in-flight request. The lowered priority already defers the
        # retry; overall pacing is left to DOWNLOAD_DELAY or AutoThrottle.

        return retryreq
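
Middleware classes only take effect once they are registered in the project settings. The fragment below is a sketch of such a registration. The module path `myproject.middlewares`, the priority numbers, and the sample values are assumptions; `USER_AGENTS`, `PROXIES`, `PROXY_ROTATE_ENABLED`, and `RETRY_DELAY` are the custom settings read by the `from_crawler()` methods above, not built-in Scrapy settings.

```python
# settings.py (fragment); "myproject.middlewares" is a placeholder module path
DOWNLOADER_MIDDLEWARES = {
    # Disable the built-in middlewares that the custom classes replace
    "scrapy.downloadermiddlewares.useragent.UserAgentMiddleware": None,
    "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
    # Lower numbers sit closer to the engine, so their process_request runs first
    "myproject.middlewares.UserAgentMiddleware": 400,
    "myproject.middlewares.ProxyMiddleware": 410,
    "myproject.middlewares.RetryMiddleware": 550,
}

# Custom settings consumed by the middlewares (sample values)
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0",
]
PROXIES = ["http://proxy1:8080", "http://proxy2:8080"]
PROXY_ROTATE_ENABLED = True

RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
RETRY_DELAY = 1.0
```

Setting a built-in middleware to `None` removes it from the chain, which avoids two retry or User-Agent components acting on the same request.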

29.6 Asynchronous crawlers

29.6.1 Asynchronous requests with aiohttp

python
import aiohttp
import asyncio
from typing import List, Dict, Any, Optional, AsyncGenerator
from dataclasses import dataclass
import time
import logging

logger = logging.getLogger(__name__)


@dataclass
class AsyncRequest:
    url: str
    method: str = "GET"
    headers: Optional[Dict[str, str]] = None
    params: Optional[Dict[str, Any]] = None
    data: Optional[Dict[str, Any]] = None
    timeout: int = 30

    def __post_init__(self):
        self.headers = self.headers or {}
        self.params = self.params or {}
        self.data = self.data or {}


@dataclass
class AsyncResponse:
    url: str
    status: int
    headers: Dict[str, str]
    text: str
    json_data: Any
    elapsed: float


class AsyncHttpClient:
    def __init__(
        self,
        max_connections: int = 100,
        max_per_host: int = 10,
        timeout: int = 30,
        user_agent: str = None
    ):
        self.max_connections = max_connections
        self.max_per_host = max_per_host
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = user_agent or "AsyncCrawler/1.0"
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "AsyncHttpClient":
        connector = aiohttp.TCPConnector(
            limit=self.max_connections,
            limit_per_host=self.max_per_host
        )
        headers = {"User-Agent": self.user_agent}
        self._session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers=headers
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._session:
            await self._session.close()

    async def request(self, req: AsyncRequest) -> AsyncResponse:
        start_time = time.time()

        try:
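            async with self._session.request(
                method=req.method,
                url=req.url,
                headers=req.headers,
                # Pass None rather than empty dicts so no empty query string or form body is sent
                params=req.params or None,
                data=req.data or None
            ) as response:
                text = await response.text()
                try:
                    json_data = await response.json()
                except (aiohttp.ContentTypeError, ValueError):
                    # Not JSON: wrong Content-Type or an unparsable body.
                    # A bare "except:" would also swallow task cancellation.
                    json_data = None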

                elapsed = time.time() - start_time
                logger.info(f"{req.method} {req.url} - {response.status} - {elapsed:.2f}s")

                return AsyncResponse(
                    url=str(response.url),
                    status=response.status,
                    headers=dict(response.headers),
                    text=text,
                    json_data=json_data,
                    elapsed=elapsed
                )

        except asyncio.TimeoutError:
            logger.error(f"Timeout: {req.url}")
            raise
        except aiohttp.ClientError as e:
            logger.error(f"Request error: {req.url} - {e}")
            raise

    async def get(self, url: str, **kwargs) -> AsyncResponse:
        return await self.request(AsyncRequest(url=url, method="GET", **kwargs))

    async def post(self, url: str, **kwargs) -> AsyncResponse:
        return await self.request(AsyncRequest(url=url, method="POST", **kwargs))


class AsyncCrawler:
    def __init__(
        self,
        max_concurrent: int = 10,
        request_delay: float = 0.1
    ):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self._semaphore: Optional[asyncio.Semaphore] = None

    async def crawl(
        self,
        urls: List[str],
        parser: callable,
        max_concurrent: int = None
    ) -> List[Any]:
        self._semaphore = asyncio.Semaphore(max_concurrent or self.max_concurrent)

        async with AsyncHttpClient() as client:
            tasks = [self._crawl_one(client, url, parser) for url in urls]
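            results = await asyncio.gather(*tasks, return_exceptions=True)

        # Failed requests come back as exception objects; log them instead of
        # dropping them silently.
        for url, r in zip(urls, results):
            if isinstance(r, BaseException):
                logger.warning(f"Skipped {url}: {r!r}")
        return [r for r in results if not isinstance(r, BaseException)]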

    async def _crawl_one(
        self,
        client: AsyncHttpClient,
        url: str,
        parser: callable
    ) -> Any:
        async with self._semaphore:
            await asyncio.sleep(self.request_delay)
            response = await client.get(url)
            return parser(response)


async def async_crawl_example():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
    ]

    def parser(response: AsyncResponse) -> Dict[str, Any]:
        return {
            "url": response.url,
            "status": response.status,
            "elapsed": response.elapsed
        }

    crawler = AsyncCrawler(max_concurrent=5)
    results = await crawler.crawl(urls, parser)

    for result in results:
        print(result)


if __name__ == "__main__":
    asyncio.run(async_crawl_example())
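
The semaphore in `_crawl_one` is what bounds concurrency. Its effect can be observed without any network access by replacing the HTTP call with `asyncio.sleep`, as in this sketch (`bounded_gather` and `fake_fetch` are illustrative names):

```python
import asyncio
from typing import List, Tuple


async def bounded_gather(
    n_tasks: int, limit: int, work_seconds: float = 0.01
) -> Tuple[List[int], int]:
    """Run n_tasks simulated fetches with at most `limit` in flight.

    Returns the results and the highest concurrency observed.
    """
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_fetch(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(work_seconds)  # stands in for the network round trip
            in_flight -= 1
            return i * i

    results = await asyncio.gather(*(fake_fetch(i) for i in range(n_tasks)))
    return list(results), peak


results, peak = asyncio.run(bounded_gather(n_tasks=10, limit=3))
print(results[:4], peak)  # [0, 1, 4, 9] 3
```

All ten coroutines are created up front, but only three hold the semaphore at any moment, and `gather` still returns results in submission order.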

29.7 Dealing with anti-crawling measures

29.7.1 Common anti-crawling strategies

python
import random
import time
from typing import List, Dict
import hashlib


class AntiDetection:
    @staticmethod
    def generate_browser_headers() -> Dict[str, str]:
        accept_languages = [
            "zh-CN,zh;q=0.9,en;q=0.8",
            "en-US,en;q=0.9",
            "zh-TW,zh;q=0.9,en;q=0.8",
        ]

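        # Only advertise "br" if a Brotli decoder (e.g. the brotli package) is installed;
        # otherwise the client may receive a body it cannot decompress.
        accept_encodings = ["gzip, deflate, br", "gzip, deflate"]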

        return {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "Accept-Language": random.choice(accept_languages),
            "Accept-Encoding": random.choice(accept_encodings),
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }

    @staticmethod
    def random_delay(min_delay: float = 0.5, max_delay: float = 2.0) -> None:
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)

    @staticmethod
    def generate_fingerprint() -> Dict[str, str]:
        canvas_hash = hashlib.md5(str(random.random()).encode()).hexdigest()
        webgl_hash = hashlib.md5(str(random.random()).encode()).hexdigest()

        return {
            "canvas": canvas_hash,
            "webgl": webgl_hash,
            "timezone": str(random.randint(-12, 12)),
            "screen_resolution": f"{random.randint(1024, 1920)}x{random.randint(768, 1080)}",
        }


class CookieManager:
    def __init__(self):
        self._cookies: Dict[str, Dict[str, str]] = {}

    def add_cookies(self, domain: str, cookies: Dict[str, str]) -> None:
        if domain not in self._cookies:
            self._cookies[domain] = {}
        self._cookies[domain].update(cookies)

    def get_cookies(self, domain: str) -> Dict[str, str]:
        return self._cookies.get(domain, {})

    def clear_cookies(self, domain: str = None) -> None:
        if domain:
            self._cookies.pop(domain, None)
        else:
            self._cookies.clear()


class SessionPool:
    def __init__(self, pool_size: int = 5):
        self.pool_size = pool_size
        self._sessions: List[HttpClient] = []
        self._current_index = 0

    def initialize(self) -> None:
        for _ in range(self.pool_size):
            client = HttpClient(user_agent=UserAgentRotator().get_user_agent())
            self._sessions.append(client)

    def get_session(self) -> HttpClient:
        if not self._sessions:
            self.initialize()  # create the pool lazily on first use
        session = self._sessions[self._current_index]
        self._current_index = (self._current_index + 1) % len(self._sessions)
        return session

    def close_all(self) -> None:
        for session in self._sessions:
            session.close()
        self._sessions.clear()
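
`random_delay` draws from a fixed range. When a server starts answering with 429 or 503, one common approach is to widen that range after each consecutive failure. The sketch below implements exponential backoff with full jitter; the function name and default values are assumptions, not part of the chapter's code.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    base: float = 0.5,
    factor: float = 2.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one randomized delay per retry attempt ("full jitter").

    Attempt k draws uniformly from [0, min(cap, base * factor**k)].
    """
    rng = rng or random.Random()
    delays = []
    for k in range(attempts):
        ceiling = min(cap, base * factor ** k)
        delays.append(rng.uniform(0.0, ceiling))
    return delays


for attempt, delay in enumerate(backoff_delays(5, rng=random.Random(42)), start=1):
    print(f"retry {attempt}: wait {delay:.2f}s")
```

Randomizing the whole interval, rather than adding a small jitter to a fixed delay, keeps several workers from retrying in lockstep.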

29.8 Crawler ethics and compliance

29.8.1 Parsing robots.txt

python
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse
from typing import Dict, List, Optional
import time

import requests


class RobotsParser:
    def __init__(self, user_agent: str = "*"):
        self.user_agent = user_agent
        self._parsers: Dict[str, RobotFileParser] = {}

    def can_fetch(self, url: str) -> bool:
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"

        if domain not in self._parsers:
            rp = RobotFileParser()
            rp.set_url(f"{domain}/robots.txt")
            try:
                rp.read()
                self._parsers[domain] = rp
            except Exception:
                return True

        return self._parsers[domain].can_fetch(self.user_agent, url)

    def get_crawl_delay(self, url: str) -> float:
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"

        if domain in self._parsers:
            delay = self._parsers[domain].crawl_delay(self.user_agent)
            if delay:
                return float(delay)

        return 0.0

    def get_sitemaps(self, url: str) -> List[str]:
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"

        if domain in self._parsers:
            return list(self._parsers[domain].site_maps() or [])

        return []


class CrawlDelay:
    def __init__(self, default_delay: float = 1.0):
        self.default_delay = default_delay
        self._domain_delays: Dict[str, float] = {}
        self._last_request: Dict[str, float] = {}

    def set_delay(self, domain: str, delay: float) -> None:
        self._domain_delays[domain] = delay

    def wait(self, domain: str) -> None:
        delay = self._domain_delays.get(domain, self.default_delay)

        if domain in self._last_request:
            elapsed = time.time() - self._last_request[domain]
            if elapsed < delay:
                time.sleep(delay - elapsed)

        self._last_request[domain] = time.time()


class EthicalCrawler:
    def __init__(
        self,
        user_agent: str = "MyCrawler/1.0",
        default_delay: float = 1.0,
        respect_robots: bool = True
    ):
        self.user_agent = user_agent
        self.robots_parser = RobotsParser(user_agent)
        self.crawl_delay = CrawlDelay(default_delay)
        self.respect_robots = respect_robots

    def can_crawl(self, url: str) -> bool:
        if not self.respect_robots:
            return True
        return self.robots_parser.can_fetch(url)

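    def crawl(self, url: str) -> Optional[requests.Response]:
        if not self.can_crawl(url):
            print(f"Blocked by robots.txt: {url}")
            return None

        domain = urlparse(url).netloc
        # Honor a Crawl-delay directive if robots.txt declares one for this agent.
        robots_delay = self.robots_parser.get_crawl_delay(url)
        if robots_delay > 0:
            self.crawl_delay.set_delay(
                domain, max(robots_delay, self.crawl_delay.default_delay)
            )
        self.crawl_delay.wait(domain)

        try:
            # Always set a timeout so a stalled server cannot hang the crawler.
            response = requests.get(
                url, headers={"User-Agent": self.user_agent}, timeout=10
            )
            return response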
        except Exception as e:
            print(f"Crawl error: {e}")
            return None
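
The rules that `RobotsParser` relies on can be explored offline, because `RobotFileParser.parse()` accepts the lines of a robots.txt file directly. The file content and the helper `load_rules` below are made up for illustration:

```python
from urllib.robotparser import RobotFileParser

ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
Sitemap: https://example.com/sitemap.xml
"""


def load_rules(robots_txt: str) -> RobotFileParser:
    """Build a parser from robots.txt text without any network access."""
    rp = RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp


rules = load_rules(ROBOTS_TXT)
agent = "MyCrawler/1.0"
print(rules.can_fetch(agent, "https://example.com/news"))          # True
print(rules.can_fetch(agent, "https://example.com/private/data"))  # False
print(rules.crawl_delay(agent))                                    # 2
print(rules.site_maps())  # ['https://example.com/sitemap.xml']
```

`site_maps()` requires Python 3.8 or later and returns `None` when the file lists no sitemaps, which is why `get_sitemaps` above falls back to an empty list.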

29.9 Knowledge map

29.9.1 Crawler technology overview

Python crawler technology overview

┌──────────────────────────────────────────────────────────────────┐
│                   Crawler architecture layers                    │
├──────────────────────────────────────────────────────────────────┤
│  URL manager → Scheduler → Downloader → Parser → Data storage    │
└──────────────────────────────────────────────────────────────────┘

Core components:
┌────────────────────────────────────────────────────┐
│ URL manager    deduplication, queueing, priority   │
│ Downloader     HTTP requests, retries, proxies     │
│ Parser         HTML/XML/JSON parsing               │
│ Data storage   files, databases, message queues    │
└────────────────────────────────────────────────────┘

Technology stack:
┌─────────────────────────────────────────┐
│ HTTP clients:  requests, aiohttp, httpx │
│ HTML parsing:  BeautifulSoup, lxml      │
│ Frameworks:    Scrapy, PySpider         │
│ Async:         asyncio, uvloop          │
│ Storage:       MongoDB, Redis, MySQL    │
└─────────────────────────────────────────┘
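
The component chain in the diagram can be sketched end to end in a few lines. To stay runnable offline, the example assumes a tiny in-memory "site" (`SITE`) in place of real HTTP and uses regular expressions in place of a real HTML parser; all names are illustrative.

```python
import re
from collections import deque
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory "website": URL -> HTML
SITE: Dict[str, str] = {
    "/": '<h1>Home</h1><a href="/a">A</a><a href="/b">B</a>',
    "/a": '<h1>Page A</h1><a href="/b">B</a><a href="/">Home</a>',
    "/b": '<h1>Page B</h1><a href="/missing">gone</a>',
}


def download(url: str) -> Optional[str]:
    """Downloader: a dict lookup here instead of an HTTP request."""
    return SITE.get(url)


def parse(html: str) -> Tuple[str, List[str]]:
    """Parser: extract the title and the outgoing links."""
    title = re.search(r"<h1>(.*?)</h1>", html).group(1)
    links = re.findall(r'href="([^"]+)"', html)
    return title, links


def crawl(start: str) -> List[dict]:
    queue = deque([start])       # URL manager: pending queue
    seen = {start}               # URL manager: de-duplication
    storage: List[dict] = []     # data storage
    while queue:                 # scheduler: FIFO, one request at a time
        url = queue.popleft()
        html = download(url)
        if html is None:         # missing page: skip it
            continue
        title, links = parse(html)
        storage.append({"url": url, "title": title})
        for link in links:
            if link not in seen:
                seen.add(link)
                queue.append(link)
    return storage


print([item["title"] for item in crawl("/")])  # ['Home', 'Page A', 'Page B']
```

Each page is visited once even though the pages link to one another, because a URL is added to `seen` at the moment it is queued.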

29.9.2 Scrapy architecture

Scrapy framework architecture

┌─────────────────────────────────────────────────────────────┐
│                        Scrapy Engine                        │
│  Core engine that coordinates all components                │
└─────────────────────────────────────────────────────────────┘
        │
        ├──▶ Scheduler
        │    Request queue management
        │
        ├──▶ Downloader
        │    HTTP requests
        │
        ├──▶ Spider
        │    Parsing logic
        │
        ├──▶ Item Pipeline
        │    Item processing
        │
        └──▶ Middleware
             Request/response processing

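29.9.3 Anti-crawling strategies and countermeasures

Anti-crawling strategies and countermeasures

┌────────────────────────┬──────────────────────────────────┐
│ Anti-crawling strategy │ Countermeasure                   │
├────────────────────────┼──────────────────────────────────┤
│ User-Agent detection   │ User-Agent pool rotation         │
│ IP restrictions        │ Proxy pool                       │
│ Rate limiting          │ Request delays                   │
│ Cookie validation      │ Session management               │
│ CAPTCHA                │ OCR / CAPTCHA-solving services   │
│ JavaScript rendering   │ Selenium / Playwright            │
│ Font obfuscation       │ Font file parsing                │
│ Dynamic tokens         │ Reverse engineering              │
└────────────────────────┴──────────────────────────────────┘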

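29.10 Technology selection guide

29.10.1 Choosing a crawler framework

| Scenario | Recommended approach | Reason |
| --- | --- | --- |
| Simple crawler | requests + BeautifulSoup | Simple and direct |
| Mid-sized project | Scrapy | Full-featured |
| High concurrency | Scrapy + aiohttp | Efficient asynchronous I/O |
| Dynamic pages | Playwright | Renders JavaScript |
| Distributed crawling | Scrapy-Redis | Built-in distribution support |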

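29.10.2 Choosing a parsing tool

| Scenario | Recommended tool | Reason |
| --- | --- | --- |
| Simple parsing | BeautifulSoup | Easy to use |
| High performance | lxml | Fast |
| Complex selection | XPath | Expressive |
| JSON APIs | jsonpath | Purpose-built |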

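29.10.3 Choosing a storage backend

| Data volume | Recommended approach | Reason |
| --- | --- | --- |
| Small | JSON/CSV | Simple and direct |
| Medium | SQLite | Lightweight database |
| Large | MongoDB | Flexible schema |
| Distributed | Redis + MySQL | High performance |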

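29.11 Common problems and solutions

29.11.1 Encoding problems

python
# Problem: the page is decoded with the wrong encoding
import requests

url = "https://example.com"  # placeholder URL
response = requests.get(url, timeout=10)
# response.text comes out garbled

# Fix: let requests guess the encoding from the response body
response.encoding = response.apparent_encoding
text = response.text

# Or detect it explicitly with chardet
import chardet
encoding = chardet.detect(response.content)['encoding'] or 'utf-8'  # detect() may return None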
text = response.content.decode(encoding)
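
Detection can still guess wrong, so a decoder that tries several candidates in order is a useful safety net. The helper below is a sketch: the name `decode_body` and the fallback order (UTF-8, then GB18030, which covers GBK-encoded Chinese pages) are assumptions that suit Chinese-language sites and may need adjusting elsewhere.

```python
from typing import Iterable, Optional, Tuple


def decode_body(
    raw: bytes,
    declared: Optional[str] = None,
    fallbacks: Iterable[str] = ("utf-8", "gb18030"),
) -> Tuple[str, str]:
    """Try the declared charset first, then each fallback; return (text, encoding used)."""
    candidates = ([declared] if declared else []) + list(fallbacks)
    for enc in candidates:
        try:
            return raw.decode(enc), enc
        except (UnicodeDecodeError, LookupError):
            continue  # wrong guess or unknown codec name: try the next one
    # Last resort: keep what can be decoded and mark the rest
    return raw.decode("utf-8", errors="replace"), "utf-8 (lossy)"


gbk_bytes = "网络爬虫".encode("gbk")
print(decode_body(gbk_bytes, declared="utf-8"))  # ('网络爬虫', 'gb18030')
```

A permissive fallback such as GB18030 can accept bytes that were never Chinese text, so it belongs at the end of the list.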

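29.11.2 Getting blocked

python
# Problem: the IP address gets banned
# Fix: rotate through a proxy pool

import random

import requests

# Placeholder proxies. requests picks the entry whose key matches the URL scheme,
# so both "http" and "https" are listed; with only "http", HTTPS requests would
# bypass the proxy.
proxies_pool = [
    {'http': 'http://proxy1:8080', 'https': 'http://proxy1:8080'},
    {'http': 'http://proxy2:8080', 'https': 'http://proxy2:8080'},
]

def get_with_proxy(url):
    proxy = random.choice(proxies_pool)
    return requests.get(url, proxies=proxy, timeout=10)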

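29.11.3 Dynamic content

python
# Problem: content rendered by JavaScript is missing from the raw HTML
# Fix: render the page with Playwright

from playwright.sync_api import sync_playwright

def get_dynamic_content(url):
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            # Wait until network activity settles so scripts have a chance to run
            page.goto(url, wait_until="networkidle")
            return page.content()
        finally:
            browser.close()  # release the browser even if navigation fails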

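29.12 Chapter summary

This chapter covered the core concepts and practice of web crawling in Python:

  1. HTTP basics: the request/response model, status codes, and headers
  2. The requests library: session management, retries, and proxy support
  3. HTML parsing: BeautifulSoup and XPath
  4. Crawler framework: URL management, request scheduling, and data pipelines
  5. Scrapy: Spider, Middleware, and Pipeline components
  6. Asynchronous crawling: high-throughput concurrent fetching with aiohttp
  7. Anti-crawling countermeasures: User-Agent rotation, proxies, and CAPTCHA handling
  8. Crawler ethics: honoring robots.txt and limiting request rates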

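Exercises

  1. Write a crawler that collects article titles and body text from a news site
  2. Implement an e-commerce product crawler with Scrapy
  3. Build a crawler system that supports incremental crawling
  4. Write an asynchronous crawler that fetches several API endpoints concurrently
  5. Design a distributed crawler architecture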

Further reading

Python Technical Series - 江苏省宿城中等专业学校 (Jiangsu Sucheng Secondary Vocational School)