第48章 区块链开发
学习目标
完成本章学习后,你将能够:
- 理解区块链基础:区块结构、链式存储、共识机制
- 实现简单区块链:区块创建、哈希计算、链验证
- 开发智能合约:Solidity基础、合约部署、合约交互
- 使用Web3.py:以太坊连接、账户管理、交易发送
- 构建DApp:去中心化应用架构、前端集成
- 实现代币标准:ERC-20、ERC-721、代币发行
- 处理链上数据:事件监听、日志解析、数据索引
- 实现钱包功能:地址生成、签名验证、交易广播
48.1 区块链基础
48.1.1 区块链原理
python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
import hashlib
import time
import json
@dataclass
class Transaction:
    """A transfer of ``amount`` from ``sender`` to ``receiver``.

    ``tx_id`` is derived from sender, receiver, amount and timestamp when it
    is not supplied; the signature does not take part in the identifier.
    """

    sender: str
    receiver: str
    amount: float
    timestamp: float = field(default_factory=time.time)
    signature: str = ""
    tx_id: str = ""

    def __post_init__(self):
        # Keep an explicitly supplied identifier, otherwise derive one.
        self.tx_id = self.tx_id or self._calculate_hash()

    def _calculate_hash(self) -> str:
        """Return the SHA-256 hex digest of the concatenated core fields."""
        core_fields = (self.sender, self.receiver, self.amount, self.timestamp)
        payload = "".join(str(part) for part in core_fields)
        return hashlib.sha256(payload.encode()).hexdigest()

    def to_dict(self) -> Dict:
        """Return the transaction as a plain dict, ``tx_id`` first."""
        ordered_keys = (
            "tx_id",
            "sender",
            "receiver",
            "amount",
            "timestamp",
            "signature",
        )
        return {key: getattr(self, key) for key in ordered_keys}
@dataclass
class Block:
    """One block of the chain: header data plus an ordered list of transactions.

    ``hash`` and ``merkle_root`` are computed after construction when left
    empty. The block hash covers index, timestamp, previous hash, nonce and
    the serialized transactions; ``merkle_root`` is not part of it.
    """

    index: int
    timestamp: float
    transactions: List[Transaction]
    previous_hash: str
    nonce: int = 0
    hash: str = ""
    merkle_root: str = ""

    def __post_init__(self):
        # Only fill in values the caller did not provide.
        self.hash = self.hash or self.calculate_hash()
        self.merkle_root = self.merkle_root or self._calculate_merkle_root()

    def calculate_hash(self) -> str:
        """Return the SHA-256 hex digest of the header fields and transactions."""
        header = "".join(
            str(part)
            for part in (self.index, self.timestamp, self.previous_hash, self.nonce)
        )
        body = json.dumps([tx.to_dict() for tx in self.transactions])
        return hashlib.sha256((header + body).encode()).hexdigest()

    def _calculate_merkle_root(self) -> str:
        """Fold the transaction ids pairwise into a single root hash.

        An empty block yields the SHA-256 of the empty byte string. When a
        level has an odd number of entries the last one is paired with itself.
        """
        level = [tx.tx_id for tx in self.transactions]
        if not level:
            return hashlib.sha256(b"").hexdigest()
        while len(level) > 1:
            if len(level) % 2:
                level.append(level[-1])
            level = [
                hashlib.sha256((level[pos] + level[pos + 1]).encode()).hexdigest()
                for pos in range(0, len(level), 2)
            ]
        return level[0]

    def to_dict(self) -> Dict:
        """Return the block, including nested transactions, as plain dicts."""
        return dict(
            index=self.index,
            timestamp=self.timestamp,
            transactions=[tx.to_dict() for tx in self.transactions],
            previous_hash=self.previous_hash,
            nonce=self.nonce,
            hash=self.hash,
            merkle_root=self.merkle_root,
        )
class Blockchain:
    """In-memory proof-of-work chain with a pool of pending transactions.

    The chain always starts with an unmined genesis block. Validation assumes
    that every non-genesis block was mined at the current ``difficulty``.
    """

    def __init__(self, difficulty: int = 4):
        self.chain: List[Block] = []
        self.difficulty = difficulty
        self.pending_transactions: List[Transaction] = []
        self.mining_reward = 10.0
        self._create_genesis_block()

    def _create_genesis_block(self) -> None:
        """Append the first block; it has no transactions and previous hash "0"."""
        genesis = Block(
            index=0,
            timestamp=time.time(),
            transactions=[],
            previous_hash="0",
        )
        self.chain.append(genesis)

    @property
    def latest_block(self) -> Block:
        """The most recently appended block."""
        return self.chain[-1]

    def add_transaction(self, transaction: Transaction) -> bool:
        """Queue ``transaction`` for the next block; return False if rejected."""
        if not self._validate_transaction(transaction):
            return False
        self.pending_transactions.append(transaction)
        return True

    def _validate_transaction(self, transaction: Transaction) -> bool:
        """Require non-empty sender and receiver and a strictly positive amount."""
        if not transaction.sender or not transaction.receiver:
            return False
        return transaction.amount > 0

    def mine_pending_transactions(self, miner_address: str) -> Block:
        """Mine all pending transactions plus a reward into a new block.

        The reward transaction (sender "0") is added to the block only, not to
        the shared pool, so an interrupted mining attempt leaves the pool as
        it was. The pool is emptied once the block is appended.
        """
        reward_tx = Transaction(
            sender="0",
            receiver=miner_address,
            amount=self.mining_reward,
        )
        block = Block(
            index=len(self.chain),
            timestamp=time.time(),
            transactions=[*self.pending_transactions, reward_tx],
            previous_hash=self.latest_block.hash,
        )
        block = self._proof_of_work(block)
        self.chain.append(block)
        self.pending_transactions = []
        return block

    def _proof_of_work(self, block: Block) -> Block:
        """Increment the nonce until the hash starts with ``difficulty`` zeros."""
        # startswith("") is True, so a non-positive difficulty ends immediately
        # instead of looping forever.
        target = "0" * self.difficulty
        while not block.hash.startswith(target):
            block.nonce += 1
            block.hash = block.calculate_hash()
        return block

    def is_chain_valid(self) -> bool:
        """Check hashes, Merkle roots, links and proof-of-work of every block.

        Recomputing the stored hash alone is not enough: anyone can alter a
        block and re-hash it. The proof-of-work prefix and the Merkle root
        are therefore verified as well. The genesis block is checked for
        internal consistency only, because it is never mined.
        """
        target = "0" * self.difficulty
        for position, block in enumerate(self.chain):
            if block.hash != block.calculate_hash():
                return False
            if block.merkle_root != block._calculate_merkle_root():
                return False
            if position == 0:
                continue
            if block.previous_hash != self.chain[position - 1].hash:
                return False
            if not block.hash.startswith(target):
                return False
        return True

    def get_balance(self, address: str) -> float:
        """Return received minus sent amounts for ``address`` over mined blocks."""
        balance = 0.0
        for block in self.chain:
            for tx in block.transactions:
                if tx.sender == address:
                    balance -= tx.amount
                if tx.receiver == address:
                    balance += tx.amount
        return balance

    def get_block(self, index: int) -> Optional[Block]:
        """Return the block at ``index``, or None when out of range."""
        if 0 <= index < len(self.chain):
            return self.chain[index]
        return None

    def get_transaction(self, tx_id: str) -> Optional[Transaction]:
        """Return the first mined transaction with the given id, or None."""
        for block in self.chain:
            for tx in block.transactions:
                if tx.tx_id == tx_id:
                    return tx
        return None
