aboutsummaryrefslogblamecommitdiffstats
path: root/dex/sync.go
blob: 927c04bc3654aec8ab7cac216ed2ae7731a69f89 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16















                                                                                  
           





                     

                                                      
                                                          
                                               
                                                     
                                                   





                                                                                                               



                                                                            


                                                                                      
 

                                                                                     




























                                                                                        
                                                    































































                                                                                                       


                             

 




                                                                     


                      
                                                        



                           

                                                                            
                                                       

                                                    
             
                                                        
                                                                         
                                                                            



                                                                      

                                                                    

                            


                                                                                

                                


                                                                                      


                                                   
                                                                                                     
                              
                                                                            


                                              
                                    













                                                
                                            







                                                                        
                                                                                    











                                                            




































                                                                              
                                                             
                                                    
                                          
 

                                     


                                                                                  


















                                                                                                     
                                                                            




                                                                                       
                                                                                        












                                                                                             
                                                                        


                                                 
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package dex

import (
    "math/rand"
    "sync/atomic"
    "time"

    "github.com/dexon-foundation/dexon/common"
    "github.com/dexon-foundation/dexon/core/types"
    "github.com/dexon-foundation/dexon/dex/downloader"
    "github.com/dexon-foundation/dexon/log"
    "github.com/dexon-foundation/dexon/p2p/enode"
    "github.com/dexon-foundation/dexon/p2p/enr"
)

const (
    forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
    minDesiredPeerCount = 5                // Amount of peers desired to start syncing

    // acceptableDist is the distance between us and a peer that we can accept:
    // synchronise does not start a download while the peer's block number is
    // at most this far ahead of our own.
    // This distance is related to numChains and lambdaBA dexcon config.
    acceptableDist = 16

    // txsyncPackSize is the target size for the packs of transactions sent by
    // txsyncLoop. A pack can get larger than this if a single transaction
    // exceeds this size.
    txsyncPackSize = 100 * 1024

    // recordsyncPackNum is the target number of records in each pack sent by
    // recordsyncLoop.
    recordsyncPackNum = 1024
)

// txsync describes an initial transaction sync towards a single peer. It is
// handed to txsyncLoop through pm.txsyncCh, and txsyncLoop also uses a value
// of this type as the pack currently being sent.
type txsync struct {
    // p is the peer the transactions are sent to.
    p   *peer
    // txs holds the transactions that still have to be sent; txsyncLoop
    // removes entries from the front as packs go out.
    txs []*types.Transaction
}

// syncTransactions schedules every transaction currently pending in the
// transaction pool for delivery to p. It only hands the work over to
// txsyncLoop via pm.txsyncCh; the hand-over is abandoned if pm.quitSync
// becomes ready first. Nothing is scheduled when there are no transactions.
func (pm *ProtocolManager) syncTransactions(p *peer) {
    // NOTE(review): the error returned by Pending is discarded here, exactly
    // as before; confirm that best-effort behaviour is intended.
    batches, _ := pm.txpool.Pending()

    // Flatten all batches into a single list.
    var all types.Transactions
    for _, list := range batches {
        all = append(all, list...)
    }
    if len(all) == 0 {
        return
    }

    job := &txsync{p: p, txs: all}
    select {
    case <-pm.quitSync:
    case pm.txsyncCh <- job:
    }
}

