| author | obscuren <geffobscura@gmail.com> | 2015-05-13 01:05:33 +0800 |
|---|---|---|
| committer | obscuren <geffobscura@gmail.com> | 2015-05-13 01:05:33 +0800 |
| commit | 8e24378cc1acb074b56de75bf0baf6feb7927677 (patch) | |
| tree | 71f866652de6b45da5ad4403b88f5def9a684e36 /eth | |
| parent | 7d69679935ff8c04aebb60f27074f08c5e84ac95 (diff) | |
| parent | 8fe01b4bfa28ad5a1fdde7f9837e8f982843389a (diff) | |
Merge branch 'release/0.9.20' (tag: v0.9.20)
Diffstat (limited to 'eth')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | eth/backend.go | 52 |
| -rw-r--r-- | eth/downloader/downloader.go | 148 |
| -rw-r--r-- | eth/downloader/downloader_test.go | 3 |
| -rw-r--r-- | eth/downloader/peer.go | 222 |
| -rw-r--r-- | eth/downloader/queue.go | 9 |
| -rw-r--r-- | eth/handler.go | 2 |
| -rw-r--r-- | eth/sync.go | 3 |
7 files changed, 254 insertions, 185 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 8f0789467..362a7eab7 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -7,7 +7,6 @@ import (
     "io/ioutil"
     "math/big"
     "os"
-    "path"
     "path/filepath"
     "strings"
     "time"
@@ -145,7 +144,7 @@ func (cfg *Config) nodeKey() (*ecdsa.PrivateKey, error) {
         return cfg.NodeKey, nil
     }
     // use persistent key if present
-    keyfile := path.Join(cfg.DataDir, "nodekey")
+    keyfile := filepath.Join(cfg.DataDir, "nodekey")
     key, err := crypto.LoadECDSA(keyfile)
     if err == nil {
         return key, nil
@@ -207,29 +206,33 @@ func New(config *Config) (*Ethereum, error) {
         logger.NewJSONsystem(config.DataDir, config.LogJSON)
     }

+    // Let the database take 3/4 of the max open files (TODO figure out a way to get the actual limit of the open files)
+    const dbCount = 3
+    ethdb.OpenFileLimit = 256 / (dbCount + 1)
+
     newdb := config.NewDB
     if newdb == nil {
         newdb = func(path string) (common.Database, error) { return ethdb.NewLDBDatabase(path) }
     }
-    blockDb, err := newdb(path.Join(config.DataDir, "blockchain"))
+    blockDb, err := newdb(filepath.Join(config.DataDir, "blockchain"))
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("blockchain db err: %v", err)
     }
-    stateDb, err := newdb(path.Join(config.DataDir, "state"))
+    stateDb, err := newdb(filepath.Join(config.DataDir, "state"))
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("state db err: %v", err)
     }
-    extraDb, err := newdb(path.Join(config.DataDir, "extra"))
+    extraDb, err := newdb(filepath.Join(config.DataDir, "extra"))
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("extra db err: %v", err)
     }
-    nodeDb := path.Join(config.DataDir, "nodes")
+    nodeDb := filepath.Join(config.DataDir, "nodes")

     // Perform database sanity checks
     d, _ := blockDb.Get([]byte("ProtocolVersion"))
     protov := int(common.NewValue(d).Uint())
     if protov != config.ProtocolVersion && protov != 0 {
-        path := path.Join(config.DataDir, "blockchain")
+        path := filepath.Join(config.DataDir, "blockchain")
         return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, config.ProtocolVersion, path)
     }
     saveProtocolVersion(blockDb, config.ProtocolVersion)
@@ -267,7 +270,7 @@ func New(config *Config) (*Ethereum, error) {
     eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
     eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
     eth.chainManager.SetProcessor(eth.blockProcessor)
-    eth.miner = miner.New(eth, eth.pow, config.MinerThreads)
+    eth.miner = miner.New(eth, eth.pow)
     eth.miner.SetGasPrice(config.GasPrice)

     eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader)
@@ -368,7 +371,7 @@ func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
     s.chainManager.ResetWithGenesisBlock(gb)
 }

-func (s *Ethereum) StartMining() error {
+func (s *Ethereum) StartMining(threads int) error {
     eb, err := s.Etherbase()
     if err != nil {
         err = fmt.Errorf("Cannot start mining without etherbase address: %v", err)
@@ -376,21 +379,24 @@ func (s *Ethereum) StartMining() error {
         return err
     }

-    go s.miner.Start(eb)
+    go s.miner.Start(eb, threads)
     return nil
 }

 func (s *Ethereum) Etherbase() (eb common.Address, err error) {
     eb = s.etherbase
     if (eb == common.Address{}) {
-        var ebbytes []byte
-        ebbytes, err = s.accountManager.Primary()
-        eb = common.BytesToAddress(ebbytes)
-        if (eb == common.Address{}) {
+        primary, err := s.accountManager.Primary()
+        if err != nil {
+            return eb, err
+        }
+        if (primary == common.Address{}) {
             err = fmt.Errorf("no accounts found")
+            return eb, err
         }
+        eb = primary
     }
-    return
+    return eb, nil
 }

 func (s *Ethereum) StopMining() { s.miner.Stop() }
@@ -451,6 +457,8 @@ func (s *Ethereum) Start() error {
     return nil
 }

+// sync databases every minute. If flushing fails we exit immediatly. The system
+// may not continue under any circumstances.
 func (s *Ethereum) syncDatabases() {
     ticker := time.NewTicker(1 * time.Minute)
 done:
@@ -459,13 +467,13 @@ done:
         case <-ticker.C:
             // don't change the order of database flushes
             if err := s.extraDb.Flush(); err != nil {
-                glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+                glog.Fatalf("fatal error: flush extraDb: %v (Restart your node. We are aware of this issue)\n", err)
             }
             if err := s.stateDb.Flush(); err != nil {
-                glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+                glog.Fatalf("fatal error: flush stateDb: %v (Restart your node. We are aware of this issue)\n", err)
             }
             if err := s.blockDb.Flush(); err != nil {
-                glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+                glog.Fatalf("fatal error: flush blockDb: %v (Restart your node. We are aware of this issue)\n", err)
             }
         case <-s.shutdownChan:
             break done
@@ -537,7 +545,7 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) {
         return
     }

-    if self.accountManager.HasAccount(from.Bytes()) {
+    if self.accountManager.HasAccount(from) {
         if self.chainManager.TxState().GetNonce(from) < tx.Nonce() {
             self.chainManager.TxState().SetNonce(from, tx.Nonce())
         }
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 14ca2cd3d..577152a21 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -28,7 +28,7 @@ var (
     errUnknownPeer = errors.New("peer's unknown or unhealthy")
     errBadPeer = errors.New("action from bad peer ignored")
     errNoPeers = errors.New("no peers to keep download active")
-    errPendingQueue = errors.New("pending items in queue")
+    ErrPendingQueue = errors.New("pending items in queue")
     ErrTimeout = errors.New("timeout")
     errEmptyHashSet = errors.New("empty hash set by peer")
     errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
@@ -49,12 +49,6 @@ type blockPack struct {
     blocks []*types.Block
 }

-type syncPack struct {
-    peer *peer
-    hash common.Hash
-    ignoreInitial bool
-}
-
 type hashPack struct {
     peerId string
     hashes []common.Hash
@@ -63,7 +57,7 @@ type hashPack struct {
 type Downloader struct {
     mu sync.RWMutex
     queue *queue
-    peers peers
+    peers *peerSet

     activePeer string

     // Callbacks
@@ -83,7 +77,7 @@ type Downloader struct {
 func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
     downloader := &Downloader{
         queue: newQueue(),
-        peers: make(peers),
+        peers: newPeerSet(),
         hasBlock: hasBlock,
         getBlock: getBlock,
         newPeerCh: make(chan *peer, 1),
@@ -98,29 +92,26 @@ func (d *Downloader) Stats() (current int, max int) {
     return d.queue.Size()
 }

-func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
-    d.mu.Lock()
-    defer d.mu.Unlock()
-
-    glog.V(logger.Detail).Infoln("Register peer", id)
-
-    // Create a new peer and add it to the list of known peers
-    peer := newPeer(id, hash, getHashes, getBlocks)
-    // add peer to our peer set
-    d.peers[id] = peer
-    // broadcast new peer
-
+// RegisterPeer injects a new download peer into the set of block source to be
+// used for fetching hashes and blocks from.
+func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error {
+    glog.V(logger.Detail).Infoln("Registering peer", id)
+    if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil {
+        glog.V(logger.Error).Infoln("Register failed:", err)
+        return err
+    }
     return nil
 }

-// UnregisterPeer unregisters a peer. This will prevent any action from the specified peer.
-func (d *Downloader) UnregisterPeer(id string) {
-    d.mu.Lock()
-    defer d.mu.Unlock()
-
-    glog.V(logger.Detail).Infoln("Unregister peer", id)
-
-    delete(d.peers, id)
+// UnregisterPeer remove a peer from the known list, preventing any action from
+// the specified peer.
+func (d *Downloader) UnregisterPeer(id string) error {
+    glog.V(logger.Detail).Infoln("Unregistering peer", id)
+    if err := d.peers.Unregister(id); err != nil {
+        glog.V(logger.Error).Infoln("Unregister failed:", err)
+        return err
+    }
+    return nil
 }

 // Synchronise will select the peer and use it for synchronising. If an empty string is given
@@ -138,17 +129,18 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {

     // Abort if the queue still contains some leftover data
     if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
-        return errPendingQueue
+        return ErrPendingQueue
     }
-    // Reset the queue to clean any internal leftover state
+    // Reset the queue and peer set to clean any internal leftover state
     d.queue.Reset()
+    d.peers.Reset()
+
     // Retrieve the origin peer and initiate the downloading process
-    p := d.peers[id]
+    p := d.peers.Peer(id)
     if p == nil {
         return errUnknownPeer
     }
-    return d.getFromPeer(p, hash, false)
+    return d.syncWithPeer(p, hash)
 }

 // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
@@ -167,7 +159,9 @@ func (d *Downloader) Has(hash common.Hash) bool {
     return d.queue.Has(hash)
 }

-func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
+// syncWithPeer starts a block synchronization based on the hash chain from the
+// specified peer and head hash.
+func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
     d.activePeer = p.id
     defer func() {
         // reset on error
@@ -177,21 +171,12 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
     }()

     glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
-    // Start the fetcher. This will block the update entirely
-    // interupts need to be send to the appropriate channels
-    // respectively.
-    if err = d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
+    if err = d.fetchHashes(p, hash); err != nil {
         return err
     }
-
-    // Start fetching blocks in paralel. The strategy is simple
-    // take any available peers, seserve a chunk for each peer available,
-    // let the peer deliver the chunkn and periodically check if a peer
-    // has timedout.
-    if err = d.startFetchingBlocks(p); err != nil {
+    if err = d.fetchBlocks(); err != nil {
         return err
     }
-
     glog.V(logger.Debug).Infoln("Synchronization completed")

     return nil
@@ -234,17 +219,14 @@ blockDone:
 }

 // XXX Make synchronous
-func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
+func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
     glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)

     start := time.Now()

-    // We ignore the initial hash in some cases (e.g. we received a block without it's parent)
-    // In such circumstances we don't need to download the block so don't add it to the queue.
-    if !ignoreInitial {
-        // Add the hash to the queue first
-        d.queue.Insert([]common.Hash{h})
-    }
+    // Add the hash to the queue first
+    d.queue.Insert([]common.Hash{h})
+
     // Get the first batch of hashes
     p.getHashes(h)

@@ -308,20 +290,18 @@ out:
             // Attempt to find a new peer by checking inclusion of peers best hash in our
             // already fetched hash list. This can't guarantee 100% correctness but does
             // a fair job. This is always either correct or false incorrect.
-            for id, peer := range d.peers {
-                if d.queue.Has(peer.recentHash) && !attemptedPeers[id] {
+            for _, peer := range d.peers.AllPeers() {
+                if d.queue.Has(peer.head) && !attemptedPeers[p.id] {
                     p = peer
                     break
                 }
             }
-
             // if all peers have been tried, abort the process entirely or if the hash is
             // the zero hash.
             if p == nil || (hash == common.Hash{}) {
                 d.queue.Reset()
                 return ErrTimeout
             }
-
             // set p to the active peer. this will invalidate any hashes that may be returned
             // by our previous (delayed) peer.
             activePeer = p
@@ -334,14 +314,11 @@ out:
     return nil
 }

-func (d *Downloader) startFetchingBlocks(p *peer) error {
+// fetchBlocks iteratively downloads the entire schedules block-chain, taking
+// any available peers, reserving a chunk of blocks for each, wait for delivery
+// and periodically checking for timeouts.
+func (d *Downloader) fetchBlocks() error {
     glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
-
-    // Defer the peer reset. This will empty the peer requested set
-    // and makes sure there are no lingering peers with an incorrect
-    // state
-    defer d.peers.reset()
-
     start := time.Now()

     // default ticker for re-fetching blocks every now and then
@@ -354,19 +331,19 @@ out:
         case blockPack := <-d.blockCh:
             // If the peer was previously banned and failed to deliver it's pack
             // in a reasonable time frame, ignore it's message.
-            if d.peers[blockPack.peerId] != nil {
-                err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
-                if err != nil {
-                    glog.V(logger.Debug).Infof("deliver failed for peer %s: %v\n", blockPack.peerId, err)
-                    // FIXME d.UnregisterPeer(blockPack.peerId)
+            if peer := d.peers.Peer(blockPack.peerId); peer != nil {
+                // Deliver the received chunk of blocks, but drop the peer if invalid
+                if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
+                    glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
+                    peer.Demote()
                     break
                 }
-
                 if glog.V(logger.Debug) {
-                    glog.Infof("adding %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
+                    glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId)
                 }
-                d.peers[blockPack.peerId].promote()
-                d.peers.setState(blockPack.peerId, idleState)
+                // Promote the peer and update it's idle state
+                peer.Promote()
+                peer.SetIdle()
             }
         case <-ticker.C:
             // Check for bad peers. Bad peers may indicate a peer not responding
@@ -381,13 +358,12 @@ out:
                 // 1) Time for them to respond;
                 // 2) Measure their speed;
                 // 3) Amount and availability.
-                if peer := d.peers[pid]; peer != nil {
-                    peer.demote()
-                    peer.reset()
+                if peer := d.peers.Peer(pid); peer != nil {
+                    peer.Demote()
                 }
             }
             // After removing bad peers make sure we actually have sufficient peer left to keep downloading
-            if len(d.peers) == 0 {
+            if d.peers.Len() == 0 {
                 d.queue.Reset()
                 return errNoPeers
             }
@@ -398,31 +374,33 @@ out:
             if d.queue.Throttle() {
                 continue
             }
-
-            availablePeers := d.peers.get(idleState)
-            for _, peer := range availablePeers {
+            // Send a download request to all idle peers, until throttled
+            idlePeers := d.peers.IdlePeers()
+            for _, peer := range idlePeers {
+                // Short circuit if throttling activated since above
+                if d.queue.Throttle() {
+                    break
+                }
                 // Get a possible chunk. If nil is returned no chunk
                 // could be returned due to no hashes available.
                 request := d.queue.Reserve(peer, maxBlockFetch)
                 if request == nil {
                     continue
                 }
-                // XXX make fetch blocking.
                 // Fetch the chunk and check for error. If the peer was somehow
                 // already fetching a chunk due to a bug, it will be returned to
                 // the queue
-                if err := peer.fetch(request); err != nil {
-                    // log for tracing
-                    glog.V(logger.Debug).Infof("peer %s received double work (state = %v)\n", peer.id, peer.state)
+                if err := peer.Fetch(request); err != nil {
+                    glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id)
                     d.queue.Cancel(request)
                 }
             }
-            // make sure that we have peers available for fetching. If all peers have been tried
+            // Make sure that we have peers available for fetching. If all peers have been tried
             // and all failed throw an error
             if d.queue.InFlight() == 0 {
                 d.queue.Reset()
-                return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.Pending())
+                return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending())
             }
         } else if d.queue.InFlight() == 0 {
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index d0f8d4c8f..385ad2909 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -229,7 +229,7 @@ func TestThrottling(t *testing.T) {
     minDesiredPeerCount = 4
     blockTtl = 1 * time.Second

-    targetBlocks := 4 * blockCacheLimit
+    targetBlocks := 16 * blockCacheLimit
     hashes := createHashes(0, targetBlocks)
     blocks := createBlocksFromHashes(hashes)
     tester := newTester(t, hashes, blocks)
@@ -256,6 +256,7 @@ func TestThrottling(t *testing.T) {
                 return
             default:
                 took = append(took, tester.downloader.TakeBlocks()...)
+                time.Sleep(time.Millisecond)
             }
         }
     }()
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 45ec1cbfd..4abae8d5e 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -1,125 +1,197 @@
+// Contains the active peer-set of the downloader, maintaining both failures
+// as well as reputation metrics to prioritize the block retrievals.
+
 package downloader

 import (
     "errors"
     "sync"
+    "sync/atomic"

     "github.com/ethereum/go-ethereum/common"
     "gopkg.in/fatih/set.v0"
 )

-const (
-    workingState = 2
-    idleState = 4
-)
-
 type hashFetcherFn func(common.Hash) error
 type blockFetcherFn func([]common.Hash) error

-// XXX make threadsafe!!!!
-type peers map[string]*peer
+var (
+    errAlreadyFetching = errors.New("already fetching blocks from peer")
+    errAlreadyRegistered = errors.New("peer is already registered")
+    errNotRegistered = errors.New("peer is not registered")
+)

-func (p peers) reset() {
-    for _, peer := range p {
-        peer.reset()
-    }
+// peer represents an active peer from which hashes and blocks are retrieved.
+type peer struct {
+    id string // Unique identifier of the peer
+    head common.Hash // Hash of the peers latest known block
+
+    idle int32 // Current activity state of the peer (idle = 0, active = 1)
+    rep int32 // Simple peer reputation (not used currently)
+
+    mu sync.RWMutex
+
+    ignored *set.Set
+
+    getHashes hashFetcherFn
+    getBlocks blockFetcherFn
 }

-func (p peers) get(state int) []*peer {
-    var peers []*peer
-    for _, peer := range p {
-        peer.mu.RLock()
-        if peer.state == state {
-            peers = append(peers, peer)
-        }
-        peer.mu.RUnlock()
+// newPeer create a new downloader peer, with specific hash and block retrieval
+// mechanisms.
+func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
+    return &peer{
+        id: id,
+        head: head,
+        getHashes: getHashes,
+        getBlocks: getBlocks,
+        ignored: set.New(),
     }
+}

-    return peers
+// Reset clears the internal state of a peer entity.
+func (p *peer) Reset() {
+    atomic.StoreInt32(&p.idle, 0)
+    p.ignored.Clear()
 }

-func (p peers) setState(id string, state int) {
-    if peer, exist := p[id]; exist {
-        peer.mu.Lock()
-        defer peer.mu.Unlock()
-        peer.state = state
+// Fetch sends a block retrieval request to the remote peer.
+func (p *peer) Fetch(request *fetchRequest) error {
+    // Short circuit if the peer is already fetching
+    if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+        return errAlreadyFetching
     }
+    // Convert the hash set to a retrievable slice
+    hashes := make([]common.Hash, 0, len(request.Hashes))
+    for hash, _ := range request.Hashes {
+        hashes = append(hashes, hash)
+    }
+    p.getBlocks(hashes)
+
+    return nil
 }

-func (p peers) getPeer(id string) *peer {
-    return p[id]
+// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+func (p *peer) SetIdle() {
+    atomic.StoreInt32(&p.idle, 0)
 }

-// peer represents an active peer
-type peer struct {
-    state int // Peer state (working, idle)
-    rep int // TODO peer reputation
+// Promote increases the peer's reputation.
+func (p *peer) Promote() {
+    atomic.AddInt32(&p.rep, 1)
+}

-    mu sync.RWMutex
-    id string
-    recentHash common.Hash
+// Demote decreases the peer's reputation or leaves it at 0.
+func (p *peer) Demote() {
+    for {
+        // Calculate the new reputation value
+        prev := atomic.LoadInt32(&p.rep)
+        next := prev / 2

-    ignored *set.Set
+        // Try to update the old value
+        if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
+            return
+        }
+    }
+}

-    getHashes hashFetcherFn
-    getBlocks blockFetcherFn
+// peerSet represents the collection of active peer participating in the block
+// download procedure.
+type peerSet struct {
+    peers map[string]*peer
+    lock sync.RWMutex
 }

-// create a new peer
-func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
-    return &peer{
-        id: id,
-        recentHash: hash,
-        getHashes: getHashes,
-        getBlocks: getBlocks,
-        state: idleState,
-        ignored: set.New(),
+// newPeerSet creates a new peer set top track the active download sources.
+func newPeerSet() *peerSet {
+    return &peerSet{
+        peers: make(map[string]*peer),
     }
 }

-// fetch a chunk using the peer
-func (p *peer) fetch(request *fetchRequest) error {
-    p.mu.Lock()
-    defer p.mu.Unlock()
+// Reset iterates over the current peer set, and resets each of the known peers
+// to prepare for a next batch of block retrieval.
+func (ps *peerSet) Reset() {
+    ps.lock.RLock()
+    defer ps.lock.RUnlock()

-    if p.state == workingState {
-        return errors.New("peer already fetching chunk")
+    for _, peer := range ps.peers {
+        peer.Reset()
     }
+}

-    // set working state
-    p.state = workingState
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known.
+func (ps *peerSet) Register(p *peer) error {
+    ps.lock.Lock()
+    defer ps.lock.Unlock()

-    // Convert the hash set to a fetchable slice
-    hashes := make([]common.Hash, 0, len(request.Hashes))
-    for hash, _ := range request.Hashes {
-        hashes = append(hashes, hash)
+    if _, ok := ps.peers[p.id]; ok {
+        return errAlreadyRegistered
     }
-    p.getBlocks(hashes)
+    ps.peers[p.id] = p
+    return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+    ps.lock.Lock()
+    defer ps.lock.Unlock()

+    if _, ok := ps.peers[id]; !ok {
+        return errNotRegistered
+    }
+    delete(ps.peers, id)
     return nil
 }

-// promote increases the peer's reputation
-func (p *peer) promote() {
-    p.mu.Lock()
-    defer p.mu.Unlock()
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peer {
+    ps.lock.RLock()
+    defer ps.lock.RUnlock()
+
+    return ps.peers[id]
+}
+
+// Len returns if the current number of peers in the set.
+func (ps *peerSet) Len() int {
+    ps.lock.RLock()
+    defer ps.lock.RUnlock()

-    p.rep++
+    return len(ps.peers)
 }

-// demote decreases the peer's reputation or leaves it at 0
-func (p *peer) demote() {
-    p.mu.Lock()
-    defer p.mu.Unlock()
+// AllPeers retrieves a flat list of all the peers within the set.
+func (ps *peerSet) AllPeers() []*peer {
+    ps.lock.RLock()
+    defer ps.lock.RUnlock()

-    if p.rep > 1 {
-        p.rep -= 2
-    } else {
-        p.rep = 0
+    list := make([]*peer, 0, len(ps.peers))
+    for _, p := range ps.peers {
+        list = append(list, p)
     }
+    return list
 }

-func (p *peer) reset() {
-    p.state = idleState
-    p.ignored.Clear()
+// IdlePeers retrieves a flat list of all the currently idle peers within the
+// active peer set, ordered by their reputation.
+func (ps *peerSet) IdlePeers() []*peer {
+    ps.lock.RLock()
+    defer ps.lock.RUnlock()
+
+    list := make([]*peer, 0, len(ps.peers))
+    for _, p := range ps.peers {
+        if atomic.LoadInt32(&p.idle) == 0 {
+            list = append(list, p)
+        }
+    }
+    for i := 0; i < len(list); i++ {
+        for j := i + 1; j < len(list); j++ {
+            if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
+                list[i], list[j] = list[j], list[i]
+            }
+        }
+    }
+    return list
 }
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 515440bca..40749698c 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -1,3 +1,6 @@
+// Contains the block download scheduler to collect download tasks and schedule
+// them in an ordered, and throttled way.
+
 package downloader

 import (
@@ -8,6 +11,8 @@ import (

     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/types"
+    "github.com/ethereum/go-ethereum/logger"
+    "github.com/ethereum/go-ethereum/logger/glog"

     "gopkg.in/karalabe/cookiejar.v2/collections/prque"
 )
@@ -126,6 +131,10 @@ func (q *queue) Insert(hashes []common.Hash) {
     for i, hash := range hashes {
         index := q.hashCounter + i

+        if old, ok := q.hashPool[hash]; ok {
+            glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
+            continue
+        }
         q.hashPool[hash] = index
         q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
     }
diff --git a/eth/handler.go b/eth/handler.go
index 41b6728d9..88394543e 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -381,7 +381,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
         }
     }
     // Broadcast block to peer set
-    peers = peers[:int(math.Sqrt(float64(len(peers))))]
+    //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
     for _, peer := range peers {
         peer.sendTransaction(tx)
     }
diff --git a/eth/sync.go b/eth/sync.go
index d955eaa50..00b571782 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -98,7 +98,8 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
     case downloader.ErrTimeout:
         glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id)
         pm.removePeer(peer)
-
+    case downloader.ErrPendingQueue:
+        glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
     default:
         glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
     }
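The largest part of this release's `eth` changes is the downloader peer rework in `eth/downloader/peer.go` above, which replaces the old mutex-guarded `state` field with atomic `idle`/`rep` counters. The standalone sketch below is illustrative only — it is not part of this commit and uses simplified, hypothetical names — but it condenses that bookkeeping pattern to show how the compare-and-swap guard rejects a second concurrent fetch and how demotion halves the reputation without losing concurrent promotions:

```go
// Minimal sketch (assumed names, not the real downloader types) of the atomic
// idle/reputation bookkeeping introduced by the new peer type.
package main

import (
	"fmt"
	"sync/atomic"
)

type peer struct {
	idle int32 // 0 = idle, 1 = currently fetching
	rep  int32 // simple reputation counter
}

// fetch marks the peer busy; a second concurrent call fails, mirroring the
// errAlreadyFetching guard in the committed Fetch method.
func (p *peer) fetch() error {
	if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
		return fmt.Errorf("already fetching")
	}
	return nil
}

// promote bumps the reputation by one.
func (p *peer) promote() { atomic.AddInt32(&p.rep, 1) }

// demote halves the reputation inside a CAS loop so that a concurrent promote
// is never silently overwritten.
func (p *peer) demote() {
	for {
		prev := atomic.LoadInt32(&p.rep)
		if atomic.CompareAndSwapInt32(&p.rep, prev, prev/2) {
			return
		}
	}
}

func main() {
	p := &peer{}
	for i := 0; i < 5; i++ {
		p.promote()
	}
	p.demote()
	// Prints: <nil> already fetching 2
	fmt.Println(p.fetch(), p.fetch(), atomic.LoadInt32(&p.rep))
}
```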