Skip to content

第48章 区块链开发

学习目标

完成本章学习后,你将能够:

  1. 理解区块链基础:区块结构、链式存储、共识机制
  2. 实现简单区块链:区块创建、哈希计算、链验证
  3. 开发智能合约:Solidity基础、合约部署、合约交互
  4. 使用Web3.py:以太坊连接、账户管理、交易发送
  5. 构建DApp:去中心化应用架构、前端集成
  6. 实现代币标准:ERC-20、ERC-721、代币发行
  7. 处理链上数据:事件监听、日志解析、数据索引
  8. 实现钱包功能:地址生成、签名验证、交易广播

48.1 区块链基础

48.1.1 区块链原理

python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
import hashlib
import time
import json


@dataclass
class Transaction:
    sender: str
    receiver: str
    amount: float
    timestamp: float = field(default_factory=time.time)
    signature: str = ""
    tx_id: str = ""

    def __post_init__(self):
        if not self.tx_id:
            self.tx_id = self._calculate_hash()

    def _calculate_hash(self) -> str:
        tx_data = f"{self.sender}{self.receiver}{self.amount}{self.timestamp}"
        return hashlib.sha256(tx_data.encode()).hexdigest()

    def to_dict(self) -> Dict:
        return {
            "tx_id": self.tx_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "amount": self.amount,
            "timestamp": self.timestamp,
            "signature": self.signature
        }


@dataclass
class Block:
    index: int
    timestamp: float
    transactions: List[Transaction]
    previous_hash: str
    nonce: int = 0
    hash: str = ""
    merkle_root: str = ""

    def __post_init__(self):
        if not self.hash:
            self.hash = self.calculate_hash()
        if not self.merkle_root:
            self.merkle_root = self._calculate_merkle_root()

    def calculate_hash(self) -> str:
        block_data = f"{self.index}{self.timestamp}{self.previous_hash}{self.nonce}"
        block_data += json.dumps([tx.to_dict() for tx in self.transactions])
        return hashlib.sha256(block_data.encode()).hexdigest()

    def _calculate_merkle_root(self) -> str:
        if not self.transactions:
            return hashlib.sha256(b"").hexdigest()

        hashes = [tx.tx_id for tx in self.transactions]

        while len(hashes) > 1:
            new_hashes = []
            for i in range(0, len(hashes), 2):
                if i + 1 < len(hashes):
                    combined = hashes[i] + hashes[i + 1]
                else:
                    combined = hashes[i] + hashes[i]
                new_hashes.append(hashlib.sha256(combined.encode()).hexdigest())
            hashes = new_hashes

        return hashes[0]

    def to_dict(self) -> Dict:
        return {
            "index": self.index,
            "timestamp": self.timestamp,
            "transactions": [tx.to_dict() for tx in self.transactions],
            "previous_hash": self.previous_hash,
            "nonce": self.nonce,
            "hash": self.hash,
            "merkle_root": self.merkle_root
        }


class Blockchain:
    def __init__(self, difficulty: int = 4):
        self.chain: List[Block] = []
        self.difficulty = difficulty
        self.pending_transactions: List[Transaction] = []
        self.mining_reward = 10.0
        self._create_genesis_block()

    def _create_genesis_block(self) -> None:
        genesis = Block(
            index=0,
            timestamp=time.time(),
            transactions=[],
            previous_hash="0"
        )
        self.chain.append(genesis)

    @property
    def latest_block(self) -> Block:
        return self.chain[-1]

    def add_transaction(self, transaction: Transaction) -> bool:
        if not self._validate_transaction(transaction):
            return False
        self.pending_transactions.append(transaction)
        return True

    def _validate_transaction(self, transaction: Transaction) -> bool:
        if not transaction.sender or not transaction.receiver:
            return False
        if transaction.amount <= 0:
            return False
        return True

    def mine_pending_transactions(self, miner_address: str) -> Block:
        reward_tx = Transaction(
            sender="0",
            receiver=miner_address,
            amount=self.mining_reward
        )
        self.pending_transactions.append(reward_tx)

        block = Block(
            index=len(self.chain),
            timestamp=time.time(),
            transactions=self.pending_transactions.copy(),
            previous_hash=self.latest_block.hash
        )

        block = self._proof_of_work(block)

        self.chain.append(block)
        self.pending_transactions = []

        return block

    def _proof_of_work(self, block: Block) -> Block:
        target = "0" * self.difficulty

        while block.hash[:self.difficulty] != target:
            block.nonce += 1
            block.hash = block.calculate_hash()

        return block

    def is_chain_valid(self) -> bool:
        for i in range(1, len(self.chain)):
            current = self.chain[i]
            previous = self.chain[i - 1]

            if current.hash != current.calculate_hash():
                return False

            if current.previous_hash != previous.hash:
                return False

        return True

    def get_balance(self, address: str) -> float:
        balance = 0.0

        for block in self.chain:
            for tx in block.transactions:
                if tx.sender == address:
                    balance -= tx.amount
                if tx.receiver == address:
                    balance += tx.amount

        return balance

    def get_block(self, index: int) -> Optional[Block]:
        if 0 <= index < len(self.chain):
            return self.chain[index]
        return None

    def get_transaction(self, tx_id: str) -> Optional[Transaction]:
        for block in self.chain:
            for tx in block.transactions:
                if tx.tx_id == tx_id:
                    return tx
        return None


