aboutsummaryrefslogblamecommitdiffstats
path: root/blockpool/peers.go
blob: d56e270ab3589e46369dc2108512c280a71cf845 (plain) (tree)






































                                                                      
                                                                            


                         
                     



                                                
                                                





































































































































































                                                                                                                                                                              
                                                                                                                          






































































                                                                                                                                    
                                        








































































                                                                                                                                                    
                                                                                                                                                                             










                                          
                                                                             




                                                                                                                                                       

                                         
                                                                                                                                                 
                                                                              


























                                                                                                                                                               
                                                                                                                                                                








                                                                                                                         




                                                     

                                                                 
                                                          







                                                                                    
                 

                                            
























                                                                                                                                                                                                                       
                                   








                                     




                                               
                                                                     
 
                                 





                                                  
                                                      






                                                                                                                                       
                                       
                                                                                                                                                                                                      

                                          

                                                                                                                                                                                                             














                                                                                              
                                     
                                                                                                                                                                                                            





















                                                                                                                                                                       





                                                                                                                                                                                          







                                  
package blockpool

import (
    "bytes"
    "math/big"
    "math/rand"
    "sort"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/errs"
    "github.com/ethereum/go-ethereum/ethutil"
)

type peer struct {
    lock sync.RWMutex

    // last known blockchain status
    td               *big.Int
    currentBlockHash []byte
    currentBlock     *types.Block
    parentHash       []byte
    headSection      *section

    id string

    // peer callbacks
    requestBlockHashes func([]byte) error
    requestBlocks      func([][]byte) error
    peerError          func(*errs.Error)
    errors             *errs.Errors

    sections [][]byte

    // channels to push new head block and head section for peer a
    currentBlockC chan *types.Block
    headSectionC  chan *section

    // channels to signal peer switch and peer quit to section processes
    idleC   chan bool
    switchC chan bool

    bp *BlockPool

    // timers for head section process
    blockHashesRequestTimer <-chan time.Time
    blocksRequestTimer      <-chan time.Time
    suicideC                <-chan time.Time

    idle bool
}

// peers is the component keeping a record of peers in a hashmap
//
type peers struct {
    lock sync.RWMutex

    bp     *BlockPool
    errors *errs.Errors
    peers  map[string]*peer
    best   *peer
    status *status
}

// peer constructor
func (self *peers) newPeer(
    td *big.Int,
    currentBlockHash []byte,
    id string,
    requestBlockHashes func([]byte) error,
    requestBlocks func([][]byte) error,
    peerError func(*errs.Error),
) (p *peer) {

    p = &peer{
        errors:             self.errors,
        td:                 td,
        currentBlockHash:   currentBlockHash,
        id:                 id,
        requestBlockHashes: requestBlockHashes,
        requestBlocks:      requestBlocks,
        peerError:          peerError,
        currentBlockC:      make(chan *types.Block),
        headSectionC:       make(chan *section),
        bp:                 self.bp,
        idle:               true,
    }
    // at creation the peer is recorded in the peer pool
    self.peers[id] = p
    return
}

// dispatches an error to a peer if still connected
func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
    self.lock.RLock()
    defer self.lock.RUnlock()
    peer, ok := self.peers[id]
    if ok {
        peer.addError(code, format, params)
    }
    // blacklisting comes here
}

func (self *peer) addError(code int, format string, params ...interface{}) {
    err := self.errors.New(code, format, params...)
    self.peerError(err)
}

func (self *peer) setChainInfo(td *big.Int, c []byte) {
    self.lock.Lock()
    defer self.lock.Unlock()

    self.td = td
    self.currentBlockHash = c

    self.currentBlock = nil
    self.parentHash = nil
    self.headSection = nil
}

