Skip to content

第52章 物联网开发

学习目标

完成本章学习后,你将能够:

  1. 理解物联网架构:设备层、网络层、应用层、云平台
  2. 使用物联网协议:MQTT、CoAP、HTTP、WebSocket
  3. 处理传感器数据:数据采集、数据清洗、数据存储
  4. 开发嵌入式应用:MicroPython、设备控制、GPIO编程
  5. 构建智能家居:设备管理、场景联动、远程控制
  6. 实现边缘计算:本地处理、数据过滤、实时响应
  7. 开发物联网平台:设备注册、数据可视化、规则引擎
  8. 保障物联网安全:设备认证、数据加密、安全通信

52.1 物联网架构

52.1.1 核心概念

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
from datetime import datetime
import json
import asyncio


class DeviceType(Enum):
    SENSOR = "sensor"
    ACTUATOR = "actuator"
    GATEWAY = "gateway"
    CONTROLLER = "controller"


class DeviceStatus(Enum):
    ONLINE = "online"
    OFFLINE = "offline"
    BUSY = "busy"
    ERROR = "error"


@dataclass
class Device:
    device_id: str
    name: str
    device_type: DeviceType
    status: DeviceStatus = DeviceStatus.OFFLINE
    properties: Dict[str, Any] = field(default_factory=dict)
    last_seen: Optional[datetime] = None
    firmware_version: str = "1.0.0"

    def to_dict(self) -> Dict:
        return {
            "device_id": self.device_id,
            "name": self.name,
            "device_type": self.device_type.value,
            "status": self.status.value,
            "properties": self.properties,
            "last_seen": self.last_seen.isoformat() if self.last_seen else None,
            "firmware_version": self.firmware_version
        }


@dataclass
class SensorReading:
    device_id: str
    sensor_type: str
    value: Any
    unit: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        return {
            "device_id": self.device_id,
            "sensor_type": self.sensor_type,
            "value": self.value,
            "unit": self.unit,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata
        }


@dataclass
class Command:
    command_id: str
    device_id: str
    action: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    status: str = "pending"
    result: Optional[Dict] = None


class DeviceRegistry:
    def __init__(self):
        self.devices: Dict[str, Device] = {}
        self._callbacks: List[Callable] = []

    def register(self, device: Device) -> None:
        self.devices[device.device_id] = device
        self._notify_callbacks("register", device)

    def unregister(self, device_id: str) -> None:
        if device_id in self.devices:
            device = self.devices.pop(device_id)
            self._notify_callbacks("unregister", device)

    def get_device(self, device_id: str) -> Optional[Device]:
        return self.devices.get(device_id)

    def get_devices_by_type(self, device_type: DeviceType) -> List[Device]:
        return [
            d for d in self.devices.values()
            if d.device_type == device_type
        ]

    def update_status(self, device_id: str, status: DeviceStatus) -> None:
        if device_id in self.devices:
            self.devices[device_id].status = status
            self.devices[device_id].last_seen = datetime.now()
            self._notify_callbacks("status_update", self.devices[device_id])

    def add_callback(self, callback: Callable) -> None:
        self._callbacks.append(callback)

    def _notify_callbacks(self, event: str, device: Device) -> None:
        for callback in self._callbacks:
            try:
                callback(event, device)
            except Exception:
                pass


