aboutsummaryrefslogblamecommitdiffstats
path: root/core/test/network.go
blob: 207d8a47f20be3f16dbb463b9ab6157d196b3f43 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                    
  
                                                                        

                                                                               

                                         
                                                                         


                                                                          
  
                                                                           
                                                      

                                  
            

        
                 
                       
                
             
             

                 
              
              
 

                                                                 

                                                                             
                                                                

 
       



                                                              

                                              
 
                     
       


                                                     

 




                                                         

 

































































                                                                                  

                                                                      


















                                                                                



                                                               




                                                                            

 



                                                                   
                                            
                     








                                                                           

                                                                                        

                                                                             


                                                                     






                                                                                   
                
                                                                          

              
 
 

                                                    
                                    

 

                                                 
                                

 
                                                   
                                                   

                                                                                 
                              


                                                    
                                                      

                                                    

                                                                                    
                                

 
                                                              
                                           
                                             

                                          










                                                                      





                                                               
                                            
                                                  

                                           










                                                                      




                                                             
                                                         
                                      
                                                                 
                                                                             



                          
                                                              
                                           
                                          
                                                               

 
                                                                  
                                               
                                          
                                                       

 
                                                 
                                                    


                            

                                                                 
                                



                                                                             
                                                         
                             

                                                                   
                                                                           



                       




                                                        


              
                                                     

                                    
                          

                                                                        
                        



                                                                             
                         
                                                          
                   
                                  


                                          
                                  
                                                                  
                                                                   
                                  







                                                                                              

                                         
                
                             
         

 








































                                                                                                      


                         


                                    
                                  



                                    
                                  

                                                
                                          
                         
                                           



                 

                                       


                            

                       





                                              

                                                     


                                  

                                              
                              
 








                                                                          




                                                                    




                                                                   


                                                            
                                                                              

                         
 





                                                                                










                                                                       
                                        

                                                  
                                                      









                                                     


                                



































                                                                                 
                                                           























                                                                            
                                        





















                                                                       













                                                           




























































                                                                                       
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package test

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "net"
    "strconv"
    "sync"
    "time"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core/crypto"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
    "github.com/dexon-foundation/dexon-consensus/core/utils"
)

const (
    // maxPullingPeerCount is the maximum number of peers a single vote pull
    // request is successfully sent to before pulling stops.
    maxPullingPeerCount = 3
)

// NetworkType is the simulation network type. It selects which transport
// client NewNetwork constructs and how Setup joins the network.
type NetworkType string

// NetworkType enums.
const (
    // NetworkTypeTCP selects the TCP transport client, created with its last
    // constructor argument set to false.
    NetworkTypeTCP      NetworkType = "tcp"
    // NetworkTypeTCPLocal selects the TCP transport client, created with its
    // last constructor argument set to true (presumably a "local" mode flag;
    // confirm against NewTCPTransportClient).
    NetworkTypeTCPLocal NetworkType = "tcp-local"
    // NetworkTypeFake selects the fake transport client.
    NetworkTypeFake     NetworkType = "fake"
)

// NetworkConfig is the configuration for Network module.
type NetworkConfig struct {
    // Type selects the transport implementation.
    Type       NetworkType
    // PeerServer and PeerPort are joined into a "host:port" address that is
    // passed to the transport when joining a TCP based network. They are not
    // read for the fake network type.
    PeerServer string
    PeerPort   int
}

// PullRequest is a generic request to pull everything (ex. vote, block...).
type PullRequest struct {
    // Requester is the node that issued the request; replies are sent to it.
    Requester types.NodeID
    // Type is the kind of data requested: "block" or "vote".
    Type      string
    // Identity names the requested data. Its dynamic type depends on Type:
    // common.Hashes for "block" and types.Position for "vote".
    Identity  interface{}
}

