eth共识算法分析,从本地节点挖到块开始分析
首先目前生产环境上面,肯定不是以CPU的形式挖矿的,那么就是remoteAgent这种形式,也就是矿机通过网络请求从以太的节点获取当前节点的出块任务,
然后矿机计算出符合该块难度的hash值,提交给节点,也就是对应的以下方法.
// SubmitWork is called by a remote miner (mining rig) to hand back a sealed
// solution (nonce + mix digest) for a previously issued work package, keyed by
// the work hash. The seal is verified against the consensus engine; on success
// the sealed block is pushed onto returnCh for the mining loop to process and
// the pending work entry is dropped. Returns true only for a valid solution.
func (a *RemoteAgent) SubmitWork(nonce types.BlockNonce, mixDigest, hash common.Hash) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	// Make sure the work submitted is present
	work := a.work[hash]
	if work == nil {
		log.Info("Work submitted but none pending", "hash", hash)
		return false
	}
	// Make sure the Engine solutions is indeed valid
	result := work.Block.Header()
	result.Nonce = nonce
	result.MixDigest = mixDigest
	if err := a.engine.VerifySeal(a.chain, result); err != nil {
		log.Warn("Invalid proof-of-work submitted", "hash", hash, "err", err)
		return false
	}
	block := work.Block.WithSeal(result)
	// Solutions seems to be valid, return to the miner and notify acceptance
	a.returnCh <- &Result{work, block}
	delete(a.work, hash)
	return true
}
该方法会校验提交过来的块的hash难度,如果是正常的话,则会将该生成的块写到管道中,管道接收的方法在/miner/worker.go/Wait方法中
// wait consumes sealed mining results from self.recv. For each mined block it
// back-fills the (now known) block hash into every receipt log and state log,
// writes the block and state to the chain, posts a NewMinedBlockEvent (picked
// up by the p2p broadcast loop) plus ChainEvent/ChainHeadEvent, records the
// block as awaiting confirmation, and — unless the canonical write already
// implies a ChainHeadEvent — commits new mining work.
func (self *worker) wait() {
	for {
		mustCommitNewWork := true
		for result := range self.recv {
			atomic.AddInt32(&self.atWork, -1)
			if result == nil {
				continue
			}
			block := result.Block
			work := result.Work
			// Update the block hash in all logs since it is now available and not when the
			// receipt/log of individual transactions were created.
			for _, r := range work.receipts {
				for _, l := range r.Logs {
					l.BlockHash = block.Hash()
				}
			}
			for _, log := range work.state.Logs() {
				log.BlockHash = block.Hash()
			}
			stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
			// check if canon block and write transactions
			if stat == core.CanonStatTy {
				// implicit by posting ChainHeadEvent
				mustCommitNewWork = false
			}
			// Broadcast the block and announce chain insertion event
			// (the mined block reaches connected peers via p2p; delivery is again channel-based)
			self.mux.Post(core.NewMinedBlockEvent{Block: block})
			var (
				events []interface{}
				logs   = work.state.Logs()
			)
			events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
			if stat == core.CanonStatTy {
				events = append(events, core.ChainHeadEvent{Block: block})
			}
			self.chain.PostChainEvents(events, logs)
			// Insert the block into the set of pending ones to wait for confirmations
			self.unconfirmed.Insert(block.NumberU64(), block.Hash())
			if mustCommitNewWork {
				self.commitNewWork()
			}
		}
	}
}
这里发送了一个新挖到块的事件,接着跟,调用栈是
/geth/main.go/geth --> startNode --> utils.StartNode(stack)--> stack.Start() --> /node/node.go/Start() --> service.Start(running)--> /eth/backend.go/Start() --> /eth/handler.go/Start()
好了核心逻辑在handler.go/Start()里面
// Start spins up the protocol manager's broadcast and synchronisation
// goroutines: transaction broadcasting, mined-block broadcasting, the periodic
// network syncer, and the per-peer initial transaction sync loop.
func (pm *ProtocolManager) Start(maxPeers int) {
	pm.maxPeers = maxPeers
	// broadcast transactions
	// txCh is subscribed to the txpool's TxPreEvent: whenever the pool gains a
	// new transaction it notifies this channel, and the broadcast goroutine
	// forwards the transaction to peers.
	pm.txCh = make(chan core.TxPreEvent, txChanSize)
	// subscription handle for the TxPreEvent feed
	pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
	go pm.txBroadcastLoop()
	// Subscribe to mined-block events, produced whenever a new block is mined.
	// Note this uses the TypeMux subscription style (marked Deprecated),
	// unlike the feed-based subscription above.
	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	// mined-block broadcast goroutine: locally mined blocks should be pushed
	// out to the network as quickly as possible
	go pm.minedBroadcastLoop()
	// The syncer periodically synchronises with the network, downloading
	// hashes and blocks as well as handling announcements.
	// start sync handlers
	go pm.syncer()
	// txsyncLoop takes care of the initial transaction sync for each newly
	// connected peer: all currently pending transactions are forwarded, one
	// small packet at a time to minimise egress bandwidth usage.
	go pm.txsyncLoop()
}
pm.minedBroadcastLoop()里面就有管道接收到上面post出来的出块消息,跟进去将会看到通过p2p网络发送给节点的逻辑
// BroadcastBlock will either propagate a block to a subset of its peers, or
// will only announce its availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)
	// If propagation is requested, send to a subset of the peer
	if propagate {
		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
		var td *big.Int
		if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
			td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		} else {
			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
			return
		}
		// Send the block to a subset (sqrt of the peer count) of our peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.SendNewBlock(block, td)
		}
		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
		return
	}
	// Otherwise if the block is indeed in our own chain, announce it (hash only)
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		for _, peer := range peers {
			peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
		}
		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
	}
}
这里面会发送两种事件,一种是NewBlockMsg,另外一种是NewBlockHashesMsg,好了到此本地节点挖到的块就通过p2p网络的形式开始扩散出去了
接着看下一个重要的方法
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
	// Create the protocol manager with the base fields
	manager := &ProtocolManager{
		networkId:   networkId,
		eventMux:    mux,
		txpool:      txpool,
		blockchain:  blockchain,
		chaindb:     chaindb,
		chainconfig: config,
		peers:       newPeerSet(),
		newPeerCh:   make(chan *peer),
		noMorePeers: make(chan struct{}),
		txsyncCh:    make(chan *txsync),
		quitSync:    make(chan struct{}),
	}
	// Figure out whether to allow fast sync or not
	if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
		log.Warn("Blockchain not empty, fast sync disabled")
		mode = downloader.FullSync
	}
	if mode == downloader.FastSync {
		manager.fastSync = uint32(1)
	}
	// Initiate a sub-protocol for every implemented version we can handle
	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
	for i, version := range ProtocolVersions {
		// Skip protocol version if incompatible with the mode of operation
		if mode == downloader.FastSync && version < eth63 {
			continue
		}
		// Compatible; initialise the sub-protocol
		version := version // Closure for the run
		manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			// Run is invoked for every new peer session; it blocks for the
			// lifetime of the connection, handing the peer to handle().
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
					return manager.handle(peer)
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})
	}
	if len(manager.SubProtocols) == 0 {
		return nil, errIncompatibleConfig
	}
	// The downloader is responsible for syncing our own data from other peers;
	// it is the full-chain synchronisation tool.
	// Construct the different synchronisation mechanisms
	manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
	validator := func(header *types.Header) error {
		return engine.VerifyHeader(blockchain, header, true)
	}
	heighter := func() uint64 {
		return blockchain.CurrentBlock().NumberU64()
	}
	inserter := func(blocks types.Blocks) (int, error) {
		// If fast sync is running, deny importing weird blocks
		if atomic.LoadUint32(&manager.fastSync) == 1 {
			log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
			return 0, nil
		}
		// Start accepting transactions:
		atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
		return manager.blockchain.InsertChain(blocks)
	}
	// Build the fetcher: it accumulates block announcements from the various
	// peers and schedules them for retrieval.
	manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
	return manager, nil
}
该方法是用来管理以太坊协议下的多个子协议,其中的Run方法在每个节点启动的时候就会调用,可以看到是阻塞的,跟进handler方法能看到这样的一块关键代码
// Message dispatch loop inside ProtocolManager.handle: keeps reading and
// handling p2p messages until an error terminates this peer's session.
for {
	if err := pm.handleMsg(p); err != nil {
		p.Log().Debug("Ethereum message handling failed", "err", err)
		return err
	}
}
死循环,处理p2p网络过来的消息,接着看handleMsg方法
// handleMsg reads a single inbound message from the remote peer and dispatches
// it by message code. It is invoked in a loop from handle(), making it the
// single entry point for all eth protocol traffic from one peer.
func (pm *ProtocolManager) handleMsg(p *peer) error {
	// Read the next message from the remote peer, and ensure it's fully consumed
	msg, err := p.rw.ReadMsg()
	if err != nil {
		return err
	}
	if msg.Size > ProtocolMaxMsgSize {
		return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
	}
	defer msg.Discard()
	// Handle the message depending on its contents
	switch {
	case msg.Code == StatusMsg:
		// Status messages should never arrive after the handshake
		return errResp(ErrExtraStatusMsg, "uncontrolled status message")

	// Block header query, collect the requested headers and reply
	case msg.Code == GetBlockHeadersMsg:
		// Decode the complex header query
		var query getBlockHeadersData
		if err := msg.Decode(&query); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		hashMode := query.Origin.Hash != (common.Hash{})
		// Gather headers until the fetch or network limits is reached
		var (
			bytes   common.StorageSize
			headers []*types.Header
			unknown bool
		)
		for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
			// Retrieve the next header satisfying the query
			var origin *types.Header
			if hashMode {
				origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
			} else {
				origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
			}
			if origin == nil {
				break
			}
			number := origin.Number.Uint64()
			headers = append(headers, origin)
			bytes += estHeaderRlpSize
			// Advance to the next header of the query
			switch {
			case query.Origin.Hash != (common.Hash{}) && query.Reverse:
				// Hash based traversal towards the genesis block
				for i := 0; i < int(query.Skip)+1; i++ {
					if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {
						query.Origin.Hash = header.ParentHash
						number--
					} else {
						unknown = true
						break
					}
				}
			case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
				// Hash based traversal towards the leaf block
				var (
					current = origin.Number.Uint64()
					next    = current + query.Skip + 1
				)
				if next <= current {
					// Overflow in the skip arithmetic: treat as an attack and bail
					infos, _ := json.MarshalIndent(p.Peer.Info(), "", "  ")
					p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
					unknown = true
				} else {
					if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
						if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
							query.Origin.Hash = header.Hash()
						} else {
							unknown = true
						}
					} else {
						unknown = true
					}
				}
			case query.Reverse:
				// Number based traversal towards the genesis block
				if query.Origin.Number >= query.Skip+1 {
					query.Origin.Number -= (query.Skip + 1)
				} else {
					unknown = true
				}
			case !query.Reverse:
				// Number based traversal towards the leaf block
				query.Origin.Number += (query.Skip + 1)
			}
		}
		return p.SendBlockHeaders(headers)

	case msg.Code == BlockHeadersMsg:
		// A batch of headers arrived to one of our previous requests
		var headers []*types.Header
		if err := msg.Decode(&headers); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// If no headers were received, but we're expending a DAO fork check, maybe it's that
		if len(headers) == 0 && p.forkDrop != nil {
			// Possibly an empty reply to the fork header checks, sanity check TDs
			verifyDAO := true
			// If we already have a DAO header, we can check the peer's TD against it. If
			// the peer's ahead of this, it too must have a reply to the DAO check
			if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
				if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
					verifyDAO = false
				}
			}
			// If we're seemingly on the same chain, disable the drop timer
			if verifyDAO {
				p.Log().Debug("Seems to be on the same side of the DAO fork")
				p.forkDrop.Stop()
				p.forkDrop = nil
				return nil
			}
		}
		// Filter out any explicitly requested headers, deliver the rest to the downloader
		filter := len(headers) == 1
		if filter {
			// If it's a potential DAO fork check, validate against the rules
			if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
				// Disable the fork drop timer
				p.forkDrop.Stop()
				p.forkDrop = nil
				// Validate the header and either drop the peer or continue
				if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
					p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
					return err
				}
				p.Log().Debug("Verified to be on the same side of the DAO fork")
				return nil
			}
			// Irrelevant of the fork checks, send the header to the fetcher just in case
			headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
		}
		if len(headers) > 0 || !filter {
			err := pm.downloader.DeliverHeaders(p.id, headers)
			if err != nil {
				log.Debug("Failed to deliver headers", "err", err)
			}
		}

	case msg.Code == GetBlockBodiesMsg:
		// Decode the retrieval message
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
		if _, err := msgStream.List(); err != nil {
			return err
		}
		// Gather blocks until the fetch or network limits is reached
		var (
			hash   common.Hash
			bytes  int
			bodies []rlp.RawValue
		)
		for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
			// Retrieve the hash of the next block
			if err := msgStream.Decode(&hash); err == rlp.EOL {
				break
			} else if err != nil {
				return errResp(ErrDecode, "msg %v: %v", msg, err)
			}
			// Retrieve the requested block body, stopping if enough was found
			if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
				bodies = append(bodies, data)
				bytes += len(data)
			}
		}
		return p.SendBlockBodiesRLP(bodies)

	case msg.Code == BlockBodiesMsg:
		// A batch of block bodies arrived to one of our previous requests
		var request blockBodiesData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Deliver them all to the downloader for queuing
		trasactions := make([][]*types.Transaction, len(request))
		uncles := make([][]*types.Header, len(request))
		for i, body := range request {
			trasactions[i] = body.Transactions
			uncles[i] = body.Uncles
		}
		// Filter out any explicitly requested bodies, deliver the rest to the downloader
		filter := len(trasactions) > 0 || len(uncles) > 0
		if filter {
			trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())
		}
		if len(trasactions) > 0 || len(uncles) > 0 || !filter {
			err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
			if err != nil {
				log.Debug("Failed to deliver bodies", "err", err)
			}
		}

	case p.version >= eth63 && msg.Code == GetNodeDataMsg:
		// Decode the retrieval message
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
		if _, err := msgStream.List(); err != nil {
			return err
		}
		// Gather state data until the fetch or network limits is reached
		var (
			hash  common.Hash
			bytes int
			data  [][]byte
		)
		for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
			// Retrieve the hash of the next state entry
			if err := msgStream.Decode(&hash); err == rlp.EOL {
				break
			} else if err != nil {
				return errResp(ErrDecode, "msg %v: %v", msg, err)
			}
			// Retrieve the requested state entry, stopping if enough was found
			if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
				data = append(data, entry)
				bytes += len(entry)
			}
		}
		return p.SendNodeData(data)

	case p.version >= eth63 && msg.Code == NodeDataMsg:
		// A batch of node state data arrived to one of our previous requests
		var data [][]byte
		if err := msg.Decode(&data); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Deliver all to the downloader
		if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
			log.Debug("Failed to deliver node state data", "err", err)
		}

	case p.version >= eth63 && msg.Code == GetReceiptsMsg:
		// Decode the retrieval message
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
		if _, err := msgStream.List(); err != nil {
			return err
		}
		// Gather state data until the fetch or network limits is reached
		var (
			hash     common.Hash
			bytes    int
			receipts []rlp.RawValue
		)
		for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
			// Retrieve the hash of the next block
			if err := msgStream.Decode(&hash); err == rlp.EOL {
				break
			} else if err != nil {
				return errResp(ErrDecode, "msg %v: %v", msg, err)
			}
			// Retrieve the requested block's receipts, skipping if unknown to us
			results := core.GetBlockReceipts(pm.chaindb, hash, core.GetBlockNumber(pm.chaindb, hash))
			if results == nil {
				if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
					continue
				}
			}
			// If known, encode and queue for response packet
			if encoded, err := rlp.EncodeToBytes(results); err != nil {
				log.Error("Failed to encode receipt", "err", err)
			} else {
				receipts = append(receipts, encoded)
				bytes += len(encoded)
			}
		}
		return p.SendReceiptsRLP(receipts)

	case p.version >= eth63 && msg.Code == ReceiptsMsg:
		// A batch of receipts arrived to one of our previous requests
		var receipts [][]*types.Receipt
		if err := msg.Decode(&receipts); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		// Deliver all to the downloader
		if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
			log.Debug("Failed to deliver receipts", "err", err)
		}

	case msg.Code == NewBlockHashesMsg:
		var announces newBlockHashesData
		if err := msg.Decode(&announces); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		// Mark the hashes as present at the remote node
		for _, block := range announces {
			p.MarkBlock(block.Hash)
		}
		// Schedule all the unknown hashes for retrieval
		unknown := make(newBlockHashesData, 0, len(announces))
		for _, block := range announces {
			if !pm.blockchain.HasBlock(block.Hash, block.Number) {
				unknown = append(unknown, block)
			}
		}
		for _, block := range unknown {
			pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
		}

	case msg.Code == NewBlockMsg:
		// Retrieve and decode the propagated block
		var request newBlockData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		request.Block.ReceivedAt = msg.ReceivedAt
		request.Block.ReceivedFrom = p
		// Mark the peer as owning the block and schedule it for import
		p.MarkBlock(request.Block.Hash())
		pm.fetcher.Enqueue(p.id, request.Block)
		// Assuming the block is importable by the peer, but possibly not yet done so,
		// calculate the head hash and TD that the peer truly must have.
		var (
			trueHead = request.Block.ParentHash()
			trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
		)
		// Update the peers total difficulty if better than the previous
		if _, td := p.Head(); trueTD.Cmp(td) > 0 {
			p.SetHead(trueHead, trueTD)
			// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
			// a single block (as the true TD is below the propagated block), however this
			// scenario should easily be covered by the fetcher.
			currentBlock := pm.blockchain.CurrentBlock()
			if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
				go pm.synchronise(p)
			}
		}

	case msg.Code == TxMsg:
		// Transactions arrived, make sure we have a valid and fresh chain to handle them
		if atomic.LoadUint32(&pm.acceptTxs) == 0 {
			break
		}
		// Transactions can be processed, parse all of them and deliver to the pool
		var txs []*types.Transaction
		if err := msg.Decode(&txs); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		for i, tx := range txs {
			// Validate and mark the remote transaction
			if tx == nil {
				return errResp(ErrDecode, "transaction %d is nil", i)
			}
			p.MarkTransaction(tx.Hash())
		}
		pm.txpool.AddRemotes(txs)

	default:
		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
	}
	return nil
}
该方法中就解码了p2p网络过来的消息,并且处理了NewBlockMsg和NewBlockHashesMsg这两种事件,如NewBlockMsg中的处理逻辑是直接通过管道发送到本地了,pm.fetcher.Enqueue(p.id, request.Block)
,对应的管道名是:f.inject,其中是一个队列,/fetcher.go/enqueue方法中写入了一个FIFO队列中
// enqueue schedules a propagated block for a future import if it has not been
// seen yet. It enforces a per-peer allowance against DOS and discards blocks
// too far behind or ahead of the current chain height.
func (f *Fetcher) enqueue(peer string, block *types.Block) {
	hash := block.Hash()
	// Ensure the peer isn't DOSing us
	count := f.queues[peer] + 1
	if count > blockLimit {
		log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
		propBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Discard any past or too distant blocks
	if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
		log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
		propBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}
	// Schedule the block for future importing
	if _, ok := f.queued[hash]; !ok {
		op := &inject{
			origin: peer,
			block:  block,
		}
		f.queues[peer] = count
		f.queued[hash] = op
		// Negative priority so lower block numbers are popped first
		f.queue.Push(op, -float32(block.NumberU64()))
		if f.queueChangeHook != nil {
			f.queueChangeHook(op.block.Hash(), true)
		}
		log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
	}
}
该队列的消费端在/fetcher.go/loop中,是一个死循环,核心代码
// Core of the fetcher's import loop: drain the priority queue of propagated
// blocks, re-queueing anything still above the current chain height and
// dropping anything stale or already known before importing.
for !f.queue.Empty() {
	op := f.queue.PopItem().(*inject)
	if f.queueChangeHook != nil {
		f.queueChangeHook(op.block.Hash(), false)
	}
	// If too high up the chain or phase, continue later
	number := op.block.NumberU64()
	if number > height+1 {
		f.queue.Push(op, -float32(op.block.NumberU64()))
		if f.queueChangeHook != nil {
			f.queueChangeHook(op.block.Hash(), true)
		}
		break
	}
	// Otherwise if fresh and still unknown, try and import
	hash := op.block.Hash()
	if number+maxUncleDist < height || f.getBlock(hash) != nil {
		f.forgetBlock(hash)
		continue
	}
	f.insert(op.origin, op.block)
}
从队列中取出,接着看insert方法
// insert spawns a goroutine that imports a propagated block: verify the header
// via the consensus engine, propagate the full block on success, run the chain
// insertion, then announce the block hash once the import succeeded.
func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()
	// Run the import on a new thread
	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		defer func() { f.done <- hash }()
		// If the parent's unknown, abort insertion
		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
			return
		}
		// Quickly validate the header and propagate the block if it passes
		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			// All ok, quickly propagate to our peers (full block)
			propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
			go f.broadcastBlock(block, true)
		case consensus.ErrFutureBlock:
			// Weird future block, don't fail, but neither propagate
		default:
			// Something went very wrong, drop the peer
			log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			return
		}
		// If import succeeded, broadcast the block again (hash announcement only)
		propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
		go f.broadcastBlock(block, false)
		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(block)
		}
	}()
}
可以看到,该方法会调用verifyHeader方法去校验区块,如果没问题的话就通过p2p的形式广播出去,然后调用insertChain方法插入到本地的leveldb中,插入没问题的话,会再广播一次,不过这次只会广播block的hash,
如此,通过一个对等网络,只要块合法,那么就会被全网采纳,其中的verifyHeader,insertChain方法都是在/handler.go/NewProtocolManager中定义后传进来的,所有启动的逻辑都在handler.go/Start方法中.
fetcher.go的start方法是在syncer方法中用一个单独的协程触发的
/handler.go/handleMsg --> go pm.synchronise(p) --> pm.downloader.Synchronise(peer.id, pHead, pTd, mode) --> d.synchronise(id, head, td, mode)
--> d.syncWithPeer(p, hash, td),让我们看下核心方法
// syncWithPeer starts a block synchronisation based on the hash chain from the
// specified peer and head hash. It posts StartEvent before syncing and
// DoneEvent/FailedEvent afterwards — the miner's update loop listens for these
// to pause and resume mining around a sync.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()
	if p.version < 62 {
		return errTooOld
	}
	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
	defer func(start time.Time) {
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
	}(time.Now())
	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()
	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()
	// Initiate the sync using a concurrent header and content retrieval algorithm
	pivot := uint64(0)
	switch d.mode {
	case LightSync:
		pivot = height
	case FastSync:
		// Calculate the new fast/slow sync pivot point
		if d.fsPivotLock == nil {
			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
			if err != nil {
				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
			}
			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
			}
		} else {
			// Pivot point locked in, use this and do not pick a new one!
			pivot = d.fsPivotLock.Number.Uint64()
		}
		// If the point is below the origin, move origin back to ensure state download
		if pivot < origin {
			if pivot > 0 {
				origin = pivot - 1
			} else {
				origin = 0
			}
		}
		log.Debug("Fast syncing until pivot block", "pivot", pivot)
	}
	d.queue.Prepare(origin+1, d.mode, pivot, latest)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}
	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	err = d.spawnSync(fetchers)
	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
		// If sync failed in the critical section, bump the fail counter.
		atomic.AddUint32(&d.fsPivotFails, 1)
	}
	return err
}
由于上述整个调用栈是在newBlockMsg的条件中触发的,这里的StartEvent会通过通道的形式传递到miner.go/update中
// update listens for downloader events. On StartEvent it halts mining (noting
// whether it should restart later); on the first DoneEvent or FailedEvent it
// re-enables mining, restarts it if it was previously running, unsubscribes,
// and exits — i.e. this is a one-shot loop that only reacts to the first
// completed sync.
func (self *Miner) update() {
	events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
	for ev := range events.Chan() {
		switch ev.Data.(type) {
		case downloader.StartEvent:
			// A sync started: pause mining so we don't work on a stale chain
			atomic.StoreInt32(&self.canStart, 0)
			if self.Mining() {
				self.Stop()
				atomic.StoreInt32(&self.shouldStart, 1)
				log.Info("Mining aborted due to sync")
			}
		case downloader.DoneEvent, downloader.FailedEvent:
			shouldStart := atomic.LoadInt32(&self.shouldStart) == 1
			atomic.StoreInt32(&self.canStart, 1)
			atomic.StoreInt32(&self.shouldStart, 0)
			if shouldStart {
				self.Start(self.coinbase)
			}
			// unsubscribe. we're only interested in this event once
			events.Unsubscribe()
			// stop immediately and ignore all further pending events
			break out
		}
	}
}
可以看到接收到这个StartEvent就会通知所有的代理,调用stop停止当前相同块的挖矿,remote_Agent中的stop方法
最后再看一下新块如何广播给其他节点的,处理的方法在/eth/handler.go/BroadcastBlock
// BroadcastBlock either propagates the full block to a subset (sqrt) of the
// peers that do not yet know it, or — when propagate is false — only announces
// the block's hash and number to all such peers.
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)
	// If propagation is requested, send to a subset of the peer
	if propagate {
		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
		var td *big.Int
		if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
			td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		} else {
			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
			return
		}
		// Send the block to a subset of our peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.SendNewBlock(block, td)
		}
		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
		return
	}
	// Otherwise if the block is indeed in our own chain, announce it
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		for _, peer := range peers {
			peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
		}
		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
	}
}
可以看到该方法中循环每个连接的peer节点,调用peer.SendNewBlock发送产块消息过去
// SendNewBlock propagates an entire block (with its total difficulty) to the
// remote peer, first marking the block as known to that peer so it will not
// be sent again.
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
	p.knownBlocks.Add(block.Hash())
	return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