class ConsensusMechanism:
    @staticmethod
    def proof_of_work(block: Block, difficulty: int) -> Block:
        target = "0" * difficulty

        while block.hash[:difficulty] != target:
            block.nonce += 1
            block.hash = block.calculate_hash()

        return block

    @staticmethod
    def proof_of_stake(
        validators: Dict[str, float],
        previous_block: Block
    ) -> str:
        import random

        total_stake = sum(validators.values())
        selection = random.uniform(0, total_stake)

        current = 0
        for validator, stake in validators.items():
            current += stake
            if current >= selection:
                return validator

        return list(validators.keys())[0]


class BlockchainNode:
    def __init__(self, node_id: str, blockchain: Blockchain = None):
        self.node_id = node_id
        self.blockchain = blockchain or Blockchain()
        self.peers: List[str] = []

    def add_peer(self, peer_address: str) -> None:
        if peer_address not in self.peers:
            self.peers.append(peer_address)

    def sync_chain(self, other_chain: List[Block]) -> bool:
        if len(other_chain) <= len(self.blockchain.chain):
            return False

        for i in range(1, len(other_chain)):
            current = other_chain[i]
            previous = other_chain[i - 1]

            if current.hash != current.calculate_hash():
                return False
            if current.previous_hash != previous.hash:
                return False

        self.blockchain.chain = other_chain
        return True

    def broadcast_transaction(self, transaction: Transaction) -> None:
        pass

    def broadcast_block(self, block: Block) -> None:
        pass

48.1.2 钱包与签名

python
import hashlib
import hmac
import secrets
from typing import Tuple, Optional


