第52章 物联网开发
学习目标
完成本章学习后,你将能够:
- 理解物联网架构:设备层、网络层、应用层、云平台
- 使用物联网协议:MQTT、CoAP、HTTP、WebSocket
- 处理传感器数据:数据采集、数据清洗、数据存储
- 开发嵌入式应用:MicroPython、设备控制、GPIO编程
- 构建智能家居:设备管理、场景联动、远程控制
- 实现边缘计算:本地处理、数据过滤、实时响应
- 开发物联网平台:设备注册、数据可视化、规则引擎
- 保障物联网安全:设备认证、数据加密、安全通信
52.1 物联网架构
52.1.1 核心概念
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
from datetime import datetime
import json
import asyncio
class DeviceType(Enum):
    """Category of a device.

    Each member's string value is the form emitted by ``.value`` lookups.
    """

    SENSOR = "sensor"
    ACTUATOR = "actuator"
    GATEWAY = "gateway"
    CONTROLLER = "controller"
class DeviceStatus(Enum):
    """Connectivity/health state of a device, stored as a lowercase string."""

    ONLINE = "online"
    OFFLINE = "offline"
    BUSY = "busy"
    ERROR = "error"
@dataclass
class Device:
    """Record describing one device.

    ``status`` starts as ``DeviceStatus.OFFLINE``, ``properties`` defaults to
    a fresh dict per instance and ``last_seen`` stays ``None`` until set.
    """

    device_id: str
    name: str
    device_type: DeviceType
    status: DeviceStatus = DeviceStatus.OFFLINE
    properties: Dict[str, Any] = field(default_factory=dict)
    last_seen: Optional[datetime] = None
    firmware_version: str = "1.0.0"

    def to_dict(self) -> Dict:
        """Return the device as a dict of plain values.

        Enum fields are replaced by their ``.value`` and ``last_seen`` by its
        ISO-8601 string (``None`` when unset).  ``properties`` is the very
        dict held by the instance, not a copy.
        """
        if self.last_seen:
            seen = self.last_seen.isoformat()
        else:
            seen = None

        snapshot: Dict = {
            "device_id": self.device_id,
            "name": self.name,
        }
        snapshot["device_type"] = self.device_type.value
        snapshot["status"] = self.status.value
        snapshot["properties"] = self.properties
        snapshot["last_seen"] = seen
        snapshot["firmware_version"] = self.firmware_version
        return snapshot