func (self *peer) setChainInfoFromBlock(block *types.Block) {
    self.lock.Lock()
    defer self.lock.Unlock()
    // use the optional TD to update peer td, this helps second best peer selection
    // in case best peer is lost
    if block.Td != nil && block.Td.Cmp(self.td) > 0 {
        plog.DebugDetailf("setChainInfoFromBlock: update <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(block.Hash()), self.td, block.Td)
        self.td = block.Td
        self.currentBlockHash = block.Hash()
        self.parentHash = block.ParentHash()
        self.currentBlock = block
        self.headSection = nil
    }
    self.bp.wg.Add(1)
    go func() {
        self.currentBlockC <- block
        self.bp.wg.Done()
    }()
}

func (self *peers) requestBlocks(attempts int, hashes [][]byte) {
    // distribute block request among known peers
    self.lock.RLock()
    defer self.lock.RUnlock()
    peerCount := len(self.peers)
    // on first attempt use the best peer
    if attempts == 0 {
        plog.DebugDetailf("request %v missing blocks from best peer <%s>", len(hashes), self.best.id)
        self.best.requestBlocks(hashes)
        return
    }
    repetitions := self.bp.Config.BlocksRequestRepetition
    if repetitions > peerCount {
        repetitions = peerCount
    }
    i := 0
    indexes := rand.Perm(peerCount)[0:repetitions]
    sort.Ints(indexes)

    plog.DebugDetailf("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
    for _, peer := range self.peers {
        if i == indexes[0] {
            plog.DebugDetailf("request length: %v", len(hashes))
            plog.DebugDetailf("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id)
            peer.requestBlocks(hashes)
            indexes = indexes[1:]
            if len(indexes) == 0 {
                break
            }
        }
        i++
    }
    self.bp.putHashSlice(hashes)
}

// addPeer implements the logic for blockpool.AddPeer
// returns true iff peer is promoted as best peer in the pool
func (self *peers) addPeer(
    td *big.Int,
    currentBlockHash []byte,
    id string,
    requestBlockHashes func([]byte) error,
    requestBlocks func([][]byte) error,
    peerError func(*errs.Error),
) (best bool) {

    var previousBlockHash []byte
    self.lock.Lock()
    p, found := self.peers[id]
    if found {
        if !bytes.Equal(p.currentBlockHash, currentBlockHash) {
            previousBlockHash = p.currentBlockHash
            plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
            p.setChainInfo(td, currentBlockHash)
            self.status.lock.Lock()
            self.status.values.NewBlocks++
            self.status.lock.Unlock()
        }
    } else {
        p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)

        self.status.lock.Lock()

        self.status.peers[id]++
        self.status.values.NewBlocks++
        self.status.lock.Unlock()

        plog.Debugf("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash))
    }
    self.lock.Unlock()

    // check peer current head
    if self.bp.hasBlock(currentBlockHash) {
        // peer not ahead
        plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
        return false
    }

    if self.best == p {
        // new block update for active current best peer -> request hashes
        plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))

        if previousBlockHash != nil {
            if entry := self.bp.get(previousBlockHash); entry != nil {
                p.headSectionC <- nil
                self.bp.activateChain(entry.section, p, nil)
                p.sections = append(p.sections, previousBlockHash)
            }
        }
        best = true
    } else {
        currentTD := ethutil.Big0
        if self.best != nil {
            currentTD = self.best.td
        }
        if td.Cmp(currentTD) > 0 {
            self.status.lock.Lock()
            self.status.bestPeers[p.id]++
            self.status.lock.Unlock()
            plog.Debugf("addPeer: peer <%v> promoted best peer", id)
            self.bp.switchPeer(self.best, p)
            self.best = p
            best = true
        }
    }
    return
}

// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
func (self *peers) removePeer(id string) {
    self.lock.Lock()
    defer self.lock.Unlock()

    p, found := self.peers[id]
    if !found {
        return
    }

    delete(self.peers, id)
    plog.Debugf("addPeer: remove peer <%v>", id)

    // if current best peer is removed, need to find a better one
    if self.best == p {
        var newp *peer
        // FIXME: own TD
        max := ethutil.Big0
        // peer with the highest self-acclaimed TD is chosen
        for _, pp := range self.peers {
            if pp.td.Cmp(max) > 0 {
                max = pp.td
                newp = pp
            }
        }
        if newp != nil {
            self.status.lock.Lock()
            self.status.bestPeers[p.id]++
            self.status.lock.Unlock()
            plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td)
        } else {
            plog.Warnln("addPeer: no suitable peers found")
        }
        self.best = newp
        self.bp.switchPeer(p, newp)
    }
}