// Send RLP-encodes data and writes it to the given writer as a message with
// the supplied code.
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
	size, r, err := rlp.EncodeToReader(data)
	if err != nil {
		return err
	}
	return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}
可以看到通过WriteMsg写入该节点里,该方法的实现是(rw *netWrapper) WriteMsg(msg Msg)
// WriteMsg forwards a message write to the wrapped connection, refreshing the
// write deadline first; writes are serialised by the write mutex.
func (rw *netWrapper) WriteMsg(msg Msg) error {
	rw.wmu.Lock()
	defer rw.wmu.Unlock()
	rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout))
	return rw.wrapped.WriteMsg(msg)
}
该方法设置了一个超时时间,底层调用了net.go的Write(b []byte) (n int, err error),通过网络写给对应的节点了,然后接收端的方法为ReadMsg
func (pm *ProtocolManager) handleMsg(p *peer) error {// Read the next message from the remote peer, and ensure it's fully consumedmsg, err := p.rw.ReadMsg()if err != nil {return err}if msg.Size > ProtocolMaxMsgSize {return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)}defer msg.Discard()
可以看到在这边读取网络写入来的消息,然后根据不同的msgCode作不同的处理,由于handleMsg是在一个死循环中调用的,所以就能一直接收到节点广播过来的消息
//eth/handler.gofunc (pm *ProtocolManager) handle(p *peer) error {td, head, genesis := pm.blockchain.Status()p.Handshake(pm.networkId, td, head, genesis)if rw, ok := p.rw.(*meteredMsgReadWriter); ok {rm.Init(p.version)}pm.peers.Register(p)defer pm.removePeer(p.id)pm.downloader.RegisterPeer(p.id, p.version, p)pm.syncTransactions(p)...for {if err := pm.handleMsg(p); err != nil {return err}}}
handle()函数针对一个新peer做了如下几件事: 握手,与对方peer沟通己方的区块链状态 初始化一个读写通道,用以跟对方peer相互数据传输。 注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。 Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。 调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,推送到内部通道txsyncCh。还记得Start()启动的四个函数么? 其中第四项txsyncLoop()中用以等待txsync{}数据的通道txsyncCh,正是在这里被推入txsync{}的。 在无限循环中启动handleMsg(),当对方peer发出任何msg时,handleMsg()可以捕捉相应类型的消息并在己方进行处理。
