aboutsummaryrefslogblamecommitdiffstats
path: root/eth/sync.go
blob: 82abb725f90cfa14034f6d3048a4a34acd9fb6f0 (plain) (tree)
1
2
3
4
5
6
7
8
9
10


           
                   

              
                                                
                                                    

                                                     
                                                      

 
       

                                                                                                               



                                                                                      

 







                                                                               























































































                                                                                                              
                                                                              
                                                                              
                                     


                                                      
                                       
 
                                                                       
                                              


                                    

                                                                            

                                     
                                                              
 

                                                                            

                                                              
                                   
                              



                 
                                                                         
                                                    

                                                  

                      
                                                                       
                                                 

                      

                                                       
 
package eth

import (
    "math/rand"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/logger"
    "github.com/ethereum/go-ethereum/logger/glog"
    "github.com/ethereum/go-ethereum/p2p/discover"
)

const (
    forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
    minDesiredPeerCount = 5                // Amount of peers desired to start syncing

    // This is the target size for the packs of transactions sent by txsyncLoop.
    // A pack can get larger than this if a single transaction exceeds this size.
    txsyncPackSize = 100 * 1024
)

// blockAnnounce is the hash notification of the availability of a new block in
// the network.
//
// NOTE(review): nothing in this part of the file constructs or reads a
// blockAnnounce, so the field descriptions below are inferred from the field
// names and should be confirmed against the code that uses the type.
type blockAnnounce struct {
    hash common.Hash // hash of the block being announced
    peer *peer       // presumably the peer that sent the announcement
    time time.Time   // presumably when the announcement was seen
}

// txsync is a batch of transactions destined for a single peer. It is created
// by syncTransactions with the pending transaction set and consumed by
// txsyncLoop, which also reuses the type for the pack currently being sent.
type txsync struct {
    p   *peer                // peer the transactions are sent to
    txs []*types.Transaction // transactions still waiting to be sent to p
}

// syncTransactions queues every transaction currently held by the pool for
// delivery to the given peer. The request is handed over to txsyncLoop via
// pm.txsyncCh; if the pool is empty nothing is queued, and if pm.quitSync
// fires before the request is accepted it is abandoned.
func (pm *ProtocolManager) syncTransactions(p *peer) {
    pooled := pm.txpool.GetTransactions()
    if len(pooled) > 0 {
        request := &txsync{p, pooled}
        select {
        case pm.txsyncCh <- request:
            // Accepted by txsyncLoop.
        case <-pm.quitSync:
            // Shutting down, drop the request.
        }
    }
}

// txsyncLoop performs the initial transaction sync for every new connection.
// Sync requests arriving on pm.txsyncCh are queued per peer and their
// transactions are relayed in packs of roughly txsyncPackSize. To keep egress
// bandwidth usage low, at most one pack is in flight at any time and the next
// peer to serve is picked at random. The loop exits when pm.quitSync fires.
func (pm *ProtocolManager) txsyncLoop() {
    var (
        queue    = make(map[discover.NodeID]*txsync) // outstanding syncs, keyed by peer ID
        inFlight bool                                // true while a pack is being sent
        batch    = new(txsync)                       // the pack being sent (reused between sends)
        result   = make(chan error, 1)               // outcome of the background send
    )

    // dispatch carves the next pack off the front of s and sends it in the
    // background. It must only be called while no other send is active,
    // because it reuses batch.
    dispatch := func(s *txsync) {
        // Count the leading transactions that make up the pack. The
        // transaction that crosses the target size is still included.
        var (
            total common.StorageSize
            count int
        )
        for count < len(s.txs) && total < txsyncPackSize {
            total += s.txs[count].Size()
            count++
        }
        batch.p = s.p
        batch.txs = append(batch.txs[:0], s.txs[:count]...)

        // Shift the unsent transactions to the front and forget the sync
        // once nothing is left.
        left := copy(s.txs, s.txs[count:])
        s.txs = s.txs[:left]
        if left == 0 {
            delete(queue, s.p.ID())
        }

        glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(batch.txs), total)
        inFlight = true
        go func() { result <- batch.p.sendTransactions(batch.txs) }()
    }

    // choose returns a randomly selected queued sync, or nil if the queue is
    // empty.
    choose := func() *txsync {
        if len(queue) == 0 {
            return nil
        }
        skip := rand.Intn(len(queue))
        for _, s := range queue {
            if skip == 0 {
                return s
            }
            skip--
        }
        return nil
    }

    for {
        select {
        case <-pm.quitSync:
            return

        case s := <-pm.txsyncCh:
            // Track the new sync; start sending right away if idle.
            queue[s.p.ID()] = s
            if !inFlight {
                dispatch(s)
            }

        case err := <-result:
            inFlight = false
            if err != nil {
                // A peer that fails a send is dropped from the queue.
                glog.V(logger.Debug).Infof("%v: tx send failed: %v", batch.p.Peer, err)
                delete(queue, batch.p.ID())
            }
            // Move on to the next queued sync, if any.
            if next := choose(); next != nil {
                dispatch(next)
            }
        }
    }
}

// syncer periodically synchronises with the network. It starts the fetcher
// and then triggers a sync against the best known peer:
//
//   - whenever a new peer is signalled on pm.newPeerCh, provided at least
//     minDesiredPeerCount peers are connected, and
//   - unconditionally every forceSyncCycle, even with fewer peers.
//
// The loop runs until pm.quitSync fires; on return the fetcher is stopped, the
// downloader is terminated and the force-sync ticker is released.
func (pm *ProtocolManager) syncer() {
    // Start and ensure cleanup of sync mechanisms
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    // An explicit ticker is used instead of time.Tick so that it can be
    // stopped when the loop exits rather than ticking on forever.
    forceSync := time.NewTicker(forceSyncCycle)
    defer forceSync.Stop()

    // Wait for different events to fire synchronisation operations
    for {
        select {
        case <-pm.newPeerCh:
            // Only sync on peer arrival once enough peers are available to
            // select from.
            if pm.peers.Len() >= minDesiredPeerCount {
                go pm.synchronise(pm.peers.BestPeer())
            }

        case <-forceSync.C:
            // Force a sync even if not enough peers are present
            go pm.synchronise(pm.peers.BestPeer())

        case <-pm.quitSync:
            return
        }
    }
}

// synchronise attempts to bring the local block chain up to date with the
// given remote peer. It does nothing when no peer is supplied or when the
// peer's total difficulty does not exceed our own.
func (pm *ProtocolManager) synchronise(peer *peer) {
    // Bail out if there is nobody to sync with, or if the peer's TD is not
    // strictly higher than ours (the nil check short-circuits first).
    if peer == nil || peer.Td().Cmp(pm.chainman.Td()) <= 0 {
        return
    }
    // The peer is ahead of us, hand it over to the downloader.
    pm.downloader.Synchronise(peer.id, peer.Head())
}