Skip to content

第34章 自然语言处理

学习目标

完成本章学习后,你将能够:

  1. 理解NLP基础:文本预处理、分词、词性标注
  2. 掌握文本处理技术:停用词过滤、词干提取、词形还原
  3. 实现文本表示:词袋模型、TF-IDF、词向量
  4. 进行文本分类:情感分析、主题分类、文本聚类
  5. 实现命名实体识别:人名、地名、组织名识别
  6. 应用文本相似度:余弦相似度、编辑距离、语义相似度
  7. 构建文本生成:语言模型、文本摘要
  8. 开发NLP应用:聊天机器人、问答系统

34.1 文本预处理

34.1.1 文本清洗

python
import re
import string
from typing import List, Optional, Dict, Tuple, Set
from dataclasses import dataclass
from collections import Counter
import unicodedata


@dataclass
class TextStats:
    """Aggregate statistics describing one piece of text.

    Instances are produced by ``TextStatistics.get_stats``; the per-field
    comments describe how that method fills each value.
    """

    char_count: int  # len(text), whitespace and punctuation included
    word_count: int  # number of whitespace-separated tokens
    sentence_count: int  # non-empty segments after splitting on runs of . ! ?
    avg_word_length: float  # mean token length in characters (0 when there are no words)
    avg_sentence_length: float  # word_count / sentence_count (0 when there are no sentences)
    unique_words: int  # number of distinct tokens after lowercasing
    vocabulary_richness: float  # unique_words / word_count (0 when there are no words)


class TextCleaner:
    """Stateless helpers for normalising raw text before tokenisation.

    Every method is a ``@staticmethod`` that takes a string and returns a new
    string; ``clean_all`` chains them into a single pipeline.
    """

    # Emoji code-point ranges, compiled once. The ranges are kept narrow on
    # purpose: a single span such as U+24C2..U+1F251 would also cover the CJK,
    # kana and Hangul blocks and silently delete all Chinese/Japanese/Korean
    # text.
    _EMOJI_PATTERN = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # regional indicators (flags)
        "\U00002600-\U000026FF"  # miscellaneous symbols
        "\U00002702-\U000027B0"  # dingbats
        "\U0001F170-\U0001F251"  # enclosed alphanumerics / ideographs
        "\U000024C2\U00002B50\U00002B55\U0001F004\U0001F0CF"  # single symbols
        "]+",
        flags=re.UNICODE,
    )

    # Order matters: the whole-word forms must be tried before the generic
    # "n't" suffix so that "won't" does not become "wo not".
    _CONTRACTIONS = {
        "won't": "will not",
        "can't": "cannot",
        "n't": " not",
        "'re": " are",
        "'s": " is",
        "'d": " would",
        "'ll": " will",
        "'ve": " have",
        "'m": " am",
    }
    # re.ASCII limits case folding to ASCII so every match lowercases to a
    # key of _CONTRACTIONS.
    _CONTRACTION_PATTERN = re.compile(
        "|".join(re.escape(key) for key in _CONTRACTIONS),
        re.IGNORECASE | re.ASCII,
    )

    @staticmethod
    def to_lowercase(text: str) -> str:
        """Return ``text`` converted to lowercase."""
        return text.lower()

    @staticmethod
    def remove_punctuation(text: str) -> str:
        """Delete every ASCII punctuation character (``string.punctuation``)."""
        return text.translate(str.maketrans("", "", string.punctuation))

    @staticmethod
    def remove_digits(text: str) -> str:
        """Delete every run of digits."""
        return re.sub(r"\d+", "", text)

    @staticmethod
    def remove_whitespace(text: str) -> str:
        """Collapse whitespace runs to one space and trim both ends."""
        return " ".join(text.split())

    @staticmethod
    def remove_urls(text: str) -> str:
        """Delete tokens starting with ``http`` or ``www.``."""
        return re.sub(r"http\S+|www\.\S+", "", text)

    @staticmethod
    def remove_emails(text: str) -> str:
        """Delete whitespace-delimited tokens that contain ``@``."""
        return re.sub(r"\S+@\S+", "", text)

    @staticmethod
    def remove_html_tags(text: str) -> str:
        """Delete ``<...>`` tags, keeping the text between them."""
        return re.sub(r"<[^>]+>", "", text)

    @staticmethod
    def remove_emojis(text: str) -> str:
        """Delete emoji characters while leaving CJK and other scripts intact."""
        return TextCleaner._EMOJI_PATTERN.sub("", text)

    @staticmethod
    def remove_special_chars(text: str, keep: str = "") -> str:
        """Keep only ASCII letters, digits, whitespace and the chars in ``keep``.

        Args:
            text: Input string.
            keep: Extra characters to preserve (escaped before use).
        """
        # Raw f-string so that ``\s`` reaches the regex engine verbatim.
        pattern = rf"[^{re.escape(keep)}a-zA-Z0-9\s]"
        return re.sub(pattern, "", text)

    @staticmethod
    def normalize_unicode(text: str, form: str = "NFKC") -> str:
        """Apply Unicode normalisation ``form`` (default NFKC) to ``text``."""
        return unicodedata.normalize(form, text)

    @staticmethod
    def expand_contractions(text: str) -> str:
        """Expand common English contractions, ignoring letter case.

        A leading capital is carried over to the expansion ("Won't" ->
        "Will not"). Note that ``'s`` is always expanded to " is", so
        possessives are affected as well.
        """
        def _expand(match):
            found = match.group()
            expansion = TextCleaner._CONTRACTIONS[found.lower()]
            if found[0].isupper() and expansion[0].isalpha():
                expansion = expansion[0].upper() + expansion[1:]
            return expansion

        return TextCleaner._CONTRACTION_PATTERN.sub(_expand, text)

    @staticmethod
    def clean_all(text: str) -> str:
        """Run the full cleaning pipeline and return lowercase plain text.

        Order: HTML tags, URLs, e-mails, emojis, Unicode normalisation,
        contraction expansion, punctuation, digits, whitespace, lowercase.
        """
        text = TextCleaner.remove_html_tags(text)
        text = TextCleaner.remove_urls(text)
        text = TextCleaner.remove_emails(text)
        text = TextCleaner.remove_emojis(text)
        text = TextCleaner.normalize_unicode(text)
        text = TextCleaner.expand_contractions(text)
        text = TextCleaner.remove_punctuation(text)
        text = TextCleaner.remove_digits(text)
        text = TextCleaner.remove_whitespace(text)
        text = TextCleaner.to_lowercase(text)
        return text


class TextStatistics:
    """Descriptive statistics and frequency tables for whitespace-split text."""

    @staticmethod
    def get_stats(text: str) -> TextStats:
        """Compute length, count and vocabulary statistics for ``text``.

        Words are whitespace-separated tokens; sentences are the non-empty
        pieces left after splitting on runs of ``.``, ``!`` or ``?``.
        """
        tokens = text.split()
        n_words = len(tokens)

        pieces = (piece.strip() for piece in re.split(r"[.!?]+", text))
        n_sentences = sum(1 for piece in pieces if piece)

        distinct = len({tok.lower() for tok in tokens})

        # Ratios fall back to 0 when their denominator is empty.
        if n_words > 0:
            mean_word_len = sum(map(len, tokens)) / n_words
            richness = distinct / n_words
        else:
            mean_word_len = 0
            richness = 0

        if n_sentences > 0:
            mean_sentence_len = n_words / n_sentences
        else:
            mean_sentence_len = 0

        return TextStats(
            char_count=len(text),
            word_count=n_words,
            sentence_count=n_sentences,
            avg_word_length=mean_word_len,
            avg_sentence_length=mean_sentence_len,
            unique_words=distinct,
            vocabulary_richness=richness,
        )

    @staticmethod
    def word_frequency(text: str, top_n: int = 10) -> Dict[str, int]:
        """Return the ``top_n`` most frequent lowercase tokens with counts."""
        tally = Counter(text.lower().split())
        return dict(tally.most_common(top_n))

    @staticmethod
    def ngram_frequency(text: str, n: int = 2, top_n: int = 10) -> Dict[str, int]:
        """Return the ``top_n`` most frequent space-joined word n-grams."""
        tokens = text.lower().split()
        last_start = len(tokens) - n
        tally = Counter(
            " ".join(tokens[start:start + n]) for start in range(last_start + 1)
        )
        return dict(tally.most_common(top_n))

