diff options
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 114 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 51 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 63 | ||||
-rw-r--r-- | eth/downloader/queue.go | 2 | ||||
-rw-r--r-- | eth/handler.go | 71 | ||||
-rw-r--r-- | eth/peer.go | 122 | ||||
-rw-r--r-- | eth/protocol.go | 2 | ||||
-rw-r--r-- | eth/sync.go | 36 |
8 files changed, 361 insertions, 100 deletions
diff --git a/eth/backend.go b/eth/backend.go index a7107f8d8..938071fc7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/compiler" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -30,6 +31,14 @@ import ( "github.com/ethereum/go-ethereum/whisper" ) +const ( + epochLength = 30000 + ethashRevision = 23 + + autoDAGcheckInterval = 10 * time.Hour + autoDAGepochHeight = epochLength / 2 +) + var ( jsonlogger = logger.NewJsonLogger() @@ -59,6 +68,7 @@ type Config struct { LogJSON string VmDebug bool NatSpec bool + AutoDAG bool MaxPeers int MaxPendingPeers int @@ -79,6 +89,7 @@ type Config struct { GasPrice *big.Int MinerThreads int AccountManager *accounts.Manager + SolcPath string // NewDB is used to create databases. // If nil, the default is to create leveldb databases on disk. @@ -181,6 +192,8 @@ type Ethereum struct { pow *ethash.Ethash protocolManager *ProtocolManager downloader *downloader.Downloader + SolcPath string + solc *compiler.Solidity net *p2p.Server eventMux *event.TypeMux @@ -193,6 +206,8 @@ type Ethereum struct { MinerThreads int NatSpec bool DataDir string + AutoDAG bool + autodagquit chan bool etherbase common.Address clientVersion string ethVersionId int @@ -209,7 +224,7 @@ func New(config *Config) (*Ethereum, error) { // Let the database take 3/4 of the max open files (TODO figure out a way to get the actual limit of the open files) const dbCount = 3 - ethdb.OpenFileLimit = 256 / (dbCount + 1) + ethdb.OpenFileLimit = 128 / (dbCount + 1) newdb := config.NewDB if newdb == nil { @@ -264,11 +279,13 @@ func New(config *Config) (*Ethereum, error) { netVersionId: config.NetworkId, NatSpec: config.NatSpec, MinerThreads: config.MinerThreads, + SolcPath: config.SolcPath, + AutoDAG: config.AutoDAG, } - eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux()) - eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.pow = ethash.New() + eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux()) + eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) @@ -443,6 +460,10 @@ func (s *Ethereum) Start() error { // periodically flush databases go s.syncDatabases() + if s.AutoDAG { + s.StartAutoDAG() + } + // Start services go s.txPool.Start() s.protocolManager.Start() @@ -521,6 +542,7 @@ func (s *Ethereum) Stop() { if s.whisper != nil { s.whisper.Stop() } + s.StopAutoDAG() glog.V(logger.Info).Infoln("Server stopped") close(s.shutdownChan) @@ -554,6 +576,77 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) { } } +// StartAutoDAG() spawns a go routine that checks the DAG every autoDAGcheckInterval +// by default that is 10 times per epoch +// in epoch n, if we past autoDAGepochHeight within-epoch blocks, +// it calls ethash.MakeDAG to pregenerate the DAG for the next epoch n+1 +// if it does not exist yet as well as remove the DAG for epoch n-1 +// the loop quits if autodagquit channel is closed, it can safely restart and +// stop any number of times. +// For any more sophisticated pattern of DAG generation, use CLI subcommand +// makedag +func (self *Ethereum) StartAutoDAG() { + if self.autodagquit != nil { + return // already started + } + go func() { + glog.V(logger.Info).Infof("Automatic pregeneration of ethash DAG ON (ethash dir: %s)", ethash.DefaultDir) + var nextEpoch uint64 + timer := time.After(0) + self.autodagquit = make(chan bool) + for { + select { + case <-timer: + glog.V(logger.Info).Infof("checking DAG (ethash dir: %s)", ethash.DefaultDir) + currentBlock := self.ChainManager().CurrentBlock().NumberU64() + thisEpoch := currentBlock / epochLength + if nextEpoch <= thisEpoch { + if currentBlock%epochLength > autoDAGepochHeight { + if thisEpoch > 0 { + previousDag, previousDagFull := dagFiles(thisEpoch - 1) + os.Remove(filepath.Join(ethash.DefaultDir, previousDag)) + os.Remove(filepath.Join(ethash.DefaultDir, previousDagFull)) + glog.V(logger.Info).Infof("removed DAG for epoch %d (%s)", thisEpoch-1, previousDag) + } + nextEpoch = thisEpoch + 1 + dag, _ := dagFiles(nextEpoch) + if _, err := os.Stat(dag); os.IsNotExist(err) { + glog.V(logger.Info).Infof("Pregenerating DAG for epoch %d (%s)", nextEpoch, dag) + err := ethash.MakeDAG(nextEpoch*epochLength, "") // "" -> ethash.DefaultDir + if err != nil { + glog.V(logger.Error).Infof("Error generating DAG for epoch %d (%s)", nextEpoch, dag) + return + } + } else { + glog.V(logger.Error).Infof("DAG for epoch %d (%s)", nextEpoch, dag) + } + } + } + timer = time.After(autoDAGcheckInterval) + case <-self.autodagquit: + return + } + } + }() +} + +// dagFiles(epoch) returns the two alternative DAG filenames (not a path) +// 1) <revision>-<hex(seedhash[8])> 2) full-R<revision>-<hex(seedhash[8])> +func dagFiles(epoch uint64) (string, string) { + seedHash, _ := ethash.GetSeedHash(epoch * epochLength) + dag := fmt.Sprintf("full-R%d-%x", ethashRevision, seedHash[:8]) + return dag, "full-R" + dag +} + +// stopAutoDAG stops automatic DAG pregeneration by quitting the loop +func (self *Ethereum) StopAutoDAG() { + if self.autodagquit != nil { + close(self.autodagquit) + self.autodagquit = nil + } + glog.V(logger.Info).Infof("Automatic pregeneration of ethash DAG OFF (ethash dir: %s)", ethash.DefaultDir) +} + func saveProtocolVersion(db common.Database, protov int) { d, _ := db.Get([]byte("ProtocolVersion")) protocolVersion := common.NewValue(d).Uint() @@ -571,3 +664,18 @@ func saveBlockchainVersion(db common.Database, bcVersion int) { db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes()) } } + +func (self *Ethereum) Solc() (*compiler.Solidity, error) { + var err error + if self.solc == nil { + self.solc, err = compiler.New(self.SolcPath) + } + return self.solc, err +} + +// set in js console via admin interface or wrapper from cli flags +func (self *Ethereum) SetSolc(solcPath string) (*compiler.Solidity, error) { + self.SolcPath = solcPath + self.solc = nil + return self.Solc() +} diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d817b223c..fd588d2f3 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -15,8 +15,10 @@ import ( ) const ( - maxHashFetch = 512 // Amount of hashes to be fetched per chunk - maxBlockFetch = 128 // Amount of blocks to be fetched per chunk + MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling + MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount hashTTL = 5 * time.Second // Time it takes for a hash request to time out ) @@ -28,10 +30,11 @@ var ( ) var ( - errLowTd = errors.New("peer's TD is too low") + errLowTd = errors.New("peers TD is too low") ErrBusy = errors.New("busy") - errUnknownPeer = errors.New("peer's unknown or unhealthy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") ErrBadPeer = errors.New("action from bad peer ignored") + ErrStallingPeer = errors.New("peer is stalling") errNoPeers = errors.New("no peers to keep download active") ErrPendingQueue = errors.New("pending items in queue") ErrTimeout = errors.New("timeout") @@ -60,13 +63,18 @@ type hashPack struct { hashes []common.Hash } +type crossCheck struct { + expire time.Time + parent common.Hash +} + type Downloader struct { mux *event.TypeMux mu sync.RWMutex - queue *queue // Scheduler for selecting the hashes to download - peers *peerSet // Set of active peers from which download can proceed - checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain + queue *queue // Scheduler for selecting the hashes to download + peers *peerSet // Set of active peers from which download can proceed + checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain // Callbacks hasBlock hashCheckFn @@ -157,7 +165,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() d.peers.Reset() - d.checks = make(map[common.Hash]time.Time) + d.checks = make(map[common.Hash]*crossCheck) // Retrieve the origin peer and initiate the downloading process p := d.peers.Peer(id) @@ -283,15 +291,22 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { return ErrBadPeer } if !done { + // Check that the peer is not stalling the sync + if len(inserts) < MinHashFetch { + return ErrStallingPeer + } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch - if len(inserts) > 1 { - cross := inserts[rand.Intn(len(inserts)-1)] - glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross) + cross := rand.Intn(len(inserts) - 1) + origin, parent := inserts[cross], inserts[cross+1] + glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent) - d.checks[cross] = time.Now().Add(blockTTL) - active.getBlocks([]common.Hash{cross}) + d.checks[origin] = &crossCheck{ + expire: time.Now().Add(blockTTL), + parent: parent, } + active.getBlocks([]common.Hash{origin}) + // Also fetch a fresh active.getHashes(head) continue @@ -310,8 +325,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { continue } block := blockPack.blocks[0] - if _, ok := d.checks[block.Hash()]; ok { - if !d.queue.Has(block.ParentHash()) { + if check, ok := d.checks[block.Hash()]; ok { + if block.ParentHash() != check.parent { return ErrCrossCheckFailed } delete(d.checks, block.Hash()) @@ -319,8 +334,8 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { case <-crossTicker.C: // Iterate over all the cross checks and fail the hash chain if they're not verified - for hash, deadline := range d.checks { - if time.Now().After(deadline) { + for hash, check := range d.checks { + if time.Now().After(check.expire) { glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) return ErrCrossCheckFailed } @@ -438,7 +453,7 @@ out: } // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. - request := d.queue.Reserve(peer, maxBlockFetch) + request := d.queue.Reserve(peer, MaxBlockFetch) if request == nil { continue } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 19d64ac67..8b541d8b7 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -53,6 +53,8 @@ type downloadTester struct { blocks map[common.Hash]*types.Block // Blocks associated with the hashes chain []common.Hash // Block-chain being constructed + maxHashFetch int // Overrides the maximum number of retrieved hashes + t *testing.T pcount int done chan bool @@ -133,8 +135,12 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { // getHashes retrieves a batch of hashes for reconstructing the chain. func (dl *downloadTester) getHashes(head common.Hash) error { + limit := MaxHashFetch + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } // Gather the next batch of hashes - hashes := make([]common.Hash, 0, maxHashFetch) + hashes := make([]common.Hash, 0, limit) for i, hash := range dl.hashes { if hash == head { i++ @@ -382,7 +388,7 @@ func TestRepeatingHashAttack(t *testing.T) { // Make sure that syncing returns and does so with a failure select { - case <-time.After(100 * time.Millisecond): + case <-time.After(time.Second): t.Fatalf("synchronisation blocked") case err := <-errc: if err == nil { @@ -469,6 +475,23 @@ func TestMadeupHashChainAttack(t *testing.T) { } } +// Tests that if a malicious peer makes up a random hash chain, and tries to push +// indefinitely, one hash at a time, it actually gets caught with it. The reason +// this is separate from the classical made up chain attack is that sending hashes +// one by one prevents reliable block/parent verification. +func TestMadeupHashChainDrippingAttack(t *testing.T) { + // Create a random chain of hashes to drip + hashes := createHashes(0, 16*blockCacheLimit) + tester := newTester(t, hashes, nil) + + // Try and sync with the attacker, one hash at a time + tester.maxHashFetch = 1 + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer) + } +} + // Tests that if a malicious peer makes up a random block chain, and tried to // push indefinitely, it actually gets caught with it. func TestMadeupBlockChainAttack(t *testing.T) { @@ -479,7 +502,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { crossCheckCycle = 25 * time.Millisecond // Create a long chain of blocks and simulate an invalid chain by dropping every second - hashes := createHashes(0, 32*blockCacheLimit) + hashes := createHashes(0, 16*blockCacheLimit) blocks := createBlocksFromHashes(hashes) gapped := make([]common.Hash, len(hashes)/2) @@ -502,3 +525,37 @@ func TestMadeupBlockChainAttack(t *testing.T) { t.Fatalf("failed to synchronise blocks: %v", err) } } + +// Advanced form of the above forged blockchain attack, where not only does the +// attacker make up a valid hashes for random blocks, but also forges the block +// parents to point to existing hashes. +func TestMadeupParentBlockChainAttack(t *testing.T) { + defaultBlockTTL := blockTTL + defaultCrossCheckCycle := crossCheckCycle + + blockTTL = 100 * time.Millisecond + crossCheckCycle = 25 * time.Millisecond + + // Create a long chain of blocks and simulate an invalid chain by dropping every second + hashes := createHashes(0, 16*blockCacheLimit) + blocks := createBlocksFromHashes(hashes) + forges := createBlocksFromHashes(hashes) + for hash, block := range forges { + block.ParentHeaderHash = hash // Simulate pointing to already known hash + } + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, forges) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + } + // Ensure that a valid chain can still pass sync + blockTTL = defaultBlockTTL + crossCheckCycle = defaultCrossCheckCycle + + tester.blocks = blocks + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if _, err := tester.syncTake("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 13ec9a520..591a37773 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -17,7 +17,7 @@ import ( ) const ( - blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download + blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download ) // fetchRequest is a currently running block retrieval operation. diff --git a/eth/handler.go b/eth/handler.go index b2d741295..9117a70de 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -47,9 +47,7 @@ type ProtocolManager struct { txpool txPool chainman *core.ChainManager downloader *downloader.Downloader - - pmu sync.Mutex - peers map[string]*peer + peers *peerSet SubProtocol p2p.Protocol @@ -73,7 +71,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txpool: txpool, chainman: chainman, downloader: downloader, - peers: make(map[string]*peer), + peers: newPeerSet(), newPeerCh: make(chan *peer, 1), quitSync: make(chan struct{}), } @@ -95,10 +93,14 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo } func (pm *ProtocolManager) removePeer(peer *peer) { - pm.pmu.Lock() - defer pm.pmu.Unlock() + // Unregister the peer from the downloader pm.downloader.UnregisterPeer(peer.id) - delete(pm.peers, peer.id) + + // Remove the peer from the Ethereum peer set too + glog.V(logger.Detail).Infoln("Removing peer", peer.id) + if err := pm.peers.Unregister(peer.id); err != nil { + glog.V(logger.Error).Infoln("Removal failed:", err) + } } func (pm *ProtocolManager) Start() { @@ -136,31 +138,32 @@ func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter } func (pm *ProtocolManager) handle(p *peer) error { + // Execute the Ethereum handshake, short circuit if fails if err := p.handleStatus(); err != nil { return err } - pm.pmu.Lock() - pm.peers[p.id] = p - pm.pmu.Unlock() - - pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks) - defer func() { - pm.removePeer(p) - }() + // Register the peer locally and in the downloader too + glog.V(logger.Detail).Infoln("Adding peer", p.id) + if err := pm.peers.Register(p); err != nil { + glog.V(logger.Error).Infoln("Addition failed:", err) + return err + } + defer pm.removePeer(p) + if err := pm.downloader.RegisterPeer(p.id, p.recentHash, p.requestHashes, p.requestBlocks); err != nil { + return err + } // propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil { return err } - // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { return err } } - return nil } @@ -203,8 +206,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "->msg %v: %v", msg, err) } - if request.Amount > maxHashes { - request.Amount = maxHashes + if request.Amount > downloader.MaxHashFetch { + request.Amount = downloader.MaxHashFetch } hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) @@ -251,7 +254,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if block != nil { blocks = append(blocks, block) } - if i == maxBlocks { + if i == downloader.MaxBlockFetch { break } } @@ -346,18 +349,8 @@ func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error { // out which peers do not contain the block in their block set and will do a // sqrt(peers) to determine the amount of peers we broadcast to. func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) { - pm.pmu.Lock() - defer pm.pmu.Unlock() - - // Find peers who don't know anything about the given hash. Peers that - // don't know about the hash will be a candidate for the broadcast loop - var peers []*peer - for _, peer := range pm.peers { - if !peer.blockHashes.Has(hash) { - peers = append(peers, peer) - } - } - // Broadcast block to peer set + // Broadcast block to a batch of peers not knowing about it + peers := pm.peers.PeersWithoutBlock(hash) peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) @@ -369,18 +362,8 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) // out which peers do not contain the block in their block set and will do a // sqrt(peers) to determine the amount of peers we broadcast to. func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { - pm.pmu.Lock() - defer pm.pmu.Unlock() - - // Find peers who don't know anything about the given hash. Peers that - // don't know about the hash will be a candidate for the broadcast loop - var peers []*peer - for _, peer := range pm.peers { - if !peer.txHashes.Has(hash) { - peers = append(peers, peer) - } - } - // Broadcast block to peer set + // Broadcast transaction to a batch of peers not knowing about it + peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendTransaction(tx) diff --git a/eth/peer.go b/eth/peer.go index 861efaaec..bb6a20349 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -1,17 +1,25 @@ package eth import ( + "errors" "fmt" "math/big" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "gopkg.in/fatih/set.v0" ) +var ( + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) + type statusMsgData struct { ProtocolVersion uint32 NetworkId uint32 @@ -25,16 +33,6 @@ type getBlockHashesMsgData struct { Amount uint64 } -func getBestPeer(peers map[string]*peer) *peer { - var peer *peer - for _, cp := range peers { - if peer == nil || cp.td.Cmp(peer.td) > 0 { - peer = cp - } - } - return peer -} - type peer struct { *p2p.Peer @@ -103,8 +101,8 @@ func (p *peer) sendTransaction(tx *types.Transaction) error { } func (p *peer) requestHashes(from common.Hash) error { - glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4]) - return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) + glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4]) + return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, downloader.MaxHashFetch}) } func (p *peer) requestBlocks(hashes []common.Hash) error { @@ -159,3 +157,103 @@ func (p *peer) handleStatus() error { return <-errc } + +// peerSet represents the collection of active peers currently participating in +// the Ethereum sub-protocol. +type peerSet struct { + peers map[string]*peer + lock sync.RWMutex +} + +// newPeerSet creates a new peer set to track the active participants. +func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peer), + } +} + +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. +func (ps *peerSet) Register(p *peer) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[p.id]; ok { + return errAlreadyRegistered + } + ps.peers[p.id] = p + return nil +} + +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if _, ok := ps.peers[id]; !ok { + return errNotRegistered + } + delete(ps.peers, id) + return nil +} + +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Len returns if the current number of peers in the set. +func (ps *peerSet) Len() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.peers) +} + +// PeersWithoutBlock retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.blockHashes.Has(hash) { + list = append(list, p) + } + } + return list +} + +// PeersWithoutTx retrieves a list of peers that do not have a given transaction +// in their set of known hashes. +func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.txHashes.Has(hash) { + list = append(list, p) + } + } + return list +} + +// BestPeer retrieves the known peer with the currently highest total difficulty. +func (ps *peerSet) BestPeer() *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + var best *peer + for _, p := range ps.peers { + if best == nil || p.td.Cmp(best.td) > 0 { + best = p + } + } + return best +} diff --git a/eth/protocol.go b/eth/protocol.go index 48f37b59c..948051ed1 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -12,8 +12,6 @@ const ( NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 - maxHashes = 512 - maxBlocks = 128 ) // eth protocol message codes diff --git a/eth/sync.go b/eth/sync.go index aa7ebc77b..62d08acb6 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -10,8 +10,8 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) -// Sync contains all synchronisation code for the eth protocol - +// update periodically tries to synchronise with the network, both downloading +// hashes and blocks as well as retrieving cached ones. func (pm *ProtocolManager) update() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) @@ -20,22 +20,16 @@ func (pm *ProtocolManager) update() { for { select { case <-pm.newPeerCh: - // Meet the `minDesiredPeerCount` before we select our best peer - if len(pm.peers) < minDesiredPeerCount { + // Make sure we have peers to select from, then sync + if pm.peers.Len() < minDesiredPeerCount { break } - // Find the best peer and synchronise with it - peer := getBestPeer(pm.peers) - if peer == nil { - glog.V(logger.Debug).Infoln("Sync attempt canceled. No peers available") - } - go pm.synchronise(peer) + go pm.synchronise(pm.peers.BestPeer()) case <-forceSync: // Force a sync even if not enough peers are present - if peer := getBestPeer(pm.peers); peer != nil { - go pm.synchronise(peer) - } + go pm.synchronise(pm.peers.BestPeer()) + case <-blockProc: // Try to pull some blocks from the downloaded if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { @@ -51,10 +45,9 @@ func (pm *ProtocolManager) update() { } } -// processBlocks will attempt to reconstruct a chain by checking the first item and check if it's -// a known parent. The first block in the chain may be unknown during downloading. When the -// downloader isn't downloading blocks will be dropped with an unknown parent until either it -// has depleted the list or found a known parent. +// processBlocks retrieves downloaded blocks from the download cache and tries +// to construct the local block chain with it. Note, since the block retrieval +// order matters, access to this function *must* be synchronized/serialized. func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() @@ -79,15 +72,24 @@ func (pm *ProtocolManager) processBlocks() error { return nil } +// synchronise tries to sync up our local block chain with a remote peer, both +// adding various sanity checks as well as wrapping it with various log entries. func (pm *ProtocolManager) synchronise(peer *peer) { + // Short circuit if no peers are available + if peer == nil { + glog.V(logger.Debug).Infoln("Synchronisation canceled: no peers available") + return + } // Make sure the peer's TD is higher than our own. If not drop. if peer.td.Cmp(pm.chainman.Td()) <= 0 { + glog.V(logger.Debug).Infoln("Synchronisation canceled: peer TD too small") return } // FIXME if we have the hash in our chain and the TD of the peer is // much higher than ours, something is wrong with us or the peer. // Check if the hash is on our own chain if pm.chainman.HasBlock(peer.recentHash) { + glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known") return } // Get the hashes from the peer (synchronously) |