class ConsensusMechanism:
    """Stateless helpers illustrating two consensus strategies."""

    @staticmethod
    def proof_of_work(block: Block, difficulty: int) -> Block:
        """Bump ``block.nonce`` until the hash has ``difficulty`` leading zeros."""
        wanted_prefix = "0" * difficulty
        while True:
            if block.hash[:difficulty] == wanted_prefix:
                return block
            block.nonce += 1
            block.hash = block.calculate_hash()

    @staticmethod
    def proof_of_stake(
        validators: Dict[str, float],
        previous_block: Block
    ) -> str:
        """Pick a validator at random, weighted by stake.

        A point is drawn uniformly in ``[0, total stake]`` and the first
        validator whose cumulative stake reaches it is returned.
        ``previous_block`` is accepted but not used.
        """
        import random
        threshold = random.uniform(0, sum(validators.values()))
        cumulative = 0
        for name in validators:
            cumulative += validators[name]
            if cumulative >= threshold:
                return name
        # Fallback if the cumulative sum never reaches the drawn point.
        return list(validators)[0]
class BlockchainNode:
    """A network participant holding its own chain and a list of peers."""

    def __init__(self, node_id: str, blockchain: Blockchain = None):
        self.node_id = node_id
        # A fresh chain is created when none (or a falsy value) is supplied.
        self.blockchain = blockchain or Blockchain()
        self.peers: List[str] = []

    def add_peer(self, peer_address: str) -> None:
        """Remember ``peer_address`` unless it is already known."""
        if peer_address in self.peers:
            return
        self.peers.append(peer_address)

    def sync_chain(self, other_chain: List[Block]) -> bool:
        """Adopt ``other_chain`` if it is strictly longer and internally linked.

        Every block after the first must carry a hash that matches its
        contents and must point at its predecessor's hash.
        """
        if len(other_chain) <= len(self.blockchain.chain):
            return False
        for previous, current in zip(other_chain, other_chain[1:]):
            hash_ok = current.hash == current.calculate_hash()
            link_ok = current.previous_hash == previous.hash
            if not hash_ok or not link_ok:
                return False
        self.blockchain.chain = other_chain
        return True

    def broadcast_transaction(self, transaction: Transaction) -> None:
        """Placeholder: networking is not implemented."""
        pass

    def broadcast_block(self, block: Block) -> None:
        """Placeholder: networking is not implemented."""
        pass

