Analysis of the eth consensus algorithm, starting from a block mined by the local node

First of all, in today's production environments mining is certainly not done on the CPU; it goes through the RemoteAgent path instead. That is, a mining rig requests the node's current block-sealing task over the network, computes a hash that satisfies the block's difficulty, and submits it back to the node, which lands in the following method.

    func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool {
        a.mu.Lock()
        defer a.mu.Unlock()

        // Make sure the work submitted is present
        work := a.work[hash]
        if work == nil {
            log.Info("Work submitted but none pending", "hash", hash)
            return false
        }
        // Make sure the Engine solutions is indeed valid
        result := work.Block.Header()
        result.Nonce = nonce
        result.MixDigest = mixDigest
        if err := a.engine.VerifySeal(a.chain, result); err != nil {
            log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
            return false
        }
        block := work.Block.WithSeal(result)

        // Solutions seems to be valid, return to the miner and notify acceptance
        a.returnCh <- &Result{work, block}
        delete(a.work, hash)
        return true
    }
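For context, the other half of the remote-mining exchange is RemoteAgent.GetWork, which hands the current sealing task out to the rig (this is what backs the eth_getWork RPC). The following is a sketch based on miner/remote_agent.go of the same era; exact field names and the error message may differ between versions:

    // GetWork returns the current sealing task as three hex strings: the header
    // pow-hash, the ethash seed hash, and the boundary condition ("target",
    // i.e. 2^256/difficulty).
    func (a *RemoteAgent) GetWork() ([3]string, error) {
        a.mu.Lock()
        defer a.mu.Unlock()

        var res [3]string
        if a.currentWork != nil {
            block := a.currentWork.Block

            res[0] = block.HashNoNonce().Hex()
            seedHash := ethash.SeedHash(block.NumberU64())
            res[1] = common.BytesToHash(seedHash).Hex()
            // Calculate the "target" to be returned to the external miner
            n := big.NewInt(1)
            n.Lsh(n, 255)
            n.Div(n, block.Difficulty())
            n.Lsh(n, 1)
            res[2] = common.BytesToHash(n.Bytes()).Hex()

            // Remember the work so SubmitWork can look it up by pow-hash later
            a.work[block.HashNoNonce()] = a.currentWork
            return res, nil
        }
        return res, errors.New("no mining work available yet")
    }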

SubmitWork verifies that the submitted solution satisfies the block's difficulty; if it does, the sealed block is written to a channel. The receiving end of that channel is the wait() method in /miner/worker.go:

    func (self *worker) wait() {
        for {
            mustCommitNewWork := true
            for result := range self.recv {
                atomic.AddInt32(&self.atWork, -1)

                if result == nil {
                    continue
                }
                block := result.Block
                work := result.Work

                // Update the block hash in all logs since it is now available and not when the
                // receipt/log of individual transactions were created.
                for _, r := range work.receipts {
                    for _, l := range r.Logs {
                        l.BlockHash = block.Hash()
                    }
                }
                for _, log := range work.state.Logs() {
                    log.BlockHash = block.Hash()
                }
                stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
                if err != nil {
                    log.Error("Failed writing block to chain", "err", err)
                    continue
                }
                // check if canon block and write transactions
                if stat == core.CanonStatTy {
                    // implicit by posting ChainHeadEvent
                    mustCommitNewWork = false
                }
                // Broadcast the block and announce chain insertion event
                // (the block is propagated to connected peers over p2p; this, too, goes through a channel)
                self.mux.Post(core.NewMinedBlockEvent{Block: block})
                var (
                    events []interface{}
                    logs   = work.state.Logs()
                )
                events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
                if stat == core.CanonStatTy {
                    events = append(events, core.ChainHeadEvent{Block: block})
                }
                self.chain.PostChainEvents(events, logs)

                // Insert the block into the set of pending ones to wait for confirmations
                self.unconfirmed.Insert(block.NumberU64(), block.Hash())

                if mustCommitNewWork {
                    self.commitNewWork()
                }
            }
        }
    }

A new-mined-block event is posted here. Following it up, the call stack is:

    /geth/main.go/geth --> startNode --> utils.StartNode(stack)
      --> stack.Start() --> /node/node.go/Start() --> service.Start(running)
      --> /eth/backend.go/Start() --> /eth/handler.go/Start()

The core logic lives in handler.go/Start():

    func (pm *ProtocolManager) Start(maxPeers int) {
        pm.maxPeers = maxPeers

        // broadcast transactions
        // txCh is the transaction broadcast channel. It is subscribed to the txpool's
        // TxPreEvent; whenever the txpool sees such an event it notifies txCh, and the
        // broadcast goroutine sends the transaction out to the network.
        pm.txCh = make(chan core.TxPreEvent, txChanSize)
        // the subscription handle
        pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
        go pm.txBroadcastLoop()

        // Subscribe to mining events: a message is produced whenever a new block is
        // mined. Note this subscription uses a different mechanism from the one above --
        // the event.TypeMux style, which is marked Deprecated.
        // broadcast mined blocks
        pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
        // The mined-block broadcast goroutine: a locally mined block must be pushed
        // to the network as quickly as possible, and this is the path it takes.
        go pm.minedBroadcastLoop()

        // The syncer periodically synchronises with the network, downloading hashes
        // and blocks as well as handling announcements.
        // start sync handlers
        go pm.syncer()
        // txsyncLoop handles the initial transaction sync for each new connection.
        // When a new peer appears, we relay all currently pending transactions; to
        // minimise egress bandwidth usage, we only send one small packet at a time.
        go pm.txsyncLoop()
    }

pm.minedBroadcastLoop() is where the mined-block event posted above is received from the event mux.
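Its body is compact; here is a sketch of what it does, assuming the deprecated TypeMux subscription set up in Start() above (names follow that era's eth/handler.go):

    // Mined broadcast loop
    func (pm *ProtocolManager) minedBroadcastLoop() {
        // automatically stops when unsubscribed
        for obj := range pm.minedBlockSub.Chan() {
            switch ev := obj.Data.(type) {
            case core.NewMinedBlockEvent:
                pm.BroadcastBlock(ev.Block, true)  // First propagate the block to peers
                pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
            }
        }
    }

Both calls land in BroadcastBlock, which contains the logic that actually sends the block to peers over the p2p network: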

    // BroadcastBlock will either propagate a block to a subset of it's peers, or
    // will only announce it's availability (depending what's requested).
    func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
        hash := block.Hash()
        peers := pm.peers.PeersWithoutBlock(hash)

        // If propagation is requested, send to a subset of the peer
        if propagate {
            // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
            var td *big.Int
            if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
                td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
            } else {
                log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
                return
            }
            // Send the block to a subset of our peers
            transfer := peers[:int(math.Sqrt(float64(len(peers))))]
            for _, peer := range transfer {
                peer.SendNewBlock(block, td)
            }
            log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
            return
        }
        // Otherwise if the block is indeed in out own chain, announce it
        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
            for _, peer := range peers {
                peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
            }
            log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        }
    }

Two kinds of messages are sent here: NewBlockMsg, which carries the full block to a sqrt-sized subset of peers, and NewBlockHashesMsg, which merely announces the block's hash to everyone else. At this point a block mined by the local node has started to spread out over the p2p network.
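The announce half, peer.SendNewBlockHashes, is worth a quick look; a sketch based on the same era's eth/peer.go (newBlockHashesData and knownBlocks are that file's types):

    // SendNewBlockHashes announces the availability of a number of blocks through
    // a hash notification.
    func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
        for _, hash := range hashes {
            p.knownBlocks.Add(hash)
        }
        request := make(newBlockHashesData, len(hashes))
        for i := 0; i < len(hashes); i++ {
            request[i].Hash = hashes[i]
            request[i].Number = numbers[i]
        }
        return p2p.Send(p.rw, NewBlockHashesMsg, request)
    }

Next, let's look at another important method: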

    // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
    // with the ethereum network.
    func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
        // Create the protocol manager with the base fields
        manager := &ProtocolManager{
            networkId:   networkId,
            eventMux:    mux,
            txpool:      txpool,
            blockchain:  blockchain,
            chaindb:     chaindb,
            chainconfig: config,
            peers:       newPeerSet(),
            newPeerCh:   make(chan *peer),
            noMorePeers: make(chan struct{}),
            txsyncCh:    make(chan *txsync),
            quitSync:    make(chan struct{}),
        }
        // Figure out whether to allow fast sync or not
        if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
            log.Warn("Blockchain not empty, fast sync disabled")
            mode = downloader.FullSync
        }
        if mode == downloader.FastSync {
            manager.fastSync = uint32(1)
        }
        // Initiate a sub-protocol for every implemented version we can handle
        manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
        for i, version := range ProtocolVersions {
            // Skip protocol version if incompatible with the mode of operation
            if mode == downloader.FastSync && version < eth63 {
                continue
            }
            // Compatible; initialise the sub-protocol
            version := version // Closure for the run
            manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
                Name:    ProtocolName,
                Version: version,
                Length:  ProtocolLengths[i],
                Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                    peer := manager.newPeer(int(version), p, rw)
                    select {
                    case manager.newPeerCh <- peer:
                        manager.wg.Add(1)
                        defer manager.wg.Done()
                        return manager.handle(peer)
                    case <-manager.quitSync:
                        return p2p.DiscQuitting
                    }
                },
                NodeInfo: func() interface{} {
                    return manager.NodeInfo()
                },
                PeerInfo: func(id discover.NodeID) interface{} {
                    if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                        return p.Info()
                    }
                    return nil
                },
            })
        }
        if len(manager.SubProtocols) == 0 {
            return nil, errIncompatibleConfig
        }
        // The downloader is responsible for syncing our own data from other peers;
        // it is the full-chain synchronisation tool.
        // Construct the different synchronisation mechanisms
        manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)

        validator := func(header *types.Header) error {
            return engine.VerifyHeader(blockchain, header, true)
        }
        heighter := func() uint64 {
            return blockchain.CurrentBlock().NumberU64()
        }
        inserter := func(blocks types.Blocks) (int, error) {
            // If fast sync is running, deny importing weird blocks
            if atomic.LoadUint32(&manager.fastSync) == 1 {
                log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
                return 0, nil
            }
            // Mark that we can start accepting transactions
            atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
            return manager.blockchain.InsertChain(blocks)
        }
        // Create a fetcher: it accumulates block announcements from the various
        // peers and schedules them for retrieval.
        manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

        return manager, nil
    }

This method sets up the multiple sub-protocols under the Ethereum protocol. The Run callback is invoked whenever a peer connection is established, and as you can see it blocks. Following it into the handle method reveals this key piece of code:

    for {
        if err := pm.handleMsg(p); err != nil {
            p.Log().Debug("Ethereum message handling failed", "err", err)
            return err
        }
    }

An endless loop that processes messages arriving over the p2p network. Next, the handleMsg method:

    func (pm *ProtocolManager) handleMsg(p *peer) error {
        // Read the next message from the remote peer, and ensure it's fully consumed
        msg, err := p.rw.ReadMsg()
        if err != nil {
            return err
        }
        if msg.Size > ProtocolMaxMsgSize {
            return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
        }
        defer msg.Discard()

        // Handle the message depending on its contents
        switch {
        case msg.Code == StatusMsg:
            // Status messages should never arrive after the handshake
            return errResp(ErrExtraStatusMsg, "uncontrolled status message")

        // Block header query, collect the requested headers and reply
        case msg.Code == GetBlockHeadersMsg:
            // Decode the complex header query
            var query getBlockHeadersData
            if err := msg.Decode(&query); err != nil {
                return errResp(ErrDecode, "%v: %v", msg, err)
            }
            hashMode := query.Origin.Hash != (common.Hash{})

            // Gather headers until the fetch or network limits is reached
            var (
                bytes   common.StorageSize
                headers []*types.Header
                unknown bool
            )
            for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
                // Retrieve the next header satisfying the query
                var origin *types.Header
                if hashMode {
                    origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
                } else {
                    origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
                }
                if origin == nil {
                    break
                }
                number := origin.Number.Uint64()
                headers = append(headers, origin)
                bytes += estHeaderRlpSize

                // Advance to the next header of the query
                switch {
                case query.Origin.Hash != (common.Hash{}) && query.Reverse:
                    // Hash based traversal towards the genesis block
                    for i := 0; i < int(query.Skip)+1; i++ {
                        if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
                            query.Origin.Hash = header.ParentHash
                            number--
                        } else {
                            unknown = true
                            break
                        }
                    }
                case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
                    // Hash based traversal towards the leaf block
                    var (
                        current = origin.Number.Uint64()
                        next    = current + query.Skip + 1
                    )
                    if next <= current {
                        infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
                        p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
                        unknown = true
                    } else {
                        if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
                            if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
                                query.Origin.Hash = header.Hash()
                            } else {
                                unknown = true
                            }
                        } else {
                            unknown = true
                        }
                    }
                case query.Reverse:
                    // Number based traversal towards the genesis block
                    if query.Origin.Number >= query.Skip+1 {
                        query.Origin.Number -= (query.Skip + 1)
                    } else {
                        unknown = true
                    }

                case !query.Reverse:
                    // Number based traversal towards the leaf block
                    query.Origin.Number += (query.Skip + 1)
                }
            }
            return p.SendBlockHeaders(headers)

        case msg.Code == BlockHeadersMsg:
            // A batch of headers arrived to one of our previous requests
            var headers []*types.Header
            if err := msg.Decode(&headers); err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // If no headers were received, but we're expending a DAO fork check, maybe it's that
            if len(headers) == 0 && p.forkDrop != nil {
                // Possibly an empty reply to the fork header checks, sanity check TDs
                verifyDAO := true

                // If we already have a DAO header, we can check the peer's TD against it. If
                // the peer's ahead of this, it too must have a reply to the DAO check
                if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
                    if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                        verifyDAO = false
                    }
                }
                // If we're seemingly on the same chain, disable the drop timer
                if verifyDAO {
                    p.Log().Debug("Seems to be on the same side of the DAO fork")
                    p.forkDrop.Stop()
                    p.forkDrop = nil
                    return nil
                }
            }
            // Filter out any explicitly requested headers, deliver the rest to the downloader
            filter := len(headers) == 1
            if filter {
                // If it's a potential DAO fork check, validate against the rules
                if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
                    // Disable the fork drop timer
                    p.forkDrop.Stop()
                    p.forkDrop = nil

                    // Validate the header and either drop the peer or continue
                    if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                        p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                        return err
                    }
                    p.Log().Debug("Verified to be on the same side of the DAO fork")
                    return nil
                }
                // Irrelevant of the fork checks, send the header to the fetcher just in case
                headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
            }
            if len(headers) > 0 || !filter {
                err := pm.downloader.DeliverHeaders(p.id, headers)
                if err != nil {
                    log.Debug("Failed to deliver headers", "err", err)
                }
            }

        case msg.Code == GetBlockBodiesMsg:
            // Decode the retrieval message
            msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
            if _, err := msgStream.List(); err != nil {
                return err
            }
            // Gather blocks until the fetch or network limits is reached
            var (
                hash   common.Hash
                bytes  int
                bodies []rlp.RawValue
            )
            for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
                // Retrieve the hash of the next block
                if err := msgStream.Decode(&hash); err == rlp.EOL {
                    break
                } else if err != nil {
                    return errResp(ErrDecode, "msg %v: %v", msg, err)
                }
                // Retrieve the requested block body, stopping if enough was found
                if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
                    bodies = append(bodies, data)
                    bytes += len(data)
                }
            }
            return p.SendBlockBodiesRLP(bodies)

        case msg.Code == BlockBodiesMsg:
            // A batch of block bodies arrived to one of our previous requests
            var request blockBodiesData
            if err := msg.Decode(&request); err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Deliver them all to the downloader for queuing
            trasactions := make([][]*types.Transaction, len(request))
            uncles := make([][]*types.Header, len(request))

            for i, body := range request {
                trasactions[i] = body.Transactions
                uncles[i] = body.Uncles
            }
            // Filter out any explicitly requested bodies, deliver the rest to the downloader
            filter := len(trasactions) > 0 || len(uncles) > 0
            if filter {
                trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())
            }
            if len(trasactions) > 0 || len(uncles) > 0 || !filter {
                err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
                if err != nil {
                    log.Debug("Failed to deliver bodies", "err", err)
                }
            }

        case p.version >= eth63 && msg.Code == GetNodeDataMsg:
            // Decode the retrieval message
            msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
            if _, err := msgStream.List(); err != nil {
                return err
            }
            // Gather state data until the fetch or network limits is reached
            var (
                hash  common.Hash
                bytes int
                data  [][]byte
            )
            for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
                // Retrieve the hash of the next state entry
                if err := msgStream.Decode(&hash); err == rlp.EOL {
                    break
                } else if err != nil {
                    return errResp(ErrDecode, "msg %v: %v", msg, err)
                }
                // Retrieve the requested state entry, stopping if enough was found
                if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
                    data = append(data, entry)
                    bytes += len(entry)
                }
            }
            return p.SendNodeData(data)

        case p.version >= eth63 && msg.Code == NodeDataMsg:
            // A batch of node state data arrived to one of our previous requests
            var data [][]byte
            if err := msg.Decode(&data); err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Deliver all to the downloader
            if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
                log.Debug("Failed to deliver node state data", "err", err)
            }

        case p.version >= eth63 && msg.Code == GetReceiptsMsg:
            // Decode the retrieval message
            msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
            if _, err := msgStream.List(); err != nil {
                return err
            }
            // Gather state data until the fetch or network limits is reached
            var (
                hash     common.Hash
                bytes    int
                receipts []rlp.RawValue
            )
            for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
                // Retrieve the hash of the next block
                if err := msgStream.Decode(&hash); err == rlp.EOL {
                    break
                } else if err != nil {
                    return errResp(ErrDecode, "msg %v: %v", msg, err)
                }
                // Retrieve the requested block's receipts, skipping if unknown to us
                results := core.GetBlockReceipts(pm.chaindb, hash, core.GetBlockNumber(pm.chaindb, hash))
                if results == nil {
                    if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
                        continue
                    }
                }
                // If known, encode and queue for response packet
                if encoded, err := rlp.EncodeToBytes(results); err != nil {
                    log.Error("Failed to encode receipt", "err", err)
                } else {
                    receipts = append(receipts, encoded)
                    bytes += len(encoded)
                }
            }
            return p.SendReceiptsRLP(receipts)

        case p.version >= eth63 && msg.Code == ReceiptsMsg:
            // A batch of receipts arrived to one of our previous requests
            var receipts [][]*types.Receipt
            if err := msg.Decode(&receipts); err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            // Deliver all to the downloader
            if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
                log.Debug("Failed to deliver receipts", "err", err)
            }

        case msg.Code == NewBlockHashesMsg:
            var announces newBlockHashesData
            if err := msg.Decode(&announces); err != nil {
                return errResp(ErrDecode, "%v: %v", msg, err)
            }
            // Mark the hashes as present at the remote node
            for _, block := range announces {
                p.MarkBlock(block.Hash)
            }
            // Schedule all the unknown hashes for retrieval
            unknown := make(newBlockHashesData, 0, len(announces))
            for _, block := range announces {
                if !pm.blockchain.HasBlock(block.Hash, block.Number) {
                    unknown = append(unknown, block)
                }
            }
            for _, block := range unknown {
                pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
            }

        case msg.Code == NewBlockMsg:
            // Retrieve and decode the propagated block
            var request newBlockData
            if err := msg.Decode(&request); err != nil {
                return errResp(ErrDecode, "%v: %v", msg, err)
            }
            request.Block.ReceivedAt = msg.ReceivedAt
            request.Block.ReceivedFrom = p

            // Mark the peer as owning the block and schedule it for import
            p.MarkBlock(request.Block.Hash())
            pm.fetcher.Enqueue(p.id, request.Block)

            // Assuming the block is importable by the peer, but possibly not yet done so,
            // calculate the head hash and TD that the peer truly must have.
            var (
                trueHead = request.Block.ParentHash()
                trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
            )
            // Update the peers total difficulty if better than the previous
            if _, td := p.Head(); trueTD.Cmp(td) > 0 {
                p.SetHead(trueHead, trueTD)

                // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
                // a singe block (as the true TD is below the propagated block), however this
                // scenario should easily be covered by the fetcher.
                currentBlock := pm.blockchain.CurrentBlock()
                if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
                    go pm.synchronise(p)
                }
            }

        case msg.Code == TxMsg:
            // Transactions arrived, make sure we have a valid and fresh chain to handle them
            if atomic.LoadUint32(&pm.acceptTxs) == 0 {
                break
            }
            // Transactions can be processed, parse all of them and deliver to the pool
            var txs []*types.Transaction
            if err := msg.Decode(&txs); err != nil {
                return errResp(ErrDecode, "msg %v: %v", msg, err)
            }
            for i, tx := range txs {
                // Validate and mark the remote transaction
                if tx == nil {
                    return errResp(ErrDecode, "transaction %d is nil", i)
                }
                p.MarkTransaction(tx.Hash())
            }
            pm.txpool.AddRemotes(txs)

        default:
            return errResp(ErrInvalidMsgCode, "%v", msg.Code)
        }
        return nil
    }

This method decodes messages arriving over the p2p network and handles, among others, the NewBlockMsg and NewBlockHashesMsg cases. In the NewBlockMsg branch the block is handed straight to a local channel via pm.fetcher.Enqueue(p.id, request.Block); the channel in question is f.inject, and behind it sits a FIFO queue, filled by the enqueue method in /fetcher.go, as the sketch below shows.
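Enqueue itself is a thin wrapper: it packs the block into an inject record and sends it down f.inject. A sketch based on the era's eth/fetcher/fetcher.go (errTerminated is that file's sentinel error):

    // Enqueue schedules a propagated block for import by handing it to the
    // fetcher's loop goroutine through the f.inject channel.
    func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
        op := &inject{
            origin: peer,
            block:  block,
        }
        select {
        case f.inject <- op:
            return nil
        case <-f.quit:
            return errTerminated
        }
    }

The loop goroutine receives from f.inject and calls the private enqueue method, which pushes the block into the FIFO queue: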

    func (f *Fetcher) enqueue(peer string, block *types.Block) {
        hash := block.Hash()

        // Ensure the peer isn't DOSing us
        count := f.queues[peer] + 1
        if count > blockLimit {
            log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
            propBroadcastDOSMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Discard any past or too distant blocks
        if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
            log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
            propBroadcastDropMeter.Mark(1)
            f.forgetHash(hash)
            return
        }
        // Schedule the block for future importing
        if _, ok := f.queued[hash]; !ok {
            op := &inject{
                origin: peer,
                block:  block,
            }
            f.queues[peer] = count
            f.queued[hash] = op
            f.queue.Push(op, -float32(block.NumberU64()))
            if f.queueChangeHook != nil {
                f.queueChangeHook(op.block.Hash(), true)
            }
            log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
        }
    }

The consuming side of this queue is in the loop method of /fetcher.go, another endless loop. Its core code:

    for !f.queue.Empty() {
        op := f.queue.PopItem().(*inject)
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), false)
        }
        // If too high up the chain or phase, continue later
        number := op.block.NumberU64()
        if number > height+1 {
            f.queue.Push(op, -float32(op.block.NumberU64()))
            if f.queueChangeHook != nil {
                f.queueChangeHook(op.block.Hash(), true)
            }
            break
        }
        // Otherwise if fresh and still unknown, try and import
        hash := op.block.Hash()
        if number+maxUncleDist < height || f.getBlock(hash) != nil {
            f.forgetBlock(hash)
            continue
        }
        f.insert(op.origin, op.block)
    }

Blocks are popped off the queue here. Next, the insert method:

    func (f *Fetcher) insert(peer string, block *types.Block) {
        hash := block.Hash()

        // Run the import on a new thread
        log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
        go func() {
            defer func() { f.done <- hash }()

            // If the parent's unknown, abort insertion
            parent := f.getBlock(block.ParentHash())
            if parent == nil {
                log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
                return
            }
            // Quickly validate the header and propagate the block if it passes
            switch err := f.verifyHeader(block.Header()); err {
            case nil:
                // All ok, quickly propagate to our peers
                propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
                go f.broadcastBlock(block, true)

            case consensus.ErrFutureBlock:
                // Weird future block, don't fail, but neither propagate

            default:
                // Something went very wrong, drop the peer
                log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
                f.dropPeer(peer)
                return
            }
            // Run the actual import and log any issues
            if _, err := f.insertChain(types.Blocks{block}); err != nil {
                log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
                return
            }
            // If import succeeded, broadcast the block
            propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
            go f.broadcastBlock(block, false)

            // Invoke the testing hook if needed
            if f.importedHook != nil {
                f.importedHook(block)
            }
        }()
    }

As you can see, this method calls verifyHeader to validate the block; if that passes, the block is immediately propagated over p2p, and insertChain then imports it into the local leveldb. If the import succeeds, the block is broadcast once more, this time announcing only its hash. In this way, as long as a block is valid, it gets adopted by the whole network through the peer-to-peer mesh. The verifyHeader and insertChain callbacks are the ones defined and passed in by /handler.go/NewProtocolManager, and all the startup logic lives in handler.go's Start method; the fetcher's Start method is kicked off on a separate goroutine inside syncer(), sketched below.
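A sketch of syncer(), based on eth/sync.go of the same era (forceSyncCycle and minDesiredPeerCount are constants from that file):

    // syncer is responsible for periodically synchronising with the network,
    // both downloading hashes and blocks as well as handling announcements.
    func (pm *ProtocolManager) syncer() {
        // Start and ensure cleanup of sync mechanisms
        pm.fetcher.Start()
        defer pm.fetcher.Stop()
        defer pm.downloader.Terminate()

        // Wait for different events to fire synchronisation operations
        forceSync := time.NewTicker(forceSyncCycle)
        defer forceSync.Stop()

        for {
            select {
            case <-pm.newPeerCh:
                // Make sure we have peers to select from, then sync
                if pm.peers.Len() < minDesiredPeerCount {
                    break
                }
                go pm.synchronise(pm.peers.BestPeer())

            case <-forceSync.C:
                // Force a sync even if not enough peers are present
                go pm.synchronise(pm.peers.BestPeer())

            case <-pm.noMorePeers:
                return
            }
        }
    }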

Back on the message-handling path, the sync call chain is /handler.go/handleMsg --> go pm.synchronise(p) --> pm.downloader.Synchronise(peer.id, pHead, pTd, mode) --> d.synchronise(id, head, td, mode) --> d.syncWithPeer(p, hash, td). Let's look at the core method:

    func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
        d.mux.Post(StartEvent{})
        defer func() {
            // reset on error
            if err != nil {
                d.mux.Post(FailedEvent{err})
            } else {
                d.mux.Post(DoneEvent{})
            }
        }()
        if p.version < 62 {
            return errTooOld
        }

        log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
        defer func(start time.Time) {
            log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
        }(time.Now())

        // Look up the sync boundaries: the common ancestor and the target block
        latest, err := d.fetchHeight(p)
        if err != nil {
            return err
        }
        height := latest.Number.Uint64()

        origin, err := d.findAncestor(p, height)
        if err != nil {
            return err
        }
        d.syncStatsLock.Lock()
        if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
            d.syncStatsChainOrigin = origin
        }
        d.syncStatsChainHeight = height
        d.syncStatsLock.Unlock()

        // Initiate the sync using a concurrent header and content retrieval algorithm
        pivot := uint64(0)
        switch d.mode {
        case LightSync:
            pivot = height
        case FastSync:
            // Calculate the new fast/slow sync pivot point
            if d.fsPivotLock == nil {
                pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
                if err != nil {
                    panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
                }
                if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
                    pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
                }
            } else {
                // Pivot point locked in, use this and do not pick a new one!
                pivot = d.fsPivotLock.Number.Uint64()
            }
            // If the point is below the origin, move origin back to ensure state download
            if pivot < origin {
                if pivot > 0 {
                    origin = pivot - 1
                } else {
                    origin = 0
                }
            }
            log.Debug("Fast syncing until pivot block", "pivot", pivot)
        }
        d.queue.Prepare(origin+1, d.mode, pivot, latest)
        if d.syncInitHook != nil {
            d.syncInitHook(origin, height)
        }

        fetchers := []func() error{
            func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
            func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
            func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
            func() error { return d.processHeaders(origin+1, td) },
        }
        if d.mode == FastSync {
            fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
        } else if d.mode == FullSync {
            fetchers = append(fetchers, d.processFullSyncContent)
        }
        err = d.spawnSync(fetchers)
        if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
            // If sync failed in the critical section, bump the fail counter.
            atomic.AddUint32(&d.fsPivotFails, 1)
        }
        return err
    }

Since this entire call stack was triggered from the NewBlockMsg branch, the StartEvent posted here travels through the event mux to the update method in miner.go:

    func (self *Miner) update() {
        events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
    out:
        for ev := range events.Chan() {
            switch ev.Data.(type) {
            case downloader.StartEvent:
                atomic.StoreInt32(&self.canStart, 0)
                if self.Mining() {
                    self.Stop()
                    atomic.StoreInt32(&self.shouldStart, 1)
                    log.Info("Mining aborted due to sync")
                }
            case downloader.DoneEvent, downloader.FailedEvent:
                shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

                atomic.StoreInt32(&self.canStart, 1)
                atomic.StoreInt32(&self.shouldStart, 0)
                if shouldStart {
                    self.Start(self.coinbase)
                }
                // unsubscribe. we're only interested in this event once
                events.Unsubscribe()
                // stop immediately and ignore all further pending events
                break out
            }
        }
    }

As you can see, on receiving the StartEvent the miner notifies all of its agents, calling Stop to abort mining on the now-stale work; see RemoteAgent's Stop method, sketched below.
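A sketch of that Stop, assuming the atomic running flag and quit channel used by the era's miner/remote_agent.go:

    // Stop flips the running flag and closes the quit channel, tearing down
    // the agent's update loop. The running/quitCh fields are assumptions
    // based on that era's miner/remote_agent.go.
    func (a *RemoteAgent) Stop() {
        if !atomic.CompareAndSwapInt32(&a.running, 1, 0) {
            return
        }
        close(a.quitCh)
    }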

Finally, let's look once more at how a new block is broadcast to other nodes; the handling method is /eth/handler.go/BroadcastBlock:

    func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
        hash := block.Hash()
        peers := pm.peers.PeersWithoutBlock(hash)

        // If propagation is requested, send to a subset of the peer
        if propagate {
            // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
            var td *big.Int
            if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
                td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
            } else {
                log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
                return
            }
            // Send the block to a subset of our peers
            transfer := peers[:int(math.Sqrt(float64(len(peers))))]
            for _, peer := range transfer {
                peer.SendNewBlock(block, td)
            }
            log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
            return
        }
        // Otherwise if the block is indeed in out own chain, announce it
        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
            for _, peer := range peers {
                peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
            }
            log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        }
    }

This method loops over the selected peers and calls peer.SendNewBlock on each one to push the new-block message out:

    func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
        p.knownBlocks.Add(block.Hash())
        return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
    }

    func Send(w MsgWriter, msgcode uint64, data interface{}) error {
        size, r, err := rlp.EncodeToReader(data)
        if err != nil {
            return err
        }
        return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
    }

As you can see, the message is written to the peer via WriteMsg, whose implementation here is (rw *netWrapper) WriteMsg(msg Msg):

    func (rw *netWrapper) WriteMsg(msg Msg) error {
        rw.wmu.Lock()
        defer rw.wmu.Unlock()
        rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout))
        return rw.wrapped.WriteMsg(msg)
    }

This method sets a write deadline and ultimately calls net.go's Write(b []byte) (n int, err error) to send the message to the peer over the network. On the receiving end, the counterpart is ReadMsg:

    func (pm *ProtocolManager) handleMsg(p *peer) error {
        // Read the next message from the remote peer, and ensure it's fully consumed
        msg, err := p.rw.ReadMsg()
        if err != nil {
            return err
        }
        if msg.Size > ProtocolMaxMsgSize {
            return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
        }
        defer msg.Discard()
        ...

This is where messages written over the network are read and then dispatched on their msgCode. Since handleMsg is invoked inside an endless loop, the node keeps receiving whatever its peers broadcast:

    // eth/handler.go
    func (pm *ProtocolManager) handle(p *peer) error {
        td, head, genesis := pm.blockchain.Status()
        p.Handshake(pm.networkId, td, head, genesis)

        if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
            rw.Init(p.version)
        }

        pm.peers.Register(p)
        defer pm.removePeer(p.id)

        pm.downloader.RegisterPeer(p.id, p.version, p)

        pm.syncTransactions(p)
        ...
        for {
            if err := pm.handleMsg(p); err != nil {
                return err
            }
        }
    }

The handle() function does the following for each new peer:

- Handshake: exchanges the two sides' blockchain status.
- Initialises a read/write channel used for data transfer with the peer.
- Registers the peer into our own peer list; the peer is only removed from that list when handle() exits.
- Registers the new peer with the Downloader member; the Downloader maintains its own list of neighbouring peers.
- Calls syncTransactions(): the txs newly accumulated in the current txpool are assembled into a txsync{} object and pushed onto the internal channel txsyncCh. Remember the four goroutines launched in Start()? The fourth one, txsyncLoop(), waits on exactly this txsyncCh channel, and this is where the txsync{} is pushed in (a sketch of syncTransactions follows this list).
- Starts handleMsg() in an infinite loop: whenever the remote peer sends any msg, handleMsg() catches the corresponding message type and handles it on our side.
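A sketch of syncTransactions, based on the era's eth/sync.go (the txsync struct pairs the peer with its pending transactions):

    // syncTransactions starts sending all currently pending transactions to the
    // given peer.
    func (pm *ProtocolManager) syncTransactions(p *peer) {
        var txs types.Transactions
        pending, _ := pm.txpool.Pending()
        for _, batch := range pending {
            txs = append(txs, batch...)
        }
        if len(txs) == 0 {
            return
        }
        select {
        case pm.txsyncCh <- &txsync{p, txs}:
        case <-pm.quitSync:
        }
    }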
