DEX 流程详解:从原理到实现

Web3 中的 DEX 流程详解:从原理到实现

什么是 DEX?

去中心化交易所(Decentralized Exchange, DEX)是区块链技术的重要应用之一,它允许用户在没有中央机构的情况下直接进行数字资产交易。与传统的中心化交易所(CEX)不同,DEX 不依赖中介机构来托管用户资金或执行交易,而是通过智能合约实现自动化交易。

核心概念:什么是 AMM?

DEX 的核心是自动做市商(Automated Market Maker, AMM)机制。AMM 是一种无需传统订单簿的交易协议,它通过数学公式来定价资产。最经典的 AMM 公式是恒定乘积公式:

x * y = k

其中:

这个公式确保了交易前后 k 的值几乎不变(除了手续费导致的微小变化),从而实现了资产的自动定价和交易。


第一部分:智能合约实现 (MiniSwap.sol)

注意:此合约仅用于测试和学习,请勿用于生产环境。

1. 模拟 ERC20 代币合约

首先,我们需要一个简单的 ERC20 代币合约用于测试:

// contracts/MiniSwap.sol
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.20;

/**
 * @dev 简易 ERC20 代币,用于实验测试
 */
contract MockERC20 {
    string public name;
    string public symbol;
    uint8 public decimals = 18;
    uint256 public totalSupply;
    mapping(address => uint256) public balanceOf;
    mapping(address => mapping(address => uint256)) public allowance;

    constructor(string memory _name, string memory _symbol) {
        name = _name;
        symbol = _symbol;
    }

    function mint(address to, uint256 amount) external {
        totalSupply += amount;
        balanceOf[to] += amount;
    }

    function approve(address spender, uint256 amount) external returns (bool) {
        allowance[msg.sender][spender] = amount;
        return true;
    }

    function transfer(address to, uint256 amount) external returns (bool) {
        require(balanceOf[msg.sender] >= amount, "Insufficient balance");
        balanceOf[msg.sender] -= amount;
        balanceOf[to] += amount;
        return true;
    }

    function transferFrom(
        address from,
        address to,
        uint256 amount
    ) external returns (bool) {
        require(balanceOf[from] >= amount, "Insufficient balance");
        require(allowance[from][msg.sender] >= amount, "Allowance exceeded");
        
        allowance[from][msg.sender] -= amount;
        balanceOf[from] -= amount;
        balanceOf[to] += amount;
        return true;
    }
}

2. MiniSwap 核心合约

接下来是实现 AMM 机制的核心 DEX 合约:

/**
 * @title MiniSwap
 * @dev 这是一个教学用的极简 DEX
 */
