statesync 用来获取 pivot point 所指定区块的完整 state trie 树,也就是所有账户的信息,包括普通账户和合约账户。

数据结构

stateSync调度下载由给定state root所定义的特定state trie的请求。

  1. // stateSync schedules requests for downloading a particular state trie defined
  2. // by a given state root.
  3. type stateSync struct {
  4. d *Downloader // Downloader instance to access and manage current peerset
  5. sched *trie.TrieSync // State trie sync scheduler defining the tasks
  6. keccak hash.Hash // Keccak256 hasher to verify deliveries with
  7. tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
  8. numUncommitted int // NOTE(review): presumably the number of processed entries not yet committed - confirm against commit()
  9. bytesUncommitted int // NOTE(review): presumably the byte size of those uncommitted entries - confirm against commit()
  10. deliver chan *stateReq // Delivery channel multiplexing peer responses
  11. cancel chan struct{} // Channel to signal a termination request
  12. cancelOnce sync.Once // Ensures cancel only ever gets called once
  13. done chan struct{} // Channel to signal termination completion
  14. err error // Any error hit during sync (set before completion)
  15. }

构造函数

  1. func newStateSync(d *Downloader, root common.Hash) *stateSync { // Builds a stateSync for the given root; nothing is started here.
  2. return &stateSync{
  3. d: d,
  4. sched: state.NewStateSync(root, d.stateDB), // Scheduler for the requested root, reading from the downloader's state database
  5. keccak: sha3.NewKeccak256(),
  6. tasks: make(map[common.Hash]*stateTask),
  7. deliver: make(chan *stateReq), // Unbuffered: a send completes only once a receiver takes the request
  8. cancel: make(chan struct{}),
  9. done: make(chan struct{}),
  10. }
  11. }

NewStateSync

  1. // NewStateSync creates a new state trie download scheduler.
  2. func NewStateSync(root common.Hash, database trie.DatabaseReader) *trie.TrieSync {
  3. var syncer *trie.TrieSync // Declared up front so the callback closure below can refer to it
  4. callback := func(leaf []byte, parent common.Hash) error { // Passed to trie.NewTrieSync as the leaf callback
  5. var obj Account
  6. if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { // Each leaf is decoded as an RLP-encoded Account
  7. return err
  8. }
  9. syncer.AddSubTrie(obj.Root, 64, parent, nil) // Schedule the account's own trie (obj.Root); NOTE(review): 64 looks like a depth argument - confirm
  10. syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent) // Schedule the entry identified by the account's code hash
  11. return nil
  12. }
  13. syncer = trie.NewTrieSync(root, database, callback)
  14. return syncer
  15. }

syncState, 这个函数是downloader调用的。

  1. // syncState starts downloading state with the given root hash.
  2. func (d *Downloader) syncState(root common.Hash) *stateSync {
  3. s := newStateSync(d, root)
  4. select {
  5. case d.stateSyncStart <- s: // Hand the new sync to whoever receives from stateSyncStart
  6. case <-d.quitCh: // quitCh fired first: the sync is never run
  7. s.err = errCancelStateFetch
  8. close(s.done) // Mark the sync as finished so waiters on done are released
  9. }
  10. return s
  11. }

启动

在downloader中启动了一个新的goroutine 来运行stateFetcher函数。 这个函数首先试图从stateSyncStart通道接收数据, 而syncState这个函数会给stateSyncStart通道发送数据。

  1. // stateFetcher manages the active state sync and accepts requests
  2. // on its behalf.
  3. func (d *Downloader) stateFetcher() {
  4. for {
  5. select {
  6. case s := <-d.stateSyncStart:
  7. for next := s; next != nil; { // This loop lets the downloader switch the sync target at any time by sending a new stateSync.
  8. next = d.runStateSync(next)
  9. }
  10. case <-d.stateCh:
  11. // Ignore state responses while no sync is running.
  12. case <-d.quitCh:
  13. return
  14. }
  15. }
  16. }

