diff options
-rw-r--r-- | blockpool/blockpool.go | 17 | ||||
-rw-r--r-- | blockpool/blockpool_test.go | 6 | ||||
-rw-r--r-- | blockpool/blockpool_util_test.go | 30 | ||||
-rw-r--r-- | blockpool/config_test.go | 4 | ||||
-rw-r--r-- | blockpool/errors_test.go | 33 | ||||
-rw-r--r-- | blockpool/peers.go | 49 | ||||
-rw-r--r-- | blockpool/test/hash_pool.go | 17 | ||||
-rw-r--r-- | eth/protocol.go | 13 |
8 files changed, 125 insertions, 44 deletions
diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go index c5af481a7..ef619b27b 100644 --- a/blockpool/blockpool.go +++ b/blockpool/blockpool.go @@ -33,7 +33,8 @@ var ( // timeout interval: max time allowed for peer without sending a block blocksTimeout = 60 * time.Second // - idleBestPeerTimeout = 120 * time.Second + idleBestPeerTimeout = 120 * time.Second + peerSuspensionInterval = 300 * time.Second ) // config embedded in components, by default fall back to constants @@ -48,6 +49,7 @@ type Config struct { BlockHashesTimeout time.Duration BlocksTimeout time.Duration IdleBestPeerTimeout time.Duration + PeerSuspensionInterval time.Duration } // blockpool errors @@ -96,6 +98,9 @@ func (self *Config) init() { if self.IdleBestPeerTimeout == 0 { self.IdleBestPeerTimeout = idleBestPeerTimeout } + if self.PeerSuspensionInterval == 0 { + self.PeerSuspensionInterval = peerSuspensionInterval + } } // node is the basic unit of the internal model of block chain/tree in the blockpool @@ -188,9 +193,10 @@ func (self *BlockPool) Start() { Errors: errorToString, Level: severity, }, - peers: make(map[string]*peer), - status: self.status, - bp: self, + peers: make(map[string]*peer), + blacklist: make(map[string]time.Time), + status: self.status, + bp: self, } timer := time.NewTicker(3 * time.Second) go func() { @@ -267,7 +273,8 @@ func (self *BlockPool) AddPeer( requestBlocks func([]common.Hash) error, peerError func(*errs.Error), -) (best bool) { +) (best bool, suspended bool) { + return self.peers.addPeer(td, currentBlockHash, peerId, requestBlockHashes, requestBlocks, peerError) } diff --git a/blockpool/blockpool_test.go b/blockpool/blockpool_test.go index 411779057..d8271886f 100644 --- a/blockpool/blockpool_test.go +++ b/blockpool/blockpool_test.go @@ -5,8 +5,8 @@ import ( "time" "github.com/ethereum/go-ethereum/blockpool/test" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" ) func TestPeerWithKnownBlock(t *testing.T) { @@ -69,8 +69,8 @@ func TestPeerPromotionByOptionalTdOnBlock(t *testing.T) { hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3}) peer1.waitBlocksRequests(3) blockPool.AddBlock(&types.Block{ - HeaderHash: common.Bytes(hashes[1]), - ParentHeaderHash: common.Bytes(hashes[0]), + HeaderHash: common.Hash(hashes[1]), + ParentHeaderHash: common.Hash(hashes[0]), Td: common.Big3, }, "peer1") diff --git a/blockpool/blockpool_util_test.go b/blockpool/blockpool_util_test.go index 9ac996bca..5ba92066c 100644 --- a/blockpool/blockpool_util_test.go +++ b/blockpool/blockpool_util_test.go @@ -8,9 +8,9 @@ import ( "time" "github.com/ethereum/go-ethereum/blockpool/test" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/pow" ) @@ -63,10 +63,10 @@ func (self *blockPoolTester) Errorf(format string, params ...interface{}) { // blockPoolTester implements the 3 callbacks needed by the blockPool: // hasBlock, insetChain, verifyPoW -func (self *blockPoolTester) hasBlock(block []byte) (ok bool) { +func (self *blockPoolTester) hasBlock(block common.Hash) (ok bool) { self.lock.RLock() defer self.lock.RUnlock() - indexes := self.hashPool.HashesToIndexes([][]byte{block}) + indexes := self.hashPool.HashesToIndexes([]common.Hash{block}) i := indexes[0] _, ok = self.blockChain[i] fmt.Printf("has block %v (%x...): %v\n", i, block[0:4], ok) @@ -80,13 +80,13 @@ func (self *blockPoolTester) insertChain(blocks types.Blocks) error { var children, refChildren []int var ok bool for _, block := range blocks { - child = self.hashPool.HashesToIndexes([][]byte{block.Hash()})[0] + child = self.hashPool.HashesToIndexes([]common.Hash{block.Hash()})[0] _, ok = self.blockChain[child] if ok { fmt.Printf("block %v already in blockchain\n", child) continue // already in chain } - parent = self.hashPool.HashesToIndexes([][]byte{block.ParentHeaderHash})[0] + parent = self.hashPool.HashesToIndexes([]common.Hash{block.ParentHeaderHash})[0] children, ok = self.blockChain[parent] if !ok { return fmt.Errorf("parent %v not in blockchain ", parent) @@ -274,9 +274,10 @@ func (self *blockPoolTester) initRefBlockChain(n int) { // peerTester functions that mimic protocol calls to the blockpool // registers the peer with the blockPool -func (self *peerTester) AddPeer() bool { +func (self *peerTester) AddPeer() (best bool) { hash := self.hashPool.IndexesToHashes([]int{self.currentBlock})[0] - return self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError) + best, _ = self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError) + return } // peer sends blockhashes if and when gets a request @@ -291,7 +292,7 @@ func (self *peerTester) sendBlockHashes(indexes ...int) { fmt.Printf("adding block hashes %v\n", indexes) hashes := self.hashPool.IndexesToHashes(indexes) i := 1 - next := func() (hash []byte, ok bool) { + next := func() (hash common.Hash, ok bool) { if i < len(hashes) { hash = hashes[i] ok = true @@ -315,15 +316,15 @@ func (self *peerTester) sendBlocks(indexes ...int) { hashes := self.hashPool.IndexesToHashes(indexes) for i := 1; i < len(hashes); i++ { fmt.Printf("adding block %v %x\n", indexes[i], hashes[i][:4]) - self.blockPool.AddBlock(&types.Block{HeaderHash: common.Bytes(hashes[i]), ParentHeaderHash: common.Bytes(hashes[i-1])}, self.id) + self.blockPool.AddBlock(&types.Block{HeaderHash: hashes[i], ParentHeaderHash: hashes[i-1]}, self.id) } } // peer callbacks // -1 is special: not found (a hash never seen) // records block hashes requests by the blockPool -func (self *peerTester) requestBlockHashes(hash []byte) error { - indexes := self.hashPool.HashesToIndexes([][]byte{hash}) +func (self *peerTester) requestBlockHashes(hash common.Hash) error { + indexes := self.hashPool.HashesToIndexes([]common.Hash{hash}) fmt.Printf("[%s] block hash request %v %x\n", self.id, indexes[0], hash[:4]) self.lock.Lock() defer self.lock.Unlock() @@ -332,7 +333,7 @@ func (self *peerTester) requestBlockHashes(hash []byte) error { } // records block requests by the blockPool -func (self *peerTester) requestBlocks(hashes [][]byte) error { +func (self *peerTester) requestBlocks(hashes []common.Hash) error { indexes := self.hashPool.HashesToIndexes(hashes) fmt.Printf("blocks request %v %x...\n", indexes, hashes[0][:4]) self.bt.reqlock.Lock() @@ -347,4 +348,9 @@ func (self *peerTester) requestBlocks(hashes [][]byte) error { // records the error codes of all the peerErrors found the blockPool func (self *peerTester) peerError(err *errs.Error) { self.peerErrors = append(self.peerErrors, err.Code) + fmt.Printf("Error %v on peer %v\n", err, self.id) + if err.Fatal() { + fmt.Printf("Error %v is fatal, removing peer %v\n", err, self.id) + self.blockPool.RemovePeer(self.id) + } } diff --git a/blockpool/config_test.go b/blockpool/config_test.go index d5540c864..8eeaceb51 100644 --- a/blockpool/config_test.go +++ b/blockpool/config_test.go @@ -21,12 +21,13 @@ func TestBlockPoolConfig(t *testing.T) { test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, blockHashesTimeout, t) test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t) test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, idleBestPeerTimeout, t) + test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, peerSuspensionInterval, t) } func TestBlockPoolOverrideConfig(t *testing.T) { test.LogInit() blockPool := &BlockPool{Config: &Config{}} - c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second} + c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second} blockPool.Config = c blockPool.Start() @@ -39,4 +40,5 @@ func TestBlockPoolOverrideConfig(t *testing.T) { test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, 90*time.Second, t) test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t) test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, 30*time.Second, t) + test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, 30*time.Second, t) } diff --git a/blockpool/errors_test.go b/blockpool/errors_test.go index 65a161233..5188930f0 100644 --- a/blockpool/errors_test.go +++ b/blockpool/errors_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ethereum/go-ethereum/blockpool/test" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/pow" ) @@ -45,7 +46,7 @@ func TestVerifyPoW(t *testing.T) { first := false blockPoolTester.blockPool.verifyPoW = func(b pow.Block) bool { bb, _ := b.(*types.Block) - indexes := blockPoolTester.hashPool.HashesToIndexes([][]byte{bb.Hash()}) + indexes := blockPoolTester.hashPool.HashesToIndexes([]common.Hash{bb.Hash()}) if indexes[0] == 2 && !first { first = true return false @@ -122,3 +123,33 @@ func TestErrInsufficientChainInfo(t *testing.T) { t.Errorf("expected %v error, got %v", ErrInsufficientChainInfo, peer1.peerErrors) } } + +func TestPeerSuspension(t *testing.T) { + test.LogInit() + _, blockPool, blockPoolTester := newTestBlockPool(t) + blockPool.Config.PeerSuspensionInterval = 100 * time.Millisecond + + blockPool.Start() + + peer1 := blockPoolTester.newPeer("peer1", 1, 3) + peer1.AddPeer() + blockPool.peers.peerError("peer1", 0, "") + bestpeer, _ := blockPool.peers.getPeer("peer1") + if bestpeer != nil { + t.Errorf("peer1 not removed on error") + } + peer1.AddPeer() + bestpeer, _ = blockPool.peers.getPeer("peer1") + if bestpeer != nil { + t.Errorf("peer1 not removed on reconnect") + } + time.Sleep(100 * time.Millisecond) + peer1.AddPeer() + bestpeer, _ = blockPool.peers.getPeer("peer1") + if bestpeer == nil { + t.Errorf("peer1 not connected after PeerSuspensionInterval") + } + // blockPool.Wait(waitTimeout) + blockPool.Stop() + +} diff --git a/blockpool/peers.go b/blockpool/peers.go index d94d6ac46..81bab31e7 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -47,6 +47,8 @@ type peer struct { blocksRequestTimer <-chan time.Time suicideC <-chan time.Time + addToBlacklist func(id string) + idle bool } @@ -55,11 +57,12 @@ type peer struct { type peers struct { lock sync.RWMutex - bp *BlockPool - errors *errs.Errors - peers map[string]*peer - best *peer - status *status + bp *BlockPool + errors *errs.Errors + peers map[string]*peer + best *peer + status *status + blacklist map[string]time.Time } // peer constructor @@ -84,26 +87,46 @@ func (self *peers) newPeer( headSectionC: make(chan *section), bp: self.bp, idle: true, + addToBlacklist: self.addToBlacklist, } // at creation the peer is recorded in the peer pool self.peers[id] = p return } -// dispatches an error to a peer if still connected +// dispatches an error to a peer if still connected, adds it to the blacklist func (self *peers) peerError(id string, code int, format string, params ...interface{}) { self.lock.RLock() - defer self.lock.RUnlock() peer, ok := self.peers[id] + self.lock.RUnlock() if ok { peer.addError(code, format, params) } - // blacklisting comes here + self.addToBlacklist(id) +} + +func (self *peers) addToBlacklist(id string) { + self.lock.Lock() + defer self.lock.Unlock() + self.blacklist[id] = time.Now() +} + +func (self *peers) suspended(id string) (s bool) { + self.lock.Lock() + defer self.lock.Unlock() + if suspendedAt, ok := self.blacklist[id]; ok { + if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s { + // no longer suspended, delete entry + delete(self.blacklist, id) + } + } + return } func (self *peer) addError(code int, format string, params ...interface{}) { err := self.errors.New(code, format, params...) self.peerError(err) + self.addToBlacklist(self.id) } func (self *peer) setChainInfo(td *big.Int, c common.Hash) { @@ -182,9 +205,13 @@ func (self *peers) addPeer( requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error), -) (best bool) { +) (best bool, suspended bool) { var previousBlockHash common.Hash + if self.suspended(id) { + suspended = true + return + } self.lock.Lock() p, found := self.peers[id] if found { @@ -213,7 +240,7 @@ func (self *peers) addPeer( if self.bp.hasBlock(currentBlockHash) { // peer not ahead plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) - return false + return false, false } if self.best == p { @@ -248,8 +275,10 @@ func (self *peers) addPeer( // removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects func (self *peers) removePeer(id string) { + plog.Debugf("addPeer: remove peer 0 <%v>", id) self.lock.Lock() defer self.lock.Unlock() + plog.Debugf("addPeer: remove peer 1 <%v>", id) p, found := self.peers[id] if !found { diff --git a/blockpool/test/hash_pool.go b/blockpool/test/hash_pool.go index 4e0332d7d..eea135af1 100644 --- a/blockpool/test/hash_pool.go +++ b/blockpool/test/hash_pool.go @@ -3,6 +3,7 @@ package test import ( "sync" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" ) @@ -13,9 +14,9 @@ func NewHashPool() *TestHashPool { return &TestHashPool{intToHash: make(intToHash), hashToInt: make(hashToInt)} } -type intToHash map[int][]byte +type intToHash map[int]common.Hash -type hashToInt map[string]int +type hashToInt map[common.Hash]int // hashPool is a test helper, that allows random hashes to be referred to by integers type TestHashPool struct { @@ -24,11 +25,11 @@ type TestHashPool struct { lock sync.Mutex } -func newHash(i int) []byte { - return crypto.Sha3([]byte(string(i))) +func newHash(i int) common.Hash { + return common.BytesToHash(crypto.Sha3([]byte(string(i)))) } -func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes [][]byte) { +func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes []common.Hash) { self.lock.Lock() defer self.lock.Unlock() for _, i := range indexes { @@ -36,18 +37,18 @@ func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes [][]byte) { if !found { hash = newHash(i) self.intToHash[i] = hash - self.hashToInt[string(hash)] = i + self.hashToInt[hash] = i } hashes = append(hashes, hash) } return } -func (self *TestHashPool) HashesToIndexes(hashes [][]byte) (indexes []int) { +func (self *TestHashPool) HashesToIndexes(hashes []common.Hash) (indexes []int) { self.lock.Lock() defer self.lock.Unlock() for _, hash := range hashes { - i, found := self.hashToInt[string(hash)] + i, found := self.hashToInt[hash] if !found { i = -1 } diff --git a/eth/protocol.go b/eth/protocol.go index 6d610a663..1999d9807 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -42,6 +42,7 @@ const ( ErrGenesisBlockMismatch ErrNoStatusMsg ErrExtraStatusMsg + ErrSuspendedPeer ) var errorToString = map[int]string{ @@ -53,6 +54,7 @@ var errorToString = map[int]string{ ErrGenesisBlockMismatch: "Genesis block mismatch", ErrNoStatusMsg: "No status message", ErrExtraStatusMsg: "Extra status message", + ErrSuspendedPeer: "Suspended peer", } // ethProtocol represents the ethereum wire protocol @@ -85,7 +87,7 @@ type chainManager interface { type blockPool interface { AddBlockHashes(next func() (common.Hash, bool), peerId string) AddBlock(block *types.Block, peerId string) - AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool) + AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool) RemovePeer(peerId string) } @@ -288,7 +290,7 @@ func (self *ethProtocol) handle() error { // to simplify backend interface adding a new block // uses AddPeer followed by AddBlock only if peer is the best peer // (or selected as new best peer) - if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { + if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best { self.blockPool.AddBlock(request.Block, self.id) } @@ -334,9 +336,12 @@ func (self *ethProtocol) handleStatus() error { return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion) } - self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4]) + _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) + if suspended { + return self.protoError(ErrSuspendedPeer, "") + } - self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) + self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4]) return nil } |