contract MiniSwap {
    MockERC20 public immutable tokenA;
    MockERC20 public immutable tokenB;
    
    uint256 public reserveA;
    uint256 public reserveB;
    uint256 public totalShares;
    mapping(address => uint256) public lpShares;
    
    // 手续费 0.3%
    uint256 public constant FEE_RATE = 997; // 实际费率为 (1000-997)/1000 = 0.3%

    event Mint(address indexed provider, uint256 amountA, uint256 amountB, uint256 share);
    event Burn(address indexed provider, uint256 amountA, uint256 amountB, uint256 share);
    event Swap(address indexed user, uint256 amountIn, uint256 amountOut, address tokenIn, address tokenOut);

    constructor(address _tokenA, address _tokenB) {
        tokenA = MockERC20(_tokenA);
        tokenB = MockERC20(_tokenB);
    }

    // 1. 添加流动性
    function addLiquidity(uint256 amountA, uint256 amountB) external {
        require(amountA > 0 && amountB > 0, "Amounts must be positive");
        
        tokenA.transferFrom(msg.sender, address(this), amountA);
        tokenB.transferFrom(msg.sender, address(this), amountB);

        uint256 share;
        if (totalShares == 0) {
            // 初始流动性
            share = amountA; // 简单定为 amountA 的数量
        } else {
            // 后续流动性必须按比例添加
            require(amountA * reserveB == amountB * reserveA, "Wrong ratio");
            share = (amountA * totalShares) / reserveA;
        }

        reserveA += amountA;
        reserveB += amountB;
        totalShares += share;
        lpShares[msg.sender] += share;

        emit Mint(msg.sender, amountA, amountB, share);
    }

    // 2. Token A 换 Token B
    function swapAtoB(uint256 amountAIn) external returns (uint256 amountBOut) {
        require(amountAIn > 0, "Amount must be positive");
        uint256 amountInWithFee = (amountAIn * FEE_RATE) / 1000;
        
        // 公式: newResB = k / newResA => amountOut = oldResB - newResB
        amountBOut = reserveB - (reserveA * reserveB) / (reserveA + amountInWithFee);
        
        require(amountBOut > 0, "Insufficient liquidity");
        
        tokenA.transferFrom(msg.sender, address(this), amountAIn);
        tokenB.transfer(msg.sender, amountBOut);

        reserveA += amountAIn;
        reserveB -= amountBOut;

        emit Swap(msg.sender, amountAIn, amountBOut, address(tokenA), address(tokenB));
    }

    // 3. Token B 换 Token A (对称实现)
    function swapBtoA(uint256 amountBIn) external returns (uint256 amountAOut) {
        require(amountBIn > 0, "Amount must be positive");
        uint256 amountInWithFee = (amountBIn * FEE_RATE) / 1000;

        // 对称公式
        amountAOut = reserveA - (reserveA * reserveB) / (reserveB + amountInWithFee);
        
        require(amountAOut > 0, "Insufficient liquidity");
        
        tokenB.transferFrom(msg.sender, address(this), amountBIn);
        tokenA.transfer(msg.sender, amountAOut);

        reserveB += amountBIn;
        reserveA -= amountAOut;

        emit Swap(msg.sender, amountBIn, amountAOut, address(tokenB), address(tokenA));
    }

    // 4. 移除流动性
    function removeLiquidity(uint256 share) external {
        require(lpShares[msg.sender] >= share, "Insufficient shares");
        require(share > 0, "Share amount must be positive");

        uint256 amountA = (share * reserveA) / totalShares;
        uint256 amountB = (share * reserveB) / totalShares;

        lpShares[msg.sender] -= share;
        totalShares -= share;
        reserveA -= amountA;
        reserveB -= amountB;

        tokenA.transfer(msg.sender, amountA);
        tokenB.transfer(msg.sender, amountB);

        emit Burn(msg.sender, amountA, amountB, share);
    }

    // 辅助函数:获取价格
    function getPriceA() external view returns (uint256) {
        if (reserveA == 0) return 0;
        return (reserveB * 1e18) / reserveA;
    }
}

第二部分:Python 交互全流程详解

接下来,我们使用 Python (Web3.py) 来模拟整个 DEX 的生命周期。请确保你已经启动了本地测试节点(如 Anvil 或 Ganache)。

1. 环境准备与部署

这一步相当于搭建交易所的地基:

from web3 import Web3
from solcx import compile_source

# 连接本地节点
w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:8545"))
assert w3.is_connected(), "Failed to connect to the node"

# 设置账户
ac_lp = w3.eth.accounts[0]   # 流动性提供者 (庄家)
user1 = w3.eth.accounts[1]   # 交易员 1
user2 = w3.eth.accounts[2]   # 交易员 2

# 编译合约
code_resource = """
// 前面定义的合约代码...
"""

compiled = compile_source(code_resource, output_values=['abi','bin'])

2. 部署代币和 DEX 合约

# 部署 MockERC20 代币
_, mock_erc20_compiled = compiled.popitem()
abi = mock_erc20_compiled["abi"]
bytecode = mock_erc20_compiled["bin"]
contract_mock_erc20 = w3.eth.contract(abi=abi, bytecode=bytecode)