34.1.2 分词

python
class Tokenizer:
    """Regex-driven tokenisers for whitespace-delimited text."""

    @staticmethod
    def word_tokenize(text: str) -> List[str]:
        """Lowercase ``text`` and return its runs of word characters."""
        lowered = text.lower()
        word_re = re.compile(r"\b\w+\b")
        return word_re.findall(lowered)

    @staticmethod
    def sentence_tokenize(text: str) -> List[str]:
        """Split after ``.``, ``!`` or ``?`` followed by whitespace."""
        pieces = re.split(r"(?<=[.!?])\s+", text)
        trimmed = map(str.strip, pieces)
        return [piece for piece in trimmed if piece]

    @staticmethod
    def char_tokenize(text: str) -> List[str]:
        """Return every character of ``text`` as its own token."""
        return [ch for ch in text]

    @staticmethod
    def word_tokenize_with_punctuation(text: str) -> List[str]:
        """Return words plus each non-space punctuation mark as a token."""
        token_re = re.compile(r"\w+|[^\w\s]")
        return token_re.findall(text)

    @staticmethod
    def tokenize_by_regex(text: str, pattern: str) -> List[str]:
        """Return all matches of the caller-supplied ``pattern`` in ``text``."""
        custom_re = re.compile(pattern)
        return custom_re.findall(text)


class ChineseTokenizer:
    """Chinese tokenisers that use jieba when it can be imported."""

    @staticmethod
    def tokenize_jieba(text: str) -> List[str]:
        """Segment ``text`` with jieba; fall back to single characters."""
        try:
            import jieba
            segments = jieba.cut(text)
            return [segment for segment in segments]
        except ImportError:
            # jieba is optional: degrade to one token per character.
            return [ch for ch in text]

    @staticmethod
    def tokenize_jieba_pos(text: str) -> List[Tuple[str, str]]:
        """Segment and POS-tag ``text``; the fallback tags each char ``"x"``."""
        try:
            import jieba.posseg as pseg
            tagged = []
            for token, tag in pseg.cut(text):
                tagged.append((token, tag))
            return tagged
        except ImportError:
            return [(ch, "x") for ch in text]

    @staticmethod
    def tokenize_by_char(text: str) -> List[str]:
        """Return every character of ``text`` as its own token."""
        return [ch for ch in text]