// MarshalJSON implements json.Marshaler.
//
// The identity is first encoded on its own according to req.Type ("block"
// expects common.Hashes, "vote" expects types.Position) and then embedded as
// a byte slice under the "id" key, so that the decoder can choose the concrete
// identity type from the "type" key before decoding it.
//
// An unknown req.Type, or an identity whose dynamic type does not match
// req.Type, is reported through the returned error instead of panicking.
func (req *PullRequest) MarshalJSON() (b []byte, err error) {
    var idAsBytes []byte
    switch req.Type {
    case "block":
        hashes, ok := req.Identity.(common.Hashes)
        if !ok {
            return nil, fmt.Errorf(
                "invalid identity for block pull request: %T", req.Identity)
        }
        idAsBytes, err = json.Marshal(hashes)
    case "vote":
        pos, ok := req.Identity.(types.Position)
        if !ok {
            return nil, fmt.Errorf(
                "invalid identity for vote pull request: %T", req.Identity)
        }
        idAsBytes, err = json.Marshal(pos)
    default:
        err = fmt.Errorf("unknown ID type for pull request: %v", req.Type)
    }
    if err != nil {
        return nil, err
    }
    return json.Marshal(&struct {
        Requester types.NodeID `json:"req"`
        Type      string       `json:"type"`
        Identity  []byte       `json:"id"`
    }{req.Requester, req.Type, idAsBytes})
}

// UnmarshalJSON implements json.Unmarshaler.
//
// The outer object is decoded first; its "type" key then decides whether the
// embedded "id" bytes are decoded as common.Hashes ("block") or as
// types.Position ("vote"). The receiver is only modified when every decoding
// step succeeds.
func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
    var wire struct {
        Requester types.NodeID `json:"req"`
        Type      string       `json:"type"`
        Identity  []byte       `json:"id"`
    }
    if err = json.Unmarshal(data, &wire); err != nil {
        return err
    }
    var identity interface{}
    switch wire.Type {
    case "block":
        hashes := common.Hashes{}
        if err = json.Unmarshal(wire.Identity, &hashes); err != nil {
            return err
        }
        identity = hashes
    case "vote":
        pos := types.Position{}
        if err = json.Unmarshal(wire.Identity, &pos); err != nil {
            return err
        }
        identity = pos
    default:
        return fmt.Errorf("unknown pull request type: %v", wire.Type)
    }
    // Everything decoded cleanly, commit the result to the receiver.
    req.Requester, req.Type, req.Identity = wire.Requester, wire.Type, identity
    return nil
}