class DataStore:
    def __init__(self, max_readings: int = 10000):
        self.readings: List[SensorReading] = []
        self.max_readings = max_readings
        self._by_device: Dict[str, List[SensorReading]] = {}
        self._by_type: Dict[str, List[SensorReading]] = {}

    def store(self, reading: SensorReading) -> None:
        self.readings.append(reading)

        if len(self.readings) > self.max_readings:
            removed = self.readings.pop(0)
            if removed.device_id in self._by_device:
                self._by_device[removed.device_id] = [
                    r for r in self._by_device[removed.device_id]
                    if r.timestamp != removed.timestamp
                ]

        if reading.device_id not in self._by_device:
            self._by_device[reading.device_id] = []
        self._by_device[reading.device_id].append(reading)

        if reading.sensor_type not in self._by_type:
            self._by_type[reading.sensor_type] = []
        self._by_type[reading.sensor_type].append(reading)

    def get_readings(
        self,
        device_id: str = None,
        sensor_type: str = None,
        start_time: datetime = None,
        end_time: datetime = None,
        limit: int = 100
    ) -> List[SensorReading]:
        if device_id:
            readings = self._by_device.get(device_id, [])
        elif sensor_type:
            readings = self._by_type.get(sensor_type, [])
        else:
            readings = self.readings

        filtered = []
        for r in readings:
            if start_time and r.timestamp < start_time:
                continue
            if end_time and r.timestamp > end_time:
                continue
            filtered.append(r)

        return filtered[-limit:]

    def get_latest(self, device_id: str) -> Optional[SensorReading]:
        readings = self._by_device.get(device_id, [])
        return readings[-1] if readings else None

    def get_statistics(
        self,
        device_id: str,
        sensor_type: str,
        window_minutes: int = 60
    ) -> Dict:
        readings = self.get_readings(
            device_id=device_id,
            sensor_type=sensor_type
        )

        if not readings:
            return {"count": 0}

        values = [r.value for r in readings if isinstance(r.value, (int, float))]

        if not values:
            return {"count": len(readings)}

        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
            "latest": values[-1]
        }

52.2 MQTT协议

52.2.1 MQTT客户端

python
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
import asyncio
import json


class QoS(Enum):
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


@dataclass
class MQTTMessage:
    topic: str
    payload: bytes
    qos: QoS = QoS.AT_MOST_ONCE
    retain: bool = False
    message_id: Optional[int] = None


@dataclass
class MQTTConfig:
    host: str = "localhost"
    port: int = 1883
    client_id: str = ""
    username: str = ""
    password: str = ""
    keepalive: int = 60
    clean_session: bool = True


class MQTTClient:
    def __init__(self, config: MQTTConfig = None):
        self.config = config or MQTTConfig()
        self.connected = False
        self._subscriptions: Dict[str, List[Callable]] = {}
        self._message_queue: asyncio.Queue = asyncio.Queue()
        self._message_id = 0

    async def connect(self) -> bool:
        self.connected = True
        return True

    async def disconnect(self) -> None:
        self.connected = False
        self._subscriptions.clear()

    async def subscribe(
        self,
        topic: str,
        callback: Callable,
        qos: QoS = QoS.AT_MOST_ONCE
    ) -> bool:
        if topic not in self._subscriptions:
            self._subscriptions[topic] = []
        self._subscriptions[topic].append(callback)
        return True

    async def unsubscribe(self, topic: str) -> bool:
        if topic in self._subscriptions:
            del self._subscriptions[topic]
        return True

    async def publish(
        self,
        topic: str,
        payload: Any,
        qos: QoS = QoS.AT_MOST_ONCE,
        retain: bool = False
    ) -> bool:
        if not self.connected:
            return False

        if isinstance(payload, (dict, list)):
            payload = json.dumps(payload)
        if isinstance(payload, str):
            payload = payload.encode()

        message = MQTTMessage(
            topic=topic,
            payload=payload,
            qos=qos,
            retain=retain
        )

        await self._message_queue.put(message)
        return True

    async def _handle_message(self, message: MQTTMessage) -> None:
        for topic, callbacks in self._subscriptions.items():
            if self._topic_matches(topic, message.topic):
                for callback in callbacks:
                    try:
                        await callback(message)
                    except Exception:
                        pass

    def _topic_matches(self, pattern: str, topic: str) -> bool:
        pattern_parts = pattern.split('/')
        topic_parts = topic.split('/')

        for i, p in enumerate(pattern_parts):
            if p == '#':
                return True
            if i >= len(topic_parts):
                return False
            if p != '+' and p != topic_parts[i]:
                return False

        return len(pattern_parts) == len(topic_parts)

    def _next_message_id(self) -> int:
        self._message_id = (self._message_id + 1) % 65536
        return self._message_id