@dataclass
class SensorReading:
    """One measurement reported by a device.

    ``timestamp`` defaults to ``datetime.now()`` at construction time and
    ``metadata`` to a fresh empty dict.
    """

    device_id: str
    sensor_type: str
    value: Any
    unit: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Return the reading as a dict; ``timestamp`` becomes an ISO-8601 string."""
        # The first four fields are copied verbatim, in declaration order.
        plain = {
            key: getattr(self, key)
            for key in ("device_id", "sensor_type", "value", "unit")
        }
        plain["timestamp"] = self.timestamp.isoformat()
        plain["metadata"] = self.metadata
        return plain
@dataclass
class Command:
    """An instruction addressed to a device.

    A new command starts with ``status == "pending"``, no ``result``, an
    empty ``parameters`` dict and ``timestamp`` set to ``datetime.now()``.
    """

    command_id: str
    device_id: str
    action: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    status: str = "pending"
    result: Optional[Dict] = None
class DeviceRegistry:
    """In-memory index of devices keyed by ``device_id``, with change listeners.

    Listeners are invoked as ``callback(event, device)`` where ``event`` is
    ``"register"``, ``"unregister"`` or ``"status_update"``.
    """

    def __init__(self):
        self.devices: Dict[str, Device] = {}
        self._callbacks: List[Callable] = []

    def register(self, device: Device) -> None:
        """Store ``device`` under its id (replacing any previous entry) and notify."""
        self.devices[device.device_id] = device
        self._notify_callbacks("register", device)

    def unregister(self, device_id: str) -> None:
        """Remove the device and notify listeners; unknown ids are ignored."""
        if device_id not in self.devices:
            return
        removed = self.devices.pop(device_id)
        self._notify_callbacks("unregister", removed)

    def get_device(self, device_id: str) -> Optional[Device]:
        """Return the device registered under ``device_id`` or ``None``."""
        return self.devices.get(device_id)

    def get_devices_by_type(self, device_type: DeviceType) -> List[Device]:
        """Return every registered device whose ``device_type`` equals the argument."""
        matches: List[Device] = []
        for candidate in self.devices.values():
            if candidate.device_type == device_type:
                matches.append(candidate)
        return matches

    def update_status(self, device_id: str, status: DeviceStatus) -> None:
        """Set a device's status, stamp ``last_seen`` with now, and notify.

        Unknown ids are ignored.
        """
        if device_id not in self.devices:
            return
        target = self.devices[device_id]
        target.status = status
        target.last_seen = datetime.now()
        self._notify_callbacks("status_update", target)

    def add_callback(self, callback: Callable) -> None:
        """Append a listener to be called on every registry event."""
        self._callbacks.append(callback)

    def _notify_callbacks(self, event: str, device: Device) -> None:
        """Call each listener in registration order, ignoring their exceptions."""
        for listener in self._callbacks:
            try:
                listener(event, device)
            except Exception:
                # Best effort: one failing listener must not stop the rest.
                continue
class DataStore:
    """Bounded in-memory store of sensor readings.

    Readings are kept in arrival order in ``readings`` and mirrored in two
    lookup tables, one keyed by device id and one by sensor type.  When more
    than ``max_readings`` are held, the oldest reading is dropped from the
    list *and* from both tables, so the tables never outgrow the list.
    """

    def __init__(self, max_readings: int = 10000):
        self.readings: List[SensorReading] = []
        self.max_readings = max_readings
        self._by_device: Dict[str, List[SensorReading]] = {}
        self._by_type: Dict[str, List[SensorReading]] = {}

    def store(self, reading: SensorReading) -> None:
        """Add ``reading`` and evict the oldest entries beyond ``max_readings``."""
        self.readings.append(reading)
        self._by_device.setdefault(reading.device_id, []).append(reading)
        self._by_type.setdefault(reading.sensor_type, []).append(reading)

        while self.readings and len(self.readings) > self.max_readings:
            evicted = self.readings.pop(0)
            self._forget(self._by_device, evicted.device_id, evicted)
            self._forget(self._by_type, evicted.sensor_type, evicted)

    @staticmethod
    def _forget(index: Dict[str, list], key: str, reading) -> None:
        """Remove exactly ``reading`` (by identity) from ``index[key]``."""
        bucket = index.get(key)
        if bucket is None:
            return
        # Match by identity, not timestamp: two distinct readings may share
        # a timestamp and only the evicted one must go.
        for position, candidate in enumerate(bucket):
            if candidate is reading:
                del bucket[position]
                break
        if not bucket:
            del index[key]

    def get_readings(
        self,
        device_id: str = None,
        sensor_type: str = None,
        start_time: datetime = None,
        end_time: datetime = None,
        limit: int = 100
    ) -> List[SensorReading]:
        """Return the most recent readings that satisfy every given filter.

        Args:
            device_id: keep only readings from this device.
            sensor_type: keep only readings of this type (also applied when
                ``device_id`` is given).
            start_time: keep readings with ``timestamp >= start_time``.
            end_time: keep readings with ``timestamp <= end_time``.
            limit: maximum number of readings returned, taken from the newest
                end; ``None`` means no limit, ``0`` or less returns ``[]``.
        """
        if device_id:
            candidates = self._by_device.get(device_id, [])
        elif sensor_type:
            candidates = self._by_type.get(sensor_type, [])
        else:
            candidates = self.readings

        matched = [
            r for r in candidates
            if (not sensor_type or r.sensor_type == sensor_type)
            and (not start_time or r.timestamp >= start_time)
            and (not end_time or r.timestamp <= end_time)
        ]

        if limit is None:
            return matched
        if limit <= 0:
            # ``matched[-0:]`` would be the whole list, so handle it explicitly.
            return []
        return matched[-limit:]

    def get_latest(self, device_id: str) -> Optional[SensorReading]:
        """Return the newest stored reading for ``device_id`` or ``None``."""
        readings = self._by_device.get(device_id, [])
        return readings[-1] if readings else None

    def get_statistics(
        self,
        device_id: str,
        sensor_type: str,
        window_minutes: int = 60
    ) -> Dict:
        """Summarise the numeric readings of one sensor over a recent window.

        Only readings whose timestamp lies within the last ``window_minutes``
        minutes are considered.  Returns ``{"count": 0}`` when there are none,
        ``{"count": n}`` when none of the ``n`` readings is an int/float, and
        otherwise ``count``/``min``/``max``/``avg``/``latest``.
        """
        from datetime import timedelta

        window = timedelta(minutes=window_minutes)
        readings = [
            r for r in self.get_readings(
                device_id=device_id,
                sensor_type=sensor_type,
                limit=None,
            )
            # ``now`` is taken in the reading's own tz so naive and aware
            # timestamps can both be aged without a TypeError.
            if datetime.now(r.timestamp.tzinfo) - r.timestamp <= window
        ]
        if not readings:
            return {"count": 0}

        values = [r.value for r in readings if isinstance(r.value, (int, float))]
        if not values:
            return {"count": len(readings)}

        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
            "latest": values[-1]
        }

# 52.2 MQTT协议
52.2.1 MQTT客户端
python
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
import asyncio
import json
class QoS(Enum):
    """MQTT quality-of-service level; the value is the numeric level 0-2."""

    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2
@dataclass
class MQTTMessage:
    """A message on a topic.

    Defaults: ``qos`` is ``QoS.AT_MOST_ONCE``, ``retain`` is ``False`` and
    ``message_id`` is ``None``.
    """

    topic: str
    payload: bytes
    qos: QoS = QoS.AT_MOST_ONCE
    retain: bool = False
    message_id: Optional[int] = None
@dataclass
class MQTTConfig:
    """Connection settings for ``MQTTClient``.

    Defaults target ``localhost:1883`` with empty client id and credentials,
    a keepalive of 60 and ``clean_session`` enabled.
    """

    host: str = "localhost"
    port: int = 1883
    client_id: str = ""
    username: str = ""
    password: str = ""
    keepalive: int = 60
    clean_session: bool = True
class MQTTClient:
    """In-memory stand-in for an MQTT client; no network I/O is performed.

    ``connect``/``disconnect`` only toggle ``connected``, ``publish`` places
    messages on an internal ``asyncio.Queue`` and ``_handle_message`` fans an
    incoming message out to every callback whose topic filter matches.
    """

    # Packet identifiers are 16-bit and must be non-zero, so they cycle 1..65535.
    _MAX_MESSAGE_ID = 65535

    def __init__(self, config: MQTTConfig = None):
        self.config = config or MQTTConfig()
        self.connected = False
        self._subscriptions: Dict[str, List[Callable]] = {}
        self._message_queue: asyncio.Queue = asyncio.Queue()
        self._message_id = 0

    async def connect(self) -> bool:
        """Mark the client as connected; always returns ``True``."""
        self.connected = True
        return True

    async def disconnect(self) -> None:
        """Mark the client as disconnected and drop all subscriptions."""
        self.connected = False
        self._subscriptions.clear()

    async def subscribe(
        self,
        topic: str,
        callback: Callable,
        qos: QoS = QoS.AT_MOST_ONCE
    ) -> bool:
        """Register ``callback`` for the topic filter ``topic``.

        The callback receives the ``MQTTMessage`` and may be a plain function
        or a coroutine function.  ``qos`` is accepted but not used here.
        """
        self._subscriptions.setdefault(topic, []).append(callback)
        return True

    async def unsubscribe(self, topic: str) -> bool:
        """Remove every callback registered for exactly ``topic``."""
        self._subscriptions.pop(topic, None)
        return True

    async def publish(
        self,
        topic: str,
        payload: Any,
        qos: QoS = QoS.AT_MOST_ONCE,
        retain: bool = False
    ) -> bool:
        """Queue a message for delivery.

        ``dict``/``list`` payloads are JSON-encoded, strings are UTF-8
        encoded and any other non-bytes value is encoded from ``str(value)``,
        so the queued payload is always ``bytes``.  Messages with a QoS above
        0 are given a packet identifier.

        Returns:
            ``False`` when the client is not connected, otherwise ``True``.
        """
        if not self.connected:
            return False

        message_id = None
        if qos != QoS.AT_MOST_ONCE:
            message_id = self._next_message_id()

        message = MQTTMessage(
            topic=topic,
            payload=self._encode_payload(payload),
            qos=qos,
            retain=retain,
            message_id=message_id,
        )
        await self._message_queue.put(message)
        return True

    @staticmethod
    def _encode_payload(payload: Any) -> bytes:
        """Convert an arbitrary payload into bytes."""
        if payload is None:
            return b""
        if isinstance(payload, (bytes, bytearray, memoryview)):
            return bytes(payload)
        if isinstance(payload, (dict, list)):
            payload = json.dumps(payload)
        if not isinstance(payload, str):
            # Numbers, booleans, ... are sent as their textual form.
            payload = str(payload)
        return payload.encode()

    async def _handle_message(self, message: MQTTMessage) -> None:
        """Deliver ``message`` to every callback whose filter matches its topic.

        A failing callback is logged and does not prevent delivery to the
        remaining callbacks.
        """
        import inspect
        import logging

        # Snapshots let callbacks subscribe/unsubscribe while we iterate.
        for pattern, callbacks in list(self._subscriptions.items()):
            if not self._topic_matches(pattern, message.topic):
                continue
            for callback in list(callbacks):
                try:
                    outcome = callback(message)
                    # Only await when the callback actually returned an
                    # awaitable; plain functions return a regular value.
                    if inspect.isawaitable(outcome):
                        await outcome
                except Exception:
                    logging.getLogger(__name__).exception(
                        "MQTT callback failed for topic %r", message.topic
                    )

    def _topic_matches(self, pattern: str, topic: str) -> bool:
        """Return whether topic filter ``pattern`` matches ``topic``.

        ``+`` matches exactly one level; ``#`` matches the remaining levels
        (including none) and is only honoured as the last level of the
        filter.  Topics starting with ``$`` are never matched by a filter
        that starts with a wildcard.
        """
        if topic.startswith('$') and pattern[:1] in ('#', '+'):
            return False

        pattern_parts = pattern.split('/')
        topic_parts = topic.split('/')
        last_index = len(pattern_parts) - 1

        for i, p in enumerate(pattern_parts):
            if p == '#':
                return i == last_index
            if i >= len(topic_parts):
                return False
            if p != '+' and p != topic_parts[i]:
                return False
        return len(pattern_parts) == len(topic_parts)

    def _next_message_id(self) -> int:
        """Return the next packet identifier, cycling through 1..65535."""
        self._message_id = self._message_id % self._MAX_MESSAGE_ID + 1
        return self._message_id
class MQTTBroker:
    """Minimal in-process broker that relays messages between clients.

    Every published message is handed to every connected client, and each
    client decides through its own subscriptions whether to act on it.
    ``_topics`` records which client ids subscribed to which topic filter.
    """

    def __init__(self):
        self.clients: Dict[str, MQTTClient] = {}
        self.retained_messages: Dict[str, MQTTMessage] = {}
        self._topics: Dict[str, List[str]] = {}

    async def handle_connect(self, client_id: str, client: MQTTClient) -> None:
        """Register ``client`` and replay all retained messages to it."""
        self.clients[client_id] = client
        # Snapshot: a callback may publish and change the retained set.
        for message in list(self.retained_messages.values()):
            await client._handle_message(message)

    async def handle_disconnect(self, client_id: str) -> None:
        """Forget the client and its entries in the subscription index."""
        self.clients.pop(client_id, None)
        for topic in list(self._topics):
            subscribers = self._topics[topic]
            if client_id in subscribers:
                subscribers.remove(client_id)
            if not subscribers:
                del self._topics[topic]

    async def handle_publish(self, message: MQTTMessage) -> None:
        """Update the retained store if requested, then fan the message out.

        A retained message with an empty payload clears the retained message
        for its topic instead of being stored.
        """
        if message.retain:
            if message.payload:
                self.retained_messages[message.topic] = message
            else:
                self.retained_messages.pop(message.topic, None)

        # Snapshot: a callback may connect/disconnect clients while we await.
        for client in list(self.clients.values()):
            await client._handle_message(message)

    async def handle_subscribe(
        self,
        client_id: str,
        topic: str,
        qos: QoS
    ) -> None:
        """Record that a connected client subscribed to ``topic``.

        Unknown client ids are ignored and a client is listed at most once
        per topic.  ``qos`` is accepted but not stored.
        """
        if client_id not in self.clients:
            return
        subscribers = self._topics.setdefault(topic, [])
        if client_id not in subscribers:
            subscribers.append(client_id)

# 52.2.2 传感器模拟
python
from dataclasses import dataclass
from typing import Dict, Optional
import random
import math
@dataclass
class SensorConfig:
    """Parameters for a simulated sensor.

    ``min_value``/``max_value`` bound the simulated value; ``noise_level``
    scales the random noise added on each update.
    """

    sensor_id: str
    sensor_type: str
    unit: str
    min_value: float
    max_value: float
    update_interval: float = 1.0
    noise_level: float = 0.1
class SensorSimulator:
    """Random-walk simulator producing readings within a configured range.

    The value starts at the midpoint of ``[min_value, max_value]`` and moves
    on every ``read()`` by a slowly drifting trend plus uniform noise.
    """

    def __init__(self, config: SensorConfig):
        self.config = config
        self._current_value = (config.min_value + config.max_value) / 2
        self._trend = 0.0

    def read(self) -> SensorReading:
        """Advance the simulation one step and return the new reading.

        The value is rounded to two decimals and the configured bounds are
        attached as ``metadata["min"]`` / ``metadata["max"]``.
        """
        self._update_value()
        cfg = self.config
        bounds = {
            "min": cfg.min_value,
            "max": cfg.max_value
        }
        return SensorReading(
            device_id=cfg.sensor_id,
            sensor_type=cfg.sensor_type,
            value=round(self._current_value, 2),
            unit=cfg.unit,
            metadata=bounds
        )

    def _update_value(self) -> None:
        """Apply one random-walk step, clamped to the configured range."""
        cfg = self.config

        # Trend drifts by at most 0.1 per step and is held within [-1, 1].
        drifted = self._trend + random.uniform(-0.1, 0.1)
        self._trend = max(-1, min(1, drifted))

        step = self._trend * 0.5
        jitter = random.uniform(-1, 1) * cfg.noise_level
        moved = self._current_value + (step + jitter)
        self._current_value = max(cfg.min_value, min(cfg.max_value, moved))

        # With 5% probability the trend reverses direction.
        if random.random() < 0.05:
            self._trend = -self._trend
class TemperatureSensor(SensorSimulator):
    """Simulated temperature sensor: -10.0 to 50.0 °C, noise level 0.5."""

    def __init__(self, sensor_id: str):
        settings = SensorConfig(
            sensor_id=sensor_id,
            sensor_type="temperature",
            unit="°C",
            min_value=-10.0,
            max_value=50.0,
            noise_level=0.5
        )
        super().__init__(settings)
class HumiditySensor(SensorSimulator):
    """Simulated humidity sensor: 0.0 to 100.0 %, noise level 2.0."""

    def __init__(self, sensor_id: str):
        settings = SensorConfig(
            sensor_id=sensor_id,
            sensor_type="humidity",
            unit="%",
            min_value=0.0,
            max_value=100.0,
            noise_level=2.0
        )
        super().__init__(settings)
class PressureSensor(SensorSimulator):
    """Simulated pressure sensor: 950.0 to 1050.0 hPa, noise level 1.0."""

    def __init__(self, sensor_id: str):
        settings = SensorConfig(
            sensor_id=sensor_id,
            sensor_type="pressure",
            unit="hPa",
            min_value=950.0,
            max_value=1050.0,
            noise_level=1.0
        )
        super().__init__(settings)
class LightSensor(SensorSimulator):
    """Simulated light sensor: 0.0 to 100000.0 lux, noise level 100.0."""

    def __init__(self, sensor_id: str):
        settings = SensorConfig(
            sensor_id=sensor_id,
            sensor_type="light",
            unit="lux",
            min_value=0.0,
            max_value=100000.0,
            noise_level=100.0
        )
        super().__init__(settings)
class MotionSensor:
    """Simulated motion detector with a hold period after each detection.

    Outside the hold period each read detects motion with probability 0.1.
    A detection starts a 5-read cooldown during which the detection state is
    left unchanged.
    """

    def __init__(self, sensor_id: str):
        self.sensor_id = sensor_id
        self.sensor_type = "motion"
        self._motion_detected = False
        self._cooldown = 0

    def read(self) -> SensorReading:
        """Advance the simulation one step and return a boolean reading."""
        if self._cooldown <= 0:
            detected = random.random() < 0.1
            self._motion_detected = detected
            if detected:
                self._cooldown = 5
        else:
            # Still holding the previous state; just count down.
            self._cooldown -= 1

        return SensorReading(
            device_id=self.sensor_id,
            sensor_type=self.sensor_type,
            value=self._motion_detected,
            unit="boolean"
        )

# 52.3 智能家居系统
52.3.1 设备控制
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from datetime import datetime
import asyncio
class DeviceCapability(Enum):
    """Feature a smart device may support; checked before applying a state change."""

    ON_OFF = "on_off"
    BRIGHTNESS = "brightness"
    COLOR = "color"
    TEMPERATURE = "temperature"
    SPEED = "speed"
@dataclass
class SmartDevice:
    """A controllable device whose setters are gated by its capabilities.

    Each setter writes into ``state`` only when the matching capability is in
    ``capabilities``; otherwise the call does nothing.
    """

    device_id: str
    name: str
    room: str
    capabilities: List[DeviceCapability]
    state: Dict[str, Any] = field(default_factory=dict)
    is_online: bool = True

    def turn_on(self) -> None:
        """Set ``state["power"]`` to ``True`` (requires ``ON_OFF``)."""
        if DeviceCapability.ON_OFF not in self.capabilities:
            return
        self.state["power"] = True

    def turn_off(self) -> None:
        """Set ``state["power"]`` to ``False`` (requires ``ON_OFF``)."""
        if DeviceCapability.ON_OFF not in self.capabilities:
            return
        self.state["power"] = False

    def set_brightness(self, level: int) -> None:
        """Store ``level`` clamped to 0..100 (requires ``BRIGHTNESS``)."""
        if DeviceCapability.BRIGHTNESS not in self.capabilities:
            return
        clamped = max(0, min(100, level))
        self.state["brightness"] = clamped

    def set_color(self, r: int, g: int, b: int) -> None:
        """Store an RGB colour, each channel clamped to 0..255 (requires ``COLOR``)."""
        if DeviceCapability.COLOR not in self.capabilities:
            return
        red = max(0, min(255, r))
        green = max(0, min(255, g))
        blue = max(0, min(255, b))
        self.state["color"] = {"r": red, "g": green, "b": blue}

    def set_temperature(self, temp: float) -> None:
        """Store ``temp`` under ``state["temperature"]`` (requires ``TEMPERATURE``)."""
        if DeviceCapability.TEMPERATURE not in self.capabilities:
            return
        self.state["temperature"] = temp
class SmartLight(SmartDevice):
    """Light with on/off, brightness and colour; starts off at full white."""

    def __init__(self, device_id: str, name: str, room: str):
        supported = [
            DeviceCapability.ON_OFF,
            DeviceCapability.BRIGHTNESS,
            DeviceCapability.COLOR
        ]
        initial_state = {
            "power": False,
            "brightness": 100,
            "color": {"r": 255, "g": 255, "b": 255}
        }
        super().__init__(
            device_id=device_id,
            name=name,
            room=room,
            capabilities=supported,
            state=initial_state
        )
class SmartThermostat(SmartDevice):
    """Thermostat with on/off and temperature; starts on, 22.0, mode "auto"."""

    def __init__(self, device_id: str, name: str, room: str):
        supported = [
            DeviceCapability.ON_OFF,
            DeviceCapability.TEMPERATURE
        ]
        initial_state = {
            "power": True,
            "temperature": 22.0,
            "mode": "auto",
            "target_temp": 22.0
        }
        super().__init__(
            device_id=device_id,
            name=name,
            room=room,
            capabilities=supported,
            state=initial_state
        )

    def set_mode(self, mode: str) -> None:
        """Store ``mode`` if it is one of auto/heat/cool/off; otherwise do nothing."""
        allowed = ("auto", "heat", "cool", "off")
        if mode not in allowed:
            return
        self.state["mode"] = mode
class SmartPlug(SmartDevice):
    """On/off plug; starts off with ``energy_consumption`` at 0.0."""

    def __init__(self, device_id: str, name: str, room: str):
        initial_state = {
            "power": False,
            "energy_consumption": 0.0
        }
        super().__init__(
            device_id=device_id,
            name=name,
            room=room,
            capabilities=[DeviceCapability.ON_OFF],
            state=initial_state
        )
class SceneManager:
    """Holds registered devices and named scenes, and applies scenes to devices.

    A scene is a list of actions of the form
    ``{"device_id": ..., "state": {...}}``.
    """

    def __init__(self):
        self.scenes: Dict[str, Dict] = {}
        self.devices: Dict[str, SmartDevice] = {}

    def register_device(self, device: SmartDevice) -> None:
        """Make ``device`` addressable by its ``device_id``."""
        self.devices[device.device_id] = device

    def create_scene(
        self,
        name: str,
        actions: List[Dict]
    ) -> None:
        """Create or overwrite the scene ``name`` and record its creation time."""
        scene = {
            "name": name,
            "actions": actions,
            "created_at": datetime.now()
        }
        self.scenes[name] = scene

    def activate_scene(self, name: str) -> bool:
        """Apply every action of the scene; return ``False`` if it does not exist.

        Actions addressing an unregistered device are skipped and unknown
        state keys are ignored.
        """
        if name not in self.scenes:
            return False

        for action in self.scenes[name]["actions"]:
            device = self.devices.get(action.get("device_id"))
            if not device:
                continue
            for key, value in action.get("state", {}).items():
                self._apply_state(device, key, value)
        return True

    @staticmethod
    def _apply_state(device, key: str, value) -> None:
        """Translate one ``state`` entry into the matching device call."""
        if key == "power":
            switch = device.turn_on if value else device.turn_off
            switch()
        elif key == "brightness":
            device.set_brightness(value)
        elif key == "color":
            device.set_color(**value)
        elif key == "temperature":
            device.set_temperature(value)

    def get_scenes(self) -> List[str]:
        """Return the scene names in creation order."""
        return [scene_name for scene_name in self.scenes]
class AutomationRule:
    """A trigger plus the actions to run when an event satisfies it.

    ``trigger["type"]`` selects the check: ``"time"``, ``"sensor"`` or
    ``"device"``.  Events are dicts whose ``"type"`` must equal the trigger
    type for the rule to fire.
    """

    def __init__(
        self,
        name: str,
        trigger: Dict,
        actions: List[Dict],
        enabled: bool = True
    ):
        self.name = name
        self.trigger = trigger
        self.actions = actions
        self.enabled = enabled
        self.last_triggered: Optional[datetime] = None

    def check_trigger(self, event: Dict) -> bool:
        """Return whether ``event`` fires this rule; disabled rules never fire."""
        if not self.enabled:
            return False

        trigger_type = self.trigger.get("type")
        if trigger_type == "time":
            return self._check_time_trigger(event)
        if trigger_type == "sensor":
            return self._check_sensor_trigger(event)
        if trigger_type == "device":
            return self._check_device_trigger(event)
        return False

    def _check_time_trigger(self, event: Dict) -> bool:
        """Fire on a ``"time"`` event when the current ``HH:MM`` equals ``trigger["time"]``."""
        if event.get("type") != "time":
            return False
        return datetime.now().strftime("%H:%M") == self.trigger.get("time")

    def _check_sensor_trigger(self, event: Dict) -> bool:
        """Fire on a ``"sensor"`` event from the configured sensor.

        ``condition`` is ``"above"``/``"below"`` (compared with
        ``trigger["threshold"]``, default 0) or ``"equals"`` (compared with
        ``trigger["value"]``).  An event whose value is missing or cannot be
        ordered against the threshold does not fire the rule.
        """
        if event.get("type") != "sensor":
            return False
        if event.get("sensor_id") != self.trigger.get("sensor_id"):
            return False

        condition = self.trigger.get("condition")
        value = event.get("value")

        if condition == "equals":
            return value == self.trigger.get("value")
        if condition not in ("above", "below"):
            return False

        threshold = self.trigger.get("threshold", 0)
        try:
            if condition == "above":
                return value > threshold
            return value < threshold
        except TypeError:
            # e.g. value is None or a string: not comparable, so no match.
            return False

    def _check_device_trigger(self, event: Dict) -> bool:
        """Fire on a ``"device"`` event matching the configured id and state."""
        if event.get("type") != "device":
            return False
        same_device = event.get("device_id") == self.trigger.get("device_id")
        same_state = event.get("state") == self.trigger.get("state")
        return same_device and same_state
class AutomationEngine:
    """Consumes events from a queue and runs the actions of matching rules.

    Actions are dicts with ``"type"`` equal to ``"scene"`` (activate the
    scene named by ``"scene"``) or ``"device"`` (apply ``"state"`` to the
    device named by ``"device_id"``).
    """

    def __init__(self, scene_manager: SceneManager):
        self.scene_manager = scene_manager
        self.rules: List[AutomationRule] = []
        self._event_queue: asyncio.Queue = asyncio.Queue()

    def add_rule(self, rule: AutomationRule) -> None:
        """Append ``rule`` to the list evaluated for every event."""
        self.rules.append(rule)

    def remove_rule(self, name: str) -> None:
        """Drop every rule whose ``name`` equals the argument."""
        self.rules = [r for r in self.rules if r.name != name]

    async def process_event(self, event: Dict) -> None:
        """Enqueue ``event`` for the ``run`` loop."""
        await self._event_queue.put(event)

    async def run(self) -> None:
        """Process queued events forever; cancel the task to stop it."""
        while True:
            event = await self._event_queue.get()
            try:
                for rule in self.rules:
                    if rule.check_trigger(event):
                        await self._execute_actions(rule.actions)
                        rule.last_triggered = datetime.now()
            finally:
                # Mark the event handled so ``_event_queue.join()`` can return.
                self._event_queue.task_done()

    async def _execute_actions(self, actions: List[Dict]) -> None:
        """Run each action; unknown action types and devices are skipped."""
        for action in actions:
            action_type = action.get("type")
            if action_type == "scene":
                self.scene_manager.activate_scene(action.get("scene"))
            elif action_type == "device":
                device = self.scene_manager.devices.get(action.get("device_id"))
                if not device:
                    continue
                for key, value in action.get("state", {}).items():
                    self._apply_state(device, key, value)

    @staticmethod
    def _apply_state(device, key: str, value) -> None:
        """Apply one state entry, using the same keys as scene actions."""
        if key == "power":
            if value:
                device.turn_on()
            else:
                device.turn_off()
        elif key == "brightness":
            device.set_brightness(value)
        elif key == "color":
            device.set_color(**value)
        elif key == "temperature":
            device.set_temperature(value)

# 52.4 边缘计算
52.4.1 数据处理
python
from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from datetime import datetime, timedelta
import statistics
@dataclass
class DataWindow:
    """A batch of readings together with its size and time bounds."""

    readings: List[SensorReading]
    window_size: int
    start_time: datetime
    end_time: datetime
class EdgeProcessor:
    """Keeps a sliding window of readings per device and evaluates alert rules.

    ``process`` appends a reading to its device's window (at most
    ``window_size`` entries), computes summary statistics over the window and
    raises an alert for every rule the reading satisfies.
    """

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.windows: Dict[str, List[SensorReading]] = {}
        self.rules: List[Dict] = []
        self._callbacks: List[Callable] = []

    def process(self, reading: SensorReading) -> Optional[Dict]:
        """Record ``reading``, check all rules and return the window statistics."""
        window = self.windows.setdefault(reading.device_id, [])
        window.append(reading)
        if len(window) > self.window_size:
            window.pop(0)

        result = self._analyze_window(reading.device_id)

        for rule in self.rules:
            if self._check_rule(rule, reading, result):
                self._trigger_alert(rule, reading)
        return result

    def _analyze_window(self, device_id: str) -> Dict:
        """Summarise the numeric values in a device's window.

        Returns ``{}`` for an unknown/empty window and ``{"count": n}`` when
        none of the ``n`` readings carries an int/float value.
        """
        readings = self.windows.get(device_id, [])
        if not readings:
            return {}

        values = [r.value for r in readings if isinstance(r.value, (int, float))]
        if not values:
            return {"count": len(readings)}

        return {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": statistics.mean(values),
            "median": statistics.median(values),
            # stdev needs at least two samples.
            "std_dev": statistics.stdev(values) if len(values) > 1 else 0,
            "latest": values[-1],
            "trend": self._calculate_trend(values)
        }

    def _calculate_trend(self, values: List[float]) -> str:
        """Classify the series as ``"increasing"``, ``"decreasing"`` or ``"stable"``.

        The mean of the most recent values (up to 10) is compared with the
        mean of an older slice; a relative change beyond +/-5 % of the older
        mean's magnitude counts as a trend.  When the older mean is exactly
        0 a relative change is undefined, so the sign of the difference
        decides.
        """
        if len(values) < 2:
            return "stable"

        recent = values[-10:]
        older = values[:-10] if len(values) >= 20 else values[:len(values) // 2]
        if not older:
            return "stable"

        recent_avg = statistics.mean(recent)
        older_avg = statistics.mean(older)
        difference = recent_avg - older_avg

        if older_avg == 0:
            if difference > 0:
                return "increasing"
            if difference < 0:
                return "decreasing"
            return "stable"

        # Divide by the magnitude so a negative baseline keeps the direction.
        change = difference / abs(older_avg) * 100
        if change > 5:
            return "increasing"
        if change < -5:
            return "decreasing"
        return "stable"

    def add_rule(
        self,
        name: str,
        condition: str,
        threshold: float,
        action: Callable
    ) -> None:
        """Register an alert rule.

        Args:
            name: label reported in alerts.
            condition: ``"above"``, ``"below"`` or ``"std_dev_high"``.
            threshold: limit the reading value (or window std-dev) is
                compared against.
            action: called with the alert dict when the rule fires.
        """
        self.rules.append({
            "name": name,
            "condition": condition,
            "threshold": threshold,
            "action": action
        })

    def _check_rule(
        self,
        rule: Dict,
        reading: SensorReading,
        analysis: Dict
    ) -> bool:
        """Return whether ``reading`` (or its window analysis) satisfies ``rule``."""
        condition = rule["condition"]
        threshold = rule["threshold"]

        if condition == "std_dev_high":
            return analysis.get("std_dev", 0) > threshold
        if condition not in ("above", "below"):
            return False

        value = reading.value
        if not isinstance(value, (int, float)):
            # Non-numeric readings cannot be compared with a threshold.
            return False
        return value > threshold if condition == "above" else value < threshold

    def _trigger_alert(self, rule: Dict, reading: SensorReading) -> None:
        """Build the alert and hand it to the rule's action and all callbacks.

        A handler that raises is logged and does not stop the others.
        """
        import logging

        alert = {
            "rule": rule["name"],
            "device_id": reading.device_id,
            "value": reading.value,
            "timestamp": datetime.now()
        }

        handlers = []
        action = rule.get("action")
        if callable(action):
            handlers.append(action)
        handlers.extend(self._callbacks)

        for handler in handlers:
            try:
                handler(alert)
            except Exception:
                logging.getLogger(__name__).exception(
                    "alert handler failed for rule %r", rule["name"]
                )

    def add_alert_callback(self, callback: Callable) -> None:
        """Register a callback invoked with the alert dict for every alert."""
        self._callbacks.append(callback)
class DataFilter:
    """Chain of predicates deciding whether a reading is passed on.

    Each filter is a callable taking the reading and returning a truthy value
    to keep it.  The static methods build commonly used filters.
    """

    def __init__(self):
        self.filters: List[Callable] = []

    def add_filter(self, filter_func: Callable) -> None:
        """Append a predicate to the chain."""
        self.filters.append(filter_func)

    def apply(self, reading: SensorReading) -> Optional[SensorReading]:
        """Return ``reading`` if every filter accepts it, else ``None``.

        Filters run in insertion order and evaluation stops at the first
        rejection.
        """
        for filter_func in self.filters:
            if not filter_func(reading):
                return None
        return reading

    @staticmethod
    def range_filter(min_val: float, max_val: float) -> Callable:
        """Build a filter keeping numeric values within ``[min_val, max_val]``.

        Non-numeric values are always kept.
        """
        def filter_func(reading: SensorReading) -> bool:
            if isinstance(reading.value, (int, float)):
                return min_val <= reading.value <= max_val
            return True
        return filter_func

    @staticmethod
    def rate_limit(min_interval: float) -> Callable:
        """Build a filter dropping readings that arrive too soon per device.

        A reading is rejected when fewer than ``min_interval`` seconds of
        wall-clock time have passed since the last accepted reading of the
        same device.
        """
        last_time: Dict[str, datetime] = {}

        def filter_func(reading: SensorReading) -> bool:
            device_id = reading.device_id
            now = datetime.now()
            previous = last_time.get(device_id)
            if previous is not None:
                if (now - previous).total_seconds() < min_interval:
                    return False
            last_time[device_id] = now
            return True
        return filter_func

    @staticmethod
    def outlier_filter(std_dev_threshold: float = 3.0) -> Callable:
        """Build a z-score filter with a bounded per-device history.

        A numeric reading is rejected when it lies more than
        ``std_dev_threshold`` sample standard deviations from the mean of the
        device's accepted history.  The first three values and any value seen
        while the history has zero spread are always accepted.  Non-numeric
        values bypass the filter.  At most 100 accepted values are remembered
        per device.
        """
        max_history = 100
        values: Dict[str, List[float]] = {}

        def filter_func(reading: SensorReading) -> bool:
            value = reading.value
            if not isinstance(value, (int, float)):
                return True

            history = values.setdefault(reading.device_id, [])

            if len(history) >= 3:
                std = statistics.stdev(history)
                # std == 0 means no spread yet, so a z-score is undefined.
                if std != 0:
                    z_score = abs(value - statistics.mean(history)) / std
                    if z_score > std_dev_threshold:
                        return False

            history.append(value)
            # Trim on every accepted value, whichever path accepted it, so a
            # constant signal cannot grow the history without bound.
            if len(history) > max_history:
                del history[0]
            return True
        return filter_func

# 52.5 物联网安全
52.5.1 设备认证
python
from dataclasses import dataclass
from typing import Dict, Optional
from datetime import datetime, timedelta
import hashlib
import secrets
import hmac
@dataclass
class DeviceCredentials:
    """Keys and permissions issued to one device.

    When ``permissions`` is left as ``None`` it is replaced after
    construction by a fresh ``["read", "write"]`` list.
    """

    device_id: str
    api_key: str
    secret_key: str
    created_at: datetime
    expires_at: Optional[datetime] = None
    permissions: list = None

    def __post_init__(self):
        """Fill in the default permission list when none was supplied."""
        self.permissions = (
            ["read", "write"] if self.permissions is None else self.permissions
        )
class AuthenticationManager:
    """Issues device credentials, verifies signed requests and tracks sessions.

    A request is signed with HMAC-SHA256 over ``"<timestamp>:<payload>"``
    using the device's secret key; the hex digest is the signature.
    """

    # Largest accepted gap between the request timestamp and the local clock.
    _MAX_CLOCK_SKEW_SECONDS = 300

    def __init__(self):
        self.credentials: Dict[str, DeviceCredentials] = {}
        self._sessions: Dict[str, dict] = {}

    def register_device(
        self,
        device_id: str,
        permissions: list = None
    ) -> DeviceCredentials:
        """Create, store and return fresh credentials for ``device_id``.

        ``permissions`` defaults to ``["read"]`` when omitted; an explicitly
        passed list (including an empty one) is stored as given.  Existing
        credentials for the same id are replaced.
        """
        granted = ["read"] if permissions is None else permissions
        credentials = DeviceCredentials(
            device_id=device_id,
            api_key=secrets.token_urlsafe(32),
            secret_key=secrets.token_urlsafe(64),
            created_at=datetime.now(),
            permissions=granted
        )
        self.credentials[device_id] = credentials
        return credentials

    def authenticate(
        self,
        device_id: str,
        api_key: str,
        signature: str,
        timestamp: str,
        payload: str = ""
    ) -> bool:
        """Return whether a signed request from ``device_id`` is valid.

        The request is rejected when the device is unknown, its credentials
        have expired, the API key or signature does not match, or the
        ISO-8601 ``timestamp`` is malformed or more than 300 seconds away
        from the current time.  Malformed input yields ``False`` rather than
        an exception.
        """
        cred = self.credentials.get(device_id)
        if cred is None:
            return False
        if not isinstance(api_key, str) or not isinstance(signature, str):
            return False

        expires_at = cred.expires_at
        if expires_at is not None and datetime.now(expires_at.tzinfo) > expires_at:
            return False

        # Compare as bytes: compare_digest rejects non-ASCII ``str`` input,
        # and a constant-time comparison avoids leaking how much matched.
        if not hmac.compare_digest(cred.api_key.encode(), api_key.encode()):
            return False

        try:
            ts = datetime.fromisoformat(timestamp)
        except (TypeError, ValueError):
            return False
        # Take "now" in the timestamp's own tz so both naive and aware
        # timestamps can be subtracted.
        skew = abs((datetime.now(ts.tzinfo) - ts).total_seconds())
        if skew > self._MAX_CLOCK_SKEW_SECONDS:
            return False

        expected_signature = self._generate_signature(
            cred.secret_key,
            timestamp,
            payload
        )
        return hmac.compare_digest(signature.encode(), expected_signature.encode())

    def _generate_signature(
        self,
        secret_key: str,
        timestamp: str,
        payload: str
    ) -> str:
        """Return the hex HMAC-SHA256 of ``"<timestamp>:<payload>"``."""
        message = f"{timestamp}:{payload}"
        return hmac.new(
            secret_key.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

    def create_session(
        self,
        device_id: str,
        duration_hours: int = 24
    ) -> str:
        """Create a session for ``device_id`` and return its random token."""
        session_token = secrets.token_urlsafe(48)
        created_at = datetime.now()
        self._sessions[session_token] = {
            "device_id": device_id,
            "created_at": created_at,
            "expires_at": created_at + timedelta(hours=duration_hours)
        }
        return session_token

    def validate_session(self, session_token: str) -> Optional[str]:
        """Return the session's device id, or ``None`` if unknown or expired.

        An expired session is removed as a side effect.
        """
        session = self._sessions.get(session_token)
        if session is None:
            return None
        if datetime.now() > session["expires_at"]:
            del self._sessions[session_token]
            return None
        return session["device_id"]

    def revoke_session(self, session_token: str) -> None:
        """Delete the session if it exists."""
        self._sessions.pop(session_token, None)
class SecureCommunicator:
    """Encrypts and decrypts text with a Fernet key derived from a shared secret.

    The Fernet key is the URL-safe base64 encoding of the SHA-256 digest of
    the secret.  Requires the third-party ``cryptography`` package, imported
    lazily on first use.
    """

    def __init__(self, secret_key: str):
        self.secret_key = secret_key.encode()

    def _build_fernet(self):
        """Return a ``Fernet`` instance keyed from ``self.secret_key``."""
        from cryptography.fernet import Fernet
        import base64

        digest = hashlib.sha256(self.secret_key).digest()
        return Fernet(base64.urlsafe_b64encode(digest))

    def encrypt_message(self, plaintext: str) -> dict:
        """Encrypt ``plaintext`` and return the token with an algorithm label.

        Returns:
            ``{"ciphertext": <token as str>, "algorithm": "AES-256-Fernet"}``.
        """
        token = self._build_fernet().encrypt(plaintext.encode())
        # NOTE(review): the label says AES-256, while the Fernet spec
        # describes AES-128-CBC with HMAC-SHA256 - confirm before relying on
        # this string; it is kept unchanged for compatibility.
        return {
            "ciphertext": token.decode(),
            "algorithm": "AES-256-Fernet"
        }

    def decrypt_message(self, ciphertext: str) -> str:
        """Decrypt a token produced by ``encrypt_message`` and return the text."""
        recovered = self._build_fernet().decrypt(ciphertext.encode())
        return recovered.decode()

# 52.5.2 安全最佳实践
python
class SecurityBestPractices:
    """Static helpers for validating device input, topics and request rates."""

    # Request timestamps per device id, shared by all callers of
    # ``rate_limit_check``.
    _rate_limits: dict = {}

    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    @staticmethod
    def validate_device_input(data: dict) -> bool:
        """Return whether ``data`` looks like a well-formed signed request.

        Requires the keys ``device_id``, ``timestamp`` and ``signature``; the
        device id must be a non-empty string of at most 64 characters and the
        signature a string of exactly 64 hexadecimal characters.  Values of
        the wrong type yield ``False`` instead of raising.
        """
        if not isinstance(data, dict):
            return False

        required_fields = ("device_id", "timestamp", "signature")
        if any(name not in data for name in required_fields):
            return False

        device_id = data["device_id"]
        if not isinstance(device_id, str) or not 0 < len(device_id) <= 64:
            return False

        signature = data["signature"]
        if not isinstance(signature, str) or len(signature) != 64:
            return False
        return all(ch in SecurityBestPractices._HEX_DIGITS for ch in signature)

    @staticmethod
    def sanitize_topic(topic: str) -> str:
        """Strip NUL/CR/LF characters from ``topic`` and validate its shape.

        Raises:
            ValueError: if the cleaned topic contains ``".."`` or starts
                with ``"/"``.
        """
        for char in ('\x00', '\n', '\r'):
            topic = topic.replace(char, '')

        if '..' in topic or topic.startswith('/'):
            raise ValueError("Invalid topic format")
        return topic

    @staticmethod
    def rate_limit_check(
        device_id: str,
        max_requests: int = 100,
        window_seconds: int = 60
    ) -> bool:
        """Record a request for ``device_id`` unless it exceeds the rate limit.

        Returns ``True`` and records the request when fewer than
        ``max_requests`` were recorded in the last ``window_seconds``
        seconds; otherwise returns ``False`` without recording.
        """
        import time

        # Monotonic clock: the window is unaffected by wall-clock adjustments.
        now = time.monotonic()
        requests = SecurityBestPractices._rate_limits.setdefault(device_id, [])
        # Drop timestamps that fell out of the window (in place, so the
        # shared list object stays the same).
        requests[:] = [t for t in requests if now - t < window_seconds]

        if len(requests) >= max_requests:
            return False
        requests.append(now)
        return True

# 学术注记:物联网安全面临独特挑战,包括设备资源受限、网络环境复杂、攻击面广泛等问题。OWASP IoT Top 10列出了最常见的物联网安全风险:弱密码、不安全的网络服务、不安全的生态系统接口等。在设计物联网系统时,应遵循"安全即设计"(Security by Design)原则。
52.6 本章小结
本章详细介绍了Python物联网开发的核心概念和实践:
- 物联网架构:设备注册、数据存储、设备管理
- MQTT协议:消息发布订阅、QoS级别、主题匹配
- 智能家居:设备控制、场景管理、自动化规则
- 边缘计算:数据处理、数据过滤、异常检测
- 物联网安全:设备认证、加密通信、安全最佳实践
52.6.1 知识图谱
物联网开发
├── 架构设计
│ ├── 设备层(传感器、执行器)
│ ├── 网络层(MQTT、CoAP、HTTP)
│ ├── 边缘层(数据处理、过滤)
│ └── 云平台(存储、分析、可视化)
├── 通信协议
│ ├── MQTT(发布订阅、QoS)
│ ├── CoAP(REST风格、UDP)
│ ├── HTTP/REST(通用、易用)
│ └── WebSocket(实时双向)
├── 数据处理
│ ├── 数据采集(传感器模拟)
│ ├── 数据清洗(过滤、验证)
│ ├── 数据存储(时序数据库)
│ └── 数据分析(统计、趋势)
├── 智能控制
│ ├── 设备管理(注册、状态)
│ ├── 场景联动(预设、触发)
│ ├── 自动化规则(条件、动作)
│ └── 远程控制(API、界面)
└── 安全机制
  ├── 设备认证(API Key、证书)
  ├── 数据加密(TLS、AES)
  ├── 访问控制(权限、角色)
  └── 安全审计(日志、监控)

52.6.2 最佳实践清单
| 场景 | 推荐做法 | 避免做法 |
|---|---|---|
| 设备标识 | 使用UUID或唯一序列号 | 使用易猜测的ID |
| 通信协议 | MQTT用于实时数据,HTTP用于配置 | 所有场景都用HTTP轮询 |
| QoS选择 | QoS 1用于重要数据,QoS 0用于遥测 | 所有消息都用QoS 2 |
| 数据格式 | JSON或Protocol Buffers | 自定义二进制格式 |
| 安全认证 | 双向TLS + API Key | 仅使用密码 |
| 边缘处理 | 本地过滤、聚合后上传 | 原始数据全部上传 |
| 错误处理 | 指数退避重连 | 无限重试 |
| 固件更新 | OTA安全签名验证 | 直接执行下载的代码 |
52.6.3 技术选型指南
| 需求场景 | 推荐方案 | 备选方案 |
|---|---|---|
| 轻量级协议 | MQTT | CoAP |
| 实时双向 | WebSocket | MQTT |
| 设备数量大 | MQTT Broker集群 | 云IoT平台 |
| 边缘计算 | EdgeX Foundry | AWS Greengrass |
| 时序数据 | InfluxDB | TimescaleDB |
| 可视化 | Grafana | 自定义Dashboard |
| 设备管理 | Home Assistant | 自研平台 |
练习题
基础题
1. 实现一个简单的温度传感器模拟器,每秒生成一个随机温度值(15-30°C),并通过MQTT发布。
2. 创建一个设备注册表,支持设备的添加、删除、查询和状态更新。
3. 实现一个简单的场景管理器,支持创建"回家模式"和"离家模式"场景。
进阶题
1. 实现一个完整的MQTT客户端,支持QoS 1和QoS 2,包括消息确认和重传机制。
2. 开发一个边缘计算网关,实现数据过滤、聚合和异常检测功能。
3. 实现一个物联网安全系统,包括设备认证、消息加密和访问控制。
项目实践
- 智能家居控制中心:构建一个完整的智能家居控制系统,要求:
  - 支持多种设备类型(灯光、空调、窗帘、传感器)
  - 实现场景联动和自动化规则
  - 提供Web界面和API接口
  - 实现设备认证和安全通信
  - 支持历史数据存储和可视化
思考题
1. MQTT的QoS 0、1、2三种级别分别适用于什么场景?在选择QoS级别时需要考虑哪些因素?
2. 边缘计算与云计算相比有哪些优势和劣势?在什么场景下应该优先选择边缘计算?
3. 物联网设备的安全挑战与传统IT系统有何不同?如何设计一个安全的物联网架构?
扩展阅读
52.7.1 协议规范
- MQTT官方文档 (https://mqtt.org/) — MQTT协议的官方规范和实现指南
- CoAP RFC 7252 (https://tools.ietf.org/html/rfc7252) — 受限应用协议的完整规范
- WebSocket RFC 6455 (https://tools.ietf.org/html/rfc6455) — WebSocket协议标准
- OMA LwM2M (https://openmobilealliance.org/iot/lightweight-m2m-lwm2m/) — 轻量级机器到机器协议
52.7.2 平台与框架
- Home Assistant文档 (https://www.home-assistant.io/docs/) — 开源智能家居平台
- EdgeX Foundry (https://www.edgexfoundry.org/) — 边缘计算开源框架
- AWS IoT Core (https://aws.amazon.com/iot-core/) — 亚马逊物联网平台
- Azure IoT Hub (https://azure.microsoft.com/services/iot-hub/) — 微软物联网中心
52.7.3 嵌入式开发
- MicroPython文档 (https://docs.micropython.org/) — 微控制器上的Python实现
- CircuitPython (https://circuitpython.org/) — Adafruit的Python变体
- ESP32开发指南 (https://docs.espressif.com/projects/esp-idf/) — ESP32官方文档
- Raspberry Pi Pico (https://www.raspberrypi.com/documentation/microcontrollers/) — 树莓派Pico文档
52.7.4 安全标准
- OWASP IoT Top 10 (https://owasp.org/www-project-internet-of-things/) — 物联网安全风险清单
- IoT Security Foundation (https://www.iotsecurityfoundation.org/) — 物联网安全最佳实践
- NIST IoT Cybersecurity (https://www.nist.gov/programs-projects/iot-cybersecurity) — 美国国家标准与技术研究院指南
- ETSI EN 303 645 (https://www.etsi.org/deliver/etsi_en/303600_303699/303645/) — 欧洲消费类IoT安全标准
52.7.5 进阶书籍
- 《物联网系统设计》 — 系统架构与实现方法
- 《MQTT Essentials》 — MQTT协议深度解析
- 《Building the Internet of Things》 — 物联网企业级应用开发
- 《Practical IoT Hacking》 — 物联网安全测试指南
下一章:第53章 3D图形编程