// Network implements core.Network interface based on TransportClient.
type Network struct {
    // ID is the node ID derived from the public key given to NewNetwork.
    ID                   types.NodeID
    // config is the configuration given to NewNetwork.
    config               NetworkConfig
    // ctx is cancelled through ctxCancel by Close; Run and the pulling
    // routines watch it to stop.
    ctx                  context.Context
    ctxCancel            context.CancelFunc
    // trans is the transport client chosen from config.Type.
    trans                TransportClient
    // fromTransport is the channel returned when joining the transport in
    // Setup; Run consumes it.
    fromTransport        <-chan *TransportEnvelope
    // toConsensus carries messages exposed through ReceiveChan.
    toConsensus          chan interface{}
    // toNode carries messages exposed through ReceiveChanForNode.
    toNode               chan interface{}
    // sentRandomness records block hashes whose randomness result was already
    // broadcast, guarded by sentRandomnessLock.
    sentRandomnessLock   sync.Mutex
    sentRandomness       map[common.Hash]struct{}
    // sentAgreement records block hashes whose agreement result was already
    // broadcast, guarded by sentAgreementLock.
    sentAgreementLock    sync.Mutex
    sentAgreement        map[common.Hash]struct{}
    // blockCache keeps clones of broadcast and received blocks, keyed by
    // hash, to answer block pull requests. Guarded by blockCacheLock.
    blockCacheLock       sync.RWMutex
    blockCache           map[common.Hash]*types.Block
    // voteCache keeps broadcast and received votes, keyed by position and
    // then by vote header, to answer vote pull requests. voteCacheSize is the
    // total number of cached votes and votePositions lists cached positions
    // in insertion order (oldest first) for eviction. All three are guarded
    // by voteCacheLock.
    voteCacheLock        sync.RWMutex
    voteCache            map[types.Position]map[types.VoteHeader]*types.Vote
    voteCacheSize        int
    votePositions        []types.Position
    // stateModule, when attached, receives packed state change requests.
    stateModule          *State
    // peers is the set of node IDs built in Setup from the transport's peers.
    peers                map[types.NodeID]struct{}
    // unreceivedBlocks maps the hash of a block being pulled to the channel
    // notified when that block arrives. Guarded by unreceivedBlocksLock.
    unreceivedBlocksLock sync.RWMutex
    unreceivedBlocks     map[common.Hash]chan<- common.Hash
    // latencyModel is handed to the transport client and also determines how
    // long a block pull waits before trying the next peer.
    latencyModel         LatencyModel
    // cache, when attached, is consulted for notary and DKG sets; when nil
    // all peers are used instead.
    cache                *utils.NodeSetCache
    // notarySetCaches memoizes notary sets by round and then by chain ID,
    // guarded by notarySetCachesLock.
    notarySetCachesLock  sync.Mutex
    notarySetCaches      map[uint64]map[uint32]map[types.NodeID]struct{}
    // dkgSetCaches memoizes DKG sets by round, guarded by dkgSetCachesLock.
    dkgSetCachesLock     sync.Mutex
    dkgSetCaches         map[uint64]map[types.NodeID]struct{}
}

// NewNetwork builds a Network for the node identified by pubKey, which
// provides an implementation of core.Network based on TransportClient.
//
// The transport client is chosen from config.Type; an unknown type panics.
// The returned network still has to be joined to its peers with Setup before
// Run is started.
func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
    marshaller Marshaller, config NetworkConfig) (n *Network) {
    n = &Network{
        ID:           types.NewNodeID(pubKey),
        config:       config,
        latencyModel: latency,
        // Outgoing channels toward consensus core and the hosting node.
        toConsensus: make(chan interface{}, 1000),
        toNode:      make(chan interface{}, 1000),
        // Book keeping of already broadcast results.
        sentRandomness: make(map[common.Hash]struct{}),
        sentAgreement:  make(map[common.Hash]struct{}),
        // Caches used to serve pull requests.
        blockCache: make(map[common.Hash]*types.Block),
        voteCache: make(
            map[types.Position]map[types.VoteHeader]*types.Vote),
        unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
        // Memoized node sets.
        notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
        dkgSetCaches:    make(map[uint64]map[types.NodeID]struct{}),
    }
    n.ctx, n.ctxCancel = context.WithCancel(context.Background())
    // Pick the transport layer matching the requested network type.
    switch config.Type {
    case NetworkTypeFake:
        n.trans = NewFakeTransportClient(pubKey, latency)
    case NetworkTypeTCP, NetworkTypeTCPLocal:
        isLocal := config.Type == NetworkTypeTCPLocal
        n.trans = NewTCPTransportClient(pubKey, latency, marshaller, isLocal)
    default:
        panic(fmt.Errorf("unknown network type: %v", config.Type))
    }
    return n
}

// PullBlocks implements core.Network interface. It returns immediately; the
// pull itself runs in a new goroutine.
func (n *Network) PullBlocks(hashes common.Hashes) {
    go n.pullBlocksAsync(hashes)
}

// PullVotes implements core.Network interface. It returns immediately; the
// pull requests are sent from a new goroutine.
func (n *Network) PullVotes(pos types.Position) {
    go n.pullVotesAsync(pos)
}

// BroadcastVote implements core.Network interface. The vote is sent to the
// notary set of its position and afterwards remembered in the vote cache so
// it can be served to pull requests.
func (n *Network) BroadcastVote(vote *types.Vote) {
    pos := vote.Position
    targets := n.getNotarySet(pos.Round, pos.ChainID)
    n.broadcastToSet(targets, vote)
    n.addVoteToCache(vote)
}