class MQTTBroker:
    def __init__(self):
        self.clients: Dict[str, MQTTClient] = {}
        self.retained_messages: Dict[str, MQTTMessage] = {}
        self._topics: Dict[str, List[str]] = {}

    async def handle_connect(self, client_id: str, client: MQTTClient) -> None:
        self.clients[client_id] = client

        for topic, message in self.retained_messages.items():
            await client._handle_message(message)

    async def handle_disconnect(self, client_id: str) -> None:
        self.clients.pop(client_id, None)

    async def handle_publish(self, message: MQTTMessage) -> None:
        if message.retain:
            self.retained_messages[message.topic] = message

        for client_id, client in self.clients.items():
            await client._handle_message(message)

    async def handle_subscribe(
        self,
        client_id: str,
        topic: str,
        qos: QoS
    ) -> None:
        if client_id in self.clients:
            if topic not in self._topics:
                self._topics[topic] = []
            if client_id not in self._topics[topic]:
                self._topics[topic].append(client_id)

52.2.2 传感器模拟

python
from dataclasses import dataclass
from typing import Dict, Optional
import random
import math


@dataclass
class SensorConfig:
    sensor_id: str
    sensor_type: str
    unit: str
    min_value: float
    max_value: float
    update_interval: float = 1.0
    noise_level: float = 0.1


class SensorSimulator:
    def __init__(self, config: SensorConfig):
        self.config = config
        self._current_value = (config.min_value + config.max_value) / 2
        self._trend = 0.0

    def read(self) -> SensorReading:
        self._update_value()

        return SensorReading(
            device_id=self.config.sensor_id,
            sensor_type=self.config.sensor_type,
            value=round(self._current_value, 2),
            unit=self.config.unit,
            metadata={
                "min": self.config.min_value,
                "max": self.config.max_value
            }
        )

    def _update_value(self) -> None:
        self._trend += random.uniform(-0.1, 0.1)
        self._trend = max(-1, min(1, self._trend))

        change = self._trend * 0.5
        noise = random.uniform(-1, 1) * self.config.noise_level

        self._current_value += change + noise

        self._current_value = max(
            self.config.min_value,
            min(self.config.max_value, self._current_value)
        )

        if random.random() < 0.05:
            self._trend = -self._trend


class TemperatureSensor(SensorSimulator):
    def __init__(self, sensor_id: str):
        super().__init__(SensorConfig(
            sensor_id=sensor_id,
            sensor_type="temperature",
            unit="°C",
            min_value=-10.0,
            max_value=50.0,
            noise_level=0.5
        ))


class HumiditySensor(SensorSimulator):
    def __init__(self, sensor_id: str):
        super().__init__(SensorConfig(
            sensor_id=sensor_id,
            sensor_type="humidity",
            unit="%",
            min_value=0.0,
            max_value=100.0,
            noise_level=2.0
        ))


class PressureSensor(SensorSimulator):
    def __init__(self, sensor_id: str):
        super().__init__(SensorConfig(
            sensor_id=sensor_id,
            sensor_type="pressure",
            unit="hPa",
            min_value=950.0,
            max_value=1050.0,
            noise_level=1.0
        ))


class LightSensor(SensorSimulator):
    def __init__(self, sensor_id: str):
        super().__init__(SensorConfig(
            sensor_id=sensor_id,
            sensor_type="light",
            unit="lux",
            min_value=0.0,
            max_value=100000.0,
            noise_level=100.0
        ))


class MotionSensor:
    def __init__(self, sensor_id: str):
        self.sensor_id = sensor_id
        self.sensor_type = "motion"
        self._motion_detected = False
        self._cooldown = 0

    def read(self) -> SensorReading:
        if self._cooldown > 0:
            self._cooldown -= 1
        else:
            self._motion_detected = random.random() < 0.1
            if self._motion_detected:
                self._cooldown = 5

        return SensorReading(
            device_id=self.sensor_id,
            sensor_type=self.sensor_type,
            value=self._motion_detected,
            unit="boolean"
        )

52.3 智能家居系统

52.3.1 设备控制

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from datetime import datetime
import asyncio


class DeviceCapability(Enum):
    ON_OFF = "on_off"
    BRIGHTNESS = "brightness"
    COLOR = "color"
    TEMPERATURE = "temperature"
    SPEED = "speed"