// txsyncLoop performs the initial transaction sync for new connections.
// Every txsync received on pm.txsyncCh is queued per peer ID, and its
// transactions are relayed in packs of roughly txsyncPackSize. To keep egress
// bandwidth usage low, only one pack is in flight at any time, and the next
// peer to serve is chosen at random. The loop ends once a receive on
// pm.quitSync succeeds.
func (pm *ProtocolManager) txsyncLoop() {
    var (
        queue    = make(map[enode.ID]*txsync) // syncs with transactions left to send, by peer ID
        inFlight bool                         // true while a pack is being sent
        batch    = new(txsync)                // the pack being sent; reused for every send
        result   = make(chan error, 1)        // outcome of the in-flight send
    )

    // dispatch moves the next pack of transactions from s into batch and
    // sends it in the background; the outcome arrives on result.
    dispatch := func(s *txsync) {
        batch.p = s.p
        batch.txs = batch.txs[:0]

        // Collect transactions until the target size is reached. The size is
        // checked before each append, so the last transaction may overshoot.
        var total common.StorageSize
        for _, tx := range s.txs {
            if total >= txsyncPackSize {
                break
            }
            batch.txs = append(batch.txs, tx)
            total += tx.Size()
        }

        // Shift the unsent remainder to the front and drop the sync from the
        // queue once it has been drained.
        left := copy(s.txs, s.txs[len(batch.txs):])
        s.txs = s.txs[:left]
        if left == 0 {
            delete(queue, s.p.ID())
        }

        s.p.Log().Trace("Sending batch of transactions", "count", len(batch.txs), "bytes", total)
        inFlight = true
        // batch is not modified again until the result has been received, so
        // the goroutine can read it without further synchronisation.
        go func() { result <- batch.p.SendTransactions(batch.txs) }()
    }

    // next returns a randomly chosen queued sync, or nil if the queue is empty.
    next := func() *txsync {
        if len(queue) == 0 {
            return nil
        }
        skip := rand.Intn(len(queue))
        for _, s := range queue {
            if skip == 0 {
                return s
            }
            skip--
        }
        return nil
    }

    for {
        select {
        case <-pm.quitSync:
            return

        case s := <-pm.txsyncCh:
            // A new sync replaces any previous one for the same peer ID.
            queue[s.p.ID()] = s
            if !inFlight {
                dispatch(s)
            }

        case err := <-result:
            inFlight = false
            // A peer whose send failed is no longer tracked.
            if err != nil {
                batch.p.Log().Debug("Transaction send failed", "err", err)
                delete(queue, batch.p.ID())
            }
            // Move on to the next sync, if any is left.
            if s := next(); s != nil {
                dispatch(s)
            }
        }
    }
}

// recordsync describes an initial node record sync towards a single peer. It
// is handed to recordsyncLoop through pm.recordsyncCh, and recordsyncLoop also
// uses a value of this type as the pack currently being sent.
type recordsync struct {
    // p is the peer the records are sent to.
    p       *peer
    // records holds the node records that still have to be sent;
    // recordsyncLoop removes entries from the front as packs go out.
    records []*enr.Record
}

// syncNodeRecords schedules all records of the node table for delivery to p.
// The number of records is logged at debug level; if there are any, they are
// handed to recordsyncLoop via pm.recordsyncCh. The hand-over is abandoned if
// pm.quitSync becomes ready first.
func (pm *ProtocolManager) syncNodeRecords(p *peer) {
    all := pm.nodeTable.Records()
    p.Log().Debug("Sync node records", "num", len(all))

    // Nothing to relay.
    if len(all) == 0 {
        return
    }

    job := &recordsync{p: p, records: all}
    select {
    case <-pm.quitSync:
    case pm.recordsyncCh <- job:
    }
}

// recordsyncLoop takes care of the initial node record sync for each new
// connection. When a new peer appears, all records queued for it are relayed.
// In order to minimise egress bandwidth usage, the records are sent in packs
// of at most recordsyncPackNum, to one peer at a time.
//
// The loop returns once a receive on pm.quitSync succeeds.
func (pm *ProtocolManager) recordsyncLoop() {
    var (
        pending = make(map[enode.ID]*recordsync) // syncs with records left to send, by peer ID
        sending = false                          // whether a send is active
        pack    = new(recordsync)                // the pack that is being sent
        done    = make(chan error, 1)            // result of the send
    )

    // send starts sending a pack of node records from the sync.
    send := func(s *recordsync) {
        // Take at most recordsyncPackNum records from the front of the sync.
        n := len(s.records)
        if n > recordsyncPackNum {
            n = recordsyncPackNum
        }
        pack.p = s.p
        // pack.records has its own backing array, so the in-place shift of
        // s.records below cannot disturb the pack.
        pack.records = append(pack.records[:0], s.records[:n]...)
        // Remove the records that will be sent.
        s.records = s.records[:copy(s.records, s.records[n:])]
        if len(s.records) == 0 {
            delete(pending, s.p.ID())
        }
        // Send the pack in the background. Only the record count is logged;
        // the byte size of the pack is not known here.
        s.p.Log().Trace("Sending batch of records", "count", len(pack.records))
        sending = true
        // pack is not modified again until the result has been received on
        // done, so the goroutine can read it without further synchronisation.
        go func() { done <- pack.p.SendNodeRecords(pack.records) }()
    }

    // pick chooses the next pending sync at random, or returns nil if none
    // is pending.
    pick := func() *recordsync {
        if len(pending) == 0 {
            return nil
        }
        n := rand.Intn(len(pending)) + 1
        for _, s := range pending {
            if n--; n == 0 {
                return s
            }
        }
        return nil
    }

    for {
        select {
        case s := <-pm.recordsyncCh:
            // A new sync replaces any previous one for the same peer ID.
            pending[s.p.ID()] = s
            if !sending {
                send(s)
            }
        case err := <-done:
            sending = false
            // Stop tracking peers that cause send failures.
            if err != nil {
                pack.p.Log().Debug("Record send failed", "err", err)
                delete(pending, pack.p.ID())
            }
            // Schedule the next send.
            if s := pick(); s != nil {
                send(s)
            }
        case <-pm.quitSync:
            return
        }
    }
}