// BroadcastBlock implements core.Network interface. The block is sent to the
// notary set of its position and afterwards remembered in the block cache so
// it can be served to pull requests.
func (n *Network) BroadcastBlock(block *types.Block) {
    // With the fake transport the same pointer would be shared with the
    // receivers, so work on a copy to avoid a data race.
    outgoing := n.cloneForFake(block).(*types.Block)
    pos := outgoing.Position
    targets := n.getNotarySet(pos.Round, pos.ChainID)
    n.broadcastToSet(targets, outgoing)
    n.addBlockToCache(outgoing)
}

// BroadcastAgreementResult implements core.Network interface.
//
// Each block hash is broadcast at most once while it is remembered: a result
// whose hash is already recorded is dropped silently. The record is bounded;
// once it holds more than 1000 hashes an arbitrary one is forgotten before the
// new hash is added. A failing broadcast panics.
func (n *Network) BroadcastAgreementResult(
    randRequest *types.AgreementResult) {
    n.sentAgreementLock.Lock()
    defer n.sentAgreementLock.Unlock()
    hash := randRequest.BlockHash
    if _, alreadySent := n.sentAgreement[hash]; alreadySent {
        return
    }
    if len(n.sentAgreement) > 1000 {
        // Map iteration order is unspecified, so the first key seen is an
        // arbitrary victim.
        for victim := range n.sentAgreement {
            delete(n.sentAgreement, victim)
            break
        }
    }
    n.sentAgreement[hash] = struct{}{}
    err := n.trans.Broadcast(randRequest)
    if err != nil {
        panic(err)
    }
}

// BroadcastRandomnessResult implements core.Network interface.
//
// Each block hash is broadcast at most once while it is remembered: a result
// whose hash is already recorded is dropped silently. The record is bounded;
// once it holds more than 1000 hashes an arbitrary one is forgotten before the
// new hash is added. A failing broadcast panics.
func (n *Network) BroadcastRandomnessResult(
    randResult *types.BlockRandomnessResult) {
    n.sentRandomnessLock.Lock()
    defer n.sentRandomnessLock.Unlock()
    hash := randResult.BlockHash
    if _, alreadySent := n.sentRandomness[hash]; alreadySent {
        return
    }
    if len(n.sentRandomness) > 1000 {
        // Map iteration order is unspecified, so the first key seen is an
        // arbitrary victim.
        for victim := range n.sentRandomness {
            delete(n.sentRandomness, victim)
            break
        }
    }
    n.sentRandomness[hash] = struct{}{}
    err := n.trans.Broadcast(randResult)
    if err != nil {
        panic(err)
    }
}

// SendDKGPrivateShare implements core.Network interface. The share is sent
// only to the node whose ID is derived from recv; a send error panics.
func (n *Network) SendDKGPrivateShare(
    recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
    if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil {
        panic(err)
    }
}

// BroadcastDKGPrivateShare implements core.Network interface. The share is
// sent to every member of the DKG set of prvShare.Round except this node.
func (n *Network) BroadcastDKGPrivateShare(
    prvShare *typesDKG.PrivateShare) {
    n.broadcastToSet(n.getDKGSet(prvShare.Round), prvShare)
}

// BroadcastDKGPartialSignature implements core.Network interface. The partial
// signature is sent to every member of the DKG set of psig.Round except this
// node.
func (n *Network) BroadcastDKGPartialSignature(
    psig *typesDKG.PartialSignature) {
    n.broadcastToSet(n.getDKGSet(psig.Round), psig)
}

// ReceiveChan implements core.Network interface. It returns the receive side
// of the channel on which blocks, votes, agreement results, randomness
// results and DKG messages are delivered.
func (n *Network) ReceiveChan() <-chan interface{} {
    return n.toConsensus
}