我们下面看看哪里会调用syncState()函数。processFastSyncContent这个函数会在最开始发现peer的时候启动。

  1. // processFastSyncContent takes fetch results from the queue and writes them to the
  2. // database. It also controls the synchronisation of state nodes of the pivot block.
  3. func (d *Downloader) processFastSyncContent(latest *types.Header) error {
  4. // Start syncing state of the reported head block.
  5. // This should get us most of the state of the pivot block.
  6. stateSync := d.syncState(latest.Root)

runStateSync,这个方法从stateCh获取已经下载好的状态,然后把它投递到deliver通道上,等待stateSync的loop方法处理。

  1. // runStateSync runs a state synchronisation until it completes or another root
  2. // hash is requested to be switched over to.
  3. func (d *Downloader) runStateSync(s *stateSync) *stateSync {
  4. var (
  5. active = make(map[string]*stateReq) // Currently in-flight requests
  6. finished []*stateReq // Completed or failed requests
  7. timeout = make(chan *stateReq) // Timed out active requests
  8. )
  9. defer func() {
  10. // Cancel active request timers on exit. Also set peers to idle so they're
  11. // available for the next sync.
  12. for _, req := range active {
  13. req.timer.Stop()
  14. req.peer.SetNodeDataIdle(len(req.items))
  15. }
  16. }()
  17. // Run the state sync.
  18. // The sync's event loop runs in its own goroutine; it is cancelled when this function returns.
  19. go s.run()
  20. defer s.Cancel()
  21. // Listen for peer departure events to cancel assigned tasks
  22. peerDrop := make(chan *peerConnection, 1024)
  23. peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
  24. defer peerSub.Unsubscribe()
  25. for {
  26. // Enable sending of the first buffered element if there is one.
  27. var (
  28. deliverReq *stateReq
  29. deliverReqCh chan *stateReq // Stays nil when nothing is finished; a send on a nil channel never proceeds, so that select case is disabled
  30. )
  31. if len(finished) > 0 {
  32. deliverReq = finished[0]
  33. deliverReqCh = s.deliver
  34. }
  35. select {
  36. // The stateSync lifecycle:
  37. // Another stateSync has been requested to run; exit and hand it back to the caller.
  38. case next := <-d.stateSyncStart:
  39. return next
  40. case <-s.done:
  41. return nil
  42. // Send the next finished request to the current sync:
  43. // Hand the already downloaded data over to the sync.
  44. case deliverReqCh <- deliverReq:
  45. finished = append(finished[:0], finished[1:]...)
  46. // Handle incoming state packs:
  47. // Incoming packets: state data received by the downloader is sent on this channel.
  48. case pack := <-d.stateCh:
  49. // Discard any data not requested (or previously timed out)
  50. req := active[pack.PeerId()]
  51. if req == nil {
  52. log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
  53. continue
  54. }
  55. // Finalize the request and queue up for processing
  56. req.timer.Stop()
  57. req.response = pack.(*statePack).states
  58. finished = append(finished, req)
  59. delete(active, pack.PeerId())
  60. // Handle dropped peer connections:
  61. case p := <-peerDrop:
  62. // Skip if no request is currently pending
  63. req := active[p.id]
  64. if req == nil {
  65. continue
  66. }
  67. // Finalize the request and queue up for processing
  68. req.timer.Stop()
  69. req.dropped = true
  70. finished = append(finished, req)
  71. delete(active, p.id)
  72. // Handle timed-out requests:
  73. case req := <-timeout:
  74. // If the peer is already requesting something else, ignore the stale timeout.
  75. // This can happen when the timeout and the delivery happens simultaneously,
  76. // causing both pathways to trigger.
  77. if active[req.peer.id] != req {
  78. continue
  79. }
  80. // Move the timed out data back into the download queue
  81. finished = append(finished, req)
  82. delete(active, req.peer.id)
  83. // Track outgoing state requests:
  84. case req := <-d.trackStateReq:
  85. // If an active request already exists for this peer, we have a problem. In
  86. // theory the trie node schedule must never assign two requests to the same
  87. // peer. In practice however, a peer might receive a request, disconnect and
  88. // immediately reconnect before the previous times out. In this case the first
  89. // request is never honored, alas we must not silently overwrite it, as that
  90. // causes valid requests to go missing and sync to get stuck.
  91. if old := active[req.peer.id]; old != nil {
  92. log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
  93. // Make sure the previous one doesn't get silently lost
  94. old.timer.Stop()
  95. old.dropped = true
  96. finished = append(finished, old)
  97. }
  98. // Start a timer to notify the sync loop if the peer stalled.
  99. req.timer = time.AfterFunc(req.timeout, func() {
  100. select {
  101. case timeout <- req:
  102. case <-s.done:
  103. // Prevent leaking of timer goroutines in the unlikely case where a
  104. // timer is fired just before exiting runStateSync.
  105. }
  106. })
  107. active[req.peer.id] = req
  108. }
  109. }
  110. }

run和loop方法,获取任务,分配任务,获取结果。

  1. func (s *stateSync) run() { // Runs the sync loop to completion and then signals termination.
  2. s.err = s.loop() // Record the loop's result before done is closed
  3. close(s.done) // Closing done releases everything waiting on <-s.done
  4. }
  5. // loop is the main event loop of a state trie sync. It is responsible for the
  6. // assignment of new tasks to peers (including sending it to them) as well as
  7. // for the processing of inbound data. Note, that the loop does not directly
  8. // receive data from peers, rather those are buffered up in the downloader and
  9. // pushed here async. The reason is to decouple processing from data receipt
  10. // and timeouts.
  11. func (s *stateSync) loop() error {
  12. // Listen for new peer events to assign tasks to them
  13. newPeer := make(chan *peerConnection, 1024)
  14. peerSub := s.d.peers.SubscribeNewPeers(newPeer)
  15. defer peerSub.Unsubscribe()
  16. // Keep assigning new tasks until the sync completes or aborts
  17. // Loop until the sync completes or is aborted.
  18. for s.sched.Pending() > 0 {
  19. // Presumably flushes data from the in-memory cache to persistent storage; the original note ties the cache size to the --cache command-line flag (not verifiable from this excerpt).
  20. if err := s.commit(false); err != nil {
  21. return err
  22. }
  23. // Assign tasks.
  24. s.assignTasks()
  25. // Tasks assigned, wait for something to happen
  26. select {
  27. case <-newPeer:
  28. // New peer arrived, try to assign it download tasks
  29. case <-s.cancel:
  30. return errCancelStateFetch
  31. case req := <-s.deliver:
  32. // A result handed over by runStateSync; note that it may be a successful response as well as a failed one (dropped peer or timeout).
  33. // Response, disconnect or timeout triggered, drop the peer if stalling
  34. log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
  35. if len(req.items) <= 2 && !req.dropped && req.timedOut() {
  36. // 2 items are the minimum requested, if even that times out, we've no use of
  37. // this peer at the moment.
  38. log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
  39. s.d.dropPeer(req.peer.id)
  40. }
  41. // Process all the received blobs and check for stale delivery
  42. stale, err := s.process(req)
  43. if err != nil {
  44. log.Warn("Node data write error", "err", err)
  45. return err
  46. }
  47. // If the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery)
  48. if !stale {
  49. req.peer.SetNodeDataIdle(len(req.response))
  50. }
  51. }
  52. }
  53. return s.commit(true)
  54. }