aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--blockpool/blockpool.go17
-rw-r--r--blockpool/blockpool_test.go6
-rw-r--r--blockpool/blockpool_util_test.go30
-rw-r--r--blockpool/config_test.go4
-rw-r--r--blockpool/errors_test.go33
-rw-r--r--blockpool/peers.go49
-rw-r--r--blockpool/test/hash_pool.go17
-rw-r--r--eth/protocol.go13
8 files changed, 125 insertions, 44 deletions
diff --git a/blockpool/blockpool.go b/blockpool/blockpool.go
index c5af481a7..ef619b27b 100644
--- a/blockpool/blockpool.go
+++ b/blockpool/blockpool.go
@@ -33,7 +33,8 @@ var (
// timeout interval: max time allowed for peer without sending a block
blocksTimeout = 60 * time.Second
//
- idleBestPeerTimeout = 120 * time.Second
+ idleBestPeerTimeout = 120 * time.Second
+ peerSuspensionInterval = 300 * time.Second
)
// config embedded in components, by default fall back to constants
@@ -48,6 +49,7 @@ type Config struct {
BlockHashesTimeout time.Duration
BlocksTimeout time.Duration
IdleBestPeerTimeout time.Duration
+ PeerSuspensionInterval time.Duration
}
// blockpool errors
@@ -96,6 +98,9 @@ func (self *Config) init() {
if self.IdleBestPeerTimeout == 0 {
self.IdleBestPeerTimeout = idleBestPeerTimeout
}
+ if self.PeerSuspensionInterval == 0 {
+ self.PeerSuspensionInterval = peerSuspensionInterval
+ }
}
// node is the basic unit of the internal model of block chain/tree in the blockpool
@@ -188,9 +193,10 @@ func (self *BlockPool) Start() {
Errors: errorToString,
Level: severity,
},
- peers: make(map[string]*peer),
- status: self.status,
- bp: self,
+ peers: make(map[string]*peer),
+ blacklist: make(map[string]time.Time),
+ status: self.status,
+ bp: self,
}
timer := time.NewTicker(3 * time.Second)
go func() {
@@ -267,7 +273,8 @@ func (self *BlockPool) AddPeer(
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
-) (best bool) {
+) (best bool, suspended bool) {
+
return self.peers.addPeer(td, currentBlockHash, peerId, requestBlockHashes, requestBlocks, peerError)
}
diff --git a/blockpool/blockpool_test.go b/blockpool/blockpool_test.go
index 411779057..d8271886f 100644
--- a/blockpool/blockpool_test.go
+++ b/blockpool/blockpool_test.go
@@ -5,8 +5,8 @@ import (
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
- "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
)
func TestPeerWithKnownBlock(t *testing.T) {
@@ -69,8 +69,8 @@ func TestPeerPromotionByOptionalTdOnBlock(t *testing.T) {
hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
peer1.waitBlocksRequests(3)
blockPool.AddBlock(&types.Block{
- HeaderHash: common.Bytes(hashes[1]),
- ParentHeaderHash: common.Bytes(hashes[0]),
+ HeaderHash: common.Hash(hashes[1]),
+ ParentHeaderHash: common.Hash(hashes[0]),
Td: common.Big3,
}, "peer1")
diff --git a/blockpool/blockpool_util_test.go b/blockpool/blockpool_util_test.go
index 9ac996bca..5ba92066c 100644
--- a/blockpool/blockpool_util_test.go
+++ b/blockpool/blockpool_util_test.go
@@ -8,9 +8,9 @@ import (
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
- "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/pow"
)
@@ -63,10 +63,10 @@ func (self *blockPoolTester) Errorf(format string, params ...interface{}) {
// blockPoolTester implements the 3 callbacks needed by the blockPool:
// hasBlock, insetChain, verifyPoW
-func (self *blockPoolTester) hasBlock(block []byte) (ok bool) {
+func (self *blockPoolTester) hasBlock(block common.Hash) (ok bool) {
self.lock.RLock()
defer self.lock.RUnlock()
- indexes := self.hashPool.HashesToIndexes([][]byte{block})
+ indexes := self.hashPool.HashesToIndexes([]common.Hash{block})
i := indexes[0]
_, ok = self.blockChain[i]
fmt.Printf("has block %v (%x...): %v\n", i, block[0:4], ok)
@@ -80,13 +80,13 @@ func (self *blockPoolTester) insertChain(blocks types.Blocks) error {
var children, refChildren []int
var ok bool
for _, block := range blocks {
- child = self.hashPool.HashesToIndexes([][]byte{block.Hash()})[0]
+ child = self.hashPool.HashesToIndexes([]common.Hash{block.Hash()})[0]
_, ok = self.blockChain[child]
if ok {
fmt.Printf("block %v already in blockchain\n", child)
continue // already in chain
}
- parent = self.hashPool.HashesToIndexes([][]byte{block.ParentHeaderHash})[0]
+ parent = self.hashPool.HashesToIndexes([]common.Hash{block.ParentHeaderHash})[0]
children, ok = self.blockChain[parent]
if !ok {
return fmt.Errorf("parent %v not in blockchain ", parent)
@@ -274,9 +274,10 @@ func (self *blockPoolTester) initRefBlockChain(n int) {
// peerTester functions that mimic protocol calls to the blockpool
// registers the peer with the blockPool
-func (self *peerTester) AddPeer() bool {
+func (self *peerTester) AddPeer() (best bool) {
hash := self.hashPool.IndexesToHashes([]int{self.currentBlock})[0]
- return self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError)
+ best, _ = self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError)
+ return
}
// peer sends blockhashes if and when gets a request
@@ -291,7 +292,7 @@ func (self *peerTester) sendBlockHashes(indexes ...int) {
fmt.Printf("adding block hashes %v\n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes)
i := 1
- next := func() (hash []byte, ok bool) {
+ next := func() (hash common.Hash, ok bool) {
if i < len(hashes) {
hash = hashes[i]
ok = true
@@ -315,15 +316,15 @@ func (self *peerTester) sendBlocks(indexes ...int) {
hashes := self.hashPool.IndexesToHashes(indexes)
for i := 1; i < len(hashes); i++ {
fmt.Printf("adding block %v %x\n", indexes[i], hashes[i][:4])
- self.blockPool.AddBlock(&types.Block{HeaderHash: common.Bytes(hashes[i]), ParentHeaderHash: common.Bytes(hashes[i-1])}, self.id)
+ self.blockPool.AddBlock(&types.Block{HeaderHash: hashes[i], ParentHeaderHash: hashes[i-1]}, self.id)
}
}
// peer callbacks
// -1 is special: not found (a hash never seen)
// records block hashes requests by the blockPool
-func (self *peerTester) requestBlockHashes(hash []byte) error {
- indexes := self.hashPool.HashesToIndexes([][]byte{hash})
+func (self *peerTester) requestBlockHashes(hash common.Hash) error {
+ indexes := self.hashPool.HashesToIndexes([]common.Hash{hash})
fmt.Printf("[%s] block hash request %v %x\n", self.id, indexes[0], hash[:4])
self.lock.Lock()
defer self.lock.Unlock()
@@ -332,7 +333,7 @@ func (self *peerTester) requestBlockHashes(hash []byte) error {
}
// records block requests by the blockPool
-func (self *peerTester) requestBlocks(hashes [][]byte) error {
+func (self *peerTester) requestBlocks(hashes []common.Hash) error {
indexes := self.hashPool.HashesToIndexes(hashes)
fmt.Printf("blocks request %v %x...\n", indexes, hashes[0][:4])
self.bt.reqlock.Lock()
@@ -347,4 +348,9 @@ func (self *peerTester) requestBlocks(hashes [][]byte) error {
// records the error codes of all the peerErrors found the blockPool
func (self *peerTester) peerError(err *errs.Error) {
self.peerErrors = append(self.peerErrors, err.Code)
+ fmt.Printf("Error %v on peer %v\n", err, self.id)
+ if err.Fatal() {
+ fmt.Printf("Error %v is fatal, removing peer %v\n", err, self.id)
+ self.blockPool.RemovePeer(self.id)
+ }
}
diff --git a/blockpool/config_test.go b/blockpool/config_test.go
index d5540c864..8eeaceb51 100644
--- a/blockpool/config_test.go
+++ b/blockpool/config_test.go
@@ -21,12 +21,13 @@ func TestBlockPoolConfig(t *testing.T) {
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, blockHashesTimeout, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, idleBestPeerTimeout, t)
+ test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, peerSuspensionInterval, t)
}
func TestBlockPoolOverrideConfig(t *testing.T) {
test.LogInit()
blockPool := &BlockPool{Config: &Config{}}
- c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second}
+ c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second}
blockPool.Config = c
blockPool.Start()
@@ -39,4 +40,5 @@ func TestBlockPoolOverrideConfig(t *testing.T) {
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, 90*time.Second, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, 30*time.Second, t)
+ test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, 30*time.Second, t)
}
diff --git a/blockpool/errors_test.go b/blockpool/errors_test.go
index 65a161233..5188930f0 100644
--- a/blockpool/errors_test.go
+++ b/blockpool/errors_test.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/pow"
)
@@ -45,7 +46,7 @@ func TestVerifyPoW(t *testing.T) {
first := false
blockPoolTester.blockPool.verifyPoW = func(b pow.Block) bool {
bb, _ := b.(*types.Block)
- indexes := blockPoolTester.hashPool.HashesToIndexes([][]byte{bb.Hash()})
+ indexes := blockPoolTester.hashPool.HashesToIndexes([]common.Hash{bb.Hash()})
if indexes[0] == 2 && !first {
first = true
return false
@@ -122,3 +123,33 @@ func TestErrInsufficientChainInfo(t *testing.T) {
t.Errorf("expected %v error, got %v", ErrInsufficientChainInfo, peer1.peerErrors)
}
}
+
+func TestPeerSuspension(t *testing.T) {
+ test.LogInit()
+ _, blockPool, blockPoolTester := newTestBlockPool(t)
+ blockPool.Config.PeerSuspensionInterval = 100 * time.Millisecond
+
+ blockPool.Start()
+
+ peer1 := blockPoolTester.newPeer("peer1", 1, 3)
+ peer1.AddPeer()
+ blockPool.peers.peerError("peer1", 0, "")
+ bestpeer, _ := blockPool.peers.getPeer("peer1")
+ if bestpeer != nil {
+ t.Errorf("peer1 not removed on error")
+ }
+ peer1.AddPeer()
+ bestpeer, _ = blockPool.peers.getPeer("peer1")
+ if bestpeer != nil {
+ t.Errorf("peer1 not removed on reconnect")
+ }
+ time.Sleep(100 * time.Millisecond)
+ peer1.AddPeer()
+ bestpeer, _ = blockPool.peers.getPeer("peer1")
+ if bestpeer == nil {
+ t.Errorf("peer1 not connected after PeerSuspensionInterval")
+ }
+ // blockPool.Wait(waitTimeout)
+ blockPool.Stop()
+
+}
diff --git a/blockpool/peers.go b/blockpool/peers.go
index d94d6ac46..81bab31e7 100644
--- a/blockpool/peers.go
+++ b/blockpool/peers.go
@@ -47,6 +47,8 @@ type peer struct {
blocksRequestTimer <-chan time.Time
suicideC <-chan time.Time
+ addToBlacklist func(id string)
+
idle bool
}
@@ -55,11 +57,12 @@ type peer struct {
type peers struct {
lock sync.RWMutex
- bp *BlockPool
- errors *errs.Errors
- peers map[string]*peer
- best *peer
- status *status
+ bp *BlockPool
+ errors *errs.Errors
+ peers map[string]*peer
+ best *peer
+ status *status
+ blacklist map[string]time.Time
}
// peer constructor
@@ -84,26 +87,46 @@ func (self *peers) newPeer(
headSectionC: make(chan *section),
bp: self.bp,
idle: true,
+ addToBlacklist: self.addToBlacklist,
}
// at creation the peer is recorded in the peer pool
self.peers[id] = p
return
}
-// dispatches an error to a peer if still connected
+// dispatches an error to a peer if still connected, adds it to the blacklist
func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
self.lock.RLock()
- defer self.lock.RUnlock()
peer, ok := self.peers[id]
+ self.lock.RUnlock()
if ok {
peer.addError(code, format, params)
}
- // blacklisting comes here
+ self.addToBlacklist(id)
+}
+
+func (self *peers) addToBlacklist(id string) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.blacklist[id] = time.Now()
+}
+
+func (self *peers) suspended(id string) (s bool) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ if suspendedAt, ok := self.blacklist[id]; ok {
+ if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s {
+ // no longer suspended, delete entry
+ delete(self.blacklist, id)
+ }
+ }
+ return
}
func (self *peer) addError(code int, format string, params ...interface{}) {
err := self.errors.New(code, format, params...)
self.peerError(err)
+ self.addToBlacklist(self.id)
}
func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
@@ -182,9 +205,13 @@ func (self *peers) addPeer(
requestBlockHashes func(common.Hash) error,
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
-) (best bool) {
+) (best bool, suspended bool) {
var previousBlockHash common.Hash
+ if self.suspended(id) {
+ suspended = true
+ return
+ }
self.lock.Lock()
p, found := self.peers[id]
if found {
@@ -213,7 +240,7 @@ func (self *peers) addPeer(
if self.bp.hasBlock(currentBlockHash) {
// peer not ahead
plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
- return false
+ return false, false
}
if self.best == p {
@@ -248,8 +275,10 @@ func (self *peers) addPeer(
// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
func (self *peers) removePeer(id string) {
+ plog.Debugf("addPeer: remove peer 0 <%v>", id)
self.lock.Lock()
defer self.lock.Unlock()
+ plog.Debugf("addPeer: remove peer 1 <%v>", id)
p, found := self.peers[id]
if !found {
diff --git a/blockpool/test/hash_pool.go b/blockpool/test/hash_pool.go
index 4e0332d7d..eea135af1 100644
--- a/blockpool/test/hash_pool.go
+++ b/blockpool/test/hash_pool.go
@@ -3,6 +3,7 @@ package test
import (
"sync"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
@@ -13,9 +14,9 @@ func NewHashPool() *TestHashPool {
return &TestHashPool{intToHash: make(intToHash), hashToInt: make(hashToInt)}
}
-type intToHash map[int][]byte
+type intToHash map[int]common.Hash
-type hashToInt map[string]int
+type hashToInt map[common.Hash]int
// hashPool is a test helper, that allows random hashes to be referred to by integers
type TestHashPool struct {
@@ -24,11 +25,11 @@ type TestHashPool struct {
lock sync.Mutex
}
-func newHash(i int) []byte {
- return crypto.Sha3([]byte(string(i)))
+func newHash(i int) common.Hash {
+ return common.BytesToHash(crypto.Sha3([]byte(string(i))))
}
-func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes [][]byte) {
+func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes []common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
for _, i := range indexes {
@@ -36,18 +37,18 @@ func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes [][]byte) {
if !found {
hash = newHash(i)
self.intToHash[i] = hash
- self.hashToInt[string(hash)] = i
+ self.hashToInt[hash] = i
}
hashes = append(hashes, hash)
}
return
}
-func (self *TestHashPool) HashesToIndexes(hashes [][]byte) (indexes []int) {
+func (self *TestHashPool) HashesToIndexes(hashes []common.Hash) (indexes []int) {
self.lock.Lock()
defer self.lock.Unlock()
for _, hash := range hashes {
- i, found := self.hashToInt[string(hash)]
+ i, found := self.hashToInt[hash]
if !found {
i = -1
}
diff --git a/eth/protocol.go b/eth/protocol.go
index 6d610a663..1999d9807 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -42,6 +42,7 @@ const (
ErrGenesisBlockMismatch
ErrNoStatusMsg
ErrExtraStatusMsg
+ ErrSuspendedPeer
)
var errorToString = map[int]string{
@@ -53,6 +54,7 @@ var errorToString = map[int]string{
ErrGenesisBlockMismatch: "Genesis block mismatch",
ErrNoStatusMsg: "No status message",
ErrExtraStatusMsg: "Extra status message",
+ ErrSuspendedPeer: "Suspended peer",
}
// ethProtocol represents the ethereum wire protocol
@@ -85,7 +87,7 @@ type chainManager interface {
type blockPool interface {
AddBlockHashes(next func() (common.Hash, bool), peerId string)
AddBlock(block *types.Block, peerId string)
- AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool)
+ AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool)
RemovePeer(peerId string)
}
@@ -288,7 +290,7 @@ func (self *ethProtocol) handle() error {
// to simplify backend interface adding a new block
// uses AddPeer followed by AddBlock only if peer is the best peer
// (or selected as new best peer)
- if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) {
+ if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best {
self.blockPool.AddBlock(request.Block, self.id)
}
@@ -334,9 +336,12 @@ func (self *ethProtocol) handleStatus() error {
return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion)
}
- self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
+ _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
+ if suspended {
+ return self.protoError(ErrSuspendedPeer, "")
+ }
- self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
+ self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
return nil
}