// Setup joins the transport layer and records the peers it reports.
//
// For TCP based network types the peer server address is built from the
// configured host and port and serverEndpoint is ignored; for the fake network
// type serverEndpoint is handed to the transport as is. An unknown network
// type, or a failure to join, is returned as an error.
func (n *Network) Setup(serverEndpoint interface{}) (err error) {
    // Work out what to join.
    var endpoint interface{}
    switch n.config.Type {
    case NetworkTypeFake:
        endpoint = serverEndpoint
    case NetworkTypeTCP, NetworkTypeTCPLocal:
        port := strconv.Itoa(n.config.PeerPort)
        endpoint = net.JoinHostPort(n.config.PeerServer, port)
    default:
        return fmt.Errorf("unknown network type: %v", n.config.Type)
    }
    // Join the p2p network.
    if n.fromTransport, err = n.trans.Join(endpoint); err != nil {
        return err
    }
    // Remember every peer by its node ID.
    keys := n.trans.Peers()
    n.peers = make(map[types.NodeID]struct{})
    for i := range keys {
        n.peers[types.NewNodeID(keys[i])] = struct{}{}
    }
    return nil
}

// dispatchMsg routes one envelope received from the transport according to
// the dynamic type of its message:
//   - pull requests are served in their own goroutine,
//   - packed state changes go to the attached state module (panics when none
//     is attached or when the module rejects them),
//   - blocks and votes are cached and then forwarded to consensus,
//   - agreement results, randomness results and DKG messages are forwarded to
//     consensus,
//   - everything else is forwarded to the hosting node.
func (n *Network) dispatchMsg(e *TransportEnvelope) {
    switch payload := n.cloneForFake(e.Msg).(type) {
    case *PullRequest:
        go n.handlePullRequest(payload)
    case packedStateChanges:
        if n.stateModule == nil {
            panic(errors.New(
                "receive packed state change request without state attached"))
        }
        err := n.stateModule.AddRequestsFromOthers([]byte(payload))
        if err != nil {
            panic(err)
        }
    case *types.Vote:
        // Keep the vote around for peers pulling it later.
        n.addVoteToCache(payload)
        n.toConsensus <- payload
    case *types.Block:
        n.addBlockToCache(payload)
        // Wake up the pulling routine waiting for this block, if any, and
        // forget about the pending pull.
        func() {
            n.unreceivedBlocksLock.Lock()
            defer n.unreceivedBlocksLock.Unlock()
            if waiter, pending := n.unreceivedBlocks[payload.Hash]; pending {
                waiter <- payload.Hash
                delete(n.unreceivedBlocks, payload.Hash)
            }
        }()
        n.toConsensus <- payload
    case *types.AgreementResult, *types.BlockRandomnessResult,
        *typesDKG.PrivateShare, *typesDKG.PartialSignature:
        n.toConsensus <- payload
    default:
        n.toNode <- payload
    }
}

// handlePullRequest answers a pull request by sending every cached item that
// matches it back to the requester.
//
// For a "block" request the identity is a list of block hashes; for a "vote"
// request it is a position and all cached votes of that position are sent.
// Items missing from the cache are skipped silently. A failing send is only
// logged so the remaining items are still attempted. Sending stops early once
// the network is closed. An unknown request type panics.
//
// The caches are only read here, so the shared side of their locks is taken,
// and the matching items are collected first so that no lock is held while
// talking to the transport.
func (n *Network) handlePullRequest(req *PullRequest) {
    switch req.Type {
    case "block":
        hashes := req.Identity.(common.Hashes)
        blocks := make([]*types.Block, 0, len(hashes))
        func() {
            n.blockCacheLock.RLock()
            defer n.blockCacheLock.RUnlock()
            for _, h := range hashes {
                if b, exists := n.blockCache[h]; exists {
                    blocks = append(blocks, b)
                }
            }
        }()
        for _, b := range blocks {
            select {
            case <-n.ctx.Done():
                return
            default:
            }
            if err := n.trans.Send(req.Requester, b); err != nil {
                log.Println("unable to send block", req.Requester, err)
            }
        }
    case "vote":
        pos := req.Identity.(types.Position)
        var votes []*types.Vote
        func() {
            n.voteCacheLock.RLock()
            defer n.voteCacheLock.RUnlock()
            cached := n.voteCache[pos]
            votes = make([]*types.Vote, 0, len(cached))
            for _, v := range cached {
                votes = append(votes, v)
            }
        }()
        for _, v := range votes {
            select {
            case <-n.ctx.Done():
                return
            default:
            }
            if err := n.trans.Send(req.Requester, v); err != nil {
                log.Println("unable to send vote", req.Requester, err)
            }
        }
    default:
        panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
    }
}

