第50章 自动化运维
学习目标
完成本章学习后,你将能够:
- 实现自动化脚本:系统管理、批量操作、定时任务
- 使用配置管理:Ansible、配置模板、环境管理
- 实现基础设施即代码:Terraform、资源编排、版本控制
- 进行系统监控:资源监控、日志收集、告警通知
- 实现自动化部署:CI/CD流水线、滚动更新、回滚机制
- 管理服务器集群:批量执行、配置同步、状态管理
- 实现故障自愈:健康检查、自动恢复、故障转移
- 构建运维平台:Web界面、API服务、权限管理
50.1 自动化脚本
50.1.1 系统管理
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import subprocess
import asyncio
import platform
import os
import time
from datetime import datetime
class OSType(Enum):
    """Operating-system families the automation helpers distinguish."""

    WINDOWS = "windows"
    LINUX = "linux"
    MACOS = "macos"
@dataclass
class CommandResult:
    """Outcome of a single executed command.

    ``timestamp`` defaults to the moment the record is created.
    """

    command: str
    exit_code: int
    stdout: str
    stderr: str
    execution_time: float
    timestamp: datetime = field(default_factory=datetime.now)

    @property
    def success(self) -> bool:
        """True exactly when the exit code equals 0."""
        succeeded = self.exit_code == 0
        return succeeded
class SystemInfo:
    """Static helpers describing the machine the script runs on."""

    @staticmethod
    def get_os_type() -> OSType:
        """Classify ``platform.system()`` as an ``OSType``.

        Any name other than Windows, Linux or Darwin is reported as LINUX.
        """
        families = {
            "windows": OSType.WINDOWS,
            "linux": OSType.LINUX,
            "darwin": OSType.MACOS,
        }
        return families.get(platform.system().lower(), OSType.LINUX)

    @staticmethod
    def get_hostname() -> str:
        """Return the node name reported by ``platform.node()``."""
        node_name = platform.node()
        return node_name

    @staticmethod
    def get_cpu_count() -> int:
        """Return ``os.cpu_count()``, falling back to 1 when it is falsy."""
        detected = os.cpu_count()
        return detected if detected else 1

    @staticmethod
    def get_memory_info() -> Dict[str, float]:
        """Return memory figures in GB / percent.

        The values are fixed placeholders, not measurements.
        """
        return dict(total_gb=16.0, available_gb=8.0, used_percent=50.0)

    @staticmethod
    def get_disk_info() -> List[Dict]:
        """Return a single entry for ``/``.

        The values are fixed placeholders, not measurements.
        """
        root_volume = {"path": "/"}
        root_volume.update(
            total_gb=500.0,
            used_gb=250.0,
            free_gb=250.0,
            used_percent=50.0,
        )
        return [root_volume]