# 部署 TokenA
tx_hash = contract_mock_erc20.constructor("TokenA", "TKA").transact({'from': user1})
tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
token_a = w3.eth.contract(address=tx_receipt.contractAddress, abi=abi)

# 部署 TokenB
tx_hash = contract_mock_erc20.constructor("TokenB", "TKB").transact({'from': user2})
tx_receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
token_b = w3.eth.contract(address=tx_receipt.contractAddress, abi=abi)

# 部署 MiniSwap
_, dex_compiled = compiled.popitem()
abi = dex_compiled["abi"]
bytecode = dex_compiled["bin"]
contract_dex = w3.eth.contract(abi=abi, bytecode=bytecode)
dex_tx = contract_dex.constructor(token_a.address, token_b.address).transact({'from': ac_lp})
dex_addr = w3.eth.get_transaction_receipt(dex_tx).contractAddress
dex = w3.eth.contract(address=dex_addr, abi=abi)

3. 账户状态报告函数

def print_status(swap_contract, user_address=None):
    print("="*40)
    print("个人账户状态报告")
    if user_address:
        print(f"  - {token_a.functions.symbol().call()} 余额: {token_a.functions.balanceOf(user_address).call()/10**18:.2f}")
        print(f"  - {token_b.functions.symbol().call()} 余额: {token_b.functions.balanceOf(user_address).call()/10**18:.2f}")
    
    print("\n🔍 MINISWAP 实时状态报告")
    print("="*40)

    # 读取储备量
    resA = swap_contract.functions.reserveA().call()
    resB = swap_contract.functions.reserveB().call()
    
    # 获取代币符号
    symbolA = token_a.functions.symbol().call()
    symbolB = token_b.functions.symbol().call()
    
    # 计算恒定乘积 K
    k_value = resA * resB
    
    # 获取当前价格
    price_raw = swap_contract.functions.getPriceA().call()
    price_decimal = price_raw / 10**18
    
    # 读取总份额
    total_shares = swap_contract.functions.totalShares().call()

    print(f"📈 储备状态:")
    print(f"  - {symbolA} 储备: {resA/10**18:.2f}")
    print(f"  - {symbolB} 储备: {resB/10**18:.2f}")
    print(f"  - 恒定乘积 (K): {k_value/10**36:.2f}")
    
    print(f"\n💰 当前定价:")
    print(f"  - 1 {symbolA} = {price_decimal:.4f} {symbolB}")
    
    print(f"\n📊 流动性总量:")
    print(f"  - 总 LP 份额: {total_shares/10**18:.2f}")
    
    if user_address:
        user_share = swap_contract.functions.lpShares(user_address).call()
        share_percent = (user_share / total_shares * 100) if total_shares > 0 else 0
        print(f"\n👤 个人持仓 ({user_address[:6]}...):")
        print(f"  - 持有份额: {user_share/10**18:.2f}")
        print(f"  - 资金占比: {share_percent:.2f}%")
    
    print("="*40)

4. 分发代币与授权

# 给 LP 发钱
token_a.functions.mint(ac_lp, 10000 * 10**18).transact({'from': ac_lp})
token_b.functions.mint(ac_lp, 10000 * 10**18).transact({'from': ac_lp})

# 给 User1 发钱 (只给 Token A)
token_a.functions.mint(user1, 5000 * 10**18).transact({'from': ac_lp})

# 授权 DEX 花费用户的代币
token_a.functions.approve(dex_addr, 2**256 - 1).transact({'from': ac_lp})
token_b.functions.approve(dex_addr, 2**256 - 1).transact({'from': ac_lp})
token_a.functions.approve(dex_addr, 2**256 - 1).transact({'from': user1})

5. 添加初始流动性

LP 进场决定初始价格。假设 LP 放入 1000 A 和 2000 B,此时价格为 2 B/A:

