// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package dex

import (
    "math/rand"
    "sync/atomic"
    "time"

    "github.com/dexon-foundation/dexon/common"
    "github.com/dexon-foundation/dexon/core/types"
    "github.com/dexon-foundation/dexon/dex/downloader"
    "github.com/dexon-foundation/dexon/log"
    "github.com/dexon-foundation/dexon/p2p/enode"
)

const (
    forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
    minDesiredPeerCount = 5                // Amount of peers desired to start syncing

    // This is the target size for the packs of transactions sent by txsyncLoop.
    // A pack can get larger than this if a single transaction exceeds this size.
    txsyncPackSize = 100 * 1024

    // This is the target number for the packs of metas sent by metasyncLoop.
    metasyncPackNum = 1024
)

type txsync struct {
    p   *peer
    txs []*types.Transaction
}

// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer) {
    var txs types.Transactions
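    // txpool.Pending returns the pending transactions grouped by originating
    // account; flatten the per-account batches into a single slice.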
    pending, _ := pm.txpool.Pending()
    for _, batch := range pending {
        txs = append(txs, batch...)
    }
    if len(txs) == 0 {
        return
    }
    select {
    case pm.txsyncCh <- &txsync{p, txs}:
    case <-pm.quitSync:
    }
}

// txsyncLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
    var (
        pending = make(map[enode.ID]*txsync)
        sending = false               // whether a send is active
        pack    = new(txsync)         // the pack that is being sent
        done    = make(chan error, 1) // result of the send
    )
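    // Only one pack is in flight at any time. The done channel is buffered so
    // the sending goroutine can always deliver its result and exit, even if
    // the loop is already shutting down.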

    // send starts sending a pack of transactions from the sync.
    send := func(s *txsync) {
        // Fill pack with transactions up to the target size.
        size := common.StorageSize(0)
        pack.p = s.p
        pack.txs = pack.txs[:0]
        for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
            pack.txs = append(pack.txs, s.txs[i])
            size += s.txs[i].Size()
        }
        // Remove the transactions that will be sent.
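        // (copy shifts the unsent tail of s.txs to the front of the slice;
        // the re-slice then truncates it to the remaining length.)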
        s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
        if len(s.txs) == 0 {
            delete(pending, s.p.ID())
        }
        // Send the pack in the background.
        s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
        sending = true
        go func() { done <- pack.p.SendTransactions(pack.txs) }()
    }

    // pick chooses the next pending sync.
    pick := func() *txsync {
        if len(pending) == 0 {
            return nil
        }
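        // Walking to the n-th map entry for a uniformly random n selects a
        // pending sync uniformly at random, regardless of iteration order.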
        n := rand.Intn(len(pending)) + 1
        for _, s := range pending {
            if n--; n == 0 {
                return s
            }
        }
        return nil
    }

    for {
        select {
        case s := <-pm.txsyncCh:
            pending[s.p.ID()] = s
            if !sending {
                send(s)
            }
        case err := <-done:
            sending = false
            // Stop tracking peers that cause send failures.
            if err != nil {
                pack.p.Log().Debug("Transaction send failed", "err", err)
                delete(pending, pack.p.ID())
            }
            // Schedule the next send.
            if s := pick(); s != nil {
                send(s)
            }
        case <-pm.quitSync:
            return
        }
    }
}

type metasync struct {
    p     *peer
    metas []*NodeMeta
}

// syncNodeMetas starts sending all node metas to the given peer.
func (pm *ProtocolManager) syncNodeMetas(p *peer) {
    metas := pm.nodeTable.Metas()
    if len(metas) == 0 {
        return
    }
    select {
    case pm.metasyncCh <- &metasync{p, metas}:
    case <-pm.quitSync:
    }
}

// metasyncLoop takes care of the initial node meta sync for each new
// connection. When a new peer appears, we relay all current node metas.
// In order to minimise egress bandwidth usage, we send
// the metas in small packs to one peer at a time.
func (pm *ProtocolManager) metasyncLoop() {
    var (
        pending = make(map[enode.ID]*metasync)
        sending = false               // whether a send is active
        pack    = new(metasync)       // the pack that is being sent
        done    = make(chan error, 1) // result of the send
    )

    // send starts sending a pack of node metas from the sync.
    send := func(s *metasync) {
        // Fill pack with node metas up to the target num.
        var num int
        pack.p = s.p
        pack.metas = pack.metas[:0]
        for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ {
            pack.metas = append(pack.metas, s.metas[i])
            num++
        }
        // Remove the metas that will be sent.
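        // (same slice-shift idiom as in txsyncLoop: copy the unsent tail to
        // the front, then truncate to the remaining length.)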
        s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])]
        if len(s.metas) == 0 {
            delete(pending, s.p.ID())
        }
        // Send the pack in the background.
        s.p.Log().Trace("Sending batch of node metas", "count", len(pack.metas))
        sending = true
        go func() { done <- pack.p.SendNodeMetas(pack.metas) }()
    }

    // pick chooses the next pending sync.
    pick := func() *metasync {
        if len(pending) == 0 {
            return nil
        }
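        // Same uniformly random selection as the pick helper in txsyncLoop.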
        n := rand.Intn(len(pending)) + 1
        for _, s := range pending {
            if n--; n == 0 {
                return s
            }
        }
        return nil
    }

    for {
        select {
        case s := <-pm.metasyncCh:
            pending[s.p.ID()] = s
            if !sending {
                send(s)
            }
        case err := <-done:
            sending = false
            // Stop tracking peers that cause send failures.
            if err != nil {
                pack.p.Log().Debug("NodeMeta send failed", "err", err)
                delete(pending, pack.p.ID())
            }
            // Schedule the next send.
            if s := pick(); s != nil {
                send(s)
            }
        case <-pm.quitSync:
            return
        }
    }
}

// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as processing block announcements.
func (pm *ProtocolManager) syncer() {
    // Start and ensure cleanup of sync mechanisms
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    // Wait for different events to fire synchronisation operations
    forceSync := time.NewTicker(forceSyncCycle)
    defer forceSync.Stop()

    for {
        select {
        case <-pm.newPeerCh:
            // Make sure we have peers to select from, then sync
            if pm.peers.Len() < minDesiredPeerCount {
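                // break exits only the enclosing select; the syncer keeps
                // running and the forceSync ticker will retry periodically.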
                break
            }
            go pm.synchronise(pm.peers.BestPeer())

        case <-forceSync.C:
            // Force a sync even if not enough peers are present
            go pm.synchronise(pm.peers.BestPeer())

        case <-pm.noMorePeers:
            return
        }
    }
}

// synchronise tries to sync up our local block chain with a remote peer.
func (pm *ProtocolManager) synchronise(peer *peer) {
    // Short circuit if no peers are available
    if peer == nil {
        return
    }
    // Make sure the peer's TD is higher than our own
    currentBlock := pm.blockchain.CurrentBlock()
    td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

    pHead, pTd := peer.Head()
    if pTd.Cmp(td) <= 0 {
        return
    }
    // Otherwise try to sync with the downloader
    mode := downloader.FullSync
    if atomic.LoadUint32(&pm.fastSync) == 1 {
        // Fast sync was explicitly requested, and explicitly granted
        mode = downloader.FastSync
    } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
        // The database seems empty as the current block is the genesis. Yet the fast
        // block is ahead, so fast sync was enabled for this node at a certain point.
        // The only scenario where this can happen is if the user manually (or via a
        // bad block) rolled back a fast sync node below the sync point. In this case
        // however it's safe to reenable fast sync.
        atomic.StoreUint32(&pm.fastSync, 1)
        mode = downloader.FastSync
    }

    if mode == downloader.FastSync {
        // Make sure the peer's total difficulty is higher than our current
        // fast block's TD before synchronising towards it.
        if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
            return
        }
    }

    // Run the sync cycle, and disable fast sync if we've gone past the pivot block
    if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
        return
    }
    if atomic.LoadUint32(&pm.fastSync) == 1 {
        log.Info("Fast sync complete, auto disabling")
        atomic.StoreUint32(&pm.fastSync, 0)
    }
    atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
    if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
        // We've completed a sync cycle, notify all peers of new state. This path is
        // essential in star-topology networks where a gateway node needs to notify
        // all its out-of-date peers of the availability of a new block. This failure
        // scenario will most often crop up in private and hackathon networks with
        // degenerate connectivity, but it should be healthy for the mainnet too to
        // more reliably update peers or the local TD state.
        go pm.BroadcastBlock(head, false)
    }
}