第34章 自然语言处理
学习目标
完成本章学习后,你将能够:
- 理解NLP基础:文本预处理、分词、词性标注
- 掌握文本处理技术:停用词过滤、词干提取、词形还原
- 实现文本表示:词袋模型、TF-IDF、词向量
- 进行文本分类:情感分析、主题分类、文本聚类
- 实现命名实体识别:人名、地名、组织名识别
- 应用文本相似度:余弦相似度、编辑距离、语义相似度
- 构建文本生成:语言模型、文本摘要
- 开发NLP应用:聊天机器人、问答系统
34.1 文本预处理
34.1.1 文本清洗
python
import re
import string
import unicodedata
from collections import Counter
from dataclasses import dataclass
from typing import List, Optional, Dict, Tuple, Set

import numpy as np
@dataclass
class TextStats:
    """Summary figures describing one piece of text.

    Plain value container; instances are built by
    ``TextStatistics.get_stats``.
    """

    # Size measures.
    char_count: int        # length of the raw string
    word_count: int        # number of whitespace-separated tokens
    sentence_count: int    # number of non-empty sentence fragments

    # Averages (zero when the corresponding count is zero).
    avg_word_length: float       # mean characters per token
    avg_sentence_length: float   # mean tokens per sentence

    # Vocabulary measures.
    unique_words: int            # distinct tokens, compared case-insensitively
    vocabulary_richness: float   # unique_words / word_count
class TextCleaner:
    """Stateless helpers that strip noise from raw text.

    Every public method is a ``@staticmethod`` that takes a string and
    returns a new string; ``clean_all`` chains them into one fixed pipeline.
    """

    # "://" is required so ordinary words that merely start with "http"
    # (for example "httpd") are not mistaken for links.
    _URL_RE = re.compile(r"https?://\S+|www\.\S+")
    _EMAIL_RE = re.compile(r"\S+@\S+")
    _HTML_TAG_RE = re.compile(r"<[^>]+>")
    _DIGIT_RE = re.compile(r"\d+")
    _PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)

    # Explicit emoji blocks only.  One wide range such as U+24C2..U+1F251
    # would also delete CJK ideographs, kana and Hangul.
    _EMOJI_RE = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # miscellaneous symbols and pictographs
        "\U0001F680-\U0001F6FF"  # transport and map symbols
        "\U0001F1E0-\U0001F1FF"  # regional indicator (flag) letters
        "\U0001F900-\U0001F9FF"  # supplemental symbols and pictographs
        "\U0001FA70-\U0001FAFF"  # symbols and pictographs extended-A
        "\U0001F200-\U0001F251"  # enclosed ideographic supplement
        "\U00002600-\U000026FF"  # miscellaneous symbols
        "\U00002702-\U000027B0"  # dingbats
        "\U000024C2"             # circled latin capital letter M
        "\U0000FE0F"             # variation selector-16 (emoji presentation)
        "]+",
        flags=re.UNICODE,
    )

    # Order matters: the irregular whole-word forms must be tried before the
    # generic "n't" suffix, otherwise "won't" would become "wo not".
    _CONTRACTIONS = {
        "won't": "will not",
        "can't": "cannot",
        "n't": " not",
        "'re": " are",
        "'s": " is",
        "'d": " would",
        "'ll": " will",
        "'ve": " have",
        "'m": " am",
    }
    # Accept both the ASCII apostrophe and the typographic one (U+2019).
    # re.ASCII keeps case-insensitive matching to ASCII letters so that the
    # matched text can always be mapped back to a key of _CONTRACTIONS.
    _CONTRACTION_RE = re.compile(
        "|".join(key.replace("'", "['\u2019]") for key in _CONTRACTIONS),
        re.IGNORECASE | re.ASCII,
    )

    @staticmethod
    def to_lowercase(text: str) -> str:
        """Return ``text`` converted to lower case."""
        return text.lower()

    @staticmethod
    def remove_punctuation(text: str) -> str:
        """Delete every character listed in ``string.punctuation`` (ASCII only)."""
        return text.translate(TextCleaner._PUNCTUATION_TABLE)

    @staticmethod
    def remove_digits(text: str) -> str:
        """Delete all runs of digits."""
        return TextCleaner._DIGIT_RE.sub("", text)

    @staticmethod
    def remove_whitespace(text: str) -> str:
        """Collapse whitespace runs to single spaces and trim both ends."""
        return " ".join(text.split())

    @staticmethod
    def remove_urls(text: str) -> str:
        """Delete ``http://``, ``https://`` and ``www.`` links."""
        return TextCleaner._URL_RE.sub("", text)

    @staticmethod
    def remove_emails(text: str) -> str:
        """Delete every whitespace-delimited token that contains ``@``."""
        return TextCleaner._EMAIL_RE.sub("", text)

    @staticmethod
    def remove_html_tags(text: str) -> str:
        """Delete ``<...>`` tags, keeping the text between them."""
        return TextCleaner._HTML_TAG_RE.sub("", text)

    @staticmethod
    def remove_emojis(text: str) -> str:
        """Delete emoji and pictographic symbols, leaving other scripts intact."""
        return TextCleaner._EMOJI_RE.sub("", text)

    @staticmethod
    def remove_special_chars(text: str, keep: str = "") -> str:
        """Keep only ASCII letters, digits, whitespace and the chars in ``keep``.

        Note that non-ASCII letters (accented Latin, CJK, ...) are removed
        as well, because the allow-list is ``a-zA-Z0-9``.
        """
        # Raw f-string so "\s" reaches the regex engine unchanged.
        pattern = rf"[^{re.escape(keep)}a-zA-Z0-9\s]"
        return re.sub(pattern, "", text)

    @staticmethod
    def normalize_unicode(text: str, form: str = "NFKC") -> str:
        """Apply Unicode normalisation ``form`` (default ``NFKC``)."""
        return unicodedata.normalize(form, text)

    @staticmethod
    def expand_contractions(text: str) -> str:
        """Expand common English contractions, matching case-insensitively.

        Irregular forms keep the capitalisation of their first letter
        (``"Won't"`` -> ``"Will not"``).  ``'s`` is always expanded to
        `` is``, so possessives are rewritten too.
        """
        table = TextCleaner._CONTRACTIONS

        def _expand(match):
            found = match.group(0)
            expansion = table[found.lower().replace("\u2019", "'")]
            # Suffix expansions start with a space and are appended to the
            # preceding word; only whole-word forms can carry a capital.
            if not expansion.startswith(" ") and found[0].isupper():
                return expansion[0].upper() + expansion[1:]
            return expansion

        return TextCleaner._CONTRACTION_RE.sub(_expand, text)

    @staticmethod
    def clean_all(text: str) -> str:
        """Run the full cleaning pipeline and return lower-cased text.

        Order: HTML tags, URLs, e-mails, emojis, Unicode normalisation,
        contractions, punctuation, digits, whitespace, lower-casing.
        Contractions are expanded before punctuation is stripped because
        they depend on the apostrophe still being present.
        """
        text = TextCleaner.remove_html_tags(text)
        text = TextCleaner.remove_urls(text)
        text = TextCleaner.remove_emails(text)
        text = TextCleaner.remove_emojis(text)
        text = TextCleaner.normalize_unicode(text)
        text = TextCleaner.expand_contractions(text)
        text = TextCleaner.remove_punctuation(text)
        text = TextCleaner.remove_digits(text)
        text = TextCleaner.remove_whitespace(text)
        text = TextCleaner.to_lowercase(text)
        return text