class TokenFilter:
    """Filter token lists by stop words, length, digits and punctuation.

    Stop words are stored lowercased because every lookup lowercases the
    token first; both ``load_stopwords`` and ``set_stopwords`` normalise
    their input accordingly.
    """

    def __init__(self):
        self._stopwords: Set[str] = set()
        self._min_length: int = 1
        self._max_length: int = 100

    def load_stopwords(self, filepath: str) -> None:
        """Replace the stop-word set with the lines of a UTF-8 text file.

        Each line is stripped and lowercased; blank lines are ignored.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            stripped = (line.strip() for line in f)
            self._stopwords = {word.lower() for word in stripped if word}

    def set_stopwords(self, stopwords: Set[str]) -> None:
        """Replace the stop-word set with a lowercased copy of ``stopwords``.

        Copying keeps later mutations of the caller's set from leaking in,
        and lowercasing makes entries such as ``"The"`` actually match.
        """
        self._stopwords = {word.lower() for word in stopwords}

    def set_length_filter(self, min_length: int, max_length: int) -> None:
        """Set the inclusive length bounds used by ``filter_tokens``."""
        self._min_length = min_length
        self._max_length = max_length

    def filter_tokens(self, tokens: List[str]) -> List[str]:
        """Drop stop words and tokens outside the configured length bounds."""
        return [
            token
            for token in tokens
            if token.lower() not in self._stopwords
            and self._min_length <= len(token) <= self._max_length
        ]

    def filter_stopwords(self, tokens: List[str]) -> List[str]:
        """Drop tokens whose lowercase form is a stop word."""
        return [t for t in tokens if t.lower() not in self._stopwords]

    def filter_by_length(self, tokens: List[str], min_len: int = 1, max_len: int = 100) -> List[str]:
        """Keep tokens with ``min_len <= len(token) <= max_len``.

        Uses the explicit arguments, not the bounds set on the instance.
        """
        return [t for t in tokens if min_len <= len(t) <= max_len]

    def filter_digits(self, tokens: List[str]) -> List[str]:
        """Drop tokens that consist only of digits."""
        return [t for t in tokens if not t.isdigit()]

    def filter_punctuation_tokens(self, tokens: List[str]) -> List[str]:
        """Keep only tokens made entirely of letters and digits."""
        return [t for t in tokens if t.isalnum()]

34.1.3 词干提取与词形还原

python
class Stemmer:
    """Wrapper around NLTK's Porter, Lancaster and Snowball stemmers.

    When NLTK cannot be imported every method returns its input unchanged.
    """

    def __init__(self):
        self._available = False
        try:
            from nltk.stem import PorterStemmer, LancasterStemmer, SnowballStemmer
            self._porter = PorterStemmer()
            self._lancaster = LancasterStemmer()
            self._snowball = SnowballStemmer("english")
        except ImportError:
            return
        self._available = True

    def porter_stem(self, word: str) -> str:
        """Stem ``word`` with the Porter stemmer."""
        return self._porter.stem(word) if self._available else word

    def lancaster_stem(self, word: str) -> str:
        """Stem ``word`` with the Lancaster stemmer."""
        return self._lancaster.stem(word) if self._available else word

    def snowball_stem(self, word: str) -> str:
        """Stem ``word`` with the English Snowball stemmer."""
        return self._snowball.stem(word) if self._available else word

    def stem_tokens(self, tokens: List[str], method: str = "porter") -> List[str]:
        """Stem every token; unknown ``method`` names fall back to Porter."""
        dispatch = dict(
            porter=self.porter_stem,
            lancaster=self.lancaster_stem,
            snowball=self.snowball_stem,
        )
        stem = dispatch.get(method, self.porter_stem)
        return list(map(stem, tokens))


class Lemmatizer:
    """Wrapper around NLTK's WordNet lemmatizer.

    When NLTK cannot be imported every method returns its input unchanged.
    """

    def __init__(self):
        self._available = False
        try:
            from nltk.stem import WordNetLemmatizer
            self._lemmatizer = WordNetLemmatizer()
        except ImportError:
            return
        self._available = True

    def lemmatize(self, word: str, pos: str = "n") -> str:
        """Lemmatize ``word`` using the WordNet part-of-speech code ``pos``."""
        if not self._available:
            return word
        return self._lemmatizer.lemmatize(word, pos)

    def lemmatize_tokens(self, tokens: List[str], pos: str = "n") -> List[str]:
        """Lemmatize every token with the same ``pos`` code."""
        lemmas = []
        for token in tokens:
            lemmas.append(self.lemmatize(token, pos))
        return lemmas

    def lemmatize_with_pos(self, tokens: List[Tuple[str, str]]) -> List[str]:
        """Lemmatize ``(word, tag)`` pairs, mapping each tag to a WordNet code.

        Tags missing from the mapping are treated as nouns.
        """
        wordnet_code = {}
        tag_groups = (
            ("n", ("NN", "NNS", "NNP", "NNPS")),
            ("v", ("VB", "VBD", "VBG", "VBN", "VBP", "VBZ")),
            ("a", ("JJ", "JJR", "JJS")),
            ("r", ("RB", "RBR", "RBS")),
        )
        for code, tags in tag_groups:
            wordnet_code.update(dict.fromkeys(tags, code))

        lemmas = []
        for word, tag in tokens:
            lemmas.append(self.lemmatize(word, wordnet_code.get(tag, "n")))
        return lemmas


class POSProcessor:
    """Part-of-speech tagging through NLTK, with helpers to pick word classes.

    The constructor asks NLTK to download the ``punkt`` and
    ``averaged_perceptron_tagger`` resources. Without NLTK, ``pos_tag``
    labels every token ``"UNK"``.
    """

    def __init__(self):
        self._available = False
        try:
            import nltk
            nltk.download("punkt", quiet=True)
            nltk.download("averaged_perceptron_tagger", quiet=True)
        except ImportError:
            return
        self._available = True

    @staticmethod
    def _with_prefix(tagged: List[Tuple[str, str]], prefix: str) -> List[str]:
        """Return the words whose tag starts with ``prefix``."""
        selected = []
        for word, tag in tagged:
            if tag.startswith(prefix):
                selected.append(word)
        return selected

    def pos_tag(self, tokens: List[str]) -> List[Tuple[str, str]]:
        """Tag ``tokens`` with ``nltk.pos_tag`` (or ``"UNK"`` without NLTK)."""
        if not self._available:
            return [(token, "UNK") for token in tokens]
        import nltk
        return nltk.pos_tag(tokens)

    def get_nouns(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``NN``."""
        return self._with_prefix(tagged, "NN")

    def get_verbs(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``VB``."""
        return self._with_prefix(tagged, "VB")

    def get_adjectives(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``JJ``."""
        return self._with_prefix(tagged, "JJ")

    def get_adverbs(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``RB``."""
        return self._with_prefix(tagged, "RB")

34.2 文本表示

34.2.1 词袋模型

python
from typing import Dict, List, Tuple, Optional
import math
from collections import Counter, defaultdict


class BagOfWords:
    """Count-based bag-of-words vectoriser over pre-tokenised documents."""

    def __init__(self):
        self.vocabulary: Dict[str, int] = {}
        self.inverse_vocabulary: Dict[int, str] = {}
        self.document_count: int = 0

    def fit(self, documents: List[List[str]]) -> "BagOfWords":
        """Build the vocabulary; column indices follow sorted word order."""
        ordered_words = sorted({word for doc in documents for word in doc})

        self.vocabulary = dict(zip(ordered_words, range(len(ordered_words))))
        self.inverse_vocabulary = dict(enumerate(ordered_words))
        self.document_count = len(documents)

        return self

    def transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Return one count vector per document; unknown words are ignored."""
        width = len(self.vocabulary)
        rows = []
        for doc in documents:
            row = [0] * width
            for token in doc:
                column = self.vocabulary.get(token)
                if column is not None:
                    row[column] += 1
            rows.append(row)
        return rows

    def fit_transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Fit on ``documents`` and return their count vectors."""
        return self.fit(documents).transform(documents)

    def get_vocabulary_size(self) -> int:
        """Return the number of distinct words seen during ``fit``."""
        return len(self.vocabulary)

    def get_word_from_index(self, index: int) -> Optional[str]:
        """Return the word at column ``index``, or ``None`` if out of range."""
        return self.inverse_vocabulary.get(index)

    def get_index_from_word(self, word: str) -> Optional[int]:
        """Return the column of ``word``, or ``None`` if it is unknown."""
        return self.vocabulary.get(word)


class TFIDFVectorizer:
    """TF-IDF vectoriser over pre-tokenised documents.

    Args:
        min_df: Minimum number of documents a word must appear in.
        max_df: Upper document-frequency bound. A ``float`` is a fraction of
            the corpus size; any other value is an absolute document count.
        use_idf: When ``False`` the vectors hold plain term frequencies.
    """

    def __init__(self, min_df: int = 1, max_df: float = 1.0, use_idf: bool = True):
        self.min_df = min_df
        self.max_df = max_df
        self.use_idf = use_idf
        self.vocabulary: Dict[str, int] = {}
        self.idf: Dict[str, float] = {}
        self.document_count: int = 0

    def fit(self, documents: List[List[str]]) -> "TFIDFVectorizer":
        """Learn the vocabulary and smoothed IDF weights from ``documents``."""
        self.document_count = len(documents)

        # Document frequency: each word counts once per document.
        doc_freq: Dict[str, int] = defaultdict(int)
        for doc in documents:
            for word in set(doc):
                doc_freq[word] += 1

        if isinstance(self.max_df, float):
            max_doc_count = self.max_df * self.document_count
        else:
            max_doc_count = self.max_df

        kept_words = sorted(
            word for word, df in doc_freq.items()
            if self.min_df <= df <= max_doc_count
        )
        self.vocabulary = {word: idx for idx, word in enumerate(kept_words)}

        # Start from a clean table so a refit never keeps stale weights.
        self.idf = {}
        if self.use_idf:
            for word in self.vocabulary:
                # Smoothed IDF: log((N + 1) / (df + 1)) + 1.
                self.idf[word] = math.log(
                    (self.document_count + 1) / (doc_freq[word] + 1)
                ) + 1

        return self

    def transform(self, documents: List[List[str]]) -> List[List[float]]:
        """Return one TF-IDF vector per document.

        Term frequency is ``count / len(doc)``; words outside the fitted
        vocabulary are ignored.
        """
        vectors = []

        for doc in documents:
            vector = [0.0] * len(self.vocabulary)
            doc_len = len(doc)

            for word, count in Counter(doc).items():
                column = self.vocabulary.get(word)
                if column is None:
                    continue
                tf = count / doc_len
                vector[column] = tf * self.idf.get(word, 1.0) if self.use_idf else tf

            vectors.append(vector)

        return vectors

    def fit_transform(self, documents: List[List[str]]) -> List[List[float]]:
        """Fit on ``documents`` and return their TF-IDF vectors."""
        self.fit(documents)
        return self.transform(documents)

    def get_feature_names(self) -> List[str]:
        """Return the vocabulary words ordered by their column index."""
        return sorted(self.vocabulary, key=self.vocabulary.get)


class NGramVectorizer:
    """Count vectoriser over space-joined word n-grams of fixed size ``n``."""

    def __init__(self, n: int = 2):
        self.n = n
        self.vocabulary: Dict[str, int] = {}

    def _get_ngrams(self, tokens: List[str]) -> List[str]:
        """Return every window of ``self.n`` consecutive tokens, space-joined."""
        size = self.n
        window_count = len(tokens) - size + 1
        grams = []
        for start in range(window_count):
            grams.append(" ".join(tokens[start:start + size]))
        return grams

    def fit(self, documents: List[List[str]]) -> "NGramVectorizer":
        """Build the n-gram vocabulary; columns follow sorted n-gram order."""
        distinct = {gram for doc in documents for gram in self._get_ngrams(doc)}
        self.vocabulary = dict(zip(sorted(distinct), range(len(distinct))))
        return self

    def transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Return one n-gram count vector per document; unknown grams are skipped."""
        width = len(self.vocabulary)
        rows = []
        for doc in documents:
            row = [0] * width
            for gram in self._get_ngrams(doc):
                column = self.vocabulary.get(gram)
                if column is not None:
                    row[column] += 1
            rows.append(row)
        return rows

    def fit_transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Fit on ``documents`` and return their n-gram count vectors."""
        return self.fit(documents).transform(documents)

34.2.2 词向量

python
class WordEmbedding:
    """In-memory word-vector table with similarity and analogy queries.

    Lookups go through ``get_vector``, which lowercases the query word, so
    vectors stored under mixed-case keys are not reachable by name.
    """

    def __init__(self):
        self.embeddings: Dict[str, np.ndarray] = {}
        self.vector_size: int = 0

    def load_glove(self, filepath: str) -> None:
        """Load vectors from a GloVe-style text file (``word v1 v2 ...``).

        Blank lines and lines without any vector component are skipped.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                values = line.split()
                if len(values) < 2:
                    continue
                vector = np.array(values[1:], dtype=np.float32)
                self.embeddings[values[0]] = vector
                if self.vector_size == 0:
                    self.vector_size = len(vector)

    def load_word2vec(self, filepath: str) -> None:
        """Load a binary word2vec file through gensim.

        If gensim is not installed nothing is loaded and a warning is
        emitted instead of failing silently.
        """
        try:
            from gensim.models import KeyedVectors
            model = KeyedVectors.load_word2vec_format(filepath, binary=True)
            self.embeddings = {word: model[word] for word in model.key_to_index}
            self.vector_size = model.vector_size
        except ImportError:
            import warnings
            warnings.warn("gensim is not installed; no word2vec vectors were loaded")

    def get_vector(self, word: str) -> Optional[np.ndarray]:
        """Return the vector stored for ``word.lower()``, or ``None``."""
        return self.embeddings.get(word.lower())

    def get_sentence_vector(self, tokens: List[str], method: str = "mean") -> Optional[np.ndarray]:
        """Pool the vectors of the known ``tokens`` into one vector.

        Args:
            tokens: Words to look up; unknown words are ignored.
            method: ``"mean"``, ``"sum"`` or ``"max"``; anything else is
                treated as ``"mean"``.

        Returns:
            The pooled vector, or ``None`` when no token has a vector.
        """
        vectors = [vec for vec in map(self.get_vector, tokens) if vec is not None]
        if not vectors:
            return None

        stacked = np.array(vectors)
        if method == "sum":
            return np.sum(stacked, axis=0)
        if method == "max":
            return np.max(stacked, axis=0)
        return np.mean(stacked, axis=0)

    def _rank_by_cosine(
        self,
        target_vec: np.ndarray,
        exclude: set,
        top_n: int
    ) -> List[Tuple[str, float]]:
        """Return the ``top_n`` stored words most cosine-similar to ``target_vec``.

        Zero-norm vectors have no defined cosine similarity and are skipped,
        so the ranking never contains NaN.
        """
        target_norm = np.linalg.norm(target_vec)
        if target_norm == 0:
            return []

        scored = []
        for word, vec in self.embeddings.items():
            if word in exclude:
                continue
            norm = np.linalg.norm(vec)
            if norm == 0:
                continue
            scored.append((word, float(np.dot(target_vec, vec) / (target_norm * norm))))

        scored.sort(key=lambda item: item[1], reverse=True)
        return scored[:top_n]

    def cosine_similarity(self, word1: str, word2: str) -> float:
        """Return the cosine similarity of two words.

        Returns ``0.0`` when either word is unknown or has a zero vector.
        """
        vec1 = self.get_vector(word1)
        vec2 = self.get_vector(word2)

        if vec1 is None or vec2 is None:
            return 0.0

        denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        if denominator == 0:
            return 0.0

        return float(np.dot(vec1, vec2) / denominator)

    def most_similar(self, word: str, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the ``top_n`` words closest to ``word`` by cosine similarity.

        Returns an empty list when ``word`` is unknown or has a zero vector.
        """
        target_vec = self.get_vector(word)
        if target_vec is None:
            return []

        return self._rank_by_cosine(target_vec, {word.lower()}, top_n)

    def analogy(
        self,
        word_a: str,
        word_b: str,
        word_c: str,
        top_n: int = 5
    ) -> List[Tuple[str, float]]:
        """Solve "a is to b as c is to ?" using ``b - a + c``.

        The three query words are excluded from the candidates. Returns an
        empty list when any of them is unknown.
        """
        vec_a = self.get_vector(word_a)
        vec_b = self.get_vector(word_b)
        vec_c = self.get_vector(word_c)

        if vec_a is None or vec_b is None or vec_c is None:
            return []

        target_vec = vec_b - vec_a + vec_c
        exclude = {word_a.lower(), word_b.lower(), word_c.lower()}
        return self._rank_by_cosine(target_vec, exclude, top_n)


import numpy as np


class SimpleWord2Vec:
    """Minimal skip-gram style embedding trainer (positive pairs only).

    Args:
        vector_size: Dimensionality of each word vector.
        window: Number of context positions taken on each side of the centre.
        min_count: Words occurring fewer times than this are dropped.
    """

    def __init__(self, vector_size: int = 100, window: int = 5, min_count: int = 1):
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.vocabulary: Dict[str, int] = {}
        self.embeddings: Optional[np.ndarray] = None

    def _build_vocabulary(self, sentences: List[List[str]]) -> None:
        """Assign contiguous row indices to words meeting ``min_count``."""
        word_freq = Counter()
        for sentence in sentences:
            word_freq.update(sentence)

        # Filter BEFORE numbering: indices must be 0..len(vocabulary)-1
        # because they address rows of the embedding matrix.
        kept_words = [word for word, freq in word_freq.items() if freq >= self.min_count]
        self.vocabulary = {word: idx for idx, word in enumerate(kept_words)}

    def _generate_training_data(self, sentences: List[List[str]]) -> List[Tuple[int, int]]:
        """Return ``(center, context)`` index pairs within the window.

        Out-of-vocabulary words are removed before the window is applied.
        """
        training_data = []

        for sentence in sentences:
            indices = [self.vocabulary[word] for word in sentence if word in self.vocabulary]

            for i, center in enumerate(indices):
                start = max(0, i - self.window)
                end = min(len(indices), i + self.window + 1)

                for j in range(start, end):
                    if i != j:
                        training_data.append((center, indices[j]))

        return training_data

    def train(self, sentences: List[List[str]], epochs: int = 10, learning_rate: float = 0.01) -> None:
        """Fit word vectors on ``sentences`` with per-pair gradient steps.

        Both embedding tables are initialised uniformly in [-0.5, 0.5) from
        NumPy's global random state. Nothing is trained when the vocabulary
        is empty.
        """
        self._build_vocabulary(sentences)
        vocab_size = len(self.vocabulary)

        if vocab_size == 0:
            return

        self.embeddings = np.random.uniform(-0.5, 0.5, (vocab_size, self.vector_size))
        context_embeddings = np.random.uniform(-0.5, 0.5, (vocab_size, self.vector_size))

        training_data = self._generate_training_data(sentences)

        for _ in range(epochs):
            for center_idx, context_idx in training_data:
                center_vec = self.embeddings[center_idx]
                context_vec = context_embeddings[context_idx]

                dot_product = np.dot(center_vec, context_vec)
                sigmoid = 1 / (1 + np.exp(-dot_product))
                # Every observed pair is a positive example (target 1).
                gradient = (sigmoid - 1) * learning_rate

                self.embeddings[center_idx] -= gradient * context_vec
                context_embeddings[context_idx] -= gradient * center_vec

    def get_vector(self, word: str) -> Optional[np.ndarray]:
        """Return the trained vector for ``word``, or ``None`` if unavailable."""
        if word in self.vocabulary and self.embeddings is not None:
            return self.embeddings[self.vocabulary[word]]
        return None

34.3 文本分类

34.3.1 情感分析

python
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum


class Sentiment(Enum):
    """Polarity label assigned to a piece of text."""

    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


@dataclass
class SentimentResult:
    """Outcome of one sentiment analysis call."""

    sentiment: Sentiment  # predicted polarity label
    score: float  # signed polarity score; the sign matches the label direction
    confidence: float  # strength of the evidence behind the label
    details: Dict[str, float]  # analyser-specific breakdown keyed by name


class LexiconSentimentAnalyzer:
    """Lexicon-based sentiment analyser.

    Two lexicon styles are supported: plain positive/negative word lists and
    a tab-separated ``word<TAB>score`` file. Once any scored entries are
    loaded, ``analyze`` uses them instead of the word lists.
    """

    def __init__(self):
        self.positive_words: Set[str] = set()
        self.negative_words: Set[str] = set()
        self.word_scores: Dict[str, float] = {}

    def load_lexicon(self, positive_file: str, negative_file: str) -> None:
        """Load the positive and negative word lists, one word per line."""
        with open(positive_file, "r", encoding="utf-8") as f:
            self.positive_words = {line.strip().lower() for line in f}

        with open(negative_file, "r", encoding="utf-8") as f:
            self.negative_words = {line.strip().lower() for line in f}

    def load_vader_lexicon(self, filepath: str) -> None:
        """Load ``word<TAB>score`` lines; lines with fewer fields are skipped."""
        with open(filepath, "r", encoding="utf-8") as f:
            for raw_line in f:
                fields = raw_line.strip().split("\t")
                if len(fields) < 2:
                    continue
                word, score = fields[0], float(fields[1])
                self.word_scores[word.lower()] = score

    def analyze(self, text: str) -> SentimentResult:
        """Classify ``text`` after lowercasing and whitespace splitting."""
        tokens = text.lower().split()
        scorer = self._analyze_with_scores if self.word_scores else self._analyze_with_sets
        return scorer(tokens)

    def _analyze_with_sets(self, tokens: List[str]) -> SentimentResult:
        """Score by counting hits in the positive and negative word lists.

        ``score`` is ``(pos - neg) / (pos + neg)``; ``confidence`` is the
        share of tokens that matched either list.
        """
        positive_hits = len([t for t in tokens if t in self.positive_words])
        negative_hits = len([t for t in tokens if t in self.negative_words])
        matched = positive_hits + negative_hits

        if matched == 0:
            return SentimentResult(
                sentiment=Sentiment.NEUTRAL,
                score=0.0,
                confidence=0.0,
                details={"positive": 0, "negative": 0}
            )

        polarity = (positive_hits - negative_hits) / matched
        coverage = matched / len(tokens) if tokens else 0

        # Scores within [-0.1, 0.1] are treated as neutral.
        label = Sentiment.NEUTRAL
        if polarity > 0.1:
            label = Sentiment.POSITIVE
        elif polarity < -0.1:
            label = Sentiment.NEGATIVE

        return SentimentResult(
            sentiment=label,
            score=polarity,
            confidence=coverage,
            details={"positive": positive_hits, "negative": negative_hits}
        )

    def _analyze_with_scores(self, tokens: List[str]) -> SentimentResult:
        """Score by averaging per-word lexicon scores (unknown words count 0)."""
        per_token = [self.word_scores.get(t, 0) for t in tokens]

        if not per_token:
            return SentimentResult(
                sentiment=Sentiment.NEUTRAL,
                score=0.0,
                confidence=0.0,
                details={"compound": 0}
            )

        mean_score = sum(per_token) / len(per_token)

        # Averages strictly between -0.05 and 0.05 are treated as neutral.
        label = Sentiment.NEUTRAL
        if mean_score >= 0.05:
            label = Sentiment.POSITIVE
        elif mean_score <= -0.05:
            label = Sentiment.NEGATIVE

        return SentimentResult(
            sentiment=label,
            score=mean_score,
            confidence=abs(mean_score),
            details={"compound": mean_score}
        )


class TextClassifier:
    """Multinomial naive Bayes text classifier with add-one smoothing.

    Documents are lowercased and split on whitespace. Classes are kept in
    first-seen label order so that training and tie-breaking are
    deterministic across runs.
    """

    def __init__(self):
        self.vocabulary: Dict[str, int] = {}
        self.classes: List[str] = []
        self.class_priors: Dict[str, float] = {}
        self.word_probs: Dict[str, Dict[str, float]] = {}

    def fit(self, documents: List[str], labels: List[str]) -> "TextClassifier":
        """Estimate class priors and per-class word probabilities.

        Args:
            documents: Raw training texts.
            labels: Class label of each document, in the same order.

        Returns:
            ``self``, to allow call chaining.
        """
        # dict.fromkeys de-duplicates while preserving first-seen order;
        # a plain set would make the order depend on the hash seed.
        self.classes = list(dict.fromkeys(labels))
        class_tokens: Dict[str, List[str]] = {cls: [] for cls in self.classes}

        all_words = set()
        for doc, label in zip(documents, labels):
            words = doc.lower().split()
            class_tokens[label].extend(words)
            all_words.update(words)

        self.vocabulary = {word: idx for idx, word in enumerate(sorted(all_words))}

        total_docs = len(documents)
        label_counts = Counter(labels)
        # Rebuilt from scratch so a refit never keeps classes from older data.
        self.class_priors = {
            cls: label_counts[cls] / total_docs for cls in self.classes
        }

        vocab_size = len(self.vocabulary)

        self.word_probs = {}
        for cls in self.classes:
            word_counts = Counter(class_tokens[cls])
            # Add-one smoothing: every vocabulary word gets one pseudo-count.
            denominator = len(class_tokens[cls]) + vocab_size
            self.word_probs[cls] = {
                word: (word_counts[word] + 1) / denominator
                for word in self.vocabulary
            }

        return self

    def predict(self, document: str) -> Tuple[str, Dict[str, float]]:
        """Return the most likely class and the normalised class probabilities.

        Words not seen during training are ignored. Ties go to the class
        that appeared first in the training labels.

        Raises:
            ValueError: If the classifier has not been fitted.
        """
        if not self.classes:
            raise ValueError("TextClassifier must be fitted before calling predict")

        words = document.lower().split()

        scores = {}
        for cls in self.classes:
            score = math.log(self.class_priors[cls])
            for word in words:
                if word in self.vocabulary:
                    score += math.log(self.word_probs[cls][word])
            scores[cls] = score

        best_class = max(scores, key=scores.get)

        # Subtract the maximum before exponentiating to avoid underflow.
        max_score = scores[best_class]
        exp_scores = {cls: math.exp(score - max_score) for cls, score in scores.items()}
        total = sum(exp_scores.values())
        probabilities = {cls: exp_score / total for cls, exp_score in exp_scores.items()}

        return best_class, probabilities

    def predict_batch(self, documents: List[str]) -> List[Tuple[str, Dict[str, float]]]:
        """Run ``predict`` on every document."""
        return [self.predict(doc) for doc in documents]

34.3.2 主题建模

python
class LDATopicModel:
    """Topic model fitted by repeatedly resampling a topic for every token.

    Args:
        n_topics: Number of topics.
        alpha: Smoothing constant added to the document-topic counts.
        beta: Smoothing constant added to the topic-word counts.
    """

    def __init__(self, n_topics: int = 10, alpha: float = 0.1, beta: float = 0.1):
        self.n_topics = n_topics
        self.alpha = alpha
        self.beta = beta
        self.vocabulary: Dict[str, int] = {}
        # Both distributions stay None until fit() has run.
        self.topic_word_dist: Optional[np.ndarray] = None
        self.doc_topic_dist: Optional[np.ndarray] = None

    def fit(
        self,
        documents: List[List[str]],
        n_iterations: int = 1000,
        random_state: int = 42
    ) -> "LDATopicModel":
        """Fit the model on pre-tokenised ``documents``.

        Args:
            documents: One token list per document.
            n_iterations: Number of full resampling sweeps over all tokens.
            random_state: Seed passed to ``np.random.seed``.

        Returns:
            ``self``, with ``topic_word_dist`` and ``doc_topic_dist`` set.
        """
        # NOTE: this seeds NumPy's global random state, not a local generator.
        np.random.seed(random_state)

        # Vocabulary: every distinct token, indexed in sorted order.
        all_words = set()
        for doc in documents:
            all_words.update(doc)
        self.vocabulary = {word: idx for idx, word in enumerate(sorted(all_words))}

        vocab_size = len(self.vocabulary)
        n_docs = len(documents)

        # Documents re-expressed as lists of vocabulary indices.
        doc_word_ids = []
        for doc in documents:
            word_ids = [self.vocabulary[word] for word in doc if word in self.vocabulary]
            doc_word_ids.append(word_ids)

        # Random initial topic for every token position.
        topic_assignments = []
        for doc_words in doc_word_ids:
            topics = np.random.randint(0, self.n_topics, len(doc_words))
            topic_assignments.append(topics)

        # Count tables derived from the current assignments.
        doc_topic_counts = np.zeros((n_docs, self.n_topics))
        topic_word_counts = np.zeros((self.n_topics, vocab_size))
        topic_counts = np.zeros(self.n_topics)

        for d, (doc_words, doc_topics) in enumerate(zip(doc_word_ids, topic_assignments)):
            for w, t in zip(doc_words, doc_topics):
                doc_topic_counts[d, t] += 1
                topic_word_counts[t, w] += 1
                topic_counts[t] += 1

        for _ in range(n_iterations):
            for d, (doc_words, doc_topics) in enumerate(zip(doc_word_ids, topic_assignments)):
                for i, (w, t) in enumerate(zip(doc_words, doc_topics)):
                    # Remove the token's current assignment from all counts.
                    doc_topic_counts[d, t] -= 1
                    topic_word_counts[t, w] -= 1
                    topic_counts[t] -= 1

                    # Unnormalised weight of each topic for this token given
                    # the remaining counts.
                    probs = (doc_topic_counts[d] + self.alpha) * \
                            (topic_word_counts[:, w] + self.beta) / \
                            (topic_counts + vocab_size * self.beta)

                    new_t = np.random.choice(self.n_topics, p=probs / probs.sum())

                    # doc_topics is the array held in topic_assignments, so
                    # this write persists to the next sweep.
                    doc_topics[i] = new_t
                    doc_topic_counts[d, new_t] += 1
                    topic_word_counts[new_t, w] += 1
                    topic_counts[new_t] += 1

        # Smoothed, row-normalised distributions from the final counts.
        self.topic_word_dist = (topic_word_counts + self.beta) / \
                               (topic_counts[:, np.newaxis] + vocab_size * self.beta)
        self.doc_topic_dist = (doc_topic_counts + self.alpha) / \
                              (doc_topic_counts.sum(axis=1, keepdims=True) + self.n_topics * self.alpha)

        return self

    def get_topic_words(self, topic_id: int, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the ``top_n`` highest-probability words of one topic.

        Returns an empty list when the model has not been fitted.
        """
        if self.topic_word_dist is None:
            return []

        inv_vocab = {idx: word for word, idx in self.vocabulary.items()}
        topic_probs = self.topic_word_dist[topic_id]
        # argsort is ascending; reverse it to take the largest first.
        top_indices = np.argsort(topic_probs)[::-1][:top_n]

        return [(inv_vocab[idx], topic_probs[idx]) for idx in top_indices]

    def get_document_topics(self, doc_id: int) -> List[Tuple[int, float]]:
        """Return ``(topic, probability)`` pairs above 0.01 for one document.

        Returns an empty list when the model has not been fitted.
        """
        if self.doc_topic_dist is None:
            return []

        probs = self.doc_topic_dist[doc_id]
        return [(t, probs[t]) for t in range(self.n_topics) if probs[t] > 0.01]

    def get_all_topics(self, top_n: int = 10) -> Dict[int, List[Tuple[str, float]]]:
        """Return the top words of every topic, keyed by topic id."""
        return {t: self.get_topic_words(t, top_n) for t in range(self.n_topics)}

34.4 命名实体识别

34.4.1 基于规则的NER

python
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
from enum import Enum
import re


class EntityType(Enum):
    """Categories of named entities; each value is the label string itself."""

    PERSON = "PERSON"
    ORGANIZATION = "ORGANIZATION"
    LOCATION = "LOCATION"
    DATE = "DATE"
    TIME = "TIME"
    MONEY = "MONEY"
    PERCENT = "PERCENT"
    EMAIL = "EMAIL"
    PHONE = "PHONE"
    URL = "URL"


@dataclass
class Entity:
    """One entity mention found in a text."""

    text: str  # the matched substring
    entity_type: EntityType  # category of the mention
    start: int  # offset of the match start in the source text (regex match.start())
    end: int  # offset just past the match (regex match.end())
    confidence: float = 1.0  # defaults to 1.0 when the extractor supplies none


class RuleBasedNER:
    """Named-entity recogniser driven by regular expressions and gazetteers.

    Built-in regexes cover e-mail addresses, phone numbers, URLs, dates,
    times, money amounts and percentages. Additional entity types (or extra
    surface forms) are supplied as word lists through ``add_gazetteer``.
    """

    def __init__(self):
        self._patterns: Dict[EntityType, List[str]] = {}
        self._gazetteers: Dict[EntityType, Set[str]] = {}
        self._compile_patterns()

    def _compile_patterns(self) -> None:
        """Compile the built-in regexes once so ``extract`` can reuse them."""
        self._compiled_patterns = {
            # The TLD class is [A-Za-z]; a "|" inside a character class would
            # be a literal pipe, not an alternation.
            EntityType.EMAIL: re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
            EntityType.PHONE: re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
            EntityType.URL: re.compile(r"https?://[^\s]+"),
            EntityType.DATE: re.compile(
                r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|"
                r"\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b"
            ),
            EntityType.TIME: re.compile(r"\b\d{1,2}:\d{2}(?::\d{2})?(?:\s*[AP]M)?\b"),
            EntityType.MONEY: re.compile(r"\$\d+(?:,\d{3})*(?:\.\d{2})?|\d+\s*(?:dollars?|euros?|yen)\b"),
            EntityType.PERCENT: re.compile(r"\d+(?:\.\d+)?%")
        }

    def add_gazetteer(self, entity_type: EntityType, entities: List[str]) -> None:
        """Register surface forms that should be tagged as ``entity_type``.

        Entries are stored lower-cased and matched case-insensitively.
        Empty or whitespace-only entries are ignored, because they would
        otherwise match at word boundaries or between words.

        Args:
            entity_type: Category assigned to every match of these entries.
            entities: Surface strings to look for.
        """
        if entity_type not in self._gazetteers:
            self._gazetteers[entity_type] = set()
        self._gazetteers[entity_type].update(
            e.lower() for e in entities if e.strip()
        )

    def extract(self, text: str) -> List[Entity]:
        """Find all entities in ``text``.

        Args:
            text: The string to scan.

        Returns:
            Non-overlapping entities ordered by start offset. When several
            candidates overlap, the earliest one wins, and among candidates
            starting at the same offset the longest one wins.
        """
        entities = []

        for entity_type, pattern in self._compiled_patterns.items():
            for match in pattern.finditer(text):
                entities.append(Entity(
                    text=match.group(),
                    entity_type=entity_type,
                    start=match.start(),
                    end=match.end()
                ))

        for entity_type, gazetteer in self._gazetteers.items():
            for entity_text in gazetteer:
                pattern = re.compile(r"\b" + re.escape(entity_text) + r"\b", re.IGNORECASE)
                for match in pattern.finditer(text):
                    entities.append(Entity(
                        text=match.group(),
                        entity_type=entity_type,
                        start=match.start(),
                        end=match.end()
                    ))

        return self._remove_overlaps(entities)

    def _remove_overlaps(self, entities: List[Entity]) -> List[Entity]:
        """Drop entities that overlap an already accepted one.

        Candidates are ordered by start offset and, for equal starts, by
        decreasing length. This makes the result independent of the order in
        which candidates were collected (gazetteers are sets, whose iteration
        order is arbitrary) and prefers the longest match at a position.

        Args:
            entities: Candidate entities in any order.

        Returns:
            A new list of non-overlapping entities ordered by start offset.
        """
        if not entities:
            return []

        ordered = sorted(entities, key=lambda e: (e.start, -(e.end - e.start)))

        filtered = [ordered[0]]
        for entity in ordered[1:]:
            # Keep a candidate only if it begins at or after the end of the
            # last accepted entity.
            if entity.start >= filtered[-1].end:
                filtered.append(entity)
        return filtered


class SpacyNER:
    """Named-entity recogniser backed by a spaCy pipeline.

    spaCy is an optional dependency: when the package or the requested model
    cannot be loaded, the instance stays usable and ``extract`` returns an
    empty list.
    """

    def __init__(self, model: str = "en_core_web_sm"):
        """Try to load the spaCy pipeline named ``model``.

        Args:
            model: Name of the spaCy model to load.
        """
        self._available = False
        self.nlp = None
        try:
            import spacy
            self.nlp = spacy.load(model)
        except (ImportError, OSError):
            # ImportError: spaCy itself is not installed.
            # OSError: spaCy is installed but the model cannot be found.
            return
        self._available = True

    def extract(self, text: str) -> List[Entity]:
        """Run the spaCy pipeline on ``text`` and convert its entities.

        Entities whose spaCy label has no counterpart in ``EntityType`` are
        skipped rather than reported under an unrelated category.

        Args:
            text: The string to analyse.

        Returns:
            Entities in the order spaCy reports them, or an empty list when
            spaCy or the model is unavailable.
        """
        if not self._available:
            return []

        doc = self.nlp(text)
        entities = []

        for ent in doc.ents:
            entity_type = self._map_entity_type(ent.label_)
            if entity_type is None:
                continue
            entities.append(Entity(
                text=ent.text,
                entity_type=entity_type,
                start=ent.start_char,
                end=ent.end_char
            ))

        return entities

    @staticmethod
    def _map_entity_type(spacy_label: str) -> Optional[EntityType]:
        """Translate a spaCy entity label into an ``EntityType``.

        Args:
            spacy_label: Label string taken from a spaCy entity span.

        Returns:
            The matching ``EntityType``, or ``None`` when the label has no
            counterpart (it is not silently mapped to PERSON).
        """
        mapping = {
            "PERSON": EntityType.PERSON,
            "ORG": EntityType.ORGANIZATION,
            "GPE": EntityType.LOCATION,
            "LOC": EntityType.LOCATION,
            "DATE": EntityType.DATE,
            "TIME": EntityType.TIME,
            "MONEY": EntityType.MONEY,
            "PERCENT": EntityType.PERCENT
        }
        return mapping.get(spacy_label)

34.5 文本相似度

34.5.1 相似度计算

python
class TextSimilarity:
    """Set-based and vector-based similarity and distance measures."""

    @staticmethod
    def jaccard_similarity(set1: Set[str], set2: Set[str]) -> float:
        """Return |A ∩ B| / |A ∪ B|; two empty sets count as identical (1.0)."""
        if not (set1 or set2):
            return 1.0
        shared = len(set1 & set2)
        combined = len(set1 | set2)
        if combined > 0:
            return shared / combined
        return 0.0

    @staticmethod
    def dice_similarity(set1: Set[str], set2: Set[str]) -> float:
        """Return 2|A ∩ B| / (|A| + |B|); two empty sets count as identical (1.0)."""
        if not (set1 or set2):
            return 1.0
        shared = len(set1 & set2)
        size_sum = len(set1) + len(set2)
        if size_sum > 0:
            return 2 * shared / size_sum
        return 0.0

    @staticmethod
    def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
        """Return the cosine of the angle between two vectors.

        A zero-length vector on either side yields 0.0 instead of dividing
        by zero.
        """
        a = np.array(vec1)
        b = np.array(vec2)

        numerator = np.dot(a, b)
        length_a = np.linalg.norm(a)
        length_b = np.linalg.norm(b)

        if length_a == 0 or length_b == 0:
            return 0.0

        return float(numerator / (length_a * length_b))

    @staticmethod
    def euclidean_distance(vec1: List[float], vec2: List[float]) -> float:
        """Return the L2 norm of the element-wise difference."""
        difference = np.array(vec1) - np.array(vec2)
        return float(np.linalg.norm(difference))

    @staticmethod
    def manhattan_distance(vec1: List[float], vec2: List[float]) -> float:
        """Return the sum of absolute element-wise differences."""
        difference = np.array(vec1) - np.array(vec2)
        return float(np.sum(np.abs(difference)))


class EditDistance:
    """String edit distances and the similarity scores derived from them."""

    @staticmethod
    def levenshtein_distance(s1: str, s2: str) -> int:
        """Return the minimum number of single-character insertions,
        deletions and substitutions that turn ``s1`` into ``s2``.
        """
        # Iterate over the longer string so each stored row has the length
        # of the shorter one.
        if len(s1) >= len(s2):
            longer, shorter = s1, s2
        else:
            longer, shorter = s2, s1

        if not shorter:
            return len(longer)

        prev = list(range(len(shorter) + 1))
        for row, ch_long in enumerate(longer, start=1):
            curr = [row]
            for col, ch_short in enumerate(shorter, start=1):
                curr.append(min(
                    prev[col] + 1,
                    curr[col - 1] + 1,
                    prev[col - 1] + (ch_long != ch_short),
                ))
            prev = curr

        return prev[-1]

    @staticmethod
    def levenshtein_similarity(s1: str, s2: str) -> float:
        """Return ``1 - distance / max_len``; two empty strings score 1.0."""
        distance = EditDistance.levenshtein_distance(s1, s2)
        longest = max(len(s1), len(s2))
        if longest > 0:
            return 1 - distance / longest
        return 1.0

    @staticmethod
    def damerau_levenshtein_distance(s1: str, s2: str) -> int:
        """Return the edit distance that also allows swapping two adjacent
        characters as a single operation.
        """
        rows, cols = len(s1), len(s2)

        # table[r][c] is the distance between the first r characters of s1
        # and the first c characters of s2.
        table = [[0] * (cols + 1) for _ in range(rows + 1)]
        for r in range(rows + 1):
            table[r][0] = r
        for c in range(cols + 1):
            table[0][c] = c

        for r in range(1, rows + 1):
            for c in range(1, cols + 1):
                cost = 0 if s1[r - 1] == s2[c - 1] else 1

                best = min(
                    table[r - 1][c] + 1,
                    table[r][c - 1] + 1,
                    table[r - 1][c - 1] + cost,
                )

                # Adjacent transposition: the last two characters of both
                # prefixes are the same pair in swapped order.
                if r > 1 and c > 1 and s1[r - 1] == s2[c - 2] and s1[r - 2] == s2[c - 1]:
                    best = min(best, table[r - 2][c - 2] + cost)

                table[r][c] = best

        return table[rows][cols]

    @staticmethod
    def jaro_winkler_similarity(s1: str, s2: str) -> float:
        """Return the Jaro similarity boosted by a shared prefix of up to
        four characters (scaling factor 0.1).
        """
        if s1 == s2:
            return 1.0

        len1, len2 = len(s1), len(s2)
        if not len1 or not len2:
            return 0.0

        # Characters only match when their positions differ by at most
        # this window.
        window = max(max(len1, len2) // 2 - 1, 0)

        used1 = [False] * len1
        used2 = [False] * len2
        matches = 0

        for pos1, ch in enumerate(s1):
            low = max(0, pos1 - window)
            high = min(pos1 + window + 1, len2)
            for pos2 in range(low, high):
                if not used2[pos2] and s2[pos2] == ch:
                    used1[pos1] = True
                    used2[pos2] = True
                    matches += 1
                    break

        if matches == 0:
            return 0.0

        # Compare the matched characters of both strings in order; every
        # position where they differ counts towards the transpositions.
        matched1 = [ch for ch, flag in zip(s1, used1) if flag]
        matched2 = [ch for ch, flag in zip(s2, used2) if flag]
        transpositions = sum(a != b for a, b in zip(matched1, matched2))

        jaro = (matches / len1 + matches / len2 + (matches - transpositions / 2) / matches) / 3

        prefix = 0
        for a, b in zip(s1[:4], s2[:4]):
            if a != b:
                break
            prefix += 1

        return jaro + prefix * 0.1 * (1 - jaro)

34.6 知识图谱

34.6.1 NLP技术体系

自然语言处理技术层次

┌─────────────────────────────────────────────────────────────┐
│                    应用层                                   │
│  机器翻译、问答系统、情感分析、文本摘要、对话系统          │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                    算法层                                   │
│  分类、序列标注、生成、匹配、聚类                          │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                    表示层                                   │
│  词袋、TF-IDF、词向量、上下文嵌入                          │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                    预处理层                                 │
│  分词、清洗、规范化、标注                                  │
└─────────────────────────────────────────────────────────────┘

NLP工具链:
┌─────────────────────────────────────────┐
│ NLTK      经典NLP工具包                 │
│ spaCy     工业级NLP库                   │
│ Transformers  预训练模型                │
│ jieba     中文分词                      │
│ Gensim    主题模型                      │
└─────────────────────────────────────────┘

34.6.2 文本处理流程

NLP标准处理流程

┌─────────────────────────────────────────┐
│ 1. 文本获取    爬虫、API、文件          │
│ 2. 文本清洗    去除噪声、HTML标签       │
│ 3. 分词        中文/英文分词            │
│ 4. 规范化      词干提取、词形还原       │
│ 5. 停用词过滤  去除无意义词             │
│ 6. 特征提取    TF-IDF、词向量           │
│ 7. 模型训练    分类、聚类等             │
│ 8. 评估优化    准确率、召回率           │
└─────────────────────────────────────────┘

34.7 技术选型指南

34.7.1 NLP库选型

| 场景 | 推荐库 | 原因 |
| --- | --- | --- |
| 教学/研究 | NLTK | 功能全面 |
| 生产环境 | spaCy | 性能优秀 |
| 中文处理 | jieba + spaCy | 中文支持好 |
| 深度学习 | Transformers | 预训练模型 |

34.7.2 分词工具选型

| 语言 | 推荐工具 | 说明 |
| --- | --- | --- |
| 中文 | jieba | 简单易用 |
| 中文 | HanLP | 功能丰富 |
| 英文 | spaCy | 工业级 |
| 多语言 | stanza | 斯坦福NLP |

34.7.3 词向量选型

| 场景 | 推荐方案 | 说明 |
| --- | --- | --- |
| 静态词向量 | Word2Vec/FastText | 训练快 |
| 上下文向量 | BERT | 效果好 |
| 领域特定 | 微调预训练模型 | 定制化 |

34.8 常见问题与解决方案

34.8.1 中文分词问题

python
# Problem: Chinese word segmentation is inaccurate.
# Solution: use a custom dictionary.

import jieba

# Add custom words one at a time.
jieba.add_word('Python编程')
jieba.add_word('机器学习')

# Or load them from a dictionary file.
# NOTE(review): presumably fails if 'custom_dict.txt' does not exist — confirm.
jieba.load_userdict('custom_dict.txt')

# Segment a sentence that contains both custom words.
text = "Python编程是机器学习的基础"
words = jieba.lcut(text)

34.8.2 编码问题

python
# 问题:文本编码错误
# 解决方案:统一编码

import chardet

def safe_decode(content: bytes) -> str:
    """Decode raw bytes to text using the encoding guessed by chardet.

    Falls back to UTF-8 when chardet cannot determine an encoding, or when it
    reports one for which Python has no codec. Bytes that cannot be decoded
    are dropped (``errors='ignore'``).

    Args:
        content: The raw bytes to decode.

    Returns:
        The decoded text.
    """
    encoding = chardet.detect(content)['encoding'] or 'utf-8'
    try:
        return content.decode(encoding, errors='ignore')
    except LookupError:
        # The detected encoding name is unknown to Python's codec registry.
        return content.decode('utf-8', errors='ignore')

34.8.3 内存问题

python
# 问题:大文本处理内存溢出
# 解决方案:流式处理

def process_large_file(filepath):
    """Lazily yield ``process_line(line)`` for each line of a UTF-8 text file.

    The file is read one line at a time, so the whole content is never held
    in memory at once.
    """
    with open(filepath, mode='r', encoding='utf-8') as source:
        for raw_line in source:
            result = process_line(raw_line)
            yield result

# Consume the generator: each processed line is handed to save_result as it
# is produced. NOTE(review): save_result is not defined in this snippet.
for processed in process_large_file('large.txt'):
    save_result(processed)

34.9 本章小结

本章详细介绍了Python自然语言处理的核心概念和实践:

  1. 文本预处理:文本清洗、分词、词干提取、词形还原
  2. 文本表示:词袋模型、TF-IDF、词向量
  3. 文本分类:情感分析、朴素贝叶斯分类器
  4. 主题建模:LDA主题模型
  5. 命名实体识别:基于规则和基于模型的NER
  6. 文本相似度:Jaccard、余弦相似度、编辑距离
  7. 应用实例:完整的NLP处理流程

练习题

  1. 实现一个文本预处理流水线,支持多种清洗和规范化操作
  2. 开发一个垃圾邮件分类器,使用TF-IDF和朴素贝叶斯
  3. 实现一个简单的问答系统,基于文本相似度匹配
  4. 开发一个关键词提取工具,使用TF-IDF和TextRank算法
  5. 实现一个文档摘要生成器,提取文档的关键句子

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校