// Run is the main loop: it hands every envelope arriving from the transport
// to dispatchMsg in a fresh goroutine. It returns when the network is closed
// or when the transport channel is closed.
func (n *Network) Run() {
    for {
        // Give shutdown priority over pending envelopes.
        select {
        case <-n.ctx.Done():
            return
        default:
        }
        select {
        case <-n.ctx.Done():
            return
        case envelope, open := <-n.fromTransport:
            if !open {
                return
            }
            go n.dispatchMsg(envelope)
        }
    }
}

// Close stops the network: it cancels the context watched by Run and the
// pulling routines, closes both outgoing channels and finally closes the
// transport, whose error (if any) is returned.
//
// NOTE(review): dispatch goroutines started by Run send on these channels
// without synchronizing with Close, so one still in flight may hit a closed
// channel. Closing twice would also close a nil channel. Confirm callers only
// close once, after traffic has stopped.
func (n *Network) Close() (err error) {
    n.ctxCancel()
    close(n.toConsensus)
    n.toConsensus = nil
    close(n.toNode)
    n.toNode = nil
    return n.trans.Close()
}

// Report exports 'Report' method of TransportClient. The message is handed to
// the transport unchanged and the transport's error is returned as is.
func (n *Network) Report(msg interface{}) error {
    return n.trans.Report(msg)
}

// Peers exports 'Peers' method of Transport. It returns the public keys of
// the peers as reported by the transport.
func (n *Network) Peers() []crypto.PublicKey {
    return n.trans.Peers()
}

// Broadcast exports 'Broadcast' method of Transport, and would panic when
// error. Unlike the Broadcast-X methods it always goes through the transport's
// own broadcast and never narrows the recipients to a notary or DKG set.
func (n *Network) Broadcast(msg interface{}) {
    if err := n.trans.Broadcast(msg); err != nil {
        panic(err)
    }
}

// Send exports 'Send' method of Transport. The message is sent to the node
// identified by nodeID and the transport's error is returned as is.
func (n *Network) Send(nodeID types.NodeID, msg interface{}) error {
    return n.trans.Send(nodeID, msg)
}

// ReceiveChanForNode returns a channel for messages not handled by
// core.Consensus, i.e. messages whose type matches none of the cases known to
// the dispatcher.
func (n *Network) ReceiveChanForNode() <-chan interface{} {
    return n.toNode
}

// addStateModule attaches a State instance to this network. Packed state
// change requests received afterwards are forwarded to it; receiving one
// without a state attached panics.
func (n *Network) addStateModule(s *State) {
    // This variable should be attached before run, no lock to protect it.
    n.stateModule = s
}

// AddNodeSetCache attaches a utils.NodeSetCache to this module. Once attached,
// the behavior of Broadcast-X methods that look up a notary set or DKG set is
// switched to broadcast to the correct set of peers, instead of all peers.
func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) {
    // This variable should be attached before run, no lock to protect it.
    n.cache = cache
}