# 48.1.2 钱包与签名
python
import hashlib
import hmac
import secrets
from typing import Tuple, Optional
class CryptoUtils:
    """Hash-based stand-ins for wallet cryptography (teaching placeholder).

    NOTE: this is NOT asymmetric cryptography. The "public key" is a hash of
    the private key and signatures are HMACs keyed by that public key, so
    anyone who knows the public key can produce a valid signature. The scheme
    only makes ``sign`` and ``verify`` agree so that tampering with a message
    or signature is detected. Real wallets must use ECDSA over secp256k1.
    """

    @staticmethod
    def generate_private_key() -> str:
        """Return 32 random bytes as a 64-character hex string."""
        return secrets.token_hex(32)

    @staticmethod
    def private_key_to_public_key(private_key: str) -> str:
        """Derive the public key as the SHA-256 hex digest of the private key."""
        return hashlib.sha256(private_key.encode()).hexdigest()

    @staticmethod
    def public_key_to_address(public_key: str) -> str:
        """Return the first 40 hex characters of a double SHA-256 of the key."""
        first_pass = hashlib.sha256(public_key.encode()).hexdigest()
        return hashlib.sha256(first_pass.encode()).hexdigest()[:40]

    @staticmethod
    def _mac(public_key: str, message: str) -> str:
        """Return the HMAC-SHA256 hex digest of ``message`` keyed by ``public_key``."""
        return hmac.new(
            public_key.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

    @staticmethod
    def sign(private_key: str, message: str) -> str:
        """Sign ``message``; the result can be checked with :meth:`verify`.

        The HMAC is keyed by the public key derived from ``private_key``
        because that is the only key material a verifier holds.
        """
        public_key = CryptoUtils.private_key_to_public_key(private_key)
        return CryptoUtils._mac(public_key, message)

    @staticmethod
    def verify(public_key: str, message: str, signature: str) -> bool:
        """Return True only if ``signature`` matches ``message`` for this key."""
        if not isinstance(signature, str):
            return False
        expected = CryptoUtils._mac(public_key, message)
        # Compare as bytes so non-ASCII input is rejected instead of raising.
        return hmac.compare_digest(expected.encode(), signature.encode())
class Wallet:
    """Holds a private key, the derived public key and address, and signs transactions."""

    def __init__(self, private_key: str = None):
        # A missing (or empty) key means "create a brand-new wallet".
        self.private_key = private_key or CryptoUtils.generate_private_key()
        self.public_key = CryptoUtils.private_key_to_public_key(self.private_key)
        self.address = CryptoUtils.public_key_to_address(self.public_key)

    @classmethod
    def from_private_key(cls, private_key: str) -> "Wallet":
        """Alternate constructor restoring a wallet from an existing key."""
        return cls(private_key)

    def sign_transaction(self, transaction: Transaction) -> Transaction:
        """Set ``transaction.signature`` in place and return the transaction.

        The signed message is sender, receiver and amount concatenated.
        """
        signed_parts = (
            transaction.sender,
            transaction.receiver,
            transaction.amount,
        )
        message = "".join(str(part) for part in signed_parts)
        transaction.signature = CryptoUtils.sign(self.private_key, message)
        return transaction

    def create_transaction(
        self,
        receiver: str,
        amount: float
    ) -> Transaction:
        """Build a transaction sent from this wallet and sign it."""
        unsigned = Transaction(
            sender=self.address,
            receiver=receiver,
            amount=amount,
        )
        return self.sign_transaction(unsigned)

    def export_private_key(self) -> str:
        """Return the raw private key string."""
        return self.private_key

    def get_address(self) -> str:
        """Return this wallet's address."""
        return self.address
class MultiSigWallet:
    """Wallet whose transfers need approval from ``required`` of its owners."""

    def __init__(self, owners: List[str], required: int):
        self.owners = owners
        self.required = required
        self.address = self._generate_address()
        self.pending_transactions: Dict[str, Dict] = {}

    def _generate_address(self) -> str:
        """Derive a 40-hex-character address from the sorted owners and threshold."""
        combined = "".join(sorted(self.owners)) + str(self.required)
        return hashlib.sha256(combined.encode()).hexdigest()[:40]

    def propose_transaction(
        self,
        receiver: str,
        amount: float,
        proposer: str
    ) -> str:
        """Register a proposal, counting the proposer as first approval.

        Raises:
            PermissionError: if ``proposer`` is not an owner. Without this
                check an outsider's proposal would count toward the threshold.
        """
        if proposer not in self.owners:
            raise PermissionError(f"{proposer} is not an owner of this wallet")
        import uuid
        tx_id = str(uuid.uuid4())
        self.pending_transactions[tx_id] = {
            "receiver": receiver,
            "amount": amount,
            "approvals": {proposer},
            "executed": False
        }
        return tx_id

    def approve_transaction(
        self,
        tx_id: str,
        approver: str
    ) -> bool:
        """Record an owner's approval; return True once the threshold is met.

        Returns False for unknown proposals, non-owners, and proposals that
        were already executed.
        """
        tx = self.pending_transactions.get(tx_id)
        if tx is None or approver not in self.owners:
            return False
        if tx["executed"]:
            return False
        tx["approvals"].add(approver)
        return len(tx["approvals"]) >= self.required

    def execute_transaction(self, tx_id: str) -> Optional["Transaction"]:
        """Mark a sufficiently approved proposal executed and build its transaction.

        Returns None when the proposal is unknown, lacks approvals, or has
        already been executed.
        """
        tx = self.pending_transactions.get(tx_id)
        if tx is None:
            return None
        if len(tx["approvals"]) < self.required or tx["executed"]:
            return None
        tx["executed"] = True
        return Transaction(
            sender=self.address,
            receiver=tx["receiver"],
            amount=tx["amount"]
        )

# 48.2 智能合约
48.2.1 合约开发
python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import json
class ContractState(Enum):
    """Lifecycle states of a simulated smart contract."""

    CREATED = "created"      # initial state set by SmartContract.__init__
    DEPLOYED = "deployed"    # set by ContractFactory after deployment
    PAUSED = "paused"        # SmartContract.execute refuses calls in this state
    DESTROYED = "destroyed"  # SmartContract.execute refuses calls in this state
@dataclass
class ContractEvent:
    """An event recorded by a simulated contract."""

    name: str  # event name, e.g. "Transfer"
    data: Dict  # event payload
    # Creation time, taken from time.time() when not supplied.
    timestamp: float = field(default_factory=time.time)
class SmartContract:
    """Base class for in-memory simulated contracts.

    Contract functions are implemented by subclasses as methods named
    ``_<function>`` that accept a ``caller`` keyword argument, and are invoked
    through :meth:`execute`.
    """

    def __init__(self, owner: str, code: str = ""):
        self.owner = owner
        self.code = code
        self.state = ContractState.CREATED
        self.storage: Dict[str, Any] = {}
        self.events: List[ContractEvent] = []
        self.address = self._generate_address()

    def _generate_address(self) -> str:
        """Return 40 hex characters derived from owner, current time and random bytes."""
        data = f"{self.owner}{time.time()}{secrets.token_hex(8)}"
        return hashlib.sha256(data.encode()).hexdigest()[:40]

    def _emit_event(self, name: str, data: Dict) -> None:
        """Append a new event to the contract's event log."""
        event = ContractEvent(name=name, data=data)
        self.events.append(event)

    def get_storage(self, key: str) -> Any:
        """Return the stored value for ``key`` or None when absent."""
        return self.storage.get(key)

    def set_storage(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``."""
        self.storage[key] = value

    def execute(self, function: str, args: List, caller: str) -> Any:
        """Dispatch a contract call to the ``_<function>`` method.

        Raises:
            Exception: if the contract is paused or destroyed, or if no
                callable contract function with that name exists.
        """
        if self.state == ContractState.PAUSED:
            raise Exception("Contract is paused")
        if self.state == ContractState.DESTROYED:
            raise Exception("Contract is destroyed")
        method = None
        # A leading underscore would resolve to double-underscore attributes
        # (e.g. "_init__" -> "__init__"), so such names are never looked up.
        if isinstance(function, str) and not function.startswith("_"):
            method = getattr(self, f"_{function}", None)
        if not callable(method):
            raise Exception(f"Function {function} not found")
        return method(*args, caller=caller)
class TokenContract(SmartContract):
    """Simulated fungible token with balances, allowances, mint and burn.

    Every function that moves value rejects negative amounts. Without that
    check a negative ``amount`` passes the balance test and moves tokens in
    the opposite direction.
    """

    def __init__(self, owner: str, name: str, symbol: str, total_supply: int):
        super().__init__(owner)
        self.name = name
        self.symbol = symbol
        self.total_supply = total_supply
        self.decimals = 18
        # The whole initial supply is credited to the owner.
        self.balances: Dict[str, int] = {owner: total_supply}
        self.allowances: Dict[str, Dict[str, int]] = {}

    @staticmethod
    def _check_amount(amount: int) -> None:
        """Raise ValueError when ``amount`` is negative."""
        if amount < 0:
            raise ValueError("Amount must be non-negative")

    def _balance_of(self, address: str, caller: str = None) -> int:
        """Return the balance of ``address`` (0 for unknown addresses)."""
        return self.balances.get(address, 0)

    def _transfer(
        self,
        to: str,
        amount: int,
        caller: str
    ) -> bool:
        """Move ``amount`` from ``caller`` to ``to`` and emit a Transfer event.

        Raises:
            ValueError: if ``amount`` is negative.
            Exception: if the caller's balance is insufficient.
        """
        self._check_amount(amount)
        if self.balances.get(caller, 0) < amount:
            raise Exception("Insufficient balance")
        self.balances[caller] = self.balances.get(caller, 0) - amount
        self.balances[to] = self.balances.get(to, 0) + amount
        self._emit_event("Transfer", {
            "from": caller,
            "to": to,
            "amount": amount
        })
        return True

    def _approve(
        self,
        spender: str,
        amount: int,
        caller: str
    ) -> bool:
        """Set the amount ``spender`` may transfer on behalf of ``caller``.

        Raises:
            ValueError: if ``amount`` is negative.
        """
        self._check_amount(amount)
        self.allowances.setdefault(caller, {})[spender] = amount
        self._emit_event("Approval", {
            "owner": caller,
            "spender": spender,
            "amount": amount
        })
        return True

    def _transfer_from(
        self,
        from_addr: str,
        to: str,
        amount: int,
        caller: str
    ) -> bool:
        """Let ``caller`` spend ``amount`` of ``from_addr``'s tokens.

        Raises:
            ValueError: if ``amount`` is negative.
            Exception: if the allowance or the balance is insufficient.
        """
        self._check_amount(amount)
        allowance = self.allowances.get(from_addr, {}).get(caller, 0)
        if allowance < amount:
            raise Exception("Insufficient allowance")
        if self.balances.get(from_addr, 0) < amount:
            raise Exception("Insufficient balance")
        # setdefault: a zero-amount transfer may reach this point without any
        # allowance entry for from_addr.
        self.allowances.setdefault(from_addr, {})[caller] = allowance - amount
        self.balances[from_addr] = self.balances.get(from_addr, 0) - amount
        self.balances[to] = self.balances.get(to, 0) + amount
        self._emit_event("Transfer", {
            "from": from_addr,
            "to": to,
            "amount": amount
        })
        return True

    def _mint(self, amount: int, caller: str) -> bool:
        """Create ``amount`` new tokens for the owner.

        Raises:
            Exception: if ``caller`` is not the owner.
            ValueError: if ``amount`` is negative.
        """
        if caller != self.owner:
            raise Exception("Only owner can mint")
        self._check_amount(amount)
        self.total_supply += amount
        self.balances[caller] = self.balances.get(caller, 0) + amount
        self._emit_event("Mint", {
            "to": caller,
            "amount": amount
        })
        return True

    def _burn(self, amount: int, caller: str) -> bool:
        """Destroy ``amount`` of the caller's tokens.

        Raises:
            ValueError: if ``amount`` is negative.
            Exception: if the caller's balance is insufficient.
        """
        self._check_amount(amount)
        if self.balances.get(caller, 0) < amount:
            raise Exception("Insufficient balance")
        self.total_supply -= amount
        self.balances[caller] = self.balances.get(caller, 0) - amount
        self._emit_event("Burn", {
            "from": caller,
            "amount": amount
        })
        return True
class NFTContract(SmartContract):
    """Simulated non-fungible token: sequential ids, one owner and URI per token."""

    def __init__(self, owner: str, name: str, symbol: str):
        super().__init__(owner)
        self.name, self.symbol = name, symbol
        self.token_counter = 0
        self.owners: Dict[int, str] = {}
        self.token_uris: Dict[int, str] = {}
        self.balances: Dict[str, int] = {}
        self.operator_approvals: Dict[str, Dict[str, bool]] = {}

    def _credit(self, holder: str, delta: int) -> None:
        """Adjust ``holder``'s token count by ``delta``."""
        self.balances[holder] = self.balances.get(holder, 0) + delta

    def _mint(self, to: str, token_uri: str, caller: str) -> int:
        """Create the next token for ``to``; only the contract owner may mint."""
        if caller != self.owner:
            raise Exception("Only owner can mint")
        new_id = self.token_counter
        self.token_counter = new_id + 1
        self.owners[new_id] = to
        self.token_uris[new_id] = token_uri
        self._credit(to, 1)
        # Minting is logged as a transfer from the "0x0" address.
        self._emit_event("Transfer", {"from": "0x0", "to": to, "token_id": new_id})
        return new_id

    def _transfer(
        self,
        to: str,
        token_id: int,
        caller: str
    ) -> bool:
        """Hand ``token_id`` from ``caller`` to ``to``; the caller must own it."""
        if self.owners.get(token_id) != caller:
            raise Exception("Not token owner")
        self.owners[token_id] = to
        self._credit(caller, -1)
        self._credit(to, 1)
        self._emit_event(
            "Transfer",
            {"from": caller, "to": to, "token_id": token_id},
        )
        return True

    def _owner_of(self, token_id: int, caller: str = None) -> str:
        """Return the owner of ``token_id`` or "" when it does not exist."""
        return self.owners.get(token_id, "")

    def _token_uri(self, token_id: int, caller: str = None) -> str:
        """Return the URI of ``token_id`` or "" when it does not exist."""
        return self.token_uris.get(token_id, "")
class ContractFactory:
    """Creates contracts, marks them deployed and indexes them by address."""

    def __init__(self):
        self.contracts: Dict[str, SmartContract] = {}

    def _register(self, contract):
        """Flag ``contract`` as deployed, store it under its address, return it."""
        contract.state = ContractState.DEPLOYED
        self.contracts[contract.address] = contract
        return contract

    def deploy_token(
        self,
        owner: str,
        name: str,
        symbol: str,
        total_supply: int
    ) -> TokenContract:
        """Deploy a new token contract."""
        return self._register(TokenContract(owner, name, symbol, total_supply))

    def deploy_nft(
        self,
        owner: str,
        name: str,
        symbol: str
    ) -> NFTContract:
        """Deploy a new NFT contract."""
        return self._register(NFTContract(owner, name, symbol))

    def get_contract(self, address: str) -> Optional[SmartContract]:
        """Return the contract deployed at ``address`` or None."""
        return self.contracts.get(address)

    def call_contract(
        self,
        address: str,
        function: str,
        args: List,
        caller: str
    ) -> Any:
        """Run ``function`` on the contract at ``address`` on behalf of ``caller``.

        Raises:
            Exception: if no contract is registered at ``address``.
        """
        target = self.get_contract(address)
        if not target:
            raise Exception("Contract not found")
        return target.execute(function, args, caller)

# 48.3 Web3开发
48.3.1 Web3.py集成
python
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import json
@dataclass
class Web3Config:
    """Connection and default transaction settings for Web3Client."""

    rpc_url: str = "http://localhost:8545"  # JSON-RPC endpoint; defaults to a local node
    chain_id: int = 1  # chain identifier
    gas_limit: int = 21000  # default gas limit per transaction
    gas_price: int = 20000000000  # default gas price; presumably in wei (20 gwei) - confirm
class Web3Client:
    """Offline stand-in for a node client: every call returns canned data."""

    def __init__(self, config: Web3Config = None):
        self._connected = False
        self.config = config if config else Web3Config()

    async def connect(self) -> bool:
        """Mark the client connected; always succeeds."""
        self._connected = True
        return True

    async def disconnect(self) -> None:
        """Mark the client disconnected."""
        self._connected = False

    async def get_block_number(self) -> int:
        """Return a fixed block height."""
        return 1_000_000

    async def get_balance(self, address: str) -> int:
        """Return a fixed balance regardless of ``address``."""
        return 10 ** 18

    async def get_transaction_count(self, address: str) -> int:
        """Return a fixed transaction count of zero."""
        return 0

    async def send_transaction(self, tx: Dict) -> str:
        """Return a random 0x-prefixed 32-byte hex string as the hash; nothing is sent."""
        return f"0x{secrets.token_hex(32)}"

    async def get_transaction_receipt(self, tx_hash: str) -> Optional[Dict]:
        """Return a canned successful receipt for ``tx_hash``."""
        return dict(
            transactionHash=tx_hash,
            blockNumber=1_000_000,
            status=1,
            gasUsed=21_000,
        )

    async def call(self, tx: Dict) -> str:
        """Return empty call data ("0x")."""
        return "0x"

    async def estimate_gas(self, tx: Dict) -> int:
        """Return a fixed gas estimate."""
        return 21_000

    async def get_gas_price(self) -> int:
        """Return a fixed gas price."""
        return 20 * 10 ** 9
class ContractInterface:
    """Client-side handle for one deployed contract, driven by its ABI.

    Encoding and decoding are placeholders: calls are sent with empty data
    ("0x") and results are returned undecoded.
    """

    def __init__(
        self,
        web3: "Web3Client",
        address: str,
        abi: List[Dict]
    ):
        self.web3 = web3
        self.address = address
        self.abi = abi
        self._functions = self._parse_abi()

    def _parse_abi(self) -> Dict[str, Dict]:
        """Index the named ABI entries of type "function" by name."""
        return {
            item["name"]: item
            for item in self.abi
            if item.get("type") == "function" and item.get("name") is not None
        }

    def _require_function(self, function_name: str) -> None:
        """Raise if the ABI declares no function called ``function_name``."""
        if function_name not in self._functions:
            raise Exception(f"Function {function_name} not found")

    async def call(self, function_name: str, *args) -> Any:
        """Perform a read-only call and return the (undecoded) result.

        Raises:
            Exception: if the ABI has no such function.
        """
        self._require_function(function_name)
        tx = {
            "to": self.address,
            "data": self._encode_function_call(function_name, args)
        }
        result = await self.web3.call(tx)
        return self._decode_result(result, function_name)

    async def send(
        self,
        function_name: str,
        *args,
        from_address: str,
        value: int = 0
    ) -> str:
        """Send a state-changing transaction and return its hash.

        The function name is validated against the ABI first, exactly as in
        :meth:`call`, so that a typo never reaches the network.

        Raises:
            Exception: if the ABI has no such function.
        """
        self._require_function(function_name)
        tx = {
            "from": from_address,
            "to": self.address,
            "data": self._encode_function_call(function_name, args),
            "value": value
        }
        return await self.web3.send_transaction(tx)

    def _encode_function_call(self, name: str, args: tuple) -> str:
        """Placeholder encoder: always returns "0x"."""
        return "0x"

    def _decode_result(self, data: str, name: str) -> Any:
        """Placeholder decoder: returns ``data`` unchanged."""
        return data
class EventListener:
    """Polling-based event subscriptions for one contract."""

    def __init__(self, web3: "Web3Client", contract: "ContractInterface"):
        self.web3 = web3
        self.contract = contract
        self._filters: Dict[str, Any] = {}

    async def subscribe_event(
        self,
        event_name: str,
        callback: Callable
    ) -> str:
        """Register an awaitable ``callback`` for ``event_name``.

        Polling starts at the block height current at subscription time.
        Returns the id to pass to :meth:`unsubscribe`.
        """
        import uuid
        filter_id = str(uuid.uuid4())
        self._filters[filter_id] = {
            "event": event_name,
            "callback": callback,
            "from_block": await self.web3.get_block_number()
        }
        return filter_id

    async def unsubscribe(self, filter_id: str) -> bool:
        """Remove a subscription; return False if the id is unknown."""
        if filter_id in self._filters:
            del self._filters[filter_id]
            return True
        return False

    async def poll_events(self) -> None:
        """Deliver new events to each subscription and advance its start block.

        Iteration runs over a snapshot of the subscriptions because callbacks
        are awaited inside the loop and may subscribe or unsubscribe, which
        would otherwise raise "dictionary changed size during iteration".
        """
        current_block = await self.web3.get_block_number()
        for filter_id, filter_data in list(self._filters.items()):
            if filter_id not in self._filters:
                # Removed by a callback earlier in this polling round.
                continue
            events = await self._get_events(
                filter_data["event"],
                filter_data["from_block"],
                current_block
            )
            for event in events:
                await filter_data["callback"](event)
            filter_data["from_block"] = current_block + 1

    async def _get_events(
        self,
        event_name: str,
        from_block: int,
        to_block: int
    ) -> List[Dict]:
        """Placeholder log query: always returns an empty list."""
        return []
class DApp:
    """Application facade bundling a client, named contracts and an event listener."""

    def __init__(self, web3: Web3Client):
        self.web3 = web3
        self.contracts: Dict[str, ContractInterface] = {}
        self.event_listener: Optional[EventListener] = None

    def add_contract(
        self,
        name: str,
        address: str,
        abi: List[Dict]
    ) -> ContractInterface:
        """Create an interface for the contract and register it under ``name``."""
        interface = ContractInterface(self.web3, address, abi)
        self.contracts[name] = interface
        return interface

    def get_contract(self, name: str) -> Optional[ContractInterface]:
        """Return the interface registered under ``name`` or None."""
        return self.contracts.get(name)

    async def initialize(self) -> None:
        """Connect the client and attach a listener to the first registered contract.

        With no contracts registered the listener is created with None.
        """
        await self.web3.connect()
        first_contract = next(iter(self.contracts.values()), None)
        self.event_listener = EventListener(self.web3, first_contract)

    async def shutdown(self) -> None:
        """Disconnect the client."""
        await self.web3.disconnect()

# 48.4 知识图谱
48.4.1 区块链技术架构
┌─────────────────────────────────────────────────────────────────────┐
│ 区块链技术架构全景图 │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 应用层 (Application) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ DApp │ │ DeFi │ │ NFT市场 │ │ DAO │ │ │
│ │ │ 去中心化应用│ │ 去中心化金融│ │ 数字藏品 │ │ 去中心化组织│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 合约层 (Contract) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ ERC-20 │ │ ERC-721 │ │ ERC-1155│ │ 智能合约 │ │ │
│ │ │ 同质化代币│ │ NFT标准 │ │ 多代币 │ │ Solidity│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 协议层 (Protocol) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Ethereum │ │ Bitcoin │ │ Solana │ │ Polygon │ │ │
│ │ │ 智能合约 │ │ 数字货币 │ │ 高性能链 │ │ Layer2 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 共识层 (Consensus) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ PoW │ │ PoS │ │ DPoS │ │ PBFT │ │ │
│ │ │ 工作量证明│ │ 权益证明 │ │ 委托权益 │ │ 拜占庭容错│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 网络层 (Network) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ P2P网络 │ │ 节点通信 │ │ 区块同步 │ │ 交易广播 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 数据层 (Data) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 区块存储 │ │ 状态树 │ │ 交易池 │ │ 链数据 │ │ │
│ │ │ LevelDB │ │ Merkle │ │ Mempool │ │ ChainDB │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
48.4.2 智能合约执行流程
┌─────────────────────────────────────────────────────────────────────┐
│ 智能合约执行流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ 用户发起 │ │
│ │ 交易请求 │ │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 交易验证 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 签名验证 │ │ 余额检查 │ │ Gas计算 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 进入交易池 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 排序 │ │ 优先级 │ │ 等待打包 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 区块打包 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 矿工/验证者│ │ 执行交易 │ │ 生成区块 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ EVM执行 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 字节码 │ │ 状态变更 │ │ 事件触发 │ │ │
│ │ │ 执行 │ │ Storage │ │ Event │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 共识确认 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 区块广播 │ │ 节点验证 │ │ 链上确认 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
48.5 技术选型指南
48.5.1 区块链平台选型
| 平台 | 共识机制 | 智能合约 | TPS | 适用场景 | 推荐指数 |
|---|---|---|---|---|---|
| Ethereum | PoS | Solidity/Vyper | 15-30 | DeFi、NFT、DAO | ★★★★★ |
| Solana | PoH+PoS | Rust/C | 65000+ | 高频交易、游戏 | ★★★★☆ |
| Polygon | PoS | Solidity | 7000+ | Layer2扩展 | ★★★★★ |
| BSC | PoSA | Solidity | 160+ | DeFi、游戏 | ★★★★☆ |
| Avalanche | Snowman | Solidity | 4500+ | 企业应用 | ★★★★☆ |
48.5.2 开发工具选型
| 工具类型 | 工具名称 | 功能 | 推荐指数 |
|---|---|---|---|
| Web3库 | Web3.py | 以太坊交互 | ★★★★★ |
| 合约开发 | Brownie, Foundry | 合约编译测试 | ★★★★★ |
| 开发框架 | Hardhat, Truffle | 合约编译、测试与部署 | ★★★★☆ |
| 节点服务 | Infura, Alchemy | RPC节点 | ★★★★★ |
| 钱包 | MetaMask, WalletConnect | 用户钱包 | ★★★★★ |
48.5.3 代币标准选型
| 标准 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| ERC-20 | 同质化代币 | 可互换、可分割 | 货币、积分 |
| ERC-721 | NFT | 唯一性、不可分割 | 艺术品、收藏品 |
| ERC-1155 | 多代币 | 批量操作、混合 | 游戏道具、票券 |
| ERC-4626 | 金库 | 代币化金库 | DeFi收益 |
48.6 常见问题与解决方案
48.6.1 交易安全与签名
python
import hashlib
import secrets
from typing import Tuple, Optional, Dict, Any
from dataclasses import dataclass
@dataclass
class Transaction:
    """Transaction data structure used by the signing example."""

    from_address: str  # sender address
    to_address: str  # recipient address
    value: int  # amount to transfer
    data: bytes = b''  # optional payload, empty by default
    nonce: int = 0  # transaction nonce
    gas_limit: int = 21000
    gas_price: int = 0
    chain_id: int = 1
class TransactionSigner:
    """Sign and verify transactions with ECDSA on the secp256k1 curve.

    Relies on the third-party ``cryptography`` package, which is imported
    lazily inside the methods that need it.
    """

    def __init__(self, private_key: bytes = None):
        # 32 random bytes when no key is supplied.
        self.private_key = private_key or secrets.token_bytes(32)
        self.public_key = self._derive_public_key()

    def _private_key_object(self):
        """Build the library's private-key object from the raw key bytes."""
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives.asymmetric import ec
        return ec.derive_private_key(
            int.from_bytes(self.private_key, 'big'),
            ec.SECP256K1(),
            default_backend()
        )

    def _derive_public_key(self) -> bytes:
        """Derive the public key as an uncompressed X9.62 point (0x04 || X || Y)."""
        from cryptography.hazmat.primitives import serialization
        public_key = self._private_key_object().public_key()
        # public_bytes() takes enum members, not raw ints or strings.
        return public_key.public_bytes(
            encoding=serialization.Encoding.X962,
            format=serialization.PublicFormat.UncompressedPoint
        )

    def sign_transaction(self, tx: "Transaction") -> Dict:
        """Sign ``tx`` and return the transaction, hex signature and hex hash."""
        tx_hash = self._hash_transaction(tx)
        signature = self._sign(tx_hash)
        return {
            "transaction": tx,
            "signature": signature.hex(),
            "tx_hash": tx_hash.hex()
        }

    @staticmethod
    def _as_bytes(value) -> bytes:
        """Return ``value`` as bytes, encoding text as UTF-8."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return str(value).encode()

    def _hash_transaction(self, tx: "Transaction") -> bytes:
        """Return the SHA-256 digest of the transaction's serialized fields.

        Addresses are text while every other field is binary, so they are
        encoded before concatenation.
        """
        tx_data = (
            self._as_bytes(tx.from_address) +
            self._as_bytes(tx.to_address) +
            tx.value.to_bytes(32, 'big') +
            tx.nonce.to_bytes(8, 'big') +
            tx.gas_limit.to_bytes(8, 'big') +
            tx.gas_price.to_bytes(8, 'big') +
            tx.chain_id.to_bytes(8, 'big') +
            self._as_bytes(tx.data)
        )
        return hashlib.sha256(tx_data).digest()

    def _sign(self, message_hash: bytes) -> bytes:
        """Return an ECDSA signature over ``message_hash``.

        The input is hashed once more with SHA-256 by the signature
        algorithm; ``verify_signature`` does the same, so the two agree.
        """
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import ec
        return self._private_key_object().sign(
            message_hash,
            ec.ECDSA(hashes.SHA256())
        )

    @staticmethod
    def verify_signature(
        public_key: bytes,
        message_hash: bytes,
        signature: bytes
    ) -> bool:
        """Return True if ``signature`` is valid for ``message_hash``.

        ``public_key`` must be an X9.62-encoded secp256k1 point. Malformed
        keys or signatures yield False instead of an exception.
        """
        from cryptography.exceptions import InvalidSignature
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import ec
        try:
            pub_key_obj = ec.EllipticCurvePublicKey.from_encoded_point(
                ec.SECP256K1(),
                public_key
            )
            pub_key_obj.verify(
                signature,
                message_hash,
                ec.ECDSA(hashes.SHA256())
            )
        except (InvalidSignature, ValueError, TypeError):
            return False
        return True
class MultiSigWallet:
    """Multi-signature wallet: a transaction executes after ``required`` owner confirmations."""

    def __init__(self, owners: list, required: int):
        self.owners = set(owners)
        # A threshold below 1 would execute without agreement; one above the
        # number of distinct owners could never be reached.
        if not 1 <= required <= len(self.owners):
            raise ValueError(
                "required must be between 1 and the number of distinct owners"
            )
        self.required = required
        self._pending_transactions: Dict[str, Dict] = {}
        self._submitted = 0  # running count, keeps transaction ids unique

    def submit_transaction(
        self,
        to: str,
        value: int,
        data: bytes = b''
    ) -> str:
        """Submit a transaction for confirmation and return its id.

        The id hashes the recipient, value, current time and a running
        counter; the counter keeps ids distinct when identical submissions
        happen within the same clock tick.
        """
        self._submitted += 1
        tx_id = hashlib.sha256(
            (to + str(value) + str(time.time()) + str(self._submitted)).encode()
        ).hexdigest()
        self._pending_transactions[tx_id] = {
            "to": to,
            "value": value,
            "data": data,
            "confirmations": set(),
            "executed": False
        }
        return tx_id

    def confirm_transaction(self, tx_id: str, owner: str) -> bool:
        """Record ``owner``'s confirmation and execute once the threshold is met.

        Returns False for non-owners, unknown ids and transactions that were
        already executed; True otherwise.
        """
        if owner not in self.owners:
            return False
        tx = self._pending_transactions.get(tx_id)
        if tx is None or tx["executed"]:
            return False
        tx["confirmations"].add(owner)
        if len(tx["confirmations"]) >= self.required:
            return self._execute_transaction(tx_id)
        return True

    def _execute_transaction(self, tx_id: str) -> bool:
        """Mark the transaction executed; return False if it already was."""
        tx = self._pending_transactions[tx_id]
        if tx["executed"]:
            return False
        tx["executed"] = True
        return True

    def get_transaction_status(self, tx_id: str) -> Optional[Dict]:
        """Return the stored record for ``tx_id`` or None when unknown."""
        return self._pending_transactions.get(tx_id)
import time
48.6.2 智能合约交互
python
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json
import hashlib
@dataclass
class ContractABI:
    """Contract ABI split into function and event descriptions, keyed by name."""

    functions: Dict[str, Dict]
    events: Dict[str, Dict]

    @classmethod
    def from_json(cls, abi_json: str) -> "ContractABI":
        """Build an ABI from its JSON text.

        Entries whose ``type`` is neither "function" nor "event" are ignored.
        """
        entries = json.loads(abi_json)

        def _named(kind: str) -> Dict[str, Dict]:
            # Later entries with the same name replace earlier ones.
            return {
                entry["name"]: entry
                for entry in entries
                if entry.get("type") == kind
            }

        return cls(functions=_named("function"), events=_named("event"))
class ContractEncoder:
    """Encodes contract function calls as hex call data.

    Supported argument types: uint256, int256, address, bool and bytes32.
    """

    @staticmethod
    def encode_function_call(
        function_name: str,
        abi: Dict,
        args: List[Any]
    ) -> str:
        """Return "0x" + 4-byte selector + encoded arguments.

        Raises:
            ValueError: on an argument-count mismatch or an unsupported type.
        """
        selector = ContractEncoder._get_function_selector(function_name, abi)
        encoded_args = ContractEncoder._encode_args(abi["inputs"], args)
        return "0x" + selector.hex() + encoded_args

    @staticmethod
    def _get_function_selector(name: str, abi: Dict) -> bytes:
        """Return the first four bytes of the hash of ``name(type1,type2,...)``.

        NOTE(review): Ethereum selectors use Keccak-256. ``hashlib.sha3_256``
        is NIST SHA3-256, which pads differently and yields different
        digests, so these selectors will not match a real chain. The standard
        library has no Keccak-256; substitute one before using this against
        a real network.
        """
        param_types = ",".join(
            inp["type"] for inp in abi["inputs"]
        )
        signature = f"{name}({param_types})"
        return hashlib.sha3_256(signature.encode()).digest()[:4]

    @staticmethod
    def _encode_args(inputs: List[Dict], args: List[Any]) -> str:
        """Encode ``args`` in order, one 32-byte word per argument.

        Raises:
            ValueError: if the number of arguments differs from the ABI.
        """
        if len(inputs) != len(args):
            raise ValueError(
                f"Expected {len(inputs)} arguments, got {len(args)}"
            )
        return "".join(
            ContractEncoder._encode_value(inp["type"], arg)
            for inp, arg in zip(inputs, args)
        )

    @staticmethod
    def _encode_value(type_str: str, value: Any) -> str:
        """Encode one value as 64 hex characters.

        Raises:
            ValueError: for unsupported types or oversized values. Silently
                returning "" would shift every following argument.
            OverflowError: if an integer does not fit the 256-bit word.
        """
        if type_str == "uint256":
            return int(value).to_bytes(32, 'big').hex()
        if type_str == "int256":
            # Negative numbers are encoded in two's complement.
            return int(value).to_bytes(32, 'big', signed=True).hex()
        if type_str == "address":
            digits = value.lower()
            if digits.startswith("0x"):
                digits = digits[2:]
            if len(digits) > 40:
                raise ValueError(f"Address too long: {value}")
            return digits.zfill(64)
        if type_str == "bool":
            return "0" * 63 + ("1" if value else "0")
        if type_str == "bytes32":
            if len(value) > 32:
                raise ValueError("bytes32 value longer than 32 bytes")
            return value.hex().ljust(64, "0")
        raise ValueError(f"Unsupported ABI type: {type_str}")
class ContractDecoder:
    """Decodes event logs into dictionaries keyed by parameter name."""

    @staticmethod
    def decode_event(
        log: Dict,
        event_abi: Dict
    ) -> Dict:
        """Decode one log entry using ``event_abi``.

        Indexed parameters are read from ``log["topics"][1:]`` (topic 0 is
        the event signature); the rest come from ``log["data"]``. Inputs
        without an ``indexed`` key are treated as non-indexed.
        """
        indexed_inputs = [
            inp for inp in event_abi["inputs"] if inp.get("indexed", False)
        ]
        non_indexed_inputs = [
            inp for inp in event_abi["inputs"] if not inp.get("indexed", False)
        ]
        result = {}
        for position, inp in enumerate(indexed_inputs, start=1):
            result[inp["name"]] = ContractDecoder._decode_value(
                inp["type"], log["topics"][position]
            )
        data = log.get("data", "0x")
        result.update(ContractDecoder._decode_data(data, non_indexed_inputs))
        return result

    @staticmethod
    def _strip_prefix(value: str) -> str:
        """Drop a leading "0x"/"0X" from a hex string."""
        return value[2:] if value[:2].lower() == "0x" else value

    @staticmethod
    def _decode_value(type_str: str, value: str) -> Any:
        """Decode one hex word according to ``type_str``.

        ``int256`` is read as two's complement, so values with the top bit
        set decode to negative numbers. Unknown types are returned as the
        bare hex string.
        """
        value = ContractDecoder._strip_prefix(value)
        if type_str == "uint256":
            return int(value, 16)
        if type_str == "int256":
            number = int(value, 16)
            return number - (1 << 256) if number >= (1 << 255) else number
        if type_str == "address":
            # An address occupies the last 20 bytes of the word.
            return "0x" + value[-40:]
        if type_str == "bool":
            return bool(int(value, 16))
        return value

    @staticmethod
    def _decode_data(data: str, inputs: List[Dict]) -> Dict:
        """Decode consecutive 32-byte words of ``data``, one per input."""
        data = ContractDecoder._strip_prefix(data)
        result = {}
        for position, inp in enumerate(inputs):
            chunk = data[position * 64:(position + 1) * 64]
            result[inp["name"]] = ContractDecoder._decode_value(
                inp["type"], "0x" + chunk
            )
        return result
class EventFilter:
    """Selects and decodes the logs of one event emitted by one contract."""

    def __init__(self, contract_address: str, event_abi: Dict):
        self.contract_address = contract_address
        self.event_abi = event_abi
        self._event_signature = self._compute_event_signature()

    @property
    def event_signature(self) -> str:
        """The 0x-prefixed hash expected in ``topics[0]`` of matching logs."""
        return self._event_signature

    def _compute_event_signature(self) -> str:
        """Hash the canonical signature ``Name(type1,type2,...)``.

        NOTE(review): Ethereum uses Keccak-256 for topic 0. ``sha3_256`` is
        NIST SHA3-256, which produces different digests, so this value will
        not match logs from a real chain. The standard library has no
        Keccak-256; substitute one before using this against a real network.
        """
        param_types = ",".join(
            inp["type"] for inp in self.event_abi["inputs"]
        )
        signature = f"{self.event_abi['name']}({param_types})"
        return "0x" + hashlib.sha3_256(signature.encode()).digest().hex()

    def match_log(self, log: Dict) -> bool:
        """Return True if ``log`` comes from this contract and carries this event.

        Address and topic comparisons ignore letter case.
        """
        if log.get("address", "").lower() != self.contract_address.lower():
            return False
        topics = log.get("topics", [])
        if len(topics) == 0:
            return False
        return topics[0].lower() == self._event_signature.lower()

    def filter_logs(self, logs: List[Dict]) -> List[Dict]:
        """Return the decoded form of every log that matches this filter."""
        return [
            ContractDecoder.decode_event(log, self.event_abi)
            for log in logs
            if self.match_log(log)
        ]

# 48.6.3 Gas优化策略
python
from typing import Dict, List, Any
from dataclasses import dataclass
@dataclass
class GasEstimate:
    """Result of a gas estimation."""

    gas_limit: int  # suggested gas limit for the transaction
    gas_price: int  # gas price used for the estimate
    total_cost: int  # estimated total cost of the transaction
    suggestions: List[str]  # optimization hints for the caller
class GasOptimizer:
    """Gas estimator with canned optimization advice."""

    def __init__(self, web3_client):
        self.web3 = web3_client

    def estimate_gas(self, tx: Dict) -> GasEstimate:
        """Estimate gas for ``tx`` from its call data.

        Starts from a base of 21000, adds 4 per zero byte and 16 per non-zero
        byte of ``tx["data"]``, then applies a 20% margin. ``total_cost`` is
        exactly ``gas_limit * gas_price``; computing it from the un-truncated
        float would let the two figures disagree.
        """
        base_gas = 21000
        if tx.get("data"):
            data_bytes = bytes.fromhex(tx["data"].replace("0x", ""))
            zero_bytes = data_bytes.count(b'\x00')
            non_zero_bytes = len(data_bytes) - zero_bytes
            base_gas += zero_bytes * 4 + non_zero_bytes * 16
        suggestions = []
        if len(tx.get("data", "")) > 1000:
            suggestions.append("考虑使用事件而非存储来减少Gas")
        if tx.get("value", 0) > 0:
            suggestions.append("ETH转账会增加Gas消耗")
        gas_limit = int(base_gas * 1.2)
        gas_price = self._get_current_gas_price()
        return GasEstimate(
            gas_limit=gas_limit,
            gas_price=gas_price,
            total_cost=gas_limit * gas_price,
            suggestions=suggestions
        )

    def _get_current_gas_price(self) -> int:
        """Return the current gas price (a fixed placeholder value)."""
        return 20000000000

    @staticmethod
    def optimize_storage_layout() -> List[str]:
        """Return advice on storage layout."""
        return [
            "将连续更新的变量放在同一存储槽",
            "使用uint256而非更小的整数类型",
            "使用mapping替代数组进行查找",
            "使用bytes32替代string存储短字符串",
            "使用事件存储历史数据而非链上存储",
            "使用压缩编码减少存储空间"
        ]

    @staticmethod
    def optimize_loop_operations() -> List[str]:
        """Return advice on loops."""
        return [
            "避免在循环中进行存储操作",
            "使用内存变量缓存存储读取",
            "限制循环次数防止Gas耗尽",
            "使用批量操作替代多次单独操作",
            "考虑使用位运算替代算术运算"
        ]
class TransactionBatcher:
    """Collects transactions until a batch of ``max_batch_size`` is ready."""

    def __init__(self, max_batch_size: int = 100):
        self._pending: List[Dict] = []
        self.max_batch_size = max_batch_size

    def add_transaction(self, tx: Dict):
        """Queue ``tx`` for the next batch."""
        self._pending.append(tx)

    def should_flush(self) -> bool:
        """Return True once the queue has reached the batch size."""
        return not len(self._pending) < self.max_batch_size

    def flush(self) -> List[Dict]:
        """Return the queued transactions as a new list and empty the queue."""
        drained = list(self._pending)
        del self._pending[:]
        return drained

    def estimate_batch_gas(self) -> int:
        """Return the sum of the estimated gas limits of all queued transactions."""
        return sum(
            GasOptimizer(None).estimate_gas(tx).gas_limit
            for tx in self._pending
        )

# 48.7 本章小结
本章详细介绍了Python区块链开发的核心概念和实践:
- 区块链基础:区块结构、链式存储、共识机制
- 钱包与签名:密钥生成、交易签名、多重签名
- 智能合约:合约开发、代币标准、NFT合约
- Web3开发:Web3.py集成、合约交互、事件监听
练习题
- 实现一个完整的区块链系统,支持交易验证和区块挖矿
- 开发一个ERC-20代币合约,支持转账、授权和铸造
- 实现一个NFT市场,支持NFT创建、交易和拍卖
- 开发一个去中心化投票系统,支持提案和投票
- 实现一个多签钱包,支持多人审批交易