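Chapter 25 Hands-On: Building a Command-Line Tool
Learning Objectives
- Master the core frameworks and design patterns for Python CLI development
- Understand terminal UI design principles and how to improve the user experience
- Use the Click framework fluently to build complex command-line applications
- Master data persistence, configuration management, and plugin architecture
- Understand how to package and distribute CLI applications and keep them cross-platform
25.1 CLI Development Fundamentals
25.1.1 Command-Line Interface Design Principles
A well-designed CLI tool should follow the POSIX/GNU command-line conventions: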
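| Principle | Description | Example |
|---|---|---|
| Short options | Single hyphen + single letter | -v, -f |
| Long options | Double hyphen + full word | --verbose, --file |
| Option bundling | Short options can be combined | -avz is equivalent to -a -v -z |
| Option terminator | -- marks everything after it as positional arguments | rm -- -filename |
| Exit codes | 0 for success, non-zero for failure | sys.exit(1) |
| Standard streams | stdin/stdout/stderr | Error messages go to stderr |
| Help | -h / --help | Generated automatically |
| Version | -V / --version | Prints the version number |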
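The conventions in the table can be tried out with the standard library alone. The following is a minimal sketch, not part of taskman: the program name `demo`, its three flags, and the `run` helper are hypothetical and exist only to show option bundling, the `--` terminator, stderr output, and exit codes.

```python
import argparse
import sys


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for a hypothetical `demo` command."""
    parser = argparse.ArgumentParser(prog="demo")
    parser.add_argument("-a", "--all", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")
    parser.add_argument("-z", "--compress", action="store_true")
    parser.add_argument("-V", "--version", action="version", version="demo 0.1")
    parser.add_argument("paths", nargs="*")
    return parser


def run(argv: list[str]) -> int:
    """Return an exit code instead of exiting, so the caller decides."""
    args = build_parser().parse_args(argv)
    if not args.paths:
        # Diagnostics go to stderr so stdout stays clean for pipelines.
        print("error: no paths given", file=sys.stderr)
        return 2
    for path in args.paths:
        print(path)
    return 0
```

A real entry point would end with `sys.exit(run(sys.argv[1:]))`. Calling `run(["-avz", "--", "-filename"])` treats `-filename` as a path rather than as options, because everything after `--` is positional.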
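25.1.2 Comparing Python CLI Frameworks
| Framework | Characteristics | Best suited for |
|---|---|---|
| argparse | Standard library, feature-complete | Simple scripts, projects that must avoid external dependencies |
| Click | Decorator-based, feature-rich | Medium to large CLI applications |
| Typer | Built on Click, driven by type annotations | Rapid development in a modern style |
| Rich | Rich-text rendering in the terminal | Polished output |
| Textual | Terminal TUI framework | Complex interactive applications |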
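As a baseline for the comparison, here is roughly what a two-command tool looks like with argparse alone. This is an illustrative sketch: the program name `taskman-mini` and its two subcommands are assumptions made for the example, not the taskman implementation built later in the chapter.

```python
import argparse

PRIORITIES = ["low", "medium", "high", "critical"]
STATUSES = ["todo", "in_progress", "blocked", "done", "cancelled"]


def build_parser() -> argparse.ArgumentParser:
    """Subcommands in argparse play the role of a Click command group."""
    parser = argparse.ArgumentParser(prog="taskman-mini")
    commands = parser.add_subparsers(dest="command", required=True)

    add = commands.add_parser("add", help="Add a task")
    add.add_argument("title")
    add.add_argument("-p", "--priority", choices=PRIORITIES, default="medium")

    listing = commands.add_parser("list", help="List tasks")
    listing.add_argument("-s", "--status", choices=STATUSES)
    return parser
```

Every option is registered imperatively on a parser object, and dispatching on `args.command` is left to the caller. Click moves both into decorators on the command functions, which is the main reason it scales better to larger tools.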
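25.2 Project Overview
This chapter builds taskman, a full-featured task management CLI tool with the following core features:
- Creating, reading, updating, and deleting tasks, plus status management
- Categories, tags, and a priority system
- Full-text search and filtering
- Multiple output formats (table, JSON, Markdown)
- Configuration file management
- A plugin extension mechanism
- Data import and export
25.3 Project Structure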
```
taskman/
├── src/
│   └── taskman/
│       ├── __init__.py
│       ├── __version__.py
│       ├── cli.py           # CLI entry point and command definitions
│       ├── models.py        # Data models
│       ├── storage.py       # Storage engine
│       ├── config.py        # Configuration management
│       ├── formatter.py     # Output formatting
│       ├── search.py        # Search engine
│       └── plugins/         # Plugin system
│           ├── __init__.py
│           └── base.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_models.py
│   ├── test_storage.py
│   ├── test_cli.py
│   └── test_search.py
├── pyproject.toml
├── README.md
└── CHANGELOG.md
```
25.4 Data Model
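The models in this section rely on one Enum feature that is easy to miss: when a member's value is a tuple, the tuple is unpacked into the enum's `__init__`, so each member can carry extra attributes. A minimal sketch of the pattern, using a hypothetical `Level` enum that is not part of taskman:

```python
from enum import Enum


class Level(Enum):
    # The tuple on the right is passed to __init__ as positional arguments.
    LOW = ("low", 1)
    HIGH = ("high", 3)

    def __init__(self, label: str, weight: int) -> None:
        self.label = label
        self.weight = weight


# A label -> member lookup table, the same idea as a from_str() helper.
by_label = {level.label: level for level in Level}
```

The member's `value` is still the whole tuple, so `Level.HIGH.value` is `("high", 3)`, while `Level.HIGH.label` and `Level.HIGH.weight` give the individual fields. Members are not orderable by default, so sorting has to go through an attribute such as `weight`.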
```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class Priority(Enum):
    # Each value is (label, sort weight, ANSI color code).
    LOW = ("low", 1, "\033[94m")
    MEDIUM = ("medium", 2, "\033[93m")
    HIGH = ("high", 3, "\033[91m")
    CRITICAL = ("critical", 4, "\033[95m")

    def __init__(self, label: str, weight: int, color: str) -> None:
        self.label = label
        self.weight = weight
        self.color = color

    @classmethod
    def from_str(cls, value: str) -> Priority:
        mapping = {p.label: p for p in cls}
        if value.lower() not in mapping:
            raise ValueError(f"Invalid priority: {value}. Choose from: {', '.join(mapping)}")
        return mapping[value.lower()]


class Status(Enum):
    # Each value is (label, ANSI color code).
    TODO = ("todo", "\033[90m")
    IN_PROGRESS = ("in_progress", "\033[94m")
    BLOCKED = ("blocked", "\033[91m")
    DONE = ("done", "\033[92m")
    CANCELLED = ("cancelled", "\033[90m")

    def __init__(self, label: str, color: str) -> None:
        self.label = label
        self.color = color

    @classmethod
    def from_str(cls, value: str) -> Status:
        mapping = {s.label: s for s in cls}
        if value.lower() not in mapping:
            raise ValueError(f"Invalid status: {value}. Choose from: {', '.join(mapping)}")
        return mapping[value.lower()]


@dataclass
class Task:
    id: int = 0
    title: str = ""
    description: str = ""
    priority: Priority = Priority.MEDIUM
    status: Status = Status.TODO
    category: str = "default"
    tags: list[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    due_date: datetime | None = None
    completed_at: datetime | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def mark_done(self) -> None:
        # Idempotent: a task that is already done keeps its completion time.
        if self.status == Status.DONE:
            return
        self.status = Status.DONE
        self.completed_at = datetime.now()
        self.updated_at = datetime.now()

    def update(self, **kwargs: Any) -> None:
        # Unknown keys and None values are ignored.
        for key, value in kwargs.items():
            if hasattr(self, key) and value is not None:
                setattr(self, key, value)
        self.updated_at = datetime.now()

    def matches(
        self,
        status: Status | None = None,
        category: str | None = None,
        priority: Priority | None = None,
        tag: str | None = None,
        query: str | None = None,
    ) -> bool:
        if status and self.status != status:
            return False
        if category and self.category != category:
            return False
        if priority and self.priority != priority:
            return False
        if tag and tag not in self.tags:
            return False
        if query:
            # Case-insensitive substring match on title and description.
            q = query.lower()
            if q not in self.title.lower() and q not in self.description.lower():
                return False
        return True

    def to_dict(self) -> dict[str, Any]:
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "priority": self.priority.label,
            "status": self.status.label,
            "category": self.category,
            "tags": self.tags,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "due_date": self.due_date.isoformat() if self.due_date else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Task:
        return cls(
            id=data.get("id", 0),
            title=data.get("title", ""),
            description=data.get("description", ""),
            priority=Priority.from_str(data.get("priority", "medium")),
            status=Status.from_str(data.get("status", "todo")),
            category=data.get("category", "default"),
            tags=data.get("tags", []),
            created_at=datetime.fromisoformat(data["created_at"]) if data.get("created_at") else datetime.now(),
            updated_at=datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else datetime.now(),
            due_date=datetime.fromisoformat(data["due_date"]) if data.get("due_date") else None,
            completed_at=datetime.fromisoformat(data["completed_at"]) if data.get("completed_at") else None,
            metadata=data.get("metadata", {}),
        )
```
25.5 Storage Engine
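The storage engine in this section saves data by writing a temporary file and then moving it over the real one, so a crash mid-write cannot leave a half-written JSON file behind. A minimal stand-alone sketch of that technique follows. The helper name `atomic_write_json` is hypothetical, and the sketch uses `os.replace`, which the Python documentation describes as atomic when it succeeds, provided the temporary file is on the same filesystem as the target.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_json(path: Path, payload: Any) -> None:
    """Write payload to path so readers see the old or the new file, never a mix."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to the target so both share a filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, ensure_ascii=False, indent=2)
            handle.flush()
            os.fsync(handle.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Using a unique temporary name from `mkstemp` also avoids two processes clobbering the same fixed `.tmp` file, which a fixed suffix cannot guarantee.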
```python
from __future__ import annotations

import json
import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any

from .models import Priority, Status, Task


class StorageBackend(ABC):
    @abstractmethod
    def load(self) -> list[Task]: ...

    @abstractmethod
    def save(self, tasks: list[Task]) -> None: ...


class JsonStorage(StorageBackend):
    def __init__(self, file_path: Path) -> None:
        self.file_path = file_path
        self._ensure_file()

    def _ensure_file(self) -> None:
        if not self.file_path.exists():
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            self.file_path.write_text("[]", encoding="utf-8")

    def load(self) -> list[Task]:
        data = json.loads(self.file_path.read_text(encoding="utf-8"))
        return [Task.from_dict(item) for item in data]

    def save(self, tasks: list[Task]) -> None:
        data = [task.to_dict() for task in tasks]
        # Write to a sibling temp file first, then swap it into place.
        tmp_path = self.file_path.with_suffix(".tmp")
        tmp_path.write_text(
            json.dumps(data, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        # os.replace overwrites the target in one step; shutil.move may fall
        # back to copy-and-delete and gives no such guarantee.
        os.replace(tmp_path, self.file_path)


class TaskRepository:
    def __init__(self, storage: StorageBackend) -> None:
        self._storage = storage
        self._tasks: list[Task] = []
        self._next_id: int = 1
        self._load()

    def _load(self) -> None:
        self._tasks = self._storage.load()
        if self._tasks:
            self._next_id = max(t.id for t in self._tasks) + 1

    def _persist(self) -> None:
        self._storage.save(self._tasks)

    def add(self, task: Task) -> Task:
        task.id = self._next_id
        self._next_id += 1
        self._tasks.append(task)
        self._persist()
        return task

    def get(self, task_id: int) -> Task | None:
        return next((t for t in self._tasks if t.id == task_id), None)

    def update(self, task_id: int, **kwargs: Any) -> Task | None:
        task = self.get(task_id)
        if task is None:
            return None
        task.update(**kwargs)
        self._persist()
        return task

    def delete(self, task_id: int) -> bool:
        task = self.get(task_id)
        if task is None:
            return False
        self._tasks.remove(task)
        self._persist()
        return True

    def list_all(
        self,
        status: str | None = None,
        category: str | None = None,
        priority: str | None = None,
        tag: str | None = None,
        query: str | None = None,
        sort_by: str = "created_at",
        reverse: bool = True,
    ) -> list[Task]:
        status_filter = Status.from_str(status) if status else None
        priority_filter = Priority.from_str(priority) if priority else None
        results = [
            t for t in self._tasks
            if t.matches(
                status=status_filter,
                category=category,
                priority=priority_filter,
                tag=tag,
                query=query,
            )
        ]

        def sort_key(task: Task) -> tuple[bool, Any]:
            value = getattr(task, sort_by, task.created_at)
            if isinstance(value, Priority):
                value = value.weight  # Enum members are not orderable
            # Pair the value with a "has value" flag so a missing due date
            # never gets compared against a datetime.
            return (value is not None, value)

        results.sort(key=sort_key, reverse=reverse)
        return results

    def count(self) -> dict[str, int]:
        counts: dict[str, int] = {s.label: 0 for s in Status}
        for task in self._tasks:
            counts[task.status.label] += 1
        counts["total"] = len(self._tasks)
        return counts

    def export_data(self) -> list[dict[str, Any]]:
        return [t.to_dict() for t in self._tasks]

    def import_data(self, data: list[dict[str, Any]]) -> int:
        imported = 0
        for item in data:
            task = Task.from_dict(item)
            # Imported tasks always get fresh IDs to avoid collisions.
            task.id = self._next_id
            self._next_id += 1
            self._tasks.append(task)
            imported += 1
        self._persist()
        return imported
```
25.6 Configuration Management
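Configuration in a CLI tool usually comes from several layers, and a later layer should win over an earlier one: built-in defaults, then the config file, then environment variables, then command-line flags. The following sketch shows one way to express that precedence with `collections.ChainMap`. It is an illustration under stated assumptions: the function name `resolve_settings` and the environment variable `TASKMAN_EXPORT_FORMAT` are hypothetical and do not appear in the taskman code.

```python
from collections import ChainMap
from typing import Any, Mapping

DEFAULTS: dict[str, Any] = {"export_format": "table", "sort_by": "created_at"}


def resolve_settings(
    file_values: Mapping[str, Any],
    env: Mapping[str, str],
    cli_values: Mapping[str, Any],
) -> dict[str, Any]:
    """Merge the layers; the leftmost ChainMap argument has the highest priority."""
    env_values: dict[str, Any] = {}
    if env.get("TASKMAN_EXPORT_FORMAT"):  # hypothetical variable name
        env_values["export_format"] = env["TASKMAN_EXPORT_FORMAT"]
    # Options the user did not pass arrive as None and must not mask lower layers.
    cli_given = {key: value for key, value in cli_values.items() if value is not None}
    return dict(ChainMap(cli_given, env_values, dict(file_values), DEFAULTS))
```

Dropping `None` values before merging is the important step: Click passes `None` for every option that was not given, and without the filter those would override the config file.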
```python
from __future__ import annotations

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import yaml


def get_default_config_path() -> Path:
    # Follow the XDG Base Directory convention, falling back to ~/.config.
    xdg = os.getenv("XDG_CONFIG_HOME")
    if xdg:
        return Path(xdg) / "taskman"
    return Path.home() / ".config" / "taskman"


def get_default_data_path() -> Path:
    xdg = os.getenv("XDG_DATA_HOME")
    if xdg:
        return Path(xdg) / "taskman"
    return Path.home() / ".local" / "share" / "taskman"


@dataclass
class TaskmanConfig:
    data_file: Path = field(default_factory=lambda: get_default_data_path() / "tasks.json")
    default_priority: str = "medium"
    default_category: str = "default"
    date_format: str = "%Y-%m-%d %H:%M"
    color_enabled: bool = True
    pager: str = os.getenv("PAGER", "less")
    editor: str = os.getenv("EDITOR", "vi")
    export_format: str = "table"
    sort_by: str = "created_at"
    sort_reverse: bool = True

    @classmethod
    def load(cls, config_path: Path | None = None) -> TaskmanConfig:
        if config_path is None:
            config_path = get_default_config_path() / "config.yaml"
        if not config_path.exists():
            # First run: write a config file containing the defaults.
            config = cls()
            config.save(config_path)
            return config
        with open(config_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        config = cls()
        for key, value in data.items():
            if hasattr(config, key):
                if key == "data_file":
                    value = Path(value)
                setattr(config, key, value)
        return config

    def save(self, config_path: Path | None = None) -> None:
        if config_path is None:
            config_path = get_default_config_path() / "config.yaml"
        config_path.parent.mkdir(parents=True, exist_ok=True)
        data: dict[str, Any] = {
            "data_file": str(self.data_file),
            "default_priority": self.default_priority,
            "default_category": self.default_category,
            "date_format": self.date_format,
            "color_enabled": self.color_enabled,
            "pager": self.pager,
            "editor": self.editor,
            "export_format": self.export_format,
            "sort_by": self.sort_by,
            "sort_reverse": self.sort_reverse,
        }
        with open(config_path, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
```
25.7 Output Formatting
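The formatter in this section always emits ANSI escape codes. In practice a tool normally checks first whether color makes sense: output piped into a file or another program should be plain text. The sketch below shows one common way to decide. The helper names `supports_color` and `paint` are hypothetical, and honoring the `NO_COLOR` environment variable is an informal community convention assumed here, not something the taskman code implements.

```python
import os
from typing import Any, Mapping, Optional

RESET = "\033[0m"


def supports_color(stream: Any, env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True only for an interactive terminal with NO_COLOR unset or empty."""
    env = os.environ if env is None else env
    if env.get("NO_COLOR"):
        return False
    isatty = getattr(stream, "isatty", None)
    return bool(isatty and isatty())


def paint(text: str, color: str, enabled: bool) -> str:
    """Wrap text in an ANSI color code, or return it unchanged when disabled."""
    return f"{color}{text}{RESET}" if enabled else text
```

A call site would compute `enabled = supports_color(sys.stdout)` once and pass it down, which also gives a `--no-color` flag a single place to take effect.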
```python
from __future__ import annotations

import json
from datetime import datetime

from .models import Task, Priority, Status

RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"


def colorize(text: str, color: str, bold: bool = False) -> str:
    prefix = BOLD if bold else ""
    return f"{prefix}{color}{text}{RESET}"


def format_datetime(dt: datetime, fmt: str = "%Y-%m-%d %H:%M") -> str:
    return dt.strftime(fmt)


def format_priority(p: Priority) -> str:
    # Pad before colorizing so escape codes do not count toward the width.
    return colorize(f"● {p.label.upper():<8}", p.color, bold=p.weight >= 3)


def format_status(s: Status) -> str:
    symbols = {
        Status.TODO: "○",
        Status.IN_PROGRESS: "◐",
        Status.BLOCKED: "✖",
        Status.DONE: "●",
        Status.CANCELLED: "○",
    }
    symbol = symbols.get(s, "○")
    return colorize(f"{symbol} {s.label:<12}", s.color)


class TableFormatter:
    @staticmethod
    def format_tasks(tasks: list[Task], date_format: str = "%Y-%m-%d %H:%M") -> str:
        if not tasks:
            return "No tasks found."
        lines: list[str] = []
        lines.append(
            f"{'ID':>4} {'Status':<14} {'Priority':<10} {'Category':<12} {'Title'}"
        )
        lines.append("─" * 72)
        for task in tasks:
            status = format_status(task.status)
            priority = format_priority(task.priority)
            category = colorize(f"{task.category:<12}", DIM)
            title = task.title
            if task.status == Status.DONE:
                title = colorize(title, DIM)
            lines.append(
                f"{task.id:>4} {status} {priority} {category} {title}"
            )
        return "\n".join(lines)

    @staticmethod
    def format_task_detail(task: Task, date_format: str = "%Y-%m-%d %H:%M") -> str:
        lines: list[str] = []
        lines.append(f"{'═' * 60}")
        lines.append(f" Task #{task.id}: {colorize(task.title, BOLD)}")
        lines.append(f"{'═' * 60}")
        lines.append(f" Status: {format_status(task.status)}")
        lines.append(f" Priority: {format_priority(task.priority)}")
        lines.append(f" Category: {task.category}")
        if task.description:
            lines.append("\n Description:")
            for line in task.description.split("\n"):
                lines.append(f" {line}")
        if task.tags:
            lines.append(f"\n Tags: {', '.join(f'#{t}' for t in task.tags)}")
        lines.append(f"\n Created: {format_datetime(task.created_at, date_format)}")
        lines.append(f" Updated: {format_datetime(task.updated_at, date_format)}")
        if task.due_date:
            lines.append(f" Due: {format_datetime(task.due_date, date_format)}")
        if task.completed_at:
            lines.append(f" Completed: {format_datetime(task.completed_at, date_format)}")
        lines.append(f"{'═' * 60}")
        return "\n".join(lines)

    @staticmethod
    def format_stats(counts: dict[str, int]) -> str:
        total = counts.get("total", 0)
        if total == 0:
            return "No tasks yet."
        lines: list[str] = []
        lines.append(f"\n Task Statistics (Total: {total})")
        lines.append(f" {'─' * 40}")
        status_items = [
            ("Todo", counts.get("todo", 0), Status.TODO),
            ("In Progress", counts.get("in_progress", 0), Status.IN_PROGRESS),
            ("Blocked", counts.get("blocked", 0), Status.BLOCKED),
            ("Done", counts.get("done", 0), Status.DONE),
            ("Cancelled", counts.get("cancelled", 0), Status.CANCELLED),
        ]
        for label, count, status in status_items:
            pct = (count / total) * 100 if total > 0 else 0
            # A 20-character bar: each block represents 5 percent.
            bar_len = int(pct / 5)
            bar = "█" * bar_len + "░" * (20 - bar_len)
            lines.append(f" {label:<12} {count:>3} {colorize(bar, status.color)} {pct:.0f}%")
        return "\n".join(lines)


class JsonFormatter:
    @staticmethod
    def format_tasks(tasks: list[Task]) -> str:
        return json.dumps(
            [t.to_dict() for t in tasks],
            indent=2,
            ensure_ascii=False,
        )

    @staticmethod
    def format_task_detail(task: Task) -> str:
        return json.dumps(task.to_dict(), indent=2, ensure_ascii=False)


class MarkdownFormatter:
    @staticmethod
    def format_tasks(tasks: list[Task]) -> str:
        lines: list[str] = []
        lines.append("| ID | Status | Priority | Category | Title |")
        lines.append("|----|--------|----------|----------|-------|")
        for t in tasks:
            lines.append(
                f"| {t.id} | {t.status.label} | {t.priority.label} | "
                f"{t.category} | {t.title} |"
            )
        return "\n".join(lines)
```
25.8 CLI Entry Point
```python
from __future__ import annotations

import json
import sys
from dataclasses import fields
from datetime import datetime
from pathlib import Path
from typing import Any

import click

from .models import Task, Priority, Status
from .storage import JsonStorage, TaskRepository
from .config import TaskmanConfig, get_default_config_path
from .formatter import TableFormatter, JsonFormatter, MarkdownFormatter


def get_repo(ctx: click.Context) -> TaskRepository:
    config = ctx.obj["config"]
    storage = JsonStorage(config.data_file)
    return TaskRepository(storage)


@click.group()
@click.option("--config", "config_path", type=click.Path(), envvar="TASKMAN_CONFIG", help="Config file path")
@click.option("--no-color", is_flag=True, help="Disable colored output")
@click.version_option(version="1.0.0", prog_name="taskman")
@click.pass_context
def cli(ctx: click.Context, config_path: str | None, no_color: bool) -> None:
    """Taskman - A powerful task management CLI tool."""
    ctx.ensure_object(dict)
    path = Path(config_path) if config_path else None
    config = TaskmanConfig.load(path)
    if no_color:
        config.color_enabled = False
    ctx.obj["config"] = config
    # Remember which file was loaded so `config --set` saves to the same place.
    ctx.obj["config_path"] = path


@cli.command()
@click.argument("title")
@click.option("-d", "--description", default="", help="Task description")
@click.option("-p", "--priority", type=click.Choice(["low", "medium", "high", "critical"]), help="Task priority")
@click.option("-c", "--category", help="Task category")
@click.option("-t", "--tags", help="Comma-separated tags")
@click.option("--due", type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%d %H:%M"]), help="Due date")
@click.pass_context
def add(
    ctx: click.Context,
    title: str,
    description: str,
    priority: str | None,
    category: str | None,
    tags: str | None,
    due: datetime | None,
) -> None:
    """Add a new task."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)
    task = Task(
        title=title,
        description=description,
        priority=Priority.from_str(priority or config.default_priority),
        category=category or config.default_category,
        tags=[t.strip() for t in tags.split(",")] if tags else [],
        due_date=due,
    )
    added = repo.add(task)
    click.echo(f"✓ Task #{added.id} added: {added.title}")


@cli.command("list")
@click.option("-s", "--status", type=click.Choice(["todo", "in_progress", "blocked", "done", "cancelled"]), help="Filter by status")
@click.option("-p", "--priority", type=click.Choice(["low", "medium", "high", "critical"]), help="Filter by priority")
@click.option("-c", "--category", help="Filter by category")
@click.option("-t", "--tag", help="Filter by tag")
@click.option("-q", "--query", help="Search in title and description")
@click.option("--sort-by", type=click.Choice(["created_at", "updated_at", "priority", "due_date"]), help="Sort field")
@click.option("--reverse/--no-reverse", default=True, help="Sort direction")
@click.option("-f", "--format", "output_format", type=click.Choice(["table", "json", "markdown"]), help="Output format")
@click.pass_context
def list_tasks(
    ctx: click.Context,
    status: str | None,
    priority: str | None,
    category: str | None,
    tag: str | None,
    query: str | None,
    sort_by: str | None,
    reverse: bool,
    output_format: str | None,
) -> None:
    """List tasks with optional filters."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)
    tasks = repo.list_all(
        status=status,
        category=category,
        priority=priority,
        tag=tag,
        query=query,
        sort_by=sort_by or config.sort_by,
        reverse=reverse if sort_by else config.sort_reverse,
    )
    fmt = output_format or config.export_format
    if fmt == "json":
        click.echo(JsonFormatter.format_tasks(tasks))
    elif fmt == "markdown":
        click.echo(MarkdownFormatter.format_tasks(tasks))
    else:
        click.echo(TableFormatter.format_tasks(tasks, config.date_format))


@cli.command()
@click.argument("task_id", type=int)
@click.pass_context
def show(ctx: click.Context, task_id: int) -> None:
    """Show task details."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)
    task = repo.get(task_id)
    if task is None:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)
    click.echo(TableFormatter.format_task_detail(task, config.date_format))


@cli.command()
@click.argument("task_id", type=int)
@click.option("-t", "--title", help="New title")
@click.option("-d", "--description", help="New description")
@click.option("-p", "--priority", type=click.Choice(["low", "medium", "high", "critical"]), help="New priority")
@click.option("-c", "--category", help="New category")
@click.option("-s", "--status", type=click.Choice(["todo", "in_progress", "blocked", "done", "cancelled"]), help="New status")
@click.option("--due", type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%d %H:%M"]), help="New due date")
@click.option("--add-tags", help="Tags to add (comma-separated)")
@click.option("--remove-tags", help="Tags to remove (comma-separated)")
@click.pass_context
def update(
    ctx: click.Context,
    task_id: int,
    title: str | None,
    description: str | None,
    priority: str | None,
    category: str | None,
    status: str | None,
    due: datetime | None,
    add_tags: str | None,
    remove_tags: str | None,
) -> None:
    """Update a task."""
    repo = get_repo(ctx)
    task = repo.get(task_id)
    if task is None:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)
    kwargs: dict[str, Any] = {}
    if title:
        kwargs["title"] = title
    if description is not None:
        kwargs["description"] = description
    if priority:
        kwargs["priority"] = Priority.from_str(priority)
    if category:
        kwargs["category"] = category
    if status:
        kwargs["status"] = Status.from_str(status)
    if due:
        kwargs["due_date"] = due
    if add_tags or remove_tags:
        # Apply additions first, then removals, on the same working set so
        # that passing both options in one call does not drop the additions.
        tags = set(task.tags)
        if add_tags:
            tags |= {t.strip() for t in add_tags.split(",")}
        if remove_tags:
            tags -= {t.strip() for t in remove_tags.split(",")}
        kwargs["tags"] = sorted(tags)
    updated = repo.update(task_id, **kwargs)
    if updated:
        click.echo(f"✓ Task #{task_id} updated")


@cli.command()
@click.argument("task_id", type=int)
@click.confirmation_option(prompt="Delete this task?")
@click.pass_context
def delete(ctx: click.Context, task_id: int) -> None:
    """Delete a task."""
    repo = get_repo(ctx)
    if repo.delete(task_id):
        click.echo(f"✓ Task #{task_id} deleted")
    else:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)


@cli.command()
@click.argument("task_id", type=int)
@click.pass_context
def done(ctx: click.Context, task_id: int) -> None:
    """Mark a task as done."""
    repo = get_repo(ctx)
    task = repo.get(task_id)
    if task is None:
        click.echo(f"Error: Task #{task_id} not found.", err=True)
        sys.exit(1)
    task.mark_done()
    # repo.update persists the change made by mark_done().
    repo.update(task_id, status=Status.DONE, completed_at=task.completed_at)
    click.echo(f"✓ Task #{task_id} marked as done")


@cli.command()
@click.argument("query")
@click.option("-f", "--format", "output_format", type=click.Choice(["table", "json", "markdown"]), help="Output format")
@click.pass_context
def search(ctx: click.Context, query: str, output_format: str | None) -> None:
    """Search tasks by title or description."""
    config = ctx.obj["config"]
    repo = get_repo(ctx)
    tasks = repo.list_all(query=query)
    fmt = output_format or config.export_format
    if fmt == "json":
        click.echo(JsonFormatter.format_tasks(tasks))
    elif fmt == "markdown":
        click.echo(MarkdownFormatter.format_tasks(tasks))
    else:
        click.echo(TableFormatter.format_tasks(tasks, config.date_format))


@cli.command()
@click.pass_context
def stats(ctx: click.Context) -> None:
    """Show task statistics."""
    repo = get_repo(ctx)
    counts = repo.count()
    click.echo(TableFormatter.format_stats(counts))


@cli.command()
@click.argument("file_path", type=click.Path())
@click.option("-f", "--format", "export_format", type=click.Choice(["json", "markdown"]), default="json", help="Export format")
@click.pass_context
def export(ctx: click.Context, file_path: str, export_format: str) -> None:
    """Export tasks to a file."""
    repo = get_repo(ctx)
    path = Path(file_path)
    tasks = repo.list_all()
    if export_format == "json":
        content = JsonFormatter.format_tasks(tasks)
    else:
        content = MarkdownFormatter.format_tasks(tasks)
    path.write_text(content, encoding="utf-8")
    click.echo(f"✓ Exported {len(tasks)} tasks to {file_path}")


# Name the command explicitly: `import` is a Python keyword, so the function
# needs a different name, and the derived command name would not be "import".
@cli.command("import")
@click.argument("file_path", type=click.Path(exists=True))
@click.option("-f", "--format", "import_format", type=click.Choice(["json"]), default="json", help="Import format")
@click.pass_context
def import_tasks(ctx: click.Context, file_path: str, import_format: str) -> None:
    """Import tasks from a file."""
    repo = get_repo(ctx)
    path = Path(file_path)
    content = path.read_text(encoding="utf-8")
    data = json.loads(content)
    count = repo.import_data(data)
    click.echo(f"✓ Imported {count} tasks from {file_path}")


@cli.command("config")
@click.option("--show", "show_config", is_flag=True, help="Show current configuration")
@click.option("--set", "set_values", nargs=2, multiple=True, help="Set a config value: --set KEY VALUE")
@click.pass_context
def config_cmd(ctx: click.Context, show_config: bool, set_values: tuple[tuple[str, str], ...]) -> None:
    """Manage configuration."""
    cfg = ctx.obj["config"]
    config_path = ctx.obj.get("config_path") or get_default_config_path() / "config.yaml"
    if show_config:
        click.echo(f"Config file: {config_path}")
        click.echo(f"Data file: {cfg.data_file}")
        click.echo(f"Priority: {cfg.default_priority}")
        click.echo(f"Category: {cfg.default_category}")
        click.echo(f"Date format: {cfg.date_format}")
        click.echo(f"Color: {cfg.color_enabled}")
        click.echo(f"Editor: {cfg.editor}")
        click.echo(f"Pager: {cfg.pager}")
        return
    valid_keys = {f.name for f in fields(cfg)}
    for key, value in set_values:
        if key not in valid_keys:
            click.echo(f"Error: Unknown config key: {key}", err=True)
            continue
        # Command-line values are strings; coerce to the field's current type.
        current = getattr(cfg, key)
        new_value: Any = value
        if isinstance(current, bool):
            new_value = value.lower() in {"1", "true", "yes", "on"}
        elif isinstance(current, Path):
            new_value = Path(value)
        setattr(cfg, key, new_value)
        click.echo(f"✓ Set {key} = {new_value}")
    cfg.save(config_path)


def main() -> None:
    cli(obj={})


if __name__ == "__main__":
    main()
```
25.9 Testing
```python
import json
import tempfile
from pathlib import Path

import pytest
import yaml
from click.testing import CliRunner

from taskman.models import Task, Priority, Status
from taskman.storage import JsonStorage, TaskRepository
from taskman.cli import cli


@pytest.fixture
def temp_dir():
    with tempfile.TemporaryDirectory() as d:
        yield Path(d)


@pytest.fixture
def storage(temp_dir):
    return JsonStorage(temp_dir / "tasks.json")


@pytest.fixture
def repo(storage):
    return TaskRepository(storage)


class TestTaskModel:
    def test_create_task(self):
        task = Task(title="Test Task", priority=Priority.HIGH)
        assert task.title == "Test Task"
        assert task.priority == Priority.HIGH
        assert task.status == Status.TODO

    def test_mark_done(self):
        task = Task(title="Test Task")
        task.mark_done()
        assert task.status == Status.DONE
        assert task.completed_at is not None

    def test_mark_done_idempotent(self):
        task = Task(title="Test Task")
        task.mark_done()
        first_completed = task.completed_at
        task.mark_done()
        assert task.completed_at == first_completed

    def test_matches_status(self):
        task = Task(title="Test", status=Status.IN_PROGRESS)
        assert task.matches(status=Status.IN_PROGRESS)
        assert not task.matches(status=Status.DONE)

    def test_matches_query(self):
        task = Task(title="Buy groceries", description="Milk and eggs")
        assert task.matches(query="groceries")
        assert task.matches(query="milk")
        assert not task.matches(query="python")

    def test_to_dict_roundtrip(self):
        task = Task(
            id=1,
            title="Test",
            priority=Priority.HIGH,
            tags=["work", "urgent"],
        )
        data = task.to_dict()
        restored = Task.from_dict(data)
        assert restored.id == task.id
        assert restored.title == task.title
        assert restored.priority == task.priority
        assert restored.tags == task.tags

    def test_invalid_priority(self):
        with pytest.raises(ValueError):
            Priority.from_str("invalid")

    def test_invalid_status(self):
        with pytest.raises(ValueError):
            Status.from_str("invalid")


class TestTaskRepository:
    def test_add_task(self, repo):
        task = Task(title="Test Task")
        added = repo.add(task)
        assert added.id == 1
        assert added.title == "Test Task"

    def test_add_multiple_tasks(self, repo):
        repo.add(Task(title="Task 1"))
        repo.add(Task(title="Task 2"))
        assert len(repo.list_all()) == 2
        assert repo.get(2).title == "Task 2"

    def test_get_nonexistent(self, repo):
        assert repo.get(999) is None

    def test_update_task(self, repo):
        repo.add(Task(title="Original"))
        updated = repo.update(1, title="Updated", priority=Priority.HIGH)
        assert updated.title == "Updated"
        assert updated.priority == Priority.HIGH

    def test_delete_task(self, repo):
        repo.add(Task(title="To Delete"))
        assert repo.delete(1) is True
        assert repo.get(1) is None
        assert repo.delete(999) is False

    def test_list_with_filters(self, repo):
        repo.add(Task(title="Work Task", category="work", priority=Priority.HIGH))
        repo.add(Task(title="Personal Task", category="personal", priority=Priority.LOW))
        repo.add(Task(title="Work Urgent", category="work", priority=Priority.CRITICAL))
        work_tasks = repo.list_all(category="work")
        assert len(work_tasks) == 2
        high_tasks = repo.list_all(priority="high")
        assert len(high_tasks) == 1

    def test_search(self, repo):
        repo.add(Task(title="Buy groceries", description="Milk and eggs"))
        repo.add(Task(title="Read Python book", description="Advanced topics"))
        repo.add(Task(title="Buy Python course", description="Online class"))
        results = repo.list_all(query="python")
        assert len(results) == 2

    def test_count(self, repo):
        repo.add(Task(title="T1", status=Status.TODO))
        repo.add(Task(title="T2", status=Status.DONE))
        repo.add(Task(title="T3", status=Status.IN_PROGRESS))
        counts = repo.count()
        assert counts["todo"] == 1
        assert counts["done"] == 1
        assert counts["in_progress"] == 1
        assert counts["total"] == 3

    def test_export_import(self, repo, temp_dir):
        repo.add(Task(title="Task 1", category="work"))
        repo.add(Task(title="Task 2", category="personal"))
        data = repo.export_data()
        export_path = temp_dir / "export.json"
        export_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        new_storage = JsonStorage(temp_dir / "imported.json")
        new_repo = TaskRepository(new_storage)
        imported_count = new_repo.import_data(data)
        assert imported_count == 2
        assert len(new_repo.list_all()) == 2

    def test_persistence(self, storage):
        repo1 = TaskRepository(storage)
        repo1.add(Task(title="Persistent Task"))
        repo2 = TaskRepository(storage)
        assert len(repo2.list_all()) == 1
        assert repo2.get(1).title == "Persistent Task"


class TestCLI:
    @pytest.fixture
    def runner(self, temp_dir):
        runner = CliRunner()
        config_path = temp_dir / "config.yaml"
        data_path = temp_dir / "tasks.json"
        # Point the config at the temporary data file. Without this, the CLI
        # falls back to the default data path and the tests would read and
        # modify the user's real task store.
        config_path.write_text(
            yaml.safe_dump({"data_file": str(data_path)}),
            encoding="utf-8",
        )
        return runner, config_path, data_path

    def test_add_task(self, runner):
        runner_obj, config_path, data_path = runner
        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "add", "My First Task",
            "-p", "high",
            "-c", "work",
        ])
        assert result.exit_code == 0
        assert "Task #1 added" in result.output

    def test_list_tasks(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 1"])
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 2"])
        result = runner_obj.invoke(cli, ["--config", str(config_path), "list"])
        assert result.exit_code == 0
        assert "Task 1" in result.output
        assert "Task 2" in result.output

    def test_show_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Detail Task", "-d", "Some description"])
        result = runner_obj.invoke(cli, ["--config", str(config_path), "show", "1"])
        assert result.exit_code == 0
        assert "Detail Task" in result.output

    def test_show_nonexistent(self, runner):
        runner_obj, config_path, data_path = runner
        result = runner_obj.invoke(cli, ["--config", str(config_path), "show", "999"])
        assert result.exit_code == 1

    def test_update_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Original"])
        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "update", "1", "-t", "Updated", "-p", "critical",
        ])
        assert result.exit_code == 0
        assert "updated" in result.output

    def test_done_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "To Complete"])
        result = runner_obj.invoke(cli, ["--config", str(config_path), "done", "1"])
        assert result.exit_code == 0
        assert "done" in result.output

    def test_delete_task(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "To Delete"])
        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "delete", "1", "--yes",
        ])
        assert result.exit_code == 0

    def test_search(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Buy Python book"])
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Buy groceries"])
        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "search", "python",
        ])
        assert result.exit_code == 0
        assert "Python" in result.output

    def test_stats(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 1"])
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "Task 2"])
        result = runner_obj.invoke(cli, ["--config", str(config_path), "stats"])
        assert result.exit_code == 0
        assert "Total: 2" in result.output

    def test_json_output(self, runner):
        runner_obj, config_path, data_path = runner
        runner_obj.invoke(cli, ["--config", str(config_path), "add", "JSON Task"])
        result = runner_obj.invoke(cli, [
            "--config", str(config_path),
            "list", "-f", "json",
        ])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data) == 1
        assert data[0]["title"] == "JSON Task"

    def test_version(self, runner):
        runner_obj, config_path, data_path = runner
        result = runner_obj.invoke(cli, ["--version"])
        assert result.exit_code == 0
        assert "taskman" in result.output
```
25.10 pyproject.toml
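The `[project.scripts]` table in this file maps a command name to a string of the form `module:function`. When the package is installed, the installer generates a small launcher that imports the module and calls the function. The sketch below imitates that lookup to show what the string means. The helper `resolve_entry_point` is hypothetical and only approximates what real installers do.

```python
import importlib
from typing import Any


def resolve_entry_point(spec: str) -> Any:
    """Resolve a 'package.module:attr' string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    target: Any = importlib.import_module(module_name)
    # The part after the colon may itself be dotted, e.g. "Class.method".
    for part in filter(None, attr_path.split(".")):
        target = getattr(target, part)
    return target
```

With taskman installed, `resolve_entry_point("taskman.cli:main")` would return the `main` function that the `taskman` command runs.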
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "taskman"
version = "1.0.0"
description = "A powerful task management CLI tool"
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
dependencies = [
    "click>=8.1.0",
    "pyyaml>=6.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-cov>=5.0.0",
    "ruff>=0.4.0",
    "mypy>=1.10.0",
]

[project.scripts]
taskman = "taskman.cli:main"

[tool.ruff]
target-version = "py310"
line-length = 88
src = ["src"]

[tool.ruff.lint]
select = ["E", "W", "F", "I", "N", "UP", "B", "SIM"]

[tool.mypy]
# Type-check against the oldest supported version (see requires-python).
python_version = "3.10"
strict = true

[tool.pytest.ini_options]
testpaths = ["tests"]
```
25.11 Recent Developments
25.11.1 Modern CLI Frameworks
```python
import typer
from rich.console import Console

app = typer.Typer()
console = Console()


@app.command()
def hello(name: str, count: int = 1):
    for _ in range(count):
        console.print(f"Hello [bold green]{name}[/bold green]!")


if __name__ == "__main__":
    app()
```
25.11.2 Rich Terminal Output
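```python
from rich.console import Console
from rich.progress import track
from rich.table import Table

console = Console()

table = Table(title="Tasks")
table.add_column("ID", style="cyan")
table.add_column("Task", style="magenta")
table.add_column("Status", style="green")
table.add_row("1", "Write docs", "todo")  # sample row for illustration
console.print(table)

data = range(100)  # placeholder workload


def process(item: int) -> None:
    """Placeholder for the real per-item work."""


for item in track(data, description="Processing..."):
    process(item)
```
25.11.3 Typed CLI Parameters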
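```python
from typing import Annotated, Optional

import typer

app = typer.Typer()


@app.command()
def create(
    name: Annotated[str, typer.Option(help="Task name")],
    priority: Annotated[int, typer.Option(min=1, max=5)] = 3,
    # Use None rather than [] to avoid a shared mutable default.
    tags: Annotated[Optional[list[str]], typer.Option()] = None,
):
    typer.echo(f"{name} (priority {priority}, tags {tags or []})")


if __name__ == "__main__":
    app()
```
25.11.4 Asynchronous CLI Applications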
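```python
import asyncio

import typer
from rich.console import Console

app = typer.Typer()
console = Console()


async def fetch_data():
    await asyncio.sleep(1)
    return "data"


@app.command()
def fetch():
    # The command itself stays synchronous and drives the coroutine
    # with asyncio.run().
    result = asyncio.run(fetch_data())
    console.print(result)


if __name__ == "__main__":
    app()
```
25.12 Chapter Summary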
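By building the complete taskman CLI tool, this chapter put the following core topics into practice:
- Data modeling: a type-safe domain model built with dataclass + Enum
- Storage engine: an abstract storage interface + JSON persistence + atomic writes
- Configuration management: XDG conventions + YAML configuration + layered overrides
- Output formatting: table/JSON/Markdown formats + colored terminal output
- CLI framework: Click's decorator style + command groups + option validation
- Testing strategy: model tests + storage tests + CLI integration tests
- Packaging and distribution: pyproject.toml + the [project.scripts] entry point + an executable script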
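25.13 Exercises and Projects
Basic exercises
- Feature extension: add subtask support to taskman so that a task can contain nested subtasks.
- Output enhancement: integrate the Rich library for better-looking tables and progress bars.
- Interactive mode: add a taskman interactive command that starts an interactive REPL.
Advanced exercises
- Plugin system: implement a plugin architecture based on entry_points that lets third parties add commands and output formats.
- Multiple storage backends: add a SQLite storage backend and allow switching between JSON and SQLite through configuration.
- Scheduled reminders: integrate system notifications and send desktop reminders for tasks that are about to fall due.
Project exercises
- A complete CLI tool: independently develop a file management CLI tool with the following features:
  - File search and filtering
  - Batch renaming
  - Duplicate file detection
  - Directory size statistics
  - Configuration file management
- An API client CLI: develop a REST API client tool that:
  - Saves and manages API endpoints
  - Keeps a request history
  - Substitutes environment variables
  - Formats responses (JSON/YAML/Table)
Discussion questions
- How would you design a CLI tool's configuration system so that it supports global configuration, project-level configuration, and command-line overrides at the same time, with a consistent order of precedence?
- When developing a cross-platform CLI tool, which compatibility issues must be considered (path separators, terminal encoding, signal handling, permission models)? How can an abstraction layer address them?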
25.14 Further Reading
25.14.1 CLI Frameworks
- Click documentation (https://click.palletsprojects.com/): guide to the Click framework
- Typer documentation (https://typer.tiangolo.com/): a CLI framework based on type hints
- Rich documentation (https://rich.readthedocs.io/): rich-text output in the terminal
- Textual documentation (https://textual.textualize.io/): a TUI application framework
25.14.2 CLI Design Guidelines
- 12 Factor CLI Apps (https://medium.com/@jdxcode/12-factor-cli-apps-dd3c227a0e46): CLI design principles
- Command Line Interface Guidelines (https://clig.dev/): a CLI design guide
- POSIX argument conventions (https://www.gnu.org/software/libc/manual/html_node/Argument-Syntax.html): argument syntax rules
25.14.3 Packaging and Distribution
- Python Packaging User Guide (https://packaging.python.org/): the official packaging documentation
- PyPI publishing tutorial (https://packaging.python.org/en/latest/tutorials/packaging-projects/): publishing to PyPI
- Homebrew and Python (https://docs.brew.sh/Homebrew-and-Python): packaging for Homebrew
25.14.4 Source Code of Classic CLI Tools
- pip source (https://github.com/pypa/pip): the Python package installer
- pytest source (https://github.com/pytest-dev/pytest): a testing framework with a CLI
- ripgrep (Rust) (https://github.com/BurntSushi/ripgrep): a high-performance search tool
Next chapter: Chapter 26, Web Application Projects