From 089257a3da86184c66f666ff8e119e1ca7850357 Mon Sep 17 00:00:00 2001
From: Sonic
Date: Fri, 9 Nov 2018 14:47:57 +0800
Subject: dex: copy fetcher and downloader from eth

---
 dex/downloader/statesync.go | 484 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 484 insertions(+)
 create mode 100644 dex/downloader/statesync.go

diff --git a/dex/downloader/statesync.go b/dex/downloader/statesync.go
new file mode 100644
index 000000000..49117abbb
--- /dev/null
+++ b/dex/downloader/statesync.go
@@ -0,0 +1,484 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+	"fmt"
+	"hash"
+	"sync"
+	"time"
+
+	"github.com/dexon-foundation/dexon/common"
+	"github.com/dexon-foundation/dexon/core/rawdb"
+	"github.com/dexon-foundation/dexon/core/state"
+	"github.com/dexon-foundation/dexon/ethdb"
+	"github.com/dexon-foundation/dexon/log"
+	"github.com/dexon-foundation/dexon/trie"
+	"golang.org/x/crypto/sha3"
+)
+
+// stateReq represents a batch of state fetch requests grouped together into
+// a single data retrieval network packet.
+type stateReq struct {
+	items    []common.Hash              // Hashes of the state items to download
+	tasks    map[common.Hash]*stateTask // Download tasks to track previous attempts
+	timeout  time.Duration              // Maximum round trip time for this to complete
+	timer    *time.Timer                // Timer to fire when the RTT timeout expires
+	peer     *peerConnection            // Peer that we're requesting from
+	response [][]byte                   // Response data of the peer (nil for timeouts)
+	dropped  bool                       // Flag whether the peer dropped off early
+}
+
+// timedOut returns if this request timed out.
+func (req *stateReq) timedOut() bool {
+	return req.response == nil
+}
+
+// stateSyncStats is a collection of progress stats to report during a state trie
+// sync to RPC requests as well as to display in user logs.
+type stateSyncStats struct {
+	processed  uint64 // Number of state entries processed
+	duplicate  uint64 // Number of state entries downloaded twice
+	unexpected uint64 // Number of non-requested state entries received
+	pending    uint64 // Number of still pending state entries
+}
+
+// syncState starts downloading state with the given root hash.
+func (d *Downloader) syncState(root common.Hash) *stateSync {
+	s := newStateSync(d, root)
+	select {
+	case d.stateSyncStart <- s:
+	case <-d.quitCh:
+		s.err = errCancelStateFetch
+		close(s.done)
+	}
+	return s
+}
+
+// stateFetcher manages the active state sync and accepts requests
+// on its behalf.
+func (d *Downloader) stateFetcher() {
+	for {
+		select {
+		case s := <-d.stateSyncStart:
+			for next := s; next != nil; {
+				next = d.runStateSync(next)
+			}
+		case <-d.stateCh:
+			// Ignore state responses while no sync is running.
+		case <-d.quitCh:
+			return
+		}
+	}
+}
+
+// runStateSync runs a state synchronisation until it completes or another root
+// hash is requested to be switched over to.
+func (d *Downloader) runStateSync(s *stateSync) *stateSync {
+	var (
+		active   = make(map[string]*stateReq) // Currently in-flight requests
+		finished []*stateReq                  // Completed or failed requests
+		timeout  = make(chan *stateReq)       // Timed out active requests
+	)
+	defer func() {
+		// Cancel active request timers on exit. Also set peers to idle so they're
+		// available for the next sync.
+		for _, req := range active {
+			req.timer.Stop()
+			req.peer.SetNodeDataIdle(len(req.items))
+		}
+	}()
+	// Run the state sync.
+	go s.run()
+	defer s.Cancel()
+
+	// Listen for peer departure events to cancel assigned tasks
+	peerDrop := make(chan *peerConnection, 1024)
+	peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
+	defer peerSub.Unsubscribe()
+
+	for {
+		// Enable sending of the first buffered element if there is one.
+		var (
+			deliverReq   *stateReq
+			deliverReqCh chan *stateReq
+		)
+		if len(finished) > 0 {
+			deliverReq = finished[0]
+			deliverReqCh = s.deliver
+		}
+
+		select {
+		// The stateSync lifecycle:
+		case next := <-d.stateSyncStart:
+			return next
+
+		case <-s.done:
+			return nil
+
+		// Send the next finished request to the current sync:
+		case deliverReqCh <- deliverReq:
+			// Shift out the first request, but also set the emptied slot to nil for GC
+			copy(finished, finished[1:])
+			finished[len(finished)-1] = nil
+			finished = finished[:len(finished)-1]
+
+		// Handle incoming state packs:
+		case pack := <-d.stateCh:
+			// Discard any data not requested (or previously timed out)
+			req := active[pack.PeerId()]
+			if req == nil {
+				log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
+				continue
+			}
+			// Finalize the request and queue up for processing
+			req.timer.Stop()
+			req.response = pack.(*statePack).states
+
+			finished = append(finished, req)
+			delete(active, pack.PeerId())
+
+		// Handle dropped peer connections:
+		case p := <-peerDrop:
+			// Skip if no request is currently pending
+			req := active[p.id]
+			if req == nil {
+				continue
+			}
+			// Finalize the request and queue up for processing
+			req.timer.Stop()
+			req.dropped = true
+
+			finished = append(finished, req)
+			delete(active, p.id)
+
+		// Handle timed-out requests:
+		case req := <-timeout:
+			// If the peer is already requesting something else, ignore the stale timeout.
+			// This can happen when the timeout and the delivery happen simultaneously,
+			// causing both pathways to trigger.
+			if active[req.peer.id] != req {
+				continue
+			}
+			// Move the timed out data back into the download queue
+			finished = append(finished, req)
+			delete(active, req.peer.id)
+
+		// Track outgoing state requests:
+		case req := <-d.trackStateReq:
+			// If an active request already exists for this peer, we have a problem. In
+			// theory the trie node schedule must never assign two requests to the same
+			// peer. In practice however, a peer might receive a request, disconnect and
+			// immediately reconnect before the previous times out. In this case the first
+			// request is never honored, yet we must not silently overwrite it, as that
+			// causes valid requests to go missing and sync to get stuck.
+			if old := active[req.peer.id]; old != nil {
+				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
+
+				// Make sure the previous one doesn't get silently lost
+				old.timer.Stop()
+				old.dropped = true
+
+				finished = append(finished, old)
+			}
+			// Start a timer to notify the sync loop if the peer stalled.
+			req.timer = time.AfterFunc(req.timeout, func() {
+				select {
+				case timeout <- req:
+				case <-s.done:
+					// Prevent leaking of timer goroutines in the unlikely case where a
+					// timer is fired just before exiting runStateSync.
+				}
+			})
+			active[req.peer.id] = req
+		}
+	}
+}
+
+// stateSync schedules requests for downloading a particular state trie defined
+// by a given state root.
+type stateSync struct {
+	d *Downloader // Downloader instance to access and manage current peerset
+
+	sched  *trie.Sync                 // State trie sync scheduler defining the tasks
+	keccak hash.Hash                  // Keccak256 hasher to verify deliveries with
+	tasks  map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
+
+	numUncommitted   int
+	bytesUncommitted int
+
+	deliver    chan *stateReq // Delivery channel multiplexing peer responses
+	cancel     chan struct{}  // Channel to signal a termination request
+	cancelOnce sync.Once      // Ensures cancel only ever gets called once
+	done       chan struct{}  // Channel to signal termination completion
+	err        error          // Any error hit during sync (set before completion)
+}
+
+// stateTask represents a single trie node download task, containing a set of
+// peers already attempted retrieval from to detect stalled syncs and abort.
+type stateTask struct {
+	attempts map[string]struct{}
+}
+
+// newStateSync creates a new state trie download scheduler. This method does not
+// yet start the sync. The user needs to call run to initiate.
+func newStateSync(d *Downloader, root common.Hash) *stateSync {
+	return &stateSync{
+		d:       d,
+		sched:   state.NewStateSync(root, d.stateDB),
+		keccak:  sha3.NewLegacyKeccak256(),
+		tasks:   make(map[common.Hash]*stateTask),
+		deliver: make(chan *stateReq),
+		cancel:  make(chan struct{}),
+		done:    make(chan struct{}),
+	}
+}
+
+// run starts the task assignment and response processing loop, blocking until
+// it finishes, and finally notifying any goroutines waiting for the loop to
+// finish.
+func (s *stateSync) run() {
+	s.err = s.loop()
+	close(s.done)
+}
+
+// Wait blocks until the sync is done or canceled.
+func (s *stateSync) Wait() error {
+	<-s.done
+	return s.err
+}
+
+// Cancel cancels the sync and waits until it has shut down.
+func (s *stateSync) Cancel() error {
+	s.cancelOnce.Do(func() { close(s.cancel) })
+	return s.Wait()
+}
+
+// loop is the main event loop of a state trie sync. It is responsible for the
+// assignment of new tasks to peers (including sending them out) as well as
+// for the processing of inbound data. Note that the loop does not directly
+// receive data from peers, rather those are buffered up in the downloader and
+// pushed here async. The reason is to decouple processing from data receipt
+// and timeouts.
+func (s *stateSync) loop() (err error) {
+	// Listen for new peer events to assign tasks to them
+	newPeer := make(chan *peerConnection, 1024)
+	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
+	defer peerSub.Unsubscribe()
+	defer func() {
+		cerr := s.commit(true)
+		if err == nil {
+			err = cerr
+		}
+	}()
+
+	// Keep assigning new tasks until the sync completes or aborts
+	for s.sched.Pending() > 0 {
+		if err = s.commit(false); err != nil {
+			return err
+		}
+		s.assignTasks()
+		// Tasks assigned, wait for something to happen
+		select {
+		case <-newPeer:
+			// New peer arrived, try to assign it download tasks
+
+		case <-s.cancel:
+			return errCancelStateFetch
+
+		case <-s.d.cancelCh:
+			return errCancelStateFetch
+
+		case req := <-s.deliver:
+			// Response, disconnect or timeout triggered, drop the peer if stalling
+			log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
+			if len(req.items) <= 2 && !req.dropped && req.timedOut() {
+				// 2 items are the minimum requested; if even that times out, we have no use
+				// for this peer at the moment.
+				log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
+				s.d.dropPeer(req.peer.id)
+			}
+			// Process all the received blobs and check for stale delivery
+			delivered, err := s.process(req)
+			if err != nil {
+				log.Warn("Node data write error", "err", err)
+				return err
+			}
+			req.peer.SetNodeDataIdle(delivered)
+		}
+	}
+	return nil
+}
+
+func (s *stateSync) commit(force bool) error {
+	if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
+		return nil
+	}
+	start := time.Now()
+	b := s.d.stateDB.NewBatch()
+	if written, err := s.sched.Commit(b); written == 0 || err != nil {
+		return err
+	}
+	if err := b.Write(); err != nil {
+		return fmt.Errorf("DB write error: %v", err)
+	}
+	s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
+	s.numUncommitted = 0
+	s.bytesUncommitted = 0
+	return nil
+}
+
+// assignTasks attempts to assign new tasks to all idle peers, either from the
+// batch currently being retried, or fetching new data from the trie sync itself.
+func (s *stateSync) assignTasks() {
+	// Iterate over all idle peers and try to assign them state fetches
+	peers, _ := s.d.peers.NodeDataIdlePeers()
+	for _, p := range peers {
+		// Assign a batch of fetches proportional to the estimated latency/bandwidth
+		cap := p.NodeDataCapacity(s.d.requestRTT())
+		req := &stateReq{peer: p, timeout: s.d.requestTTL()}
+		s.fillTasks(cap, req)
+
+		// If the peer was assigned tasks to fetch, send the network request
+		if len(req.items) > 0 {
+			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
+			select {
+			case s.d.trackStateReq <- req:
+				req.peer.FetchNodeData(req.items)
+			case <-s.cancel:
+			case <-s.d.cancelCh:
+			}
+		}
+	}
+}
+
+// fillTasks fills the given request object with a maximum of n state download
+// tasks to send to the remote peer.
+func (s *stateSync) fillTasks(n int, req *stateReq) {
+	// Refill available tasks from the scheduler.
+	if len(s.tasks) < n {
+		new := s.sched.Missing(n - len(s.tasks))
+		for _, hash := range new {
+			s.tasks[hash] = &stateTask{make(map[string]struct{})}
+		}
+	}
+	// Find tasks that haven't been tried with the request's peer.
+	req.items = make([]common.Hash, 0, n)
+	req.tasks = make(map[common.Hash]*stateTask, n)
+	for hash, t := range s.tasks {
+		// Stop when we've gathered enough requests
+		if len(req.items) == n {
+			break
+		}
+		// Skip any requests we've already tried from this peer
+		if _, ok := t.attempts[req.peer.id]; ok {
+			continue
+		}
+		// Assign the request to this peer
+		t.attempts[req.peer.id] = struct{}{}
+		req.items = append(req.items, hash)
+		req.tasks[hash] = t
+		delete(s.tasks, hash)
+	}
+}
+
+// process iterates over a batch of delivered state data, injecting each item
+// into a running state sync, re-queuing any items that were requested but not
+// delivered.
+// Returns whether the peer actually managed to deliver anything of value,
+// and any error that occurred.
+func (s *stateSync) process(req *stateReq) (int, error) {
+	// Collect processing stats and update progress if valid data was received
+	duplicate, unexpected, successful := 0, 0, 0
+
+	defer func(start time.Time) {
+		if duplicate > 0 || unexpected > 0 {
+			s.updateStats(0, duplicate, unexpected, time.Since(start))
+		}
+	}(time.Now())
+
+	// Iterate over all the delivered data and inject one-by-one into the trie
+	progress := false
+	for _, blob := range req.response {
+		prog, hash, err := s.processNodeData(blob)
+		switch err {
+		case nil:
+			s.numUncommitted++
+			s.bytesUncommitted += len(blob)
+			progress = progress || prog
+			successful++
+		case trie.ErrNotRequested:
+			unexpected++
+		case trie.ErrAlreadyProcessed:
+			duplicate++
+		default:
+			return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
+		}
+		if _, ok := req.tasks[hash]; ok {
+			delete(req.tasks, hash)
+		}
+	}
+	// Put unfulfilled tasks back into the retry queue
+	npeers := s.d.peers.Len()
+	for hash, task := range req.tasks {
+		// If the node did deliver something, missing items may be due to a protocol
+		// limit or a previous timeout + delayed delivery. Both cases should permit
+		// the node to retry the missing items (to avoid single-peer stalls).
+		if len(req.response) > 0 || req.timedOut() {
+			delete(task.attempts, req.peer.id)
+		}
+		// If we've requested the node too many times already, it may be a malicious
+		// sync where nobody has the right data. Abort.
+		if len(task.attempts) >= npeers {
+			return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
+		}
+		// Missing item, place into the retry queue.
+		s.tasks[hash] = task
+	}
+	return successful, nil
+}
+
+// processNodeData tries to inject a trie node data blob delivered from a remote
+// peer into the state trie, returning whether anything useful was written or any
+// error occurred.
+func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
+	res := trie.SyncResult{Data: blob}
+	s.keccak.Reset()
+	s.keccak.Write(blob)
+	s.keccak.Sum(res.Hash[:0])
+	committed, _, err := s.sched.Process([]trie.SyncResult{res})
+	return committed, res.Hash, err
+}
+
+// updateStats bumps the various state sync progress counters and displays a log
+// message for the user to see.
+func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) {
+	s.d.syncStatsLock.Lock()
+	defer s.d.syncStatsLock.Unlock()
+
+	s.d.syncStatsState.pending = uint64(s.sched.Pending())
+	s.d.syncStatsState.processed += uint64(written)
+	s.d.syncStatsState.duplicate += uint64(duplicate)
+	s.d.syncStatsState.unexpected += uint64(unexpected)
+
+	if written > 0 || duplicate > 0 || unexpected > 0 {
+		log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
+	}
+	if written > 0 {
+		rawdb.WriteFastTrieProgress(s.d.stateDB, s.d.syncStatsState.processed)
+	}
+}
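
Note on the lifecycle this patch introduces: run sets the sticky err and closes done, Wait blocks on done, and Cancel closes the cancel channel exactly once via sync.Once before waiting. The sketch below is a minimal, self-contained model of that pattern for readers skimming the patch; it is not code from this repository, and the demoSync type, newDemoSync and the stubbed loop are hypothetical stand-ins for stateSync.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// demoSync keeps only the lifecycle plumbing of stateSync: a cancel channel,
// a done channel and a sticky error recorded before done is closed.
type demoSync struct {
	cancel     chan struct{} // closed by Cancel to request termination
	cancelOnce sync.Once     // ensures the cancel channel is closed only once
	done       chan struct{} // closed by run once the loop has exited
	err        error         // set by run before done is closed
}

func newDemoSync() *demoSync {
	return &demoSync{cancel: make(chan struct{}), done: make(chan struct{})}
}

// run executes the (stubbed) sync loop, records its error and signals done,
// mirroring stateSync.run/loop in the patch.
func (s *demoSync) run() {
	s.err = s.loop()
	close(s.done)
}

func (s *demoSync) loop() error {
	for i := 0; i < 10; i++ {
		select {
		case <-s.cancel:
			return errors.New("state fetch canceled") // plays the role of errCancelStateFetch
		case <-time.After(50 * time.Millisecond):
			// pretend to commit a batch of trie nodes here
		}
	}
	return nil
}

// Wait blocks until the sync is done or canceled and returns the sticky error.
func (s *demoSync) Wait() error {
	<-s.done
	return s.err
}

// Cancel requests termination (idempotently) and waits for the loop to exit.
func (s *demoSync) Cancel() error {
	s.cancelOnce.Do(func() { close(s.cancel) })
	return s.Wait()
}

func main() {
	s := newDemoSync()
	go s.run()

	// Cancel from another goroutine after a while, much as runStateSync defers
	// Cancel when a new root supersedes the current sync.
	time.AfterFunc(120*time.Millisecond, func() { s.Cancel() })

	fmt.Println("sync finished with:", s.Wait())
}

In the patch itself, runStateSync both starts run in a goroutine and defers Cancel, so switching to a new state root tears the old sync down through exactly this contract.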
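The retry bookkeeping in fillTasks and process can likewise be seen in isolation: each trie-node hash carries the set of peer ids that already attempted it, undelivered items return to the retry queue, and the sync aborts once every known peer has tried a hash. The sketch below is a simplified, self-contained illustration of that accounting under assumed names (task, scheduler, string hashes); it is not the downloader's actual code.

package main

import "fmt"

// task mirrors stateTask: the set of peers that already attempted this item.
type task struct {
	attempts map[string]struct{}
}

// scheduler is a hypothetical stand-in for the stateSync task map.
type scheduler struct {
	tasks map[string]*task // retry queue keyed by node hash (strings for brevity)
}

// fill hands out up to n items the given peer has not tried yet, records the
// attempt and moves them out of the retry queue, like stateSync.fillTasks.
func (s *scheduler) fill(peer string, n int) map[string]*task {
	assigned := make(map[string]*task, n)
	for hash, t := range s.tasks {
		if len(assigned) == n {
			break
		}
		if _, tried := t.attempts[peer]; tried {
			continue
		}
		t.attempts[peer] = struct{}{}
		assigned[hash] = t
		delete(s.tasks, hash)
	}
	return assigned
}

// requeue returns undelivered items to the retry queue, aborting once every
// known peer has already failed on one of them, like the tail of process.
func (s *scheduler) requeue(undelivered map[string]*task, npeers int) error {
	for hash, t := range undelivered {
		if len(t.attempts) >= npeers {
			return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash, len(t.attempts), npeers)
		}
		s.tasks[hash] = t
	}
	return nil
}

func main() {
	s := &scheduler{tasks: map[string]*task{
		"node-a": {attempts: map[string]struct{}{}},
		"node-b": {attempts: map[string]struct{}{}},
	}}
	// peer1 is assigned both items but delivers neither of them.
	assigned := s.fill("peer1", 2)
	fmt.Println("assigned to peer1:", len(assigned))

	// With two known peers the items simply go back for another peer to retry...
	fmt.Println("requeue, 2 peers:", s.requeue(assigned, 2))
	// ...but once the only known peer has failed them, the sync gives up.
	fmt.Println("requeue, 1 peer:", s.requeue(s.fill("peer2", 2), 1))
}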