txpool主要用来存放当前提交的等待写入区块的交易,有远端和本地的。

    txpool里面的交易分为两种,

    1. 提交但是还不能执行的,放在queue里面等待能够执行(比如说nonce太高)。
    2. 等待执行的,放在pending里面等待执行。

    从txpool的测试案例来看,txpool主要功能有下面几点。

    1. 交易验证的功能,包括余额不足,Gas不足,Nonce太低, value值是合法的,不能为负数。
    2. 能够缓存Nonce比当前本地账号状态高的交易。 存放在queue字段。 如果是能够执行的交易存放在pending字段
    3. 相同用户的相同Nonce的交易只会保留一个GasPrice最大的那个。 其他的插入不成功。
    4. 如果账号没有钱了,那么queue和pending中对应账号的交易会被删除。
    5. 如果账号的余额小于一些交易的额度,那么对应的交易会被删除,同时有效的交易会从pending移动到queue里面。防止被广播。
    6. txPool支持一些限制PriceLimit(remove的最低GasPrice限制),PriceBump(替换相同Nonce的交易的价格的百分比) AccountSlots(每个账户的pending的槽位的最小值) GlobalSlots(全局pending队列的最大值)AccountQueue(每个账户的queueing的槽位的最小值) GlobalQueue(全局queueing的最大值) Lifetime(在queue队列的最长等待时间)
    7. 有限的资源情况下按照GasPrice的优先级进行替换。
    8. 本地的交易会使用journal的功能存放在磁盘上,重启之后会重新导入。 远程的交易不会。

    数据结构

    1. // TxPool contains all currently known transactions. Transactions
    2. // enter the pool when they are received from the network or submitted
    3. // locally. They exit the pool when they are included in the blockchain.
    4. // TxPool 包含了当前知的交易, 当前网络接收到交易,或者本地提交的交易会加入到TxPool。
    5. // 当他们已经被添加到区块链的时候被移除。
    6. // The pool separates processable transactions (which can be applied to the
    7. // current state) and future transactions. Transactions move between those
    8. // two states over time as they are received and processed.
    9. // TxPool分为可执行的交易(可以应用到当前的状态)和未来的交易。 交易在这两种状态之间转换,
    10. type TxPool struct {
    11. config TxPoolConfig
    12. chainconfig *params.ChainConfig
    13. chain blockChain
    14. gasPrice *big.Int //最低的GasPrice限制
    15. txFeed event.Feed //通过txFeed来订阅TxPool的消息
    16. scope event.SubscriptionScope
    17. chainHeadCh chan ChainHeadEvent // 订阅了区块头的消息,当有了新的区块头生成的时候会在这里收到通知
    18. chainHeadSub event.Subscription // 区块头消息的订阅器。
    19. signer types.Signer // 封装了事务签名处理。
    20. mu sync.RWMutex
    21. currentState *state.StateDB // Current state in the blockchain head
    22. pendingState *state.ManagedState // Pending state tracking virtual nonces
    23. currentMaxGas *big.Int // Current gas limit for transaction caps 目前交易上限的GasLimit
    24. locals *accountSet // Set of local transaction to exepmt from evicion rules 本地交易免除驱逐规则
    25. journal *txJournal // Journal of local transaction to back up to disk 本地交易会写入磁盘
    26. pending map[common.Address]*txList // All currently processable transactions 所有当前可以处理的交易
    27. queue map[common.Address]*txList // Queued but non-processable transactions 当前还不能处理的交易
    28. beats map[common.Address]time.Time // Last heartbeat from each known account 每一个已知账号的最后一次心跳信息的时间
    29. all map[common.Hash]*types.Transaction // All transactions to allow lookups 可以查找到所有交易
    30. priced *txPricedList // All transactions sorted by price 按照价格排序的交易
    31. wg sync.WaitGroup // for shutdown sync
    32. homestead bool // 家园版本
    33. }

    构建

    1. // NewTxPool creates a new transaction pool to gather, sort and filter inbound
    2. // trnsactions from the network.
    3. func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
    4. // Sanitize the input to ensure no vulnerable gas prices are set
    5. config = (&config).sanitize()
    6. // Create the transaction pool with its initial settings
    7. pool := &TxPool{
    8. config: config,
    9. chainconfig: chainconfig,
    10. chain: chain,
    11. signer: types.NewEIP155Signer(chainconfig.ChainId),
    12. pending: make(map[common.Address]*txList),
    13. queue: make(map[common.Address]*txList),
    14. beats: make(map[common.Address]time.Time),
    15. all: make(map[common.Hash]*types.Transaction),
    16. chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
    17. gasPrice: new(big.Int).SetUint64(config.PriceLimit),
    18. }
    19. pool.locals = newAccountSet(pool.signer)
    20. pool.priced = newTxPricedList(&pool.all)
    21. pool.reset(nil, chain.CurrentBlock().Header())
    22. // If local transactions and journaling is enabled, load from disk
    23. // 如果本地交易被允许,而且配置的Journal目录不为空,那么从指定的目录加载日志.
    24. // 然后rotate交易日志. 因为老的交易可能已经失效了, 所以调用add方法之后再把被接收的交易写入日志.
    25. //
    26. if !config.NoLocals && config.Journal != "" {
    27. pool.journal = newTxJournal(config.Journal)
    28. if err := pool.journal.load(pool.AddLocal); err != nil {
    29. log.Warn("Failed to load transaction journal", "err", err)
    30. }
    31. if err := pool.journal.rotate(pool.local()); err != nil {
    32. log.Warn("Failed to rotate transaction journal", "err", err)
    33. }
    34. }
    35. // Subscribe events from blockchain 从区块链订阅事件。
    36. pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
    37. // Start the event loop and return
    38. pool.wg.Add(1)
    39. go pool.loop()
    40. return pool
    41. }

    reset方法检索区块链的当前状态并且确保事务池的内容关于当前的区块链状态是有效的。主要功能包括:

    1. 因为更换了区块头,所以原有的区块中有一些交易因为区块头的更换而作废,这部分交易需要重新加入到txPool里面等待插入新的区块
    2. 生成新的currentState和pendingState
    3. 因为状态的改变。将pending中的部分交易移到queue里面
    4. 因为状态的改变,将queue里面的交易移入到pending里面。

    reset代码

    1. // reset retrieves the current state of the blockchain and ensures the content
    2. // of the transaction pool is valid with regard to the chain state.
    3. func (pool *TxPool) reset(oldHead, newHead *types.Header) {
    4. // If we're reorging an old state, reinject all dropped transactions
    5. var reinject types.Transactions
    6. if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
    7. // If the reorg is too deep, avoid doing it (will happen during fast sync)
    8. oldNum := oldHead.Number.Uint64()
    9. newNum := newHead.Number.Uint64()
    10. if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { //如果老的头和新的头差距太远, 那么取消重建
    11. log.Warn("Skipping deep transaction reorg", "depth", depth)
    12. } else {
    13. // Reorg seems shallow enough to pull in all transactions into memory
    14. var discarded, included types.Transactions
    15. var (
    16. rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
    17. add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
    18. )
    19. // 如果老的高度大于新的.那么需要把多的全部删除.
    20. for rem.NumberU64() > add.NumberU64() {
    21. discarded = append(discarded, rem.Transactions()...)
    22. if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
    23. log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
    24. return
    25. }
    26. }
    27. // 如果新的高度大于老的, 那么需要增加.
    28. for add.NumberU64() > rem.NumberU64() {
    29. included = append(included, add.Transactions()...)
    30. if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
    31. log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
    32. return
    33. }
    34. }
    35. // 高度相同了.如果hash不同,那么需要往后找,一直找到他们相同hash根的节点.
    36. for rem.Hash() != add.Hash() {
    37. discarded = append(discarded, rem.Transactions()...)
    38. if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
    39. log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
    40. return
    41. }
    42. included = append(included, add.Transactions()...)
    43. if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
    44. log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
    45. return
    46. }
    47. }
    48. // 找出所有存在discard里面,但是不在included里面的值.
    49. // 需要等下把这些交易重新插入到pool里面。
    50. reinject = types.TxDifference(discarded, included)
    51. }
    52. }
    53. // Initialize the internal state to the current head
    54. if newHead == nil {
    55. newHead = pool.chain.CurrentBlock().Header() // Special case during testing
    56. }
    57. statedb, err := pool.chain.StateAt(newHead.Root)
    58. if err != nil {
    59. log.Error("Failed to reset txpool state", "err", err)
    60. return
    61. }
    62. pool.currentState = statedb
    63. pool.pendingState = state.ManageState(statedb)
    64. pool.currentMaxGas = newHead.GasLimit
    65. // Inject any transactions discarded due to reorgs
    66. log.Debug("Reinjecting stale transactions", "count", len(reinject))
    67. pool.addTxsLocked(reinject, false)
    68. // validate the pool of pending transactions, this will remove
    69. // any transactions that have been included in the block or
    70. // have been invalidated because of another transaction (e.g.
    71. // higher gas price)
    72. // 验证pending transaction池里面的交易, 会移除所有已经存在区块链里面的交易,或者是因为其他交易导致不可用的交易(比如有一个更高的gasPrice)
    73. // demote 降级 将pending中的一些交易降级到queue里面。
    74. pool.demoteUnexecutables()
    75. // Update all accounts to the latest known pending nonce
    76. // 根据pending队列的nonce更新所有账号的nonce
    77. for addr, list := range pool.pending {
    78. txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
    79. pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
    80. }
    81. // Check the queue and move transactions over to the pending if possible
    82. // or remove those that have become invalid
    83. // 检查队列并尽可能地将事务移到pending,或删除那些已经失效的事务
    84. // promote 升级
    85. pool.promoteExecutables(nil)
    86. }

    addTx

    1. // addTx enqueues a single transaction into the pool if it is valid.
    2. func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
    3. pool.mu.Lock()
    4. defer pool.mu.Unlock()
    5. // Try to inject the transaction and update any state
    6. replace, err := pool.add(tx, local)
    7. if err != nil {
    8. return err
    9. }
    10. // If we added a new transaction, run promotion checks and return
    11. if !replace {
    12. from, _ := types.Sender(pool.signer, tx) // already validated
    13. pool.promoteExecutables([]common.Address{from})
    14. }
    15. return nil
    16. }

    addTxsLocked

    1. // addTxsLocked attempts to queue a batch of transactions if they are valid,
    2. // whilst assuming the transaction pool lock is already held.
    3. // addTxsLocked尝试把有效的交易放入queue队列,调用这个函数的时候假设已经获取到锁
    4. func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
    5. // Add the batch of transaction, tracking the accepted ones
    6. dirty := make(map[common.Address]struct{})
    7. for _, tx := range txs {
    8. if replace, err := pool.add(tx, local); err == nil {
    9. if !replace { // replace 是替换的意思, 如果不是替换,那么就说明状态有更新,有可以下一步处理的可能。
    10. from, _ := types.Sender(pool.signer, tx) // already validated
    11. dirty[from] = struct{}{}
    12. }
    13. }
    14. }
    15. // Only reprocess the internal state if something was actually added
    16. if len(dirty) > 0 {
    17. addrs := make([]common.Address, 0, len(dirty))
    18. for addr, _ := range dirty {
    19. addrs = append(addrs, addr)
    20. }
    21. // 传入了被修改的地址,
    22. pool.promoteExecutables(addrs)
    23. }
    24. return nil
    25. }

    demoteUnexecutables 从pending删除无效的或者是已经处理过的交易,其他的不可执行的交易会被移动到future queue中。

    1. // demoteUnexecutables removes invalid and processed transactions from the pools
    2. // executable/pending queue and any subsequent transactions that become unexecutable
    3. // are moved back into the future queue.
    4. func (pool *TxPool) demoteUnexecutables() {
    5. // Iterate over all accounts and demote any non-executable transactions
    6. for addr, list := range pool.pending {
    7. nonce := pool.currentState.GetNonce(addr)
    8. // Drop all transactions that are deemed too old (low nonce)
    9. // 删除所有小于当前地址的nonce的交易,并从pool.all删除。
    10. for _, tx := range list.Forward(nonce) {
    11. hash := tx.Hash()
    12. log.Trace("Removed old pending transaction", "hash", hash)
    13. delete(pool.all, hash)
    14. pool.priced.Removed()
    15. }
    16. // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
    17. // 删除所有的太昂贵的交易。 用户的balance可能不够用。或者是out of gas
    18. drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
    19. for _, tx := range drops {
    20. hash := tx.Hash()
    21. log.Trace("Removed unpayable pending transaction", "hash", hash)
    22. delete(pool.all, hash)
    23. pool.priced.Removed()
    24. pendingNofundsCounter.Inc(1)
    25. }
    26. for _, tx := range invalids {
    27. hash := tx.Hash()
    28. log.Trace("Demoting pending transaction", "hash", hash)
    29. pool.enqueueTx(hash, tx)
    30. }
    31. // If there's a gap in front, warn (should never happen) and postpone all transactions
    32. // 如果存在一个空洞(nonce空洞), 那么需要把所有的交易都放入future queue。
    33. // 这一步确实应该不可能发生,因为Filter已经把 invalids的都处理了。 应该不存在invalids的交易,也就是不存在空洞的。
    34. if list.Len() > 0 && list.txs.Get(nonce) == nil {
    35. for _, tx := range list.Cap(0) {
    36. hash := tx.Hash()
    37. log.Error("Demoting invalidated transaction", "hash", hash)
    38. pool.enqueueTx(hash, tx)
    39. }
    40. }
    41. // Delete the entire queue entry if it became empty.
    42. if list.Empty() {
    43. delete(pool.pending, addr)
    44. delete(pool.beats, addr)
    45. }
    46. }
    47. }

    enqueueTx 把一个新的交易插入到future queue。 这个方法假设已经获取了池的锁。

    1. // enqueueTx inserts a new transaction into the non-executable transaction queue.
    2. //
    3. // Note, this method assumes the pool lock is held!
    4. func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
    5. // Try to insert the transaction into the future queue
    6. from, _ := types.Sender(pool.signer, tx) // already validated
    7. if pool.queue[from] == nil {
    8. pool.queue[from] = newTxList(false)
    9. }
    10. inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
    11. if !inserted {
    12. // An older transaction was better, discard this
    13. queuedDiscardCounter.Inc(1)
    14. return false, ErrReplaceUnderpriced
    15. }
    16. // Discard any previous transaction and mark this
    17. if old != nil {
    18. delete(pool.all, old.Hash())
    19. pool.priced.Removed()
    20. queuedReplaceCounter.Inc(1)
    21. }
    22. pool.all[hash] = tx
    23. pool.priced.Put(tx)
    24. return old != nil, nil
    25. }

    promoteExecutables方法把 已经变得可以执行的交易从future queue 插入到pending queue。通过这个处理过程,所有的无效的交易(nonce太低,余额不足)会被删除。

    1. // promoteExecutables moves transactions that have become processable from the
    2. // future queue to the set of pending transactions. During this process, all
    3. // invalidated transactions (low nonce, low balance) are deleted.
    4. func (pool *TxPool) promoteExecutables(accounts []common.Address) {
    5. // Gather all the accounts potentially needing updates
    6. // accounts存储了所有潜在需要更新的账户。 如果账户传入为nil,代表所有已知的账户。
    7. if accounts == nil {
    8. accounts = make([]common.Address, 0, len(pool.queue))
    9. for addr, _ := range pool.queue {
    10. accounts = append(accounts, addr)
    11. }
    12. }
    13. // Iterate over all accounts and promote any executable transactions
    14. for _, addr := range accounts {
    15. list := pool.queue[addr]
    16. if list == nil {
    17. continue // Just in case someone calls with a non existing account
    18. }
    19. // Drop all transactions that are deemed too old (low nonce)
    20. // 删除所有的nonce太低的交易
    21. for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
    22. hash := tx.Hash()
    23. log.Trace("Removed old queued transaction", "hash", hash)
    24. delete(pool.all, hash)
    25. pool.priced.Removed()
    26. }
    27. // Drop all transactions that are too costly (low balance or out of gas)
    28. // 删除所有余额不足的交易。
    29. drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
    30. for _, tx := range drops {
    31. hash := tx.Hash()
    32. log.Trace("Removed unpayable queued transaction", "hash", hash)
    33. delete(pool.all, hash)
    34. pool.priced.Removed()
    35. queuedNofundsCounter.Inc(1)
    36. }
    37. // Gather all executable transactions and promote them
    38. // 得到所有的可以执行的交易,并promoteTx加入pending
    39. for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
    40. hash := tx.Hash()
    41. log.Trace("Promoting queued transaction", "hash", hash)
    42. pool.promoteTx(addr, hash, tx)
    43. }
    44. // Drop all transactions over the allowed limit
    45. // 删除所有超过限制的交易。
    46. if !pool.locals.contains(addr) {
    47. for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
    48. hash := tx.Hash()
    49. delete(pool.all, hash)
    50. pool.priced.Removed()
    51. queuedRateLimitCounter.Inc(1)
    52. log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
    53. }
    54. }
    55. // Delete the entire queue entry if it became empty.
    56. if list.Empty() {
    57. delete(pool.queue, addr)
    58. }
    59. }
    60. // If the pending limit is overflown, start equalizing allowances
    61. pending := uint64(0)
    62. for _, list := range pool.pending {
    63. pending += uint64(list.Len())
    64. }
    65. // 如果pending的总数超过系统的配置。
    66. if pending > pool.config.GlobalSlots {
    67. pendingBeforeCap := pending
    68. // Assemble a spam order to penalize large transactors first
    69. spammers := prque.New()
    70. for addr, list := range pool.pending {
    71. // Only evict transactions from high rollers
    72. // 首先把所有大于AccountSlots最小值的账户记录下来, 会从这些账户里面剔除一些交易。
    73. // 注意spammers是一个优先级队列,也就是说是按照交易的多少从大到小排序的。
    74. if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
    75. spammers.Push(addr, float32(list.Len()))
    76. }
    77. }
    78. // Gradually drop transactions from offenders
    79. offenders := []common.Address{}
    80. for pending > pool.config.GlobalSlots && !spammers.Empty() {
    81. /*
    82. 模拟一下offenders队列的账户交易数量的变化情况。
    83. 第一次循环 [10] 循环结束 [10]
    84. 第二次循环 [10, 9] 循环结束 [9,9]
    85. 第三次循环 [9, 9, 7] 循环结束 [7, 7, 7]
    86. 第四次循环 [7, 7 , 7 ,2] 循环结束 [2, 2 ,2, 2]
    87. */
    88. // Retrieve the next offender if not local address
    89. offender, _ := spammers.Pop()
    90. offenders = append(offenders, offender.(common.Address))
    91. // Equalize balances until all the same or below threshold
    92. if len(offenders) > 1 { // 第一次进入这个循环的时候, offenders队列里面有交易数量最大的两个账户
    93. // Calculate the equalization threshold for all current offenders
    94. // 把最后加入的账户的交易数量当成本次的阈值
    95. threshold := pool.pending[offender.(common.Address)].Len()
    96. // Iteratively reduce all offenders until below limit or threshold reached
    97. // 遍历直到pending有效,或者是倒数第二个的交易数量等于最后一个的交易数量
    98. for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
    99. // 遍历除了最后一个账户以外的所有账户, 把他们的交易数量减去1.
    100. for i := 0; i < len(offenders)-1; i++ {
    101. list := pool.pending[offenders[i]]
    102. for _, tx := range list.Cap(list.Len() - 1) {
    103. // Drop the transaction from the global pools too
    104. hash := tx.Hash()
    105. delete(pool.all, hash)
    106. pool.priced.Removed()
    107. // Update the account nonce to the dropped transaction
    108. if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
    109. pool.pendingState.SetNonce(offenders[i], nonce)
    110. }
    111. log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
    112. }
    113. pending--
    114. }
    115. }
    116. }
    117. }
    118. // If still above threshold, reduce to limit or min allowance
    119. // 经过上面的循环,所有的超过AccountSlots的账户的交易数量都变成了之前的最小值。
    120. // 如果还是超过阈值,那么在继续从offenders里面每次删除一个。
    121. if pending > pool.config.GlobalSlots && len(offenders) > 0 {
    122. for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
    123. for _, addr := range offenders {
    124. list := pool.pending[addr]
    125. for _, tx := range list.Cap(list.Len() - 1) {
    126. // Drop the transaction from the global pools too
    127. hash := tx.Hash()
    128. delete(pool.all, hash)
    129. pool.priced.Removed()
    130. // Update the account nonce to the dropped transaction
    131. if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
    132. pool.pendingState.SetNonce(addr, nonce)
    133. }
    134. log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
    135. }
    136. pending--
    137. }
    138. }
    139. }
    140. pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
    141. } //end if pending > pool.config.GlobalSlots {
    142. // If we've queued more transactions than the hard limit, drop oldest ones
    143. // 我们处理了pending的限制, 下面需要处理future queue的限制了。
    144. queued := uint64(0)
    145. for _, list := range pool.queue {
    146. queued += uint64(list.Len())
    147. }
    148. if queued > pool.config.GlobalQueue {
    149. // Sort all accounts with queued transactions by heartbeat
    150. addresses := make(addresssByHeartbeat, 0, len(pool.queue))
    151. for addr := range pool.queue {
    152. if !pool.locals.contains(addr) { // don't drop locals
    153. addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
    154. }
    155. }
    156. sort.Sort(addresses)
    157. // Drop transactions until the total is below the limit or only locals remain
    158. // 从后往前,也就是心跳越新的就越会被删除。
    159. for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
    160. addr := addresses[len(addresses)-1]
    161. list := pool.queue[addr.address]
    162. addresses = addresses[:len(addresses)-1]
    163. // Drop all transactions if they are less than the overflow
    164. if size := uint64(list.Len()); size <= drop {
    165. for _, tx := range list.Flatten() {
    166. pool.removeTx(tx.Hash())
    167. }
    168. drop -= size
    169. queuedRateLimitCounter.Inc(int64(size))
    170. continue
    171. }
    172. // Otherwise drop only last few transactions
    173. txs := list.Flatten()
    174. for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
    175. pool.removeTx(txs[i].Hash())
    176. drop--
    177. queuedRateLimitCounter.Inc(1)
    178. }
    179. }
    180. }
    181. }

    promoteTx把某个交易加入到pending 队列. 这个方法假设已经获取到了锁.

    1. // promoteTx adds a transaction to the pending (processable) list of transactions.
    2. //
    3. // Note, this method assumes the pool lock is held!
    4. func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
    5. // Try to insert the transaction into the pending queue
    6. if pool.pending[addr] == nil {
    7. pool.pending[addr] = newTxList(true)
    8. }
    9. list := pool.pending[addr]
    10. inserted, old := list.Add(tx, pool.config.PriceBump)
    11. if !inserted { // 如果不能替换, 已经存在一个老的交易了. 删除.
    12. // An older transaction was better, discard this
    13. delete(pool.all, hash)
    14. pool.priced.Removed()
    15. pendingDiscardCounter.Inc(1)
    16. return
    17. }
    18. // Otherwise discard any previous transaction and mark this
    19. if old != nil {
    20. delete(pool.all, old.Hash())
    21. pool.priced.Removed()
    22. pendingReplaceCounter.Inc(1)
    23. }
    24. // Failsafe to work around direct pending inserts (tests)
    25. if pool.all[hash] == nil {
    26. pool.all[hash] = tx
    27. pool.priced.Put(tx)
    28. }
    29. // Set the potentially new pending nonce and notify any subsystems of the new tx
    30. // 把交易加入到队列,并发送消息告诉所有的订阅者, 这个订阅者在eth协议内部. 会接收这个消息并把这个消息通过网路广播出去.
    31. pool.beats[addr] = time.Now()
    32. pool.pendingState.SetNonce(addr, tx.Nonce()+1)
    33. go pool.txFeed.Send(TxPreEvent{tx})
    34. }

    removeTx,删除某个交易, 并把所有后续的交易移动到future queue

    1. // removeTx removes a single transaction from the queue, moving all subsequent
    2. // transactions back to the future queue.
    3. func (pool *TxPool) removeTx(hash common.Hash) {
    4. // Fetch the transaction we wish to delete
    5. tx, ok := pool.all[hash]
    6. if !ok {
    7. return
    8. }
    9. addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
    10. // Remove it from the list of known transactions
    11. delete(pool.all, hash)
    12. pool.priced.Removed()
    13. // Remove the transaction from the pending lists and reset the account nonce
    14. // 把交易从pending删除, 并把因为这个交易的删除而变得无效的交易放到future queue
    15. // 然后更新pendingState的状态
    16. if pending := pool.pending[addr]; pending != nil {
    17. if removed, invalids := pending.Remove(tx); removed {
    18. // If no more transactions are left, remove the list
    19. if pending.Empty() {
    20. delete(pool.pending, addr)
    21. delete(pool.beats, addr)
    22. } else {
    23. // Otherwise postpone any invalidated transactions
    24. for _, tx := range invalids {
    25. pool.enqueueTx(tx.Hash(), tx)
    26. }
    27. }
    28. // Update the account nonce if needed
    29. if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
    30. pool.pendingState.SetNonce(addr, nonce)
    31. }
    32. return
    33. }
    34. }
    35. // Transaction is in the future queue
    36. // 把交易从future queue删除.
    37. if future := pool.queue[addr]; future != nil {
    38. future.Remove(tx)
    39. if future.Empty() {
    40. delete(pool.queue, addr)
    41. }
    42. }
    43. }

    loop是txPool的一个goroutine.也是主要的事件循环.等待和响应外部区块链事件以及各种报告和交易驱逐事件。

    1. // loop is the transaction pool's main event loop, waiting for and reacting to
    2. // outside blockchain events as well as for various reporting and transaction
    3. // eviction events.
    4. func (pool *TxPool) loop() {
    5. defer pool.wg.Done()
    6. // Start the stats reporting and transaction eviction tickers
    7. var prevPending, prevQueued, prevStales int
    8. report := time.NewTicker(statsReportInterval)
    9. defer report.Stop()
    10. evict := time.NewTicker(evictionInterval)
    11. defer evict.Stop()
    12. journal := time.NewTicker(pool.config.Rejournal)
    13. defer journal.Stop()
    14. // Track the previous head headers for transaction reorgs
    15. head := pool.chain.CurrentBlock()
    16. // Keep waiting for and reacting to the various events
    17. for {
    18. select {
    19. // Handle ChainHeadEvent
    20. // 监听到区块头的事件, 获取到新的区块头.
    21. // 调用reset方法
    22. case ev := <-pool.chainHeadCh:
    23. if ev.Block != nil {
    24. pool.mu.Lock()
    25. if pool.chainconfig.IsHomestead(ev.Block.Number()) {
    26. pool.homestead = true
    27. }
    28. pool.reset(head.Header(), ev.Block.Header())
    29. head = ev.Block
    30. pool.mu.Unlock()
    31. }
    32. // Be unsubscribed due to system stopped
    33. case <-pool.chainHeadSub.Err():
    34. return
    35. // Handle stats reporting ticks 报告就是打印了一些日志
    36. case <-report.C:
    37. pool.mu.RLock()
    38. pending, queued := pool.stats()
    39. stales := pool.priced.stales
    40. pool.mu.RUnlock()
    41. if pending != prevPending || queued != prevQueued || stales != prevStales {
    42. log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
    43. prevPending, prevQueued, prevStales = pending, queued, stales
    44. }
    45. // Handle inactive account transaction eviction
    46. // 处理超时的交易信息,
    47. case <-evict.C:
    48. pool.mu.Lock()
    49. for addr := range pool.queue {
    50. // Skip local transactions from the eviction mechanism
    51. if pool.locals.contains(addr) {
    52. continue
    53. }
    54. // Any non-locals old enough should be removed
    55. if time.Since(pool.beats[addr]) > pool.config.Lifetime {
    56. for _, tx := range pool.queue[addr].Flatten() {
    57. pool.removeTx(tx.Hash())
    58. }
    59. }
    60. }
    61. pool.mu.Unlock()
    62. // Handle local transaction journal rotation 处理定时写交易日志的信息.
    63. case <-journal.C:
    64. if pool.journal != nil {
    65. pool.mu.Lock()
    66. if err := pool.journal.rotate(pool.local()); err != nil {
    67. log.Warn("Failed to rotate local tx journal", "err", err)
    68. }
    69. pool.mu.Unlock()
    70. }
    71. }
    72. }
    73. }

    add 方法, 验证交易并将其插入到future queue. 如果这个交易是替换了当前存在的某个交易,那么会返回之前的那个交易,这样外部就不用调用promote方法. 如果某个新增加的交易被标记为local, 那么它的发送账户会进入白名单,这个账户的关联的交易将不会因为价格的限制或者其他的一些限制被删除.

    1. // add validates a transaction and inserts it into the non-executable queue for
    2. // later pending promotion and execution. If the transaction is a replacement for
    3. // an already pending or queued one, it overwrites the previous and returns this
    4. // so outer code doesn't uselessly call promote.
    5. //
    6. // If a newly added transaction is marked as local, its sending account will be
    7. // whitelisted, preventing any associated transaction from being dropped out of
    8. // the pool due to pricing constraints.
    9. func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
    10. // If the transaction is already known, discard it
    11. hash := tx.Hash()
    12. if pool.all[hash] != nil {
    13. log.Trace("Discarding already known transaction", "hash", hash)
    14. return false, fmt.Errorf("known transaction: %x", hash)
    15. }
    16. // If the transaction fails basic validation, discard it
    17. // 如果交易不能通过基本的验证,那么丢弃它
    18. if err := pool.validateTx(tx, local); err != nil {
    19. log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
    20. invalidTxCounter.Inc(1)
    21. return false, err
    22. }
    23. // If the transaction pool is full, discard underpriced transactions
    24. // 如果交易池满了. 那么删除一些低价的交易.
    25. if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
    26. // If the new transaction is underpriced, don't accept it
    27. // 如果新交易本身就是低价的.那么不接收它
    28. if pool.priced.Underpriced(tx, pool.locals) {
    29. log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
    30. underpricedTxCounter.Inc(1)
    31. return false, ErrUnderpriced
    32. }
    33. // New transaction is better than our worse ones, make room for it
    34. // 否则删除低价值的给他腾空间.
    35. drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
    36. for _, tx := range drop {
    37. log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
    38. underpricedTxCounter.Inc(1)
    39. pool.removeTx(tx.Hash())
    40. }
    41. }
    42. // If the transaction is replacing an already pending one, do directly
    43. from, _ := types.Sender(pool.signer, tx) // already validated
    44. if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
    45. // Nonce already pending, check if required price bump is met
    46. // 如果交易对应的Nonce已经在pending队列了,那么产看是否能够替换.
    47. inserted, old := list.Add(tx, pool.config.PriceBump)
    48. if !inserted {
    49. pendingDiscardCounter.Inc(1)
    50. return false, ErrReplaceUnderpriced
    51. }
    52. // New transaction is better, replace old one
    53. if old != nil {
    54. delete(pool.all, old.Hash())
    55. pool.priced.Removed()
    56. pendingReplaceCounter.Inc(1)
    57. }
    58. pool.all[tx.Hash()] = tx
    59. pool.priced.Put(tx)
    60. pool.journalTx(from, tx)
    61. log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
    62. return old != nil, nil
    63. }
    64. // New transaction isn't replacing a pending one, push into queue
    65. // 新交易不能替换pending里面的任意一个交易,那么把他push到futuren 队列里面.
    66. replace, err := pool.enqueueTx(hash, tx)
    67. if err != nil {
    68. return false, err
    69. }
    70. // Mark local addresses and journal local transactions
    71. if local {
    72. pool.locals.add(from)
    73. }
    74. // 如果是本地的交易,会被记录进入journalTx
    75. pool.journalTx(from, tx)
    76. log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
    77. return replace, nil
    78. }

    validateTx 使用一致性规则来检查一个交易是否有效,并采用本地节点的一些启发式的限制.

    1. // validateTx checks whether a transaction is valid according to the consensus
    2. // rules and adheres to some heuristic limits of the local node (price and size).
    3. func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
    4. // Heuristic limit, reject transactions over 32KB to prevent DOS attacks
    5. if tx.Size() > 32*1024 {
    6. return ErrOversizedData
    7. }
    8. // Transactions can't be negative. This may never happen using RLP decoded
    9. // transactions but may occur if you create a transaction using the RPC.
    10. if tx.Value().Sign() < 0 {
    11. return ErrNegativeValue
    12. }
    13. // Ensure the transaction doesn't exceed the current block limit gas.
    14. if pool.currentMaxGas.Cmp(tx.Gas()) < 0 {
    15. return ErrGasLimit
    16. }
    17. // Make sure the transaction is signed properly
    18. // 确保交易被正确签名.
    19. from, err := types.Sender(pool.signer, tx)
    20. if err != nil {
    21. return ErrInvalidSender
    22. }
    23. // Drop non-local transactions under our own minimal accepted gas price
    24. local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
    25. // 如果不是本地的交易,并且GasPrice低于我们的设置,那么也不会接收.
    26. if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
    27. return ErrUnderpriced
    28. }
    29. // Ensure the transaction adheres to nonce ordering
    30. // 确保交易遵守了Nonce的顺序
    31. if pool.currentState.GetNonce(from) > tx.Nonce() {
    32. return ErrNonceTooLow
    33. }
    34. // Transactor should have enough funds to cover the costs
    35. // cost == V + GP * GL
    36. // 确保用户有足够的余额来支付.
    37. if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
    38. return ErrInsufficientFunds
    39. }
    40. intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
    41. // 如果交易是一个合约创建或者调用. 那么看看是否有足够的 初始Gas.
    42. if tx.Gas().Cmp(intrGas) < 0 {
    43. return ErrIntrinsicGas
    44. }
    45. return nil
    46. }