amount_a_init = 1000 * 10**18
amount_b_init = 2000 * 10**18

print("--- 步骤 1: LP 添加初始流动性 ---")
dex.functions.addLiquidity(amount_a_init, amount_b_init).transact({'from': ac_lp})
print_status(dex, ac_lp)

--- 步骤 1: LP 添加初始流动性 ---
========================================
个人账户状态报告
- TKA 余额: 9000000000000000000000
- TKB 余额: 8000000000000000000000
========================================
🔍 MINISWAP 实时状态报告
========================================
📈 储备状态:
- TKA 储备: 1000000000000000000000
- TKB 储备: 2000000000000000000000
- 恒定乘积 (K): 2000000000000000000000000000000000000000000

💰 当前定价:
- 1 TKA = 2.0000 TKB

📊 流动性总量:
- 总 LP 份额 (Total Shares): 1000000000000000000000

👤 个人持仓 (0xf39F...):
- 持有份额: 1000000000000000000000
- 资金占比: 100.00%
========================================

6. 发生交易

User1 卖出 100 个 A:

print("\n--- 步骤 2: User1 用 100 A 购买 B ---")
swap_amount = 100 * 10**18
dex.functions.swapAtoB(swap_amount).transact({'from': user1})
print_status(dex, user1)

--- 步骤 2: User1 用 100 A 购买 B ---
========================================
个人账户状态报告
- TKA 余额: 4900000000000000000000
- TKB 余额: 181322178776029826317
========================================
🔍 MINISWAP 实时状态报告
========================================
📈 储备状态:
- TKA 储备: 1100000000000000000000
- TKB 储备: 1818677821223970173683
- 恒定乘积 (K): 2000545603346367191051300000000000000000000

💰 当前定价:
- 1 TKA = 1.6533 TKB

📊 流动性总量:
- 总 LP 份额 (Total Shares): 1000000000000000000000

👤 个人持仓 (0x7099...):
- 持有份额: 0
- 资金占比: 0.00%
========================================

7. 移除流动性

LP 退出并拿回资金:

print("\n--- 步骤 3: LP 移除流动性 ---")
lp_share = dex.functions.lpShares(ac_lp).call()
dex.functions.removeLiquidity(lp_share).transact({'from': ac_lp})
print_status(dex, ac_lp)

--- 步骤 3: LP 移除流动性 ---
========================================
个人账户状态报告
- TKA 余额: 10100000000000000000000
- TKB 余额: 9818677821223970173683
========================================
🔍 MINISWAP 实时状态报告
========================================
📈 储备状态:
- TKA 储备: 0
- TKB 储备: 0
- 恒定乘积 (K): 0

💰 当前定价:
- 1 TKA = 0.0000 TKB

📊 流动性总量:
- 总 LP 份额 (Total Shares): 0

👤 个人持仓 (0xf39F...):
- 持有份额: 0
- 资金占比: 0.00%
========================================


第三部分:关键概念解析

1. 价格计算公式

swapAtoB 中,核心价格计算逻辑是:

amountBOut = reserveB - (reserveA * reserveB) / (reserveA + amountInWithFee);

这实际上是恒定乘积公式的变形:

x * y = k
(x + Δx) * (y - Δy) = k
=> Δy = y - k/(x + Δx)

这个公式保证了交易前后 k 的值几乎不变(除了手续费导致的微增)。

2. 份额计算 (LP Tokens)

流动性提供者的份额计算遵循以下原则:

share = (amountA * totalShares) / reserveA;

这确保了公平性:如果你投入的资金占当前池子总资金的 10%,那么你就应该获得总份额的 10%,从而有权分走池子 10% 的资产和手续费收益。

3. 滑点 (Slippage) 分析

滑点是 DEX 交易中最重要的概念之一,它指预期价格与实际成交价格之间的差异。在 AMM 模型中,滑点主要由两部分组成:

  1. 价格冲击:由交易本身的规模引起的价格变动
  2. 交易延迟:在交易发出到确认期间,其他交易导致的价格变化