class TextStatistics:
    """Descriptive statistics over whitespace-tokenised text."""

    # ASCII sentence terminators plus their full-width CJK counterparts
    # (ideographic full stop, full-width "!" and "?").
    _SENTENCE_END_RE = re.compile(r"[.!?\u3002\uff01\uff1f]+")

    @staticmethod
    def get_stats(text: str) -> TextStats:
        """Compute size, average-length and vocabulary figures for ``text``.

        Words are whitespace-separated tokens (punctuation stays attached);
        sentences are the non-empty fragments between terminator runs.
        All ratios are ``0.0`` when their denominator is zero.
        """
        words = text.split()
        fragments = TextStatistics._SENTENCE_END_RE.split(text)
        sentences = [part.strip() for part in fragments if part.strip()]

        word_count = len(words)
        sentence_count = len(sentences)
        unique_words = len({word.lower() for word in words})

        if word_count:
            avg_word_length = sum(len(word) for word in words) / word_count
            vocabulary_richness = unique_words / word_count
        else:
            avg_word_length = 0.0
            vocabulary_richness = 0.0
        avg_sentence_length = word_count / sentence_count if sentence_count else 0.0

        return TextStats(
            char_count=len(text),
            word_count=word_count,
            sentence_count=sentence_count,
            avg_word_length=avg_word_length,
            avg_sentence_length=avg_sentence_length,
            unique_words=unique_words,
            vocabulary_richness=vocabulary_richness,
        )

    @staticmethod
    def word_frequency(text: str, top_n: int = 10) -> Dict[str, int]:
        """Return the ``top_n`` most frequent lower-cased tokens with counts."""
        return dict(Counter(text.lower().split()).most_common(top_n))

    @staticmethod
    def ngram_frequency(text: str, n: int = 2, top_n: int = 10) -> Dict[str, int]:
        """Return the ``top_n`` most frequent word n-grams with counts.

        N-grams are space-joined runs of ``n`` consecutive lower-cased tokens.

        Raises:
            ValueError: if ``n`` is smaller than 1 (there is no such n-gram;
                without this check the result would be empty-string keys).
        """
        if n < 1:
            raise ValueError("n must be at least 1")
        words = text.lower().split()
        ngrams = (" ".join(words[i:i + n]) for i in range(len(words) - n + 1))
        return dict(Counter(ngrams).most_common(top_n))
# 34.1.2 分词
python
class Tokenizer:
    """Regular-expression tokenisers for text."""

    @staticmethod
    def word_tokenize(text: str) -> List[str]:
        """Return the lower-cased runs of word characters found in ``text``."""
        lowered = text.lower()
        return re.compile(r"\b\w+\b").findall(lowered)

    @staticmethod
    def sentence_tokenize(text: str) -> List[str]:
        """Split after ``.``, ``!`` or ``?`` followed by whitespace.

        Terminators stay attached to their sentence; empty pieces are dropped.
        """
        pieces = re.split(r"(?<=[.!?])\s+", text)
        return list(filter(None, map(str.strip, pieces)))

    @staticmethod
    def char_tokenize(text: str) -> List[str]:
        """Return every character of ``text`` as its own token."""
        return [char for char in text]

    @staticmethod
    def word_tokenize_with_punctuation(text: str) -> List[str]:
        """Return word runs and single punctuation marks as separate tokens."""
        word_or_symbol = re.compile(r"\w+|[^\w\s]")
        return word_or_symbol.findall(text)

    @staticmethod
    def tokenize_by_regex(text: str, pattern: str) -> List[str]:
        """Return all non-overlapping matches of ``pattern`` in ``text``."""
        matcher = re.compile(pattern)
        return matcher.findall(text)
class ChineseTokenizer:
    """Chinese tokenisers backed by jieba, with per-character fallbacks."""

    @staticmethod
    def tokenize_jieba(text: str) -> List[str]:
        """Segment ``text`` with ``jieba.cut``.

        When jieba cannot be imported, every character becomes one token.
        """
        try:
            import jieba
            segments = jieba.cut(text)
            return list(segments)
        except ImportError:
            return [char for char in text]

    @staticmethod
    def tokenize_jieba_pos(text: str) -> List[Tuple[str, str]]:
        """Segment ``text`` with ``jieba.posseg`` into ``(word, flag)`` pairs.

        When jieba cannot be imported, every character is paired with the
        placeholder flag ``"x"``.
        """
        try:
            import jieba.posseg as pseg
            pairs = []
            for word, flag in pseg.cut(text):
                pairs.append((word, flag))
            return pairs
        except ImportError:
            placeholder = "x"
            return [(char, placeholder) for char in text]

    @staticmethod
    def tokenize_by_char(text: str) -> List[str]:
        """Return every character of ``text`` as its own token."""
        return [char for char in text]