class CommandExecutor:
    """Run shell commands on the local machine and remember their results.

    Neither ``run`` nor ``run_async`` raises for a failing command: spawn
    errors and timeouts are reported as a ``CommandResult`` whose
    ``exit_code`` is -1 and whose ``stderr`` carries the reason. Every
    result, from either method, is appended to ``history``.
    """

    def __init__(self, timeout: int = 300):
        """Create an executor whose commands are limited to *timeout* seconds."""
        self.timeout = timeout
        self.history: List[CommandResult] = []

    def _record(
        self,
        command: str,
        started: float,
        exit_code: int,
        stdout: str = "",
        stderr: str = ""
    ) -> CommandResult:
        """Build a result stamped with the elapsed time and store it in history."""
        result = CommandResult(
            command=command,
            exit_code=exit_code,
            stdout=stdout,
            stderr=stderr,
            execution_time=time.time() - started
        )
        self.history.append(result)
        return result

    def run(
        self,
        command: str,
        cwd: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        shell: bool = True
    ) -> CommandResult:
        """Run *command* and block until it finishes or times out.

        Args:
            command: Command line to execute.
            cwd: Working directory for the child process.
            env: Extra environment variables layered over ``os.environ``.
            shell: Passed straight to ``subprocess.run``.

        Returns:
            The recorded ``CommandResult``.
        """
        started = time.time()
        try:
            completed = subprocess.run(
                command,
                shell=shell,
                cwd=cwd,
                env={**os.environ, **(env or {})},
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
        except subprocess.TimeoutExpired:
            return self._record(
                command, started, -1,
                stderr=f"Command timed out after {self.timeout} seconds"
            )
        except Exception as exc:
            # Best effort by design: report the spawn failure instead of raising.
            return self._record(command, started, -1, stderr=str(exc))
        return self._record(
            command, started, completed.returncode,
            completed.stdout, completed.stderr
        )

    @staticmethod
    async def _kill(process) -> None:
        """Kill and reap a still-running asyncio subprocess, if there is one."""
        if process is None or process.returncode is not None:
            return
        try:
            process.kill()
        except ProcessLookupError:
            # The process exited between the check and the kill.
            return
        await process.wait()

    async def run_async(
        self,
        command: str,
        cwd: Optional[str] = None,
        env: Optional[Dict[str, str]] = None
    ) -> CommandResult:
        """Run *command* through the shell without blocking the event loop.

        On timeout the child process is killed and reaped so it does not
        keep running in the background.

        Returns:
            The recorded ``CommandResult``.
        """
        started = time.time()
        process = None
        try:
            process = await asyncio.create_subprocess_shell(
                command,
                cwd=cwd,
                env={**os.environ, **(env or {})},
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            await self._kill(process)
            return self._record(
                command, started, -1,
                stderr=f"Command timed out after {self.timeout} seconds"
            )
        except Exception as exc:
            # Mirror run(): a spawn failure becomes a failed result.
            await self._kill(process)
            return self._record(command, started, -1, stderr=str(exc))
        return self._record(
            command, started, process.returncode,
            stdout.decode(errors="replace") if stdout else "",
            stderr.decode(errors="replace") if stderr else ""
        )

    def get_history(self, limit: int = 100) -> List[CommandResult]:
        """Return the most recent *limit* results, oldest first."""
        if limit <= 0:
            # history[-0:] would return everything, so handle this explicitly.
            return []
        return self.history[-limit:]
class ServiceManager:
    """Control system services via ``net``/``sc`` on Windows, ``systemctl`` elsewhere."""

    def __init__(self, executor: CommandExecutor = None):
        """Use *executor* if given, otherwise a fresh ``CommandExecutor``."""
        self.executor = executor or CommandExecutor()
        self.os_type = SystemInfo.get_os_type()

    def _is_windows(self) -> bool:
        """Tell whether the detected OS is Windows."""
        return self.os_type == OSType.WINDOWS

    def start(self, service_name: str) -> CommandResult:
        """Start the service and return the command result."""
        tool = "net start" if self._is_windows() else "systemctl start"
        return self.executor.run(f"{tool} {service_name}")

    def stop(self, service_name: str) -> CommandResult:
        """Stop the service and return the command result."""
        tool = "net stop" if self._is_windows() else "systemctl stop"
        return self.executor.run(f"{tool} {service_name}")

    def restart(self, service_name: str) -> CommandResult:
        """Restart the service.

        On Windows this is a stop followed by a start; only the start
        result is returned.
        """
        if not self._is_windows():
            return self.executor.run(f"systemctl restart {service_name}")
        self.stop(service_name)
        return self.start(service_name)

    def status(self, service_name: str) -> Dict:
        """Return ``{"running": bool, "status": str}`` for the service."""
        if self._is_windows():
            query = self.executor.run(f"sc query {service_name}")
            is_up = "RUNNING" in query.stdout
            return {
                "running": is_up,
                "status": "running" if is_up else "stopped"
            }
        probe = self.executor.run(f"systemctl is-active {service_name}")
        state = probe.stdout.strip()
        return {"running": state == "active", "status": state}

    def enable(self, service_name: str) -> CommandResult:
        """Configure the service to start automatically."""
        if self._is_windows():
            command = f"sc config {service_name} start= auto"
        else:
            command = f"systemctl enable {service_name}"
        return self.executor.run(command)

    def disable(self, service_name: str) -> CommandResult:
        """Configure the service not to start automatically."""
        if self._is_windows():
            command = f"sc config {service_name} start= disabled"
        else:
            command = f"systemctl disable {service_name}"
        return self.executor.run(command)
class PackageManager:
    """Install, remove and upgrade packages through the platform's package tool."""

    def __init__(self, executor: CommandExecutor = None):
        """Use *executor* if given, otherwise a fresh ``CommandExecutor``."""
        self.executor = executor or CommandExecutor()
        self.os_type = SystemInfo.get_os_type()

    def _get_package_manager(self) -> str:
        """Pick the tool name: choco on Windows, brew on macOS, else apt/yum/dnf.

        On other systems the first of apt, yum, dnf found under /usr/bin
        wins; apt is the fallback when none exists.
        """
        if self.os_type == OSType.WINDOWS:
            return "choco"
        if self.os_type == OSType.MACOS:
            return "brew"
        for candidate in ("apt", "yum", "dnf"):
            if os.path.exists("/usr/bin/" + candidate):
                return candidate
        return "apt"

    def install(self, package: str) -> CommandResult:
        """Install *package* non-interactively."""
        tool = self._get_package_manager()
        if tool in ("apt", "yum", "dnf", "choco"):
            return self.executor.run(f"{tool} install -y {package}")
        if tool == "brew":
            return self.executor.run(f"brew install {package}")
        return CommandResult(package, -1, "", "Unknown package manager", 0)

    def remove(self, package: str) -> CommandResult:
        """Uninstall *package* non-interactively."""
        tool = self._get_package_manager()
        if tool in ("apt", "yum", "dnf"):
            return self.executor.run(f"{tool} remove -y {package}")
        if tool == "brew":
            return self.executor.run(f"brew uninstall {package}")
        if tool == "choco":
            return self.executor.run(f"choco uninstall -y {package}")
        return CommandResult(package, -1, "", "Unknown package manager", 0)

    def update(self) -> CommandResult:
        """Refresh package metadata and upgrade installed packages."""
        tool = self._get_package_manager()
        fixed_commands = {
            "apt": "apt update && apt upgrade -y",
            "brew": "brew update && brew upgrade",
            "choco": "choco upgrade all -y",
        }
        if tool in fixed_commands:
            return self.executor.run(fixed_commands[tool])
        if tool in ("yum", "dnf"):
            return self.executor.run(f"{tool} update -y")
        return CommandResult("update", -1, "", "Unknown package manager", 0)
class FileSync:
    """Copy, move, delete and mirror files by shelling out to OS tools.

    Windows uses xcopy/move/del/rmdir/robocopy; other systems use
    cp/mv/rm/rsync. POSIX paths are interpolated unquoted, so shell
    globbing in *src* keeps working but paths containing spaces must be
    quoted by the caller.
    """

    def __init__(self, executor: CommandExecutor = None):
        """Use *executor* if given, otherwise a fresh ``CommandExecutor``."""
        self.executor = executor or CommandExecutor()

    def copy(
        self,
        src: str,
        dst: str,
        recursive: bool = True
    ) -> CommandResult:
        """Copy *src* to *dst*, descending into directories when *recursive*."""
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            flags = "/E /I" if recursive else ""
            return self.executor.run(f'xcopy "{src}" "{dst}" {flags}')
        tool = "cp -r" if recursive else "cp"
        return self.executor.run(f"{tool} {src} {dst}")

    def move(self, src: str, dst: str) -> CommandResult:
        """Move or rename *src* to *dst*."""
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            # cmd.exe has no `mv`; `move` is the built-in equivalent.
            return self.executor.run(f'move "{src}" "{dst}"')
        return self.executor.run(f"mv {src} {dst}")

    def delete(self, path: str, recursive: bool = False) -> CommandResult:
        """Delete *path*; with *recursive* a whole directory tree is removed."""
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            if recursive:
                return self.executor.run(f'rmdir /s /q "{path}"')
            return self.executor.run(f'del "{path}"')
        flags = "-rf" if recursive else "-f"
        return self.executor.run(f"rm {flags} {path}")

    def sync_directories(
        self,
        src: str,
        dst: str,
        delete: bool = False
    ) -> CommandResult:
        """Mirror the contents of *src* into *dst*.

        Args:
            src: Source directory.
            dst: Destination directory.
            delete: Also remove destination files that are absent in *src*.

        Returns:
            The command result. On Windows, robocopy exit codes 1-7
            (which robocopy uses to report a successful copy) are
            normalised to 0 so that ``success`` is meaningful.
        """
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            flags = "/MIR" if delete else "/E"
            result = self.executor.run(f'robocopy "{src}" "{dst}" {flags}')
            if 0 < result.exit_code < 8:
                # robocopy signals failure only with codes >= 8.
                result.exit_code = 0
            return result
        flags = "-av --delete" if delete else "-av"
        return self.executor.run(f"rsync {flags} {src}/ {dst}/")


# 50.1.2 批量操作
python
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
import asyncio
@dataclass
class Host:
    """Connection details and labels for one managed machine."""

    hostname: str
    ip: str
    port: int = 22
    username: str = "root"
    password: str = ""
    key_file: str = ""
    # Free-form labels; each instance gets its own list.
    tags: List[str] = field(default_factory=list)
    groups: List[str] = field(default_factory=list)
@dataclass
class TaskResult:
    """Result of running one command for one host."""

    host: str        # hostname the result belongs to
    success: bool
    output: str      # captured standard output
    error: str       # captured standard error or exception text
    duration: float  # elapsed seconds
class HostInventory:
    """In-memory registry of ``Host`` objects keyed by hostname."""

    def __init__(self):
        self.hosts: Dict[str, Host] = {}

    def add_host(self, host: Host) -> None:
        """Register *host*, replacing any entry with the same hostname."""
        self.hosts[host.hostname] = host

    def remove_host(self, hostname: str) -> None:
        """Forget *hostname*; unknown names are ignored."""
        self.hosts.pop(hostname, None)

    def get_host(self, hostname: str) -> Optional[Host]:
        """Return the host registered under *hostname*, or ``None``."""
        return self.hosts.get(hostname)

    def get_hosts_by_tag(self, tag: str) -> List[Host]:
        """Return every host whose ``tags`` contain *tag*."""
        return [h for h in self.hosts.values() if tag in h.tags]

    def get_hosts_by_group(self, group: str) -> List[Host]:
        """Return every host whose ``groups`` contain *group*."""
        return [h for h in self.hosts.values() if group in h.groups]

    def load_from_file(self, filepath: str) -> None:
        """Add the hosts listed under the ``hosts`` mapping of a YAML file.

        An empty file, a missing ``hosts`` key and host entries without a
        body are all tolerated. ``password`` and ``key_file`` are read
        when present.

        Raises:
            OSError: If the file cannot be opened.
            yaml.YAMLError: If the file is not valid YAML.
        """
        import yaml

        with open(filepath, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty document.
            data = yaml.safe_load(f) or {}
        for hostname, config in (data.get('hosts') or {}).items():
            config = config or {}
            self.add_host(Host(
                hostname=hostname,
                ip=config.get('ip', ''),
                port=config.get('port', 22),
                username=config.get('username', 'root'),
                password=config.get('password', ''),
                key_file=config.get('key_file', ''),
                # A bare `tags:` key loads as None, which would break `in` tests.
                tags=list(config.get('tags') or []),
                groups=list(config.get('groups') or [])
            ))
class BatchExecutor:
    """Run one command for many inventory hosts concurrently.

    NOTE: the command is executed through a local ``CommandExecutor``;
    the host is only used to label the result.
    """

    def __init__(
        self,
        inventory: HostInventory,
        max_workers: int = 10
    ):
        """Remember the inventory and the cap on simultaneous commands."""
        self.inventory = inventory
        self.max_workers = max_workers

    async def execute_on_host(
        self,
        host: Host,
        command: str
    ) -> TaskResult:
        """Run *command* for *host* without blocking the event loop.

        Any exception is converted into a failed ``TaskResult``.
        """
        start_time = time.time()
        try:
            # run_async lets other hosts make progress while this command runs.
            result = await CommandExecutor().run_async(command)
            return TaskResult(
                host=host.hostname,
                success=result.success,
                output=result.stdout,
                error=result.stderr,
                duration=time.time() - start_time
            )
        except Exception as e:
            return TaskResult(
                host=host.hostname,
                success=False,
                output="",
                error=str(e),
                duration=time.time() - start_time
            )

    async def execute_on_hosts(
        self,
        hosts: List[Host],
        command: str
    ) -> List[TaskResult]:
        """Run *command* for every host, at most ``max_workers`` at a time.

        Results are returned in the same order as *hosts*.
        """
        # Created here so the semaphore belongs to the running event loop.
        limiter = asyncio.Semaphore(max(1, self.max_workers))

        async def _bounded(host: Host) -> TaskResult:
            async with limiter:
                return await self.execute_on_host(host, command)

        return list(await asyncio.gather(*(_bounded(h) for h in hosts)))

    def execute_on_group(
        self,
        group: str,
        command: str
    ) -> List[TaskResult]:
        """Blocking helper: run *command* for all hosts in *group*.

        Uses ``asyncio.run`` and therefore must not be called from a
        running event loop.
        """
        hosts = self.inventory.get_hosts_by_group(group)
        return asyncio.run(self.execute_on_hosts(hosts, command))

    def execute_on_tag(
        self,
        tag: str,
        command: str
    ) -> List[TaskResult]:
        """Blocking helper: run *command* for all hosts carrying *tag*.

        Uses ``asyncio.run`` and therefore must not be called from a
        running event loop.
        """
        hosts = self.inventory.get_hosts_by_tag(tag)
        return asyncio.run(self.execute_on_hosts(hosts, command))
class TaskRunner:
    """Queue named tasks (hosts + commands) and run them in order."""

    def __init__(self, inventory: HostInventory):
        self.inventory = inventory
        self.tasks: List[Dict] = []

    def add_task(
        self,
        name: str,
        hosts: List[str],
        commands: List[str],
        when: str = None
    ) -> None:
        """Append a task description; ``when`` is stored but not evaluated by ``run``."""
        spec = dict(name=name, hosts=hosts, commands=commands, when=when)
        self.tasks.append(spec)

    def run(self) -> Dict[str, List[TaskResult]]:
        """Execute every queued task and return results keyed by task name.

        Hostnames missing from the inventory are skipped. Each command is
        run for all of the task's hosts before the next command starts.
        """
        outcome = {}
        for spec in self.tasks:
            looked_up = map(self.inventory.get_host, spec["hosts"])
            targets = [host for host in looked_up if host]
            collected = []
            for command in spec["commands"]:
                batch = BatchExecutor(self.inventory)
                collected += asyncio.run(batch.execute_on_hosts(targets, command))
            outcome[spec["name"]] = collected
        return outcome
class ScheduledTask:
    """A named command plus the bookkeeping a scheduler needs for it.

    ``schedule`` is stored verbatim; this class does not interpret it.
    Callers that want time-based gating set ``next_run``.
    """

    def __init__(
        self,
        name: str,
        command: str,
        schedule: str,
        enabled: bool = True
    ):
        self.name = name
        self.command = command
        self.schedule = schedule
        self.enabled = enabled
        self.last_run: Optional[datetime] = None
        self.next_run: Optional[datetime] = None
        self.run_count: int = 0

    def should_run(self, now: datetime) -> bool:
        """Decide whether the task is due at *now*.

        A disabled task is never due. When ``next_run`` is set the task
        is due only once *now* has reached it; when ``next_run`` is
        ``None`` an enabled task is always due.
        """
        if not self.enabled:
            return False
        if self.next_run is not None and now < self.next_run:
            return False
        return True
class TaskScheduler:
    """Poll registered ``ScheduledTask`` objects once a minute and run the due ones."""

    def __init__(self):
        self.tasks: Dict[str, ScheduledTask] = {}
        self.running = False

    def add_task(self, task: ScheduledTask) -> None:
        """Register *task* under its name, replacing any previous one."""
        self.tasks[task.name] = task

    def remove_task(self, name: str) -> None:
        """Unregister the task called *name*; unknown names are ignored."""
        self.tasks.pop(name, None)

    def enable_task(self, name: str) -> None:
        """Mark the task as enabled if it exists."""
        if name in self.tasks:
            self.tasks[name].enabled = True

    def disable_task(self, name: str) -> None:
        """Mark the task as disabled if it exists."""
        if name in self.tasks:
            self.tasks[name].enabled = False

    async def run_task(self, name: str) -> CommandResult:
        """Run the named task's command and update its bookkeeping.

        The blocking ``CommandExecutor.run`` call is pushed to the default
        thread pool so the event loop stays responsive.

        Returns:
            The command's result, or a failed result with stderr
            ``"Task not found"`` when no such task is registered.
        """
        task = self.tasks.get(name)
        if not task:
            return CommandResult("", -1, "", "Task not found", 0)
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, CommandExecutor().run, task.command
        )
        task.last_run = datetime.now()
        task.run_count += 1
        return result

    async def start(self) -> None:
        """Loop until ``stop()`` is called, checking all tasks every 60 seconds."""
        self.running = True
        while self.running:
            now = datetime.now()
            # Snapshot: tasks may be added or removed while a command is awaited.
            for task in list(self.tasks.values()):
                if not self.running:
                    break
                if task.should_run(now):
                    await self.run_task(task.name)
            await asyncio.sleep(60)

    def stop(self) -> None:
        """Ask the loop in ``start`` to finish after its current pass."""
        self.running = False


# 50.2 配置管理
50.2.1 Ansible风格配置
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import json
import yaml
class TaskState(Enum):
    """Desired state of a managed resource (package, file, ...)."""

    PRESENT = "present"
    ABSENT = "absent"
    LATEST = "latest"
@dataclass
class AnsibleTask:
    """One playbook step: a module name plus its arguments and task keywords."""

    name: str
    module: str
    args: Dict[str, Any]
    when: str = None       # condition text; None means unconditional
    notify: List[str] = field(default_factory=list)
    register: str = None   # variable name for the result, if any
    become: bool = False
    ignore_errors: bool = False
@dataclass
class AnsibleHandler:
    """A named group of tasks attached to a play as a handler."""

    name: str
    tasks: List[AnsibleTask] = field(default_factory=list)
@dataclass
class AnsiblePlay:
    """A play: target hosts plus the tasks, handlers and variables applied to them."""

    name: str
    hosts: List[str]
    tasks: List[AnsibleTask] = field(default_factory=list)
    handlers: List[AnsibleHandler] = field(default_factory=list)
    vars: Dict[str, Any] = field(default_factory=dict)
    become: bool = False
class AnsiblePlaybook:
    """An ordered list of plays that can be dumped to and loaded from YAML."""

    # Task-level keys that are keywords rather than module names.
    _TASK_KEYWORDS = ("name", "when", "notify", "register", "become", "ignore_errors")

    def __init__(self, name: str):
        self.name = name
        self.plays: List[AnsiblePlay] = []

    def add_play(self, play: AnsiblePlay) -> None:
        """Append *play* to the playbook."""
        self.plays.append(play)

    @staticmethod
    def _task_to_dict(task: AnsibleTask) -> Dict[str, Any]:
        """Serialise a task; keywords with falsy values are left out."""
        entry = {"name": task.name, task.module: task.args}
        for keyword in ("when", "notify", "register", "become", "ignore_errors"):
            value = getattr(task, keyword)
            if value:
                entry[keyword] = value
        return entry

    @classmethod
    def _task_from_dict(cls, task_data: Dict[str, Any]) -> Optional[AnsibleTask]:
        """Rebuild a task; the first non-keyword key is taken as the module."""
        for key, value in task_data.items():
            if key in cls._TASK_KEYWORDS:
                continue
            notify = task_data.get("notify") or []
            if isinstance(notify, str):
                # YAML allows a single handler name as a plain string.
                notify = [notify]
            return AnsibleTask(
                name=task_data.get("name", ""),
                module=key,
                args=value,
                when=task_data.get("when"),
                notify=list(notify),
                register=task_data.get("register"),
                become=task_data.get("become", False),
                ignore_errors=task_data.get("ignore_errors", False)
            )
        return None

    def to_yaml(self) -> str:
        """Dump the playbook as YAML.

        ``hosts`` is written as a comma-joined string. Handlers are
        written as ``{"name": ..., "tasks": [...]}`` entries.
        """
        playbook = []
        for play in self.plays:
            play_dict = {
                "name": play.name,
                "hosts": ",".join(play.hosts),
                "vars": play.vars,
                "become": play.become,
                "tasks": [self._task_to_dict(task) for task in play.tasks]
            }
            if play.handlers:
                play_dict["handlers"] = [
                    {
                        "name": handler.name,
                        "tasks": [self._task_to_dict(t) for t in handler.tasks]
                    }
                    for handler in play.handlers
                ]
            playbook.append(play_dict)
        return yaml.dump(playbook, default_flow_style=False)

    @classmethod
    def from_yaml(cls, yaml_content: str) -> "AnsiblePlaybook":
        """Parse YAML text into a playbook named ``"imported_playbook"``.

        Accepts ``hosts`` either as a comma-separated string or as a YAML
        list, tolerates an empty document, and restores handlers written
        by ``to_yaml``. Task entries without a module key are skipped.
        """
        # safe_load returns None for an empty document.
        data = yaml.safe_load(yaml_content) or []
        playbook = cls("imported_playbook")
        for play_data in data:
            hosts = play_data.get("hosts") or []
            if isinstance(hosts, str):
                hosts = hosts.split(",")
            play = AnsiblePlay(
                name=play_data.get("name", ""),
                # Drop blanks so a missing `hosts` yields [] rather than [""].
                hosts=[str(h).strip() for h in hosts if str(h).strip()],
                vars=play_data.get("vars") or {},
                become=play_data.get("become", False)
            )
            for task_data in play_data.get("tasks") or []:
                task = cls._task_from_dict(task_data)
                if task is not None:
                    play.tasks.append(task)
            for handler_data in play_data.get("handlers") or []:
                handler = AnsibleHandler(name=handler_data.get("name", ""))
                for task_data in handler_data.get("tasks") or []:
                    task = cls._task_from_dict(task_data)
                    if task is not None:
                        handler.tasks.append(task)
                play.handlers.append(handler)
            playbook.plays.append(play)
        return playbook
class ModuleExecutor:
    """Local implementations of a few Ansible-style modules.

    Multi-step modules stop at the first failing command and return that
    command's result, so a failure is never reported as success.
    """

    def __init__(self, executor: CommandExecutor = None):
        """Use *executor* if given, otherwise a fresh ``CommandExecutor``."""
        self.executor = executor or CommandExecutor()

    def _run_until_failure(self, commands: List[str]) -> Optional[CommandResult]:
        """Run commands in order; return the first failed result, else ``None``."""
        for command in commands:
            result = self.executor.run(command)
            if not result.success:
                return result
        return None

    def apt(
        self,
        name: str,
        state: TaskState = TaskState.PRESENT,
        update_cache: bool = False
    ) -> CommandResult:
        """Install, remove or upgrade an apt package.

        With *update_cache*, ``apt update`` runs first and its failure is
        returned instead of continuing.
        """
        if update_cache:
            refreshed = self.executor.run("apt update")
            if not refreshed.success:
                return refreshed
        if state == TaskState.PRESENT:
            return self.executor.run(f"apt install -y {name}")
        elif state == TaskState.ABSENT:
            return self.executor.run(f"apt remove -y {name}")
        elif state == TaskState.LATEST:
            return self.executor.run(f"apt install -y --only-upgrade {name}")
        return CommandResult(name, -1, "", "Unknown state", 0)

    def yum(
        self,
        name: str,
        state: TaskState = TaskState.PRESENT
    ) -> CommandResult:
        """Install, remove or update a yum package."""
        if state == TaskState.PRESENT:
            return self.executor.run(f"yum install -y {name}")
        elif state == TaskState.ABSENT:
            return self.executor.run(f"yum remove -y {name}")
        elif state == TaskState.LATEST:
            return self.executor.run(f"yum update -y {name}")
        return CommandResult(name, -1, "", "Unknown state", 0)

    def pip(
        self,
        name: str,
        state: TaskState = TaskState.PRESENT,
        version: str = None
    ) -> CommandResult:
        """Install, uninstall or upgrade a pip package, optionally pinned to *version*."""
        if version:
            name = f"{name}=={version}"
        if state == TaskState.PRESENT:
            return self.executor.run(f"pip install {name}")
        elif state == TaskState.ABSENT:
            return self.executor.run(f"pip uninstall -y {name}")
        elif state == TaskState.LATEST:
            return self.executor.run(f"pip install --upgrade {name}")
        return CommandResult(name, -1, "", "Unknown state", 0)

    def file(
        self,
        path: str,
        state: TaskState = TaskState.PRESENT,
        mode: str = None,
        owner: str = None,
        group: str = None
    ) -> CommandResult:
        """Ensure a directory exists (or is removed) and apply mode/owner/group.

        Returns:
            For ABSENT, the result of ``rm -rf``. Otherwise the first
            failing step's result, or a synthetic success result when
            every step succeeded.
        """
        if state == TaskState.ABSENT:
            return self.executor.run(f"rm -rf {path}")
        steps = []
        if state == TaskState.PRESENT:
            steps.append(f"mkdir -p {path}")
        if mode:
            steps.append(f"chmod {mode} {path}")
        if owner:
            steps.append(f"chown {owner} {path}")
        if group:
            steps.append(f"chgrp {group} {path}")
        failure = self._run_until_failure(steps)
        if failure is not None:
            return failure
        return CommandResult(path, 0, "", "", 0)

    def template(
        self,
        src: str,
        dest: str,
        variables: Dict[str, Any] = None
    ) -> CommandResult:
        """Render *src* to *dest*, substituting ``{{ key }}`` placeholders.

        Whitespace inside the braces is optional, so ``{{key}}`` and
        ``{{  key  }}`` are replaced as well.

        Raises:
            OSError: If *src* cannot be read or *dest* cannot be written.
        """
        import re

        with open(src, 'r') as f:
            content = f.read()
        for key, value in (variables or {}).items():
            placeholder = re.compile(r"\{\{\s*" + re.escape(str(key)) + r"\s*\}\}")
            # A function replacement keeps backslashes in the value literal.
            content = placeholder.sub(lambda _m, text=str(value): text, content)
        with open(dest, 'w') as f:
            f.write(content)
        return CommandResult(dest, 0, "", "", 0)

    def copy(
        self,
        src: str,
        dest: str,
        mode: str = None
    ) -> CommandResult:
        """Copy *src* to *dest* with ``cp`` and optionally ``chmod`` the copy.

        Returns:
            The ``cp`` result, or the ``chmod`` result when ``cp``
            succeeded but ``chmod`` failed.
        """
        result = self.executor.run(f"cp {src} {dest}")
        if mode and result.success:
            chmod_result = self.executor.run(f"chmod {mode} {dest}")
            if not chmod_result.success:
                return chmod_result
        return result

    def service(
        self,
        name: str,
        state: str = "started",
        enabled: bool = None
    ) -> CommandResult:
        """Enable/disable a service (when *enabled* is given) and set its state.

        *state* may be ``"started"``, ``"stopped"`` or ``"restarted"``;
        any other value performs no state change and reports success.
        A failed enable/disable is returned without changing the state.
        """
        manager = ServiceManager(self.executor)
        if enabled is not None:
            toggled = manager.enable(name) if enabled else manager.disable(name)
            if not toggled.success:
                return toggled
        if state == "started":
            return manager.start(name)
        elif state == "stopped":
            return manager.stop(name)
        elif state == "restarted":
            return manager.restart(name)
        return CommandResult(name, 0, "", "", 0)

    def command(self, cmd: str) -> CommandResult:
        """Run *cmd* through the executor."""
        return self.executor.run(cmd)

    def shell(self, cmd: str) -> CommandResult:
        """Run *cmd* through the executor (same behaviour as ``command``)."""
        return self.executor.run(cmd)
class PlaybookRunner:
    """Execute the tasks of a playbook locally through ``ModuleExecutor``."""

    def __init__(self, inventory: HostInventory):
        self.inventory = inventory
        self.module_executor = ModuleExecutor()
        self.results: List[Dict] = []

    def run(self, playbook: AnsiblePlaybook) -> List[Dict]:
        """Run every task of every play in order and return the result dicts.

        Execution continues after a failed task.
        """
        self.results = []
        for play in playbook.plays:
            for task in play.tasks:
                self.results.append(self._execute_task(task, play.vars))
        return self.results

    def _dispatch(
        self,
        task: AnsibleTask,
        variables: Dict[str, Any]
    ) -> CommandResult:
        """Call the ``ModuleExecutor`` method that matches ``task.module``."""
        module = task.module
        args = task.args
        modules = self.module_executor
        if module in ("command", "shell"):
            # Ansible's short form passes the command line as a plain string.
            if isinstance(args, str):
                cmd = args
            else:
                cmd = args.get("cmd", args.get("_raw_params", ""))
            runner = modules.shell if module == "shell" else modules.command
            return runner(cmd)
        if module == "apt":
            return modules.apt(
                name=args.get("name"),
                state=TaskState(args.get("state", "present")),
                update_cache=args.get("update_cache", False)
            )
        if module == "yum":
            return modules.yum(
                name=args.get("name"),
                state=TaskState(args.get("state", "present"))
            )
        if module == "pip":
            return modules.pip(
                name=args.get("name"),
                state=TaskState(args.get("state", "present")),
                version=args.get("version")
            )
        if module == "file":
            return modules.file(
                path=args.get("path"),
                state=TaskState(args.get("state", "present")),
                mode=args.get("mode"),
                owner=args.get("owner"),
                group=args.get("group")
            )
        if module == "template":
            return modules.template(
                src=args.get("src"),
                dest=args.get("dest"),
                variables=variables
            )
        if module == "copy":
            return modules.copy(
                src=args.get("src"),
                dest=args.get("dest"),
                mode=args.get("mode")
            )
        if module == "service":
            return modules.service(
                name=args.get("name"),
                state=args.get("state", "started"),
                enabled=args.get("enabled")
            )
        return CommandResult(task.name, -1, "", f"Unknown module: {module}", 0)

    def _execute_task(
        self,
        task: AnsibleTask,
        variables: Dict[str, Any]
    ) -> Dict:
        """Run one task and summarise it.

        Returns:
            A dict with keys ``task``, ``success``, ``output``, ``error``
            and ``duration``. Any exception raised while running the task
            is reported as a failure with the exception text in ``error``.
        """
        start_time = time.time()
        try:
            result = self._dispatch(task, variables)
            return {
                "task": task.name,
                "success": result.success,
                "output": result.stdout,
                "error": result.stderr,
                "duration": time.time() - start_time
            }
        except Exception as e:
            return {
                "task": task.name,
                "success": False,
                "output": "",
                "error": str(e),
                "duration": time.time() - start_time
            }


# 50.3 系统监控
50.3.1 资源监控
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
from datetime import datetime
import threading
import time
@dataclass
class MetricPoint:
    """A single timestamped sample of a metric."""

    timestamp: datetime
    value: float
    # Optional labels for this sample; each instance gets its own dict.
    tags: Dict[str, str] = field(default_factory=dict)
@dataclass
class MetricSeries:
    """A named, append-only list of ``MetricPoint`` samples."""

    name: str
    points: List[MetricPoint] = field(default_factory=list)
    tags: Dict[str, str] = field(default_factory=dict)

    def add_point(self, value: float, tags: Dict[str, str] = None) -> None:
        """Append a sample stamped with the current time."""
        self.points.append(
            MetricPoint(timestamp=datetime.now(), value=value, tags=tags or {})
        )

    def get_latest(self) -> Optional[MetricPoint]:
        """Return the newest sample, or ``None`` for an empty series."""
        if self.points:
            return self.points[-1]
        return None

    def get_average(self, window_seconds: int = 60) -> float:
        """Average the samples taken within the last *window_seconds*.

        Returns 0.0 when no sample falls inside the window.
        """
        oldest_allowed = datetime.now().timestamp() - window_seconds
        in_window = [
            sample.value
            for sample in self.points
            if sample.timestamp.timestamp() >= oldest_allowed
        ]
        return sum(in_window) / len(in_window) if in_window else 0.0
class MetricsCollector:
    """Poll registered collector callables and store their values as series."""

    def __init__(self):
        self.metrics: Dict[str, MetricSeries] = {}
        self.collectors: Dict[str, Callable] = {}
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Signals the background loop to wake up and exit.
        self._stop_event = threading.Event()

    def register_collector(
        self,
        name: str,
        collector: Callable[[], float],
        tags: Dict[str, str] = None
    ) -> None:
        """Register *collector* under *name*, creating its series if needed."""
        self.collectors[name] = collector
        if name not in self.metrics:
            self.metrics[name] = MetricSeries(name=name, tags=tags or {})

    def unregister_collector(self, name: str) -> None:
        """Stop polling *name*; its recorded series is kept."""
        self.collectors.pop(name, None)

    def collect(self) -> Dict[str, float]:
        """Call every collector once.

        Returns:
            A mapping of collector name to value; a collector that raises
            contributes ``None`` and does not stop the others.
        """
        values = {}
        # Snapshot so registration from another thread cannot break iteration.
        for name, collector in list(self.collectors.items()):
            try:
                value = collector()
            except Exception:
                values[name] = None
                continue
            values[name] = value
            series = self.metrics.get(name)
            if series is not None:
                series.add_point(value)
        return values

    def start(self, interval: int = 60) -> None:
        """Start a daemon thread that calls ``collect`` every *interval* seconds.

        Calling ``start`` while the thread is already alive does nothing.
        """
        if self._thread is not None and self._thread.is_alive():
            return
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._running = True

        def _collect_loop():
            while not stop_event.is_set():
                self.collect()
                # Unlike time.sleep, wait() returns as soon as stop() is called.
                stop_event.wait(interval)

        self._thread = threading.Thread(target=_collect_loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Signal the background thread to exit and wait up to 5 seconds for it."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
class SystemMonitor:
    """Bundle a ``MetricsCollector`` with three built-in gauges.

    The CPU, memory and disk gauges return fixed placeholder values.
    """

    def __init__(self):
        self.collector = MetricsCollector()
        self._setup_default_collectors()

    def _setup_default_collectors(self) -> None:
        """Register the cpu/memory/disk gauges with the collector."""
        defaults = (
            ("cpu_percent", self._get_cpu_percent),
            ("memory_percent", self._get_memory_percent),
            ("disk_percent", self._get_disk_percent),
        )
        for metric_name, probe in defaults:
            self.collector.register_collector(metric_name, probe)

    def _get_cpu_percent(self) -> float:
        """Placeholder CPU usage."""
        return 50.0

    def _get_memory_percent(self) -> float:
        """Placeholder memory usage."""
        return 60.0

    def _get_disk_percent(self) -> float:
        """Placeholder disk usage."""
        return 40.0

    def start(self, interval: int = 60) -> None:
        """Begin background collection every *interval* seconds."""
        self.collector.start(interval)

    def stop(self) -> None:
        """Stop background collection."""
        self.collector.stop()

    def get_current_metrics(self) -> Dict[str, float]:
        """Poll all gauges now (this also records a sample for each)."""
        return self.collector.collect()

    def get_metric_history(
        self,
        name: str,
        window_seconds: int = 3600
    ) -> List[MetricPoint]:
        """Return the samples of *name* from the last *window_seconds*.

        Unknown metric names yield an empty list.
        """
        registry = self.collector.metrics
        if name not in registry:
            return []
        oldest_allowed = datetime.now().timestamp() - window_seconds
        recent = []
        for sample in registry[name].points:
            if sample.timestamp.timestamp() >= oldest_allowed:
                recent.append(sample)
        return recent
@dataclass
class Alert:
    """Definition of one threshold alert."""

    name: str
    condition: str    # looked up as a metric name by AlertManager.check
    threshold: float
    severity: str
    message: str
    enabled: bool = True
class AlertManager:
    """Evaluate threshold alerts against a monitor's current metrics."""

    def __init__(self, monitor: SystemMonitor):
        self.monitor = monitor
        self.alerts: Dict[str, Alert] = {}
        self.handlers: List[Callable] = []
        # Alert name -> time it first crossed its threshold.
        self._triggered: Dict[str, datetime] = {}

    def add_alert(self, alert: Alert) -> None:
        """Register *alert* under its name, replacing any previous one."""
        self.alerts[alert.name] = alert

    def remove_alert(self, name: str) -> None:
        """Unregister the alert called *name*; unknown names are ignored."""
        self.alerts.pop(name, None)

    def add_handler(self, handler: Callable) -> None:
        """Add a callable invoked with the alert dict whenever an alert fires."""
        self.handlers.append(handler)

    def check(self) -> List[Dict]:
        """Poll the monitor and return the alerts currently at or above threshold.

        ``alert.condition`` is used as the metric name. The comparison is
        ``value >= threshold`` for every severity; the severity string is
        only reported, it does not change the comparison. Handlers are
        called for each firing alert on every check; a handler that raises
        is logged and does not stop the remaining handlers.
        """
        import logging

        logger = logging.getLogger(__name__)
        triggered = []
        metrics = self.monitor.get_current_metrics()
        for name, alert in list(self.alerts.items()):
            if not alert.enabled:
                continue
            metric_value = metrics.get(alert.condition)
            if metric_value is None:
                # Unknown metric, or its collector failed this round.
                continue
            if metric_value < alert.threshold:
                self._triggered.pop(name, None)
                continue
            if name not in self._triggered:
                self._triggered[name] = datetime.now()
            alert_data = {
                "name": alert.name,
                "condition": alert.condition,
                "value": metric_value,
                "threshold": alert.threshold,
                "severity": alert.severity,
                "message": alert.message,
                "timestamp": datetime.now()
            }
            triggered.append(alert_data)
            for handler in self.handlers:
                try:
                    handler(alert_data)
                except Exception:
                    logger.exception("Alert handler failed for %s", alert.name)
        return triggered
class LogCollector:
    """Read, parse and search plain-text log files."""

    def __init__(self, log_dir: str = "/var/log"):
        self.log_dir = log_dir
        self.parsers: Dict[str, Callable] = {}

    def register_parser(
        self,
        log_type: str,
        parser: Callable
    ) -> None:
        """Associate *parser* (a callable taking one line) with *log_type*."""
        self.parsers[log_type] = parser

    def tail_log(
        self,
        filepath: str,
        lines: int = 100
    ) -> List[str]:
        """Return the last *lines* lines of the file, newlines included.

        Returns an empty list when *lines* is not positive or the file
        cannot be read. Undecodable bytes are replaced rather than
        aborting the read.
        """
        from collections import deque

        if lines <= 0:
            # all_lines[-0:] would have returned the whole file.
            return []
        try:
            with open(filepath, 'r', errors='replace') as f:
                # A bounded deque keeps only the tail in memory.
                return list(deque(f, maxlen=lines))
        except OSError:
            return []

    def parse_log_line(
        self,
        line: str,
        log_type: str
    ) -> Optional[Dict]:
        """Parse *line* with the parser for *log_type*.

        Without a registered parser the line is wrapped as ``{"raw": line}``.
        """
        parser = self.parsers.get(log_type)
        if parser:
            return parser(line)
        return {"raw": line}

    def search_logs(
        self,
        filepath: str,
        pattern: str,
        context_lines: int = 2
    ) -> List[Dict]:
        """Find lines matching the regular expression *pattern*.

        Returns:
            One dict per match with ``line_number`` (1-based), the
            stripped ``line`` and its stripped ``context`` (up to
            *context_lines* lines on each side). An invalid pattern or an
            unreadable file yields an empty list.
        """
        import re

        try:
            matcher = re.compile(pattern)
        except re.error:
            return []
        try:
            with open(filepath, 'r', errors='replace') as f:
                lines = f.readlines()
        except OSError:
            return []
        results = []
        for i, line in enumerate(lines):
            if matcher.search(line):
                start = max(0, i - context_lines)
                end = min(len(lines), i + context_lines + 1)
                results.append({
                    "line_number": i + 1,
                    "line": line.strip(),
                    "context": [l.strip() for l in lines[start:end]]
                })
        return results


# 50.4 知识图谱
50.4.1 自动化运维技术架构
┌─────────────────────────────────────────────────────────────────────┐
│ 自动化运维技术架构 │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 监控告警层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │Prometheus│ │ Grafana │ │ AlertMgr │ │ 日志系统 │ │ │
│ │ │ 指标采集 │ │ 可视化 │ │ 告警管理 │ │ ELK │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 编排调度层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Ansible │ │ Terraform│ │ Jenkins │ │ K8s │ │ │
│ │ │ 配置管理 │ │ IaC │ │ CI/CD │ │ 容器编排 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 自动化执行层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 脚本执行 │ │ 批量操作 │ │ 任务调度 │ │ 远程执行 │ │ │
│ │ │ Python │ │ Fabric │ │ Celery │ │ SSH │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 基础设施层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 物理服务器│ │ 虚拟机 │ │ 容器 │ │ 云服务 │ │ │
│ │ │ BareMetal│ │ VM │ │ Docker │ │ AWS/Ali │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

50.4.2 自动化运维工作流程
┌─────────────────────────────────────────────────────────────────────┐
│ 自动化运维工作流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ 需求触发 │ ─── 监控告警 / 定时任务 / 手动触发 │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 任务编排 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 任务定义 │ │ 依赖管理 │ │ 并行调度 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 目标选择 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 主机清单 │ │ 分组过滤 │ │ 条件匹配 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 执行操作 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 命令执行 │ │ 文件传输 │ │ 配置应用 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 结果收集 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 输出捕获 │ │ 状态记录 │ │ 日志存储 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 反馈处理 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 成功确认 │ │ 失败重试 │ │ 告警通知 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘

50.5 技术选型指南
50.5.1 配置管理工具选型
| 工具 | 适用场景 | 学习曲线 | Agent要求 | 推荐指数 |
|---|---|---|---|---|
| Ansible | 通用配置管理 | 低 | 无 | ★★★★★ |
| SaltStack | 大规模环境 | 中 | 需要 | ★★★★☆ |
| Puppet | 企业级环境 | 高 | 需要 | ★★★☆☆ |
| Chef | 复杂基础设施 | 高 | 需要 | ★★★☆☆ |
50.5.2 监控系统选型
| 系统 | 数据类型 | 扩展性 | 学习曲线 | 推荐指数 |
|---|---|---|---|---|
| Prometheus | 时序指标 | 高 | 中 | ★★★★★ |
| Zabbix | 综合 | 中 | 中 | ★★★★☆ |
| Nagios | 状态检查 | 中 | 高 | ★★★☆☆ |
| Datadog | 云原生 | 高 | 低 | ★★★★★ |
50.5.3 日志系统选型
| 系统 | 吞吐量 | 查询能力 | 学习曲线 | 推荐指数 |
|---|---|---|---|---|
| ELK Stack | 高 | 极强 | 高 | ★★★★★ |
| Loki | 高 | 中 | 中 | ★★★★☆ |
| Fluentd | 高 | - | 中 | ★★★★☆ |
50.6 常见问题与解决方案
50.6.1 批量执行优化
python
import asyncio
import paramiko
from typing import List, Dict, Any, Callable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
@dataclass
class Host:
    """SSH connection parameters for one remote machine."""

    hostname: str
    port: int = 22
    username: str = "root"
    password: str = None   # None when no password is supplied
    key_file: str = None   # None when no private-key path is supplied
class BatchExecutor:
    """Run one command on many hosts over SSH using a thread pool."""

    def __init__(self, max_workers: int = 10):
        """Limit the pool to *max_workers* simultaneous SSH sessions."""
        self.max_workers = max_workers

    def execute_parallel(
        self,
        hosts: List[Host],
        command: str,
        timeout: int = 30
    ) -> Dict[str, Dict]:
        """Run *command* on every host in parallel.

        Returns:
            A mapping of hostname to that host's result dict (see
            ``_execute_on_host``). A worker that raises is reported as
            ``{"success": False, "error": ...}``.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self._execute_on_host,
                    host, command, timeout
                ): host.hostname for host in hosts
            }
            for future, hostname in futures.items():
                try:
                    results[hostname] = future.result()
                except Exception as e:
                    results[hostname] = {
                        "success": False,
                        "error": str(e)
                    }
        return results

    def _execute_on_host(
        self,
        host: Host,
        command: str,
        timeout: int
    ) -> Dict:
        """Run *command* on a single host over SSH.

        Returns:
            On a completed command: ``success`` (True only when the remote
            exit status is 0), ``stdout``, ``stderr`` and ``exit_code``.
            On a connection or execution error: ``success`` False and
            ``error`` with the exception text.
        """
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys without
        # verification; confirm this is acceptable for the target network.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(
                hostname=host.hostname,
                port=host.port,
                username=host.username,
                password=host.password,
                key_filename=host.key_file,
                timeout=timeout
            )
            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            out_text = stdout.read().decode()
            err_text = stderr.read().decode()
            exit_code = stdout.channel.recv_exit_status()
            return {
                # A command that ran but exited non-zero is not a success.
                "success": exit_code == 0,
                "stdout": out_text,
                "stderr": err_text,
                "exit_code": exit_code
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            client.close()
class AsyncBatchExecutor:
    """Run one command for many hosts concurrently with asyncio.

    The per-host execution in ``_execute_async`` is a simulated stub.
    """

    def __init__(self, max_concurrent: int = 20):
        """Allow at most *max_concurrent* hosts to be processed at once."""
        self.max_concurrent = max_concurrent
        # Created lazily so the semaphore belongs to the loop that uses it.
        self._semaphore = None
        self._semaphore_loop = None

    def _get_semaphore(self) -> asyncio.Semaphore:
        """Return a semaphore bound to the currently running event loop."""
        loop = asyncio.get_running_loop()
        if self._semaphore is None or self._semaphore_loop is not loop:
            # A semaphore from an earlier loop cannot be reused after a new
            # asyncio.run() call, so build a fresh one for each loop.
            self._semaphore = asyncio.Semaphore(self.max_concurrent)
            self._semaphore_loop = loop
        return self._semaphore

    async def execute_on_hosts(
        self,
        hosts: List[Host],
        command: str
    ) -> Dict[str, Dict]:
        """Run *command* for every host.

        Returns:
            A mapping of hostname to result dict; a host whose coroutine
            raised is reported as ``{"success": False, "error": ...}``.
        """
        tasks = [
            self._execute_with_semaphore(host, command)
            for host in hosts
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            host.hostname: result if not isinstance(result, Exception)
            else {"success": False, "error": str(result)}
            for host, result in zip(hosts, results)
        }

    async def _execute_with_semaphore(self, host: Host, command: str):
        """Run ``_execute_async`` while holding a concurrency slot."""
        async with self._get_semaphore():
            return await self._execute_async(host, command)

    async def _execute_async(self, host: Host, command: str):
        """Simulate remote execution: wait 0.1 s and report success."""
        await asyncio.sleep(0.1)
        return {"success": True, "output": f"Executed on {host.hostname}"}


# 50.6.2 配置管理最佳实践
python
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import yaml
import json
import os
@dataclass
class Playbook:
    """Definition of a playbook: named, ordered tasks for a host pattern.

    Attributes:
        name: Playbook name; echoed in the run result.
        hosts: Host pattern the playbook targets; echoed in the run result.
        tasks: Task dictionaries, processed in list order.
        vars: Variables handed to every task; a fresh empty dict per
            instance when omitted.
        become: Defaults to False. Not read by the code in this section;
            presumably an Ansible-style privilege-escalation flag (confirm
            against the consumer).
    """

    name: str
    hosts: str
    tasks: List[Dict]
    vars: Dict = field(default_factory=dict)
    become: bool = False
class ConfigurationManager:
    """Holds the host inventory, variables and templates for config runs."""

    def __init__(self):
        """Start with an empty inventory, no variables and no templates."""
        self._inventory: Dict[str, List[str]] = {}
        self._variables: Dict[str, Dict] = {}
        self._templates: Dict[str, str] = {}

    def load_inventory(self, filepath: str):
        """Load the host inventory from a YAML or JSON file.

        Files ending in ``.yaml``/``.yml`` (any letter case) are parsed as
        YAML, everything else as JSON. The top-level ``hosts`` mapping
        replaces the inventory and ``vars`` replaces the variables; a
        missing or null entry yields an empty mapping, as does an empty
        document.

        Raises:
            ValueError: If the document's top level is not a mapping.
        """
        # Explicit encoding: the platform default may not be UTF-8.
        with open(filepath, 'r', encoding='utf-8') as f:
            if filepath.lower().endswith(('.yaml', '.yml')):
                data = yaml.safe_load(f)
            else:
                data = json.load(f)
        if data is None:
            # Empty YAML file or a literal null document.
            data = {}
        if not isinstance(data, dict):
            raise ValueError(
                f"Inventory file {filepath} must contain a mapping at top level"
            )
        self._inventory = data.get('hosts') or {}
        self._variables = data.get('vars') or {}

    def add_template(self, template_name: str, content: str) -> None:
        """Register template source text under ``template_name``."""
        self._templates[template_name] = content

    def get_hosts(self, pattern: str = None) -> List[str]:
        """Return the hosts of one group, or of every group.

        Args:
            pattern: Group name. ``None`` returns the hosts of all groups in
                inventory order (a host listed in several groups appears
                once per group).

        Returns:
            A new list, so callers can modify it without touching the
            inventory. Unknown or empty groups give ``[]``.
        """
        if pattern is None:
            return [
                host
                for group_hosts in self._inventory.values()
                for host in (group_hosts or [])
            ]
        return list(self._inventory.get(pattern) or [])

    def render_template(
        self,
        template_name: str,
        variables: Dict
    ) -> str:
        """Render a registered template with ``variables``.

        Returns:
            The rendered text, or ``""`` when no template is registered
            under ``template_name``.
        """
        if template_name not in self._templates:
            return ""
        # Imported only when a template is actually rendered, so jinja2 is
        # not needed for the unknown-template case.
        from jinja2 import Template
        template = Template(self._templates[template_name])
        return template.render(**variables)
class TaskRunner:
    """Run the tasks of a playbook one after another on the local machine."""

    def __init__(self, dry_run: bool = False):
        """Create a runner.

        Args:
            dry_run: When True, tasks are only described, never executed.
        """
        self.dry_run = dry_run
        self._results: List[Dict] = []

    def run_playbook(self, playbook: Playbook) -> Dict:
        """Run the playbook's tasks in order.

        Execution stops after the first failed task unless that task sets a
        truthy ``ignore_errors``.

        Returns:
            ``{"playbook": name, "hosts": pattern, "tasks": [...]}`` where
            ``tasks`` holds the result of every task that was run.
        """
        results = {
            "playbook": playbook.name,
            "hosts": playbook.hosts,
            "tasks": []
        }
        for task in playbook.tasks:
            task_result = self._run_task(task, playbook.vars)
            results["tasks"].append(task_result)
            # Truthiness test instead of "== False": a null or empty
            # ignore_errors must not silently mean "ignore the failure".
            if not task_result.get("success") and not task.get("ignore_errors", False):
                break
        return results

    def _run_task(self, task: Dict, variables: Dict) -> Dict:
        """Run a single task and return its result with the task name added.

        The ``module`` key selects the handler (default ``"command"``) and
        ``args`` carries its arguments. Any exception raised by a handler is
        converted into ``{"success": False, "error": ...}``.
        """
        task_name = task.get("name", "unnamed task")
        if self.dry_run:
            return {
                "name": task_name,
                "success": True,
                "dry_run": True,
                "message": f"Would execute: {task}"
            }
        module = task.get("module", "command")
        # "args: null" in a playbook is treated like missing arguments.
        args = task.get("args") or {}
        handlers = {
            "command": lambda: self._module_command(args),
            "copy": lambda: self._module_copy(args),
            "template": lambda: self._module_template(args, variables),
            "service": lambda: self._module_service(args),
        }
        try:
            handler = handlers.get(module)
            if handler is None:
                result = {"success": False, "error": f"Unknown module: {module}"}
            else:
                result = handler()
            return {"name": task_name, **result}
        except Exception as e:
            return {"name": task_name, "success": False, "error": str(e)}

    def _module_command(self, args: Dict) -> Dict:
        """Run ``args["cmd"]`` as a subprocess and capture its output.

        A string is passed to the shell; a list/tuple is executed directly
        as an argument vector. Success means exit status 0.
        """
        import subprocess
        cmd = args.get("cmd")
        if cmd is None:
            return {"success": False, "error": "command module requires a 'cmd' argument"}
        # SECURITY NOTE(review): string commands run through the shell, so
        # playbooks must come from a trusted source. With shell=True a list
        # would only run its first element on POSIX, hence the type switch.
        result = subprocess.run(
            cmd,
            shell=isinstance(cmd, str),
            capture_output=True,
            text=True
        )
        return {
            "success": result.returncode == 0,
            "stdout": result.stdout,
            "stderr": result.stderr
        }

    def _module_copy(self, args: Dict) -> Dict:
        """Copy ``args["src"]`` to ``args["dest"]`` with ``shutil.copy``."""
        import shutil
        src = args.get("src")
        dest = args.get("dest")
        if src is None or dest is None:
            return {"success": False, "error": "copy module requires 'src' and 'dest' arguments"}
        shutil.copy(src, dest)
        return {"success": True, "message": f"Copied {src} to {dest}"}

    def _module_template(self, args: Dict, variables: Dict) -> Dict:
        """Placeholder: reports success without rendering anything."""
        return {"success": True, "message": "Template applied"}

    def _module_service(self, args: Dict) -> Dict:
        """Placeholder: reports success without touching any service."""
        return {"success": True, "message": "Service managed"}
class HealthChecker:
    """Registry of named health checks that can be run in one go."""

    def __init__(self):
        """Start with no registered checks."""
        self._checks: Dict[str, Callable] = {}

    def register(self, name: str, check_func: Callable):
        """Register ``check_func`` under ``name``, replacing any previous one."""
        self._checks[name] = check_func

    def run_all(self) -> Dict:
        """Run every registered check and report one entry per name.

        Returns:
            A dict keyed by check name, in registration order. A truthy
            return value is reported as "healthy", a falsy one as
            "unhealthy"; ``details`` carries the value when it is a dict and
            ``None`` otherwise. A check that raises is reported with status
            "error" and the exception text.
        """
        return {
            check_name: self._evaluate(func)
            for check_name, func in self._checks.items()
        }

    @staticmethod
    def _evaluate(func: Callable) -> Dict:
        """Call one check and translate its outcome into a report entry."""
        try:
            outcome = func()
            report = {"status": "healthy" if outcome else "unhealthy"}
            report["details"] = outcome if isinstance(outcome, dict) else None
            return report
        except Exception as exc:
            return {"status": "error", "error": str(exc)}


# Section heading from the surrounding document (kept verbatim):
# 50.7 本章小结
本章详细介绍了Python自动化运维的核心概念和实践:
- 自动化脚本:命令执行、服务管理、包管理、文件同步
- 批量操作:主机清单、并行执行、任务编排
- 配置管理:Ansible风格Playbook、模块执行器
- 系统监控:指标收集、告警管理、日志收集
练习题
- 实现一个完整的服务器批量管理工具,支持分组执行
- 开发一个简单的配置管理系统,支持模板渲染
- 实现一个系统监控面板,支持实时指标展示
- 开发一个日志分析工具,支持关键字搜索和告警
- 实现一个定时任务调度系统,支持Cron表达式