Chapter 25 Hands-On: Building a Command-Line Tool

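Learning Objectives

  • Master the core frameworks and design patterns for Python CLI development
  • Understand terminal UI design principles and how to improve the user experience
  • Use the Click framework fluently to build complex command-line applications
  • Master data persistence, configuration management, and plugin architecture
  • Learn how CLI applications are packaged, distributed, and kept cross-platform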

25.1 CLI Development Fundamentals

25.1.1 Command-Line Interface Design Principles

A well-behaved CLI tool follows the POSIX/GNU command-line conventions:

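| Principle | Description | Example |
|-----------|-------------|---------|
| Short options | Single dash + single letter | -v, -f |
| Long options | Double dash + full word | --verbose, --file |
| Option bundling | Short options can be combined | -avz is equivalent to -a -v -z |
| Option terminator | -- marks everything after it as positional arguments | rm -- -filename |
| Exit codes | 0 for success, non-zero for failure | sys.exit(1) |
| Standard streams | stdin/stdout/stderr | Error messages go to stderr |
| Help | -h / --help | Generated automatically |
| Version | -V / --version | Prints the version number |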
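
The conventions above can be tried out with nothing but the standard library. The following is a minimal sketch, not part of taskman: the program name `demo`, its three flags, and the `main` helper are made up for illustration. It shows bundled short options, the `--` terminator, an error message on stderr, and a non-zero return value used as the exit code.

```python
import argparse
import sys


def build_parser() -> argparse.ArgumentParser:
    # -h / --help is generated automatically by argparse
    parser = argparse.ArgumentParser(prog="demo", description="Convention demo")
    parser.add_argument("-a", "--all", action="store_true", help="include everything")
    parser.add_argument("-v", "--verbose", action="store_true", help="chatty output")
    parser.add_argument("-z", "--compress", action="store_true", help="compress output")
    parser.add_argument("-V", "--version", action="version", version="demo 0.1.0")
    parser.add_argument("files", nargs="*", help="files to process")
    return parser


def main(argv: list[str]) -> int:
    """Return the exit code instead of calling sys.exit() so it is easy to test."""
    args = build_parser().parse_args(argv)
    if not args.files:
        # Errors go to stderr, and the caller gets a non-zero exit code
        print("error: no files given", file=sys.stderr)
        return 1
    if args.verbose:
        print(f"processing {len(args.files)} file(s)")
    return 0


parser = build_parser()

# Bundled short options parse the same as separate ones
bundled = parser.parse_args(["-avz", "notes.txt"])
separate = parser.parse_args(["-a", "-v", "-z", "notes.txt"])
print(bundled == separate)

# After "--", a leading dash no longer marks an option
dashed = parser.parse_args(["--", "-filename"])
print(dashed.files)

# In a real script this would be: sys.exit(main(sys.argv[1:]))
print(main([]))
```

Returning an integer from `main` and passing it to `sys.exit` at the outermost level keeps the exit-code logic testable without spawning a subprocess.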

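25.1.2 Comparing Python CLI Frameworks

| Framework | Characteristics | Best suited for |
|-----------|-----------------|-----------------|
| argparse | Standard library, full-featured | Simple scripts; projects that must avoid external dependencies |
| Click | Decorator-based, feature-rich | Medium to large CLI applications |
| Typer | Built on Click, driven by type annotations | Rapid development in a modern style |
| Rich | Rich-text rendering in the terminal | Polished output |
| Textual | Terminal TUI framework | Complex interactive applications |

Rich and Textual handle output and interaction rather than argument parsing, so they complement the first three rather than replace them.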
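
For a feel of the argparse end of this comparison, here is a small sketch of subcommand dispatch using only the standard library. The program name `taskman-demo` and the handlers `cmd_add` and `cmd_list` are hypothetical stand-ins; they only return strings and do not touch the storage layer built later in this chapter.

```python
import argparse


def cmd_add(args: argparse.Namespace) -> str:
    return f"added {args.title!r} with priority {args.priority}"


def cmd_list(args: argparse.Namespace) -> str:
    return f"listing tasks (status={args.status})"


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="taskman-demo")
    sub = parser.add_subparsers(dest="command", required=True)

    add = sub.add_parser("add", help="Add a task")
    add.add_argument("title")
    add.add_argument("-p", "--priority", choices=["low", "medium", "high"], default="medium")
    add.set_defaults(func=cmd_add)  # remember which handler belongs to this subcommand

    lst = sub.add_parser("list", help="List tasks")
    lst.add_argument("-s", "--status", default=None)
    lst.set_defaults(func=cmd_list)
    return parser


def run(argv: list[str]) -> str:
    args = build_parser().parse_args(argv)
    return args.func(args)  # dispatch to the handler chosen by the subparser


print(run(["add", "Write report", "-p", "high"]))
print(run(["list"]))
```

Click expresses the same wiring with decorators on the handler functions, which is the style used for taskman in section 25.8.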

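25.2 Project Overview

This chapter builds taskman, a full-featured task management CLI tool with the following core features:

  • Creating, reading, updating, and deleting tasks, plus status management
  • Categories, tags, and a priority system
  • Full-text search and filtering
  • Multiple output formats (table, JSON, Markdown)
  • Configuration file management
  • A plugin extension mechanism
  • Data import and export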

25.3 Project Structure

taskman/
├── src/
│   └── taskman/
│       ├── __init__.py
│       ├── __version__.py
│       ├── cli.py              # CLI entry point and command definitions
│       ├── models.py           # Data models
│       ├── storage.py          # Storage engine
│       ├── config.py           # Configuration management
│       ├── formatter.py        # Output formatting
│       ├── search.py           # Search engine
│       └── plugins/            # Plugin system
│           ├── __init__.py
│           └── base.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_models.py
│   ├── test_storage.py
│   ├── test_cli.py
│   └── test_search.py
├── pyproject.toml
├── README.md
└── CHANGELOG.md

25.4 Data Model

python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class Priority(Enum):
    LOW = ("low", 1, "\033[94m")
    MEDIUM = ("medium", 2, "\033[93m")
    HIGH = ("high", 3, "\033[91m")
    CRITICAL = ("critical", 4, "\033[95m")

    def __init__(self, label: str, weight: int, color: str) -> None:
        self.label = label
        self.weight = weight
        self.color = color

    @classmethod
    def from_str(cls, value: str) -> Priority:
        mapping = {p.label: p for p in cls}
        if value.lower() not in mapping:
            raise ValueError(f"Invalid priority: {value}. Choose from: {', '.join(mapping)}")
        return mapping[value.lower()]


class Status(Enum):
    TODO = ("todo", "\033[90m")
    IN_PROGRESS = ("in_progress", "\033[94m")
    BLOCKED = ("blocked", "\033[91m")
    DONE = ("done", "\033[92m")
    CANCELLED = ("cancelled", "\033[90m")

    def __init__(self, label: str, color: str) -> None:
        self.label = label
        self.color = color

    @classmethod
    def from_str(cls, value: str) -> Status:
        mapping = {s.label: s for s in cls}
        if value.lower() not in mapping:
            raise ValueError(f"Invalid status: {value}. Choose from: {', '.join(mapping)}")
        return mapping[value.lower()]


@dataclass
class Task:
    id: int = 0
    title: str = ""
    description: str = ""
    priority: Priority = Priority.MEDIUM
    status: Status = Status.TODO
    category: str = "default"
    tags: list[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    due_date: datetime | None = None
    completed_at: datetime | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def mark_done(self) -> None:
        if self.status == Status.DONE:
            return
        self.status = Status.DONE
        self.completed_at = datetime.now()
        self.updated_at = datetime.now()

    def update(self, **kwargs: Any) -> None:
        for key, value in kwargs.items():
            if hasattr(self, key) and value is not None:
                setattr(self, key, value)
        self.updated_at = datetime.now()

    def matches(
        self,
        status: Status | None = None,
        category: str | None = None,
        priority: Priority | None = None,
        tag: str | None = None,
        query: str | None = None,
    ) -> bool:
        if status and self.status != status:
            return False
        if category and self.category != category:
            return False
        if priority and self.priority != priority:
            return False
        if tag and tag not in self.tags:
            return False
        if query:
            q = query.lower()
            if q not in self.title.lower() and q not in self.description.lower():
                return False
        return True

    def to_dict(self) -> dict[str, Any]:
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "priority": self.priority.label,
            "status": self.status.label,
            "category": self.category,
            "tags": self.tags,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "due_date": self.due_date.isoformat() if self.due_date else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Task:
        return cls(
            id=data.get("id", 0),
            title=data.get("title", ""),
            description=data.get("description", ""),
            priority=Priority.from_str(data.get("priority", "medium")),
            status=Status.from_str(data.get("status", "todo")),
            category=data.get("category", "default"),
            tags=data.get("tags", []),
            created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else datetime.now(),
            updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else datetime.now(),
            due_date=datetime.fromisoformat(data["due_date"]) if data.get("due_date") else None,
            completed_at=datetime.fromisoformat(data["completed_at"]) if data.get("completed_at") else None,
            metadata=data.get("metadata", {}),
        )
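
The `Priority` and `Status` classes above rely on a less common Enum feature: when a member's value is a tuple, the tuple is unpacked into `__init__`, so each member can carry extra attributes. The sketch below isolates that pattern with a hypothetical `Level` enum (not part of taskman) and also shows that plain Enum members are not orderable, which is why any sorting has to go through an attribute such as `weight`.

```python
from enum import Enum


class Level(Enum):
    LOW = ("low", 1)
    MEDIUM = ("medium", 2)
    HIGH = ("high", 3)

    def __init__(self, label: str, weight: int) -> None:
        # The member's tuple value is unpacked into these parameters
        self.label = label
        self.weight = weight


print(Level.HIGH.label, Level.HIGH.weight)

# Lookup by value still uses the whole tuple
print(Level(("high", 3)) is Level.HIGH)

levels = [Level.HIGH, Level.LOW, Level.MEDIUM]
ordered = sorted(levels, key=lambda lv: lv.weight)
print([lv.label for lv in ordered])

# Plain Enum members do not define "<", so sorting them directly fails
try:
    sorted(levels)
    orderable = True
except TypeError:
    orderable = False
print(orderable)
```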

25.5 Storage Engine

python
from __future__ import annotations

import json
import shutil
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any

from .models import Task


class StorageBackend(ABC):
    @abstractmethod
    def load(self) -> list[Task]: ...

    @abstractmethod
    def save(self, tasks: list[Task]) -> None: ...


class JsonStorage(StorageBackend):
    def __init__(self, file_path: Path) -> None:
        self.file_path = file_path
        self._ensure_file()

    def _ensure_file(self) -> None:
        if not self.file_path.exists():
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            self.file_path.write_text("[]", encoding="utf-8")

    def load(self) -> list[Task]:
        data = json.loads(self.file_path.read_text(encoding="utf-8"))
        return [Task.from_dict(item) for item in data]

    def save(self, tasks: list[Task]) -> None:
        data = [task.to_dict() for task in tasks]
        tmp_path = self.file_path.with_suffix(".tmp")
        tmp_path.write_text(
            json.dumps(data, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        shutil.move(str(tmp_path), str(self.file_path))


class TaskRepository:
    def __init__(self, storage: StorageBackend) -> None:
        self._storage = storage
        self._tasks: list[Task] = []
        self._next_id: int = 1
        self._load()

    def _load(self) -> None:
        self._tasks = self._storage.load()
        if self._tasks:
            self._next_id = max(t.id for t in self._tasks) + 1

    def _persist(self) -> None:
        self._storage.save(self._tasks)

    def add(self, task: Task) -> Task:
        task.id = self._next_id
        self._next_id += 1
        self._tasks.append(task)
        self._persist()
        return task

    def get(self, task_id: int) -> Task | None:
        return next((t for t in self._tasks if t.id == task_id), None)

    def update(self, task_id: int, **kwargs: Any) -> Task | None:
        task = self.get(task_id)
        if task is None:
            return None
        task.update(**kwargs)
        self._persist()
        return task

    def delete(self, task_id: int) -> bool:
        task = self.get(task_id)
        if task is None:
            return False
        self._tasks.remove(task)
        self._persist()
        return True

    def list_all(
        self,
        status: str | None = None,
        category: str | None = None,
        priority: str | None = None,
        tag: str | None = None,
        query: str | None = None,
        sort_by: str = "created_at",
        reverse: bool = True,
    ) -> list[Task]:
        from .models import Priority as P, Status as S

        status_filter = S.from_str(status) if status else None
        priority_filter = P.from_str(priority) if priority else None

        results = [
            t for t in self._tasks
            if t.matches(
                status=status_filter,
                category=category,
                priority=priority_filter,
                tag=tag,
                query=query,
            )
        ]

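        def sort_key(t: Task) -> tuple[bool, Any]:
            value = getattr(t, sort_by, t.created_at)
            if isinstance(value, P):
                value = value.weight  # Enum members are not orderable; compare by weight
            # Missing values (e.g. no due_date) sort before real ones instead of raising
            return (value is not None, value)

        results.sort(key=sort_key, reverse=reverse)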
        return results

    def count(self) -> dict[str, int]:
        from .models import Status as S
        counts: dict[str, int] = {s.label: 0 for s in S}
        for task in self._tasks:
            counts[task.status.label] += 1
        counts["total"] = len(self._tasks)
        return counts

    def export_data(self) -> list[dict[str, Any]]:
        return [t.to_dict() for t in self._tasks]

    def import_data(self, data: list[dict[str, Any]]) -> int:
        imported = 0
        for item in data:
            task = Task.from_dict(item)
            task.id = self._next_id
            self._next_id += 1
            self._tasks.append(task)
            imported += 1
        self._persist()
        return imported
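
`JsonStorage.save` above writes to a temporary file and then moves it over the real one, so a crash mid-write does not leave a half-written tasks file. A slightly stricter variant of the same idea is sketched below under a few assumptions: the helper name `atomic_write_text` is made up, the temporary file is created in the target's own directory so the final rename stays on one filesystem, and `os.replace` is used because it overwrites the destination in a single step on both POSIX and Windows.

```python
import contextlib
import os
import tempfile
from pathlib import Path


def atomic_write_text(path: Path, text: str) -> None:
    """Write text to path so readers see either the old or the new content."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Unique temp file next to the target, so the rename never crosses filesystems
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())  # push the data to disk before renaming
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temp file if anything went wrong, then re-raise
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as demo_dir:
    target = Path(demo_dir) / "data" / "tasks.json"
    atomic_write_text(target, "[]")
    atomic_write_text(target, '[{"id": 1}]')
    print(target.read_text(encoding="utf-8"))
    print(sorted(p.name for p in target.parent.iterdir()))
```

Using a unique temporary name also avoids two concurrent taskman processes clobbering the same `.tmp` file, although it does not replace proper locking if concurrent writers are expected.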

25.6 Configuration Management

python
from __future__ import annotations

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import yaml


def get_default_config_path() -> Path:
    xdg = os.getenv("XDG_CONFIG_HOME")
    if xdg:
        return Path(xdg) / "taskman"
    return Path.home() / ".config" / "taskman"


def get_default_data_path() -> Path:
    xdg = os.getenv("XDG_DATA_HOME")
    if xdg:
        return Path(xdg) / "taskman"
    return Path.home() / ".local" / "share" / "taskman"


@dataclass
class TaskmanConfig:
    data_file: Path = field(default_factory=lambda: get_default_data_path() / "tasks.json")
    default_priority: str = "medium"
    default_category: str = "default"
    date_format: str = "%Y-%m-%d %H:%M"
    color_enabled: bool = True
    pager: str = os.getenv("PAGER", "less")
    editor: str = os.getenv("EDITOR", "vi")
    export_format: str = "table"
    sort_by: str = "created_at"
    sort_reverse: bool = True

    @classmethod
    def load(cls, config_path: Path | None = None) -> TaskmanConfig:
        if config_path is None:
            config_path = get_default_config_path() / "config.yaml"

        if not config_path.exists():
            config = cls()
            config.save(config_path)
            return config

        with open(config_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}

        config = cls()
        for key, value in data.items():
            if hasattr(config, key):
                if key == "data_file":
                    value = Path(value)
                setattr(config, key, value)

        return config

    def save(self, config_path: Path | None = None) -> None:
        if config_path is None:
            config_path = get_default_config_path() / "config.yaml"

        config_path.parent.mkdir(parents=True, exist_ok=True)

        data: dict[str, Any] = {
            "data_file": str(self.data_file),
            "default_priority": self.default_priority,
            "default_category": self.default_category,
            "date_format": self.date_format,
            "color_enabled": self.color_enabled,
            "pager": self.pager,
            "editor": self.editor,
            "export_format": self.export_format,
            "sort_by": self.sort_by,
            "sort_reverse": self.sort_reverse,
        }

        with open(config_path, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
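
Values that reach a config object from a file or from the command line are often plain strings, so copying them over with `setattr` can silently turn a boolean field into the string "false" (which is truthy). One way to guard against that is to coerce each value to the type of the field's current value. The following is a sketch only: `DemoConfig` is a cut-down stand-in for `TaskmanConfig`, and `apply_overrides` is a hypothetical helper, not part of the chapter's code.

```python
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any


@dataclass
class DemoConfig:
    data_file: Path = Path("tasks.json")
    color_enabled: bool = True
    sort_by: str = "created_at"


_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off"}


def apply_overrides(config: DemoConfig, raw: dict[str, Any]) -> list[str]:
    """Copy known keys from raw onto config; return the keys that were not recognised."""
    known = {f.name for f in fields(config)}  # only real dataclass fields, not methods
    unknown: list[str] = []
    for key, value in raw.items():
        if key not in known:
            unknown.append(key)
            continue
        current = getattr(config, key)
        if isinstance(current, bool) and isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in _TRUE:
                value = True
            elif lowered in _FALSE:
                value = False
            else:
                raise ValueError(f"{key}: expected a boolean, got {value!r}")
        elif isinstance(current, Path):
            value = Path(value)
        setattr(config, key, value)
    return unknown


demo = DemoConfig()
leftover = apply_overrides(
    demo, {"color_enabled": "false", "data_file": "/tmp/t.json", "bogus": 1}
)
print(demo)
print(leftover)
```

Checking keys against `dataclasses.fields` rather than `hasattr` also prevents a key such as `save` from overwriting a method on the config object.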

25.7 Output Formatting

python
from __future__ import annotations

import json
from datetime import datetime
from typing import Any

from .models import Task, Priority, Status


RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"


def colorize(text: str, color: str, bold: bool = False) -> str:
    prefix = BOLD if bold else ""
    return f"{prefix}{color}{text}{RESET}"


def format_datetime(dt: datetime, fmt: str = "%Y-%m-%d %H:%M") -> str:
    return dt.strftime(fmt)


def format_priority(p: Priority) -> str:
    return colorize(f"● {p.label.upper():<8}", p.color, bold=p.weight >= 3)


def format_status(s: Status) -> str:
    symbols = {
        Status.TODO: "○",
        Status.IN_PROGRESS: "◐",
        Status.BLOCKED: "✖",
        Status.DONE: "●",
        Status.CANCELLED: "○",
    }
    symbol = symbols.get(s, "○")
    return colorize(f"{symbol} {s.label:<12}", s.color)


class TableFormatter:
    @staticmethod
    def format_tasks(tasks: list[Task], date_format: str = "%Y-%m-%d %H:%M") -> str:
        if not tasks:
            return "No tasks found."

        lines: list[str] = []
        lines.append(
            f"{'ID':>4}  {'Status':<14} {'Priority':<10} {'Category':<12} {'Title'}"
        )
        lines.append("─" * 72)

        for task in tasks:
            status = format_status(task.status)
            priority = format_priority(task.priority)
            category = colorize(f"{task.category:<12}", DIM)
            title = task.title
            if task.status == Status.DONE:
                title = colorize(title, DIM)

            lines.append(
                f"{task.id:>4}  {status} {priority} {category} {title}"
            )

        return "\n".join(lines)

    @staticmethod
    def format_task_detail(task: Task, date_format: str = "%Y-%m-%d %H:%M") -> str:
        lines: list[str] = []
        lines.append(f"{'═' * 60}")
        lines.append(f"  Task #{task.id}: {colorize(task.title, BOLD)}")
        lines.append(f"{'═' * 60}")
        lines.append(f"  Status:   {format_status(task.status)}")
        lines.append(f"  Priority: {format_priority(task.priority)}")
        lines.append(f"  Category: {task.category}")

        if task.description:
            lines.append(f"\n  Description:")
            for line in task.description.split("\n"):
                lines.append(f"    {line}")

        if task.tags:
            lines.append(f"\n  Tags: {', '.join(f'#{t}' for t in task.tags)}")

        lines.append(f"\n  Created:  {format_datetime(task.created_at, date_format)}")
        lines.append(f"  Updated:  {format_datetime(task.updated_at, date_format)}")

        if task.due_date:
            lines.append(f"  Due:      {format_datetime(task.due_date, date_format)}")

        if task.completed_at:
            lines.append(f"  Completed: {format_datetime(task.completed_at, date_format)}")

        lines.append(f"{'═' * 60}")
        return "\n".join(lines)

    @staticmethod
    def format_stats(counts: dict[str, int]) -> str:
        total = counts.get("total", 0)
        if total == 0:
            return "No tasks yet."

        lines: list[str] = []
        lines.append(f"\n  Task Statistics (Total: {total})")
        lines.append(f"  {'─' * 40}")

        status_items = [
            ("Todo", counts.get("todo", 0), Status.TODO),
            ("In Progress", counts.get("in_progress", 0), Status.IN_PROGRESS),
            ("Blocked", counts.get("blocked", 0), Status.BLOCKED),
            ("Done", counts.get("done", 0), Status.DONE),
            ("Cancelled", counts.get("cancelled", 0), Status.CANCELLED),
        ]

        for label, count, status in status_items:
            pct = (count / total) * 100 if total > 0 else 0
            bar_len = int(pct / 5)
            bar = "█" * bar_len + "░" * (20 - bar_len)
            lines.append(f"  {label:<12} {count:>3} {colorize(bar, status.color)} {pct:.0f}%")

        return "\n".join(lines)


class JsonFormatter:
    @staticmethod
    def format_tasks(tasks: list[Task]) -> str:
        return json.dumps(
            [t.to_dict() for t in tasks],
            indent=2,
            ensure_ascii=False,
        )

    @staticmethod
    def format_task_detail(task: Task) -> str:
        return json.dumps(task.to_dict(), indent=2, ensure_ascii=False)


class MarkdownFormatter:
    @staticmethod
    def format_tasks(tasks: list[Task]) -> str:
        lines: list[str] = []
        lines.append("| ID | Status | Priority | Category | Title |")
        lines.append("|----|--------|----------|----------|-------|")
        for t in tasks:
            lines.append(
                f"| {t.id} | {t.status.label} | {t.priority.label} | "
                f"{t.category} | {t.title} |"
            )
        return "\n".join(lines)
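
Two details are easy to miss when mixing ANSI colors with column alignment: escape sequences count toward `len()` even though they take no space on screen, and colors should be dropped when the user asks for that or when output is not a terminal. The sketch below shows one possible way to handle both. The helper names (`visible_len`, `pad`, `should_color`) are hypothetical, and the `NO_COLOR` environment variable is the informal cross-tool convention for disabling color, not something the chapter's formatter reads.

```python
import os
import re
import sys

RESET = "\033[0m"
GREEN = "\033[92m"
ANSI_RE = re.compile(r"\033\[[0-9;]*m")  # matches SGR color/style sequences


def visible_len(text: str) -> int:
    """Length of text as it appears on screen, ignoring color codes."""
    return len(ANSI_RE.sub("", text))


def pad(text: str, width: int) -> str:
    """Left-align text to width columns based on its visible length."""
    return text + " " * max(0, width - visible_len(text))


def should_color(enabled: bool = True, stream=None) -> bool:
    """Color only if enabled, NO_COLOR is unset or empty, and the stream is a terminal."""
    if not enabled or os.environ.get("NO_COLOR"):
        return False
    stream = sys.stdout if stream is None else stream
    return bool(getattr(stream, "isatty", lambda: False)())


def colorize(text: str, color: str, enabled: bool) -> str:
    return f"{color}{text}{RESET}" if enabled else text


colored = colorize("done", GREEN, True)
print(len(colored), visible_len(colored))
print(repr(pad(colorize("done", GREEN, False), 8)))
```

Padding after coloring, as `pad` does, keeps table columns aligned whether or not color ends up enabled.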

25.8 CLI Entry Point

python
from __future__ import annotations

import json  # needed by the import command below
import sys
from datetime import datetime
from pathlib import Path
from typing import Any

import click

from .models import Task, Priority, Status
from .storage import JsonStorage, TaskRepository
from .config import TaskmanConfig
from .formatter import TableFormatter, JsonFormatter, MarkdownFormatter


def get_repo(ctx: click.Context) -> TaskRepository:
    config = ctx.obj["config"]
    storage = JsonStorage(config.data_file)
    return TaskRepository(storage)


@click.group()
@click.option("--config", "config_path", type=click.Path(), envvar="TASKMAN_CONFIG", help="Config file path")
@click.option("--no-color", is_flag=True, help="Disable colored output")
@click.version_option(version="1.0.0", prog_name="taskman")
@click.pass_context
def cli(ctx: click.Context, config_path: str | None, no_color: bool) -> None:
    """Taskman - A powerful task management CLI tool."""
    ctx.ensure_object(dict)
    config = TaskmanConfig.load(Path(config_path) if config_path else None)
    if no_color:
        config.color_enabled = False
    ctx.obj["config"] = config


@cli.command()
@click.argument("title")
@click.option("-d", "--description", default="", help="Task description")
@click.option("-p", "--priority", type=click.Choice(["low", "medium", "high", "critical"]), help="Task priority")
@click.option("-c", "--category", help="Task category")
@click.option("-t", "--tags", help="Comma-separated tags")
@click.option("--due", type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%d %H:%M"]), help="Due date")
@click.pass_context
def add(
    ctx: click.Context,
    title: str,
    description: str,
    priority: str | None,
    category: str | None,
    tags: str | None,
    due: datetime | None,
) -> None:
    """Add a new task."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)

    task = Task(
        title=title,
        description=description,
        priority=Priority.from_str(priority or config.default_priority),
        category=category or config.default_category,
        tags=[t.strip() for t in tags.split(",")] if tags else [],
        due_date=due,
    )

    added = repo.add(task)
    click.echo(f"✓ Task #{added.id} added: {added.title}")


@cli.command("list")
@click.option("-s", "--status", type=click.Choice(["todo", "in_progress", "blocked", "done", "cancelled"]), help="Filter by status")
@click.option("-p", "--priority", type=click.Choice(["low", "medium", "high", "critical"]), help="Filter by priority")
@click.option("-c", "--category", help="Filter by category")
@click.option("-t", "--tag", help="Filter by tag")
@click.option("-q", "--query", help="Search in title and description")
@click.option("--sort-by", type=click.Choice(["created_at", "updated_at", "priority", "due_date"]), help="Sort field")
@click.option("--reverse/--no-reverse", default=True, help="Sort direction")
@click.option("-f", "--format", "output_format", type=click.Choice(["table", "json", "markdown"]), help="Output format")
@click.pass_context
def list_tasks(
    ctx: click.Context,
    status: str | None,
    priority: str | None,
    category: str | None,
    tag: str | None,
    query: str | None,
    sort_by: str | None,
    reverse: bool,
    output_format: str | None,
) -> None:
    """List tasks with optional filters."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)

    tasks = repo.list_all(
        status=status,
        category=category,
        priority=priority,
        tag=tag,
        query=query,
        sort_by=sort_by or config.sort_by,
        reverse=reverse if sort_by else config.sort_reverse,
    )

    fmt = output_format or config.export_format
    if fmt == "json":
        click.echo(JsonFormatter.format_tasks(tasks))
    elif fmt == "markdown":
        click.echo(MarkdownFormatter.format_tasks(tasks))
    else:
        click.echo(TableFormatter.format_tasks(tasks, config.date_format))


@cli.command()
@click.argument("task_id", type=int)
@click.pass_context
def show(ctx: click.Context, task_id: int) -> None:
    """Show task details."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)

    task = repo.get(task_id)
    if task is None:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)

    click.echo(TableFormatter.format_task_detail(task, config.date_format))


@cli.command()
@click.argument("task_id", type=int)
@click.option("-t", "--title", help="New title")
@click.option("-d", "--description", help="New description")
@click.option("-p", "--priority", type=click.Choice(["low", "medium", "high", "critical"]), help="New priority")
@click.option("-c", "--category", help="New category")
@click.option("-s", "--status", type=click.Choice(["todo", "in_progress", "blocked", "done", "cancelled"]), help="New status")
@click.option("--due", type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%d %H:%M"]), help="New due date")
@click.option("--add-tags", help="Tags to add (comma-separated)")
@click.option("--remove-tags", help="Tags to remove (comma-separated)")
@click.pass_context
def update(
    ctx: click.Context,
    task_id: int,
    title: str | None,
    description: str | None,
    priority: str | None,
    category: str | None,
    status: str | None,
    due: datetime | None,
    add_tags: str | None,
    remove_tags: str | None,
) -> None:
    """Update a task."""
    repo = get_repo(ctx)

    kwargs: dict[str, Any] = {}
    if title:
        kwargs["title"] = title
    if description is not None:
        kwargs["description"] = description
    if priority:
        kwargs["priority"] = Priority.from_str(priority)
    if category:
        kwargs["category"] = category
    if status:
        kwargs["status"] = Status.from_str(status)
    if due:
        kwargs["due_date"] = due

    task = repo.get(task_id)
    if task is None:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)

    if add_tags or remove_tags:
        # Apply additions and removals to one list so both options can be combined
        tags = list(task.tags)
        for tag in (add_tags or "").split(","):
            tag = tag.strip()
            if tag and tag not in tags:
                tags.append(tag)
        removed = {t.strip() for t in (remove_tags or "").split(",")}
        kwargs["tags"] = [t for t in tags if t not in removed]

    updated = repo.update(task_id, **kwargs)
    if updated:
        click.echo(f"✓ Task #{task_id} updated")


@cli.command()
@click.argument("task_id", type=int)
@click.confirmation_option(prompt="Delete this task?")
@click.pass_context
def delete(ctx: click.Context, task_id: int) -> None:
    """Delete a task."""
    repo = get_repo(ctx)

    if repo.delete(task_id):
        click.echo(f"✓ Task #{task_id} deleted")
    else:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)


@cli.command()
@click.argument("task_id", type=int)
@click.pass_context
def done(ctx: click.Context, task_id: int) -> None:
    """Mark a task as done."""
    repo = get_repo(ctx)

    task = repo.get(task_id)
    if task is None:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)

    task.mark_done()
    repo.update(task_id, status=Status.DONE, completed_at=task.completed_at)
    click.echo(f"✓ Task #{task_id} marked as done")


@cli.command()
@click.argument("query")
@click.option("-f", "--format", "output_format", type=click.Choice(["table", "json", "markdown"]), help="Output format")
@click.pass_context
def search(ctx: click.Context, query: str, output_format: str | None) -> None:
    """Search tasks by title or description."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)

    tasks = repo.list_all(query=query)

    fmt = output_format or config.export_format
    if fmt == "json":
        click.echo(JsonFormatter.format_tasks(tasks))
    elif fmt == "markdown":
        click.echo(MarkdownFormatter.format_tasks(tasks))
    else:
        click.echo(TableFormatter.format_tasks(tasks, config.date_format))


@cli.command()
@click.pass_context
def stats(ctx: click.Context) -> None:
    """Show task statistics."""
    repo = get_repo(ctx)
    counts = repo.count()
    click.echo(TableFormatter.format_stats(counts))


@cli.command()
@click.argument("file_path", type=click.Path())
@click.option("-f", "--format", "export_format", type=click.Choice(["json", "markdown"]), default="json", help="Export format")
@click.pass_context
def export(ctx: click.Context, file_path: str, export_format: str) -> None:
    """Export tasks to a file."""
    repo = get_repo(ctx)
    path = Path(file_path)

    if export_format == "json":
        content = JsonFormatter.format_tasks(repo.list_all())
    else:
        content = MarkdownFormatter.format_tasks(repo.list_all())

    path.write_text(content, encoding="utf-8")
    click.echo(f"✓ Exported {len(repo.list_all())} tasks to {file_path}")


@cli.command("import")  # explicit name; the function name alone would register "import-tasks"
@click.argument("file_path", type=click.Path(exists=True))
@click.option("-f", "--format", "import_format", type=click.Choice(["json"]), default="json", help="Import format")
@click.pass_context
def import_tasks(ctx: click.Context, file_path: str, import_format: str) -> None:
    """Import tasks from a file."""
    repo = get_repo(ctx)
    path = Path(file_path)

    content = path.read_text(encoding="utf-8")
    data = json.loads(content)

    count = repo.import_data(data)
    click.echo(f"✓ Imported {count} tasks from {file_path}")


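@cli.command("config")
@click.option("--show", "show_config", is_flag=True, help="Show current configuration")
@click.option("--set", "set_values", nargs=2, multiple=True, help="Set a config value: --set KEY VALUE")
@click.pass_context
def config_cmd(ctx: click.Context, show_config: bool, set_values: tuple[tuple[str, str], ...]) -> None:
    """Manage configuration."""
    # get_default_config_path is a module-level function, not a TaskmanConfig method
    from .config import get_default_config_path

    cfg = ctx.obj["config"]
    # Reuse the --config path given to the parent group, if any
    raw_path = ctx.parent.params.get("config_path") if ctx.parent else None
    config_path = Path(raw_path) if raw_path else get_default_config_path() / "config.yaml"

    if show_config:
        click.echo(f"Config file: {config_path}")
        click.echo(f"Data file:   {cfg.data_file}")
        click.echo(f"Priority:    {cfg.default_priority}")
        click.echo(f"Category:    {cfg.default_category}")
        click.echo(f"Date format: {cfg.date_format}")
        click.echo(f"Color:       {cfg.color_enabled}")
        click.echo(f"Editor:      {cfg.editor}")
        click.echo(f"Pager:       {cfg.pager}")
        return

    for key, value in set_values:
        if not hasattr(cfg, key):
            click.echo(f"Error: Unknown config key: {key}", err=True)
            continue
        # Values arrive as strings; coerce them to the type of the existing field
        current = getattr(cfg, key)
        new_value: Any = value
        if isinstance(current, bool):
            new_value = value.lower() in ("1", "true", "yes", "on")
        elif isinstance(current, Path):
            new_value = Path(value)
        setattr(cfg, key, new_value)
        click.echo(f"✓ Set {key} = {value}")

    cfg.save(config_path)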


def main() -> None:
    cli(obj={})


if __name__ == "__main__":
    main()

25.9 Testing

python
import json
import tempfile
from datetime import datetime
from pathlib import Path

import pytest
from click.testing import CliRunner

from taskman.models import Task, Priority, Status
from taskman.storage import JsonStorage, TaskRepository
from taskman.cli import cli


@pytest.fixture
def temp_dir():
    with tempfile.TemporaryDirectory() as d:
        yield Path(d)


@pytest.fixture
def storage(temp_dir):
    return JsonStorage(temp_dir / "tasks.json")


@pytest.fixture
def repo(storage):
    return TaskRepository(storage)


class TestTaskModel:
    def test_create_task(self):
        task = Task(title="Test Task", priority=Priority.HIGH)
        assert task.title == "Test Task"
        assert task.priority == Priority.HIGH
        assert task.status == Status.TODO

    def test_mark_done(self):
        task = Task(title="Test Task")
        task.mark_done()
        assert task.status == Status.DONE
        assert task.completed_at is not None

    def test_mark_done_idempotent(self):
        task = Task(title="Test Task")
        task.mark_done()
        first_completed = task.completed_at
        task.mark_done()
        assert task.completed_at == first_completed

    def test_matches_status(self):
        task = Task(title="Test", status=Status.IN_PROGRESS)
        assert task.matches(status=Status.IN_PROGRESS)
        assert not task.matches(status=Status.DONE)

    def test_matches_query(self):
        task = Task(title="Buy groceries", description="Milk and eggs")
        assert task.matches(query="groceries")
        assert task.matches(query="milk")
        assert not task.matches(query="python")

    def test_to_dict_roundtrip(self):
        task = Task(
            id=1,
            title="Test",
            priority=Priority.HIGH,
            tags=["work", "urgent"],
        )
        data = task.to_dict()
        restored = Task.from_dict(data)
        assert restored.id == task.id
        assert restored.title == task.title
        assert restored.priority == task.priority
        assert restored.tags == task.tags

    def test_invalid_priority(self):
        with pytest.raises(ValueError):
            Priority.from_str("invalid")

    def test_invalid_status(self):
        with pytest.raises(ValueError):
            Status.from_str("invalid")


class TestTaskRepository:
    def test_add_task(self, repo):
        task = Task(title="Test Task")
        added = repo.add(task)
        assert added.id == 1
        assert added.title == "Test Task"

    def test_add_multiple_tasks(self, repo):
        repo.add(Task(title="Task 1"))
        repo.add(Task(title="Task 2"))
        assert len(repo.list_all()) == 2
        assert repo.get(2).title == "Task 2"

    def test_get_nonexistent(self, repo):
        assert repo.get(999) is None

    def test_update_task(self, repo):
        repo.add(Task(title="Original"))
        updated = repo.update(1, title="Updated", priority=Priority.HIGH)
        assert updated.title == "Updated"
        assert updated.priority == Priority.HIGH

    def test_delete_task(self, repo):
        repo.add(Task(title="To Delete"))
        assert repo.delete(1) is True
        assert repo.get(1) is None
        assert repo.delete(999) is False

    def test_list_with_filters(self, repo):
        repo.add(Task(title="Work Task", category="work", priority=Priority.HIGH))
        repo.add(Task(title="Personal Task", category="personal", priority=Priority.LOW))
        repo.add(Task(title="Work Urgent", category="work", priority=Priority.CRITICAL))

        work_tasks = repo.list_all(category="work")
        assert len(work_tasks) == 2

        high_tasks = repo.list_all(priority="high")
        assert len(high_tasks) == 1

    def test_search(self, repo):
        repo.add(Task(title="Buy groceries", description="Milk and eggs"))
        repo.add(Task(title="Read Python book", description="Advanced topics"))
        repo.add(Task(title="Buy Python course", description="Online class"))

        results = repo.list_all(query="python")
        assert len(results) == 2

    def test_count(self, repo):
        repo.add(Task(title="T1", status=Status.TODO))
        repo.add(Task(title="T2", status=Status.DONE))
        repo.add(Task(title="T3", status=Status.IN_PROGRESS))

        counts = repo.count()
        assert counts["todo"] == 1
        assert counts["done"] == 1
        assert counts["in_progress"] == 1
        assert counts["total"] == 3

    def test_export_import(self, repo, temp_dir):
        repo.add(Task(title="Task 1", category="work"))
        repo.add(Task(title="Task 2", category="personal"))

        data = repo.export_data()
        export_path = temp_dir / "export.json"
        export_path.write_text(json.dumps(data, indent=2), encoding="utf-8")

        new_storage = JsonStorage(temp_dir / "imported.json")
        new_repo = TaskRepository(new_storage)
        imported_count = new_repo.import_data(data)
        assert imported_count == 2
        assert len(new_repo.list_all()) == 2

    def test_persistence(self, storage):
        repo1 = TaskRepository(storage)
        repo1.add(Task(title="Persistent Task"))

        repo2 = TaskRepository(storage)
        assert len(repo2.list_all()) == 1
        assert repo2.get(1).title == "Persistent Task"


class TestCLI:
    @pytest.fixture
    def runner(self, temp_dir):
        runner = CliRunner()
        config_path = temp_dir / "config.yaml"
        data_path = temp_dir / "tasks.json"
        return runner, config_path, data_path

    def test_add_task(self, runner):
        runner_obj, config_path, data_path = runner
        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "add", "My First Task",
            "-p", "high",
            "-c", "work",
        ])
        assert result.exit_code == 0
        assert "Task #1 added" in result.output

    def test_list_tasks(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 1"])
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 2"])

        result = runner_obj.invoke(cli, ["--config", str(config_path), "list"])
        assert result.exit_code == 0
        assert "Task 1" in result.output
        assert "Task 2" in result.output

    def test_show_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Detail Task", "-d", "Some description"])

        result = runner_obj.invoke(cli, ["--config", str(config_path), "show", "1"])
        assert result.exit_code == 0
        assert "Detail Task" in result.output

    def test_show_nonexistent(self, runner):
        runner_obj, config_path, data_path = runner
        result = runner_obj.invoke(cli, ["--config", str(config_path), "show", "999"])
        assert result.exit_code == 1

    def test_update_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Original"])

        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "update", "1", "-t", "Updated", "-p", "critical",
        ])
        assert result.exit_code == 0
        assert "updated" in result.output

    def test_done_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "To Complete"])

        result = runner_obj.invoke(cli, ["--config", str(config_path), "done", "1"])
        assert result.exit_code == 0
        assert "done" in result.output

    def test_delete_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "To Delete"])

        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "delete", "1", "--yes",
        ])
        assert result.exit_code == 0

    def test_search(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Buy Python book"])
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Buy groceries"])

        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "search", "python",
        ])
        assert result.exit_code == 0
        assert "Python" in result.output

    def test_stats(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 1"])
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 2"])

        result = runner_obj.invoke(cli, ["--config", str(config_path), "stats"])
        assert result.exit_code == 0
        assert "Total: 2" in result.output

    def test_json_output(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "JSON Task"])

        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "list", "-f", "json",
        ])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data) == 1
        assert data[0]["title"] == "JSON Task"

    def test_version(self, runner):
        runner_obj, config_path, data_path = runner
        result = runner_obj.invoke(cli, ["--version"])
        assert result.exit_code == 0
        assert "taskman" in result.output

25.10 pyproject.toml

toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "taskman"
version = "1.0.0"
description = "A powerful task management CLI tool"
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
dependencies = [
    "click>=8.1.0",
    "pyyaml>=6.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-cov>=5.0.0",
    "ruff>=0.4.0",
    "mypy>=1.10.0",
]

[project.scripts]
taskman = "taskman.cli:main"

[tool.ruff]
target-version = "py310"
line-length = 88
src = ["src"]

[tool.ruff.lint]
select = ["E", "W", "F", "I", "N", "UP", "B", "SIM"]

[tool.mypy]
python_version = "3.10"  # match requires-python so newer-only syntax is flagged
strict = true

[tool.pytest.ini_options]
testpaths = ["tests"]
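
The `[project.scripts]` entry `taskman = "taskman.cli:main"` uses the `module:attribute` form: at install time a small `taskman` launcher is generated that imports `taskman.cli` and calls `main`. The following is a minimal sketch of how such a string can be resolved by hand. `resolve_entry_point` is a hypothetical helper written for illustration, not part of taskman or of any packaging library, and a standard-library target stands in for `taskman.cli:main` so the sketch runs without the package installed.

```python
from importlib import import_module
from typing import Any


def resolve_entry_point(spec: str) -> Any:
    """Resolve a "package.module:attribute" string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    obj: Any = import_module(module_name)
    # The attribute part may be dotted, e.g. "module:Class.method".
    for part in filter(None, attr_path.split(".")):
        obj = getattr(obj, part)
    return obj


if __name__ == "__main__":
    # "taskman.cli:main" would resolve the same way once taskman is installed;
    # a standard-library target is used here so the sketch runs anywhere.
    join = resolve_entry_point("os.path:join")
    print(join("src", "taskman"))
```

This can help when debugging a launcher that fails at startup: if the string cannot be resolved this way in the target environment, the generated script is likely to fail too.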

25.11 Recent Developments

25.11.1 Modern CLI Frameworks

python
import typer
from rich.console import Console

app = typer.Typer()
console = Console()

@app.command()
def hello(name: str, count: int = 1):
    for _ in range(count):
        console.print(f"Hello [bold green]{name}[/bold green]!")

if __name__ == "__main__":
    app()

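25.11.2 Rich Terminal Output

python
import time

from rich.console import Console
from rich.progress import track
from rich.table import Table

console = Console()

# Build a table and add sample rows; without rows only the header is rendered.
table = Table(title="Tasks")
table.add_column("ID", style="cyan")
table.add_column("Task", style="magenta")
table.add_column("Status", style="green")
table.add_row("1", "Write docs", "todo")
table.add_row("2", "Fix bug", "done")
console.print(table)

# track() wraps any iterable and draws a progress bar while it is consumed.
for _ in track(range(20), description="Processing..."):
    time.sleep(0.05)  # placeholder for real work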

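25.11.3 Typed CLI Parameters

python
from typing import Annotated, Optional

import typer

app = typer.Typer()


@app.command()
def create(
    name: Annotated[str, typer.Option(help="Task name")],
    priority: Annotated[int, typer.Option(min=1, max=5)] = 3,
    # Default to None rather than a mutable []; repeat --tags to pass several values.
    tags: Annotated[Optional[list[str]], typer.Option()] = None,
):
    typer.echo(f"name={name} priority={priority} tags={tags or []}")


if __name__ == "__main__":
    app()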

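25.11.4 Asynchronous CLI Applications

python
import asyncio

import typer
from rich.console import Console

app = typer.Typer()
console = Console()

async def fetch_data() -> str:
    await asyncio.sleep(1)  # stand-in for a real network call
    return "data"

@app.command()
def fetch():
    # The command is a regular function; asyncio.run() drives the coroutine.
    result = asyncio.run(fetch_data())
    console.print(result)

if __name__ == "__main__":
    app()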

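25.12 Chapter Summary

This chapter built a complete taskman CLI tool and, along the way, put the following core topics into practice:

  1. Data modeling: type-safe domain models built with dataclass + Enum
  2. Storage engine: an abstract storage interface + JSON persistence + atomic writes
  3. Configuration management: XDG conventions + YAML configuration + layered overrides
  4. Output formatting: table/JSON/Markdown formats + colored terminal output
  5. CLI framework: Click's decorator style + command groups + option validation
  6. Testing strategy: model tests + storage tests + CLI integration tests
  7. Packaging and distribution: pyproject.toml + entry_points + an executable script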
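
As a compact reminder of what "atomic write" means for a JSON store, here is a minimal sketch using only the standard library. `atomic_write_json` is a hypothetical helper written for this recap and is not necessarily identical to the chapter's storage engine. The idea is to write to a temporary file in the same directory and then rename it over the target, so a crash mid-write leaves the previous file intact.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_json(path: Path, data: Any) -> None:
    """Write data as JSON so readers never observe a half-written file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Keep the temp file in the target directory so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)
            fh.flush()
            os.fsync(fh.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_name, path)  # single rename; atomic on POSIX
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Calling `atomic_write_json(Path("tasks.json"), {"tasks": []})` either replaces the file completely or leaves the old contents untouched.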

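25.13 Exercises and Projects

Basic exercises

  1. Feature extension: add subtask support to taskman so that a task can contain nested subtasks.

  2. Output enhancement: integrate the Rich library for better-looking tables and progress bars.

  3. Interactive mode: add a `taskman interactive` command that enters an interactive REPL.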

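Advanced exercises

  1. Plugin system: implement a plugin architecture based on entry_points that lets third parties add commands and output formats.

  2. Multiple storage backends: add a SQLite storage backend and allow switching between JSON and SQLite through configuration.

  3. Scheduled reminders: integrate system notifications and send desktop reminders for tasks that are about to become due.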

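Project exercises

  1. Complete CLI tool: independently develop a file-management CLI tool with the following features:

    • File search and filtering
    • Batch renaming
    • Duplicate file detection
    • Directory size statistics
    • Configuration file management
  2. API client CLI: develop a REST API client tool with the following features:

    • Saving and managing API endpoints
    • Request history
    • Environment variable substitution
    • Response formatting (JSON/YAML/Table)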

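Discussion questions

  1. How would you design a CLI tool's configuration system so that it supports global configuration, project-level configuration, and command-line overrides at the same time, while keeping precedence consistent?

  2. What compatibility issues must be considered when developing a cross-platform CLI tool (path separators, terminal encoding, signal handling, permission models)? How can an abstraction layer address them?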
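
One possible way to approach the first question is to treat each configuration source as a layer and resolve lookups through a fixed chain, so precedence is defined in exactly one place. The sketch below uses `collections.ChainMap`. The key names, `DEFAULTS`, and `resolve_config` are hypothetical and chosen only for illustration.

```python
from collections import ChainMap
from typing import Any, Mapping

# Hypothetical built-in defaults; the lowest-priority layer.
DEFAULTS: dict[str, Any] = {"format": "table", "color": True, "data_file": "tasks.json"}


def resolve_config(
    global_cfg: Mapping[str, Any],
    project_cfg: Mapping[str, Any],
    cli_args: Mapping[str, Any],
) -> ChainMap:
    """Merge layers; earlier maps in the ChainMap win on lookup."""
    # Options the user did not pass arrive as None and must not mask lower layers.
    cli_set = {k: v for k, v in cli_args.items() if v is not None}
    return ChainMap(cli_set, dict(project_cfg), dict(global_cfg), DEFAULTS)


if __name__ == "__main__":
    cfg = resolve_config(
        global_cfg={"format": "json", "color": False},
        project_cfg={"format": "markdown"},
        cli_args={"format": None, "color": True},
    )
    # format comes from the project layer, color from the CLI, data_file from defaults
    print(cfg["format"], cfg["color"], cfg["data_file"])  # markdown True tasks.json
```

Because the order is fixed inside `resolve_config`, every command sees the same precedence: command line, then project, then global, then defaults.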




青少年创意编程 - 高中Python组 - 江苏省宿城中等专业学校