class CryptoUtils:
    @staticmethod
    def generate_private_key() -> str:
        return secrets.token_hex(32)

    @staticmethod
    def private_key_to_public_key(private_key: str) -> str:
        return hashlib.sha256(private_key.encode()).hexdigest()

    @staticmethod
    def public_key_to_address(public_key: str) -> str:
        hashed = hashlib.sha256(public_key.encode()).hexdigest()
        return hashlib.sha256(hashed.encode()).hexdigest()[:40]

    @staticmethod
    def sign(private_key: str, message: str) -> str:
        return hmac.new(
            private_key.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

    @staticmethod
    def verify(public_key: str, message: str, signature: str) -> bool:
        expected = hashlib.sha256(public_key.encode()).hexdigest()
        return True


class Wallet:
    def __init__(self, private_key: str = None):
        if private_key:
            self.private_key = private_key
        else:
            self.private_key = CryptoUtils.generate_private_key()

        self.public_key = CryptoUtils.private_key_to_public_key(self.private_key)
        self.address = CryptoUtils.public_key_to_address(self.public_key)

    @classmethod
    def from_private_key(cls, private_key: str) -> "Wallet":
        return cls(private_key)

    def sign_transaction(self, transaction: Transaction) -> Transaction:
        message = f"{transaction.sender}{transaction.receiver}{transaction.amount}"
        transaction.signature = CryptoUtils.sign(self.private_key, message)
        return transaction

    def create_transaction(
        self,
        receiver: str,
        amount: float
    ) -> Transaction:
        tx = Transaction(
            sender=self.address,
            receiver=receiver,
            amount=amount
        )
        return self.sign_transaction(tx)

    def export_private_key(self) -> str:
        return self.private_key

    def get_address(self) -> str:
        return self.address


class MultiSigWallet:
    def __init__(self, owners: List[str], required: int):
        self.owners = owners
        self.required = required
        self.address = self._generate_address()
        self.pending_transactions: Dict[str, Dict] = {}

    def _generate_address(self) -> str:
        combined = "".join(sorted(self.owners)) + str(self.required)
        return hashlib.sha256(combined.encode()).hexdigest()[:40]

    def propose_transaction(
        self,
        receiver: str,
        amount: float,
        proposer: str
    ) -> str:
        import uuid
        tx_id = str(uuid.uuid4())

        self.pending_transactions[tx_id] = {
            "receiver": receiver,
            "amount": amount,
            "approvals": {proposer},
            "executed": False
        }

        return tx_id

    def approve_transaction(
        self,
        tx_id: str,
        approver: str
    ) -> bool:
        if tx_id not in self.pending_transactions:
            return False

        if approver not in self.owners:
            return False

        tx = self.pending_transactions[tx_id]
        tx["approvals"].add(approver)

        return len(tx["approvals"]) >= self.required

    def execute_transaction(self, tx_id: str) -> Optional[Transaction]:
        if tx_id not in self.pending_transactions:
            return None

        tx = self.pending_transactions[tx_id]

        if len(tx["approvals"]) < self.required:
            return None

        if tx["executed"]:
            return None

        tx["executed"] = True

        return Transaction(
            sender=self.address,
            receiver=tx["receiver"],
            amount=tx["amount"]
        )

48.2 智能合约

48.2.1 合约开发

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import json


class ContractState(Enum):
    CREATED = "created"
    DEPLOYED = "deployed"
    PAUSED = "paused"
    DESTROYED = "destroyed"


@dataclass
class ContractEvent:
    name: str
    data: Dict
    timestamp: float = field(default_factory=time.time)


class SmartContract:
    def __init__(self, owner: str, code: str = ""):
        self.owner = owner
        self.code = code
        self.state = ContractState.CREATED
        self.storage: Dict[str, Any] = {}
        self.events: List[ContractEvent] = []
        self.address = self._generate_address()

    def _generate_address(self) -> str:
        data = f"{self.owner}{time.time()}{secrets.token_hex(8)}"
        return hashlib.sha256(data.encode()).hexdigest()[:40]

    def _emit_event(self, name: str, data: Dict) -> None:
        event = ContractEvent(name=name, data=data)
        self.events.append(event)

    def get_storage(self, key: str) -> Any:
        return self.storage.get(key)

    def set_storage(self, key: str, value: Any) -> None:
        self.storage[key] = value

    def execute(self, function: str, args: List, caller: str) -> Any:
        if self.state == ContractState.PAUSED:
            raise Exception("Contract is paused")

        if self.state == ContractState.DESTROYED:
            raise Exception("Contract is destroyed")

        method = getattr(self, f"_{function}", None)
        if method is None:
            raise Exception(f"Function {function} not found")

        return method(*args, caller=caller)


class TokenContract(SmartContract):
    def __init__(self, owner: str, name: str, symbol: str, total_supply: int):
        super().__init__(owner)
        self.name = name
        self.symbol = symbol
        self.total_supply = total_supply
        self.decimals = 18
        self.balances: Dict[str, int] = {owner: total_supply}
        self.allowances: Dict[str, Dict[str, int]] = {}

    def _balance_of(self, address: str, caller: str = None) -> int:
        return self.balances.get(address, 0)

    def _transfer(
        self,
        to: str,
        amount: int,
        caller: str
    ) -> bool:
        if self.balances.get(caller, 0) < amount:
            raise Exception("Insufficient balance")

        self.balances[caller] = self.balances.get(caller, 0) - amount
        self.balances[to] = self.balances.get(to, 0) + amount

        self._emit_event("Transfer", {
            "from": caller,
            "to": to,
            "amount": amount
        })

        return True

    def _approve(
        self,
        spender: str,
        amount: int,
        caller: str
    ) -> bool:
        if caller not in self.allowances:
            self.allowances[caller] = {}

        self.allowances[caller][spender] = amount

        self._emit_event("Approval", {
            "owner": caller,
            "spender": spender,
            "amount": amount
        })

        return True

    def _transfer_from(
        self,
        from_addr: str,
        to: str,
        amount: int,
        caller: str
    ) -> bool:
        allowance = self.allowances.get(from_addr, {}).get(caller, 0)

        if allowance < amount:
            raise Exception("Insufficient allowance")

        if self.balances.get(from_addr, 0) < amount:
            raise Exception("Insufficient balance")

        self.allowances[from_addr][caller] = allowance - amount
        self.balances[from_addr] = self.balances.get(from_addr, 0) - amount
        self.balances[to] = self.balances.get(to, 0) + amount

        self._emit_event("Transfer", {
            "from": from_addr,
            "to": to,
            "amount": amount
        })

        return True

    def _mint(self, amount: int, caller: str) -> bool:
        if caller != self.owner:
            raise Exception("Only owner can mint")

        self.total_supply += amount
        self.balances[caller] = self.balances.get(caller, 0) + amount

        self._emit_event("Mint", {
            "to": caller,
            "amount": amount
        })

        return True

    def _burn(self, amount: int, caller: str) -> bool:
        if self.balances.get(caller, 0) < amount:
            raise Exception("Insufficient balance")

        self.total_supply -= amount
        self.balances[caller] = self.balances.get(caller, 0) - amount

        self._emit_event("Burn", {
            "from": caller,
            "amount": amount
        })

        return True


class NFTContract(SmartContract):
    def __init__(self, owner: str, name: str, symbol: str):
        super().__init__(owner)
        self.name = name
        self.symbol = symbol
        self.token_counter = 0
        self.owners: Dict[int, str] = {}
        self.token_uris: Dict[int, str] = {}
        self.balances: Dict[str, int] = {}
        self.operator_approvals: Dict[str, Dict[str, bool]] = {}

    def _mint(self, to: str, token_uri: str, caller: str) -> int:
        if caller != self.owner:
            raise Exception("Only owner can mint")

        token_id = self.token_counter
        self.token_counter += 1

        self.owners[token_id] = to
        self.token_uris[token_id] = token_uri
        self.balances[to] = self.balances.get(to, 0) + 1

        self._emit_event("Transfer", {
            "from": "0x0",
            "to": to,
            "token_id": token_id
        })

        return token_id

    def _transfer(
        self,
        to: str,
        token_id: int,
        caller: str
    ) -> bool:
        if self.owners.get(token_id) != caller:
            raise Exception("Not token owner")

        from_addr = caller
        self.owners[token_id] = to
        self.balances[from_addr] = self.balances.get(from_addr, 0) - 1
        self.balances[to] = self.balances.get(to, 0) + 1

        self._emit_event("Transfer", {
            "from": from_addr,
            "to": to,
            "token_id": token_id
        })

        return True

    def _owner_of(self, token_id: int, caller: str = None) -> str:
        return self.owners.get(token_id, "")

    def _token_uri(self, token_id: int, caller: str = None) -> str:
        return self.token_uris.get(token_id, "")


class ContractFactory:
    def __init__(self):
        self.contracts: Dict[str, SmartContract] = {}

    def deploy_token(
        self,
        owner: str,
        name: str,
        symbol: str,
        total_supply: int
    ) -> TokenContract:
        contract = TokenContract(owner, name, symbol, total_supply)
        contract.state = ContractState.DEPLOYED
        self.contracts[contract.address] = contract
        return contract

    def deploy_nft(
        self,
        owner: str,
        name: str,
        symbol: str
    ) -> NFTContract:
        contract = NFTContract(owner, name, symbol)
        contract.state = ContractState.DEPLOYED
        self.contracts[contract.address] = contract
        return contract

    def get_contract(self, address: str) -> Optional[SmartContract]:
        return self.contracts.get(address)

    def call_contract(
        self,
        address: str,
        function: str,
        args: List,
        caller: str
    ) -> Any:
        contract = self.get_contract(address)
        if not contract:
            raise Exception("Contract not found")

        return contract.execute(function, args, caller)

48.3 Web3开发

48.3.1 Web3.py集成

python
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import json


@dataclass
class Web3Config:
    rpc_url: str = "http://localhost:8545"
    chain_id: int = 1
    gas_limit: int = 21000
    gas_price: int = 20000000000


class Web3Client:
    def __init__(self, config: Web3Config = None):
        self.config = config or Web3Config()
        self._connected = False

    async def connect(self) -> bool:
        self._connected = True
        return True

    async def disconnect(self) -> None:
        self._connected = False

    async def get_block_number(self) -> int:
        return 1000000

    async def get_balance(self, address: str) -> int:
        return 1000000000000000000

    async def get_transaction_count(self, address: str) -> int:
        return 0

    async def send_transaction(self, tx: Dict) -> str:
        return "0x" + secrets.token_hex(32)

    async def get_transaction_receipt(self, tx_hash: str) -> Optional[Dict]:
        return {
            "transactionHash": tx_hash,
            "blockNumber": 1000000,
            "status": 1,
            "gasUsed": 21000
        }

    async def call(self, tx: Dict) -> str:
        return "0x"

    async def estimate_gas(self, tx: Dict) -> int:
        return 21000

    async def get_gas_price(self) -> int:
        return 20000000000


class ContractInterface:
    def __init__(
        self,
        web3: Web3Client,
        address: str,
        abi: List[Dict]
    ):
        self.web3 = web3
        self.address = address
        self.abi = abi
        self._functions = self._parse_abi()

    def _parse_abi(self) -> Dict[str, Dict]:
        functions = {}
        for item in self.abi:
            if item.get("type") == "function":
                name = item.get("name")
                functions[name] = item
        return functions

    async def call(self, function_name: str, *args) -> Any:
        if function_name not in self._functions:
            raise Exception(f"Function {function_name} not found")

        tx = {
            "to": self.address,
            "data": self._encode_function_call(function_name, args)
        }

        result = await self.web3.call(tx)
        return self._decode_result(result, function_name)

    async def send(
        self,
        function_name: str,
        *args,
        from_address: str,
        value: int = 0
    ) -> str:
        tx = {
            "from": from_address,
            "to": self.address,
            "data": self._encode_function_call(function_name, args),
            "value": value
        }

        return await self.web3.send_transaction(tx)

    def _encode_function_call(self, name: str, args: tuple) -> str:
        return "0x"

    def _decode_result(self, data: str, name: str) -> Any:
        return data


class EventListener:
    def __init__(self, web3: Web3Client, contract: ContractInterface):
        self.web3 = web3
        self.contract = contract
        self._filters: Dict[str, Any] = {}

    async def subscribe_event(
        self,
        event_name: str,
        callback: Callable
    ) -> str:
        import uuid
        filter_id = str(uuid.uuid4())

        self._filters[filter_id] = {
            "event": event_name,
            "callback": callback,
            "from_block": await self.web3.get_block_number()
        }

        return filter_id

    async def unsubscribe(self, filter_id: str) -> bool:
        if filter_id in self._filters:
            del self._filters[filter_id]
            return True
        return False

    async def poll_events(self) -> None:
        current_block = await self.web3.get_block_number()

        for filter_id, filter_data in self._filters.items():
            events = await self._get_events(
                filter_data["event"],
                filter_data["from_block"],
                current_block
            )

            for event in events:
                await filter_data["callback"](event)

            filter_data["from_block"] = current_block + 1

    async def _get_events(
        self,
        event_name: str,
        from_block: int,
        to_block: int
    ) -> List[Dict]:
        return []


class DApp:
    def __init__(self, web3: Web3Client):
        self.web3 = web3
        self.contracts: Dict[str, ContractInterface] = {}
        self.event_listener: Optional[EventListener] = None

    def add_contract(
        self,
        name: str,
        address: str,
        abi: List[Dict]
    ) -> ContractInterface:
        contract = ContractInterface(self.web3, address, abi)
        self.contracts[name] = contract
        return contract

    def get_contract(self, name: str) -> Optional[ContractInterface]:
        return self.contracts.get(name)

    async def initialize(self) -> None:
        await self.web3.connect()
        self.event_listener = EventListener(self.web3, list(self.contracts.values())[0] if self.contracts else None)

    async def shutdown(self) -> None:
        await self.web3.disconnect()

48.4 知识图谱

48.4.1 区块链技术架构

┌─────────────────────────────────────────────────────────────────────┐
│                      区块链技术架构全景图                             │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      应用层 (Application)                     │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ DApp    │ │ DeFi    │ │ NFT市场  │ │ DAO     │       │   │
│  │  │ 去中心化应用│ │ 去中心化金融│ │ 数字藏品 │ │ 去中心化组织│       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      合约层 (Contract)                        │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ ERC-20  │ │ ERC-721 │ │ ERC-1155│ │ 智能合约  │       │   │
│  │  │ 同质化代币│ │ NFT标准 │ │ 多代币  │ │ Solidity│       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      协议层 (Protocol)                        │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ Ethereum │ │ Bitcoin │ │ Solana  │ │ Polygon │       │   │
│  │  │ 智能合约  │ │ 数字货币 │ │ 高性能链 │ │ Layer2  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      共识层 (Consensus)                       │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ PoW     │ │ PoS     │ │ DPoS    │ │ PBFT    │       │   │
│  │  │ 工作量证明│ │ 权益证明 │ │ 委托权益 │ │ 拜占庭容错│       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      网络层 (Network)                         │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ P2P网络  │ │ 节点通信  │ │ 区块同步  │ │ 交易广播  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      数据层 (Data)                            │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 区块存储  │ │ 状态树   │ │ 交易池   │ │ 链数据   │       │   │
│  │  │ LevelDB │ │ Merkle  │ │ Mempool │ │ ChainDB │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

48.4.2 智能合约执行流程

┌─────────────────────────────────────────────────────────────────────┐
│                      智能合约执行流程                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────┐                                                      │
│   │ 用户发起  │                                                      │
│   │ 交易请求  │                                                      │
│   └────┬─────┘                                                      │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    交易验证                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 签名验证  │ │ 余额检查  │ │ Gas计算  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    进入交易池                             │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 排序     │ │ 优先级   │ │ 等待打包  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    区块打包                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 矿工/验证者│ │ 执行交易  │ │ 生成区块  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    EVM执行                                │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 字节码   │ │ 状态变更  │ │ 事件触发  │                 │     │
│   │  │ 执行     │ │ Storage │ │ Event    │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    共识确认                               │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 区块广播  │ │ 节点验证  │ │ 链上确认  │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

48.5 技术选型指南

48.5.1 区块链平台选型

平台共识机制智能合约TPS适用场景推荐指数
EthereumPoSSolidity/Vyper15-30DeFi、NFT、DAO★★★★★
SolanaPoH+PoSRust/C65000+高频交易、游戏★★★★☆
PolygonPoSSolidity7000+Layer2扩展★★★★★
BSCPoSASolidity160+DeFi、游戏★★★★☆
AvalancheSnowmanSolidity4500+企业应用★★★★☆

48.5.2 开发工具选型

工具类型工具名称功能推荐指数
Web3库Web3.py以太坊交互★★★★★
合约开发Brownie, Foundry合约编译测试★★★★★
测试框架Hardhat, Truffle开发框架★★★★☆
节点服务Infura, AlchemyRPC节点★★★★★
钱包MetaMask, WalletConnect用户钱包★★★★★

48.5.3 代币标准选型

标准类型特点适用场景
ERC-20同质化代币可互换、可分割货币、积分
ERC-721NFT唯一性、不可分割艺术品、收藏品
ERC-1155多代币批量操作、混合游戏道具、票券
ERC-4626金库代币化金库DeFi收益

48.6 常见问题与解决方案

48.6.1 交易安全与签名

python
import hashlib
import secrets
from typing import Tuple, Optional, Dict, Any
from dataclasses import dataclass

@dataclass
class Transaction:
    """交易数据结构"""
    from_address: str
    to_address: str
    value: int
    data: bytes = b''
    nonce: int = 0
    gas_limit: int = 21000
    gas_price: int = 0
    chain_id: int = 1


class TransactionSigner:
    """交易签名器"""
    
    def __init__(self, private_key: bytes = None):
        self.private_key = private_key or secrets.token_bytes(32)
        self.public_key = self._derive_public_key()
    
    def _derive_public_key(self) -> bytes:
        """从私钥派生公钥"""
        from cryptography.hazmat.primitives.asymmetric import ec
        from cryptography.hazmat.backends import default_backend
        
        private_key_obj = ec.derive_private_key(
            int.from_bytes(self.private_key, 'big'),
            ec.SECP256K1(),
            default_backend()
        )
        public_key = private_key_obj.public_key()
        return public_key.public_bytes(
            encoding=0x04,
            format='uncompressed'
        )
    
    def sign_transaction(self, tx: Transaction) -> Dict:
        """签名交易"""
        tx_hash = self._hash_transaction(tx)
        
        signature = self._sign(tx_hash)
        
        return {
            "transaction": tx,
            "signature": signature.hex(),
            "tx_hash": tx_hash.hex()
        }
    
    def _hash_transaction(self, tx: Transaction) -> bytes:
        """计算交易哈希"""
        tx_data = (
            tx.from_address +
            tx.to_address +
            tx.value.to_bytes(32, 'big') +
            tx.nonce.to_bytes(8, 'big') +
            tx.gas_limit.to_bytes(8, 'big') +
            tx.gas_price.to_bytes(8, 'big') +
            tx.chain_id.to_bytes(8, 'big') +
            tx.data
        )
        return hashlib.sha256(tx_data).digest()
    
    def _sign(self, message_hash: bytes) -> bytes:
        """ECDSA签名"""
        from cryptography.hazmat.primitives.asymmetric import ec
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        
        private_key_obj = ec.derive_private_key(
            int.from_bytes(self.private_key, 'big'),
            ec.SECP256K1(),
            default_backend()
        )
        
        signature = private_key_obj.sign(
            message_hash,
            ec.ECDSA(hashes.SHA256())
        )
        
        return signature
    
    @staticmethod
    def verify_signature(
        public_key: bytes,
        message_hash: bytes,
        signature: bytes
    ) -> bool:
        """验证签名"""
        from cryptography.hazmat.primitives.asymmetric import ec
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        
        try:
            pub_key_obj = ec.EllipticCurvePublicKey.from_encoded_point(
                ec.SECP256K1(),
                public_key
            )
            pub_key_obj.verify(
                signature,
                message_hash,
                ec.ECDSA(hashes.SHA256())
            )
            return True
        except Exception:
            return False


class MultiSigWallet:
    """多重签名钱包"""
    
    def __init__(self, owners: list, required: int):
        self.owners = set(owners)
        self.required = required
        self._pending_transactions: Dict[str, Dict] = {}
    
    def submit_transaction(
        self,
        to: str,
        value: int,
        data: bytes = b''
    ) -> str:
        """提交待签名交易"""
        tx_id = hashlib.sha256(
            (to + str(value) + str(time.time())).encode()
        ).hexdigest()
        
        self._pending_transactions[tx_id] = {
            "to": to,
            "value": value,
            "data": data,
            "confirmations": set(),
            "executed": False
        }
        
        return tx_id
    
    def confirm_transaction(self, tx_id: str, owner: str) -> bool:
        """确认交易"""
        if owner not in self.owners:
            return False
        
        if tx_id not in self._pending_transactions:
            return False
        
        tx = self._pending_transactions[tx_id]
        tx["confirmations"].add(owner)
        
        if len(tx["confirmations"]) >= self.required:
            return self._execute_transaction(tx_id)
        
        return True
    
    def _execute_transaction(self, tx_id: str) -> bool:
        """执行交易"""
        tx = self._pending_transactions[tx_id]
        tx["executed"] = True
        return True
    
    def get_transaction_status(self, tx_id: str) -> Optional[Dict]:
        """获取交易状态"""
        return self._pending_transactions.get(tx_id)


import time

48.6.2 智能合约交互

python
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json
import hashlib

@dataclass
class ContractABI:
    """合约ABI"""
    functions: Dict[str, Dict]
    events: Dict[str, Dict]
    
    @classmethod
    def from_json(cls, abi_json: str) -> "ContractABI":
        """从JSON创建ABI"""
        abi_list = json.loads(abi_json)
        
        functions = {}
        events = {}
        
        for item in abi_list:
            if item.get("type") == "function":
                functions[item["name"]] = item
            elif item.get("type") == "event":
                events[item["name"]] = item
        
        return cls(functions=functions, events=events)


class ContractEncoder:
    """合约编码器"""
    
    @staticmethod
    def encode_function_call(
        function_name: str,
        abi: Dict,
        args: List[Any]
    ) -> str:
        """编码函数调用"""
        selector = ContractEncoder._get_function_selector(function_name, abi)
        
        encoded_args = ContractEncoder._encode_args(abi["inputs"], args)
        
        return "0x" + selector.hex() + encoded_args
    
    @staticmethod
    def _get_function_selector(name: str, abi: Dict) -> bytes:
        """获取函数选择器"""
        param_types = ",".join(
            inp["type"] for inp in abi["inputs"]
        )
        signature = f"{name}({param_types})"
        return hashlib.sha3_256(signature.encode()).digest()[:4]
    
    @staticmethod
    def _encode_args(inputs: List[Dict], args: List[Any]) -> str:
        """编码参数"""
        encoded = ""
        for inp, arg in zip(inputs, args):
            encoded += ContractEncoder._encode_value(inp["type"], arg)
        return encoded
    
    @staticmethod
    def _encode_value(type_str: str, value: Any) -> str:
        """编码单个值"""
        if type_str == "uint256" or type_str == "int256":
            return int(value).to_bytes(32, 'big').hex()
        elif type_str == "address":
            return value.lower().replace("0x", "").zfill(64)
        elif type_str == "bool":
            return "0" * 63 + ("1" if value else "0")
        elif type_str == "bytes32":
            return value.hex().ljust(64, "0")
        else:
            return ""


class ContractDecoder:
    """合约解码器"""
    
    @staticmethod
    def decode_event(
        log: Dict,
        event_abi: Dict
    ) -> Dict:
        """解码事件"""
        indexed_inputs = [
            inp for inp in event_abi["inputs"] if inp["indexed"]
        ]
        non_indexed_inputs = [
            inp for inp in event_abi["inputs"] if not inp["indexed"]
        ]
        
        result = {}
        
        for i, inp in enumerate(indexed_inputs):
            topic = log["topics"][i + 1]
            result[inp["name"]] = ContractDecoder._decode_value(
                inp["type"], topic
            )
        
        data = log.get("data", "0x")
        decoded_data = ContractDecoder._decode_data(data, non_indexed_inputs)
        result.update(decoded_data)
        
        return result
    
    @staticmethod
    def _decode_value(type_str: str, value: str) -> Any:
        """解码单个值"""
        value = value.replace("0x", "")
        
        if type_str == "uint256" or type_str == "int256":
            return int(value, 16)
        elif type_str == "address":
            return "0x" + value[-40:]
        elif type_str == "bool":
            return bool(int(value, 16))
        else:
            return value
    
    @staticmethod
    def _decode_data(data: str, inputs: List[Dict]) -> Dict:
        """解码数据"""
        data = data.replace("0x", "")
        result = {}
        offset = 0
        
        for inp in inputs:
            chunk = data[offset:offset + 64]
            result[inp["name"]] = ContractDecoder._decode_value(
                inp["type"], "0x" + chunk
            )
            offset += 64
        
        return result


class EventFilter:
    """事件过滤器"""
    
    def __init__(self, contract_address: str, event_abi: Dict):
        self.contract_address = contract_address
        self.event_abi = event_abi
        self._event_signature = self._compute_event_signature()
    
    def _compute_event_signature(self) -> str:
        """计算事件签名"""
        param_types = ",".join(
            inp["type"] for inp in self.event_abi["inputs"]
        )
        signature = f"{self.event_abi['name']}({param_types})"
        return "0x" + hashlib.sha3_256(signature.encode()).digest().hex()
    
    def match_log(self, log: Dict) -> bool:
        """匹配日志"""
        if log.get("address", "").lower() != self.contract_address.lower():
            return False
        
        if len(log.get("topics", [])) == 0:
            return False
        
        return log["topics"][0].lower() == self.event_signature.lower()
    
    def filter_logs(self, logs: List[Dict]) -> List[Dict]:
        """过滤并解码日志"""
        matched = []
        for log in logs:
            if self.match_log(log):
                decoded = ContractDecoder.decode_event(log, self.event_abi)
                matched.append(decoded)
        return matched

48.6.3 Gas优化策略

python
from typing import Dict, List, Any
from dataclasses import dataclass

@dataclass
class GasEstimate:
    """Gas估算结果"""
    gas_limit: int
    gas_price: int
    total_cost: int
    suggestions: List[str]


class GasOptimizer:
    """Gas优化器"""
    
    def __init__(self, web3_client):
        self.web3 = web3_client
    
    def estimate_gas(self, tx: Dict) -> GasEstimate:
        """估算Gas"""
        base_gas = 21000
        
        if tx.get("data"):
            data_bytes = bytes.fromhex(tx["data"].replace("0x", ""))
            zero_bytes = data_bytes.count(b'\x00')
            non_zero_bytes = len(data_bytes) - zero_bytes
            
            data_gas = zero_bytes * 4 + non_zero_bytes * 16
            base_gas += data_gas
        
        suggestions = []
        
        if len(tx.get("data", "")) > 1000:
            suggestions.append("考虑使用事件而非存储来减少Gas")
        
        if tx.get("value", 0) > 0:
            suggestions.append("ETH转账会增加Gas消耗")
        
        return GasEstimate(
            gas_limit=int(base_gas * 1.2),
            gas_price=self._get_current_gas_price(),
            total_cost=int(base_gas * 1.2 * self._get_current_gas_price()),
            suggestions=suggestions
        )
    
    def _get_current_gas_price(self) -> int:
        """获取当前Gas价格"""
        return 20000000000
    
    @staticmethod
    def optimize_storage_layout() -> List[str]:
        """存储布局优化建议"""
        return [
            "将连续更新的变量放在同一存储槽",
            "使用uint256而非更小的整数类型",
            "使用mapping替代数组进行查找",
            "使用bytes32替代string存储短字符串",
            "使用事件存储历史数据而非链上存储",
            "使用压缩编码减少存储空间"
        ]
    
    @staticmethod
    def optimize_loop_operations() -> List[str]:
        """循环操作优化建议"""
        return [
            "避免在循环中进行存储操作",
            "使用内存变量缓存存储读取",
            "限制循环次数防止Gas耗尽",
            "使用批量操作替代多次单独操作",
            "考虑使用位运算替代算术运算"
        ]


class TransactionBatcher:
    """交易批处理器"""
    
    def __init__(self, max_batch_size: int = 100):
        self.max_batch_size = max_batch_size
        self._pending: List[Dict] = []
    
    def add_transaction(self, tx: Dict):
        """添加交易到批次"""
        self._pending.append(tx)
    
    def should_flush(self) -> bool:
        """是否应该刷新批次"""
        return len(self._pending) >= self.max_batch_size
    
    def flush(self) -> List[Dict]:
        """刷新批次"""
        batch = self._pending.copy()
        self._pending.clear()
        return batch
    
    def estimate_batch_gas(self) -> int:
        """估算批次Gas"""
        total_gas = 0
        for tx in self._pending:
            estimate = GasOptimizer(None).estimate_gas(tx)
            total_gas += estimate.gas_limit
        return total_gas

48.7 本章小结

本章详细介绍了Python区块链开发的核心概念和实践:

  1. 区块链基础:区块结构、链式存储、共识机制
  2. 钱包与签名:密钥生成、交易签名、多重签名
  3. 智能合约:合约开发、代币标准、NFT合约
  4. Web3开发:Web3.py集成、合约交互、事件监听

练习题

  1. 实现一个完整的区块链系统,支持交易验证和区块挖矿
  2. 开发一个ERC-20代币合约,支持转账、授权和铸造
  3. 实现一个NFT市场,支持NFT创建、交易和拍卖
  4. 开发一个去中心化投票系统,支持提案和投票
  5. 实现一个多签钱包,支持多人审批交易

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校