// pullBlocksAsync asks peers, one after another, for the blocks identified by
// hashes until all of the blocks tracked by this call have arrived, every peer
// has been asked, or the network is closed.
//
// A hash is tracked by this call only when no other pull is already waiting
// for it; arrival of a tracked block is signalled by the dispatcher through a
// notification channel registered in n.unreceivedBlocks. After each successful
// request the routine waits twice the latency model's delay before checking
// for notifications and moving on to the next peer.
//
// On return, registrations of blocks that never arrived are removed, so they
// neither accumulate nor prevent a later pull from tracking the same hash.
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
    // Setup notification channels for each block hash. The channel is large
    // enough to hold one notification per requested hash, so the dispatcher
    // never blocks on it.
    notYetReceived := make(map[common.Hash]struct{})
    ch := make(chan common.Hash, len(hashes))
    // Send-only view of ch, the form stored in (and compared against) the
    // registration map.
    var notifyCh chan<- common.Hash = ch
    func() {
        n.unreceivedBlocksLock.Lock()
        defer n.unreceivedBlocksLock.Unlock()
        for _, h := range hashes {
            if _, exists := n.unreceivedBlocks[h]; exists {
                continue
            }
            n.unreceivedBlocks[h] = notifyCh
            notYetReceived[h] = struct{}{}
        }
    }()
    // Drop whatever is still registered by this call when giving up. Only
    // entries still pointing at our own channel are removed, in case another
    // pull registered the same hash in the meantime.
    defer func() {
        n.unreceivedBlocksLock.Lock()
        defer n.unreceivedBlocksLock.Unlock()
        for h := range notYetReceived {
            if cur, exists := n.unreceivedBlocks[h]; exists && cur == notifyCh {
                delete(n.unreceivedBlocks, h)
            }
        }
    }()
    req := &PullRequest{
        Requester: n.ID,
        Type:      "block",
        Identity:  hashes,
    }
    // Map iteration order is unspecified, so peers are asked in arbitrary
    // order.
    for nID := range n.peers {
        if nID == n.ID {
            continue
        }
        if err := n.trans.Send(nID, req); err != nil {
            // Try next peer.
            continue
        }
        select {
        case <-n.ctx.Done():
            return
        case <-time.After(2 * n.latencyModel.Delay()):
        }
        // Consume everything in the notification channel, then move on to
        // the next peer when some blocks are still missing.
    Drain:
        for {
            select {
            case h := <-ch:
                delete(notYetReceived, h)
                if len(notYetReceived) == 0 {
                    return
                }
            default:
                break Drain
            }
        }
    }
}

// pullVotesAsync sends a vote pull request for pos to members of the
// corresponding notary set until maxPullingPeerCount requests were sent
// successfully or the set is exhausted.
//
// The local node is skipped, as it is when pulling blocks or broadcasting,
// so no request slot is spent on asking ourselves. Peers for which sending
// fails do not count toward the limit.
func (n *Network) pullVotesAsync(pos types.Position) {
    req := &PullRequest{
        Requester: n.ID,
        Type:      "vote",
        Identity:  pos,
    }
    // Get corresponding notary set.
    notarySet := n.getNotarySet(pos.Round, pos.ChainID)
    // Map iteration order is unspecified, so the peers asked are an arbitrary
    // selection from the notary set.
    sentCount := 0
    for nID := range notarySet {
        if nID == n.ID {
            continue
        }
        if err := n.trans.Send(nID, req); err != nil {
            // Try next peer.
            continue
        }
        sentCount++
        if sentCount >= maxPullingPeerCount {
            break
        }
    }
}

// addBlockToCache stores a clone of b in the block cache under its hash. When
// the cache already holds more than 1000 blocks, an arbitrary one is purged
// first.
func (n *Network) addBlockToCache(b *types.Block) {
    n.blockCacheLock.Lock()
    defer n.blockCacheLock.Unlock()
    if len(n.blockCache) > 1000 {
        // Map iteration order is unspecified, so the first key seen is an
        // arbitrary victim.
        for victim := range n.blockCache {
            delete(n.blockCache, victim)
            break
        }
    }
    cached := b.Clone()
    n.blockCache[b.Hash] = cached
}