Python 实验:观察滑点变化

def calculate_slippage_study(swap_contract, amount_a_in):
    # 获取交易前状态
    resA = swap_contract.functions.reserveA().call()
    resB = swap_contract.functions.reserveB().call()
    
    # 计算理论价格
    current_price = resB / resA
    theoretical_b_out = amount_a_in * current_price
    
    # 模拟实际计算(考虑手续费)
    amount_in_with_fee = amount_a_in * 997 / 1000
    actual_b_out = resB - (resA * resB) / (resA + amount_in_with_fee)
    
    # 计算滑点
    slippage = (theoretical_b_out - actual_b_out) / theoretical_b_out * 100
    
    print(f"输入: {amount_a_in/10**18:>10.2f} TKA")
    print(f"理论应得: {theoretical_b_out/10**18:>10.2f} TKB")
    print(f"实际获得: {actual_b_out/10**18:>10.2f} TKB")
    print(f"滑点损失: {slippage:>10.2f}%")
    print("-" * 30)

# 实验:不同交易规模下的滑点
print("假设池子储备: A=10,000, B=10,000 (1 TKA = 1 TKB)")
test_amounts = [10, 100, 1000, 5000] # 交易规模逐渐增加
for amt in test_amounts:
    resA, resB = 10000 * 10**18, 10000 * 10**18
    current_price = resB / resA
    theoretical_b = amt * current_price
    in_fee = amt * 0.997
    actual_b = resB - (resA * resB) / (resA + in_fee)
    slippage = (theoretical_b - actual_b) / theoretical_b * 100
    print交易 {amt/10**18} A | 实际得 {actual_b/10**18:.2f} B | 滑点: {slippage:.2f}%")

实验结果示例:

    假设池子储备: A=10,000, B=10,000 (1 TKA = 1 TKB)
    交易 10 A | 实际得 9.96 B | 滑点: 0.40%
    交易 100 A | 实际得 98.72 B | 滑点: 1.28%
    交易 1000 A | 实际得 906.61 B | 滑点: 9.34%
    交易 5000 A | 实际得 3326.66 B | 滑点: 33.47%

4. 防止滑点损失的改进方案

在实际应用中,我们会添加 minAmountOut 参数来防止过大滑点:

function safeSwapAtoB(uint256 amountAIn, uint256 minAmountBOut) external returns (uint256 amountBOut) {
    uint256 amountInWithFee = (amountAIn * FEE_RATE) / 1000;
    amountBOut = reserveB - (reserveA * reserveB) / (reserveA + amountInWithFee);
    
    require(amountBOut >= minAmountBOut, "Slippage too high");
    
    tokenA.transferFrom(msg.sender, address(this), amountAIn);
    tokenB.transfer(msg.sender, amountBOut);

    reserveA += amountAIn;
    reserveB -= amountBOut;
    
    emit Swap(msg.sender, amountAIn, amountBOut, address(tokenA), address(tokenB));
}

第四部分:给学习者的实践建议

  1. 流动性深度:池子里的资金越多,滑点越小。这就是为什么 DEX 都在争夺 TVL(总锁定价值)。

  2. 交易策略

    • 大额交易分批次进行
    • 使用交易聚合器(如 1inch)寻找最佳价格
    • 设置合理的滑点容忍度(通常 0.5%-1%)
  3. 安全实践

    • 永远不要设置 minAmountOut = 0
    • 在前端实现滑点预警
    • 考虑使用时间加权平均价格(TWAP)策略
  4. 进一步学习

    • 研究不同的 AMM 变体(如 Curve 的稳定币模型)
    • 学习流动性挖矿机制
    • 探索跨链 DEX 的实现

通过理解这些核心概念和实践,你将能够更好地参与和开发去中心化金融应用。