@dataclass
class SmartDevice:
    device_id: str
    name: str
    room: str
    capabilities: List[DeviceCapability]
    state: Dict[str, Any] = field(default_factory=dict)
    is_online: bool = True

    def turn_on(self) -> None:
        if DeviceCapability.ON_OFF in self.capabilities:
            self.state["power"] = True

    def turn_off(self) -> None:
        if DeviceCapability.ON_OFF in self.capabilities:
            self.state["power"] = False

    def set_brightness(self, level: int) -> None:
        if DeviceCapability.BRIGHTNESS in self.capabilities:
            self.state["brightness"] = max(0, min(100, level))

    def set_color(self, r: int, g: int, b: int) -> None:
        if DeviceCapability.COLOR in self.capabilities:
            self.state["color"] = {
                "r": max(0, min(255, r)),
                "g": max(0, min(255, g)),
                "b": max(0, min(255, b))
            }

    def set_temperature(self, temp: float) -> None:
        if DeviceCapability.TEMPERATURE in self.capabilities:
            self.state["temperature"] = temp


class SmartLight(SmartDevice):
    def __init__(self, device_id: str, name: str, room: str):
        super().__init__(
            device_id=device_id,
            name=name,
            room=room,
            capabilities=[
                DeviceCapability.ON_OFF,
                DeviceCapability.BRIGHTNESS,
                DeviceCapability.COLOR
            ],
            state={
                "power": False,
                "brightness": 100,
                "color": {"r": 255, "g": 255, "b": 255}
            }
        )


class SmartThermostat(SmartDevice):
    def __init__(self, device_id: str, name: str, room: str):
        super().__init__(
            device_id=device_id,
            name=name,
            room=room,
            capabilities=[
                DeviceCapability.ON_OFF,
                DeviceCapability.TEMPERATURE
            ],
            state={
                "power": True,
                "temperature": 22.0,
                "mode": "auto",
                "target_temp": 22.0
            }
        )

    def set_mode(self, mode: str) -> None:
        if mode in ["auto", "heat", "cool", "off"]:
            self.state["mode"] = mode


class SmartPlug(SmartDevice):
    def __init__(self, device_id: str, name: str, room: str):
        super().__init__(
            device_id=device_id,
            name=name,
            room=room,
            capabilities=[DeviceCapability.ON_OFF],
            state={
                "power": False,
                "energy_consumption": 0.0
            }
        )


class SceneManager:
    def __init__(self):
        self.scenes: Dict[str, Dict] = {}
        self.devices: Dict[str, SmartDevice] = {}

    def register_device(self, device: SmartDevice) -> None:
        self.devices[device.device_id] = device

    def create_scene(
        self,
        name: str,
        actions: List[Dict]
    ) -> None:
        self.scenes[name] = {
            "name": name,
            "actions": actions,
            "created_at": datetime.now()
        }

    def activate_scene(self, name: str) -> bool:
        if name not in self.scenes:
            return False

        scene = self.scenes[name]

        for action in scene["actions"]:
            device_id = action.get("device_id")
            device = self.devices.get(device_id)

            if not device:
                continue

            for key, value in action.get("state", {}).items():
                if key == "power":
                    if value:
                        device.turn_on()
                    else:
                        device.turn_off()
                elif key == "brightness":
                    device.set_brightness(value)
                elif key == "color":
                    device.set_color(**value)
                elif key == "temperature":
                    device.set_temperature(value)

        return True

    def get_scenes(self) -> List[str]:
        return list(self.scenes.keys())


