diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 122 | ||||
-rw-r--r-- | eth/backend_test.go | 4 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 488 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 135 | ||||
-rw-r--r-- | eth/downloader/peer.go | 240 | ||||
-rw-r--r-- | eth/downloader/queue.go | 301 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 8 | ||||
-rw-r--r-- | eth/gasprice.go | 2 | ||||
-rw-r--r-- | eth/handler.go | 42 | ||||
-rw-r--r-- | eth/helper_test.go | 4 | ||||
-rw-r--r-- | eth/peer.go | 47 | ||||
-rw-r--r-- | eth/protocol.go | 3 | ||||
-rw-r--r-- | eth/sync.go | 4 | ||||
-rw-r--r-- | eth/sync_test.go | 4 |
14 files changed, 726 insertions, 678 deletions
diff --git a/eth/backend.go b/eth/backend.go index 0683705df..5bd6ac55d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -65,7 +65,7 @@ const ( var ( jsonlogger = logger.NewJsonLogger() - datadirInUseErrNos = []uint{11, 32, 35} + datadirInUseErrnos = map[uint]bool{11: true, 32: true, 35: true} portInUseErrRE = regexp.MustCompile("address already in use") defaultBootNodes = []*discover.Node{ @@ -231,9 +231,7 @@ type Ethereum struct { chainDb ethdb.Database // Block chain database dappDb ethdb.Database // Dapp database - //*** SERVICES *** - // State manager for processing new blocks and managing the over all states - blockProcessor *core.BlockProcessor + // Handlers txPool *core.TxPool blockchain *core.BlockChain accountManager *accounts.Manager @@ -286,15 +284,7 @@ func New(config *Config) (*Ethereum, error) { // Open the chain database and perform any upgrades needed chainDb, err := newdb(filepath.Join(config.DataDir, "chaindata")) if err != nil { - var ok bool - errno := uint(err.(syscall.Errno)) - for _, no := range datadirInUseErrNos { - if errno == no { - ok = true - break - } - } - if ok { + if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] { err = fmt.Errorf("%v (check if another instance of geth is already running with the same data directory '%s')", err, config.DataDir) } return nil, fmt.Errorf("blockchain db err: %v", err) @@ -311,14 +301,7 @@ func New(config *Config) (*Ethereum, error) { dappDb, err := newdb(filepath.Join(config.DataDir, "dapp")) if err != nil { - var ok bool - for _, no := range datadirInUseErrNos { - if uint(err.(syscall.Errno)) == no { - ok = true - break - } - } - if ok { + if errno, ok := err.(syscall.Errno); ok && datadirInUseErrnos[uint(errno)] { err = fmt.Errorf("%v (check if another instance of geth is already running with the same data directory '%s')", err, config.DataDir) } return nil, fmt.Errorf("dapp db err: %v", err) @@ -422,8 +405,6 @@ func New(config *Config) (*Ethereum, error) { newPool := core.NewTxPool(eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) eth.txPool = newPool - eth.blockProcessor = core.NewBlockProcessor(chainDb, eth.pow, eth.blockchain, eth.EventMux()) - eth.blockchain.SetProcessor(eth.blockProcessor) if eth.protocolManager, err = NewProtocolManager(config.FastSync, config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.blockchain, chainDb); err != nil { return nil, err } @@ -467,62 +448,10 @@ func New(config *Config) (*Ethereum, error) { return eth, nil } -type NodeInfo struct { - Name string - NodeUrl string - NodeID string - IP string - DiscPort int // UDP listening port for discovery protocol - TCPPort int // TCP listening port for RLPx - Td string - ListenAddr string -} - -func (s *Ethereum) NodeInfo() *NodeInfo { - node := s.net.Self() - - return &NodeInfo{ - Name: s.Name(), - NodeUrl: node.String(), - NodeID: node.ID.String(), - IP: node.IP.String(), - DiscPort: int(node.UDP), - TCPPort: int(node.TCP), - ListenAddr: s.net.ListenAddr, - Td: s.BlockChain().GetTd(s.BlockChain().CurrentBlock().Hash()).String(), - } -} - -type PeerInfo struct { - ID string - Name string - Caps string - RemoteAddress string - LocalAddress string -} - -func newPeerInfo(peer *p2p.Peer) *PeerInfo { - var caps []string - for _, cap := range peer.Caps() { - caps = append(caps, cap.String()) - } - return &PeerInfo{ - ID: peer.ID().String(), - Name: peer.Name(), - Caps: strings.Join(caps, ", "), - RemoteAddress: peer.RemoteAddr().String(), - LocalAddress: peer.LocalAddr().String(), - } -} - -// PeersInfo returns an array of PeerInfo objects describing connected peers -func (s *Ethereum) PeersInfo() (peersinfo []*PeerInfo) { - for _, peer := range s.net.Peers() { - if peer != nil { - peersinfo = append(peersinfo, newPeerInfo(peer)) - } - } - return +// Network retrieves the underlying P2P network server. This should eventually +// be moved out into a protocol independent package, but for now use an accessor. +func (s *Ethereum) Network() *p2p.Server { + return s.net } func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { @@ -552,24 +481,23 @@ func (s *Ethereum) IsMining() bool { return s.miner.Mining() } func (s *Ethereum) Miner() *miner.Miner { return s.miner } // func (s *Ethereum) Logger() logger.LogSystem { return s.logger } -func (s *Ethereum) Name() string { return s.net.Name } -func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager } -func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } -func (s *Ethereum) BlockProcessor() *core.BlockProcessor { return s.blockProcessor } -func (s *Ethereum) TxPool() *core.TxPool { return s.txPool } -func (s *Ethereum) Whisper() *whisper.Whisper { return s.whisper } -func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux } -func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } -func (s *Ethereum) DappDb() ethdb.Database { return s.dappDb } -func (s *Ethereum) IsListening() bool { return true } // Always listening -func (s *Ethereum) PeerCount() int { return s.net.PeerCount() } -func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() } -func (s *Ethereum) MaxPeers() int { return s.net.MaxPeers } -func (s *Ethereum) ClientVersion() string { return s.clientVersion } -func (s *Ethereum) EthVersion() int { return int(s.protocolManager.SubProtocols[0].Version) } -func (s *Ethereum) NetVersion() int { return s.netVersionId } -func (s *Ethereum) ShhVersion() int { return s.shhVersionId } -func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader } +func (s *Ethereum) Name() string { return s.net.Name } +func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager } +func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } +func (s *Ethereum) TxPool() *core.TxPool { return s.txPool } +func (s *Ethereum) Whisper() *whisper.Whisper { return s.whisper } +func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux } +func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } +func (s *Ethereum) DappDb() ethdb.Database { return s.dappDb } +func (s *Ethereum) IsListening() bool { return true } // Always listening +func (s *Ethereum) PeerCount() int { return s.net.PeerCount() } +func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() } +func (s *Ethereum) MaxPeers() int { return s.net.MaxPeers } +func (s *Ethereum) ClientVersion() string { return s.clientVersion } +func (s *Ethereum) EthVersion() int { return int(s.protocolManager.SubProtocols[0].Version) } +func (s *Ethereum) NetVersion() int { return s.netVersionId } +func (s *Ethereum) ShhVersion() int { return s.shhVersionId } +func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader } // Start the ethereum func (s *Ethereum) Start() error { diff --git a/eth/backend_test.go b/eth/backend_test.go index 0379fc843..83219de62 100644 --- a/eth/backend_test.go +++ b/eth/backend_test.go @@ -32,7 +32,7 @@ func TestMipmapUpgrade(t *testing.T) { } // store the receipts - err := core.PutReceipts(db, receipts) + err := core.WriteReceipts(db, receipts) if err != nil { t.Fatal(err) } @@ -45,7 +45,7 @@ func TestMipmapUpgrade(t *testing.T) { if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { t.Fatalf("failed to insert block number: %v", err) } - if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil { + if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil { t.Fatal("error writing block receipts:", err) } } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 153427ee4..c272d05af 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -45,16 +45,17 @@ var ( MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request - hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out - blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth - blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired - headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out - bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth - bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired - receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired - stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth - stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired + hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out + blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request + blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired + + headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out + bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request + bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired + receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request + receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired + stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request + stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection) maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) @@ -74,7 +75,6 @@ var ( errBadPeer = errors.New("action from bad peer ignored") errStallingPeer = errors.New("peer is stalling") errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") errTimeout = errors.New("timeout") errEmptyHashSet = errors.New("empty hash set by peer") errEmptyHeaderSet = errors.New("empty header set by peer") @@ -90,6 +90,7 @@ var ( errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelReceiptFetch = errors.New("receipt download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)") + errCancelProcessing = errors.New("processing canceled (requested)") errNoSyncActive = errors.New("no sync active") ) @@ -129,7 +130,6 @@ type Downloader struct { // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 - processing int32 notified int32 // Channels @@ -215,7 +215,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) { // Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { - return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0 + return atomic.LoadInt32(&d.synchronising) > 0 } // RegisterPeer injects a new download peer into the set of block source to be @@ -263,9 +263,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) d.dropPeer(id) - case errPendingQueue: - glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) - default: glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) } @@ -290,10 +287,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { glog.V(logger.Info).Infoln("Block synchronisation started") } - // Abort if the queue still contains some leftover data - if d.queue.GetHeadResult() != nil { - return errPendingQueue - } // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() d.peers.Reset() @@ -335,7 +328,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e defer func() { // reset on error if err != nil { - d.cancel() d.mux.Post(FailedEvent{err}) } else { d.mux.Post(DoneEvent{}) @@ -365,23 +357,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e d.syncStatsChainHeight = latest d.syncStatsLock.Unlock() - // Initiate the sync using a concurrent hash and block retrieval algorithm + // Initiate the sync using a concurrent hash and block retrieval algorithm + d.queue.Prepare(origin+1, d.mode, 0) if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - d.queue.Prepare(origin+1, d.mode, 0) - - errc := make(chan error, 2) - go func() { errc <- d.fetchHashes61(p, td, origin+1) }() - go func() { errc <- d.fetchBlocks61(origin + 1) }() - - // If any fetcher fails, cancel the other - if err := <-errc; err != nil { - d.cancel() - <-errc - return err - } - return <-errc + return d.spawnSync( + func() error { return d.fetchHashes61(p, td, origin+1) }, + func() error { return d.fetchBlocks61(origin + 1) }, + ) case p.version >= 62: // Look up the sync boundaries: the common ancestor and the target block @@ -405,7 +389,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e switch d.mode { case LightSync: pivot = latest - case FastSync: // Calculate the new fast/slow sync pivot point pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval))) @@ -426,34 +409,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot) } d.queue.Prepare(origin+1, d.mode, pivot) - if d.syncInitHook != nil { d.syncInitHook(origin, latest) } - errc := make(chan error, 4) - go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved - go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync - go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync - go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync - - // If any fetcher fails, cancel the others - var fail error - for i := 0; i < cap(errc); i++ { - if err := <-errc; err != nil { - if fail == nil { - fail = err - d.cancel() - } - } - } - return fail + return d.spawnSync( + func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync + ) default: // Something very wrong, stop right here glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) return errBadPeer } - return nil +} + +// spawnSync runs d.process and all given fetcher functions to completion in +// separate goroutines, returning the first error that appears. +func (d *Downloader) spawnSync(fetchers ...func() error) error { + var wg sync.WaitGroup + errc := make(chan error, len(fetchers)+1) + wg.Add(len(fetchers) + 1) + go func() { defer wg.Done(); errc <- d.process() }() + for _, fn := range fetchers { + fn := fn + go func() { defer wg.Done(); errc <- fn() }() + } + // Wait for the first error, then terminate the others. + var err error + for i := 0; i < len(fetchers)+1; i++ { + if i == len(fetchers) { + // Close the queue when all fetchers have exited. + // This will cause the block processor to end when + // it has processed the queue. + d.queue.Close() + } + if err = <-errc; err != nil { + break + } + } + d.queue.Close() + d.cancel() + wg.Wait() + return err } // cancel cancels all of the operations and resets the queue. It returns true @@ -470,12 +470,10 @@ func (d *Downloader) cancel() { } } d.cancelLock.Unlock() - - // Reset the queue - d.queue.Reset() } // Terminate interrupts the downloader, canceling all pending operations. +// The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { atomic.StoreInt32(&d.interrupt, 1) d.cancel() @@ -489,21 +487,12 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { // Request the advertised remote head block and wait for the response go p.getBlocks([]common.Hash{p.head}) - timeout := time.After(blockSoftTTL) + timeout := time.After(hashTTL) for { select { case <-d.cancelCh: return 0, errCancelBlockFetch - case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - - case <-d.hashCh: - // Out of bounds hashes received, ignore them - case packet := <-d.blockCh: // Discard anything not from the origin peer if packet.PeerId() != p.id { @@ -521,6 +510,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) { case <-timeout: glog.V(logger.Debug).Infof("%v: head block timeout", p) return 0, errTimeout + + case <-d.hashCh: + // Out of bounds hashes received, ignore them + + case <-d.headerCh: + case <-d.bodyCh: + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -571,18 +570,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { } } + case <-timeout: + glog.V(logger.Debug).Infof("%v: head hash timeout", p) + return 0, errTimeout + case <-d.blockCh: // Out of bounds blocks received, ignore them case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: head hash timeout", p) - return 0, errTimeout + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } // If the head fetch already found an ancestor, return @@ -631,18 +631,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) { } start = check + case <-timeout: + glog.V(logger.Debug).Infof("%v: search hash timeout", p) + return 0, errTimeout + case <-d.blockCh: // Out of bounds blocks received, ignore them case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: search hash timeout", p) - return 0, errTimeout + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -676,12 +677,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { case <-d.cancelCh: return errCancelHashFetch - case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - case packet := <-d.hashCh: // Make sure the active peer is giving us the hashes if packet.PeerId() != p.id { @@ -750,6 +745,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error { glog.V(logger.Debug).Infof("%v: hash request timed out", p) hashTimeoutMeter.Mark(1) return errTimeout + + case <-d.headerCh: + case <-d.bodyCh: + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -774,59 +776,31 @@ func (d *Downloader) fetchBlocks61(from uint64) error { case <-d.cancelCh: return errCancelBlockFetch - case <-d.headerCh: - // Out of bounds eth/62 block headers received, ignore them - - case <-d.bodyCh: - // Out of bounds eth/62 block bodies received, ignore them - case packet := <-d.blockCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(packet.PeerId()); peer != nil { - // Deliver the received chunk of blocks, and demote in case of errors blocks := packet.(*blockPack).blocks - err := d.queue.DeliverBlocks(peer.id, blocks) - switch err { - case nil: - // If no blocks were delivered, demote the peer (need the delivery above) - if len(blocks) == 0 { - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) - break - } - // All was successful, promote the peer and potentially start processing - peer.Promote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) - go d.process() - case errInvalidChain: - // The hash chain is invalid (blocks are not ordered properly), abort + // Deliver the received chunk of blocks and check chain validity + accepted, err := d.queue.DeliverBlocks(peer.id, blocks) + if err == errInvalidChain { return err - - case errNoFetchesPending: - // Peer probably timed out with its delivery but came through - // in the end, demote, but allow to to pull from this peer. - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) - - case errStaleDelivery: - // Delivered something completely else than requested, usually - // caused by a timeout and delivery during a new sync cycle. - // Don't set it to idle as the original request should still be - // in flight. - peer.Demote() - glog.V(logger.Detail).Infof("%s: stale delivery", peer) - + } + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if err != errStaleDelivery { + peer.SetBlocksIdle(accepted) + } + // Issue a log to the user to see what's going on + switch { + case err == nil && len(blocks) == 0: + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + case err == nil: + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks)) default: - // Peer did something semi-useful, demote but keep it around - peer.Demote() - peer.SetBlocksIdle() - glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) - go d.process() + glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err) } } // Blocks arrived, try to update the progress @@ -859,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error { return errNoPeers } // Check for block request timeouts and demote the responsible peers - for _, pid := range d.queue.ExpireBlocks(blockHardTTL) { + for pid, fails := range d.queue.ExpireBlocks(blockTTL) { if peer := d.peers.Peer(pid); peer != nil { - peer.Demote() - glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + if fails > 1 { + glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + peer.SetBlocksIdle(0) + } else { + glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer) + d.dropPeer(pid) + } } } // If there's nothing more to fetch, wait or terminate @@ -909,6 +888,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error { if !throttled && !d.queue.InFlightBlocks() && len(idles) == total { return errPeersUnavailable } + + case <-d.headerCh: + case <-d.bodyCh: + case <-d.stateCh: + case <-d.receiptCh: + // Ignore eth/{62,63} packets because this is eth/61. + // These can arrive as a late delivery from a previous sync. } } } @@ -941,18 +927,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) { } return headers[0].Number.Uint64(), nil + case <-timeout: + glog.V(logger.Debug).Infof("%v: head header timeout", p) + return 0, errTimeout + case <-d.bodyCh: - // Out of bounds block bodies received, ignore them + case <-d.stateCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: head header timeout", p) - return 0, errTimeout + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } @@ -1008,18 +995,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { } } + case <-timeout: + glog.V(logger.Debug).Infof("%v: head header timeout", p) + return 0, errTimeout + case <-d.bodyCh: - // Out of bounds block bodies received, ignore them + case <-d.stateCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: head header timeout", p) - return 0, errTimeout + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } // If the head fetch already found an ancestor, return @@ -1068,18 +1056,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { } start = check + case <-timeout: + glog.V(logger.Debug).Infof("%v: search header timeout", p) + return 0, errTimeout + case <-d.bodyCh: - // Out of bounds block bodies received, ignore them + case <-d.stateCh: + case <-d.receiptCh: + // Out of bounds delivery, ignore case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - - case <-timeout: - glog.V(logger.Debug).Infof("%v: search header timeout", p) - return 0, errTimeout + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } @@ -1141,12 +1130,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { case <-d.cancelCh: return errCancelHeaderFetch - case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - case packet := <-d.headerCh: // Make sure the active peer is giving us the headers if packet.PeerId() != p.id { @@ -1268,6 +1251,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { } } return nil + + case <-d.hashCh: + case <-d.blockCh: + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } @@ -1279,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error { glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from) var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { pack := packet.(*bodyPack) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } - expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) } + expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } capacity = func(p *peer) int { return p.BlockCapacity() } - setIdle = func(p *peer) { p.SetBodiesIdle() } + setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, @@ -1303,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error { glog.V(logger.Debug).Infof("Downloading receipts from #%d", from) var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { pack := packet.(*receiptPack) return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } - expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) } + expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) } fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } capacity = func(p *peer) int { return p.ReceiptCapacity() } - setIdle = func(p *peer) { p.SetReceiptsIdle() } + setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, @@ -1327,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error { glog.V(logger.Debug).Infof("Downloading node state data") var ( - deliver = func(packet dataPack) error { + deliver = func(packet dataPack) (int, error) { start := time.Now() return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) { if err != nil { @@ -1336,10 +1324,8 @@ func (d *Downloader) fetchNodeData() error { d.cancel() return } - // Processing succeeded, notify state fetcher and processor of continuation - if d.queue.PendingNodeData() == 0 { - go d.process() - } else { + // Processing succeeded, notify state fetcher of continuation + if d.queue.PendingNodeData() > 0 { select { case d.stateWakeCh <- true: default: @@ -1348,19 +1334,18 @@ func (d *Downloader) fetchNodeData() error { // Log a message to the user and return d.syncStatsLock.Lock() defer d.syncStatsLock.Unlock() - d.syncStatsStateDone += uint64(delivered) glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone) }) } - expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) } + expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) } throttle = func() bool { return false } reserve = func(p *peer, count int) (*fetchRequest, bool, error) { return d.queue.ReserveNodeData(p, count), false, nil } fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) } capacity = func(p *peer) int { return p.NodeDataCapacity() } - setIdle = func(p *peer) { p.SetNodeDataIdle() } + setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) } ) err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, @@ -1373,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error { // fetchParts iteratively downloads scheduled block parts, taking any available // peers, reserving a chunk of fetch requests for each, waiting for delivery and // also periodically checking for timeouts. -func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool, - expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), +func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, + expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, - idle func() ([]*peer, int), setIdle func(*peer), kind string) error { + idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error { // Create a ticker to detect expired retrieval tasks ticker := time.NewTicker(100 * time.Millisecond) @@ -1391,57 +1376,29 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv case <-d.cancelCh: return errCancel - case <-d.hashCh: - // Out of bounds eth/61 hashes received, ignore them - - case <-d.blockCh: - // Out of bounds eth/61 blocks received, ignore them - case packet := <-deliveryCh: // If the peer was previously banned and failed to deliver it's pack // in a reasonable time frame, ignore it's message. if peer := d.peers.Peer(packet.PeerId()); peer != nil { - // Deliver the received chunk of data, and demote in case of errors - switch err := deliver(packet); err { - case nil: - // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!) - if packet.Items() == 0 { - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) - break - } - // All was successful, promote the peer and potentially start processing - peer.Promote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) - go d.process() - - case errInvalidChain: - // The hash chain is invalid (blocks are not ordered properly), abort + // Deliver the received chunk of data and check chain validity + accepted, err := deliver(packet) + if err == errInvalidChain { return err - - case errNoFetchesPending: - // Peer probably timed out with its delivery but came through - // in the end, demote, but allow to to pull from this peer. - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind)) - - case errStaleDelivery: - // Delivered something completely else than requested, usually - // caused by a timeout and delivery during a new sync cycle. - // Don't set it to idle as the original request should still be - // in flight. - peer.Demote() - glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind)) - + } + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if err != errStaleDelivery { + setIdle(peer, accepted) + } + // Issue a log to the user to see what's going on + switch { + case err == nil && packet.Items() == 0: + glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind)) + case err == nil: + glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind)) default: - // Peer did something semi-useful, demote but keep it around - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err) - go d.process() + glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err) } } // Blocks assembled, try to update the progress @@ -1474,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv return errNoPeers } // Check for fetch request timeouts and demote the responsible peers - for _, pid := range expire() { + for pid, fails := range expire() { if peer := d.peers.Peer(pid); peer != nil { - peer.Demote() - setIdle(peer) - glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) + if fails > 1 { + glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) + setIdle(peer, 0) + } else { + glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind)) + d.dropPeer(pid) + } } } // If there's nothing more to fetch, wait or terminate @@ -1508,7 +1469,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv } if progress { progressed = true - go d.process() } if request == nil { continue @@ -1540,51 +1500,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { return errPeersUnavailable } + + case <-d.hashCh: + case <-d.blockCh: + // Ignore eth/61 packets because this is eth/62+. + // These can arrive as a late delivery from a previous sync. } } } // process takes fetch results from the queue and tries to import them into the -// chain. The type of import operation will depend on the result contents: -// - -// -// The algorithmic flow is as follows: -// - The `processing` flag is swapped to 1 to ensure singleton access -// - The current `cancel` channel is retrieved to detect sync abortions -// - Blocks are iteratively taken from the cache and inserted into the chain -// - When the cache becomes empty, insertion stops -// - The `processing` flag is swapped back to 0 -// - A post-exit check is made whether new blocks became available -// - This step is important: it handles a potential race condition between -// checking for no more work, and releasing the processing "mutex". In -// between these state changes, a block may have arrived, but a processing -// attempt denied, so we need to re-enter to ensure the block isn't left -// to idle in the cache. -func (d *Downloader) process() { - // Make sure only one goroutine is ever allowed to process blocks at once - if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { - return - } - // If the processor just exited, but there are freshly pending items, try to - // reenter. This is needed because the goroutine spinned up for processing - // the fresh results might have been rejected entry to to this present thread - // not yet releasing the `processing` state. - defer func() { - if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil { - d.process() - } - }() - // Release the lock upon exit (note, before checking for reentry!) - // the import statistics to zero. - defer atomic.StoreInt32(&d.processing, 0) - - // Repeat the processing as long as there are results to process +// chain. The type of import operation will depend on the result contents. +func (d *Downloader) process() error { + pivot := d.queue.FastSyncPivot() for { - // Fetch the next batch of results - pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race - results := d.queue.TakeResults() + results := d.queue.WaitResults() if len(results) == 0 { - return + return nil // queue empty } if d.chainInsertHook != nil { d.chainInsertHook(results) @@ -1597,7 +1529,7 @@ func (d *Downloader) process() { for len(results) != 0 { // Check for any termination requests if atomic.LoadInt32(&d.interrupt) == 1 { - return + return errCancelProcessing } // Retrieve the a batch of results to import var ( @@ -1633,8 +1565,7 @@ func (d *Downloader) process() { } if err != nil { glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) - d.cancel() - return + return err } // Shift the results to the next batch results = results[items:] @@ -1685,19 +1616,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i dropMeter.Mark(int64(packet.Items())) } }() - // Make sure the downloader is active - if atomic.LoadInt32(&d.synchronising) == 0 { - return errNoSyncActive - } // Deliver or abort if the sync is canceled while queuing d.cancelLock.RLock() cancel := d.cancelCh d.cancelLock.RUnlock() - + if cancel == nil { + return errNoSyncActive + } select { case destCh <- packet: return nil - case <-cancel: return errNoSyncActive } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ef6f74a6b..cfcc8a2ef 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { } } dl.lock.RUnlock() - - err := dl.downloader.synchronise(id, hash, td, mode) - for { - // If the queue is empty and processing stopped, break - if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 { - break - } - // Otherwise sleep a bit and retry - time.Sleep(time.Millisecond) - } - return err + return dl.downloader.synchronise(id, hash, td, mode) } // hasHeader checks if a header is present in the testers canonical chain. @@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronis func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) } func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) } func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) } func testThrottling(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a long block chain to download and the tester targetBlocks := 8 * blockCacheLimit hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { for start := time.Now(); time.Since(start) < time.Second; { time.Sleep(25 * time.Millisecond) - tester.lock.RLock() - tester.downloader.queue.lock.RLock() + tester.lock.Lock() + tester.downloader.queue.lock.Lock() cached = len(tester.downloader.queue.blockDonePool) if mode == FastSync { if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { @@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { } frozen = int(atomic.LoadUint32(&blocked)) retrieved = len(tester.ownBlocks) - tester.downloader.queue.lock.RUnlock() - tester.lock.RUnlock() + tester.downloader.queue.lock.Unlock() + tester.lock.Unlock() if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 { break @@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation( func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) } func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a long enough forked chain common, fork := MaxHashFetch, 2*MaxHashFetch hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil) @@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) { // Tests that an inactive downloader will not accept incoming hashes and blocks. func TestInactiveDownloader61(t *testing.T) { + t.Parallel() tester := newTester() // Check that neither hashes nor blocks are accepted @@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) { // Tests that an inactive downloader will not accept incoming block headers and // bodies. func TestInactiveDownloader62(t *testing.T) { + t.Parallel() tester := newTester() // Check that neither block headers nor bodies are accepted @@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) { // Tests that an inactive downloader will not accept incoming block headers, // bodies and receipts. func TestInactiveDownloader63(t *testing.T) { + t.Parallel() tester := newTester() // Check that neither block headers nor bodies are accepted @@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) } func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) } func testCancel(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 if targetBlocks >= MaxHashFetch { @@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t, func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) } func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create various peers with various parts of the chain targetPeers := 8 targetBlocks := targetPeers*blockCacheLimit - 15 @@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t, func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) } func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) } func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a block chain to download targetBlocks := 2*blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 6 func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) } func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttac func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) } func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + tester := newTester() hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil) @@ -1209,25 +1220,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin - {errInvalidBody, false}, // A bad peer was detected, but not the sync origin - {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin + {errInvalidBody, false}, // A bad peer was detected, but not the sync origin + {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() @@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) } func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) } func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64, func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) } func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a forked chain to simulate origin revertal common, fork := MaxHashFetch, 2*MaxHashFetch hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil) @@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64, func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) } func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small enough block chain to download targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil) @@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, F func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) } func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + // Create a small block chain targetBlocks := blockCacheLimit - 15 hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil) @@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks) } } + +// This test reproduces an issue where unexpected deliveries would +// block indefinitely if they arrived at the right time. +func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) } +func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) } +func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) } +func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) } +func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } +func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) } + +func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { + t.Parallel() + hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil) + fakeHeads := []*types.Header{{}, {}, {}, {}} + for i := 0; i < 200; i++ { + tester := newTester() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + // Whenever the downloader requests headers, flood it with + // a lot of unrequested header deliveries. + tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error { + deliveriesDone := make(chan struct{}, 500) + for i := 0; i < cap(deliveriesDone); i++ { + peer := fmt.Sprintf("fake-peer%d", i) + go func() { + tester.downloader.DeliverHeaders(peer, fakeHeads) + deliveriesDone <- struct{}{} + }() + } + // Deliver the actual requested headers. + impl := tester.peerGetAbsHeadersFn("peer", 0) + go impl(from, count, skip, reverse) + // None of the extra deliveries should block. + timeout := time.After(5 * time.Second) + for i := 0; i < cap(deliveriesDone); i++ { + select { + case <-deliveriesDone: + case <-timeout: + panic("blocked") + } + } + return nil + } + if err := tester.sync("peer", nil, mode); err != nil { + t.Errorf("sync failed: %v", err) + } + } +} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 1f457cb15..80f08b68f 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -28,7 +28,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "gopkg.in/fatih/set.v0" +) + +const ( + maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items + throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. ) // Hash and block fetchers belonging to eth/61 and below @@ -57,17 +61,16 @@ type peer struct { blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) - rep int32 // Simple peer reputation - blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request - receiptCapacity int32 // Number of receipts allowed to fetch per request - stateCapacity int32 // Number of node data pieces allowed to fetch per request + blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second + receiptThroughput float64 // Number of receipts measured to be retrievable per second + stateThroughput float64 // Number of node data pieces measured to be retrievable per second blockStarted time.Time // Time instance when the last block (body)fetch was started receiptStarted time.Time // Time instance when the last receipt fetch was started stateStarted time.Time // Time instance when the last node data fetch was started - ignored *set.Set // Set of hashes not to request (didn't have previously) + lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position @@ -81,6 +84,7 @@ type peer struct { getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data version int // Eth protocol version number to switch strategies + lock sync.RWMutex } // newPeer create a new downloader peer, with specific hash and block retrieval @@ -90,12 +94,9 @@ func newPeer(id string, version int, head common.Hash, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { return &peer{ - id: id, - head: head, - blockCapacity: 1, - receiptCapacity: 1, - stateCapacity: 1, - ignored: set.New(), + id: id, + head: head, + lacking: make(map[common.Hash]struct{}), getRelHashes: getRelHashes, getAbsHashes: getAbsHashes, @@ -114,12 +115,18 @@ func newPeer(id string, version int, head common.Hash, // Reset clears the internal state of a peer entity. func (p *peer) Reset() { + p.lock.Lock() + defer p.lock.Unlock() + atomic.StoreInt32(&p.blockIdle, 0) atomic.StoreInt32(&p.receiptIdle, 0) - atomic.StoreInt32(&p.blockCapacity, 1) - atomic.StoreInt32(&p.receiptCapacity, 1) - atomic.StoreInt32(&p.stateCapacity, 1) - p.ignored.Clear() + atomic.StoreInt32(&p.stateIdle, 0) + + p.blockThroughput = 0 + p.receiptThroughput = 0 + p.stateThroughput = 0 + + p.lacking = make(map[common.Hash]struct{}) } // Fetch61 sends a block retrieval request to the remote peer. @@ -210,108 +217,116 @@ func (p *peer) FetchNodeData(request *fetchRequest) error { return nil } -// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its block retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetBlocksIdle() { - p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle) +// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval +// requests. Its estimated block retrieval throughput is updated with that measured +// just now. +func (p *peer) SetBlocksIdle(delivered int) { + p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } -// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its block body retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetBodiesIdle() { - p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle) +// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval +// requests. Its estimated body retrieval throughput is updated with that measured +// just now. +func (p *peer) SetBodiesIdle(delivered int) { + p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) } -// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its receipt retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) SetReceiptsIdle() { - p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle) +// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt +// retrieval requests. Its estimated receipt retrieval throughput is updated +// with that measured just now. +func (p *peer) SetReceiptsIdle(delivered int) { + p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle) } -// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval -// requests. Its node data retrieval allowance will also be updated either up- or -// downwards, depending on whether the previous fetch completed in time. -func (p *peer) SetNodeDataIdle() { - p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle) +// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie +// data retrieval requests. Its estimated state retrieval throughput is updated +// with that measured just now. +func (p *peer) SetNodeDataIdle(delivered int) { + p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle) } // setIdle sets the peer to idle, allowing it to execute new retrieval requests. -// Its data retrieval allowance will also be updated either up- or downwards, -// depending on whether the previous fetch completed in time. -func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) { - // Update the peer's download allowance based on previous performance - scale := 2.0 - if time.Since(started) > softTTL { - scale = 0.5 - if time.Since(started) > hardTTL { - scale = 1 / float64(maxFetch) // reduces capacity to 1 - } +// Its estimated retrieval throughput is updated with that measured just now. +func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { + // Irrelevant of the scaling, make sure the peer ends up idle + defer atomic.StoreInt32(idle, 0) + + p.lock.RLock() + defer p.lock.RUnlock() + + // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum + if delivered == 0 { + *throughput = 0 + return } - for { - // Calculate the new download bandwidth allowance - prev := atomic.LoadInt32(capacity) - next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale))) - - // Try to update the old value - if atomic.CompareAndSwapInt32(capacity, prev, next) { - // If we're having problems at 1 capacity, try to find better peers - if next == 1 { - p.Demote() - } - break - } - } - // Set the peer to idle to allow further fetch requests - atomic.StoreInt32(idle, 0) + // Otherwise update the throughput with a new measurement + measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor + *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured } // BlockCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. +// previously discovered throughput. func (p *peer) BlockCapacity() int { - return int(atomic.LoadInt32(&p.blockCapacity)) + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch)))) } -// ReceiptCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. +// ReceiptCapacity retrieves the peers receipt download allowance based on its +// previously discovered throughput. func (p *peer) ReceiptCapacity() int { - return int(atomic.LoadInt32(&p.receiptCapacity)) + p.lock.RLock() + defer p.lock.RUnlock() + + return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch)))) } -// NodeDataCapacity retrieves the peers block download allowance based on its -// previously discovered bandwidth capacity. +// NodeDataCapacity retrieves the peers state download allowance based on its +// previously discovered throughput. func (p *peer) NodeDataCapacity() int { - return int(atomic.LoadInt32(&p.stateCapacity)) -} + p.lock.RLock() + defer p.lock.RUnlock() -// Promote increases the peer's reputation. -func (p *peer) Promote() { - atomic.AddInt32(&p.rep, 1) + return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch)))) } -// Demote decreases the peer's reputation or leaves it at 0. -func (p *peer) Demote() { - for { - // Calculate the new reputation value - prev := atomic.LoadInt32(&p.rep) - next := prev / 2 +// MarkLacking appends a new entity to the set of items (blocks, receipts, states) +// that a peer is known not to have (i.e. have been requested before). If the +// set reaches its maximum allowed capacity, items are randomly dropped off. +func (p *peer) MarkLacking(hash common.Hash) { + p.lock.Lock() + defer p.lock.Unlock() - // Try to update the old value - if atomic.CompareAndSwapInt32(&p.rep, prev, next) { - return + for len(p.lacking) >= maxLackingHashes { + for drop, _ := range p.lacking { + delete(p.lacking, drop) + break } } + p.lacking[hash] = struct{}{} +} + +// Lacks retrieves whether the hash of a blockchain item is on the peers lacking +// list (i.e. whether we know that the peer does not have it). +func (p *peer) Lacks(hash common.Hash) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + _, ok := p.lacking[hash] + return ok } // String implements fmt.Stringer. func (p *peer) String() string { + p.lock.RLock() + defer p.lock.RUnlock() + return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+ - fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+ - fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+ - fmt.Sprintf("ignored %4d", p.ignored.Size()), + fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+ + fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+ + fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+ + fmt.Sprintf("lacking %4d", len(p.lacking)), ) } @@ -342,6 +357,10 @@ func (ps *peerSet) Reset() { // Register injects a new peer into the working set, or returns an error if the // peer is already known. +// +// The method also sets the starting throughput values of the new peer to the +// average of all existing peers, to give it a realistic change of being used +// for data retrievals. func (ps *peerSet) Register(p *peer) error { ps.lock.Lock() defer ps.lock.Unlock() @@ -349,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error { if _, ok := ps.peers[p.id]; ok { return errAlreadyRegistered } + if len(ps.peers) > 0 { + p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0 + + for _, peer := range ps.peers { + peer.lock.RLock() + p.blockThroughput += peer.blockThroughput + p.receiptThroughput += peer.receiptThroughput + p.stateThroughput += peer.stateThroughput + peer.lock.RUnlock() + } + p.blockThroughput /= float64(len(ps.peers)) + p.receiptThroughput /= float64(len(ps.peers)) + p.stateThroughput /= float64(len(ps.peers)) + } ps.peers[p.id] = p return nil } @@ -400,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - return ps.idlePeers(61, 61, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.blockThroughput + } + return ps.idlePeers(61, 61, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -409,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.blockIdle) == 0 } - return ps.idlePeers(62, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.blockThroughput + } + return ps.idlePeers(62, 64, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -418,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.receiptIdle) == 0 } - return ps.idlePeers(63, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.receiptThroughput + } + return ps.idlePeers(63, 64, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -427,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { idle := func(p *peer) bool { return atomic.LoadInt32(&p.stateIdle) == 0 } - return ps.idlePeers(63, 64, idle) + throughput := func(p *peer) float64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.stateThroughput + } + return ps.idlePeers(63, 64, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the // protocol version constraints, using the provided function to check idleness. -func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) { +// The resulting set of peers are sorted by their measure throughput. +func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) { ps.lock.RLock() defer ps.lock.RUnlock() @@ -447,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) } for i := 0; i < len(idle); i++ { for j := i + 1; j < len(idle); j++ { - if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) { + if throughput(idle[i]) < throughput(idle[j]) { idle[i], idle[j] = idle[j], idle[i] } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 56b46e285..1e55560db 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -101,11 +101,14 @@ type queue struct { resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain - lock sync.RWMutex + lock *sync.Mutex + active *sync.Cond + closed bool } // newQueue creates a new download queue for scheduling block retrieval. func newQueue(stateDb ethdb.Database) *queue { + lock := new(sync.Mutex) return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue { statePendPool: make(map[string]*fetchRequest), stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), + active: sync.NewCond(lock), + lock: lock, } } @@ -133,6 +138,7 @@ func (q *queue) Reset() { q.stateSchedLock.Lock() defer q.stateSchedLock.Unlock() + q.closed = false q.mode = FullSync q.fastSyncPivot = 0 @@ -162,18 +168,27 @@ func (q *queue) Reset() { q.resultOffset = 0 } +// Close marks the end of the sync, unblocking WaitResults. +// It may be called even if the queue is already closed. +func (q *queue) Close() { + q.lock.Lock() + q.closed = true + q.lock.Unlock() + q.active.Broadcast() +} + // PendingBlocks retrieves the number of block (body) requests pending for retrieval. func (q *queue) PendingBlocks() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.hashQueue.Size() + q.blockTaskQueue.Size() } // PendingReceipts retrieves the number of block receipts pending for retrieval. func (q *queue) PendingReceipts() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.receiptTaskQueue.Size() } @@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int { // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.blockPendPool) > 0 } @@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool { // InFlightReceipts retrieves whether there are receipt fetch requests currently // in flight. func (q *queue) InFlightReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.receiptPendPool) > 0 } @@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool { // InFlightNodeData retrieves whether there are node data entry fetch requests // currently in flight. func (q *queue) InFlightNodeData() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 } @@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool { // Idle returns if the queue is fully idle or has some data still inside. This // method is used by the tester to detect termination events. func (q *queue) Idle() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) @@ -237,8 +252,8 @@ func (q *queue) Idle() bool { // FastSyncPivot retrieves the currently used fast sync pivot point. func (q *queue) FastSyncPivot() uint64 { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.fastSyncPivot } @@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 { // ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). func (q *queue) ShouldThrottleBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight block (body) requests pending := 0 @@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool { // ShouldThrottleReceipts checks if the download should be throttled (active receipt // fetches exceed block cache). func (q *queue) ShouldThrottleReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight receipt requests pending := 0 @@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { return inserts } -// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't -// been downloaded yet (or simply non existent). -func (q *queue) GetHeadResult() *fetchResult { - q.lock.RLock() - defer q.lock.RUnlock() +// WaitResults retrieves and permanently removes a batch of fetch +// results from the cache. the result slice will be empty if the queue +// has been closed. +func (q *queue) WaitResults() []*fetchResult { + q.lock.Lock() + defer q.lock.Unlock() - // If there are no results pending, return nil - if len(q.resultCache) == 0 || q.resultCache[0] == nil { - return nil - } - // If the next result is still incomplete, return nil - if q.resultCache[0].Pending > 0 { - return nil + nproc := q.countProcessableItems() + for nproc == 0 && !q.closed { + q.active.Wait() + nproc = q.countProcessableItems() } - // If the next result is the fast sync pivot... - if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot { - // If the pivot state trie is still being pulled, return nil - if len(q.stateTaskPool) > 0 { - return nil + results := make([]*fetchResult, nproc) + copy(results, q.resultCache[:nproc]) + if len(results) > 0 { + // Mark results as done before dropping them from the cache. + for _, result := range results { + hash := result.Header.Hash() + delete(q.blockDonePool, hash) + delete(q.receiptDonePool, hash) } - if q.PendingNodeData() > 0 { - return nil - } - // If the state is done, but not enough post-pivot headers were verified, stall... - for i := 0; i < fsHeaderForceVerify; i++ { - if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil { - return nil - } + // Delete the results from the cache and clear the tail. + copy(q.resultCache, q.resultCache[nproc:]) + for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ { + q.resultCache[i] = nil } + // Advance the expected block number of the first cache entry. + q.resultOffset += uint64(nproc) } - return q.resultCache[0] + return results } -// TakeResults retrieves and permanently removes a batch of fetch results from -// the cache. -func (q *queue) TakeResults() []*fetchResult { - q.lock.Lock() - defer q.lock.Unlock() - - // Accumulate all available results - results := []*fetchResult{} +// countProcessableItems counts the processable items. +func (q *queue) countProcessableItems() int { for i, result := range q.resultCache { - // Stop if no more results are ready + // Don't process incomplete or unavailable items. if result == nil || result.Pending > 0 { - break + return i } - // The fast sync pivot block may only be processed after state fetch completes - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { - if len(q.stateTaskPool) > 0 { - break - } - if q.PendingNodeData() > 0 { - break - } - // Even is state fetch is done, ensure post-pivot headers passed verifications - safe := true - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - safe = false + // Special handling for the fast-sync pivot block: + if q.mode == FastSync { + bnum := result.Header.Number.Uint64() + if bnum == q.fastSyncPivot { + // If the state of the pivot block is not + // available yet, we cannot proceed and return 0. + // + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 { + return i + } + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { + return i + } } } - if !safe { - break + // If we're just the fast sync pivot, stop as well + // because the following batch needs different insertion. + // This simplifies handling the switchover in d.process. + if bnum == q.fastSyncPivot+1 && i > 0 { + return i } } - // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { - break - } - results = append(results, result) - - hash := result.Header.Hash() - delete(q.blockDonePool, hash) - delete(q.receiptDonePool, hash) - } - // Delete the results from the slice and let them be garbage collected - // without this slice trick the results would stay in memory until nil - // would be assigned to them. - copy(q.resultCache, q.resultCache[len(results):]) - for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { - q.resultCache[k] = nil } - q.resultOffset += uint64(len(results)) - - return results + return len(q.resultCache) } // ReserveBlocks reserves a set of block hashes for the given peer, skipping any @@ -501,7 +499,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { hash, priority := taskQueue.Pop() - if p.ignored.Has(hash) { + if p.Lacks(hash.(common.Hash)) { skip[hash.(common.Hash)] = int(priority) } else { send[hash.(common.Hash)] = int(priority) @@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { + common.Report("index allocation went beyond available resultCache space") return nil, false, errInvalidChain } if q.resultCache[index] == nil { @@ -607,7 +606,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ continue } // Otherwise unless the peer is known not to have the data, add to the retrieve list - if p.ignored.Has(header.Hash()) { + if p.Lacks(header.Hash()) { skip = append(skip, header) } else { send = append(send, header) @@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ for _, header := range skip { taskQueue.Push(header, -float32(header.Number.Uint64())) } + if progress { + // Wake WaitResults, resultCache was modified + q.active.Signal() + } // Assemble and return the block download request if len(send) == 0 { return nil, progress, nil @@ -700,7 +703,7 @@ func (q *queue) Revoke(peerId string) { // ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBlocks(timeout time.Duration) []string { +func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -709,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string { // ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBodies(timeout time.Duration) []string { +func (q *queue) ExpireBodies(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -718,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string { // ExpireReceipts checks for in flight receipt requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireReceipts(timeout time.Duration) []string { +func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -727,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string { // ExpireNodeData checks for in flight node data requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireNodeData(timeout time.Duration) []string { +func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int { q.lock.Lock() defer q.lock.Unlock() @@ -737,12 +740,12 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string { // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // -// Note, this method expects the queue lock to be already held for writing. The +// Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int { // Iterate over the expired requests and return each to the queue - peers := []string{} + expiries := make(map[string]int) for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout @@ -755,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) } - peers = append(peers, id) + // Add the peer to the expiry report along the the number of failed requests + expirations := len(request.Hashes) + if expirations < len(request.Headers) { + expirations = len(request.Headers) + } + expiries[id] = expirations } } // Remove the expired requests from the pending pool - for _, id := range peers { + for id, _ := range expiries { delete(pendPool, id) } - return peers + return expiries } -// DeliverBlocks injects a block retrieval response into the download queue. -func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { +// DeliverBlocks injects a block retrieval response into the download queue. The +// method returns the number of blocks accepted from the delivery and also wakes +// any threads waiting for data delivery. +func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the blocks were never requested request := q.blockPendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } blockReqTimer.UpdateSince(request.Time) delete(q.blockPendPool, id) @@ -781,11 +791,11 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { // If no blocks were retrieved, mark them as unavailable for the origin peer if len(blocks) == 0 { for hash, _ := range request.Hashes { - request.Peer.ignored.Add(hash) + request.Peer.MarkLacking(hash) } } // Iterate over the downloaded blocks and add each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) for _, block := range blocks { // Skip any blocks that were not requested hash := block.Hash() @@ -808,29 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { delete(request.Hashes, hash) delete(q.hashPool, hash) + accepted++ } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + // Wake up WaitResults + if accepted > 0 { + q.active.Signal() + } // If none of the blocks were good, it's a stale delivery switch { case len(errs) == 0: - return nil - + return accepted, nil case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): - return errs[0] - + return accepted, errs[0] case len(errs) == len(blocks): - return errStaleDelivery - + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } // DeliverBodies injects a block body retrieval response into the results queue. -func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// The method returns the number of blocks bodies accepted from the delivery and +// also wakes any threads waiting for data delivery. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -846,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi } // DeliverReceipts injects a receipt retrieval response into the results queue. -func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error { +// The method returns the number of transaction receipts accepted from the delivery +// and also wakes any threads waiting for data delivery. +func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -865,26 +881,29 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, - donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error { +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer, + results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) { + // Short circuit if the data was never requested request := pendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } reqTimer.UpdateSince(request.Time) delete(pendPool, id) // If no data items were retrieved, mark them as unavailable for the origin peer if results == 0 { - for hash, _ := range request.Headers { - request.Peer.ignored.Add(hash) + for _, header := range request.Headers { + request.Peer.MarkLacking(header.Hash()) } } // Assemble each of the results with their headers and retrieved data parts var ( - failure error - useful bool + accepted int + failure error + useful bool ) for i, header := range request.Headers { // Short circuit assembly if no more fetch results are found @@ -904,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ donePool[header.Hash()] = struct{}{} q.resultCache[index].Pending-- useful = true + accepted++ // Clean up a successful fetch request.Headers[i] = nil @@ -915,28 +935,32 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ taskQueue.Push(header, -float32(header.Number.Uint64())) } } + // Wake up WaitResults + if accepted > 0 { + q.active.Signal() + } // If none of the data was good, it's a stale delivery switch { case failure == nil || failure == errInvalidChain: - return failure - + return accepted, failure case useful: - return fmt.Errorf("partial failure: %v", failure) - + return accepted, fmt.Errorf("partial failure: %v", failure) default: - return errStaleDelivery + return accepted, errStaleDelivery } } // DeliverNodeData injects a node state data retrieval response into the queue. -func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error { +// The method returns the number of node state entries originally requested, and +// the number of them actually accepted from the delivery. +func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the data was never requested request := q.statePendPool[id] if request == nil { - return errNoFetchesPending + return 0, errNoFetchesPending } stateReqTimer.UpdateSince(request.Time) delete(q.statePendPool, id) @@ -944,14 +968,14 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // If no data was retrieved, mark their hashes as unavailable for the origin peer if len(data) == 0 { for hash, _ := range request.Hashes { - request.Peer.ignored.Add(hash) + request.Peer.MarkLacking(hash) } } // Iterate over the downloaded data and verify each of them - errs := make([]error, 0) + accepted, errs := 0, make([]error, 0) process := []trie.SyncResult{} for _, blob := range data { - // Skip any blocks that were not requested + // Skip any state trie entires that were not requested hash := common.BytesToHash(crypto.Sha3(blob)) if _, ok := request.Hashes[hash]; !ok { errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) @@ -959,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i } // Inject the next state trie item into the processing queue process = append(process, trie.SyncResult{hash, blob}) + accepted++ delete(request.Hashes, hash) delete(q.stateTaskPool, hash) @@ -976,19 +1001,21 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // If none of the data items were good, it's a stale delivery switch { case len(errs) == 0: - return nil - + return accepted, nil case len(errs) == len(request.Hashes): - return errStaleDelivery - + return accepted, errStaleDelivery default: - return fmt.Errorf("multiple failures: %v", errs) + return accepted, fmt.Errorf("multiple failures: %v", errs) } } // deliverNodeData is the asynchronous node data processor that injects a batch // of sync results into the state scheduler. func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { + // Wake up WaitResults after the state has been written because it + // might be waiting for the pivot block state to get completed. + defer q.active.Signal() + // Process results one by one to permit task fetches in between for i, result := range results { q.stateSchedLock.Lock() diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index a5418e2e7..5772114b3 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -64,7 +64,7 @@ func BenchmarkMipmaps(b *testing.B) { } // store the receipts - err := core.PutReceipts(db, receipts) + err := core.WriteReceipts(db, receipts) if err != nil { b.Fatal(err) } @@ -78,7 +78,7 @@ func BenchmarkMipmaps(b *testing.B) { if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { b.Fatalf("failed to insert block number: %v", err) } - if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil { + if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil { b.Fatal("error writing block receipts:", err) } } @@ -163,7 +163,7 @@ func TestFilters(t *testing.T) { } // store the receipts - err := core.PutReceipts(db, receipts) + err := core.WriteReceipts(db, receipts) if err != nil { t.Fatal(err) } @@ -180,7 +180,7 @@ func TestFilters(t *testing.T) { if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { t.Fatalf("failed to insert block number: %v", err) } - if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil { + if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil { t.Fatal("error writing block receipts:", err) } } diff --git a/eth/gasprice.go b/eth/gasprice.go index b752c22dd..e0de89e62 100644 --- a/eth/gasprice.go +++ b/eth/gasprice.go @@ -166,7 +166,7 @@ func (self *GasPriceOracle) processBlock(block *types.Block) { func (self *GasPriceOracle) lowestPrice(block *types.Block) *big.Int { gasUsed := big.NewInt(0) - receipts := self.eth.BlockProcessor().GetBlockReceipts(block.Hash()) + receipts := core.GetBlockReceipts(self.eth.ChainDb(), block.Hash()) if len(receipts) > 0 { if cgu := receipts[len(receipts)-1].CumulativeGasUsed; cgu != nil { gasUsed = receipts[len(receipts)-1].CumulativeGasUsed diff --git a/eth/handler.go b/eth/handler.go index 7dc7de80e..d8c5b4b64 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" ) @@ -55,6 +56,8 @@ type hashFetcherFn func(common.Hash) error type blockFetcherFn func([]common.Hash) error type ProtocolManager struct { + networkId int + fastSync bool txpool txPool blockchain *core.BlockChain @@ -91,6 +94,7 @@ func NewProtocolManager(fastSync bool, networkId int, mux *event.TypeMux, txpool } // Create the protocol manager with the base fields manager := &ProtocolManager{ + networkId: networkId, fastSync: fastSync, eventMux: mux, txpool: txpool, @@ -111,14 +115,23 @@ func NewProtocolManager(fastSync bool, networkId int, mux *event.TypeMux, txpool // Compatible; initialise the sub-protocol version := version // Closure for the run manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ - Name: "eth", + Name: ProtocolName, Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := manager.newPeer(int(version), networkId, p, rw) + peer := manager.newPeer(int(version), p, rw) manager.newPeerCh <- peer return manager.handle(peer) }, + NodeInfo: func() interface{} { + return manager.NodeInfo() + }, + PeerInfo: func(id discover.NodeID) interface{} { + if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { + return p.Info() + } + return nil + }, }) } if len(manager.SubProtocols) == 0 { @@ -188,8 +201,8 @@ func (pm *ProtocolManager) Stop() { glog.V(logger.Info).Infoln("Ethereum protocol handler stopped") } -func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - return newPeer(pv, nv, p, newMeteredMsgWriter(rw)) +func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return newPeer(pv, p, newMeteredMsgWriter(rw)) } // handle is the callback invoked to manage the life cycle of an eth peer. When @@ -199,7 +212,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Execute the Ethereum handshake td, head, genesis := pm.blockchain.Status() - if err := p.Handshake(td, head, genesis); err != nil { + if err := p.Handshake(pm.networkId, td, head, genesis); err != nil { glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err) return err } @@ -730,3 +743,22 @@ func (self *ProtocolManager) txBroadcastLoop() { self.BroadcastTx(event.Tx.Hash(), event.Tx) } } + +// EthNodeInfo represents a short summary of the Ethereum sub-protocol metadata known +// about the host peer. +type EthNodeInfo struct { + Network int `json:"network"` // Ethereum network ID (0=Olympic, 1=Frontier, 2=Morden) + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain + Genesis string `json:"genesis"` // SHA3 hash of the host's genesis block + Head string `json:"head"` // SHA3 hash of the host's best owned block +} + +// NodeInfo retrieves some protocol metadata about the running host node. +func (self *ProtocolManager) NodeInfo() *EthNodeInfo { + return &EthNodeInfo{ + Network: self.networkId, + Difficulty: self.blockchain.GetTd(self.blockchain.CurrentBlock().Hash()), + Genesis: fmt.Sprintf("%x", self.blockchain.Genesis().Hash()), + Head: fmt.Sprintf("%x", self.blockchain.CurrentBlock().Hash()), + } +} diff --git a/eth/helper_test.go b/eth/helper_test.go index 16907be8b..bbd1fb818 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -35,9 +35,7 @@ func newTestProtocolManager(fastSync bool, blocks int, generator func(int, *core db, _ = ethdb.NewMemDatabase() genesis = core.WriteGenesisBlockForTesting(db, core.GenesisAccount{testBankAddress, testBankFunds}) blockchain, _ = core.NewBlockChain(db, pow, evmux) - blockproc = core.NewBlockProcessor(db, pow, blockchain, evmux) ) - blockchain.SetProcessor(blockproc) chain, _ := core.GenerateChain(genesis, db, blocks, generator) if _, err := blockchain.InsertChain(chain); err != nil { panic(err) @@ -117,7 +115,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te var id discover.NodeID rand.Read(id[:]) - peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net) + peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net) // Start the peer on a new thread errc := make(chan error, 1) diff --git a/eth/peer.go b/eth/peer.go index 695e910f6..15ba22ff5 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -44,38 +44,51 @@ const ( handshakeTimeout = 5 * time.Second ) +// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known +// about a connected peer. +type PeerInfo struct { + Version int `json:"version"` // Ethereum protocol version negotiated + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain + Head string `json:"head"` // SHA3 hash of the peer's best owned block +} + type peer struct { - *p2p.Peer + id string + *p2p.Peer rw p2p.MsgReadWriter version int // Protocol version negotiated - network int // Network ID being on - - id string - - head common.Hash - td *big.Int - lock sync.RWMutex + head common.Hash + td *big.Int + lock sync.RWMutex knownTxs *set.Set // Set of transaction hashes known to be known by this peer knownBlocks *set.Set // Set of block hashes known to be known by this peer } -func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { +func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id := p.ID() return &peer{ Peer: p, rw: rw, version: version, - network: network, id: fmt.Sprintf("%x", id[:8]), knownTxs: set.New(), knownBlocks: set.New(), } } +// Info gathers and returns a collection of metadata known about a peer. +func (p *peer) Info() *PeerInfo { + return &PeerInfo{ + Version: p.version, + Difficulty: p.Td(), + Head: fmt.Sprintf("%x", p.Head()), + } +} + // Head retrieves a copy of the current head (most recent) hash of the peer. func (p *peer) Head() (hash common.Hash) { p.lock.RLock() @@ -268,20 +281,22 @@ func (p *peer) RequestReceipts(hashes []common.Hash) error { // Handshake executes the eth protocol handshake, negotiating version number, // network IDs, difficulties, head and genesis blocks. -func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) error { +func (p *peer) Handshake(network int, td *big.Int, head common.Hash, genesis common.Hash) error { + // Send out own handshake in a new thread errc := make(chan error, 2) var status statusData // safe to read after two values have been received from errc + go func() { errc <- p2p.Send(p.rw, StatusMsg, &statusData{ ProtocolVersion: uint32(p.version), - NetworkId: uint32(p.network), + NetworkId: uint32(network), TD: td, CurrentBlock: head, GenesisBlock: genesis, }) }() go func() { - errc <- p.readStatus(&status, genesis) + errc <- p.readStatus(network, &status, genesis) }() timeout := time.NewTimer(handshakeTimeout) defer timeout.Stop() @@ -299,7 +314,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) err return nil } -func (p *peer) readStatus(status *statusData, genesis common.Hash) (err error) { +func (p *peer) readStatus(network int, status *statusData, genesis common.Hash) (err error) { msg, err := p.rw.ReadMsg() if err != nil { return err @@ -317,8 +332,8 @@ func (p *peer) readStatus(status *statusData, genesis common.Hash) (err error) { if status.GenesisBlock != genesis { return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesis) } - if int(status.NetworkId) != p.network { - return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, p.network) + if int(status.NetworkId) != network { + return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network) } if int(status.ProtocolVersion) != p.version { return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version) diff --git a/eth/protocol.go b/eth/protocol.go index 410347ed3..808ac0601 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -33,6 +33,9 @@ const ( eth63 = 63 ) +// Official short name of the protocol used during capability negotiation. +var ProtocolName = "eth" + // Supported versions of the eth protocol (first is primary). var ProtocolVersions = []uint{eth63, eth62, eth61} diff --git a/eth/sync.go b/eth/sync.go index bbf2abc04..dd8aef8e4 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -175,10 +175,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) { } // If fast sync was enabled, and we synced up, disable it if pm.fastSync { - // Wait until all pending imports finish processing - for pm.downloader.Synchronising() { - time.Sleep(100 * time.Millisecond) - } // Disable fast sync if we indeed have something in our chain if pm.blockchain.CurrentBlock().NumberU64() > 0 { glog.V(logger.Info).Infof("fast sync complete, auto disabling") diff --git a/eth/sync_test.go b/eth/sync_test.go index f3a6718ab..afd90c9b6 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -40,8 +40,8 @@ func TestFastSyncDisabling(t *testing.T) { // Sync up the two peers io1, io2 := p2p.MsgPipe() - go pmFull.handle(pmFull.newPeer(63, NetworkId, p2p.NewPeer(discover.NodeID{}, "empty", nil), io2)) - go pmEmpty.handle(pmEmpty.newPeer(63, NetworkId, p2p.NewPeer(discover.NodeID{}, "full", nil), io1)) + go pmFull.handle(pmFull.newPeer(63, p2p.NewPeer(discover.NodeID{}, "empty", nil), io2)) + go pmEmpty.handle(pmEmpty.newPeer(63, p2p.NewPeer(discover.NodeID{}, "full", nil), io1)) time.Sleep(250 * time.Millisecond) pmEmpty.synchronise(pmEmpty.peers.BestPeer()) |