class TokenFilter:
    """Filter token lists by stop words, length, digits and punctuation."""

    def __init__(self):
        # Stop words are stored lower-cased because every lookup below
        # compares against ``token.lower()``.
        self._stopwords: Set[str] = set()
        self._min_length: int = 1
        self._max_length: int = 100

    def load_stopwords(self, filepath: str) -> None:
        """Replace the stop-word set with the lines of a UTF-8 text file.

        Each line is stripped and lower-cased; blank lines are skipped.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            cleaned = (line.strip().lower() for line in f)
            self._stopwords = {word for word in cleaned if word}

    def set_stopwords(self, stopwords: Set[str]) -> None:
        """Replace the stop-word set with a lower-cased copy of ``stopwords``.

        Copying keeps later changes to the caller's collection from leaking
        in; lower-casing makes entries such as ``"The"`` actually match.
        """
        self._stopwords = {word.lower() for word in stopwords}

    def set_length_filter(self, min_length: int, max_length: int) -> None:
        """Set the inclusive length bounds used by ``filter_tokens``."""
        self._min_length = min_length
        self._max_length = max_length

    def filter_tokens(self, tokens: List[str]) -> List[str]:
        """Drop stop words and tokens outside the configured length bounds."""
        return [
            token for token in tokens
            if token.lower() not in self._stopwords
            and self._min_length <= len(token) <= self._max_length
        ]

    def filter_stopwords(self, tokens: List[str]) -> List[str]:
        """Drop tokens whose lower-cased form is a stop word."""
        return [t for t in tokens if t.lower() not in self._stopwords]

    def filter_by_length(self, tokens: List[str], min_len: int = 1, max_len: int = 100) -> List[str]:
        """Keep tokens whose length lies in ``[min_len, max_len]``.

        Uses the bounds passed in, not those from ``set_length_filter``.
        """
        return [t for t in tokens if min_len <= len(t) <= max_len]

    def filter_digits(self, tokens: List[str]) -> List[str]:
        """Drop tokens that consist solely of digits."""
        return [t for t in tokens if not t.isdigit()]

    def filter_punctuation_tokens(self, tokens: List[str]) -> List[str]:
        """Keep only purely alphanumeric tokens.

        Tokens containing any other character (e.g. ``"don't"``) are dropped.
        """
        return [t for t in tokens if t.isalnum()]
# 34.1.3 词干提取与词形还原
python
class Stemmer:
    """Wrapper around NLTK's Porter, Lancaster and Snowball stemmers.

    When NLTK cannot be imported every method returns its input unchanged.
    """

    def __init__(self):
        self._available = False
        try:
            from nltk.stem import PorterStemmer, LancasterStemmer, SnowballStemmer
            self._porter = PorterStemmer()
            self._lancaster = LancasterStemmer()
            self._snowball = SnowballStemmer("english")
            self._available = True
        except ImportError:
            self._available = False

    def porter_stem(self, word: str) -> str:
        """Stem ``word`` with the Porter stemmer."""
        if not self._available:
            return word
        return self._porter.stem(word)

    def lancaster_stem(self, word: str) -> str:
        """Stem ``word`` with the Lancaster stemmer."""
        if not self._available:
            return word
        return self._lancaster.stem(word)

    def snowball_stem(self, word: str) -> str:
        """Stem ``word`` with the English Snowball stemmer."""
        if not self._available:
            return word
        return self._snowball.stem(word)

    def stem_tokens(self, tokens: List[str], method: str = "porter") -> List[str]:
        """Stem every token with the chosen algorithm.

        ``method`` is ``"porter"``, ``"lancaster"`` or ``"snowball"``; any
        other value falls back to Porter.
        """
        if method == "lancaster":
            stem_one = self.lancaster_stem
        elif method == "snowball":
            stem_one = self.snowball_stem
        else:
            stem_one = self.porter_stem
        return list(map(stem_one, tokens))
class Lemmatizer:
    """Wrapper around NLTK's ``WordNetLemmatizer``.

    When NLTK cannot be imported every method returns its input unchanged.
    """

    # Penn Treebank tag -> WordNet part-of-speech letter.
    _POS_MAP = {
        "NN": "n", "NNS": "n", "NNP": "n", "NNPS": "n",
        "VB": "v", "VBD": "v", "VBG": "v", "VBN": "v", "VBP": "v", "VBZ": "v",
        "JJ": "a", "JJR": "a", "JJS": "a",
        "RB": "r", "RBR": "r", "RBS": "r",
    }

    def __init__(self):
        self._available = False
        try:
            from nltk.stem import WordNetLemmatizer
            self._lemmatizer = WordNetLemmatizer()
            self._available = True
        except ImportError:
            self._available = False

    def lemmatize(self, word: str, pos: str = "n") -> str:
        """Lemmatize ``word`` as part of speech ``pos`` (default noun)."""
        if not self._available:
            return word
        return self._lemmatizer.lemmatize(word, pos)

    def lemmatize_tokens(self, tokens: List[str], pos: str = "n") -> List[str]:
        """Lemmatize every token using the same part of speech."""
        lemmas = []
        for token in tokens:
            lemmas.append(self.lemmatize(token, pos))
        return lemmas

    def lemmatize_with_pos(self, tokens: List[Tuple[str, str]]) -> List[str]:
        """Lemmatize ``(word, tag)`` pairs.

        Each tag is translated through the Penn-to-WordNet table; tags that
        are not in the table are treated as nouns.
        """
        lemmas = []
        for word, tag in tokens:
            wordnet_pos = self._POS_MAP.get(tag, "n")
            lemmas.append(self.lemmatize(word, wordnet_pos))
        return lemmas
class POSProcessor:
    """Part-of-speech tagging through NLTK, plus tag-based word selection."""

    def __init__(self):
        # Constructing the processor asks NLTK to download the two resources
        # named below; when NLTK cannot be imported, tagging falls back to
        # the placeholder tag "UNK".
        self._available = False
        try:
            import nltk
            for resource in ("punkt", "averaged_perceptron_tagger"):
                nltk.download(resource, quiet=True)
            self._available = True
        except ImportError:
            self._available = False

    def pos_tag(self, tokens: List[str]) -> List[Tuple[str, str]]:
        """Return ``(token, tag)`` pairs for ``tokens``."""
        if not self._available:
            return [(token, "UNK") for token in tokens]
        import nltk
        return nltk.pos_tag(tokens)

    @staticmethod
    def _words_with_tag_prefix(tagged: List[Tuple[str, str]], prefix: str) -> List[str]:
        """Return the words whose tag starts with ``prefix``, in input order."""
        selected = []
        for word, tag in tagged:
            if tag.startswith(prefix):
                selected.append(word)
        return selected

    def get_nouns(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``NN``."""
        return self._words_with_tag_prefix(tagged, "NN")

    def get_verbs(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``VB``."""
        return self._words_with_tag_prefix(tagged, "VB")

    def get_adjectives(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``JJ``."""
        return self._words_with_tag_prefix(tagged, "JJ")

    def get_adverbs(self, tagged: List[Tuple[str, str]]) -> List[str]:
        """Return words whose tag starts with ``RB``."""
        return self._words_with_tag_prefix(tagged, "RB")
# 34.2 文本表示
34.2.1 词袋模型
python
from typing import Dict, List, Tuple, Optional
import math
from collections import Counter, defaultdict
class BagOfWords:
    """Count-based bag-of-words vectoriser over pre-tokenised documents."""

    def __init__(self):
        self.vocabulary: Dict[str, int] = {}
        self.inverse_vocabulary: Dict[int, str] = {}
        self.document_count: int = 0

    def fit(self, documents: List[List[str]]) -> "BagOfWords":
        """Learn the vocabulary from ``documents`` and return ``self``.

        Column indices follow the sorted order of the distinct tokens.
        """
        ordered_words = sorted(set().union(*documents))
        self.inverse_vocabulary = dict(enumerate(ordered_words))
        self.vocabulary = {
            word: index for index, word in self.inverse_vocabulary.items()
        }
        self.document_count = len(documents)
        return self

    def transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Return one count vector per document; unknown tokens are ignored."""
        width = len(self.vocabulary)
        rows = []
        for doc in documents:
            known_counts = Counter(
                token for token in doc if token in self.vocabulary
            )
            row = [0] * width
            for token, occurrences in known_counts.items():
                row[self.vocabulary[token]] = occurrences
            rows.append(row)
        return rows

    def fit_transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Fit on ``documents`` and return their count vectors."""
        return self.fit(documents).transform(documents)

    def get_vocabulary_size(self) -> int:
        """Return the number of distinct tokens learned by ``fit``."""
        return len(self.vocabulary)

    def get_word_from_index(self, index: int) -> Optional[str]:
        """Return the token stored at column ``index``, or ``None``."""
        return self.inverse_vocabulary.get(index)

    def get_index_from_word(self, word: str) -> Optional[int]:
        """Return the column index of ``word``, or ``None`` if unknown."""
        return self.vocabulary.get(word)
class TFIDFVectorizer:
    """TF-IDF vectoriser over pre-tokenised documents.

    Term frequency is ``count / len(doc)``; inverse document frequency is
    the smoothed ``log((N + 1) / (df + 1)) + 1``.

    Args:
        min_df: minimum number of documents a term must appear in.
        max_df: upper bound on document frequency.  A ``float`` is read as
            a fraction of the corpus, any other value as an absolute count.
        use_idf: when ``False`` the vectors contain plain term frequencies.
    """

    def __init__(self, min_df: int = 1, max_df: float = 1.0, use_idf: bool = True):
        self.min_df = min_df
        self.max_df = max_df
        self.use_idf = use_idf
        self.vocabulary: Dict[str, int] = {}
        self.idf: Dict[str, float] = {}
        self.document_count: int = 0

    def fit(self, documents: List[List[str]]) -> "TFIDFVectorizer":
        """Learn the vocabulary and IDF weights; return ``self``."""
        self.document_count = len(documents)

        # Document frequency: each term counts once per document.
        doc_freq = Counter()
        for doc in documents:
            doc_freq.update(set(doc))

        if isinstance(self.max_df, float):
            max_doc_count = self.max_df * self.document_count
        else:
            max_doc_count = self.max_df

        kept = sorted(
            word for word, df in doc_freq.items()
            if self.min_df <= df <= max_doc_count
        )
        self.vocabulary = {word: idx for idx, word in enumerate(kept)}

        # Start from an empty table so a refit never keeps weights of terms
        # that belonged to a previous corpus.
        self.idf = {}
        if self.use_idf:
            for word in kept:
                self.idf[word] = (
                    math.log((self.document_count + 1) / (doc_freq[word] + 1)) + 1
                )
        return self

    def transform(self, documents: List[List[str]]) -> List[List[float]]:
        """Return one TF-IDF vector per document; unknown terms are ignored.

        An empty document yields an all-zero vector.
        """
        width = len(self.vocabulary)
        vectors = []
        for doc in documents:
            vector = [0.0] * width
            doc_length = len(doc)
            for word, count in Counter(doc).items():
                column = self.vocabulary.get(word)
                if column is None:
                    continue
                weight = count / doc_length
                if self.use_idf:
                    weight *= self.idf.get(word, 1.0)
                vector[column] = weight
            vectors.append(vector)
        return vectors

    def fit_transform(self, documents: List[List[str]]) -> List[List[float]]:
        """Fit on ``documents`` and return their TF-IDF vectors."""
        self.fit(documents)
        return self.transform(documents)

    def get_feature_names(self) -> List[str]:
        """Return the vocabulary terms ordered by column index."""
        return sorted(self.vocabulary, key=self.vocabulary.get)
class NGramVectorizer:
    """Count vectoriser over word n-grams of pre-tokenised documents.

    Args:
        n: number of consecutive tokens per n-gram (at least 1).

    Raises:
        ValueError: if ``n`` is smaller than 1.  Such a value would
            otherwise yield empty-string "n-grams" instead of an error.
    """

    def __init__(self, n: int = 2):
        if n < 1:
            raise ValueError("n must be at least 1")
        self.n = n
        self.vocabulary: Dict[str, int] = {}

    def _get_ngrams(self, tokens: List[str]) -> List[str]:
        """Return the space-joined n-grams of ``tokens`` in order.

        A document shorter than ``n`` tokens has no n-grams.
        """
        last_start = len(tokens) - self.n
        return [" ".join(tokens[i:i + self.n]) for i in range(last_start + 1)]

    def fit(self, documents: List[List[str]]) -> "NGramVectorizer":
        """Learn the n-gram vocabulary (sorted order) and return ``self``."""
        seen = set()
        for doc in documents:
            seen.update(self._get_ngrams(doc))
        self.vocabulary = {ngram: idx for idx, ngram in enumerate(sorted(seen))}
        return self

    def transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Return one n-gram count vector per document.

        N-grams that are not in the vocabulary are ignored.
        """
        width = len(self.vocabulary)
        vectors = []
        for doc in documents:
            vector = [0] * width
            for ngram in self._get_ngrams(doc):
                column = self.vocabulary.get(ngram)
                if column is not None:
                    vector[column] += 1
            vectors.append(vector)
        return vectors

    def fit_transform(self, documents: List[List[str]]) -> List[List[int]]:
        """Fit on ``documents`` and return their n-gram count vectors."""
        self.fit(documents)
        return self.transform(documents)
# 34.2.2 词向量
python
class WordEmbedding:
    """In-memory table of pre-trained word vectors with similarity queries.

    Lookups try the word exactly as given and then its lower-cased form, so
    both cased and lower-cased vocabularies are usable.
    """

    def __init__(self):
        self.embeddings: Dict[str, np.ndarray] = {}
        self.vector_size: int = 0

    def load_glove(self, filepath: str) -> None:
        """Load vectors from a GloVe-style text file.

        Each line is ``token v1 v2 ... vd``.  Blank lines are skipped.  The
        first vector read fixes the dimension; on later lines with extra
        fields the surplus leading fields are taken to be a token that
        itself contains spaces.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                values = line.strip().split()
                if len(values) < 2:
                    continue
                split_at = 1
                if self.vector_size and len(values) > self.vector_size + 1:
                    split_at = len(values) - self.vector_size
                word = " ".join(values[:split_at])
                vector = np.array(values[split_at:], dtype=np.float32)
                self.embeddings[word] = vector
                if self.vector_size == 0:
                    self.vector_size = len(vector)

    def load_word2vec(self, filepath: str) -> None:
        """Load vectors from a binary word2vec file through gensim.

        When gensim is not installed nothing is loaded and a
        ``RuntimeWarning`` is issued so the failure is not silent.
        """
        try:
            from gensim.models import KeyedVectors
        except ImportError:
            import warnings
            warnings.warn(
                "gensim is not installed; no word2vec vectors were loaded",
                RuntimeWarning,
                stacklevel=2,
            )
            return
        model = KeyedVectors.load_word2vec_format(filepath, binary=True)
        self.embeddings = {word: model[word] for word in model.key_to_index}
        self.vector_size = model.vector_size

    def _lookup_key(self, word: str) -> Optional[str]:
        """Return the stored key for ``word`` (exact, then lower-cased)."""
        if word in self.embeddings:
            return word
        lowered = word.lower()
        if lowered in self.embeddings:
            return lowered
        return None

    def _rank_by_cosine(self, target: np.ndarray, exclude: Set[str], top_n: int) -> List[Tuple[str, float]]:
        """Return the ``top_n`` stored words most cosine-similar to ``target``.

        Words in ``exclude`` and zero-norm vectors are skipped, because a
        zero norm would make the similarity undefined (NaN).
        """
        target_norm = np.linalg.norm(target)
        if target_norm == 0:
            return []
        ranked = []
        for word, vector in self.embeddings.items():
            if word in exclude:
                continue
            norm = np.linalg.norm(vector)
            if norm == 0:
                continue
            ranked.append((word, float(np.dot(target, vector) / (target_norm * norm))))
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return ranked[:top_n]

    def get_vector(self, word: str) -> Optional[np.ndarray]:
        """Return the vector for ``word``, or ``None`` if it is unknown."""
        key = self._lookup_key(word)
        return None if key is None else self.embeddings[key]

    def get_sentence_vector(self, tokens: List[str], method: str = "mean") -> Optional[np.ndarray]:
        """Pool the vectors of the known tokens into one sentence vector.

        ``method`` is ``"mean"``, ``"sum"`` or ``"max"``; any other value
        falls back to the mean.  Returns ``None`` when no token is known.
        """
        found = [vec for vec in map(self.get_vector, tokens) if vec is not None]
        if not found:
            return None
        stacked = np.array(found)
        pool = {"sum": np.sum, "max": np.max}.get(method, np.mean)
        return pool(stacked, axis=0)

    def cosine_similarity(self, word1: str, word2: str) -> float:
        """Return the cosine similarity of two words.

        Returns ``0.0`` when either word is unknown or has a zero vector.
        """
        vec1 = self.get_vector(word1)
        vec2 = self.get_vector(word2)
        if vec1 is None or vec2 is None:
            return 0.0
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(vec1, vec2) / (norm1 * norm2))

    def most_similar(self, word: str, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the ``top_n`` words most similar to ``word``.

        The query word itself is excluded.  Unknown words give ``[]``.
        """
        key = self._lookup_key(word)
        if key is None:
            return []
        return self._rank_by_cosine(self.embeddings[key], {key}, top_n)

    def analogy(
        self,
        word_a: str,
        word_b: str,
        word_c: str,
        top_n: int = 5
    ) -> List[Tuple[str, float]]:
        """Solve "a is to b as c is to ?" with vector arithmetic.

        Ranks all words by similarity to ``b - a + c``, excluding the three
        inputs.  Returns ``[]`` when any input word is unknown.
        """
        keys = [self._lookup_key(word) for word in (word_a, word_b, word_c)]
        if any(key is None for key in keys):
            return []
        vec_a, vec_b, vec_c = (self.embeddings[key] for key in keys)
        return self._rank_by_cosine(vec_b - vec_a + vec_c, set(keys), top_n)
import numpy as np
class SimpleWord2Vec:
    """Minimal skip-gram style word-vector trainer for teaching purposes.

    Training only pulls each (centre, context) pair together with a
    logistic loss; the code contains no negative sampling step.

    Args:
        vector_size: dimensionality of the learned vectors.
        window: number of neighbours taken on each side of the centre word.
        min_count: words occurring fewer times than this are ignored.
    """

    def __init__(self, vector_size: int = 100, window: int = 5, min_count: int = 1):
        self.vector_size = vector_size
        self.window = window
        self.min_count = min_count
        self.vocabulary: Dict[str, int] = {}
        self.embeddings: Optional[np.ndarray] = None

    def _build_vocabulary(self, sentences: List[List[str]]) -> None:
        """Map every sufficiently frequent word to a contiguous row index."""
        word_freq = Counter()
        for sentence in sentences:
            word_freq.update(sentence)
        # Filter first, then number: the indices must run 0..len-1 because
        # they address rows of a (len(vocabulary), vector_size) matrix.
        frequent = [word for word, freq in word_freq.items() if freq >= self.min_count]
        self.vocabulary = {word: idx for idx, word in enumerate(frequent)}

    def _generate_training_data(self, sentences: List[List[str]]) -> List[Tuple[int, int]]:
        """Return all ``(centre, context)`` index pairs within the window.

        Out-of-vocabulary words are dropped before the window is applied.
        """
        training_data = []
        for sentence in sentences:
            indices = [self.vocabulary[word] for word in sentence if word in self.vocabulary]
            for i, center in enumerate(indices):
                start = max(0, i - self.window)
                end = min(len(indices), i + self.window + 1)
                training_data.extend(
                    (center, indices[j]) for j in range(start, end) if j != i
                )
        return training_data

    def train(self, sentences: List[List[str]], epochs: int = 10, learning_rate: float = 0.01) -> None:
        """Build the vocabulary and fit the vectors with plain SGD.

        Leaves ``embeddings`` untouched when the vocabulary is empty.
        Initial weights come from NumPy's global random generator.
        """
        self._build_vocabulary(sentences)
        vocab_size = len(self.vocabulary)
        if vocab_size == 0:
            return
        self.embeddings = np.random.uniform(-0.5, 0.5, (vocab_size, self.vector_size))
        context_embeddings = np.random.uniform(-0.5, 0.5, (vocab_size, self.vector_size))
        training_data = self._generate_training_data(sentences)
        for _ in range(epochs):
            for center_idx, context_idx in training_data:
                # These are views into the matrices, so the second update
                # below sees the already-updated centre vector.
                center_vec = self.embeddings[center_idx]
                context_vec = context_embeddings[context_idx]
                dot_product = np.dot(center_vec, context_vec)
                sigmoid = 1 / (1 + np.exp(-dot_product))
                gradient = (sigmoid - 1) * learning_rate
                self.embeddings[center_idx] -= gradient * context_vec
                context_embeddings[context_idx] -= gradient * center_vec

    def get_vector(self, word: str) -> Optional[np.ndarray]:
        """Return the trained vector for ``word``.

        Returns ``None`` if the word is unknown or the model is untrained.
        """
        if word in self.vocabulary and self.embeddings is not None:
            return self.embeddings[self.vocabulary[word]]
        return None
# 34.3 文本分类
34.3.1 情感分析
python
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
class Sentiment(Enum):
    """Polarity label assigned to a piece of text."""

    POSITIVE = "positive"   # overall favourable
    NEGATIVE = "negative"   # overall unfavourable
    NEUTRAL = "neutral"     # neither clearly positive nor negative
@dataclass
class SentimentResult:
    """Outcome of one sentiment analysis call."""

    # Final polarity label.
    sentiment: Sentiment
    # Signed polarity score; positive values mean positive sentiment.
    score: float
    # How strongly the evidence supports the label.
    confidence: float
    # Analyzer-specific breakdown, e.g. per-polarity counts.
    details: Dict[str, float]
class LexiconSentimentAnalyzer:
    """Dictionary-based sentiment analyzer.

    Two lexicon styles are supported: plain positive/negative word lists
    (``load_lexicon``) and a scored lexicon of tab-separated
    ``word<TAB>score`` lines (``load_vader_lexicon``).  When a scored
    lexicon is loaded it takes precedence.
    """

    def __init__(self):
        self.positive_words: Set[str] = set()
        self.negative_words: Set[str] = set()
        self.word_scores: Dict[str, float] = {}

    @staticmethod
    def _read_word_list(filepath: str) -> Set[str]:
        """Read one lower-cased word per line, skipping blank lines."""
        with open(filepath, "r", encoding="utf-8") as f:
            cleaned = (line.strip().lower() for line in f)
            return {word for word in cleaned if word}

    def load_lexicon(self, positive_file: str, negative_file: str) -> None:
        """Load the positive and negative word lists from UTF-8 text files."""
        self.positive_words = self._read_word_list(positive_file)
        self.negative_words = self._read_word_list(negative_file)

    def load_vader_lexicon(self, filepath: str) -> None:
        """Load a scored lexicon; lines with fewer than two fields are skipped."""
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                parts = line.strip().split("\t")
                if len(parts) >= 2:
                    word, score = parts[0], float(parts[1])
                    self.word_scores[word.lower()] = score

    def _normalize_token(self, token: str) -> str:
        """Strip surrounding punctuation unless the raw token is a lexicon entry.

        The raw form is tried first so entries that consist of punctuation
        (emoticons, for instance) remain matchable.
        """
        if (
            token in self.word_scores
            or token in self.positive_words
            or token in self.negative_words
        ):
            return token
        return token.strip(string.punctuation)

    def analyze(self, text: str) -> SentimentResult:
        """Classify ``text`` using whichever lexicon is loaded.

        The text is lower-cased and split on whitespace; punctuation glued
        to a word ("great.") is removed so the word can match the lexicon.
        """
        normalized = (self._normalize_token(raw) for raw in text.lower().split())
        tokens = [token for token in normalized if token]
        if self.word_scores:
            return self._analyze_with_scores(tokens)
        return self._analyze_with_sets(tokens)

    def _analyze_with_sets(self, tokens: List[str]) -> SentimentResult:
        """Score by counting positive and negative hits.

        ``score`` is ``(pos - neg) / (pos + neg)``; ``confidence`` is the
        share of tokens that hit either list.  Scores within ``[-0.1, 0.1]``
        are labelled neutral.
        """
        positive_count = sum(1 for t in tokens if t in self.positive_words)
        negative_count = sum(1 for t in tokens if t in self.negative_words)
        total = positive_count + negative_count
        if total == 0:
            return SentimentResult(
                sentiment=Sentiment.NEUTRAL,
                score=0.0,
                confidence=0.0,
                details={"positive": 0, "negative": 0}
            )
        score = (positive_count - negative_count) / total
        confidence = total / len(tokens) if tokens else 0
        if score > 0.1:
            sentiment = Sentiment.POSITIVE
        elif score < -0.1:
            sentiment = Sentiment.NEGATIVE
        else:
            sentiment = Sentiment.NEUTRAL
        return SentimentResult(
            sentiment=sentiment,
            score=score,
            confidence=confidence,
            details={"positive": positive_count, "negative": negative_count}
        )

    def _analyze_with_scores(self, tokens: List[str]) -> SentimentResult:
        """Score by averaging per-word lexicon scores over all tokens.

        Unknown tokens contribute 0.  Averages of at least 0.05 are
        positive, at most -0.05 negative.  ``confidence`` is the magnitude
        of the average, capped at 1.0 because lexicon scores are not
        guaranteed to lie in ``[-1, 1]``.
        """
        scores = [self.word_scores.get(t, 0) for t in tokens]
        if not scores:
            return SentimentResult(
                sentiment=Sentiment.NEUTRAL,
                score=0.0,
                confidence=0.0,
                details={"compound": 0}
            )
        compound = sum(scores) / len(scores)
        if compound >= 0.05:
            sentiment = Sentiment.POSITIVE
        elif compound <= -0.05:
            sentiment = Sentiment.NEGATIVE
        else:
            sentiment = Sentiment.NEUTRAL
        return SentimentResult(
            sentiment=sentiment,
            score=compound,
            confidence=min(1.0, abs(compound)),
            details={"compound": compound}
        )
class TextClassifier:
    """Multinomial Naive Bayes text classifier with add-one smoothing.

    Documents are lower-cased and split on whitespace.
    """

    def __init__(self):
        self.vocabulary: Dict[str, int] = {}
        self.classes: List[str] = []
        self.class_priors: Dict[str, float] = {}
        self.word_probs: Dict[str, Dict[str, float]] = {}

    def fit(self, documents: List[str], labels: List[str]) -> "TextClassifier":
        """Estimate class priors and per-class word probabilities.

        Any state from an earlier ``fit`` is discarded.  Classes are kept
        in sorted order so results are reproducible across runs.

        Raises:
            ValueError: if ``documents`` and ``labels`` differ in length.
        """
        if len(documents) != len(labels):
            raise ValueError("documents and labels must have the same length")

        label_counts = Counter(labels)
        self.classes = sorted(label_counts)
        self.class_priors = {}
        self.word_probs = {}

        # Word counts per class, accumulated in a single pass.
        class_word_counts: Dict[str, Counter] = {cls: Counter() for cls in self.classes}
        for doc, label in zip(documents, labels):
            class_word_counts[label].update(doc.lower().split())

        all_words = set()
        for counts in class_word_counts.values():
            all_words.update(counts)
        self.vocabulary = {word: idx for idx, word in enumerate(sorted(all_words))}

        total_docs = len(documents)
        vocab_size = len(self.vocabulary)
        for cls in self.classes:
            self.class_priors[cls] = label_counts[cls] / total_docs
            counts = class_word_counts[cls]
            denominator = sum(counts.values()) + vocab_size
            self.word_probs[cls] = {
                word: (counts.get(word, 0) + 1) / denominator
                for word in self.vocabulary
            }
        return self

    def predict(self, document: str) -> Tuple[str, Dict[str, float]]:
        """Return the most likely class and the normalised class probabilities.

        Words that were not seen during ``fit`` are ignored.

        Raises:
            ValueError: if the classifier has not been fitted.
        """
        if not self.classes:
            raise ValueError("classifier has not been fitted")
        known_words = [w for w in document.lower().split() if w in self.vocabulary]
        scores = {}
        for cls in self.classes:
            probs = self.word_probs[cls]
            scores[cls] = math.log(self.class_priors[cls]) + sum(
                math.log(probs[word]) for word in known_words
            )
        best_class = max(scores, key=scores.get)
        # Subtract the maximum before exponentiating so long documents do
        # not underflow to zero.
        max_score = scores[best_class]
        exp_scores = {cls: math.exp(score - max_score) for cls, score in scores.items()}
        total = sum(exp_scores.values())
        probabilities = {cls: value / total for cls, value in exp_scores.items()}
        return best_class, probabilities

    def predict_batch(self, documents: List[str]) -> List[Tuple[str, Dict[str, float]]]:
        """Run ``predict`` on every document, in order."""
        return [self.predict(doc) for doc in documents]
# 34.3.2 主题建模
python
class LDATopicModel:
    """Latent Dirichlet Allocation fitted by collapsed Gibbs sampling.

    Args:
        n_topics: number of topics.
        alpha: smoothing added to document-topic counts.
        beta: smoothing added to topic-word counts.
    """

    def __init__(self, n_topics: int = 10, alpha: float = 0.1, beta: float = 0.1):
        self.n_topics = n_topics
        self.alpha = alpha
        self.beta = beta
        self.vocabulary: Dict[str, int] = {}
        self.topic_word_dist: Optional[np.ndarray] = None
        self.doc_topic_dist: Optional[np.ndarray] = None

    def fit(
        self,
        documents: List[List[str]],
        n_iterations: int = 1000,
        random_state: int = 42
    ) -> "LDATopicModel":
        """Fit the model on tokenised ``documents`` and return ``self``.

        Sampling uses a private ``RandomState`` seeded with
        ``random_state``, so NumPy's global random state is left untouched
        while the draws stay reproducible for a given seed.
        """
        rng = np.random.RandomState(random_state)

        all_words = set()
        for doc in documents:
            all_words.update(doc)
        self.vocabulary = {word: idx for idx, word in enumerate(sorted(all_words))}
        vocab_size = len(self.vocabulary)
        n_docs = len(documents)

        doc_word_ids = [[self.vocabulary[word] for word in doc] for doc in documents]
        # One random initial topic per token.
        topic_assignments = [
            rng.randint(0, self.n_topics, len(word_ids)) for word_ids in doc_word_ids
        ]

        doc_topic_counts = np.zeros((n_docs, self.n_topics))
        topic_word_counts = np.zeros((self.n_topics, vocab_size))
        topic_counts = np.zeros(self.n_topics)
        for d, (doc_words, doc_topics) in enumerate(zip(doc_word_ids, topic_assignments)):
            for w, t in zip(doc_words, doc_topics):
                doc_topic_counts[d, t] += 1
                topic_word_counts[t, w] += 1
                topic_counts[t] += 1

        for _ in range(n_iterations):
            for d, (doc_words, doc_topics) in enumerate(zip(doc_word_ids, topic_assignments)):
                for i, w in enumerate(doc_words):
                    # Remove the token's current assignment from the counts
                    # before resampling it.
                    t = doc_topics[i]
                    doc_topic_counts[d, t] -= 1
                    topic_word_counts[t, w] -= 1
                    topic_counts[t] -= 1

                    probs = (doc_topic_counts[d] + self.alpha) * \
                        (topic_word_counts[:, w] + self.beta) / \
                        (topic_counts + vocab_size * self.beta)
                    new_t = rng.choice(self.n_topics, p=probs / probs.sum())

                    doc_topics[i] = new_t
                    doc_topic_counts[d, new_t] += 1
                    topic_word_counts[new_t, w] += 1
                    topic_counts[new_t] += 1

        self.topic_word_dist = (topic_word_counts + self.beta) / \
            (topic_counts[:, np.newaxis] + vocab_size * self.beta)
        self.doc_topic_dist = (doc_topic_counts + self.alpha) / \
            (doc_topic_counts.sum(axis=1, keepdims=True) + self.n_topics * self.alpha)
        return self

    def get_topic_words(self, topic_id: int, top_n: int = 10) -> List[Tuple[str, float]]:
        """Return the ``top_n`` most probable words of a topic.

        Returns ``[]`` before ``fit`` has been called.
        """
        if self.topic_word_dist is None:
            return []
        inv_vocab = {idx: word for word, idx in self.vocabulary.items()}
        topic_probs = self.topic_word_dist[topic_id]
        top_indices = np.argsort(topic_probs)[::-1][:top_n]
        return [(inv_vocab[int(idx)], float(topic_probs[idx])) for idx in top_indices]

    def get_document_topics(self, doc_id: int) -> List[Tuple[int, float]]:
        """Return ``(topic, probability)`` pairs with probability above 0.01.

        Returns ``[]`` before ``fit`` has been called.
        """
        if self.doc_topic_dist is None:
            return []
        probs = self.doc_topic_dist[doc_id]
        return [(t, float(probs[t])) for t in range(self.n_topics) if probs[t] > 0.01]

    def get_all_topics(self, top_n: int = 10) -> Dict[int, List[Tuple[str, float]]]:
        """Return the top words of every topic, keyed by topic id."""
        return {t: self.get_topic_words(t, top_n) for t in range(self.n_topics)}
# 34.4 命名实体识别
34.4.1 基于规则的NER
python
from typing import List, Tuple, Dict, Optional
from dataclasses import dataclass
from enum import Enum
import re
class EntityType(Enum):
    """Categories of named entities recognised in this chapter."""

    # Names of people, organisations and places.
    PERSON = "PERSON"
    ORGANIZATION = "ORGANIZATION"
    LOCATION = "LOCATION"

    # Temporal expressions.
    DATE = "DATE"
    TIME = "TIME"

    # Numeric expressions.
    MONEY = "MONEY"
    PERCENT = "PERCENT"

    # Contact details and links.
    EMAIL = "EMAIL"
    PHONE = "PHONE"
    URL = "URL"
@dataclass
class Entity:
    """One entity mention located in a text."""

    # Matched surface string.
    text: str
    # Category of the mention.
    entity_type: EntityType
    # Character offsets of the mention: ``start`` inclusive, ``end`` exclusive.
    start: int
    end: int
    # Extractor confidence; defaults to 1.0.
    confidence: float = 1.0
class RuleBasedNER:
    """Named-entity recogniser built from regular expressions and word lists.

    Built-in patterns cover e-mail addresses, phone numbers, URLs, dates,
    times, money amounts and percentages.  Further entity types can be
    matched through gazetteers registered with ``add_gazetteer``.
    """

    def __init__(self):
        self._patterns: Dict[EntityType, List[str]] = {}
        self._gazetteers: Dict[EntityType, Set[str]] = {}
        self._compile_patterns()

    def _compile_patterns(self) -> None:
        """Compile the built-in pattern for each supported entity type."""
        self._compiled_patterns = {
            # "[A-Za-z]" for the top-level domain: inside a character class
            # "|" is a literal pipe, not an alternation.
            EntityType.EMAIL: re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
            EntityType.PHONE: re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
            EntityType.URL: re.compile(r"https?://[^\s]+"),
            EntityType.DATE: re.compile(
                r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|"
                r"\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b"
            ),
            EntityType.TIME: re.compile(r"\b\d{1,2}:\d{2}(?::\d{2})?(?:\s*[AP]M)?\b"),
            EntityType.MONEY: re.compile(r"\$\d+(?:,\d{3})*(?:\.\d{2})?|\d+\s*(?:dollars?|euros?|yen)\b"),
            EntityType.PERCENT: re.compile(r"\d+(?:\.\d+)?%")
        }

    def add_gazetteer(self, entity_type: EntityType, entities: List[str]) -> None:
        """Register known entity names for ``entity_type``.

        Names are stored lower-cased and matched case-insensitively.  Blank
        names are ignored because an empty pattern matches everywhere.
        """
        bucket = self._gazetteers.setdefault(entity_type, set())
        bucket.update(name.lower() for name in entities if name.strip())

    def extract(self, text: str) -> List[Entity]:
        """Return the non-overlapping entities found in ``text``, by position."""
        entities = []
        for entity_type, pattern in self._compiled_patterns.items():
            for match in pattern.finditer(text):
                entities.append(Entity(
                    text=match.group(),
                    entity_type=entity_type,
                    start=match.start(),
                    end=match.end()
                ))
        for entity_type, gazetteer in self._gazetteers.items():
            # Sorted so the result does not depend on set iteration order.
            for entity_text in sorted(gazetteer):
                pattern = re.compile(r"\b" + re.escape(entity_text) + r"\b", re.IGNORECASE)
                for match in pattern.finditer(text):
                    entities.append(Entity(
                        text=match.group(),
                        entity_type=entity_type,
                        start=match.start(),
                        end=match.end()
                    ))
        return self._remove_overlaps(entities)

    def _remove_overlaps(self, entities: List[Entity]) -> List[Entity]:
        """Drop entities that overlap an earlier-starting one.

        Candidates are ordered by start offset and, for equal starts,
        longest first, so the longer of two competing matches is kept.
        """
        ordered = sorted(entities, key=lambda e: (e.start, -(e.end - e.start)))
        filtered: List[Entity] = []
        for entity in ordered:
            if not filtered or entity.start >= filtered[-1].end:
                filtered.append(entity)
        return filtered
class SpacyNER:
    """Named-entity recogniser backed by a spaCy pipeline.

    When spaCy or the requested model cannot be loaded the recogniser
    stays usable and ``extract`` returns an empty list.

    Args:
        model: name of the spaCy pipeline to load.
    """

    def __init__(self, model: str = "en_core_web_sm"):
        self._available = False
        try:
            import spacy
            self.nlp = spacy.load(model)
            self._available = True
        except (ImportError, OSError):
            # ImportError: spaCy is not installed.
            # OSError: spaCy is installed but the model package is missing.
            pass

    def extract(self, text: str) -> List[Entity]:
        """Return the entities spaCy finds in ``text``.

        Entities whose spaCy label has no ``EntityType`` counterpart are
        skipped rather than reported under an unrelated type.
        """
        if not self._available:
            return []
        doc = self.nlp(text)
        entities = []
        for ent in doc.ents:
            entity_type = self._map_entity_type(ent.label_)
            if entity_type is None:
                continue
            entities.append(Entity(
                text=ent.text,
                entity_type=entity_type,
                start=ent.start_char,
                end=ent.end_char
            ))
        return entities

    @staticmethod
    def _map_entity_type(spacy_label: str) -> Optional[EntityType]:
        """Translate a spaCy label to an ``EntityType``.

        Returns ``None`` for labels that have no entry in the mapping.
        """
        mapping = {
            "PERSON": EntityType.PERSON,
            "ORG": EntityType.ORGANIZATION,
            "GPE": EntityType.LOCATION,
            "LOC": EntityType.LOCATION,
            "DATE": EntityType.DATE,
            "TIME": EntityType.TIME,
            "MONEY": EntityType.MONEY,
            "PERCENT": EntityType.PERCENT
        }
        return mapping.get(spacy_label)
# 34.5 文本相似度
34.5.1 相似度计算
python
class TextSimilarity:
    """Set-overlap and vector-space similarity measures."""

    @staticmethod
    def jaccard_similarity(set1: Set[str], set2: Set[str]) -> float:
        """Return ``|A & B| / |A | B|``; two empty sets count as identical."""
        if not (set1 or set2):
            return 1.0
        shared = set1 & set2
        combined = set1 | set2
        if not combined:
            return 0.0
        return len(shared) / len(combined)

    @staticmethod
    def dice_similarity(set1: Set[str], set2: Set[str]) -> float:
        """Return ``2|A & B| / (|A| + |B|)``; two empty sets count as identical."""
        if not (set1 or set2):
            return 1.0
        size_sum = len(set1) + len(set2)
        if size_sum <= 0:
            return 0.0
        return 2 * len(set1 & set2) / size_sum

    @staticmethod
    def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
        """Return the cosine of the angle between two vectors.

        Returns ``0.0`` when either vector has zero length.
        """
        left = np.array(vec1)
        right = np.array(vec2)
        dot_product = np.dot(left, right)
        left_norm = np.linalg.norm(left)
        right_norm = np.linalg.norm(right)
        if left_norm == 0 or right_norm == 0:
            return 0.0
        return float(dot_product / (left_norm * right_norm))

    @staticmethod
    def euclidean_distance(vec1: List[float], vec2: List[float]) -> float:
        """Return the straight-line (L2) distance between two vectors."""
        difference = np.array(vec1) - np.array(vec2)
        return float(np.linalg.norm(difference))

    @staticmethod
    def manhattan_distance(vec1: List[float], vec2: List[float]) -> float:
        """Return the city-block (L1) distance between two vectors."""
        difference = np.array(vec1) - np.array(vec2)
        return float(np.sum(np.abs(difference)))
class EditDistance:
    """String distance and similarity measures based on character edits."""

    @staticmethod
    def levenshtein_distance(s1: str, s2: str) -> int:
        """Return the minimum number of single-character insertions,
        deletions and substitutions that turn ``s1`` into ``s2``."""
        # Keep the shorter string on the column axis.
        if len(s1) < len(s2):
            s1, s2 = s2, s1
        if not s2:
            return len(s1)
        above = list(range(len(s2) + 1))
        for row_number, char1 in enumerate(s1, start=1):
            row = [row_number]
            for col_number, char2 in enumerate(s2, start=1):
                row.append(min(
                    above[col_number] + 1,
                    row[col_number - 1] + 1,
                    above[col_number - 1] + (char1 != char2),
                ))
            above = row
        return above[-1]

    @staticmethod
    def levenshtein_similarity(s1: str, s2: str) -> float:
        """Return ``1 - distance / max(len)``; two empty strings give 1.0."""
        longest = max(len(s1), len(s2))
        if longest <= 0:
            return 1.0
        return 1 - EditDistance.levenshtein_distance(s1, s2) / longest

    @staticmethod
    def damerau_levenshtein_distance(s1: str, s2: str) -> int:
        """Return the edit distance that also counts a swap of two
        adjacent characters as a single edit."""
        rows, cols = len(s1), len(s2)
        # table[r][c] is the distance between s1[:r] and s2[:c].
        table = [[0] * (cols + 1) for _ in range(rows + 1)]
        for r in range(rows + 1):
            table[r][0] = r
        for c in range(cols + 1):
            table[0][c] = c
        for r in range(1, rows + 1):
            for c in range(1, cols + 1):
                cost = 0 if s1[r - 1] == s2[c - 1] else 1
                best = min(
                    table[r - 1][c] + 1,
                    table[r][c - 1] + 1,
                    table[r - 1][c - 1] + cost,
                )
                swapped = (
                    r > 1 and c > 1
                    and s1[r - 1] == s2[c - 2]
                    and s1[r - 2] == s2[c - 1]
                )
                if swapped:
                    best = min(best, table[r - 2][c - 2] + cost)
                table[r][c] = best
        return table[rows][cols]

    @staticmethod
    def jaro_winkler_similarity(s1: str, s2: str) -> float:
        """Return the Jaro-Winkler similarity of two strings.

        The Jaro score is boosted by 0.1 per matching leading character,
        counting at most four.  Identical strings give 1.0; if exactly one
        string is empty the result is 0.0.
        """
        if s1 == s2:
            return 1.0
        len1, len2 = len(s1), len(s2)
        if len1 == 0 or len2 == 0:
            return 0.0

        window = max(0, max(len1, len2) // 2 - 1)
        used1 = [False] * len1
        used2 = [False] * len2
        matches = 0
        for i, char in enumerate(s1):
            low = max(0, i - window)
            high = min(i + window + 1, len2)
            for j in range(low, high):
                if not used2[j] and char == s2[j]:
                    used1[i] = used2[j] = True
                    matches += 1
                    break
        if matches == 0:
            return 0.0

        # Matched characters of each string in order; every position where
        # the two sequences disagree counts as one transposition.
        matched1 = [char for char, used in zip(s1, used1) if used]
        matched2 = [char for char, used in zip(s2, used2) if used]
        transpositions = sum(1 for a, b in zip(matched1, matched2) if a != b)

        jaro = (matches / len1 + matches / len2 + (matches - transpositions / 2) / matches) / 3

        prefix = 0
        for a, b in zip(s1[:4], s2[:4]):
            if a != b:
                break
            prefix += 1
        return jaro + prefix * 0.1 * (1 - jaro)
# 34.6 知识图谱
34.6.1 NLP技术体系
自然语言处理技术层次
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
│ 机器翻译、问答系统、情感分析、文本摘要、对话系统 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 算法层 │
│ 分类、序列标注、生成、匹配、聚类 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 表示层 │
│ 词袋、TF-IDF、词向量、上下文嵌入 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 预处理层 │
│ 分词、清洗、规范化、标注 │
└─────────────────────────────────────────────────────────────┘
NLP工具链:
┌─────────────────────────────────────────┐
│ NLTK 经典NLP工具包 │
│ spaCy 工业级NLP库 │
│ Transformers 预训练模型 │
│ jieba 中文分词 │
│ Gensim 主题模型 │
└─────────────────────────────────────────┘
34.6.2 文本处理流程
NLP标准处理流程
┌─────────────────────────────────────────┐
│ 1. 文本获取 爬虫、API、文件 │
│ 2. 文本清洗 去除噪声、HTML标签 │
│ 3. 分词 中文/英文分词 │
│ 4. 规范化 词干提取、词形还原 │
│ 5. 停用词过滤 去除无意义词 │
│ 6. 特征提取 TF-IDF、词向量 │
│ 7. 模型训练 分类、聚类等 │
│ 8. 评估优化 准确率、召回率 │
└─────────────────────────────────────────┘
34.7 技术选型指南
34.7.1 NLP库选型
| 场景 | 推荐库 | 原因 |
|---|---|---|
| 教学/研究 | NLTK | 功能全面 |
| 生产环境 | spaCy | 性能优秀 |
| 中文处理 | jieba + spaCy | 中文支持好 |
| 深度学习 | Transformers | 预训练模型 |
34.7.2 分词工具选型
| 语言 | 推荐工具 | 说明 |
|---|---|---|
| 中文 | jieba | 简单易用 |
| 中文 | HanLP | 功能丰富 |
| 英文 | spaCy | 工业级 |
| 多语言 | stanza | 斯坦福NLP |
34.7.3 词向量选型
| 场景 | 推荐方案 | 说明 |
|---|---|---|
| 静态词向量 | Word2Vec/FastText | 训练快 |
| 上下文向量 | BERT | 效果好 |
| 领域特定 | 微调预训练模型 | 定制化 |
34.8 常见问题与解决方案
34.8.1 中文分词问题
python
# Problem: Chinese word segmentation is inaccurate.
# Solution: use a custom dictionary.
import jieba
# Register individual custom words.
jieba.add_word('Python编程')
jieba.add_word('机器学习')
# Or load them from a dictionary file.
jieba.load_userdict('custom_dict.txt')
text = "Python编程是机器学习的基础"
# Segment the sentence into a list of words.
words = jieba.lcut(text)
# 34.8.2 编码问题
python
# 问题:文本编码错误
# 解决方案:统一编码
import chardet
def safe_decode(content):
    """Decode raw bytes to text using the encoding guessed by chardet.

    Falls back to UTF-8 when chardet reports no encoding, or reports one
    that Python has no codec for.  Bytes that cannot be decoded are
    dropped (``errors='ignore'``), so the result may be shorter than the
    input.

    Args:
        content: the raw bytes to decode.

    Returns:
        The decoded string.
    """
    encoding = chardet.detect(content)['encoding'] or 'utf-8'
    try:
        return content.decode(encoding, errors='ignore')
    except LookupError:
        # The detected name (e.g. EUC-TW) is unknown to Python's codec
        # registry; decode as UTF-8 instead of failing.
        return content.decode('utf-8', errors='ignore')
# 34.8.3 内存问题
python
# 问题:大文本处理内存溢出
# 解决方案:流式处理
def process_large_file(filepath):
    """Lazily yield ``process_line(line)`` for each line of a UTF-8 file.

    The file is read one line at a time as the generator is consumed and
    is closed once the generator is exhausted or closed.
    """
    with open(filepath, 'r', encoding='utf-8') as handle:
        yield from map(process_line, handle)
# Consume the generator: one result is produced and saved per iteration.
processed_lines = process_large_file('large.txt')
for processed in processed_lines:
    save_result(processed)
# 34.9 本章小结
本章详细介绍了Python自然语言处理的核心概念和实践:
- 文本预处理:文本清洗、分词、词干提取、词形还原
- 文本表示:词袋模型、TF-IDF、词向量
- 文本分类:情感分析、朴素贝叶斯分类器
- 主题建模:LDA主题模型
- 命名实体识别:基于规则和基于模型的NER
- 文本相似度:Jaccard、余弦相似度、编辑距离
- 应用实例:完整的NLP处理流程
练习题
- 实现一个文本预处理流水线,支持多种清洗和规范化操作
- 开发一个垃圾邮件分类器,使用TF-IDF和朴素贝叶斯
- 实现一个简单的问答系统,基于文本相似度匹配
- 开发一个关键词提取工具,使用TF-IDF和TextRank算法
- 实现一个文档摘要生成器,提取文档的关键句子