// switchPeer launches section processes
func (self *BlockPool) switchPeer(oldp, newp *peer) {

    // first quit AddBlockHashes, requestHeadSection and activateChain
    if oldp != nil {
        plog.DebugDetailf("<%s> quit peer processes", oldp.id)
        close(oldp.switchC)
    }
    if newp != nil {
        newp.idleC = make(chan bool)
        newp.switchC = make(chan bool)
        // if new best peer has no head section yet, create it and run it
        // otherwise head section is an element of peer.sections
        if newp.headSection == nil {
            plog.DebugDetailf("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash))

            if newp.idle {
                self.wg.Add(1)
                newp.idle = false
                self.syncing()
            }

            go func() {
                newp.run()
                if !newp.idle {
                    self.wg.Done()
                    newp.idle = true
                }
            }()

        }

        var connected = make(map[string]*section)
        var sections [][]byte
        for _, hash := range newp.sections {
            plog.DebugDetailf("activate chain starting from section [%s]", hex(hash))
            // if section not connected (ie, top of a contiguous sequence of sections)
            if connected[string(hash)] == nil {
                // if not deleted, then reread from pool (it can be orphaned top half of a split section)
                if entry := self.get(hash); entry != nil {
                    self.activateChain(entry.section, newp, connected)
                    connected[string(hash)] = entry.section
                    sections = append(sections, hash)
                }
            }
        }
        plog.DebugDetailf("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
        // need to lock now that newp is exposed to section processes
        newp.lock.Lock()
        newp.sections = sections
        newp.lock.Unlock()
    }
    // finally deactivate section process for sections where newp didnt activate
    // newp activating section process changes the quit channel for this reason
    if oldp != nil {
        plog.DebugDetailf("<%s> quit section processes", oldp.id)
        //
        close(oldp.idleC)
    }
}

func (self *peers) getPeer(id string) (p *peer, best bool) {
    self.lock.RLock()
    defer self.lock.RUnlock()
    if self.best != nil && self.best.id == id {
        return self.best, true
    }
    p = self.peers[id]
    return
}

func (self *peer) handleSection(sec *section) {
    self.lock.Lock()
    defer self.lock.Unlock()
    plog.DebugDetailf("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec))

    self.headSection = sec
    self.blockHashesRequestTimer = nil

    if sec == nil {
        if self.idle {
            self.idle = false
            self.bp.wg.Add(1)
            self.bp.syncing()
        }

        self.suicideC = time.After(self.bp.Config.BlockHashesTimeout)

        plog.DebugDetailf("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
    } else {
        if !self.idle {
            self.idle = true
            self.bp.wg.Done()
        }
        plog.DebugDetailf("HeadSection: <%s> (head: %s) head section [%s] created", self.id, hex(self.currentBlockHash), sectionhex(sec))
        self.suicideC = time.After(self.bp.Config.IdleBestPeerTimeout)
    }
}

func (self *peer) getCurrentBlock(currentBlock *types.Block) {
    // called by update or after AddBlock signals that head block of current peer is received
    if currentBlock == nil {
        if entry := self.bp.get(self.currentBlockHash); entry != nil {
            entry.node.lock.Lock()
            currentBlock = entry.node.block
            entry.node.lock.Unlock()
        }
        if currentBlock != nil {
            plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
        } else {
            plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
            self.requestBlocks([][]byte{self.currentBlockHash})
            self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
            return
        }
    } else {
        plog.DebugDetailf("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash()))
    }

    self.lock.Lock()
    defer self.lock.Unlock()
    self.currentBlock = currentBlock
    self.parentHash = currentBlock.ParentHash()
    plog.DebugDetailf("HeadSection: <%s> head block %s found (parent: %s)... requesting  hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash))
    self.blockHashesRequestTimer = time.After(0)
    self.blocksRequestTimer = nil
}

func (self *peer) getBlockHashes() {
    //if connecting parent is found
    if self.bp.hasBlock(self.parentHash) {
        plog.DebugDetailf("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
        err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock}))

        self.bp.status.lock.Lock()
        self.bp.status.badPeers[self.id]++
        self.bp.status.values.BlocksInChain++
        self.bp.status.values.BlocksInPool--
        if err != nil {
            self.addError(ErrInvalidBlock, "%v", err)
            self.bp.status.badPeers[self.id]++
        } else {
            headKey := string(self.parentHash)
            height := self.bp.status.chain[headKey] + 1
            self.bp.status.chain[string(self.currentBlockHash)] = height
            if height > self.bp.status.values.LongestChain {
                self.bp.status.values.LongestChain = height
            }
            delete(self.bp.status.chain, headKey)
        }
        self.bp.status.lock.Unlock()

    } else {
        if parent := self.bp.get(self.parentHash); parent != nil {
            if self.bp.get(self.currentBlockHash) == nil {
                plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash))
                n := &node{
                    hash:    self.currentBlockHash,
                    block:   self.currentBlock,
                    hashBy:  self.id,
                    blockBy: self.id,
                }
                self.bp.newSection([]*node{n}).activate(self)
            } else {
                plog.DebugDetailf("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
                self.bp.activateChain(parent.section, self, nil)
            }
        } else {
            plog.DebugDetailf("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
            self.requestBlockHashes(self.currentBlockHash)
            self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
            return
        }
    }
    self.blockHashesRequestTimer = nil
    if !self.idle {
        self.idle = true
        self.suicideC = nil
        self.bp.wg.Done()
    }
}

// main loop for head section process
func (self *peer) run() {

    self.lock.RLock()
    switchC := self.switchC
    self.lock.RUnlock()

    self.blockHashesRequestTimer = nil

    self.blocksRequestTimer = time.After(0)
    self.suicideC = time.After(self.bp.Config.BlockHashesTimeout)

    var quit <-chan time.Time

    var ping = time.NewTicker(5 * time.Second)

LOOP:
    for {
        select {
        // to minitor section process behaviou
        case <-ping.C:
            plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)

        // signal from AddBlockHashes that head section for current best peer is created
        // if sec == nil, it signals that chain info has updated (new block message)
        case sec := <-self.headSectionC:
            self.handleSection(sec)
            if sec == nil {
                plog.Debugf("HeadSection: <%s> (headsection [%s], received: [%s]) quit channel set to nil, catchup happening", self.id, sectionhex(self.headSection), sectionhex(sec))
                quit = nil
            } else {
                plog.Debugf("HeadSection: <%s> (headsection [%s], received: [%s]) quit channel set to go off in IdleBestPeerTimeout", self.id, sectionhex(self.headSection), sectionhex(sec))
                quit = time.After(self.bp.Config.IdleBestPeerTimeout)
            }

        // periodic check for block hashes or parent block/section
        case <-self.blockHashesRequestTimer:
            self.getBlockHashes()

        // signal from AddBlock that head block of current best peer has been received
        case currentBlock := <-self.currentBlockC:
            self.getCurrentBlock(currentBlock)

        // keep requesting until found or timed out
        case <-self.blocksRequestTimer:
            self.getCurrentBlock(nil)

        // quitting on timeout
        case <-self.suicideC:
            self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block (td: %v, head: %s)", self.td, hex(self.currentBlockHash)))

            self.bp.status.lock.Lock()
            self.bp.status.badPeers[self.id]++
            self.bp.status.lock.Unlock()
            // there is no persistence here, so GC will just take care of cleaning up
            break LOOP

        // signal for peer switch, quit
        case <-switchC:
            var complete = "incomplete "
            if self.idle {
                complete = "complete"
            }
            plog.Debugf("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
            break LOOP

        // global quit for blockpool
        case <-self.bp.quit:
            break LOOP

        // quit
        case <-quit:
            self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, self.currentBlockHash))

            self.bp.status.lock.Lock()
            self.bp.status.badPeers[self.id]++
            self.bp.status.lock.Unlock()
            plog.Debugf("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
            break LOOP
        }
    }
    if !self.idle {
        self.idle = true
        self.bp.wg.Done()
    }
}