class AutomationRule:
    def __init__(
        self,
        name: str,
        trigger: Dict,
        actions: List[Dict],
        enabled: bool = True
    ):
        self.name = name
        self.trigger = trigger
        self.actions = actions
        self.enabled = enabled
        self.last_triggered: Optional[datetime] = None

    def check_trigger(self, event: Dict) -> bool:
        if not self.enabled:
            return False

        trigger_type = self.trigger.get("type")

        if trigger_type == "time":
            return self._check_time_trigger(event)
        elif trigger_type == "sensor":
            return self._check_sensor_trigger(event)
        elif trigger_type == "device":
            return self._check_device_trigger(event)

        return False

    def _check_time_trigger(self, event: Dict) -> bool:
        if event.get("type") != "time":
            return False

        now = datetime.now()
        trigger_time = self.trigger.get("time")

        return now.strftime("%H:%M") == trigger_time

    def _check_sensor_trigger(self, event: Dict) -> bool:
        if event.get("type") != "sensor":
            return False

        sensor_id = self.trigger.get("sensor_id")
        condition = self.trigger.get("condition")
        value = event.get("value")

        if event.get("sensor_id") != sensor_id:
            return False

        if condition == "above":
            return value > self.trigger.get("threshold", 0)
        elif condition == "below":
            return value < self.trigger.get("threshold", 0)
        elif condition == "equals":
            return value == self.trigger.get("value")

        return False

    def _check_device_trigger(self, event: Dict) -> bool:
        if event.get("type") != "device":
            return False

        device_id = self.trigger.get("device_id")
        state = self.trigger.get("state")

        return (
            event.get("device_id") == device_id and
            event.get("state") == state
        )


class AutomationEngine:
    def __init__(self, scene_manager: SceneManager):
        self.scene_manager = scene_manager
        self.rules: List[AutomationRule] = []
        self._event_queue: asyncio.Queue = asyncio.Queue()

    def add_rule(self, rule: AutomationRule) -> None:
        self.rules.append(rule)

    def remove_rule(self, name: str) -> None:
        self.rules = [r for r in self.rules if r.name != name]

    async def process_event(self, event: Dict) -> None:
        await self._event_queue.put(event)

    async def run(self) -> None:
        while True:
            event = await self._event_queue.get()

            for rule in self.rules:
                if rule.check_trigger(event):
                    await self._execute_actions(rule.actions)
                    rule.last_triggered = datetime.now()

    async def _execute_actions(self, actions: List[Dict]) -> None:
        for action in actions:
            action_type = action.get("type")

            if action_type == "scene":
                self.scene_manager.activate_scene(action.get("scene"))
            elif action_type == "device":
                device = self.scene_manager.devices.get(action.get("device_id"))
                if device:
                    for key, value in action.get("state", {}).items():
                        if key == "power":
                            if value:
                                device.turn_on()
                            else:
                                device.turn_off()

52.4 边缘计算

52.4.1 数据处理

python
from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from datetime import datetime, timedelta
import statistics


@dataclass
class DataWindow:
    readings: List[SensorReading]
    window_size: int
    start_time: datetime
    end_time: datetime


