Skip to content

第50章 自动化运维

学习目标

完成本章学习后，你将能够：

  1. 实现自动化脚本：系统管理、批量操作、定时任务
  2. 使用配置管理：Ansible、配置模板、环境管理
  3. 实现基础设施即代码：Terraform、资源编排、版本控制
  4. 进行系统监控：资源监控、日志收集、告警通知
  5. 实现自动化部署：CI/CD流水线、滚动更新、回滚机制
  6. 管理服务器集群：批量执行、配置同步、状态管理
  7. 实现故障自愈：健康检查、自动恢复、故障转移
  8. 构建运维平台：Web界面、API服务、权限管理

50.1 自动化脚本

50.1.1 系统管理

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import subprocess
import asyncio
import platform
import os
import time
from datetime import datetime


class OSType(Enum):
    """Operating-system families distinguished by the helpers in this file."""
    WINDOWS = "windows"
    LINUX = "linux"
    MACOS = "macos"  # SystemInfo.get_os_type maps the "darwin" platform name to this member


@dataclass
class CommandResult:
    """Captured outcome of one command execution."""
    command: str  # command text (or other label) this result belongs to
    exit_code: int  # process return code; CommandExecutor stores -1 for timeouts and other errors
    stdout: str  # captured standard output
    stderr: str  # captured standard error, or an error description when exit_code is -1
    execution_time: float  # elapsed seconds as measured by whoever built the result
    timestamp: datetime = field(default_factory=datetime.now)  # creation time (naive datetime.now())

    @property
    def success(self) -> bool:
        """Return True when the exit code is exactly 0."""
        return self.exit_code == 0


class SystemInfo:
    """Static helpers that describe the machine this process runs on.

    Memory and disk figures are measured with the standard library only.
    When a platform offers no supported probe, the original sample figures
    are returned so the result keeps its documented shape.
    """

    # Bytes per GiB; every ``*_gb`` figure below is expressed in this unit.
    _BYTES_PER_GB = 1024 ** 3

    @staticmethod
    def get_os_type() -> OSType:
        """Return the OS family of the current platform.

        Platform names other than Windows, Linux and Darwin are reported
        as ``OSType.LINUX``.
        """
        known = {
            "windows": OSType.WINDOWS,
            "linux": OSType.LINUX,
            "darwin": OSType.MACOS,
        }
        return known.get(platform.system().lower(), OSType.LINUX)

    @staticmethod
    def get_hostname() -> str:
        """Return the network name reported by ``platform.node()``."""
        return platform.node()

    @staticmethod
    def get_cpu_count() -> int:
        """Return the number of CPUs, or 1 when it cannot be determined."""
        return os.cpu_count() or 1

    @staticmethod
    def _probe_memory_bytes() -> Optional[tuple]:
        """Return ``(total_bytes, available_bytes)`` or None if unknown."""
        # First choice: /proc/meminfo, whose figures are expressed in kB.
        try:
            figures = {}
            with open("/proc/meminfo", encoding="ascii", errors="replace") as fh:
                for line in fh:
                    key, _, rest = line.partition(":")
                    parts = rest.split()
                    if parts and parts[0].isdigit():
                        figures[key.strip()] = int(parts[0]) * 1024
            total = figures["MemTotal"]
            available = figures.get("MemAvailable", figures.get("MemFree"))
            if total > 0 and available is not None:
                return total, available
        except (OSError, KeyError, ValueError):
            pass

        # Second choice: sysconf page counts (os.sysconf is absent on Windows,
        # and individual names may be unsupported on a given platform).
        try:
            page_size = os.sysconf("SC_PAGE_SIZE")
            total = page_size * os.sysconf("SC_PHYS_PAGES")
            available = page_size * os.sysconf("SC_AVPHYS_PAGES")
            if total > 0 and available >= 0:
                return total, available
        except (AttributeError, OSError, ValueError):
            pass

        return None

    @staticmethod
    def get_memory_info() -> Dict[str, float]:
        """Return total/available memory in GiB and the used percentage.

        Returns:
            A dict with the keys ``total_gb``, ``available_gb`` and
            ``used_percent``. If no probe works on this platform the
            original sample figures (16 / 8 / 50) are returned.
        """
        probed = SystemInfo._probe_memory_bytes()
        if probed is None:
            return {
                "total_gb": 16.0,
                "available_gb": 8.0,
                "used_percent": 50.0
            }

        total, available = probed
        available = min(available, total)
        gb = SystemInfo._BYTES_PER_GB
        return {
            "total_gb": round(total / gb, 2),
            "available_gb": round(available / gb, 2),
            "used_percent": round((total - available) / total * 100, 2)
        }

    @staticmethod
    def get_disk_info() -> List[Dict]:
        """Return usage figures for the root file system.

        Returns:
            A one-element list whose dict has the keys ``path``,
            ``total_gb``, ``used_gb``, ``free_gb`` and ``used_percent``.
            If the usage cannot be read, the original sample figures are
            returned instead.
        """
        import shutil  # local import: only this method needs it

        root = os.path.abspath(os.sep)
        try:
            usage = shutil.disk_usage(root)
        except OSError:
            usage = None

        if usage is None or usage.total <= 0:
            return [
                {
                    "path": "/",
                    "total_gb": 500.0,
                    "used_gb": 250.0,
                    "free_gb": 250.0,
                    "used_percent": 50.0
                }
            ]

        gb = SystemInfo._BYTES_PER_GB
        return [
            {
                "path": root,
                "total_gb": round(usage.total / gb, 2),
                "used_gb": round(usage.used / gb, 2),
                "free_gb": round(usage.free / gb, 2),
                "used_percent": round(usage.used / usage.total * 100, 2)
            }
        ]


class CommandExecutor:
    """Run commands synchronously or through asyncio and keep a history.

    Every result produced by ``run`` or ``run_async`` is appended to
    ``history`` in completion order.
    """

    def __init__(self, timeout: int = 300):
        """Create an executor.

        Args:
            timeout: Seconds a command may run before it is abandoned.
        """
        self.timeout = timeout
        self.history: List[CommandResult] = []

    def _record(
        self,
        command: str,
        exit_code: int,
        stdout: str,
        stderr: str,
        started: float
    ) -> CommandResult:
        """Build a result, stamp its duration, and append it to the history."""
        outcome = CommandResult(
            command=command,
            exit_code=exit_code,
            stdout=stdout,
            stderr=stderr,
            execution_time=time.time() - started
        )
        self.history.append(outcome)
        return outcome

    def run(
        self,
        command: str,
        cwd: str = None,
        env: Dict[str, str] = None,
        shell: bool = True
    ) -> CommandResult:
        """Run ``command`` and block until it finishes or times out.

        Args:
            command: Command line to execute.
            cwd: Working directory for the child, or None for the current one.
            env: Extra environment variables layered over ``os.environ``.
            shell: Passed through to ``subprocess.run``.

        Returns:
            The captured result. Timeouts and any other exception are
            reported as ``exit_code == -1`` with the reason in ``stderr``;
            this method does not raise for them.
        """
        started = time.time()

        try:
            completed = subprocess.run(
                command,
                shell=shell,
                cwd=cwd,
                env={**os.environ, **(env or {})},
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
        except subprocess.TimeoutExpired:
            return self._record(
                command, -1, "",
                f"Command timed out after {self.timeout} seconds",
                started
            )
        except Exception as e:
            return self._record(command, -1, "", str(e), started)

        return self._record(
            command,
            completed.returncode,
            completed.stdout,
            completed.stderr,
            started
        )

    @staticmethod
    async def _kill(process) -> None:
        """Kill ``process`` if it is still running and reap it."""
        if process is None or process.returncode is not None:
            return
        try:
            process.kill()
        except ProcessLookupError:
            # The process exited between the check and the kill.
            return
        await process.wait()

    async def run_async(
        self,
        command: str,
        cwd: str = None,
        env: Dict[str, str] = None
    ) -> CommandResult:
        """Run ``command`` through the shell without blocking the event loop.

        Args:
            command: Shell command line to execute.
            cwd: Working directory for the child, or None for the current one.
            env: Extra environment variables layered over ``os.environ``.

        Returns:
            The captured result. A timeout is reported as ``exit_code == -1``.
            Errors raised while starting the process propagate to the caller.
        """
        started = time.time()
        process = None

        try:
            process = await asyncio.create_subprocess_shell(
                command,
                cwd=cwd,
                env={**os.environ, **(env or {})},
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            # wait_for only cancels communicate(); without an explicit kill
            # the child would keep running after we report the timeout.
            await self._kill(process)
            return self._record(
                command, -1, "",
                f"Command timed out after {self.timeout} seconds",
                started
            )

        # errors="replace" keeps binary output from raising UnicodeDecodeError.
        return self._record(
            command,
            process.returncode,
            stdout.decode(errors="replace") if stdout else "",
            stderr.decode(errors="replace") if stderr else "",
            started
        )

    def get_history(self, limit: int = 100) -> List[CommandResult]:
        """Return the ``limit`` most recent results, oldest first.

        A non-positive ``limit`` yields an empty list (a plain ``[-0:]``
        slice would return the whole history).
        """
        if limit <= 0:
            return []
        return self.history[-limit:]


class ServiceManager:
    """Start, stop and inspect system services.

    Windows hosts are driven through ``net`` / ``sc``; every other OS type
    is driven through ``systemctl``.
    """

    def __init__(self, executor: CommandExecutor = None):
        """Use ``executor`` if given, otherwise a default CommandExecutor."""
        self.executor = executor or CommandExecutor()
        self.os_type = SystemInfo.get_os_type()

    def _on_windows(self) -> bool:
        """Return True when the detected OS type is Windows."""
        return self.os_type == OSType.WINDOWS

    def _run_for_platform(self, windows_cmd: str, other_cmd: str) -> CommandResult:
        """Run the command variant that matches the detected OS."""
        chosen = windows_cmd if self._on_windows() else other_cmd
        return self.executor.run(chosen)

    def start(self, service_name: str) -> CommandResult:
        """Start the named service."""
        return self._run_for_platform(
            f"net start {service_name}",
            f"systemctl start {service_name}",
        )

    def stop(self, service_name: str) -> CommandResult:
        """Stop the named service."""
        return self._run_for_platform(
            f"net stop {service_name}",
            f"systemctl stop {service_name}",
        )

    def restart(self, service_name: str) -> CommandResult:
        """Restart the named service.

        On Windows this is a stop followed by a start; only the start
        result is returned.
        """
        if not self._on_windows():
            return self.executor.run(f"systemctl restart {service_name}")
        self.stop(service_name)
        return self.start(service_name)

    def status(self, service_name: str) -> Dict:
        """Return ``{"running": bool, "status": str}`` for the service."""
        if self._on_windows():
            output = self.executor.run(f"sc query {service_name}").stdout
            is_running = "RUNNING" in output
            return {
                "running": is_running,
                "status": "running" if is_running else "stopped",
            }

        state = self.executor.run(f"systemctl is-active {service_name}").stdout.strip()
        return {"running": state == "active", "status": state}

    def enable(self, service_name: str) -> CommandResult:
        """Configure the service to start automatically."""
        return self._run_for_platform(
            f"sc config {service_name} start= auto",
            f"systemctl enable {service_name}",
        )

    def disable(self, service_name: str) -> CommandResult:
        """Configure the service so it no longer starts automatically."""
        return self._run_for_platform(
            f"sc config {service_name} start= disabled",
            f"systemctl disable {service_name}",
        )


class PackageManager:
    """Install, remove and upgrade packages with the platform's package tool."""

    # Command templates keyed by package-manager name.
    _INSTALL_TEMPLATES = {
        "apt": "apt install -y {package}",
        "yum": "yum install -y {package}",
        "dnf": "dnf install -y {package}",
        "brew": "brew install {package}",
        "choco": "choco install -y {package}",
    }
    _REMOVE_TEMPLATES = {
        "apt": "apt remove -y {package}",
        "yum": "yum remove -y {package}",
        "dnf": "dnf remove -y {package}",
        "brew": "brew uninstall {package}",
        "choco": "choco uninstall -y {package}",
    }
    _UPDATE_COMMANDS = {
        "apt": "apt update && apt upgrade -y",
        "yum": "yum update -y",
        "dnf": "dnf update -y",
        "brew": "brew update && brew upgrade",
        "choco": "choco upgrade all -y",
    }

    def __init__(self, executor: CommandExecutor = None):
        """Use ``executor`` if given, otherwise a default CommandExecutor."""
        self.executor = executor or CommandExecutor()
        self.os_type = SystemInfo.get_os_type()

    def _get_package_manager(self) -> str:
        """Return the package-manager name for the detected OS.

        Windows maps to ``choco`` and macOS to ``brew``. Elsewhere the first
        of apt, yum, dnf found under /usr/bin is used, defaulting to ``apt``.
        """
        if self.os_type == OSType.WINDOWS:
            return "choco"
        if self.os_type == OSType.MACOS:
            return "brew"

        for candidate in ("apt", "yum", "dnf"):
            if os.path.exists(f"/usr/bin/{candidate}"):
                return candidate
        return "apt"

    def install(self, package: str) -> CommandResult:
        """Install ``package`` and return the command result."""
        template = self._INSTALL_TEMPLATES.get(self._get_package_manager())
        if template is None:
            return CommandResult(package, -1, "", "Unknown package manager", 0)
        return self.executor.run(template.format(package=package))

    def remove(self, package: str) -> CommandResult:
        """Remove ``package`` and return the command result."""
        template = self._REMOVE_TEMPLATES.get(self._get_package_manager())
        if template is None:
            return CommandResult(package, -1, "", "Unknown package manager", 0)
        return self.executor.run(template.format(package=package))

    def update(self) -> CommandResult:
        """Refresh package metadata and upgrade installed packages."""
        command = self._UPDATE_COMMANDS.get(self._get_package_manager())
        if command is None:
            return CommandResult("update", -1, "", "Unknown package manager", 0)
        return self.executor.run(command)


class FileSync:
    """Copy, move, delete and mirror files by shelling out to OS tools.

    Each method picks the Windows or the POSIX command from
    ``SystemInfo.get_os_type()``.

    NOTE(review): the POSIX branches interpolate paths unquoted, so a path
    containing spaces or shell metacharacters is split or interpreted by the
    shell. Quoting would also disable wildcard expansion, so callers must
    pass shell-safe paths.
    """

    def __init__(self, executor: CommandExecutor = None):
        """Use ``executor`` if given, otherwise a default CommandExecutor."""
        self.executor = executor or CommandExecutor()

    def copy(
        self,
        src: str,
        dst: str,
        recursive: bool = True
    ) -> CommandResult:
        """Copy ``src`` to ``dst`` (``xcopy`` on Windows, ``cp`` elsewhere).

        Args:
            src: Source path.
            dst: Destination path.
            recursive: Copy directories recursively when True.
        """
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            flags = "/E /I" if recursive else ""
            return self.executor.run(f'xcopy "{src}" "{dst}" {flags}')
        else:
            flags = "-r" if recursive else ""
            return self.executor.run(f"cp {flags} {src} {dst}")

    def move(self, src: str, dst: str) -> CommandResult:
        """Move ``src`` to ``dst`` (``move`` on Windows, ``mv`` elsewhere)."""
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            # cmd.exe has no ``mv``; /Y suppresses the overwrite prompt so the
            # command cannot block waiting for input.
            return self.executor.run(f'move /Y "{src}" "{dst}"')
        return self.executor.run(f"mv {src} {dst}")

    def delete(self, path: str, recursive: bool = False) -> CommandResult:
        """Delete ``path``; ``recursive`` removes directory trees."""
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            if recursive:
                return self.executor.run(f'rmdir /s /q "{path}"')
            return self.executor.run(f'del "{path}"')
        else:
            flags = "-rf" if recursive else "-f"
            return self.executor.run(f"rm {flags} {path}")

    def sync_directories(
        self,
        src: str,
        dst: str,
        delete: bool = False
    ) -> CommandResult:
        """Mirror the contents of ``src`` into ``dst``.

        Args:
            src: Source directory.
            dst: Destination directory.
            delete: Also remove destination entries missing from the source.

        NOTE(review): robocopy is understood to signal success with exit
        codes below 8, so ``CommandResult.success`` may be False after a
        successful Windows sync — confirm against the robocopy documentation.
        """
        if SystemInfo.get_os_type() == OSType.WINDOWS:
            flags = "/MIR" if delete else "/E"
            return self.executor.run(f'robocopy "{src}" "{dst}" {flags}')
        else:
            flags = "-av --delete" if delete else "-av"
            return self.executor.run(f"rsync {flags} {src}/ {dst}/")

50.1.2 批量操作

python
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
import asyncio


@dataclass
class Host:
    """Connection details and classification labels for one managed host."""
    hostname: str  # unique name; HostInventory uses it as the dictionary key
    ip: str  # address of the host
    port: int = 22  # presumably the SSH port — confirm with the transport that consumes it
    username: str = "root"  # login user
    password: str = ""  # login password; empty by default
    key_file: str = ""  # presumably a path to a private key — TODO confirm
    tags: List[str] = field(default_factory=list)  # labels matched by HostInventory.get_hosts_by_tag
    groups: List[str] = field(default_factory=list)  # group names matched by HostInventory.get_hosts_by_group


@dataclass
class TaskResult:
    """Outcome of running one command for one host."""
    host: str  # hostname the result is attributed to
    success: bool  # True when the command reported success
    output: str  # captured standard output
    error: str  # captured standard error, or the exception text
    duration: float  # elapsed seconds


class HostInventory:
    """In-memory registry of hosts keyed by hostname."""

    def __init__(self):
        self.hosts: Dict[str, Host] = {}

    def add_host(self, host: Host) -> None:
        """Register ``host``, replacing any entry with the same hostname."""
        self.hosts[host.hostname] = host

    def remove_host(self, hostname: str) -> None:
        """Remove the host if present; unknown names are ignored."""
        self.hosts.pop(hostname, None)

    def get_host(self, hostname: str) -> Optional[Host]:
        """Return the host registered under ``hostname``, or None."""
        return self.hosts.get(hostname)

    def get_hosts_by_tag(self, tag: str) -> List[Host]:
        """Return every host whose ``tags`` contain ``tag``."""
        return [h for h in self.hosts.values() if tag in h.tags]

    def get_hosts_by_group(self, group: str) -> List[Host]:
        """Return every host whose ``groups`` contain ``group``."""
        return [h for h in self.hosts.values() if group in h.groups]

    def load_from_file(self, filepath: str) -> None:
        """Add hosts described in a YAML file.

        The file is expected to hold a top-level ``hosts`` mapping from
        hostname to settings (``ip``, ``port``, ``username``, ``password``,
        ``key_file``, ``tags``, ``groups``). An empty file, a missing
        ``hosts`` key, or a host without settings is accepted.

        Args:
            filepath: Path of the YAML inventory file.

        Raises:
            OSError: If the file cannot be opened.
        """
        import yaml

        with open(filepath, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty document.
            data = yaml.safe_load(f) or {}

        for hostname, config in (data.get('hosts') or {}).items():
            # "web1:" with nothing below it parses as None.
            config = config or {}
            host = Host(
                hostname=hostname,
                ip=config.get('ip', ''),
                port=config.get('port', 22),
                username=config.get('username', 'root'),
                password=config.get('password', ''),
                key_file=config.get('key_file', ''),
                tags=list(config.get('tags') or []),
                groups=list(config.get('groups') or [])
            )
            self.add_host(host)


class BatchExecutor:
    """Run one command for many hosts with bounded concurrency.

    The command itself is executed by a local ``CommandExecutor``; the
    ``Host`` object is only used to label the resulting ``TaskResult``.
    """

    def __init__(
        self,
        inventory: HostInventory,
        max_workers: int = 10
    ):
        """Create a batch executor.

        Args:
            inventory: Source of hosts for the group/tag helpers.
            max_workers: Maximum number of commands in flight at once.
        """
        self.inventory = inventory
        self.max_workers = max_workers

    async def execute_on_host(
        self,
        host: Host,
        command: str
    ) -> TaskResult:
        """Run ``command`` and report the outcome under ``host.hostname``.

        The blocking ``CommandExecutor.run`` call is moved to a worker
        thread so other hosts can make progress while it waits. Any
        exception is converted into a failed ``TaskResult``.
        """
        start_time = time.time()

        try:
            executor = CommandExecutor()
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, executor.run, command)

            return TaskResult(
                host=host.hostname,
                success=result.success,
                output=result.stdout,
                error=result.stderr,
                duration=time.time() - start_time
            )

        except Exception as e:
            return TaskResult(
                host=host.hostname,
                success=False,
                output="",
                error=str(e),
                duration=time.time() - start_time
            )

    async def execute_on_hosts(
        self,
        hosts: List[Host],
        command: str
    ) -> List[TaskResult]:
        """Run ``command`` for every host, at most ``max_workers`` at a time.

        Returns:
            One result per host, in the same order as ``hosts``.
        """
        if not hosts:
            return []

        # Created here so the semaphore belongs to the running event loop.
        limiter = asyncio.Semaphore(max(1, self.max_workers))

        async def _bounded(host: Host) -> TaskResult:
            async with limiter:
                return await self.execute_on_host(host, command)

        return list(await asyncio.gather(*(_bounded(h) for h in hosts)))

    def execute_on_group(
        self,
        group: str,
        command: str
    ) -> List[TaskResult]:
        """Run ``command`` for every inventory host in ``group``."""
        hosts = self.inventory.get_hosts_by_group(group)
        return asyncio.run(self.execute_on_hosts(hosts, command))

    def execute_on_tag(
        self,
        tag: str,
        command: str
    ) -> List[TaskResult]:
        """Run ``command`` for every inventory host carrying ``tag``."""
        hosts = self.inventory.get_hosts_by_tag(tag)
        return asyncio.run(self.execute_on_hosts(hosts, command))


class TaskRunner:
    """Queue named tasks and run their commands against inventory hosts."""

    def __init__(self, inventory: HostInventory):
        self.inventory = inventory
        self.tasks: List[Dict] = []

    def add_task(
        self,
        name: str,
        hosts: List[str],
        commands: List[str],
        when: str = None
    ) -> None:
        """Append a task description to the queue.

        Args:
            name: Key under which ``run`` reports the task's results.
            hosts: Hostnames to resolve through the inventory.
            commands: Commands to run, in order.
            when: Stored with the task; ``run`` does not evaluate it.
        """
        entry = dict(name=name, hosts=hosts, commands=commands, when=when)
        self.tasks.append(entry)

    def run(self) -> Dict[str, List[TaskResult]]:
        """Execute every queued task and return results keyed by task name.

        Hostnames the inventory does not know are skipped. Each command is
        run for all resolved hosts before the next command starts.
        """
        outcome = {}

        for spec in self.tasks:
            # Resolve hostnames, dropping the ones the inventory cannot find.
            resolved = []
            for hostname in spec["hosts"]:
                found = self.inventory.get_host(hostname)
                if found:
                    resolved.append(found)

            collected = []
            for command in spec["commands"]:
                batch = BatchExecutor(self.inventory)
                collected += asyncio.run(
                    batch.execute_on_hosts(resolved, command)
                )

            outcome[spec["name"]] = collected

        return outcome


class ScheduledTask:
    """A named command with a schedule string and run bookkeeping."""

    def __init__(
        self,
        name: str,
        command: str,
        schedule: str,
        enabled: bool = True
    ):
        """Store the task definition and reset its counters.

        Args:
            name: Identifier of the task.
            command: Command line to execute.
            schedule: Schedule expression; stored but not interpreted here.
            enabled: Whether the task may run.
        """
        # Definition
        self.name = name
        self.command = command
        self.schedule = schedule
        self.enabled = enabled
        # Bookkeeping; next_run is not updated by any code in this class.
        self.last_run: Optional[datetime] = None
        self.next_run: Optional[datetime] = None
        self.run_count: int = 0

    def should_run(self, now: datetime) -> bool:
        """Return True when the task is enabled.

        NOTE(review): neither ``now`` nor ``schedule`` is consulted, so an
        enabled task is reported as due on every check.
        """
        return True if self.enabled else False


class TaskScheduler:
    """Periodically run the registered tasks that report themselves as due."""

    def __init__(self):
        self.tasks: Dict[str, ScheduledTask] = {}
        self.running = False

    def add_task(self, task: ScheduledTask) -> None:
        """Register ``task`` under its name, replacing any previous entry."""
        self.tasks[task.name] = task

    def remove_task(self, name: str) -> None:
        """Remove the task if present; unknown names are ignored."""
        self.tasks.pop(name, None)

    def enable_task(self, name: str) -> None:
        """Mark the named task as enabled; unknown names are ignored."""
        if name in self.tasks:
            self.tasks[name].enabled = True

    def disable_task(self, name: str) -> None:
        """Mark the named task as disabled; unknown names are ignored."""
        if name in self.tasks:
            self.tasks[name].enabled = False

    async def run_task(self, name: str) -> CommandResult:
        """Run the named task once and update its bookkeeping.

        The blocking command runs in a worker thread so the event loop
        stays responsive.

        Returns:
            The command result, or a result with ``exit_code == -1`` and
            ``stderr == "Task not found"`` for an unknown name.
        """
        task = self.tasks.get(name)
        if not task:
            return CommandResult("", -1, "", "Task not found", 0)

        executor = CommandExecutor()
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, executor.run, task.command)

        task.last_run = datetime.now()
        task.run_count += 1

        return result

    async def start(self, interval: float = 60) -> None:
        """Run due tasks every ``interval`` seconds until ``stop`` is called.

        Args:
            interval: Seconds to wait between passes over the task list.
        """
        self.running = True

        while self.running:
            now = datetime.now()

            # Iterate over a snapshot: tasks may be added or removed while a
            # task is being awaited, which would break iteration of the dict.
            for task in list(self.tasks.values()):
                if task.should_run(now):
                    await self.run_task(task.name)

            # Sleep in short slices so stop() takes effect within about a
            # second instead of after the full interval.
            remaining = float(interval)
            while self.running and remaining > 0:
                step = min(1.0, remaining)
                await asyncio.sleep(step)
                remaining -= step

    def stop(self) -> None:
        """Ask the loop started by ``start`` to finish."""
        self.running = False

50.2 配置管理

50.2.1 Ansible风格配置

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import json
import yaml


class TaskState(Enum):
    """Desired state of a managed resource, as used by ModuleExecutor."""
    PRESENT = "present"  # resource should exist / be installed
    ABSENT = "absent"  # resource should be removed
    LATEST = "latest"  # package should be upgraded to the newest version


@dataclass
class AnsibleTask:
    """One playbook task: a module invocation plus optional directives."""
    name: str  # human-readable task name
    module: str  # module name; AnsiblePlaybook.to_yaml emits it as the key holding ``args``
    args: Dict[str, Any]  # arguments passed to the module
    when: str = None  # optional condition; emitted by to_yaml only when set
    notify: List[str] = field(default_factory=list)  # presumably handler names to notify — TODO confirm
    register: str = None  # optional variable name; emitted by to_yaml only when set
    become: bool = False  # emitted by to_yaml only when True
    ignore_errors: bool = False  # emitted by to_yaml only when True


@dataclass
class AnsibleHandler:
    """A named group of tasks attached to a play as a handler."""
    name: str  # handler name
    tasks: List[AnsibleTask] = field(default_factory=list)  # tasks serialized under the handler's "tasks" key


@dataclass
class AnsiblePlay:
    """One play of a playbook: target hosts, variables, tasks and handlers."""
    name: str  # play name
    hosts: List[str]  # host patterns; AnsiblePlaybook.to_yaml joins them with ","
    tasks: List[AnsibleTask] = field(default_factory=list)  # tasks in declaration order
    handlers: List[AnsibleHandler] = field(default_factory=list)  # serialized only when non-empty
    vars: Dict[str, Any] = field(default_factory=dict)  # play-level variables
    become: bool = False  # play-level "become" flag


class AnsiblePlaybook:
    """An ordered list of plays that can be written to and read from YAML."""

    # Task keys that are directives; the first other key names the module.
    _TASK_KEYWORDS = ("name", "when", "notify", "register", "become", "ignore_errors")

    def __init__(self, name: str):
        self.name = name
        self.plays: List[AnsiblePlay] = []

    def add_play(self, play: AnsiblePlay) -> None:
        """Append ``play`` to the playbook."""
        self.plays.append(play)

    @staticmethod
    def _task_to_dict(task: AnsibleTask) -> Dict[str, Any]:
        """Serialize a task; optional directives are emitted only when set."""
        task_dict = {
            "name": task.name,
            task.module: task.args
        }
        for directive in ("when", "notify", "register", "become", "ignore_errors"):
            value = getattr(task, directive)
            if value:
                task_dict[directive] = value
        return task_dict

    @classmethod
    def _task_from_dict(cls, task_data: Dict[str, Any]) -> Optional[AnsibleTask]:
        """Build a task from its mapping, or return None if no module key exists."""
        module = None
        args = {}
        for key, value in task_data.items():
            if key not in cls._TASK_KEYWORDS:
                module = key
                args = value
                break

        if not module:
            return None

        return AnsibleTask(
            name=task_data.get("name", ""),
            module=module,
            args=args,
            when=task_data.get("when"),
            notify=task_data.get("notify") or [],
            register=task_data.get("register"),
            become=task_data.get("become", False),
            ignore_errors=task_data.get("ignore_errors", False)
        )

    @staticmethod
    def _parse_hosts(raw: Any) -> List[str]:
        """Normalize a ``hosts`` value (comma-separated string or list)."""
        if raw is None:
            return []
        parts = raw.split(",") if isinstance(raw, str) else list(raw)
        cleaned = (str(part).strip() for part in parts)
        return [part for part in cleaned if part]

    def to_yaml(self) -> str:
        """Serialize the playbook to a YAML document.

        Each play becomes a mapping with ``name``, ``hosts`` (comma-joined),
        ``vars``, ``become`` and ``tasks``; ``handlers`` is added only when
        the play has any. Handler tasks keep their directives.
        """
        playbook = []

        for play in self.plays:
            play_dict = {
                "name": play.name,
                "hosts": ",".join(play.hosts),
                "vars": play.vars,
                "become": play.become,
                "tasks": [self._task_to_dict(task) for task in play.tasks]
            }

            if play.handlers:
                play_dict["handlers"] = [
                    {
                        "name": handler.name,
                        "tasks": [self._task_to_dict(task) for task in handler.tasks]
                    }
                    for handler in play.handlers
                ]

            playbook.append(play_dict)

        return yaml.dump(playbook, default_flow_style=False)

    @classmethod
    def from_yaml(cls, yaml_content: str) -> "AnsiblePlaybook":
        """Parse a YAML document produced by ``to_yaml`` (or a similar one).

        Accepts an empty document, ``hosts`` given either as a
        comma-separated string or as a list, and restores handlers so a
        ``to_yaml`` / ``from_yaml`` round trip keeps them. Task mappings
        without a module key are skipped.

        Returns:
            A playbook named ``"imported_playbook"``.
        """
        # safe_load returns None for an empty document.
        data = yaml.safe_load(yaml_content) or []

        playbook = cls("imported_playbook")

        for play_data in data:
            play = AnsiblePlay(
                name=play_data.get("name", ""),
                hosts=cls._parse_hosts(play_data.get("hosts")),
                vars=play_data.get("vars") or {},
                become=play_data.get("become", False)
            )

            for task_data in play_data.get("tasks") or []:
                task = cls._task_from_dict(task_data)
                if task is not None:
                    play.tasks.append(task)

            for handler_data in play_data.get("handlers") or []:
                handler = AnsibleHandler(name=handler_data.get("name", ""))
                for task_data in handler_data.get("tasks") or []:
                    task = cls._task_from_dict(task_data)
                    if task is not None:
                        handler.tasks.append(task)
                play.handlers.append(handler)

            playbook.plays.append(play)

        return playbook


class ModuleExecutor:
    """Implement a handful of Ansible-style modules with shell commands.

    Every method returns a ``CommandResult``. When a module runs several
    commands, the first failing result is returned so that a failed
    sub-step is not reported as success.
    """

    def __init__(self, executor: CommandExecutor = None):
        """Use ``executor`` if given, otherwise a default CommandExecutor."""
        self.executor = executor or CommandExecutor()

    @staticmethod
    def _first_failure(results: List[CommandResult], fallback: CommandResult) -> CommandResult:
        """Return the first unsuccessful result, or ``fallback`` if all passed."""
        for outcome in results:
            if not outcome.success:
                return outcome
        return fallback

    def apt(
        self,
        name: str,
        state: TaskState = TaskState.PRESENT,
        update_cache: bool = False
    ) -> CommandResult:
        """Install, remove or upgrade a package with apt.

        Args:
            name: Package name.
            state: PRESENT installs, ABSENT removes, LATEST upgrades only.
            update_cache: Run ``apt update`` first.
        """
        if update_cache:
            # The refresh result is not inspected; the command below runs
            # regardless of whether the refresh succeeded.
            self.executor.run("apt update")

        if state == TaskState.PRESENT:
            return self.executor.run(f"apt install -y {name}")
        elif state == TaskState.ABSENT:
            return self.executor.run(f"apt remove -y {name}")
        elif state == TaskState.LATEST:
            return self.executor.run(f"apt install -y --only-upgrade {name}")

        return CommandResult(name, -1, "", "Unknown state", 0)

    def yum(
        self,
        name: str,
        state: TaskState = TaskState.PRESENT
    ) -> CommandResult:
        """Install, remove or update a package with yum."""
        if state == TaskState.PRESENT:
            return self.executor.run(f"yum install -y {name}")
        elif state == TaskState.ABSENT:
            return self.executor.run(f"yum remove -y {name}")
        elif state == TaskState.LATEST:
            return self.executor.run(f"yum update -y {name}")

        return CommandResult(name, -1, "", "Unknown state", 0)

    def pip(
        self,
        name: str,
        state: TaskState = TaskState.PRESENT,
        version: str = None
    ) -> CommandResult:
        """Install, uninstall or upgrade a Python package with pip.

        Args:
            name: Distribution name.
            state: PRESENT installs, ABSENT uninstalls, LATEST upgrades.
            version: When given, ``==version`` is appended to the name.
        """
        if version:
            name = f"{name}=={version}"

        if state == TaskState.PRESENT:
            return self.executor.run(f"pip install {name}")
        elif state == TaskState.ABSENT:
            return self.executor.run(f"pip uninstall -y {name}")
        elif state == TaskState.LATEST:
            return self.executor.run(f"pip install --upgrade {name}")

        return CommandResult(name, -1, "", "Unknown state", 0)

    def file(
        self,
        path: str,
        state: TaskState = TaskState.PRESENT,
        mode: str = None,
        owner: str = None,
        group: str = None
    ) -> CommandResult:
        """Ensure a directory exists (PRESENT) or a path is removed (ABSENT).

        For PRESENT the path is created with ``mkdir -p`` and the optional
        mode, owner and group are applied. All steps run; the first one
        that fails is returned. Any other state runs nothing and succeeds.

        Returns:
            The ``rm`` result for ABSENT; otherwise the first failed step,
            or a synthetic success result when every step passed.
        """
        if state == TaskState.ABSENT:
            return self.executor.run(f"rm -rf {path}")

        steps = []
        if state == TaskState.PRESENT:
            steps.append(self.executor.run(f"mkdir -p {path}"))

            if mode:
                steps.append(self.executor.run(f"chmod {mode} {path}"))
            if owner:
                steps.append(self.executor.run(f"chown {owner} {path}"))
            if group:
                steps.append(self.executor.run(f"chgrp {group} {path}"))

        return self._first_failure(steps, CommandResult(path, 0, "", "", 0))

    def template(
        self,
        src: str,
        dest: str,
        variables: Dict[str, Any] = None
    ) -> CommandResult:
        """Render ``src`` to ``dest`` by replacing ``{{ key }}`` placeholders.

        Only the exact form ``{{ key }}`` (one space on each side) is
        replaced. File errors propagate as exceptions.
        """
        with open(src, 'r') as f:
            content = f.read()

        if variables:
            for key, value in variables.items():
                content = content.replace(f"{{{{ {key} }}}}", str(value))

        with open(dest, 'w') as f:
            f.write(content)

        return CommandResult(dest, 0, "", "", 0)

    def copy(
        self,
        src: str,
        dest: str,
        mode: str = None
    ) -> CommandResult:
        """Copy ``src`` to ``dest`` and optionally set its mode.

        Returns:
            The ``cp`` result, or the ``chmod`` result when the copy
            succeeded but changing the mode failed.
        """
        result = self.executor.run(f"cp {src} {dest}")

        if mode and result.success:
            chmod_result = self.executor.run(f"chmod {mode} {dest}")
            if not chmod_result.success:
                return chmod_result

        return result

    def service(
        self,
        name: str,
        state: str = "started",
        enabled: bool = None
    ) -> CommandResult:
        """Set a service's boot-time enablement and running state.

        Args:
            name: Service name.
            state: ``"started"``, ``"stopped"`` or ``"restarted"``; any
                other value performs no state change.
            enabled: True enables, False disables, None leaves it alone.

        Returns:
            The enable/disable result if that step failed; otherwise the
            result of the state change (a synthetic success when no state
            change was requested). Both steps are always attempted.
        """
        manager = ServiceManager(self.executor)

        toggle_result = None
        if enabled is not None:
            toggle_result = manager.enable(name) if enabled else manager.disable(name)

        if state == "started":
            state_result = manager.start(name)
        elif state == "stopped":
            state_result = manager.stop(name)
        elif state == "restarted":
            state_result = manager.restart(name)
        else:
            state_result = CommandResult(name, 0, "", "", 0)

        if toggle_result is not None and not toggle_result.success:
            return toggle_result
        return state_result

    def command(self, cmd: str) -> CommandResult:
        """Run ``cmd`` through the executor."""
        return self.executor.run(cmd)

    def shell(self, cmd: str) -> CommandResult:
        """Run ``cmd`` through the executor (same behavior as ``command``)."""
        return self.executor.run(cmd)


class PlaybookRunner:
    """Execute the tasks of a playbook through ``ModuleExecutor``.

    Tasks run in declaration order on the local machine. The directives
    ``when``, ``notify``, ``register``, ``become`` and ``ignore_errors``
    are not evaluated, and a failed task does not stop the run.
    """

    def __init__(self, inventory: HostInventory):
        self.inventory = inventory
        self.module_executor = ModuleExecutor()
        self.results: List[Dict] = []

    def run(self, playbook: AnsiblePlaybook) -> List[Dict]:
        """Run every task of every play and return one result dict per task."""
        self.results = []

        for play in playbook.plays:
            for task in play.tasks:
                result = self._execute_task(task, play.vars)
                self.results.append(result)

        return self.results

    def _dispatch(
        self,
        module: str,
        args: Any,
        variables: Dict[str, Any],
        task_name: str
    ) -> CommandResult:
        """Call the ModuleExecutor method that implements ``module``."""
        modules = self.module_executor

        if module in ("command", "shell"):
            # Free-form syntax ("shell: echo hi") yields a plain string
            # instead of a mapping; accept both shapes.
            if isinstance(args, str):
                cmd = args
            else:
                cmd = args.get("cmd", args.get("_raw_params", ""))
            return modules.command(cmd)

        if module == "apt":
            return modules.apt(
                name=args.get("name"),
                state=TaskState(args.get("state", "present")),
                update_cache=args.get("update_cache", False)
            )
        if module == "yum":
            return modules.yum(
                name=args.get("name"),
                state=TaskState(args.get("state", "present"))
            )
        if module == "pip":
            return modules.pip(
                name=args.get("name"),
                state=TaskState(args.get("state", "present")),
                version=args.get("version")
            )
        if module == "file":
            return modules.file(
                path=args.get("path"),
                state=TaskState(args.get("state", "present")),
                mode=args.get("mode"),
                owner=args.get("owner"),
                group=args.get("group")
            )
        if module == "template":
            return modules.template(
                src=args.get("src"),
                dest=args.get("dest"),
                variables=variables
            )
        if module == "copy":
            return modules.copy(
                src=args.get("src"),
                dest=args.get("dest"),
                mode=args.get("mode")
            )
        if module == "service":
            return modules.service(
                name=args.get("name"),
                state=args.get("state", "started"),
                enabled=args.get("enabled")
            )

        return CommandResult(task_name, -1, "", f"Unknown module: {module}", 0)

    def _execute_task(
        self,
        task: AnsibleTask,
        variables: Dict[str, Any]
    ) -> Dict:
        """Run one task and describe the outcome.

        Args:
            task: Task to execute.
            variables: Play variables, passed to the ``template`` module.

        Returns:
            A dict with the keys ``task``, ``success``, ``output``,
            ``error`` and ``duration``. Exceptions raised while running
            the module are reported as a failed result.
        """
        start_time = time.time()

        try:
            # A module declared without arguments parses as None.
            args = task.args if task.args is not None else {}
            result = self._dispatch(task.module, args, variables, task.name)

            return {
                "task": task.name,
                "success": result.success,
                "output": result.stdout,
                "error": result.stderr,
                "duration": time.time() - start_time
            }

        except Exception as e:
            return {
                "task": task.name,
                "success": False,
                "output": "",
                "error": str(e),
                "duration": time.time() - start_time
            }

50.3 系统监控

50.3.1 资源监控

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
from datetime import datetime
import threading
import time


@dataclass
class MetricPoint:
    """A single timestamped sample of a metric."""
    timestamp: datetime  # when the sample was taken
    value: float  # sampled value
    tags: Dict[str, str] = field(default_factory=dict)  # optional labels attached to this sample


@dataclass
class MetricSeries:
    """A named, append-only sequence of metric samples."""
    name: str
    points: List[MetricPoint] = field(default_factory=list)
    tags: Dict[str, str] = field(default_factory=dict)

    def add_point(self, value: float, tags: Dict[str, str] = None) -> None:
        """Append a sample stamped with the current time."""
        self.points.append(
            MetricPoint(timestamp=datetime.now(), value=value, tags=tags or {})
        )

    def get_latest(self) -> Optional[MetricPoint]:
        """Return the most recently added sample, or None if there is none."""
        if not self.points:
            return None
        return self.points[-1]

    def get_average(self, window_seconds: int = 60) -> float:
        """Return the mean of samples taken within the last ``window_seconds``.

        Returns 0.0 when the series is empty or no sample falls inside
        the window.
        """
        if not self.points:
            return 0.0

        oldest_allowed = datetime.now().timestamp() - window_seconds
        in_window = [
            sample.value
            for sample in self.points
            if sample.timestamp.timestamp() >= oldest_allowed
        ]

        return sum(in_window) / len(in_window) if in_window else 0.0


class MetricsCollector:
    """Poll registered collector callables and store their values.

    ``collect`` can be called directly, or ``start`` can poll on a
    background daemon thread until ``stop`` is called.
    """

    def __init__(self):
        self.metrics: Dict[str, MetricSeries] = {}
        self.collectors: Dict[str, Callable] = {}
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Signals the polling thread to exit; replaced on every start().
        self._stop_event = threading.Event()

    def register_collector(
        self,
        name: str,
        collector: Callable[[], float],
        tags: Dict[str, str] = None
    ) -> None:
        """Register ``collector`` under ``name``.

        A ``MetricSeries`` is created for the name unless one already
        exists; ``tags`` only applies to a newly created series.
        """
        self.collectors[name] = collector
        if name not in self.metrics:
            self.metrics[name] = MetricSeries(name=name, tags=tags or {})

    def unregister_collector(self, name: str) -> None:
        """Stop polling ``name``; its recorded series is kept."""
        self.collectors.pop(name, None)

    def collect(self) -> Dict[str, Optional[float]]:
        """Call every collector once and record the values.

        Returns:
            A mapping from collector name to its value, or to None when
            the collector (or recording its value) raised an exception.
        """
        values = {}

        # Snapshot the registry: collectors may be registered or removed
        # from another thread while the polling thread is iterating.
        for name, collector in list(self.collectors.items()):
            try:
                value = collector()
                values[name] = value

                series = self.metrics.get(name)
                if series is not None:
                    series.add_point(value)

            except Exception:
                # Best effort: one failing collector must not stop the rest.
                values[name] = None

        return values

    def start(self, interval: int = 60) -> None:
        """Start polling every ``interval`` seconds on a daemon thread.

        Calling ``start`` while polling is already active does nothing.
        """
        if self._running and self._thread is not None and self._thread.is_alive():
            return

        # A fresh event per thread: a previous thread that is still
        # winding down keeps its own (already set) event.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._running = True

        def _collect_loop():
            while not stop_event.is_set():
                self.collect()
                # Unlike time.sleep, this returns as soon as stop() is called.
                stop_event.wait(interval)

        self._thread = threading.Thread(target=_collect_loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Stop polling and wait up to 5 seconds for the thread to exit."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)


class SystemMonitor:
    """Wrap a ``MetricsCollector`` preloaded with CPU, memory and disk probes.

    NOTE(review): the three probes return fixed numbers, not measurements.
    """

    def __init__(self):
        self.collector = MetricsCollector()
        self._setup_default_collectors()

    def _setup_default_collectors(self) -> None:
        """Register the built-in probes with the collector."""
        defaults = (
            ("cpu_percent", self._get_cpu_percent),
            ("memory_percent", self._get_memory_percent),
            ("disk_percent", self._get_disk_percent),
        )
        for metric_name, probe in defaults:
            self.collector.register_collector(metric_name, probe)

    def _get_cpu_percent(self) -> float:
        """Return the CPU usage figure (fixed at 50.0)."""
        return 50.0

    def _get_memory_percent(self) -> float:
        """Return the memory usage figure (fixed at 60.0)."""
        return 60.0

    def _get_disk_percent(self) -> float:
        """Return the disk usage figure (fixed at 40.0)."""
        return 40.0

    def start(self, interval: int = 60) -> None:
        """Start background collection every ``interval`` seconds."""
        self.collector.start(interval)

    def stop(self) -> None:
        """Stop background collection."""
        self.collector.stop()

    def get_current_metrics(self) -> Dict[str, float]:
        """Collect once right now and return the values by metric name."""
        return self.collector.collect()

    def get_metric_history(
        self,
        name: str,
        window_seconds: int = 3600
    ) -> List[MetricPoint]:
        """Return the samples of ``name`` from the last ``window_seconds``.

        An unknown metric name yields an empty list.
        """
        known = self.collector.metrics
        if name in known:
            oldest_allowed = datetime.now().timestamp() - window_seconds
            return [
                sample
                for sample in known[name].points
                if sample.timestamp.timestamp() >= oldest_allowed
            ]
        return []


@dataclass
class Alert:
    """Definition of one threshold alert rule."""

    # Unique rule name; AlertManager stores rules keyed by this value.
    name: str
    # Name of the metric to inspect; AlertManager.check uses it as the key
    # into the metrics dict (it is not an expression).
    condition: str
    # The metric value is compared against this with ">=".
    threshold: float
    # Severity label copied into the alert payload, e.g. "critical" or "warning".
    severity: str
    # Human-readable text copied into the alert payload.
    message: str
    # Disabled rules are skipped by AlertManager.check.
    enabled: bool = True


class AlertManager:
    """Evaluate threshold alert rules against a monitor's current metrics.

    A rule fires when its metric value is greater than or equal to its
    threshold.  Each rule notifies only on the transition into the firing
    state; it is re-armed once the metric drops below the threshold again.
    """

    def __init__(self, monitor: SystemMonitor):
        """Bind the manager to ``monitor``, whose ``get_current_metrics()``
        supplies the values that rules are checked against."""
        self.monitor = monitor
        self.alerts: Dict[str, Alert] = {}
        self.handlers: List[Callable] = []
        # Rule name -> time it entered the firing state (used for de-duplication).
        self._triggered: Dict[str, datetime] = {}

    def add_alert(self, alert: Alert) -> None:
        """Register ``alert``, replacing any rule with the same name."""
        self.alerts[alert.name] = alert

    def remove_alert(self, name: str) -> None:
        """Remove the rule called ``name``; unknown names are ignored."""
        self.alerts.pop(name, None)
        # Drop the firing marker too, otherwise a rule re-added under the
        # same name would stay muted while the metric is still high.
        self._triggered.pop(name, None)

    def add_handler(self, handler: Callable) -> None:
        """Register a callable invoked with the alert payload dict on firing."""
        self.handlers.append(handler)

    def check(self) -> List[Dict]:
        """Evaluate every enabled rule once.

        Returns:
            One payload dict per rule that newly entered the firing state
            during this call, with the keys ``name``, ``condition``,
            ``value``, ``threshold``, ``severity``, ``message`` and
            ``timestamp``.

        Rules whose metric is missing or ``None`` are skipped.  The severity
        is only a label: every severity uses the same ``>=`` comparison, so a
        rule with a severity other than "critical" or "warning" can fire too.
        A handler that raises is logged and does not stop the other handlers.
        """
        import logging

        logger = logging.getLogger(__name__)
        triggered = []
        metrics = self.monitor.get_current_metrics()

        for name, alert in self.alerts.items():
            if not alert.enabled:
                continue

            metric_value = metrics.get(alert.condition)
            if metric_value is None:
                continue

            if not (metric_value >= alert.threshold):
                # Back below the threshold: re-arm the rule.
                self._triggered.pop(name, None)
                continue

            if name in self._triggered:
                # Still firing and already reported.
                continue

            now = datetime.now()
            self._triggered[name] = now

            alert_data = {
                "name": alert.name,
                "condition": alert.condition,
                "value": metric_value,
                "threshold": alert.threshold,
                "severity": alert.severity,
                "message": alert.message,
                "timestamp": now
            }
            triggered.append(alert_data)

            for handler in self.handlers:
                try:
                    handler(alert_data)
                except Exception:
                    # Best effort: one broken handler must not block the rest,
                    # but the failure should be visible.
                    logger.exception(
                        "Alert handler %r failed for alert %r", handler, name
                    )

        return triggered


class LogCollector:
    """Read, parse and search plain-text log files.

    All file readers are best effort: an unreadable file yields an empty
    result instead of an exception.
    """

    def __init__(self, log_dir: str = "/var/log"):
        """Remember the default log directory and start with no parsers.

        ``log_dir`` is only stored; the methods of this class take full file
        paths and do not resolve them against it.
        """
        self.log_dir = log_dir
        self.parsers: Dict[str, Callable] = {}

    def register_parser(
        self,
        log_type: str,
        parser: Callable
    ) -> None:
        """Register ``parser`` (a callable taking one line) for ``log_type``."""
        self.parsers[log_type] = parser

    def tail_log(
        self,
        filepath: str,
        lines: int = 100
    ) -> List[str]:
        """Return the last ``lines`` lines of ``filepath``.

        Lines keep their trailing newline.  A non-positive ``lines`` returns
        an empty list (a plain ``[-0:]`` slice would return the whole file).
        The file is streamed through a bounded deque, so only ``lines`` lines
        are held in memory.  Bytes that cannot be decoded are replaced rather
        than making the call return nothing.

        Returns:
            The tail lines, or ``[]`` when the file cannot be read.
        """
        from collections import deque

        if lines <= 0:
            return []

        try:
            with open(filepath, 'r', errors='replace') as f:
                return list(deque(f, maxlen=lines))
        except (OSError, ValueError):
            return []

    def parse_log_line(
        self,
        line: str,
        log_type: str
    ) -> Optional[Dict]:
        """Parse ``line`` with the parser registered for ``log_type``.

        Returns:
            Whatever the registered parser returns, or ``{"raw": line}`` when
            no parser is registered for ``log_type``.
        """
        parser = self.parsers.get(log_type)
        if parser:
            return parser(line)
        return {"raw": line}

    def search_logs(
        self,
        filepath: str,
        pattern: str,
        context_lines: int = 2
    ) -> List[Dict]:
        """Find lines of ``filepath`` matching the regular expression ``pattern``.

        Args:
            filepath: File to search.
            pattern: Regular expression applied with ``search`` to every line.
            context_lines: Lines of context to include on each side of a
                match; negative values are treated as 0.

        Returns:
            One dict per matching line with ``line_number`` (1-based),
            ``line`` (stripped) and ``context`` (stripped surrounding lines,
            the match included).  ``[]`` when the file cannot be read or the
            pattern is not a valid regular expression.
        """
        import re

        try:
            # Compile once instead of re-resolving the pattern for every line.
            matcher = re.compile(pattern)
            with open(filepath, 'r', errors='replace') as f:
                lines = f.readlines()
        except (OSError, ValueError, re.error):
            return []

        context_lines = max(0, context_lines)
        total = len(lines)
        results = []

        for index, line in enumerate(lines):
            if not matcher.search(line):
                continue
            start = max(0, index - context_lines)
            end = min(total, index + context_lines + 1)
            results.append({
                "line_number": index + 1,
                "line": line.strip(),
                "context": [item.strip() for item in lines[start:end]]
            })

        return results

50.4 知识图谱

50.4.1 自动化运维技术架构

┌─────────────────────────────────────────────────────────────────────┐
│                      自动化运维技术架构                               │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      监控告警层                               │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │Prometheus│ │ Grafana  │ │ AlertMgr │ │ 日志系统  │       │   │
│  │  │ 指标采集  │ │ 可视化   │ │ 告警管理  │ │ ELK    │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      编排调度层                               │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ Ansible │ │ Terraform│ │ Jenkins  │ │ K8s     │       │   │
│  │  │ 配置管理  │ │ IaC     │ │ CI/CD   │ │ 容器编排  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      自动化执行层                             │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 脚本执行  │ │ 批量操作  │ │ 任务调度  │ │ 远程执行  │       │   │
│  │  │ Python  │ │ Fabric  │ │ Celery  │ │ SSH     │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      基础设施层                               │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 物理服务器│ │ 虚拟机   │ │ 容器     │ │ 云服务   │       │   │
│  │  │ BareMetal│ │ VM      │ │ Docker  │ │ AWS/Ali │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

50.4.2 自动化运维工作流程

┌─────────────────────────────────────────────────────────────────────┐
│                      自动化运维工作流程                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────┐                                                      │
│   │ 需求触发  │ ─── 监控告警 / 定时任务 / 手动触发                   │
│   └────┬─────┘                                                      │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    任务编排                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 任务定义  │ │ 依赖管理  │ │ 并行调度  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    目标选择                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 主机清单  │ │ 分组过滤  │ │ 条件匹配  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    执行操作                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 命令执行  │ │ 文件传输  │ │ 配置应用  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    结果收集                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 输出捕获  │ │ 状态记录  │ │ 日志存储  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    反馈处理                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 成功确认  │ │ 失败重试  │ │ 告警通知  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

50.5 技术选型指南

50.5.1 配置管理工具选型

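| 工具 | 适用场景 | 学习曲线 | Agent要求 | 推荐指数 |
| --- | --- | --- | --- | --- |
| Ansible | 通用配置管理 |  |  | ★★★★★ |
| SaltStack | 大规模环境 |  | 需要 | ★★★★☆ |
| Puppet | 企业级环境 |  | 需要 | ★★★☆☆ |
| Chef | 复杂基础设施 |  | 需要 | ★★★☆☆ |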

50.5.2 监控系统选型

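| 系统 | 数据类型 | 扩展性 | 学习曲线 | 推荐指数 |
| --- | --- | --- | --- | --- |
| Prometheus | 时序指标 |  |  | ★★★★★ |
| Zabbix | 综合 |  |  | ★★★★☆ |
| Nagios | 状态检查 |  |  | ★★★☆☆ |
| Datadog | 云原生 |  |  | ★★★★★ |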

50.5.3 日志系统选型

| 系统 | 吞吐量 | 查询能力 | 学习曲线 | 推荐指数 |
| --- | --- | --- | --- | --- |
| ELK Stack |  | 极强 |  | ★★★★★ |
| Loki |  |  |  | ★★★★☆ |
| Fluentd |  | - |  | ★★★★☆ |

50.6 常见问题与解决方案

50.6.1 批量执行优化

python
import asyncio
import paramiko
from typing import List, Dict, Any, Callable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

@dataclass
class Host:
    """Connection details for one SSH target used by the batch executors."""

    # Passed to the SSH client as the host to connect to; also used as the
    # key of the per-host result dicts.
    hostname: str
    # TCP port passed to the SSH client.
    port: int = 22
    # Login user passed to the SSH client.
    username: str = "root"
    # Password passed to the SSH client; None when not set.
    password: str = None
    # Private key file passed to the SSH client as key_filename; None when not set.
    key_file: str = None


class BatchExecutor:
    """Run one shell command on many hosts over SSH using a thread pool."""

    def __init__(self, max_workers: int = 10):
        """Store the maximum number of hosts contacted at the same time."""
        self.max_workers = max_workers

    def execute_parallel(
        self,
        hosts: List[Host],
        command: str,
        timeout: int = 30
    ) -> Dict[str, Dict]:
        """Run ``command`` on every host in parallel.

        Args:
            hosts: Targets to contact.
            command: Shell command executed on each host.
            timeout: Seconds used for both the connection and the command.

        Returns:
            A dict mapping each hostname to its result dict (see
            ``_execute_on_host``).  A worker that raises is reported as
            ``{"success": False, "error": <message>}``.  Hosts that share a
            hostname share one entry; the later one wins.
        """
        results = {}

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self._execute_on_host,
                    host, command, timeout
                ): host.hostname for host in hosts
            }

            for future, hostname in futures.items():
                try:
                    results[hostname] = future.result()
                except Exception as e:
                    results[hostname] = {
                        "success": False,
                        "error": str(e)
                    }

        return results

    def _execute_on_host(
        self,
        host: Host,
        command: str,
        timeout: int
    ) -> Dict:
        """Run ``command`` on a single host.

        Returns:
            ``{"success", "stdout", "stderr", "exit_code"}`` when the command
            ran, where ``success`` is True only for exit code 0; or
            ``{"success": False, "error": <message>}`` when connecting or
            executing failed.
        """
        client = paramiko.SSHClient()
        # SECURITY NOTE(review): AutoAddPolicy accepts any unknown host key
        # and therefore gives no protection against man-in-the-middle
        # attacks.  For production use consider load_system_host_keys()
        # together with RejectPolicy.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            client.connect(
                hostname=host.hostname,
                port=host.port,
                username=host.username,
                password=host.password,
                key_filename=host.key_file,
                timeout=timeout
            )

            _stdin, stdout, stderr = client.exec_command(command, timeout=timeout)

            # Drain the streams before asking for the exit status.
            # errors="replace": non-UTF-8 output must not turn a command
            # that ran into an "error" result.
            out_text = stdout.read().decode(errors="replace")
            err_text = stderr.read().decode(errors="replace")
            exit_code = stdout.channel.recv_exit_status()

            return {
                # A command that exits non-zero has failed, even though the
                # SSH session itself worked.
                "success": exit_code == 0,
                "stdout": out_text,
                "stderr": err_text,
                "exit_code": exit_code
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            client.close()


class AsyncBatchExecutor:
    """Run a command on many hosts concurrently with asyncio.

    At most ``max_concurrent`` hosts are processed at the same time.
    """

    def __init__(self, max_concurrent: int = 20):
        """Store the concurrency limit.

        The semaphore is not created here.  An asyncio primitive built
        outside a running loop can end up bound to a different event loop
        than the one that later runs ``execute_on_hosts`` (for example when
        the executor is reused across several ``asyncio.run()`` calls), which
        fails with a "different event loop" RuntimeError.
        """
        self.max_concurrent = max_concurrent
        self._semaphore = None
        self._semaphore_loop = None

    def _get_semaphore(self) -> asyncio.Semaphore:
        """Return the semaphore belonging to the currently running loop."""
        loop = asyncio.get_running_loop()
        if self._semaphore is None or self._semaphore_loop is not loop:
            self._semaphore = asyncio.Semaphore(self.max_concurrent)
            self._semaphore_loop = loop
        return self._semaphore

    async def execute_on_hosts(
        self,
        hosts: List[Host],
        command: str
    ) -> Dict[str, Dict]:
        """Run ``command`` on all hosts concurrently.

        Returns:
            A dict mapping each hostname to the result of ``_execute_async``,
            or to ``{"success": False, "error": <message>}`` when that
            coroutine raised or was cancelled.
        """
        # Materialise once: hosts is walked twice (tasks and result pairing),
        # which would silently yield nothing for a one-shot iterator.
        hosts = list(hosts)

        tasks = [
            self._execute_with_semaphore(host, command)
            for host in hosts
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        return {
            # BaseException also covers asyncio.CancelledError, which is not
            # an Exception subclass on Python 3.8+.
            host.hostname: {"success": False, "error": str(result)}
            if isinstance(result, BaseException) else result
            for host, result in zip(hosts, results)
        }

    async def _execute_with_semaphore(self, host: Host, command: str):
        """Run one host while holding a slot of the concurrency semaphore."""
        async with self._get_semaphore():
            return await self._execute_async(host, command)

    async def _execute_async(self, host: Host, command: str):
        """Placeholder execution: waits 0.1 s and reports success.

        ``command`` is not sent anywhere by this implementation.
        """
        await asyncio.sleep(0.1)
        return {"success": True, "output": f"Executed on {host.hostname}"}

50.6.2 配置管理最佳实践

python
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import yaml
import json
import os

@dataclass
class Playbook:
    """Playbook definition: an ordered list of tasks plus shared variables."""

    # Playbook name; copied into the result of TaskRunner.run_playbook.
    name: str
    # Host pattern; copied as-is into the result of TaskRunner.run_playbook.
    hosts: str
    # Task dicts; TaskRunner reads the keys "name", "module", "args" and
    # "ignore_errors" from each of them.
    tasks: List[Dict]
    # Variables handed to every task by TaskRunner.run_playbook.
    vars: Dict = field(default_factory=dict)
    # NOTE(review): presumably requests privilege escalation as in Ansible;
    # nothing visible in this file reads it - confirm.
    become: bool = False


class ConfigurationManager:
    """Hold a host inventory, its variables and named text templates."""

    def __init__(self):
        # Group name -> host names.
        self._inventory: Dict[str, List[str]] = {}
        # Contents of the inventory file's "vars" section.
        self._variables: Dict[str, Dict] = {}
        # Template name -> template source text.
        self._templates: Dict[str, str] = {}

    def load_inventory(self, filepath: str):
        """Load the host inventory from a YAML or JSON file.

        Files ending in ``.yaml``/``.yml`` are parsed as YAML, everything
        else as JSON.  The document is expected to be a mapping with the
        optional keys ``hosts`` and ``vars``.  An empty document or
        null/missing sections result in an empty inventory instead of an
        ``AttributeError``.

        Raises:
            ValueError: If the top level of the document is not a mapping.
            OSError: If the file cannot be opened.
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            if filepath.endswith(('.yaml', '.yml')):
                data = yaml.safe_load(f)
            else:
                data = json.load(f)

        if data is None:
            data = {}
        if not isinstance(data, dict):
            raise ValueError(
                f"Inventory {filepath!r} must contain a mapping at top level"
            )

        self._inventory = data.get('hosts') or {}
        self._variables = data.get('vars') or {}

    def get_hosts(self, pattern: str = None) -> List[str]:
        """Return the hosts selected by ``pattern``.

        Args:
            pattern: A group name, or ``None`` for every host of every group.

        Returns:
            A new list, so callers may modify it without corrupting the
            inventory.  With ``pattern=None`` a host that belongs to several
            groups is listed once, in first-seen order.  Unknown groups give
            an empty list.
        """
        if pattern is None:
            every_host = (
                host
                for group_hosts in self._inventory.values()
                for host in group_hosts
            )
            # dict.fromkeys removes duplicates while keeping order.
            return list(dict.fromkeys(every_host))

        return list(self._inventory.get(pattern) or [])

    def register_template(self, template_name: str, source: str) -> None:
        """Store Jinja2 template text under ``template_name``.

        Without this method nothing could ever populate the template table
        and ``render_template`` would always return an empty string.
        """
        self._templates[template_name] = source

    def render_template(
        self,
        template_name: str,
        variables: Dict
    ) -> str:
        """Render a registered template with ``variables``.

        Returns:
            The rendered text, or ``""`` when no template is registered under
            ``template_name``.
        """
        source = self._templates.get(template_name)
        if source is None:
            return ""

        # Imported only when a template really has to be rendered, so the
        # missing-template path does not require jinja2.
        from jinja2 import Template

        return Template(source).render(**variables)


class TaskRunner:
    """Execute the tasks of a playbook on the local machine."""

    def __init__(self, dry_run: bool = False):
        """Create a runner.

        Args:
            dry_run: When True, tasks are only described, never executed.
        """
        self.dry_run = dry_run
        # Initialised for compatibility; no method of this class writes to it.
        self._results: List[Dict] = []

    def run_playbook(self, playbook: Playbook) -> Dict:
        """Run the playbook's tasks in order.

        Execution stops after the first failed task unless that task sets a
        truthy ``ignore_errors``.  ``ignore_errors: null`` counts as "do not
        ignore" (a literal ``== False`` comparison would treat ``None`` as
        "ignore").

        Returns:
            ``{"playbook": name, "hosts": pattern, "tasks": [task results]}``
            containing only the tasks that were actually attempted.
        """
        results = {
            "playbook": playbook.name,
            "hosts": playbook.hosts,
            "tasks": []
        }

        for task in playbook.tasks:
            task_result = self._run_task(task, playbook.vars)
            results["tasks"].append(task_result)

            failed = not task_result.get("success")
            if failed and not task.get("ignore_errors", False):
                break

        return results

    def _run_task(self, task: Dict, variables: Dict) -> Dict:
        """Run one task dict and return its result.

        The task's ``module`` (default ``"command"``) selects the handler and
        ``args`` is passed to it.  Any exception raised by a handler is
        converted into ``{"success": False, "error": <message>}``.
        """
        task_name = task.get("name", "unnamed task")

        if self.dry_run:
            return {
                "name": task_name,
                "success": True,
                "dry_run": True,
                "message": f"Would execute: {task}"
            }

        module = task.get("module", "command")
        # "args: null" in a YAML playbook arrives as None.
        args = task.get("args") or {}

        handlers = {
            "command": lambda: self._module_command(args),
            "copy": lambda: self._module_copy(args),
            "template": lambda: self._module_template(args, variables),
            "service": lambda: self._module_service(args),
        }

        handler = handlers.get(module)
        if handler is None:
            return {
                "name": task_name,
                "success": False,
                "error": f"Unknown module: {module}"
            }

        try:
            return {"name": task_name, **handler()}
        except Exception as e:
            return {"name": task_name, "success": False, "error": str(e)}

    def _module_command(self, args: Dict) -> Dict:
        """Run ``args["cmd"]`` as a local subprocess.

        A string is run through the shell; a list/tuple is run directly
        without a shell (with ``shell=True`` only its first element would be
        executed).  An optional ``args["timeout"]`` in seconds bounds the run.

        SECURITY NOTE(review): string commands go through the shell
        unescaped; only run playbooks from trusted sources.
        """
        import subprocess

        cmd = args.get("cmd")
        if not cmd:
            return {"success": False, "error": "Missing required argument: cmd"}

        result = subprocess.run(
            cmd,
            shell=isinstance(cmd, str),
            capture_output=True,
            text=True,
            timeout=args.get("timeout")
        )
        return {
            "success": result.returncode == 0,
            "stdout": result.stdout,
            "stderr": result.stderr
        }

    def _module_copy(self, args: Dict) -> Dict:
        """Copy the file ``args["src"]`` to ``args["dest"]``."""
        import shutil

        src = args.get("src")
        dest = args.get("dest")
        if not src or not dest:
            return {
                "success": False,
                "error": "Missing required argument: src and dest"
            }

        shutil.copy(src, dest)
        return {"success": True, "message": f"Copied {src} to {dest}"}

    def _module_template(self, args: Dict, variables: Dict) -> Dict:
        """Placeholder: reports success without rendering anything."""
        return {"success": True, "message": "Template applied"}

    def _module_service(self, args: Dict) -> Dict:
        """Placeholder: reports success without touching any service."""
        return {"success": True, "message": "Service managed"}


class HealthChecker:
    """Registry of named health probes that can be run together."""

    def __init__(self):
        self._checks: Dict[str, Callable] = {}

    def register(self, name: str, check_func: Callable):
        """Store ``check_func`` under ``name``, replacing any earlier probe."""
        self._checks[name] = check_func

    def run_all(self) -> Dict:
        """Run every registered probe and report one entry per probe name.

        A truthy probe result is reported as "healthy" and a falsy one as
        "unhealthy"; a dict result is echoed under "details" (otherwise
        "details" is None).  A probe that raises is reported with status
        "error" and the exception text under "error".
        """
        return {
            probe_name: self._run_one(probe)
            for probe_name, probe in self._checks.items()
        }

    @staticmethod
    def _run_one(probe: Callable) -> Dict:
        """Execute a single probe and build its report entry."""
        try:
            outcome = probe()
            status = "healthy" if outcome else "unhealthy"
            details = outcome if isinstance(outcome, dict) else None
            return {"status": status, "details": details}
        except Exception as exc:
            return {"status": "error", "error": str(exc)}

50.7 本章小结

本章详细介绍了Python自动化运维的核心概念和实践:

  1. 自动化脚本:命令执行、服务管理、包管理、文件同步
  2. 批量操作:主机清单、并行执行、任务编排
  3. 配置管理:Ansible风格Playbook、模块执行器
  4. 系统监控:指标收集、告警管理、日志收集

练习题

  1. 实现一个完整的服务器批量管理工具,支持分组执行
  2. 开发一个简单的配置管理系统,支持模板渲染
  3. 实现一个系统监控面板,支持实时指标展示
  4. 开发一个日志分析工具,支持关键字搜索和告警
  5. 实现一个定时任务调度系统,支持Cron表达式

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校