第15章 解释器模式
学习目标
完成本章学习后,读者将能够:
- 理解解释器模式的核心概念与形式化定义
- 掌握文法(Grammar)与抽象语法树(AST)的构建方法
- 实现词法分析器(Lexer)与语法分析器(Parser)
- 设计领域特定语言(DSL)解释器
- 分析解释器模式与编译原理的关系
- 识别解释器模式的适用场景与替代方案
15.1 模式定义
15.1.1 正式定义
解释器模式(Interpreter Pattern) 给定一个语言,定义它的文法的一种表示,并定义一个解释器,这个解释器使用该表示来解释语言中的句子。
$$\text{Interpret}: \mathcal{L} \times \mathcal{G} \rightarrow \mathcal{R}$$
其中:
- $\mathcal{L}$ 表示语言(Language)
- $\mathcal{G}$ 表示文法(Grammar)
- $\mathcal{R}$ 表示解释结果(Result)
文法定义(BNF范式):
$$\langle S \rangle ::= \langle A \rangle \mid \langle B \rangle \mid \text{terminal}$$
语法树构建:
$$\text{AST}(s) = \text{Parse}(\text{Tokenize}(s))$$
解释执行:
$$\text{Evaluate}(\text{AST}, \text{Context}) = \text{Result}$$
15.1.2 历史背景与学术脉络
| 时期 | 发展阶段 | 关键贡献 | 代表人物/文献 |
|---|---|---|---|
| 1960 | 编译原理 | 形式语言与自动机理论 | Noam Chomsky |
| 1968 | 编译器设计 | 递归下降解析器 | Niklaus Wirth |
| 1975 | 解释器实现 | LISP解释器 | John McCarthy |
| 1986 | Smalltalk | 解释器模式原型 | Smalltalk Team |
| 1994 | GoF正式定义 | 《设计模式》收录为行为型模式 | Gamma, Helm, Johnson, Vlissides |
| 1995 | 解析器生成 | ANTLR、YACC等工具 | Terence Parr |
| 2000 | DSL兴起 | 领域特定语言 | Martin Fowler |
| 2010 | 函数式解析 | Parser Combinators | Haskell Community |
| 2015 | Python应用 | ast模块、解析库 | Python Community |
| 2020 | 现代演进 | 类型安全的解释器 | Rust/TypeScript |
15.1.3 模式动机
问题场景:当需要解释执行一种特定语言时,面临以下挑战:
- 文法复杂:语言规则多样,需要结构化表示
- 扩展需求:需要动态添加新的语法规则
- 执行效率:需要高效解释执行
- 可读性:代码需要清晰反映文法结构
解决方案:将文法规则映射为类层次结构:
┌─────────────────────────────────────────────────────────────────────────┐
│ 解释器模式架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 源代码字符串 │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Lexer │─────▶│ Parser │─────▶│ AST │ │
│ │ (词法分析) │ │ (语法分析) │ │ (抽象语法树) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Interpreter │ │
│ │ (解释器) │ │
│ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Context │ │
│ │ (上下文环境) │ │
│ └──────────────┘ │
│ │
│ 核心组件: │
│ • Lexer: 将源代码分解为Token序列 │
│ • Parser: 根据文法规则构建AST │
│ • AST: 抽象语法树,表示程序结构 │
│ • Interpreter: 遍历AST执行解释 │
│ • Context: 存储解释过程中的环境信息 │
│ │
└─────────────────────────────────────────────────────────────────────────┘15.1.4 解释器模式分类
| 类型 | 特征 | 示例 |
|---|---|---|
| 语法树解释器 | 直接遍历AST执行 | 表达式计算器 |
| 字节码解释器 | 编译为字节码后执行 | Python、Java |
| AST转换器 | 将AST转换为其他形式 | 编译器前端 |
| 规则引擎 | 解释执行业务规则 | Drools、规则脚本 |
| 查询解释器 | 解释查询语言 | SQL解析器 |
15.2 理论基础
15.2.1 UML结构模型
┌─────────────────────────────────────────────────────────────────────────┐
│ 解释器模式结构图 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────┐ ┌──────────────────────────┐ │
│ │ <<abstract>> │ │ Context │ │
│ │ AbstractExpression │ ├──────────────────────────┤ │
│ ├─────────────────────────────┤ │ - variables: Dict │ │
│ │ + interpret(context): R │◄────────│ - output: Any │ │
│ └─────────────────────────────┘ uses │ + set_variable(k, v) │ │
│ △ │ + get_variable(k) │ │
│ │ └──────────────────────────┘ │
│ ┌────────┴────────┐ │
│ │ │ │
│ ┌──┴──────────┐ ┌───┴──────────┐ ┌──────────────────┐ │
│ │ Terminal │ │ NonTerminal │ │ Client │ │
│ │ Expression │ │ Expression │ ├──────────────────┤ │
│ ├─────────────┤ ├──────────────┤ │ + build_ast() │ │
│ │ - value: T │ │ - children │ │ + interpret() │ │
│ ├─────────────┤ ├──────────────┤ └──────────────────┘ │
│ │ + interpret │ │ + interpret()│ │
│ └─────────────┘ └──────────────┘ │
│ │
│ 终结符表达式: │
│ • Number: 数字常量 │
│ • Variable: 变量引用 │
│ • Literal: 字面量 │
│ │
│ 非终结符表达式: │
│ • BinaryOp: 二元运算 │
│ • UnaryOp: 一元运算 │
│ • FunctionCall: 函数调用 │
│ │
└─────────────────────────────────────────────────────────────────────────┘15.2.2 参与者职责
| 参与者 | 职责 | Python实现要点 |
|---|---|---|
| AbstractExpression | 声明解释接口 | ABC或Protocol |
| TerminalExpression | 实现终结符解释 | 叶子节点,无子节点 |
| NonterminalExpression | 实现非终结符解释 | 组合节点,包含子表达式 |
| Context | 存储解释环境 | 全局状态、变量表 |
| Client | 构建AST并调用解释 | 解析器、主程序 |
15.2.3 文法与AST的关系
┌─────────────────────────────────────────────────────────────────────────┐
│ 文法规则到AST映射 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 文法规则 (BNF): │
│ │
│ <expr> ::= <term> (('+' | '-') <term>)* │
│ <term> ::= <factor> (('*' | '/') <factor>)* │
│ <factor> ::= NUMBER | '(' <expr> ')' | <variable> │
│ <variable>::= IDENTIFIER │
│ │
│ 表达式: "2 + 3 * 4" │
│ │
│ 对应的AST: │
│ │
│ ┌─────┐ │
│ │ + │ │
│ └──┬──┘ │
│ ┌───┴───┐ │
│ │ │ │
│ ┌──┴──┐ ┌─┴──┐ │
│ │ 2 │ │ * │ │
│ └─────┘ └──┬─┘ │
│ ┌──┴──┐ │
│ │ │ │
│ ┌──┴─┐ ┌──┴──┐ │
│ │ 3 │ │ 4 │ │
│ └────┘ └─────┘ │
│ │
│ 解释过程: │
│ 1. 遍历AST │
│ 2. 先计算子节点(后序遍历) │
│ 3. 3 * 4 = 12 │
│ 4. 2 + 12 = 14 │
│ │
└─────────────────────────────────────────────────────────────────────────┘15.3 Python实现
15.3.1 标准ABC实现
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from dataclasses import dataclass
class Context:
"""解释上下文:存储变量和环境信息"""
def __init__(self):
self._variables: Dict[str, Any] = {}
self._output: list = []
def set_variable(self, name: str, value: Any) -> None:
self._variables[name] = value
def get_variable(self, name: str) -> Any:
if name not in self._variables:
raise NameError(f"未定义的变量: {name}")
return self._variables[name]
def has_variable(self, name: str) -> bool:
return name in self._variables
def output(self, value: Any) -> None:
self._output.append(value)
def get_output(self) -> list:
return self._output
class Expression(ABC):
"""抽象表达式:声明解释接口"""
@abstractmethod
def interpret(self, context: Context) -> Any:
pass
@abstractmethod
def __str__(self) -> str:
pass
@dataclass
class Number(Expression):
"""终结符表达式:数字常量"""
_value: float
def interpret(self, context: Context) -> float:
return self._value
def __str__(self) -> str:
return str(self._value)
@dataclass
class Variable(Expression):
"""终结符表达式:变量引用"""
_name: str
def interpret(self, context: Context) -> Any:
return context.get_variable(self._name)
def __str__(self) -> str:
return self._name
class BinaryExpression(Expression):
"""非终结符表达式:二元运算"""
def __init__(self, left: Expression, operator: str, right: Expression):
self._left = left
self._operator = operator
self._right = right
def interpret(self, context: Context) -> Any:
left_val = self._left.interpret(context)
right_val = self._right.interpret(context)
operators = {
'+': lambda a, b: a + b,
'-': lambda a, b: a - b,
'*': lambda a, b: a * b,
'/': lambda a, b: a / b if b != 0 else float('inf'),
'%': lambda a, b: a % b,
'**': lambda a, b: a ** b,
}
if self._operator not in operators:
raise ValueError(f"未知运算符: {self._operator}")
return operators[self._operator](left_val, right_val)
def __str__(self) -> str:
return f"({self._left} {self._operator} {self._right})"
class UnaryExpression(Expression):
"""非终结符表达式:一元运算"""
def __init__(self, operator: str, operand: Expression):
self._operator = operator
self._operand = operand
def interpret(self, context: Context) -> Any:
val = self._operand.interpret(context)
operators = {
'-': lambda x: -x,
'+': lambda x: +x,
'abs': lambda x: abs(x),
}
if self._operator not in operators:
raise ValueError(f"未知运算符: {self._operator}")
return operators[self._operator](val)
def __str__(self) -> str:
return f"({self._operator} {self._operand})"
class FunctionCall(Expression):
"""非终结符表达式:函数调用"""
def __init__(self, name: str, args: list[Expression]):
self._name = name
self._args = args
def interpret(self, context: Context) -> Any:
import math
arg_values = [arg.interpret(context) for arg in self._args]
functions = {
'sin': lambda x: math.sin(x),
'cos': lambda x: math.cos(x),
'sqrt': lambda x: math.sqrt(x),
'log': lambda x: math.log(x),
'max': lambda *args: max(args),
'min': lambda *args: min(args),
}
if self._name not in functions:
raise NameError(f"未定义的函数: {self._name}")
return functions[self._name](*arg_values)
def __str__(self) -> str:
args_str = ", ".join(str(arg) for arg in self._args)
return f"{self._name}({args_str})"
if __name__ == "__main__":
context = Context()
context.set_variable("x", 10)
context.set_variable("y", 5)
expr = BinaryExpression(
Variable("x"),
"+",
BinaryExpression(
Number(2),
"*",
Variable("y")
)
)
print(f"表达式: {expr}")
print(f"结果: {expr.interpret(context)}")
expr2 = FunctionCall("sqrt", [
BinaryExpression(Variable("x"), "+", Variable("y"))
])
print(f"\n表达式: {expr2}")
print(f"结果: {expr2.interpret(context)}")15.3.2 完整的词法分析与语法分析
from typing import List, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum, auto
import re
class TokenType(Enum):
"""Token类型"""
NUMBER = auto()
IDENTIFIER = auto()
PLUS = auto()
MINUS = auto()
MULTIPLY = auto()
DIVIDE = auto()
POWER = auto()
LPAREN = auto()
RPAREN = auto()
COMMA = auto()
ASSIGN = auto()
EOF = auto()
@dataclass
class Token:
"""词法单元"""
type: TokenType
value: Any
position: int = 0
def __repr__(self) -> str:
return f"Token({self.type.name}, {self.value!r})"
class Lexer:
"""词法分析器:将源代码分解为Token序列"""
def __init__(self, text: str):
self._text = text
self._pos = 0
self._tokens: List[Token] = []
def tokenize(self) -> List[Token]:
"""执行词法分析"""
self._tokens = []
token_specs = [
(TokenType.NUMBER, r'\d+(\.\d+)?'),
(TokenType.IDENTIFIER, r'[a-zA-Z_][a-zA-Z0-9_]*'),
(TokenType.PLUS, r'\+'),
(TokenType.MINUS, r'-'),
(TokenType.MULTIPLY, r'\*'),
(TokenType.DIVIDE, r'/'),
(TokenType.POWER, r'\*\*'),
(TokenType.LPAREN, r'\('),
(TokenType.RPAREN, r'\)'),
(TokenType.COMMA, r','),
(TokenType.ASSIGN, r'='),
]
patterns = []
for token_type, pattern in token_specs:
patterns.append(f'(?P<{token_type.name}>{pattern})')
regex = '|'.join(patterns)
skip_pattern = r'[ \t]+'
pos = 0
while pos < len(self._text):
match = re.match(skip_pattern, self._text[pos:])
if match:
pos += match.end()
continue
match = re.match(regex, self._text[pos:])
if match:
for token_type in TokenType:
value = match.group(token_type.name)
if value is not None:
if token_type == TokenType.NUMBER:
value = float(value) if '.' in value else int(value)
self._tokens.append(Token(token_type, value, pos))
break
pos += match.end()
else:
raise SyntaxError(f"未知字符: {self._text[pos]!r} at position {pos}")
self._tokens.append(Token(TokenType.EOF, None, pos))
return self._tokens
class ASTNode:
"""抽象语法树节点基类"""
def evaluate(self, context: dict) -> Any:
raise NotImplementedError
def __str__(self) -> str:
return self.__class__.__name__
@dataclass
class NumberNode(ASTNode):
"""数字节点"""
value: float
def evaluate(self, context: dict) -> float:
return self.value
def __str__(self) -> str:
return str(self.value)
@dataclass
class VariableNode(ASTNode):
"""变量节点"""
name: str
def evaluate(self, context: dict) -> Any:
if self.name not in context:
raise NameError(f"未定义的变量: {self.name}")
return context[self.name]
def __str__(self) -> str:
return self.name
@dataclass
class BinaryOpNode(ASTNode):
"""二元运算节点"""
left: ASTNode
operator: str
right: ASTNode
def evaluate(self, context: dict) -> Any:
left = self.left.evaluate(context)
right = self.right.evaluate(context)
ops = {
'+': lambda a, b: a + b,
'-': lambda a, b: a - b,
'*': lambda a, b: a * b,
'/': lambda a, b: a / b,
'**': lambda a, b: a ** b,
}
return ops[self.operator](left, right)
def __str__(self) -> str:
return f"({self.left} {self.operator} {self.right})"
@dataclass
class UnaryOpNode(ASTNode):
"""一元运算节点"""
operator: str
operand: ASTNode
def evaluate(self, context: dict) -> Any:
val = self.operand.evaluate(context)
if self.operator == '-':
return -val
return val
def __str__(self) -> str:
return f"({self.operator}{self.operand})"
@dataclass
class FunctionCallNode(ASTNode):
"""函数调用节点"""
name: str
args: List[ASTNode]
def evaluate(self, context: dict) -> Any:
import math
arg_values = [arg.evaluate(context) for arg in self.args]
functions = {
'sin': math.sin,
'cos': math.cos,
'tan': math.tan,
'sqrt': math.sqrt,
'abs': abs,
'log': math.log,
'exp': math.exp,
'floor': math.floor,
'ceil': math.ceil,
'round': round,
'max': max,
'min': min,
}
if self.name not in functions:
raise NameError(f"未定义的函数: {self.name}")
return functions[self.name](*arg_values)
def __str__(self) -> str:
args = ", ".join(str(arg) for arg in self.args)
return f"{self.name}({args})"
@dataclass
class AssignmentNode(ASTNode):
"""赋值节点"""
name: str
value: ASTNode
def evaluate(self, context: dict) -> Any:
result = self.value.evaluate(context)
context[self.name] = result
return result
def __str__(self) -> str:
return f"{self.name} = {self.value}"
class Parser:
"""语法分析器:构建抽象语法树"""
def __init__(self, tokens: List[Token]):
self._tokens = tokens
self._pos = 0
def parse(self) -> ASTNode:
"""执行语法分析"""
result = self._assignment()
if self._current().type != TokenType.EOF:
raise SyntaxError(f"意外的Token: {self._current()}")
return result
def _current(self) -> Token:
return self._tokens[self._pos] if self._pos < len(self._tokens) else Token(TokenType.EOF, None)
def _consume(self, expected: TokenType) -> Token:
token = self._current()
if token.type != expected:
raise SyntaxError(f"期望 {expected.name}, 但得到 {token.type.name}")
self._pos += 1
return token
def _match(self, *types: TokenType) -> bool:
return self._current().type in types
def _assignment(self) -> ASTNode:
"""解析赋值表达式"""
if self._match(TokenType.IDENTIFIER):
if self._pos + 1 < len(self._tokens) and self._tokens[self._pos + 1].type == TokenType.ASSIGN:
name = self._consume(TokenType.IDENTIFIER).value
self._consume(TokenType.ASSIGN)
value = self._additive()
return AssignmentNode(name, value)
return self._additive()
def _additive(self) -> ASTNode:
"""解析加减表达式"""
node = self._multiplicative()
while self._match(TokenType.PLUS, TokenType.MINUS):
op = self._consume(self._current().type).value
right = self._multiplicative()
node = BinaryOpNode(node, op, right)
return node
def _multiplicative(self) -> ASTNode:
"""解析乘除表达式"""
node = self._power()
while self._match(TokenType.MULTIPLY, TokenType.DIVIDE):
op = self._consume(self._current().type).value
right = self._power()
node = BinaryOpNode(node, op, right)
return node
def _power(self) -> ASTNode:
"""解析幂运算表达式"""
node = self._unary()
if self._match(TokenType.POWER):
op = self._consume(TokenType.POWER).value
right = self._power()
node = BinaryOpNode(node, op, right)
return node
def _unary(self) -> ASTNode:
"""解析一元表达式"""
if self._match(TokenType.MINUS):
self._consume(TokenType.MINUS)
operand = self._unary()
return UnaryOpNode('-', operand)
if self._match(TokenType.PLUS):
self._consume(TokenType.PLUS)
return self._unary()
return self._primary()
def _primary(self) -> ASTNode:
"""解析基本表达式"""
if self._match(TokenType.NUMBER):
return NumberNode(self._consume(TokenType.NUMBER).value)
if self._match(TokenType.IDENTIFIER):
name = self._consume(TokenType.IDENTIFIER).value
if self._match(TokenType.LPAREN):
self._consume(TokenType.LPAREN)
args = []
if not self._match(TokenType.RPAREN):
args.append(self._assignment())
while self._match(TokenType.COMMA):
self._consume(TokenType.COMMA)
args.append(self._assignment())
self._consume(TokenType.RPAREN)
return FunctionCallNode(name, args)
return VariableNode(name)
if self._match(TokenType.LPAREN):
self._consume(TokenType.LPAREN)
node = self._assignment()
self._consume(TokenType.RPAREN)
return node
raise SyntaxError(f"意外的Token: {self._current()}")
class Interpreter:
"""解释器:执行AST"""
def __init__(self):
self._context: dict = {}
def interpret(self, source: str) -> Any:
"""解释执行源代码"""
lexer = Lexer(source)
tokens = lexer.tokenize()
parser = Parser(tokens)
ast = parser.parse()
return ast.evaluate(self._context)
def get_context(self) -> dict:
return dict(self._context)
def set_variable(self, name: str, value: Any) -> None:
self._context[name] = value
if __name__ == "__main__":
interpreter = Interpreter()
expressions = [
"2 + 3 * 4",
"(2 + 3) * 4",
"x = 10",
"y = 5",
"x + y * 2",
"sqrt(16)",
"sin(0) + cos(0)",
"max(1, 2, 3, 4, 5)",
"2 ** 3 ** 2",
"-5 + 10",
]
for expr in expressions:
try:
result = interpreter.interpret(expr)
print(f"{expr} = {result}")
except Exception as e:
print(f"{expr} -> 错误: {e}")
print(f"\n当前上下文: {interpreter.get_context()}")15.3.3 使用Python ast模块
import ast
import operator
from typing import Any, Dict
import math
class SafeEvalVisitor(ast.NodeVisitor):
"""安全的表达式求值器"""
ALLOWED_NODES = {
ast.Expression, ast.BinOp, ast.UnaryOp, ast.Num, ast.Constant,
ast.Name, ast.Load, ast.Call, ast.keyword,
ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow, ast.Mod,
ast.USub, ast.UAdd,
}
ALLOWED_FUNCTIONS = {
'abs': abs,
'round': round,
'min': min,
'max': max,
'sum': sum,
'len': len,
'sin': math.sin,
'cos': math.cos,
'tan': math.tan,
'sqrt': math.sqrt,
'log': math.log,
'exp': math.exp,
'floor': math.floor,
'ceil': math.ceil,
}
def __init__(self, variables: Dict[str, Any] = None):
self._variables = variables or {}
self._operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.Mod: operator.mod,
}
self._unary_ops = {
ast.USub: operator.neg,
ast.UAdd: operator.pos,
}
def visit(self, node: ast.AST) -> Any:
if type(node) not in self.ALLOWED_NODES:
raise ValueError(f"不允许的节点类型: {type(node).__name__}")
return super().visit(node)
def visit_Expression(self, node: ast.Expression) -> Any:
return self.visit(node.body)
def visit_BinOp(self, node: ast.BinOp) -> Any:
left = self.visit(node.left)
right = self.visit(node.right)
op_func = self._operators.get(type(node.op))
if op_func is None:
raise ValueError(f"不支持的运算符: {type(node.op).__name__}")
return op_func(left, right)
def visit_UnaryOp(self, node: ast.UnaryOp) -> Any:
operand = self.visit(node.operand)
op_func = self._unary_ops.get(type(node.op))
if op_func is None:
raise ValueError(f"不支持的一元运算符: {type(node.op).__name__}")
return op_func(operand)
def visit_Num(self, node: ast.Num) -> Any:
return node.n
def visit_Constant(self, node: ast.Constant) -> Any:
if isinstance(node.value, (int, float)):
return node.value
raise ValueError(f"不支持的常量类型: {type(node.value)}")
def visit_Name(self, node: ast.Name) -> Any:
if node.id in self._variables:
return self._variables[node.id]
raise NameError(f"未定义的变量: {node.id}")
def visit_Call(self, node: ast.Call) -> Any:
if isinstance(node.func, ast.Name):
func_name = node.func.id
if func_name in self.ALLOWED_FUNCTIONS:
args = [self.visit(arg) for arg in node.args]
kwargs = {kw.arg: self.visit(kw.value) for kw in node.keywords if kw.arg}
return self.ALLOWED_FUNCTIONS[func_name](*args, **kwargs)
raise ValueError(f"不支持的函数调用")
def safe_eval(expression: str, variables: Dict[str, Any] = None) -> Any:
"""安全地计算表达式"""
tree = ast.parse(expression, mode='eval')
visitor = SafeEvalVisitor(variables)
return visitor.visit(tree)
if __name__ == "__main__":
variables = {"x": 10, "y": 5, "pi": 3.14159}
expressions = [
"2 + 3 * 4",
"(2 + 3) * 4",
"x + y",
"x ** 2 + y ** 2",
"sqrt(x ** 2 + y ** 2)",
"sin(pi / 2)",
"max(x, y, 15)",
"abs(-10)",
]
for expr in expressions:
try:
result = safe_eval(expr, variables)
print(f"{expr} = {result}")
except Exception as e:
print(f"{expr} -> 错误: {e}")15.4 企业级应用示例
15.4.1 规则引擎
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import re
class ComparisonOperator(Enum):
"""比较运算符"""
EQ = auto()
NE = auto()
GT = auto()
GE = auto()
LT = auto()
LE = auto()
CONTAINS = auto()
STARTS_WITH = auto()
ENDS_WITH = auto()
MATCHES = auto()
class LogicalOperator(Enum):
"""逻辑运算符"""
AND = auto()
OR = auto()
NOT = auto()
@dataclass
class RuleContext:
"""规则上下文"""
facts: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
matched_rules: List[str] = field(default_factory=list)
actions: List[Dict[str, Any]] = field(default_factory=list)
class RuleExpression(ABC):
"""规则表达式基类"""
@abstractmethod
def evaluate(self, context: RuleContext) -> bool:
pass
@abstractmethod
def __str__(self) -> str:
pass
@dataclass
class FieldComparison(RuleExpression):
"""字段比较表达式"""
field: str
operator: ComparisonOperator
value: Any
def evaluate(self, context: RuleContext) -> bool:
if self.field not in context.facts:
return False
field_value = context.facts[self.field]
ops = {
ComparisonOperator.EQ: lambda a, b: a == b,
ComparisonOperator.NE: lambda a, b: a != b,
ComparisonOperator.GT: lambda a, b: a > b,
ComparisonOperator.GE: lambda a, b: a >= b,
ComparisonOperator.LT: lambda a, b: a < b,
ComparisonOperator.LE: lambda a, b: a <= b,
ComparisonOperator.CONTAINS: lambda a, b: b in a,
ComparisonOperator.STARTS_WITH: lambda a, b: str(a).startswith(str(b)),
ComparisonOperator.ENDS_WITH: lambda a, b: str(a).endswith(str(b)),
ComparisonOperator.MATCHES: lambda a, b: bool(re.match(str(b), str(a))),
}
return ops[self.operator](field_value, self.value)
def __str__(self) -> str:
op_symbols = {
ComparisonOperator.EQ: "==",
ComparisonOperator.NE: "!=",
ComparisonOperator.GT: ">",
ComparisonOperator.GE: ">=",
ComparisonOperator.LT: "<",
ComparisonOperator.LE: "<=",
ComparisonOperator.CONTAINS: "contains",
ComparisonOperator.STARTS_WITH: "starts_with",
ComparisonOperator.ENDS_WITH: "ends_with",
ComparisonOperator.MATCHES: "matches",
}
return f"{self.field} {op_symbols[self.operator]} {self.value!r}"
@dataclass
class LogicalExpression(RuleExpression):
"""逻辑表达式"""
operator: LogicalOperator
operands: List[RuleExpression]
def evaluate(self, context: RuleContext) -> bool:
if self.operator == LogicalOperator.AND:
return all(op.evaluate(context) for op in self.operands)
elif self.operator == LogicalOperator.OR:
return any(op.evaluate(context) for op in self.operands)
elif self.operator == LogicalOperator.NOT:
return not self.operands[0].evaluate(context)
return False
def __str__(self) -> str:
if self.operator == LogicalOperator.NOT:
return f"NOT {self.operands[0]}"
op_str = " AND " if self.operator == LogicalOperator.AND else " OR "
return f"({op_str.join(str(op) for op in self.operands)})"
@dataclass
class Rule:
"""规则定义"""
name: str
condition: RuleExpression
actions: List[Dict[str, Any]]
priority: int = 0
enabled: bool = True
def evaluate(self, context: RuleContext) -> bool:
if not self.enabled:
return False
return self.condition.evaluate(context)
def execute(self, context: RuleContext) -> None:
if self.evaluate(context):
context.matched_rules.append(self.name)
context.actions.extend(self.actions)
class RuleEngine:
"""规则引擎"""
def __init__(self):
self._rules: List[Rule] = []
def add_rule(self, rule: Rule) -> None:
self._rules.append(rule)
self._rules.sort(key=lambda r: r.priority, reverse=True)
def remove_rule(self, name: str) -> bool:
for i, rule in enumerate(self._rules):
if rule.name == name:
del self._rules[i]
return True
return False
def evaluate(self, facts: Dict[str, Any]) -> RuleContext:
context = RuleContext(facts=facts)
for rule in self._rules:
rule.execute(context)
return context
def get_rules(self) -> List[str]:
return [rule.name for rule in self._rules]
class RuleParser:
"""规则解析器"""
@staticmethod
def parse(condition_str: str) -> RuleExpression:
"""解析规则条件字符串"""
condition_str = condition_str.strip()
if " AND " in condition_str or " and " in condition_str:
parts = re.split(r'\s+AND\s+|\s+and\s+', condition_str)
operands = [RuleParser.parse(p.strip()) for p in parts]
return LogicalExpression(LogicalOperator.AND, operands)
if " OR " in condition_str or " or " in condition_str:
parts = re.split(r'\s+OR\s+|\s+or\s+', condition_str)
operands = [RuleParser.parse(p.strip()) for p in parts]
return LogicalExpression(LogicalOperator.OR, operands)
if condition_str.startswith("NOT ") or condition_str.startswith("not "):
operand = RuleParser.parse(condition_str[4:].strip())
return LogicalExpression(LogicalOperator.NOT, [operand])
op_patterns = [
(r'(\w+)\s*==\s*(.+)', ComparisonOperator.EQ),
(r'(\w+)\s*!=\s*(.+)', ComparisonOperator.NE),
(r'(\w+)\s*>=\s*(.+)', ComparisonOperator.GE),
(r'(\w+)\s*<=\s*(.+)', ComparisonOperator.LE),
(r'(\w+)\s*>\s*(.+)', ComparisonOperator.GT),
(r'(\w+)\s*<\s*(.+)', ComparisonOperator.LT),
(r'(\w+)\s+contains\s+(.+)', ComparisonOperator.CONTAINS),
(r'(\w+)\s+matches\s+(.+)', ComparisonOperator.MATCHES),
]
for pattern, op in op_patterns:
match = re.match(pattern, condition_str, re.IGNORECASE)
if match:
field = match.group(1)
value_str = match.group(2).strip()
value = RuleParser._parse_value(value_str)
return FieldComparison(field, op, value)
raise ValueError(f"无法解析条件: {condition_str}")
@staticmethod
def _parse_value(value_str: str) -> Any:
"""解析值"""
value_str = value_str.strip()
if value_str.startswith('"') and value_str.endswith('"'):
return value_str[1:-1]
if value_str.startswith("'") and value_str.endswith("'"):
return value_str[1:-1]
if value_str.lower() == 'true':
return True
if value_str.lower() == 'false':
return False
if value_str.lower() == 'null' or value_str.lower() == 'none':
return None
try:
if '.' in value_str:
return float(value_str)
return int(value_str)
except ValueError:
return value_str
if __name__ == "__main__":
engine = RuleEngine()
engine.add_rule(Rule(
name="high_value_customer",
condition=LogicalExpression(LogicalOperator.AND, [
FieldComparison("total_purchases", ComparisonOperator.GT, 10000),
FieldComparison("loyalty_years", ComparisonOperator.GE, 3),
]),
actions=[{"type": "discount", "value": 0.2}, {"type": "free_shipping"}],
priority=10
))
engine.add_rule(Rule(
name="new_customer_bonus",
condition=LogicalExpression(LogicalOperator.AND, [
FieldComparison("loyalty_years", ComparisonOperator.LT, 1),
FieldComparison("total_purchases", ComparisonOperator.GE, 100),
]),
actions=[{"type": "points", "value": 500}],
priority=5
))
engine.add_rule(Rule(
name="weekend_promo",
condition=FieldComparison("is_weekend", ComparisonOperator.EQ, True),
actions=[{"type": "discount", "value": 0.1}],
priority=1
))
customers = [
{"name": "Alice", "total_purchases": 15000, "loyalty_years": 5, "is_weekend": False},
{"name": "Bob", "total_purchases": 500, "loyalty_years": 0, "is_weekend": True},
{"name": "Charlie", "total_purchases": 2000, "loyalty_years": 2, "is_weekend": True},
]
for customer in customers:
context = engine.evaluate(customer)
print(f"\n{customer['name']}:")
print(f" 匹配规则: {context.matched_rules}")
print(f" 执行动作: {context.actions}")15.4.2 SQL查询解释器
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum, auto
import re
class TokenType(Enum):
SELECT = auto()
FROM = auto()
WHERE = auto()
AND = auto()
OR = auto()
NOT = auto()
ORDER = auto()
BY = auto()
ASC = auto()
DESC = auto()
LIMIT = auto()
OFFSET = auto()
IDENTIFIER = auto()
NUMBER = auto()
STRING = auto()
COMMA = auto()
DOT = auto()
STAR = auto()
LPAREN = auto()
RPAREN = auto()
EQ = auto()
NE = auto()
GT = auto()
GE = auto()
LT = auto()
LE = auto()
EOF = auto()
@dataclass
class Token:
type: TokenType
value: Any
def __repr__(self) -> str:
return f"Token({self.type.name}, {self.value!r})"
class SQLLexer:
"""SQL词法分析器"""
KEYWORDS = {
'SELECT': TokenType.SELECT,
'FROM': TokenType.FROM,
'WHERE': TokenType.WHERE,
'AND': TokenType.AND,
'OR': TokenType.OR,
'NOT': TokenType.NOT,
'ORDER': TokenType.ORDER,
'BY': TokenType.BY,
'ASC': TokenType.ASC,
'DESC': TokenType.DESC,
'LIMIT': TokenType.LIMIT,
'OFFSET': TokenType.OFFSET,
}
def __init__(self, text: str):
self._text = text
self._pos = 0
def tokenize(self) -> List[Token]:
tokens = []
while self._pos < len(self._text):
self._skip_whitespace()
if self._pos >= len(self._text):
break
char = self._text[self._pos]
if char.isalpha() or char == '_':
tokens.append(self._read_identifier())
elif char.isdigit():
tokens.append(self._read_number())
elif char in ('"', "'"):
tokens.append(self._read_string(char))
elif char == ',':
tokens.append(Token(TokenType.COMMA, ','))
self._pos += 1
elif char == '.':
tokens.append(Token(TokenType.DOT, '.'))
self._pos += 1
elif char == '*':
tokens.append(Token(TokenType.STAR, '*'))
self._pos += 1
elif char == '(':
tokens.append(Token(TokenType.LPAREN, '('))
self._pos += 1
elif char == ')':
tokens.append(Token(TokenType.RPAREN, ')'))
self._pos += 1
elif char == '=':
tokens.append(Token(TokenType.EQ, '='))
self._pos += 1
elif char == '!' and self._peek() == '=':
tokens.append(Token(TokenType.NE, '!='))
self._pos += 2
elif char == '>' and self._peek() == '=':
tokens.append(Token(TokenType.GE, '>='))
self._pos += 2
elif char == '>':
tokens.append(Token(TokenType.GT, '>'))
self._pos += 1
elif char == '<' and self._peek() == '=':
tokens.append(Token(TokenType.LE, '<='))
self._pos += 2
elif char == '<':
tokens.append(Token(TokenType.LT, '<'))
self._pos += 1
else:
raise SyntaxError(f"未知字符: {char}")
tokens.append(Token(TokenType.EOF, None))
return tokens
def _skip_whitespace(self) -> None:
while self._pos < len(self._text) and self._text[self._pos].isspace():
self._pos += 1
def _peek(self) -> Optional[str]:
if self._pos + 1 < len(self._text):
return self._text[self._pos + 1]
return None
def _read_identifier(self) -> Token:
start = self._pos
while self._pos < len(self._text) and (self._text[self._pos].isalnum() or self._text[self._pos] == '_'):
self._pos += 1
value = self._text[start:self._pos]
token_type = self.KEYWORDS.get(value.upper(), TokenType.IDENTIFIER)
return Token(token_type, value)
def _read_number(self) -> Token:
start = self._pos
while self._pos < len(self._text) and (self._text[self._pos].isdigit() or self._text[self._pos] == '.'):
self._pos += 1
value = self._text[start:self._pos]
return Token(TokenType.NUMBER, float(value) if '.' in value else int(value))
def _read_string(self, quote: str) -> Token:
self._pos += 1
start = self._pos
while self._pos < len(self._text) and self._text[self._pos] != quote:
self._pos += 1
value = self._text[start:self._pos]
self._pos += 1
return Token(TokenType.STRING, value)
@dataclass
class SelectColumn:
"""选择列"""
name: str
alias: Optional[str] = None
@dataclass
class Condition:
"""查询条件"""
field: str
operator: str
value: Any
@dataclass
class OrderBy:
"""排序"""
field: str
ascending: bool = True
@dataclass
class SelectQuery:
"""SELECT查询"""
columns: List[SelectColumn]
table: str
conditions: List[Condition]
logical_ops: List[str]
order_by: List[OrderBy]
limit: Optional[int] = None
offset: Optional[int] = None
class SQLParser:
"""SQL语法分析器"""
def __init__(self, tokens: List[Token]):
self._tokens = tokens
self._pos = 0
def parse(self) -> SelectQuery:
return self._parse_select()
def _current(self) -> Token:
return self._tokens[self._pos]
def _consume(self, expected: TokenType) -> Token:
token = self._current()
if token.type != expected:
raise SyntaxError(f"期望 {expected.name}, 但得到 {token.type.name}")
self._pos += 1
return token
def _match(self, *types: TokenType) -> bool:
return self._current().type in types
def _parse_select(self) -> SelectQuery:
self._consume(TokenType.SELECT)
columns = self._parse_columns()
self._consume(TokenType.FROM)
table = self._consume(TokenType.IDENTIFIER).value
conditions = []
logical_ops = []
if self._match(TokenType.WHERE):
self._consume(TokenType.WHERE)
conditions, logical_ops = self._parse_conditions()
order_by = []
if self._match(TokenType.ORDER):
self._consume(TokenType.ORDER)
self._consume(TokenType.BY)
order_by = self._parse_order_by()
limit = None
if self._match(TokenType.LIMIT):
self._consume(TokenType.LIMIT)
limit = self._consume(TokenType.NUMBER).value
offset = None
if self._match(TokenType.OFFSET):
self._consume(TokenType.OFFSET)
offset = self._consume(TokenType.NUMBER).value
return SelectQuery(
columns=columns,
table=table,
conditions=conditions,
logical_ops=logical_ops,
order_by=order_by,
limit=limit,
offset=offset
)
def _parse_columns(self) -> List[SelectColumn]:
columns = []
if self._match(TokenType.STAR):
self._consume(TokenType.STAR)
columns.append(SelectColumn(name='*'))
else:
columns.append(self._parse_column())
while self._match(TokenType.COMMA):
self._consume(TokenType.COMMA)
columns.append(self._parse_column())
return columns
def _parse_column(self) -> SelectColumn:
name = self._consume(TokenType.IDENTIFIER).value
alias = None
return SelectColumn(name=name, alias=alias)
def _parse_conditions(self) -> Tuple[List[Condition], List[str]]:
conditions = []
logical_ops = []
conditions.append(self._parse_condition())
while self._match(TokenType.AND, TokenType.OR):
op = 'AND' if self._current().type == TokenType.AND else 'OR'
self._pos += 1
logical_ops.append(op)
conditions.append(self._parse_condition())
return conditions, logical_ops
def _parse_condition(self) -> Condition:
field = self._consume(TokenType.IDENTIFIER).value
op_map = {
TokenType.EQ: '=',
TokenType.NE: '!=',
TokenType.GT: '>',
TokenType.GE: '>=',
TokenType.LT: '<',
TokenType.LE: '<=',
}
if self._current().type not in op_map:
raise SyntaxError(f"期望比较运算符, 但得到 {self._current().type.name}")
operator = op_map[self._current().type]
self._pos += 1
if self._match(TokenType.NUMBER):
value = self._consume(TokenType.NUMBER).value
elif self._match(TokenType.STRING):
value = self._consume(TokenType.STRING).value
else:
raise SyntaxError(f"期望值, 但得到 {self._current().type.name}")
return Condition(field=field, operator=operator, value=value)
def _parse_order_by(self) -> List[OrderBy]:
order_list = []
field = self._consume(TokenType.IDENTIFIER).value
ascending = True
if self._match(TokenType.ASC):
self._consume(TokenType.ASC)
elif self._match(TokenType.DESC):
self._consume(TokenType.DESC)
ascending = False
order_list.append(OrderBy(field=field, ascending=ascending))
while self._match(TokenType.COMMA):
self._consume(TokenType.COMMA)
field = self._consume(TokenType.IDENTIFIER).value
ascending = True
if self._match(TokenType.ASC):
self._consume(TokenType.ASC)
elif self._match(TokenType.DESC):
self._consume(TokenType.DESC)
ascending = False
order_list.append(OrderBy(field=field, ascending=ascending))
return order_list
class SQLInterpreter:
"""SQL解释器:在内存数据上执行查询"""
def __init__(self, tables: Dict[str, List[Dict[str, Any]]]):
self._tables = tables
def execute(self, sql: str) -> List[Dict[str, Any]]:
lexer = SQLLexer(sql)
tokens = lexer.tokenize()
parser = SQLParser(tokens)
query = parser.parse()
return self._execute_query(query)
def _execute_query(self, query: SelectQuery) -> List[Dict[str, Any]]:
if query.table not in self._tables:
raise ValueError(f"表不存在: {query.table}")
result = list(self._tables[query.table])
if query.conditions:
result = self._apply_conditions(result, query.conditions, query.logical_ops)
if query.order_by:
result = self._apply_order_by(result, query.order_by)
if query.offset:
result = result[query.offset:]
if query.limit:
result = result[:query.limit]
if query.columns and query.columns[0].name != '*':
result = self._select_columns(result, query.columns)
return result
def _apply_conditions(
self,
data: List[Dict],
conditions: List[Condition],
logical_ops: List[str]
) -> List[Dict]:
if not conditions:
return data
def matches(row: Dict) -> bool:
if not conditions:
return True
result = self._evaluate_condition(row, conditions[0])
for i, op in enumerate(logical_ops):
next_result = self._evaluate_condition(row, conditions[i + 1])
if op == 'AND':
result = result and next_result
else:
result = result or next_result
return result
return [row for row in data if matches(row)]
def _evaluate_condition(self, row: Dict, condition: Condition) -> bool:
if condition.field not in row:
return False
field_value = row[condition.field]
compare_value = condition.value
ops = {
'=': lambda a, b: a == b,
'!=': lambda a, b: a != b,
'>': lambda a, b: a > b,
'>=': lambda a, b: a >= b,
'<': lambda a, b: a < b,
'<=': lambda a, b: a <= b,
}
return ops[condition.operator](field_value, compare_value)
def _apply_order_by(self, data: List[Dict], order_by: List[OrderBy]) -> List[Dict]:
result = list(data)
for order in reversed(order_by):
result.sort(
key=lambda x: x.get(order.field, ''),
reverse=not order.ascending
)
return result
def _select_columns(self, data: List[Dict], columns: List[SelectColumn]) -> List[Dict]:
result = []
for row in data:
new_row = {}
for col in columns:
if col.name in row:
key = col.alias or col.name
new_row[key] = row[col.name]
result.append(new_row)
return result
if __name__ == "__main__":
tables = {
'users': [
{'id': 1, 'name': 'Alice', 'age': 30, 'department': 'Engineering'},
{'id': 2, 'name': 'Bob', 'age': 25, 'department': 'Marketing'},
{'id': 3, 'name': 'Charlie', 'age': 35, 'department': 'Engineering'},
{'id': 4, 'name': 'Diana', 'age': 28, 'department': 'Sales'},
{'id': 5, 'name': 'Eve', 'age': 32, 'department': 'Engineering'},
]
}
interpreter = SQLInterpreter(tables)
queries = [
"SELECT * FROM users",
"SELECT name, age FROM users WHERE age > 25",
"SELECT * FROM users WHERE department = 'Engineering' AND age > 30",
"SELECT * FROM users ORDER BY age DESC LIMIT 3",
"SELECT * FROM users WHERE age >= 30 ORDER BY name ASC",
]
for sql in queries:
print(f"\nSQL: {sql}")
result = interpreter.execute(sql)
for row in result:
print(f" {row}")15.4.3 配置脚本解释器
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum, auto
import re
class ConfigTokenType(Enum):
IDENTIFIER = auto()
STRING = auto()
NUMBER = auto()
BOOLEAN = auto()
EQUALS = auto()
COLON = auto()
COMMA = auto()
LBRACE = auto()
RBRACE = auto()
LBRACKET = auto()
RBRACKET = auto()
NEWLINE = auto()
EOF = auto()
@dataclass
class ConfigToken:
type: ConfigTokenType
value: Any
line: int = 0
class ConfigLexer:
"""配置文件词法分析器"""
def __init__(self, text: str):
self._text = text
self._pos = 0
self._line = 1
def tokenize(self) -> List[ConfigToken]:
tokens = []
while self._pos < len(self._text):
char = self._text[self._pos]
if char == '\n':
tokens.append(ConfigToken(ConfigTokenType.NEWLINE, '\n', self._line))
self._line += 1
self._pos += 1
elif char.isspace():
self._pos += 1
elif char == '#':
while self._pos < len(self._text) and self._text[self._pos] != '\n':
self._pos += 1
elif char == '=':
tokens.append(ConfigToken(ConfigTokenType.EQUALS, '=', self._line))
self._pos += 1
elif char == ':':
tokens.append(ConfigToken(ConfigTokenType.COLON, ':', self._line))
self._pos += 1
elif char == ',':
tokens.append(ConfigToken(ConfigTokenType.COMMA, ',', self._line))
self._pos += 1
elif char == '{':
tokens.append(ConfigToken(ConfigTokenType.LBRACE, '{', self._line))
self._pos += 1
elif char == '}':
tokens.append(ConfigToken(ConfigTokenType.RBRACE, '}', self._line))
self._pos += 1
elif char == '[':
tokens.append(ConfigToken(ConfigTokenType.LBRACKET, '[', self._line))
self._pos += 1
elif char == ']':
tokens.append(ConfigToken(ConfigTokenType.RBRACKET, ']', self._line))
self._pos += 1
elif char in ('"', "'"):
tokens.append(self._read_string(char))
elif char.isdigit() or (char == '-' and self._peek() and self._peek().isdigit()):
tokens.append(self._read_number())
elif char.isalpha() or char == '_':
tokens.append(self._read_identifier())
else:
raise SyntaxError(f"未知字符 '{char}' at line {self._line}")
tokens.append(ConfigToken(ConfigTokenType.EOF, None, self._line))
return tokens
def _peek(self) -> Optional[str]:
if self._pos + 1 < len(self._text):
return self._text[self._pos + 1]
return None
def _read_string(self, quote: str) -> ConfigToken:
self._pos += 1
start = self._pos
line = self._line
while self._pos < len(self._text) and self._text[self._pos] != quote:
if self._text[self._pos] == '\n':
self._line += 1
self._pos += 1
value = self._text[start:self._pos]
self._pos += 1
return ConfigToken(ConfigTokenType.STRING, value, line)
def _read_number(self) -> ConfigToken:
start = self._pos
line = self._line
if self._text[self._pos] == '-':
self._pos += 1
while self._pos < len(self._text) and (self._text[self._pos].isdigit() or self._text[self._pos] == '.'):
self._pos += 1
value = self._text[start:self._pos]
num_value = float(value) if '.' in value else int(value)
return ConfigToken(ConfigTokenType.NUMBER, num_value, line)
def _read_identifier(self) -> ConfigToken:
start = self._pos
line = self._line
while self._pos < len(self._text) and (self._text[self._pos].isalnum() or self._text[self._pos] == '_'):
self._pos += 1
value = self._text[start:self._pos]
if value.lower() in ('true', 'false'):
return ConfigToken(ConfigTokenType.BOOLEAN, value.lower() == 'true', line)
return ConfigToken(ConfigTokenType.IDENTIFIER, value, line)
class ConfigValue(ABC):
"""配置值基类"""
@abstractmethod
def to_python(self) -> Any:
pass
@dataclass
class ScalarValue(ConfigValue):
"""标量值"""
value: Any
def to_python(self) -> Any:
return self.value
@dataclass
class DictValue(ConfigValue):
"""字典值"""
items: Dict[str, ConfigValue] = field(default_factory=dict)
def to_python(self) -> Dict[str, Any]:
return {k: v.to_python() for k, v in self.items.items()}
@dataclass
class ListValue(ConfigValue):
"""列表值"""
items: List[ConfigValue] = field(default_factory=list)
def to_python(self) -> List[Any]:
return [item.to_python() for item in self.items]
class ConfigParser:
"""配置文件语法分析器"""
def __init__(self, tokens: List[ConfigToken]):
self._tokens = tokens
self._pos = 0
def parse(self) -> DictValue:
return self._parse_dict()
def _current(self) -> ConfigToken:
return self._tokens[self._pos]
def _consume(self, expected: ConfigTokenType) -> ConfigToken:
token = self._current()
if token.type != expected:
raise SyntaxError(f"期望 {expected.name}, 但得到 {token.type.name} at line {token.line}")
self._pos += 1
return token
def _match(self, *types: ConfigTokenType) -> bool:
return self._current().type in types
def _skip_newlines(self) -> None:
while self._match(ConfigTokenType.NEWLINE):
self._pos += 1
def _parse_dict(self) -> DictValue:
result = DictValue()
self._skip_newlines()
while not self._match(ConfigTokenType.EOF, ConfigTokenType.RBRACE):
if self._match(ConfigTokenType.IDENTIFIER):
key = self._consume(ConfigTokenType.IDENTIFIER).value
if self._match(ConfigTokenType.EQUALS):
self._consume(ConfigTokenType.EQUALS)
elif self._match(ConfigTokenType.COLON):
self._consume(ConfigTokenType.COLON)
else:
raise SyntaxError(f"期望 '=' 或 ':' at line {self._current().line}")
value = self._parse_value()
result.items[key] = value
self._skip_newlines()
return result
def _parse_value(self) -> ConfigValue:
if self._match(ConfigTokenType.STRING):
return ScalarValue(self._consume(ConfigTokenType.STRING).value)
if self._match(ConfigTokenType.NUMBER):
return ScalarValue(self._consume(ConfigTokenType.NUMBER).value)
if self._match(ConfigTokenType.BOOLEAN):
return ScalarValue(self._consume(ConfigTokenType.BOOLEAN).value)
if self._match(ConfigTokenType.LBRACE):
return self._parse_nested_dict()
if self._match(ConfigTokenType.LBRACKET):
return self._parse_list()
raise SyntaxError(f"意外的Token: {self._current().type.name} at line {self._current().line}")
def _parse_nested_dict(self) -> DictValue:
self._consume(ConfigTokenType.LBRACE)
result = self._parse_dict()
if self._match(ConfigTokenType.RBRACE):
self._consume(ConfigTokenType.RBRACE)
return result
def _parse_list(self) -> ListValue:
self._consume(ConfigTokenType.LBRACKET)
result = ListValue()
self._skip_newlines()
while not self._match(ConfigTokenType.RBRACKET, ConfigTokenType.EOF):
result.items.append(self._parse_value())
if self._match(ConfigTokenType.COMMA):
self._consume(ConfigTokenType.COMMA)
self._skip_newlines()
if self._match(ConfigTokenType.RBRACKET):
self._consume(ConfigTokenType.RBRACKET)
return result
class ConfigInterpreter:
"""配置文件解释器"""
def __init__(self):
self._config: Dict[str, Any] = {}
def interpret(self, source: str) -> Dict[str, Any]:
lexer = ConfigLexer(source)
tokens = lexer.tokenize()
parser = ConfigParser(tokens)
config = parser.parse()
self._config = config.to_python()
return self._config
def get(self, key: str, default: Any = None) -> Any:
keys = key.split('.')
value = self._config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def get_int(self, key: str, default: int = 0) -> int:
value = self.get(key, default)
return int(value) if value is not None else default
def get_str(self, key: str, default: str = '') -> str:
value = self.get(key, default)
return str(value) if value is not None else default
def get_bool(self, key: str, default: bool = False) -> bool:
value = self.get(key, default)
return bool(value) if value is not None else default
def get_list(self, key: str, default: list = None) -> list:
value = self.get(key, default or [])
return list(value) if isinstance(value, (list, tuple)) else default or []
if __name__ == "__main__":
config_source = """
# 应用配置
app_name = "MyApplication"
version = "1.0.0"
debug = true
# 服务器配置
server {
host = "localhost"
port = 8080
workers = 4
}
# 数据库配置
database {
driver = "postgresql"
host = "localhost"
port = 5432
name = "mydb"
pool_size = 10
}
# 日志配置
logging {
level = "INFO"
format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers = ["console", "file"]
}
# 特性开关
features = ["feature_a", "feature_b", "feature_c"]
"""
interpreter = ConfigInterpreter()
config = interpreter.interpret(config_source)
print("完整配置:")
import json
print(json.dumps(config, indent=2))
print("\n便捷访问:")
print(f"app_name: {interpreter.get('app_name')}")
print(f"server.port: {interpreter.get_int('server.port')}")
print(f"database.host: {interpreter.get_str('database.host')}")
print(f"debug: {interpreter.get_bool('debug')}")
print(f"features: {interpreter.get_list('features')}")15.5 模式变体与扩展
15.5.1 解析器组合子
from typing import Generic, TypeVar, Callable, Optional, List, Tuple, Any
from dataclasses import dataclass
from functools import reduce
T = TypeVar('T')
R = TypeVar('R')
@dataclass
class ParseResult(Generic[T]):
"""解析结果"""
value: T
remaining: str
success: bool = True
error: str = ""
@classmethod
def fail(cls, error: str) -> 'ParseResult':
return cls(value=None, remaining="", success=False, error=error)
class Parser(Generic[T]):
"""解析器基类"""
def __init__(self, parse_func: Callable[[str], ParseResult[T]]):
self._parse = parse_func
def __call__(self, input_str: str) -> ParseResult[T]:
return self._parse(input_str)
def map(self, func: Callable[[T], R]) -> 'Parser[R]':
"""转换解析结果"""
def parse(input_str: str) -> ParseResult[R]:
result = self(input_str)
if result.success:
return ParseResult(func(result.value), result.remaining)
return ParseResult.fail(result.error)
return Parser(parse)
def or_else(self, other: 'Parser[T]') -> 'Parser[T]':
"""备选解析器"""
def parse(input_str: str) -> ParseResult[T]:
result = self(input_str)
if result.success:
return result
return other(input_str)
return Parser(parse)
def and_then(self, other: 'Parser[R]') -> 'Parser[Tuple[T, R]]':
"""顺序解析"""
def parse(input_str: str) -> ParseResult[Tuple[T, R]]:
result1 = self(input_str)
if not result1.success:
return ParseResult.fail(result1.error)
result2 = other(result1.remaining)
if not result2.success:
return ParseResult.fail(result2.error)
return ParseResult((result1.value, result2.value), result2.remaining)
return Parser(parse)
def many(self) -> 'Parser[List[T]]':
"""零次或多次"""
def parse(input_str: str) -> ParseResult[List[T]]:
results = []
remaining = input_str
while True:
result = self(remaining)
if not result.success:
break
results.append(result.value)
remaining = result.remaining
return ParseResult(results, remaining)
return Parser(parse)
def many1(self) -> 'Parser[List[T]]':
"""一次或多次"""
def parse(input_str: str) -> ParseResult[List[T]]:
result = self.many()(input_str)
if result.success and len(result.value) > 0:
return result
return ParseResult.fail("期望至少一个匹配")
return Parser(parse)
def optional(self) -> 'Parser[Optional[T]]':
"""可选"""
def parse(input_str: str) -> ParseResult[Optional[T]]:
result = self(input_str)
if result.success:
return result
return ParseResult(None, input_str)
return Parser(parse)
def char(c: str) -> Parser[str]:
"""匹配单个字符"""
def parse(input_str: str) -> ParseResult[str]:
if input_str and input_str[0] == c:
return ParseResult(c, input_str[1:])
return ParseResult.fail(f"期望字符 '{c}'")
return Parser(parse)
def string(s: str) -> Parser[str]:
"""匹配字符串"""
def parse(input_str: str) -> ParseResult[str]:
if input_str.startswith(s):
return ParseResult(s, input_str[len(s):])
return ParseResult.fail(f"期望字符串 '{s}'")
return Parser(parse)
def regex(pattern: str) -> Parser[str]:
"""正则匹配"""
import re
compiled = re.compile(pattern)
def parse(input_str: str) -> ParseResult[str]:
match = compiled.match(input_str)
if match:
return ParseResult(match.group(), input_str[match.end():])
return ParseResult.fail(f"不匹配模式 '{pattern}'")
return Parser(parse)
def digit() -> Parser[str]:
"""数字"""
return regex(r'\d')
def letter() -> Parser[str]:
"""字母"""
return regex(r'[a-zA-Z]')
def whitespace() -> Parser[str]:
"""空白字符"""
return regex(r'\s+')
def integer() -> Parser[int]:
"""整数"""
return regex(r'-?\d+').map(int)
def float_num() -> Parser[float]:
"""浮点数"""
return regex(r'-?\d+(\.\d+)?').map(float)
def identifier() -> Parser[str]:
"""标识符"""
return regex(r'[a-zA-Z_][a-zA-Z0-9_]*')
def symbol(s: str) -> Parser[str]:
"""符号(跳过前后空白)"""
return whitespace().optional().and_then(string(s)).map(lambda x: x[1])
def parens(parser: Parser[T]) -> Parser[T]:
"""括号包裹"""
def parse(input_str: str) -> ParseResult[T]:
result1 = symbol('(')(input_str)
if not result1.success:
return ParseResult.fail(result1.error)
result2 = parser(result1.remaining)
if not result2.success:
return ParseResult.fail(result2.error)
result3 = symbol(')')(result2.remaining)
if not result3.success:
return ParseResult.fail(result3.error)
return ParseResult(result2.value, result3.remaining)
return Parser(parse)
if __name__ == "__main__":
expr_parser = (
integer()
.and_then(symbol('+').optional())
.and_then(integer())
.map(lambda x: (x[0][0], x[0][1], x[1]))
)
test_cases = [
"123",
"42 + 10",
"7+3",
]
for test in test_cases:
result = expr_parser(test.strip())
if result.success:
print(f"'{test}' -> {result.value}")
else:
print(f"'{test}' -> 错误: {result.error}")15.5.2 字节码解释器
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum, auto
import struct
class OpCode(Enum):
"""操作码"""
LOAD_CONST = auto()
LOAD_VAR = auto()
STORE_VAR = auto()
ADD = auto()
SUB = auto()
MUL = auto()
DIV = auto()
NEG = auto()
PRINT = auto()
JUMP = auto()
JUMP_IF_FALSE = auto()
CALL = auto()
RETURN = auto()
HALT = auto()
@dataclass
class Instruction:
"""指令"""
opcode: OpCode
operand: Any = None
class BytecodeCompiler:
"""字节码编译器"""
def __init__(self):
self._constants: List[Any] = []
self._variables: Dict[str, int] = {}
self._instructions: List[Instruction] = []
def compile(self, ast) -> List[Instruction]:
self._instructions = []
self._compile_node(ast)
self._instructions.append(Instruction(OpCode.HALT))
return self._instructions
def _compile_node(self, node) -> None:
if isinstance(node, NumberNode):
self._instructions.append(Instruction(OpCode.LOAD_CONST, node.value))
elif isinstance(node, VariableNode):
self._instructions.append(Instruction(OpCode.LOAD_VAR, node.name))
elif isinstance(node, BinaryOpNode):
self._compile_node(node.left)
self._compile_node(node.right)
ops = {'+': OpCode.ADD, '-': OpCode.SUB, '*': OpCode.MUL, '/': OpCode.DIV}
self._instructions.append(Instruction(ops[node.operator]))
elif isinstance(node, UnaryOpNode):
self._compile_node(node.operand)
if node.operator == '-':
self._instructions.append(Instruction(OpCode.NEG))
class VirtualMachine:
"""虚拟机:执行字节码"""
def __init__(self):
self._stack: List[Any] = []
self._variables: Dict[str, Any] = {}
self._pc = 0
self._instructions: List[Instruction] = []
def execute(self, instructions: List[Instruction], variables: Dict[str, Any] = None) -> Any:
self._instructions = instructions
self._variables = variables or {}
self._stack = []
self._pc = 0
while self._pc < len(self._instructions):
instr = self._instructions[self._pc]
self._execute_instruction(instr)
if instr.opcode == OpCode.HALT:
break
return self._stack[-1] if self._stack else None
def _execute_instruction(self, instr: Instruction) -> None:
opcode = instr.opcode
if opcode == OpCode.LOAD_CONST:
self._stack.append(instr.operand)
self._pc += 1
elif opcode == OpCode.LOAD_VAR:
value = self._variables.get(instr.operand, 0)
self._stack.append(value)
self._pc += 1
elif opcode == OpCode.STORE_VAR:
value = self._stack.pop()
self._variables[instr.operand] = value
self._pc += 1
elif opcode == OpCode.ADD:
b, a = self._stack.pop(), self._stack.pop()
self._stack.append(a + b)
self._pc += 1
elif opcode == OpCode.SUB:
b, a = self._stack.pop(), self._stack.pop()
self._stack.append(a - b)
self._pc += 1
elif opcode == OpCode.MUL:
b, a = self._stack.pop(), self._stack.pop()
self._stack.append(a * b)
self._pc += 1
elif opcode == OpCode.DIV:
b, a = self._stack.pop(), self._stack.pop()
self._stack.append(a / b)
self._pc += 1
elif opcode == OpCode.NEG:
a = self._stack.pop()
self._stack.append(-a)
self._pc += 1
elif opcode == OpCode.PRINT:
value = self._stack.pop()
print(value)
self._pc += 1
elif opcode == OpCode.HALT:
pass
else:
raise RuntimeError(f"未知操作码: {opcode}")
if __name__ == "__main__":
ast = BinaryOpNode(
left=BinaryOpNode(
left=NumberNode(2),
operator='*',
right=NumberNode(3)
),
operator='+',
right=NumberNode(4)
)
compiler = BytecodeCompiler()
bytecode = compiler.compile(ast)
print("字节码:")
for i, instr in enumerate(bytecode):
print(f" {i}: {instr.opcode.name} {instr.operand or ''}")
vm = VirtualMachine()
result = vm.execute(bytecode)
print(f"\n执行结果: {result}")15.6 反模式与最佳实践
15.6.1 常见反模式
反模式1:过度使用解释器
from abc import ABC, abstractmethod
from typing import Any
class OverEngineeredExpression(ABC):
"""错误示例:简单计算使用解释器"""
@abstractmethod
def interpret(self) -> Any:
pass
class SimpleSolution:
"""正确示例:直接计算"""
def calculate(self, x: float, y: float, operator: str) -> float:
ops = {'+': lambda a, b: a + b, '-': lambda a, b: a - b}
return ops[operator](x, y)反模式2:忽略错误处理
from abc import ABC, abstractmethod
from typing import Any
class UnsafeExpression(ABC):
"""错误示例:没有错误处理"""
@abstractmethod
def interpret(self, context: dict) -> Any:
pass
class SafeExpression(ABC):
"""正确示例:完善的错误处理"""
@abstractmethod
def interpret(self, context: dict) -> Any:
pass
def safe_interpret(self, context: dict, default: Any = None) -> Any:
try:
return self.interpret(context)
except Exception as e:
print(f"解释错误: {e}")
return default15.6.2 最佳实践清单
| 实践 | 说明 | 代码示例 |
|---|---|---|
| 明确文法 | 使用BNF/EBNF定义文法 | `<expr> ::= <term> (('+' |
| 分离关注点 | 词法分析、语法分析、解释分离 | Lexer → Parser → Interpreter |
| 错误恢复 | 提供有意义的错误信息 | SyntaxError(f"期望 {expected}") |
| 性能优化 | 使用缓存、预编译 | @lru_cache |
| 安全限制 | 限制执行时间和资源 | timeout, recursion limit |
| 可扩展性 | 支持自定义函数和变量 | context.functions[name] |
| 测试驱动 | 为每个文法规则编写测试 | test_expression_parsing() |
15.7 决策指南
15.7.1 是否使用解释器模式?
┌─────────────────────────────────────────────────────────────┐
│ 解释器模式决策树 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 问题:是否需要解释执行一种特定语言/规则? │
│ │ │
│ ├── 否 ──→ 不需要解释器模式 │
│ │ │
│ └── 是 │
│ │ │
│ ▼ │
│ 问题:文法是否相对简单且稳定? │
│ │ │
│ ├── 否 ──→ 考虑解析器生成工具(ANTLR等) │
│ │ │
│ └── 是 │
│ │ │
│ ▼ │
│ 问题:是否需要频繁扩展文法规则? │
│ │ │
│ ├── 否 ──→ 考虑硬编码解析 │
│ │ │
│ └── 是 ──→ ✓ 使用解释器模式 │
│ │
└─────────────────────────────────────────────────────────────┘15.7.2 实现技术选择
| 场景 | 推荐实现 | 理由 |
|---|---|---|
| 简单表达式 | 手写递归下降 | 简单、可控 |
| 复杂语言 | 解析器生成器 | ANTLR、PLY |
| 配置文件 | 现有库 | JSON、YAML、TOML |
| 规则引擎 | 解释器模式 | 灵活、可扩展 |
| DSL | 组合子库 | parsec、parsimonious |
15.7.3 与其他模式的关系
| 模式 | 关系 | 协作方式 |
|---|---|---|
| 组合 | AST结构 | AST节点使用组合模式 |
| 访问者 | AST遍历 | 访问者遍历AST |
| 迭代器 | Token遍历 | 词法分析返回Token迭代器 |
| 工厂方法 | AST构建 | 解析器使用工厂创建节点 |
| 备忘录 | 状态保存 | 解释器状态快照 |
15.8 快速参考卡片
┌─────────────────────────────────────────────────────────────────────────┐
│ 解释器模式速查表 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 定义: 给定语言,定义文法表示,定义解释器解释句子 │
│ │
│ 核心公式: │
│ Interpret: L × G → R │
│ AST(s) = Parse(Tokenize(s)) │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 参与者: │
│ • AbstractExpression → 声明解释接口 │
│ • TerminalExpression → 终结符实现(叶子节点) │
│ • NonterminalExpression → 非终结符实现(组合节点) │
│ • Context → 存储解释环境 │
│ • Client → 构建AST并调用解释 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 处理流程: │
│ 源代码 → Lexer → Token序列 → Parser → AST → Interpreter → 结果 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ Python实现要点: │
│ class Expression(ABC): │
│ @abstractmethod │
│ def interpret(self, context: Context) -> Any: │
│ pass │
│ │
│ class Lexer: │
│ def tokenize(self, source: str) -> List[Token] │
│ │
│ class Parser: │
│ def parse(self, tokens: List[Token]) -> ASTNode │
│ │
│ class Interpreter: │
│ def interpret(self, ast: ASTNode, context: dict) -> Any │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 适用场景: │
│ ✓ 需要解释执行特定语言 │
│ ✓ 文法相对简单且稳定 │
│ ✓ 需要频繁扩展文法规则 │
│ ✓ 规则引擎、查询语言、配置脚本 │
│ │
│ 不适用场景: │
│ ✗ 文法非常复杂 │
│ ✗ 性能要求极高 │
│ ✗ 已有成熟的解析工具可用 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 经典案例: │
│ • 正则表达式引擎 │
│ • SQL解析器 │
│ • 规则引擎 │
│ • 数学表达式计算器 │
│ • 配置文件解析 │
│ • DSL解释器 │
│ │
└─────────────────────────────────────────────────────────────────────────┘15.9 思考与实践
15.9.1 思考题
概念辨析:解释器模式与编译器有什么本质区别?什么情况下选择解释执行而非编译执行?
性能优化:如何优化解释器的性能?考虑字节码编译、JIT编译等技术。
错误处理:如何设计友好的错误提示?考虑错误位置、期望内容、修复建议。
安全考虑:如何防止解释器执行恶意代码?考虑沙箱、权限控制、资源限制。
扩展性设计:如何设计可扩展的解释器架构?考虑插件系统、自定义函数、运算符重载。
15.9.2 实践练习
练习1:实现JSON解释器
设计一个JSON解析器,支持:
- 基本类型:字符串、数字、布尔值、null
- 复合类型:对象、数组
- 错误处理和位置信息
练习2:实现正则表达式解释器
设计一个简化的正则表达式引擎,支持:
- 字符匹配、字符类
- 量词:*、+、?
- 分组和选择
练习3:实现模板引擎
设计一个模板语言解释器,支持:
- 变量插值
- 条件语句
{% if %} - 循环语句
{% for %} - 过滤器
0
15.10 小结
解释器模式是一种行为型设计模式,通过定义文法表示和解释器,实现对特定语言的解释执行。本章深入探讨了:
- 理论基础:文法定义、抽象语法树、解释执行模型
- 实现技术:词法分析、语法分析、AST遍历、Python ast模块
- 企业应用:规则引擎、SQL查询解释器、配置脚本解释器
- 模式变体:解析器组合子、字节码解释器
- 最佳实践:避免过度设计、完善错误处理、性能优化
解释器模式的核心价值在于:通过将文法规则映射为类层次结构,实现对特定语言的灵活解释执行,适用于规则引擎、DSL、查询语言等场景。
参考资料
- Gamma, E., et al. Design Patterns: Elements of Reusable Object-Oriented Software. Addison-Wesley, 1994.
- Aho, A., et al. Compilers: Principles, Techniques, and Tools. Pearson, 2006.
- Parr, T. The Definitive ANTLR 4 Reference. Pragmatic Bookshelf, 2013.
- Fowler, M. Domain-Specific Languages. Addison-Wesley, 2010.
- Python Documentation. ast — Abstract Syntax Trees. https://docs.python.org/3/library/ast.html