// syncer drives periodic synchronisation with the network. It starts the
// fetcher, then triggers a sync against the best known peer whenever a new
// peer is announced (provided at least minDesiredPeerCount peers are
// connected) and unconditionally every forceSyncCycle. It returns once a
// receive on pm.noMorePeers succeeds, stopping the ticker, terminating the
// downloader and stopping the fetcher on the way out.
func (pm *ProtocolManager) syncer() {
    // Bring up the sync machinery and make sure it is torn down again.
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    ticker := time.NewTicker(forceSyncCycle)
    defer ticker.Stop()

    // trigger selects the best peer here, in the loop goroutine, and runs the
    // sync itself in the background.
    trigger := func() {
        go pm.synchronise(pm.peers.BestPeer())
    }

    for {
        select {
        case <-pm.noMorePeers:
            return

        case <-ticker.C:
            // Forced sync: runs regardless of how many peers are connected.
            trigger()

        case <-pm.newPeerCh:
            // Only sync once there are enough peers to choose from.
            if pm.peers.Len() >= minDesiredPeerCount {
                trigger()
            }
        }
    }
}

// synchronise tries to bring the local block chain up to date with the given
// remote peer. It does nothing when peer is nil or when the peer is at most
// acceptableDist blocks ahead of the local head. After a successful download
// it switches fast sync off (if it was on), marks the initial sync as done by
// setting pm.acceptTxs, and broadcasts the new head unless it is block 0.
func (pm *ProtocolManager) synchronise(peer *peer) {
    // No peer to sync with.
    if peer == nil {
        return
    }

    // Compare our head number against the peer's.
    localNumber := pm.blockchain.CurrentBlock().NumberU64()
    pHead, pNumber := peer.Head()

    // A peer that is ahead by no more than the acceptable distance does not
    // warrant a download; the fetcher is able to cover that gap.
    if pNumber <= localNumber+acceptableDist {
        return
    }

    // Pick the download mode.
    mode := downloader.FullSync
    switch {
    case atomic.LoadUint32(&pm.fastSync) == 1:
        // Fast sync was explicitly requested, and explicitly granted.
        mode = downloader.FastSync

    case localNumber == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0:
        // The database seems empty as the current block is the genesis, yet
        // the fast block is ahead, so fast sync was enabled for this node at
        // some point. The only scenario where this can happen is if the user
        // manually (or via a bad block) rolled back a fast sync node below
        // the sync point. In that case it is safe to re-enable fast sync.
        atomic.StoreUint32(&pm.fastSync, 1)
        mode = downloader.FastSync
    }

    // In fast sync mode, only continue if the peer's block number is above
    // our current fast block.
    if mode == downloader.FastSync && pm.blockchain.CurrentFastBlock().NumberU64() >= pNumber {
        return
    }

    // Run the sync cycle; a failed cycle leaves all flags untouched.
    if pm.downloader.Synchronise(peer.id, pHead, pNumber, mode) != nil {
        return
    }

    // The cycle succeeded, so fast sync (if it was active) is switched off.
    if atomic.LoadUint32(&pm.fastSync) == 1 {
        log.Info("Fast sync complete, auto disabling")
        atomic.StoreUint32(&pm.fastSync, 0)
    }
    atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done

    // We've completed a sync cycle, notify all peers of the new state. This
    // path is essential in star-topology networks where a gateway node needs
    // to notify all its out-of-date peers of the availability of a new block.
    // This failure scenario will most often crop up in private and hackathon
    // networks with degenerate connectivity, but it should be healthy for the
    // mainnet too to more reliably update peers or the local number state.
    head := pm.blockchain.CurrentBlock()
    if head.NumberU64() > 0 {
        go pm.BroadcastBlock(head, false)
    }
}