aboutsummaryrefslogtreecommitdiffstats
path: root/blockpool/peers.go
diff options
context:
space:
mode:
Diffstat (limited to 'blockpool/peers.go')
-rw-r--r--blockpool/peers.go547
1 files changed, 547 insertions, 0 deletions
diff --git a/blockpool/peers.go b/blockpool/peers.go
new file mode 100644
index 000000000..b4aa48f49
--- /dev/null
+++ b/blockpool/peers.go
@@ -0,0 +1,547 @@
+package blockpool
+
+import (
+ "bytes"
+ "math/big"
+ "math/rand"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/errs"
+ "github.com/ethereum/go-ethereum/ethutil"
+)
+
+type peer struct {
+ lock sync.RWMutex
+
+ // last known blockchain status
+ td *big.Int
+ currentBlockHash []byte
+ currentBlock *types.Block
+ parentHash []byte
+ headSection *section
+
+ id string
+
+ // peer callbacks
+ requestBlockHashes func([]byte) error
+ requestBlocks func([][]byte) error
+ peerError func(*errs.Error)
+ errors *errs.Errors
+
+ sections [][]byte
+
+ // channels to push new head block and head section for peer a
+ currentBlockC chan *types.Block
+ headSectionC chan *section
+
+ // channels to signal peer switch and peer quit to section processes
+ idleC chan bool
+ switchC chan bool
+
+ quit chan bool
+ bp *BlockPool
+
+ // timers for head section process
+ blockHashesRequestTimer <-chan time.Time
+ blocksRequestTimer <-chan time.Time
+ suicideC <-chan time.Time
+
+ idle bool
+}
+
+// peers is the component keeping a record of peers in a hashmap
+//
+type peers struct {
+ lock sync.RWMutex
+
+ bp *BlockPool
+ errors *errs.Errors
+ peers map[string]*peer
+ best *peer
+ status *status
+}
+
+// peer constructor
+func (self *peers) newPeer(
+ td *big.Int,
+ currentBlockHash []byte,
+ id string,
+ requestBlockHashes func([]byte) error,
+ requestBlocks func([][]byte) error,
+ peerError func(*errs.Error),
+) (p *peer) {
+
+ p = &peer{
+ errors: self.errors,
+ td: td,
+ currentBlockHash: currentBlockHash,
+ id: id,
+ requestBlockHashes: requestBlockHashes,
+ requestBlocks: requestBlocks,
+ peerError: peerError,
+ currentBlockC: make(chan *types.Block),
+ headSectionC: make(chan *section),
+ bp: self.bp,
+ idle: true,
+ }
+ // at creation the peer is recorded in the peer pool
+ self.peers[id] = p
+ return
+}
+
+// dispatches an error to a peer if still connected
+func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ peer, ok := self.peers[id]
+ if ok {
+ peer.addError(code, format, params)
+ }
+ // blacklisting comes here
+}
+
+func (self *peer) addError(code int, format string, params ...interface{}) {
+ err := self.errors.New(code, format, params...)
+ self.peerError(err)
+}
+
+func (self *peer) setChainInfo(td *big.Int, c []byte) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.td = td
+ self.currentBlockHash = c
+
+ self.currentBlock = nil
+ self.parentHash = nil
+ self.headSection = nil
+}
+
+func (self *peer) setChainInfoFromBlock(block *types.Block) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ // use the optional TD to update peer td, this helps second best peer selection
+ // in case best peer is lost
+ if block.Td != nil && block.Td.Cmp(self.td) > 0 {
+ plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td)
+ self.td = block.Td
+ self.currentBlockHash = block.Hash()
+ self.parentHash = block.ParentHash()
+ self.currentBlock = block
+ self.headSection = nil
+ }
+ self.bp.wg.Add(1)
+ go func() {
+ self.currentBlockC <- block
+ self.bp.wg.Done()
+ }()
+}
+
+func (self *peers) requestBlocks(attempts int, hashes [][]byte) {
+ // distribute block request among known peers
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ peerCount := len(self.peers)
+ // on first attempt use the best peer
+ if attempts == 0 {
+ plog.DebugDetailf("request %v missing blocks from best peer <%s>", len(hashes), self.best.id)
+ self.best.requestBlocks(hashes)
+ return
+ }
+ repetitions := self.bp.Config.BlocksRequestRepetition
+ if repetitions > peerCount {
+ repetitions = peerCount
+ }
+ i := 0
+ indexes := rand.Perm(peerCount)[0:repetitions]
+ sort.Ints(indexes)
+
+ plog.DebugDetailf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
+ for _, peer := range self.peers {
+ if i == indexes[0] {
+ plog.DebugDetailf("request length: %v", len(hashes))
+ plog.DebugDetailf("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id)
+ peer.requestBlocks(hashes)
+ indexes = indexes[1:]
+ if len(indexes) == 0 {
+ break
+ }
+ }
+ i++
+ }
+ self.bp.putHashSlice(hashes)
+}
+
+// addPeer implements the logic for blockpool.AddPeer
+// returns true iff peer is promoted as best peer in the pool
+func (self *peers) addPeer(
+ td *big.Int,
+ currentBlockHash []byte,
+ id string,
+ requestBlockHashes func([]byte) error,
+ requestBlocks func([][]byte) error,
+ peerError func(*errs.Error),
+) (best bool) {
+
+ var previousBlockHash []byte
+ self.lock.Lock()
+ p, found := self.peers[id]
+ if found {
+ if !bytes.Equal(p.currentBlockHash, currentBlockHash) {
+ previousBlockHash = p.currentBlockHash
+ plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
+ p.setChainInfo(td, currentBlockHash)
+ self.status.lock.Lock()
+ self.status.values.NewBlocks++
+ self.status.lock.Unlock()
+ }
+ } else {
+ p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
+
+ self.status.lock.Lock()
+
+ self.status.peers[id]++
+ self.status.values.NewBlocks++
+ self.status.lock.Unlock()
+
+ plog.Debugf("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash))
+ }
+ self.lock.Unlock()
+
+ // check peer current head
+ if self.bp.hasBlock(currentBlockHash) {
+ // peer not ahead
+ plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
+ return false
+ }
+
+ if self.best == p {
+ // new block update for active current best peer -> request hashes
+ plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
+
+ if previousBlockHash != nil {
+ if entry := self.bp.get(previousBlockHash); entry != nil {
+ p.headSectionC <- nil
+ self.bp.activateChain(entry.section, p, nil)
+ p.sections = append(p.sections, previousBlockHash)
+ }
+ }
+ best = true
+ } else {
+ currentTD := ethutil.Big0
+ if self.best != nil {
+ currentTD = self.best.td
+ }
+ if td.Cmp(currentTD) > 0 {
+ self.status.lock.Lock()
+ self.status.bestPeers[p.id]++
+ self.status.lock.Unlock()
+ plog.Debugf("addPeer: peer <%v> promoted best peer", id)
+ self.bp.switchPeer(self.best, p)
+ self.best = p
+ best = true
+ }
+ }
+ return
+}
+
+// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
+func (self *peers) removePeer(id string) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ p, found := self.peers[id]
+ if !found {
+ return
+ }
+
+ delete(self.peers, id)
+ plog.Debugf("addPeer: remove peer <%v>", id)
+
+ // if current best peer is removed, need to find a better one
+ if self.best == p {
+ var newp *peer
+ // FIXME: own TD
+ max := ethutil.Big0
+ // peer with the highest self-acclaimed TD is chosen
+ for _, pp := range self.peers {
+ if pp.td.Cmp(max) > 0 {
+ max = pp.td
+ newp = pp
+ }
+ }
+ if newp != nil {
+ self.status.lock.Lock()
+ self.status.bestPeers[p.id]++
+ self.status.lock.Unlock()
+ plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td)
+ } else {
+ plog.Warnln("addPeer: no suitable peers found")
+ }
+ self.best = newp
+ self.bp.switchPeer(p, newp)
+ }
+}
+
+// switchPeer launches section processes
+func (self *BlockPool) switchPeer(oldp, newp *peer) {
+
+ // first quit AddBlockHashes, requestHeadSection and activateChain
+ if oldp != nil {
+ plog.DebugDetailf("<%s> quit peer processes", oldp.id)
+ close(oldp.switchC)
+ }
+ if newp != nil {
+ newp.idleC = make(chan bool)
+ newp.switchC = make(chan bool)
+ // if new best peer has no head section yet, create it and run it
+ // otherwise head section is an element of peer.sections
+ if newp.headSection == nil {
+ plog.DebugDetailf("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash))
+
+ if newp.idle {
+ self.wg.Add(1)
+ newp.idle = false
+ self.syncing()
+ }
+
+ go func() {
+ newp.run()
+ if !newp.idle {
+ self.wg.Done()
+ newp.idle = true
+ }
+ }()
+
+ }
+
+ var connected = make(map[string]*section)
+ var sections [][]byte
+ for _, hash := range newp.sections {
+ plog.DebugDetailf("activate chain starting from section [%s]", hex(hash))
+ // if section not connected (ie, top of a contiguous sequence of sections)
+ if connected[string(hash)] == nil {
+ // if not deleted, then reread from pool (it can be orphaned top half of a split section)
+ if entry := self.get(hash); entry != nil {
+ self.activateChain(entry.section, newp, connected)
+ connected[string(hash)] = entry.section
+ sections = append(sections, hash)
+ }
+ }
+ }
+ plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
+ // need to lock now that newp is exposed to section processes
+ newp.lock.Lock()
+ newp.sections = sections
+ newp.lock.Unlock()
+ }
+ // finally deactivate section process for sections where newp didnt activate
+ // newp activating section process changes the quit channel for this reason
+ if oldp != nil {
+ plog.DebugDetailf("<%s> quit section processes", oldp.id)
+ //
+ close(oldp.idleC)
+ }
+}
+
+func (self *peers) getPeer(id string) (p *peer, best bool) {
+ self.lock.RLock()
+ defer self.lock.RUnlock()
+ if self.best != nil && self.best.id == id {
+ return self.best, true
+ }
+ p = self.peers[id]
+ return
+}
+
+func (self *peer) handleSection(sec *section) {
+ self.lock.Lock()
+ defer self.lock.Unlock()
+
+ self.headSection = sec
+ self.blockHashesRequestTimer = nil
+
+ if sec == nil {
+ if self.idle {
+ self.idle = false
+ self.bp.wg.Add(1)
+ self.bp.syncing()
+ }
+
+ self.suicideC = time.After(self.bp.Config.BlockHashesTimeout)
+
+ plog.DebugDetailf("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
+ } else {
+ if !self.idle {
+ self.idle = true
+ self.bp.wg.Done()
+ }
+ plog.DebugDetailf("HeadSection: <%s> head section [%s] created", self.id, sectionhex(sec))
+ self.suicideC = time.After(self.bp.Config.IdleBestPeerTimeout)
+ }
+}
+
+func (self *peer) getCurrentBlock(currentBlock *types.Block) {
+ // called by update or after AddBlock signals that head block of current peer is received
+ if currentBlock == nil {
+ if entry := self.bp.get(self.currentBlockHash); entry != nil {
+ entry.node.lock.Lock()
+ currentBlock = entry.node.block
+ entry.node.lock.Unlock()
+ }
+ if currentBlock != nil {
+ plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
+ } else {
+ plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
+ self.requestBlocks([][]byte{self.currentBlockHash})
+ self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
+ return
+ }
+ } else {
+ plog.DebugDetailf("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash()))
+ }
+
+ self.lock.Lock()
+ defer self.lock.Unlock()
+ self.currentBlock = currentBlock
+ self.parentHash = currentBlock.ParentHash()
+ plog.DebugDetailf("HeadSection: <%s> head block %s found (parent: [%s])... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash))
+ self.blockHashesRequestTimer = time.After(0)
+ self.blocksRequestTimer = nil
+}
+
+func (self *peer) getBlockHashes() {
+ //if connecting parent is found
+ if self.bp.hasBlock(self.parentHash) {
+ plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
+ err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock}))
+ if err != nil {
+ self.addError(ErrInvalidBlock, "%v", err)
+
+ self.bp.status.lock.Lock()
+ self.bp.status.badPeers[self.id]++
+ self.bp.status.lock.Unlock()
+ }
+ } else {
+ if parent := self.bp.get(self.parentHash); parent != nil {
+ if self.bp.get(self.currentBlockHash) == nil {
+ plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash))
+ n := &node{
+ hash: self.currentBlockHash,
+ block: self.currentBlock,
+ hashBy: self.id,
+ blockBy: self.id,
+ }
+ self.bp.newSection([]*node{n}).activate(self)
+ } else {
+ plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
+ self.bp.activateChain(parent.section, self, nil)
+ }
+ } else {
+ plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
+ self.requestBlockHashes(self.currentBlockHash)
+ self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
+ return
+ }
+ }
+ self.blockHashesRequestTimer = nil
+ if !self.idle {
+ self.idle = true
+ self.suicideC = time.After(self.bp.Config.IdleBestPeerTimeout)
+ self.bp.wg.Done()
+ }
+}
+
+// main loop for head section process
+func (self *peer) run() {
+
+ self.lock.RLock()
+ switchC := self.switchC
+ currentBlockHash := self.currentBlockHash
+ self.lock.RUnlock()
+
+ self.blockHashesRequestTimer = nil
+
+ self.blocksRequestTimer = time.After(0)
+ self.suicideC = time.After(self.bp.Config.BlockHashesTimeout)
+
+ var quit chan bool
+
+ var ping = time.NewTicker(5 * time.Second)
+
+LOOP:
+ for {
+ select {
+ // to minitor section process behaviou
+ case <-ping.C:
+ plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
+
+ // idle timer started when process goes idle
+ case <-self.idleC:
+ if self.idle {
+ self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks...quitting", currentBlockHash))
+
+ self.bp.status.lock.Lock()
+ self.bp.status.badPeers[self.id]++
+ self.bp.status.lock.Unlock()
+ }
+
+ // signal from AddBlockHashes that head section for current best peer is created
+ // if sec == nil, it signals that chain info has updated (new block message)
+ case sec := <-self.headSectionC:
+ self.handleSection(sec)
+ // local var quit channel is linked to sections suicide channel so that
+ if sec == nil {
+ quit = nil
+ } else {
+ quit = sec.suicideC
+ }
+
+ // periodic check for block hashes or parent block/section
+ case <-self.blockHashesRequestTimer:
+ self.getBlockHashes()
+
+ // signal from AddBlock that head block of current best peer has been received
+ case currentBlock := <-self.currentBlockC:
+ self.getCurrentBlock(currentBlock)
+
+ // keep requesting until found or timed out
+ case <-self.blocksRequestTimer:
+ self.getCurrentBlock(nil)
+
+ // quitting on timeout
+ case <-self.suicideC:
+ self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block %x", currentBlockHash))
+
+ self.bp.status.lock.Lock()
+ self.bp.status.badPeers[self.id]++
+ self.bp.status.lock.Unlock()
+ // there is no persistence here, so GC will just take care of cleaning up
+ break LOOP
+
+ // signal for peer switch, quit
+ case <-switchC:
+ var complete = "incomplete "
+ if self.idle {
+ complete = "complete"
+ }
+ plog.Debugf("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
+ break LOOP
+
+ // global quit for blockpool
+ case <-self.bp.quit:
+ break LOOP
+
+ // quit
+ case <-quit:
+ break LOOP
+ }
+ }
+ if !self.idle {
+ self.idle = true
+ self.bp.wg.Done()
+ }
+}