// addVoteToCache remembers v in the vote cache, grouped by position and keyed
// by vote header, so it can be served to vote pull requests.
//
// A vote that is already cached is ignored and, in particular, never causes
// an eviction. Otherwise, when the cache holds 128 votes or more, all votes of
// the oldest cached position are evicted before v is added.
func (n *Network) addVoteToCache(v *types.Vote) {
    n.voteCacheLock.Lock()
    defer n.voteCacheLock.Unlock()
    // Reading through a missing position yields a nil inner map, for which
    // the lookup simply reports "not found".
    if _, exists := n.voteCache[v.Position][v.VoteHeader]; exists {
        return
    }
    if n.voteCacheSize >= 128 {
        // votePositions is kept in insertion order, so its head is the
        // oldest position.
        pos := n.votePositions[0]
        n.voteCacheSize -= len(n.voteCache[pos])
        delete(n.voteCache, pos)
        n.votePositions = n.votePositions[1:]
    }
    votes, exists := n.voteCache[v.Position]
    if !exists {
        votes = make(map[types.VoteHeader]*types.Vote)
        n.voteCache[v.Position] = votes
        n.votePositions = append(n.votePositions, v.Position)
    }
    votes[v.VoteHeader] = v
    n.voteCacheSize++
}

// cloneForFake returns a deep copy of v when the fake network type is in use
// and v is a block or a block randomness result; in every other case v itself
// is returned.
func (n *Network) cloneForFake(v interface{}) interface{} {
    if n.config.Type == NetworkTypeFake {
        if blk, isBlock := v.(*types.Block); isBlock {
            return blk.Clone()
        }
        if rand, isRand := v.(*types.BlockRandomnessResult); isRand {
            // Perform deep copy for randomness result.
            return cloneBlockRandomnessResult(rand)
        }
    }
    return v
}

// getNotarySet returns the notary set for (round, chain).
//
// Without an attached node set cache every peer is returned, which makes the
// network easier to use in simple test cases. Otherwise the set is looked up
// in the node set cache once and memoized per round and chain; a failing
// lookup panics.
func (n *Network) getNotarySet(
    round uint64, chain uint32) map[types.NodeID]struct{} {
    if n.cache == nil {
        return n.peers
    }
    n.notarySetCachesLock.Lock()
    defer n.notarySetCachesLock.Unlock()
    perChain, known := n.notarySetCaches[round]
    if !known {
        perChain = make(map[uint32]map[types.NodeID]struct{})
        n.notarySetCaches[round] = perChain
    }
    if memoized, hit := perChain[chain]; hit {
        return memoized
    }
    fetched, err := n.cache.GetNotarySet(round, chain)
    if err != nil {
        panic(err)
    }
    perChain[chain] = fetched
    return fetched
}

// getDKGSet returns the DKG set for round.
//
// Without an attached node set cache every peer is returned, which makes the
// network easier to use in simple test cases. Otherwise the set is looked up
// in the node set cache once and memoized per round; a failing lookup panics.
func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
    if n.cache == nil {
        return n.peers
    }
    n.dkgSetCachesLock.Lock()
    defer n.dkgSetCachesLock.Unlock()
    if memoized, hit := n.dkgSetCaches[round]; hit {
        return memoized
    }
    fetched, err := n.cache.GetDKGSet(round)
    if err != nil {
        panic(err)
    }
    n.dkgSetCaches[round] = fetched
    return fetched
}

// broadcastToSet sends msg to every node in set except this node itself. The
// first failing send panics.
func (n *Network) broadcastToSet(
    set map[types.NodeID]struct{}, msg interface{}) {
    for peer := range set {
        if peer != n.ID {
            err := n.trans.Send(peer, msg)
            if err != nil {
                panic(err)
            }
        }
    }
}