class EdgeProcessor:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.windows: Dict[str, List[SensorReading]] = {}
        self.rules: List[Dict] = []
        self._callbacks: List[Callable] = []

    def process(self, reading: SensorReading) -> Optional[Dict]:
        device_id = reading.device_id

        if device_id not in self.windows:
            self.windows[device_id] = []

        self.windows[device_id].append(reading)

        if len(self.windows[device_id]) > self.window_size:
            self.windows[device_id].pop(0)

        result = self._analyze_window(device_id)

        for rule in self.rules:
            if self._check_rule(rule, reading, result):
                self._trigger_alert(rule, reading)

        return result

    def _analyze_window(self, device_id: str) -> Dict:
        readings = self.windows.get(device_id, [])

        if not readings:
            return {}

        values = [r.value for r in readings if isinstance(r.value, (int, float))]

        if not values:
            return {"count": len(readings)}

        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": statistics.mean(values),
            "median": statistics.median(values),
            "std_dev": statistics.stdev(values) if len(values) > 1 else 0,
            "latest": values[-1],
            "trend": self._calculate_trend(values)
        }

    def _calculate_trend(self, values: List[float]) -> str:
        if len(values) < 2:
            return "stable"

        recent = values[-10:] if len(values) >= 10 else values
        older = values[:-10] if len(values) >= 20 else values[:len(values)//2]

        if not older:
            return "stable"

        recent_avg = statistics.mean(recent)
        older_avg = statistics.mean(older)

        change = (recent_avg - older_avg) / older_avg * 100

        if change > 5:
            return "increasing"
        elif change < -5:
            return "decreasing"
        return "stable"

    def add_rule(
        self,
        name: str,
        condition: str,
        threshold: float,
        action: Callable
    ) -> None:
        self.rules.append({
            "name": name,
            "condition": condition,
            "threshold": threshold,
            "action": action
        })

    def _check_rule(
        self,
        rule: Dict,
        reading: SensorReading,
        analysis: Dict
    ) -> bool:
        condition = rule["condition"]
        threshold = rule["threshold"]

        if condition == "above":
            return reading.value > threshold
        elif condition == "below":
            return reading.value < threshold
        elif condition == "std_dev_high":
            return analysis.get("std_dev", 0) > threshold

        return False

    def _trigger_alert(self, rule: Dict, reading: SensorReading) -> None:
        alert = {
            "rule": rule["name"],
            "device_id": reading.device_id,
            "value": reading.value,
            "timestamp": datetime.now()
        }

        for callback in self._callbacks:
            try:
                callback(alert)
            except Exception:
                pass

    def add_alert_callback(self, callback: Callable) -> None:
        self._callbacks.append(callback)


class DataFilter:
    def __init__(self):
        self.filters: List[Callable] = []

    def add_filter(self, filter_func: Callable) -> None:
        self.filters.append(filter_func)

    def apply(self, reading: SensorReading) -> Optional[SensorReading]:
        for filter_func in self.filters:
            if not filter_func(reading):
                return None
        return reading

    @staticmethod
    def range_filter(min_val: float, max_val: float) -> Callable:
        def filter_func(reading: SensorReading) -> bool:
            if isinstance(reading.value, (int, float)):
                return min_val <= reading.value <= max_val
            return True
        return filter_func

    @staticmethod
    def rate_limit(min_interval: float) -> Callable:
        last_time: Dict[str, datetime] = {}

        def filter_func(reading: SensorReading) -> bool:
            device_id = reading.device_id
            now = datetime.now()

            if device_id in last_time:
                elapsed = (now - last_time[device_id]).total_seconds()
                if elapsed < min_interval:
                    return False

            last_time[device_id] = now
            return True

        return filter_func

    @staticmethod
    def outlier_filter(std_dev_threshold: float = 3.0) -> Callable:
        values: Dict[str, List[float]] = {}

        def filter_func(reading: SensorReading) -> bool:
            if not isinstance(reading.value, (int, float)):
                return True

            device_id = reading.device_id

            if device_id not in values:
                values[device_id] = []

            history = values[device_id]

            if len(history) < 3:
                history.append(reading.value)
                return True

            mean = statistics.mean(history)
            std = statistics.stdev(history)

            if std == 0:
                history.append(reading.value)
                return True

            z_score = abs(reading.value - mean) / std

            if z_score > std_dev_threshold:
                return False

            history.append(reading.value)
            if len(history) > 100:
                history.pop(0)

            return True

        return filter_func

52.5 物联网安全

52.5.1 设备认证

python
from dataclasses import dataclass
from typing import Dict, Optional
from datetime import datetime, timedelta
import hashlib
import secrets
import hmac


@dataclass
class DeviceCredentials:
    device_id: str
    api_key: str
    secret_key: str
    created_at: datetime
    expires_at: Optional[datetime] = None
    permissions: list = None

    def __post_init__(self):
        if self.permissions is None:
            self.permissions = ["read", "write"]


class AuthenticationManager:
    def __init__(self):
        self.credentials: Dict[str, DeviceCredentials] = {}
        self._sessions: Dict[str, dict] = {}

    def register_device(
        self,
        device_id: str,
        permissions: list = None
    ) -> DeviceCredentials:
        api_key = secrets.token_urlsafe(32)
        secret_key = secrets.token_urlsafe(64)

        credentials = DeviceCredentials(
            device_id=device_id,
            api_key=api_key,
            secret_key=secret_key,
            created_at=datetime.now(),
            permissions=permissions or ["read"]
        )

        self.credentials[device_id] = credentials
        return credentials

    def authenticate(
        self,
        device_id: str,
        api_key: str,
        signature: str,
        timestamp: str,
        payload: str = ""
    ) -> bool:
        if device_id not in self.credentials:
            return False

        cred = self.credentials[device_id]

        if cred.api_key != api_key:
            return False

        try:
            ts = datetime.fromisoformat(timestamp)
            if abs((datetime.now() - ts).total_seconds()) > 300:
                return False
        except ValueError:
            return False

        expected_signature = self._generate_signature(
            cred.secret_key,
            timestamp,
            payload
        )

        return hmac.compare_digest(signature, expected_signature)

    def _generate_signature(
        self,
        secret_key: str,
        timestamp: str,
        payload: str
    ) -> str:
        message = f"{timestamp}:{payload}"
        return hmac.new(
            secret_key.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

    def create_session(
        self,
        device_id: str,
        duration_hours: int = 24
    ) -> str:
        session_token = secrets.token_urlsafe(48)

        self._sessions[session_token] = {
            "device_id": device_id,
            "created_at": datetime.now(),
            "expires_at": datetime.now() + timedelta(hours=duration_hours)
        }

        return session_token

    def validate_session(self, session_token: str) -> Optional[str]:
        if session_token not in self._sessions:
            return None

        session = self._sessions[session_token]

        if datetime.now() > session["expires_at"]:
            del self._sessions[session_token]
            return None

        return session["device_id"]

    def revoke_session(self, session_token: str) -> None:
        self._sessions.pop(session_token, None)


class SecureCommunicator:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key.encode()

    def encrypt_message(self, plaintext: str) -> dict:
        from cryptography.fernet import Fernet
        import base64

        key = base64.urlsafe_b64encode(
            hashlib.sha256(self.secret_key).digest()
        )
        fernet = Fernet(key)

        ciphertext = fernet.encrypt(plaintext.encode())

        return {
            "ciphertext": ciphertext.decode(),
            "algorithm": "AES-256-Fernet"
        }

    def decrypt_message(self, ciphertext: str) -> str:
        from cryptography.fernet import Fernet
        import base64

        key = base64.urlsafe_b64encode(
            hashlib.sha256(self.secret_key).digest()
        )
        fernet = Fernet(key)

        return fernet.decrypt(ciphertext.encode()).decode()

52.5.2 安全最佳实践

python
class SecurityBestPractices:
    @staticmethod
    def validate_device_input(data: dict) -> bool:
        required_fields = ["device_id", "timestamp", "signature"]

        for field in required_fields:
            if field not in data:
                return False

        if len(data.get("device_id", "")) > 64:
            return False

        if len(data.get("signature", "")) != 64:
            return False

        return True

    @staticmethod
    def sanitize_topic(topic: str) -> str:
        invalid_chars = ['\x00', '\n', '\r']
        for char in invalid_chars:
            topic = topic.replace(char, '')

        if '..' in topic or topic.startswith('/'):
            raise ValueError("Invalid topic format")

        return topic

    @staticmethod
    def rate_limit_check(
        device_id: str,
        max_requests: int = 100,
        window_seconds: int = 60
    ) -> bool:
        import time

        if not hasattr(SecurityBestPractices, '_rate_limits'):
            SecurityBestPractices._rate_limits = {}

        now = time.time()
        key = device_id

        if key not in SecurityBestPractices._rate_limits:
            SecurityBestPractices._rate_limits[key] = []

        requests = SecurityBestPractices._rate_limits[key]

        requests[:] = [t for t in requests if now - t < window_seconds]

        if len(requests) >= max_requests:
            return False

        requests.append(now)
        return True

学术注记:物联网安全面临独特挑战,包括设备资源受限、网络环境复杂、攻击面广泛等问题。OWASP IoT Top 10列出了最常见的物联网安全风险:弱密码、不安全的网络服务、不安全的生态系统接口等。在设计物联网系统时,应遵循"安全即设计"(Security by Design)原则。


52.6 本章小结

本章详细介绍了Python物联网开发的核心概念和实践:

  1. 物联网架构:设备注册、数据存储、设备管理
  2. MQTT协议:消息发布订阅、QoS级别、主题匹配
  3. 智能家居:设备控制、场景管理、自动化规则
  4. 边缘计算:数据处理、数据过滤、异常检测
  5. 物联网安全:设备认证、加密通信、安全最佳实践

52.6.1 知识图谱

物联网开发
├── 架构设计
│   ├── 设备层(传感器、执行器)
│   ├── 网络层(MQTT、CoAP、HTTP)
│   ├── 边缘层(数据处理、过滤)
│   └── 云平台(存储、分析、可视化)
├── 通信协议
│   ├── MQTT(发布订阅、QoS)
│   ├── CoAP(REST风格、UDP)
│   ├── HTTP/REST(通用、易用)
│   └── WebSocket(实时双向)
├── 数据处理
│   ├── 数据采集(传感器模拟)
│   ├── 数据清洗(过滤、验证)
│   ├── 数据存储(时序数据库)
│   └── 数据分析(统计、趋势)
├── 智能控制
│   ├── 设备管理(注册、状态)
│   ├── 场景联动(预设、触发)
│   ├── 自动化规则(条件、动作)
│   └── 远程控制(API、界面)
└── 安全机制
    ├── 设备认证(API Key、证书)
    ├── 数据加密(TLS、AES)
    ├── 访问控制(权限、角色)
    └── 安全审计(日志、监控)

52.6.2 最佳实践清单

场景推荐做法避免做法
设备标识使用UUID或唯一序列号使用易猜测的ID
通信协议MQTT用于实时数据,HTTP用于配置所有场景都用HTTP轮询
QoS选择QoS 1用于重要数据,QoS 0用于遥测所有消息都用QoS 2
数据格式JSON或Protocol Buffers自定义二进制格式
安全认证双向TLS + API Key仅使用密码
边缘处理本地过滤、聚合后上传原始数据全部上传
错误处理指数退避重连无限重试
固件更新OTA安全签名验证直接执行下载的代码

52.6.3 技术选型指南

需求场景推荐方案备选方案
轻量级协议MQTTCoAP
实时双向WebSocketMQTT
设备数量大MQTT Broker集群云IoT平台
边缘计算EdgeX FoundryAWS Greengrass
时序数据InfluxDBTimescaleDB
可视化Grafana自定义Dashboard
设备管理Home Assistant自研平台

练习题

基础题

  1. 实现一个简单的温度传感器模拟器,每秒生成一个随机温度值(15-30°C),并通过MQTT发布。

  2. 创建一个设备注册表,支持设备的添加、删除、查询和状态更新。

  3. 实现一个简单的场景管理器,支持创建"回家模式"和"离家模式"场景。

进阶题

  1. 实现一个完整的MQTT客户端,支持QoS 1和QoS 2,包括消息确认和重传机制。

  2. 开发一个边缘计算网关,实现数据过滤、聚合和异常检测功能。

  3. 实现一个物联网安全系统,包括设备认证、消息加密和访问控制。

项目实践

  1. 智能家居控制中心:构建一个完整的智能家居控制系统,要求:
    • 支持多种设备类型(灯光、空调、窗帘、传感器)
    • 实现场景联动和自动化规则
    • 提供Web界面和API接口
    • 实现设备认证和安全通信
    • 支持历史数据存储和可视化

思考题

  1. MQTT的QoS 0、1、2三种级别分别适用于什么场景?在选择QoS级别时需要考虑哪些因素?

  2. 边缘计算与云计算相比有哪些优势和劣势?在什么场景下应该优先选择边缘计算?

  3. 物联网设备的安全挑战与传统IT系统有何不同?如何设计一个安全的物联网架构?

扩展阅读

52.7.1 协议规范

52.7.2 平台与框架

52.7.3 嵌入式开发

52.7.4 安全标准

52.7.5 进阶书籍

  • 《物联网系统设计》 — 系统架构与实现方法
  • 《MQTT Essentials》 — MQTT协议深度解析
  • 《Building the Internet of Things》 — 物联网企业级应用开发
  • 《Practical IoT Hacking》 — 物联网安全测试指南

下一章:第53章 3D图形编程

Python技术丛书 - 江苏省宿城中等专业学校