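This analysis picks up where a transaction has been dropped into the tx pool's pending queue. Naturally, this is the point where the miner takes the stage.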

/miner/miner.go/Start

    func (self *Miner) Start(coinbase common.Address) {
        atomic.StoreInt32(&self.shouldStart, 1)
        self.worker.setEtherbase(coinbase)
        self.coinbase = coinbase
        if atomic.LoadInt32(&self.canStart) == 0 {
            log.Info("Network syncing, will start miner afterwards")
            return
        }
        atomic.StoreInt32(&self.mining, 1)
        log.Info("Starting mining operation")
        self.worker.start()
        self.worker.commitNewWork()
    }
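The flag handling in Start can be tried in isolation. The following is only a minimal sketch: the `gate` type and its `start` and `syncDone` methods are hypothetical names, and `syncDone` is an assumed counterpart (not part of the listing above) that models what could happen once network sync finishes.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gate is a hypothetical stand-in for the three flags used by Miner.Start.
type gate struct {
	shouldStart int32 // 1 once mining has been requested
	canStart    int32 // 1 once the node is no longer syncing
	mining      int32 // 1 while mining is actually running
}

// start records the request and reports whether mining began immediately.
func (g *gate) start() bool {
	atomic.StoreInt32(&g.shouldStart, 1)
	if atomic.LoadInt32(&g.canStart) == 0 {
		return false // still syncing; the request stays recorded
	}
	atomic.StoreInt32(&g.mining, 1)
	return true
}

// syncDone is an assumption for illustration: it marks sync as finished
// and honors a start request that was recorded earlier.
func (g *gate) syncDone() bool {
	atomic.StoreInt32(&g.canStart, 1)
	if atomic.LoadInt32(&g.shouldStart) == 1 {
		return g.start()
	}
	return false
}

func main() {
	g := &gate{}
	fmt.Println(g.start())    // false
	fmt.Println(g.syncDone()) // true
}
```

Keeping the request in `shouldStart` means a start issued during sync is deferred rather than lost.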
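Start is where mining begins. The worker's start() call tells every agent (the CPU agent or a remote agent) to begin mining. Now let's look at the core code, commitNewWork():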
    func (self *worker) commitNewWork() {
        self.mu.Lock()
        defer self.mu.Unlock()
        self.uncleMu.Lock()
        defer self.uncleMu.Unlock()
        self.currentMu.Lock()
        defer self.currentMu.Unlock()
        tstart := time.Now()
        parent := self.chain.CurrentBlock()
        tstamp := tstart.Unix()
        if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
            tstamp = parent.Time().Int64() + 1
        }
        // this will ensure we're not going off too far in the future
        if now := time.Now().Unix(); tstamp > now+1 {
            wait := time.Duration(tstamp-now) * time.Second
            log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
            time.Sleep(wait)
        }
        num := parent.Number()
        header := &types.Header{
            ParentHash: parent.Hash(),
            Number:     num.Add(num, common.Big1),
            GasLimit:   core.CalcGasLimit(parent),
            GasUsed:    new(big.Int),
            Extra:      self.extra,
            Time:       big.NewInt(tstamp),
        }
        // Only set the coinbase if we are mining (avoid spurious block rewards)
        if atomic.LoadInt32(&self.mining) == 1 {
            header.Coinbase = self.coinbase
        }
        if err := self.engine.Prepare(self.chain, header); err != nil {
            log.Error("Failed to prepare header for mining", "err", err)
            return
        }
        // If we are care about TheDAO hard-fork check whether to override the extra-data or not
        if daoBlock := self.config.DAOForkBlock; daoBlock != nil {
            // Check whether the block is among the fork extra-override range
            limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
            if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
                // Depending whether we support or oppose the fork, override differently
                if self.config.DAOForkSupport {
                    header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
                } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
                    header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
                }
            }
        }
        // Could potentially happen if starting to mine in an odd state.
        err := self.makeCurrent(parent, header)
        if err != nil {
            log.Error("Failed to create mining context", "err", err)
            return
        }
        // Create the current work task and check any fork transitions needed
        work := self.current
        if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
            misc.ApplyDAOHardFork(work.state)
        }
        pending, err := self.eth.TxPool().Pending()
        if err != nil {
            log.Error("Failed to fetch pending transactions", "err", err)
            return
        }
        txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
        work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
        // compute uncles for the new block.
        var (
            uncles    []*types.Header
            badUncles []common.Hash
        )
        for hash, uncle := range self.possibleUncles {
            if len(uncles) == 2 {
                break
            }
            if err := self.commitUncle(work, uncle.Header()); err != nil {
                log.Trace("Bad uncle found and will be removed", "hash", hash)
                log.Trace(fmt.Sprint(uncle))
                badUncles = append(badUncles, hash)
            } else {
                log.Debug("Committing new uncle to block", "hash", hash)
                uncles = append(uncles, uncle.Header())
            }
        }
        for _, hash := range badUncles {
            delete(self.possibleUncles, hash)
        }
        // Create the new block to seal with the consensus engine
        if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
            log.Error("Failed to finalize block for sealing", "err", err)
            return
        }
        // We only care about logging if we're actually mining.
        if atomic.LoadInt32(&self.mining) == 1 {
            log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
            self.unconfirmed.Shift(work.Block.NumberU64() - 1)
        }
        self.push(work)
    }
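commitNewWork builds the header for the block about to be produced from the parent block's information; the difficulty for this block is filled in by the consensus engine in self.engine.Prepare. If the parent's timestamp lies in the future, so that the new timestamp would be more than one second ahead of the local clock, the method sleeps until that time arrives. Once makeCurrent has set up the mining context, work := self.current picks up the object that represents the current block-production task.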
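The timestamp rule is easy to check on its own. Below is a rough sketch, assuming a hypothetical helper `nextTimestamp` that works on plain Unix seconds and samples the clock once (the listing samples it twice); it returns the wait instead of sleeping so it can be tested.

```go
package main

import (
	"fmt"
	"time"
)

// nextTimestamp (a hypothetical helper) picks the timestamp for a new block:
// the current time, or parent+1 if the parent is not strictly older.
// It also returns how long to wait when that timestamp is more than
// one second ahead of the local clock.
func nextTimestamp(parentTime, now int64) (tstamp int64, wait time.Duration) {
	tstamp = now
	if parentTime >= tstamp {
		tstamp = parentTime + 1
	}
	if tstamp > now+1 {
		wait = time.Duration(tstamp-now) * time.Second
	}
	return tstamp, wait
}

func main() {
	fmt.Println(nextTimestamp(100, 105)) // 105 0s
	fmt.Println(nextTimestamp(110, 105)) // 111 6s
}
```

In the second call the parent is five seconds ahead of the local clock, so the new block is stamped one second after the parent and the caller would wait six seconds.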

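Next it fetches the pending transactions from the transaction pool, orders them by price and nonce, and commits them to the work. Now look at env.commitTransaction, which work.commitTransactions calls for each transaction: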

    func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
        snap := env.state.Snapshot()
        receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, env.header.GasUsed, vm.Config{})
        if err != nil {
            env.state.RevertToSnapshot(snap)
            return err, nil
        }
        env.txs = append(env.txs, tx)
        env.receipts = append(env.receipts, receipt)
        return nil, receipt.Logs
    }
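The snapshot-then-revert pattern in the listing above can be illustrated with a toy state. This is a simplified sketch, not the real StateDB: `toyState`, `transfer`, and `commit` are hypothetical names, and copying the whole map for a snapshot is an assumption made for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// toyState is a hypothetical, heavily simplified stand-in for state.StateDB.
type toyState struct {
	balances map[string]int64
}

// snapshot copies the balances so they can be restored later.
func (s *toyState) snapshot() map[string]int64 {
	cp := make(map[string]int64, len(s.balances))
	for k, v := range s.balances {
		cp[k] = v
	}
	return cp
}

// revert restores a previously taken snapshot.
func (s *toyState) revert(snap map[string]int64) { s.balances = snap }

// transfer is a toy transaction.
type transfer struct {
	from, to string
	amount   int64
}

// apply mutates the state before validating, so a failure leaves
// partial changes behind, which is why the caller needs a snapshot.
func (s *toyState) apply(t transfer) error {
	s.balances[t.from] -= t.amount
	if s.balances[t.from] < 0 {
		return errors.New("insufficient balance")
	}
	s.balances[t.to] += t.amount
	return nil
}

// commit applies t; on error it rolls the state back and leaves the
// included list unchanged, otherwise it appends t to the list.
func commit(s *toyState, included []transfer, t transfer) ([]transfer, error) {
	snap := s.snapshot()
	if err := s.apply(t); err != nil {
		s.revert(snap)
		return included, err
	}
	return append(included, t), nil
}

func main() {
	s := &toyState{balances: map[string]int64{"a": 10}}
	var included []transfer
	included, _ = commit(s, included, transfer{"a", "b", 4})
	included, err := commit(s, included, transfer{"a", "b", 100})
	fmt.Println(len(included), s.balances["a"], s.balances["b"], err)
	// prints: 1 6 4 insufficient balance
}
```

The failed transfer leaves no trace in the balances, which is the property the miner relies on when it skips an invalid transaction and moves on to the next one.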
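commitTransaction checks whether a transaction in the current work is valid by applying it to the state. If it applies cleanly, the transaction and its receipt are appended to the work's lists; otherwise the state is rolled back to the snapshot and the error is returned. Next, look at the Finalize method called from commitNewWork():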
    // Finalize implements consensus.Engine, accumulating the block and uncle rewards,
    // setting the final state and assembling the block.
    func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) {
        // Accumulate any block and uncle rewards and commit the final state root
        AccumulateRewards(chain.Config(), state, header, uncles)
        header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))
        // Header seems complete, assemble into a block and return
        return types.NewBlock(header, txs, uncles, receipts), nil
    }
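The body of AccumulateRewards is not shown in this article. As a rough illustration of the kind of arithmetic involved, the sketch below assumes the commonly described ethash rule: each uncle's miner receives (uncleNumber + 8 - blockNumber) / 8 of the block reward, and the block's miner receives the block reward plus 1/32 of it per included uncle. The `rewards` function, its parameters, and the reward of 32 units are all hypothetical.

```go
package main

import (
	"fmt"
	"math/big"
)

var (
	big8  = big.NewInt(8)
	big32 = big.NewInt(32)
)

// rewards (a hypothetical helper) returns the reward for the block's miner
// and one reward per uncle, under the rule assumed in the text above.
func rewards(blockReward, blockNum int64, uncleNums []int64) (miner *big.Int, uncles []*big.Int) {
	base := big.NewInt(blockReward)
	miner = new(big.Int).Set(base)
	for _, un := range uncleNums {
		// Uncle reward: (uncleNumber + 8 - blockNumber) * blockReward / 8.
		r := big.NewInt(un + 8 - blockNum)
		r.Mul(r, base)
		r.Div(r, big8)
		uncles = append(uncles, r)
		// The including miner earns an extra blockReward / 32 per uncle.
		miner.Add(miner, new(big.Int).Div(base, big32))
	}
	return miner, uncles
}

func main() {
	miner, uncles := rewards(32, 10, []int64{9})
	fmt.Println(miner, uncles[0]) // 33 28
}
```

With a block reward of 32 units, an uncle one block behind earns 7/8 of it (28), and the miner earns 32 plus 1 for including it.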
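Finalize essentially credits the block reward and uncle rewards for this block, then computes the state root and assembles the block. Next, look at the last method called in commitNewWork():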
    // push sends a new work task to currently live miner agents.
    func (self *worker) push(work *Work) {
        if atomic.LoadInt32(&self.mining) != 1 {
            return
        }
        for agent := range self.agents {
            atomic.AddInt32(&self.atWork, 1)
            if ch := agent.Work(); ch != nil {
                ch <- work
            }
        }
    }
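The fan-out in push can be reproduced with plain channels. The sketch below is a simplification under stated assumptions: `work`, `agent`, and `worker` are hypothetical cut-down types, the agents live in a slice rather than a map, and each agent's channel is buffered so the sender does not block.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// work is a hypothetical cut-down task carrying only a block number.
type work struct{ number uint64 }

// agent is a hypothetical consumer with its own work channel.
type agent struct{ workCh chan *work }

// worker is a hypothetical cut-down version of the miner's worker.
type worker struct {
	mining int32 // 1 while mining is enabled
	atWork int32 // number of tasks handed out
	agents []*agent
}

// push hands the same task to every agent, but only while mining.
func (w *worker) push(task *work) {
	if atomic.LoadInt32(&w.mining) != 1 {
		return
	}
	for _, a := range w.agents {
		atomic.AddInt32(&w.atWork, 1)
		a.workCh <- task
	}
}

func main() {
	w := &worker{mining: 1}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		a := &agent{workCh: make(chan *work, 1)}
		w.agents = append(w.agents, a)
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			task := <-a.workCh // each agent receives its own copy of the pointer
			fmt.Println("agent", id, "got block", task.number)
		}(i)
	}
	w.push(&work{number: 42})
	wg.Wait()
}
```

Every agent receives the same task pointer, so the agents compete on identical work; the order of the printed lines depends on goroutine scheduling.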
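push sends the current block-production task to every agent by writing it into each agent's work channel. At this point the information for the next block has reached every agent. Next, let's see how an agent produces a block, starting with the interface: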
    // Agent can register themself with the worker
    type Agent interface {
        Work() chan<- *Work
        SetReturnCh(chan<- *Result)
        Stop()
        Start()
        GetHashRate() int64
    }
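As analyzed above, the task and information for the current block have been written into the channel returned by Work(). So who receives and handles them? Turn to the update method in /miner/agent.go: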
    func (self *CpuAgent) update() {
    out:
        for {
            select {
            case work := <-self.workCh:
                self.mu.Lock()
                if self.quitCurrentOp != nil {
                    close(self.quitCurrentOp)
                }
                self.quitCurrentOp = make(chan struct{})
                go self.mine(work, self.quitCurrentOp)
                self.mu.Unlock()
            case <-self.stop:
                self.mu.Lock()
                if self.quitCurrentOp != nil {
                    close(self.quitCurrentOp)
                    self.quitCurrentOp = nil
                }
                self.mu.Unlock()
                break out
            }
        }
    }
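The key idea in update is that new work cancels whatever is still running by closing the previous quit channel. A minimal sketch of that pattern follows; the `runner` type and `submit` method are hypothetical, and the WaitGroup exists only so the example can wait for its goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a hypothetical holder for the job currently in flight.
type runner struct {
	mu   sync.Mutex
	quit chan struct{} // closed to abort the job in flight
	wg   sync.WaitGroup
}

// submit aborts any job still running and starts the new one
// in its own goroutine with a fresh quit channel.
func (r *runner) submit(job func(quit <-chan struct{})) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.quit != nil {
		close(r.quit) // tell the previous job to stop
	}
	r.quit = make(chan struct{})
	r.wg.Add(1)
	go func(q chan struct{}) {
		defer r.wg.Done()
		job(q)
	}(r.quit)
}

func main() {
	r := &runner{}
	aborted := make(chan bool, 1)
	r.submit(func(quit <-chan struct{}) {
		<-quit // pretend to search until told to stop
		aborted <- true
	})
	r.submit(func(quit <-chan struct{}) {}) // newer work replaces the old job
	r.wg.Wait()
	fmt.Println("first job aborted:", <-aborted) // first job aborted: true
}
```

Closing a channel is a broadcast: every receiver waiting on it wakes up, which is why a close (rather than a send) is used to signal cancellation.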
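Here the block task that was just written to the channel is received and handled: self.mine(work, self.quitCurrentOp) is called to do the mining. Whoever first computes a hash that satisfies the block's difficulty gets to produce the block. This closes the loop with the second article in the series, the analysis of PoW in the consensus package.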
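To make "a hash that satisfies the difficulty" concrete, here is a toy nonce search. It is a sketch under loud assumptions: SHA-256 is swapped in for ethash, the header is an arbitrary byte string, and `meets` and `search` are hypothetical helpers. The acceptance rule used is that the hash, read as an integer, must not exceed 2^256 / difficulty.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math/big"
)

// two256 is 2^256, the size of the hash space.
var two256 = new(big.Int).Lsh(big.NewInt(1), 256)

// meets reports whether sha256(header || nonce), read as an integer,
// is at most 2^256 / difficulty. SHA-256 stands in for ethash here.
// difficulty must be positive.
func meets(header []byte, nonce uint64, difficulty int64) bool {
	buf := make([]byte, len(header)+8)
	copy(buf, header)
	binary.BigEndian.PutUint64(buf[len(header):], nonce)
	sum := sha256.Sum256(buf)
	target := new(big.Int).Div(two256, big.NewInt(difficulty))
	return new(big.Int).SetBytes(sum[:]).Cmp(target) <= 0
}

// search tries nonces from 0 upward until one meets the difficulty
// or quit is closed, mirroring how newer work aborts a search.
func search(header []byte, difficulty int64, quit <-chan struct{}) (uint64, bool) {
	for nonce := uint64(0); ; nonce++ {
		select {
		case <-quit:
			return 0, false
		default:
		}
		if meets(header, nonce, difficulty) {
			return nonce, true
		}
	}
}

func main() {
	header := []byte("toy header")
	nonce, ok := search(header, 1000, make(chan struct{}))
	fmt.Println(ok, meets(header, nonce, 1000)) // true true
}
```

A difficulty of 1000 means roughly one nonce in a thousand qualifies, so the search finishes quickly; raising the difficulty shrinks the target and lengthens the search proportionally.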