aboutsummaryrefslogblamecommitdiffstats
path: root/core/test/network.go
blob: 6aff3d6033c2782e2021a356995fa44950dff773 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                    
  
                                                                        

                                                                               

                                         
                                                                         


                                                                          
  
                                                                           
                                                      

                                  
            

        
                 
                       
                
             
             

                 
              
              
 

                                                                 

                                                                             

 






                                                                  

                                              
 
                     
       


                                                     

 




                                                         

 

































































                                                                                  

                                                                      

























                                                                                

 



                                                                   
                                            
                     










                                                                             


                                                                     






                                                                                   
                
                                                                          

              
 
 

                                                    
                                    

 

                                                 
                                

 
                                                   
                                                   


                                                       
                              


                                                    
                                                      

                                                    


                                                        
                                

 
                                                              
                                           
                                             

                                          










                                                                      





                                                               
                                            
                                                  

                                           










                                                                      




                                                             
                                                       
                                                  




                                                          
                                                         
                                      
                                                                 
                                                                             



                          
                                                              
                                           
                                          




                                                           
                                                                  
                                               
                                          




                                                       
                                                 
                                                    


                            

                                                                 
                                



                                                                             
                                                         
                             

                                                                   
                                                                           



                       




                                                        


              
                                                     

                                    
                          

                                                                        
                        



                                                                             
                         
                   
                                  


                                          
                                  
                                                                  
                                                                   
                                  







                                                                                              

                                         
                
                             
         

 








































                                                                                                      


                         


                                    
                                  



                                    
                                  

                                                
                                          
                         
                                        



                 

                                       


                            

                       





                                              

                                                     


                                  

                                              
                              
 













                                                                          




                                                            














































































































































                                                                                                













                                                           
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package test

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "net"
    "strconv"
    "sync"
    "time"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core/crypto"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
)

const (
    // Count of rounds of notary set cached in network module.
    cachedNotarySetSize = 10
    // Count of maximum count of peers to pull votes from.
    maxPullingPeerCount = 3
)

// NetworkType is the simulation network type.
type NetworkType string

// NetworkType enums.
const (
    NetworkTypeTCP      NetworkType = "tcp"
    NetworkTypeTCPLocal NetworkType = "tcp-local"
    NetworkTypeFake     NetworkType = "fake"
)

// NetworkConfig is the configuration for Network module.
type NetworkConfig struct {
    Type       NetworkType
    PeerServer string
    PeerPort   int
}

// PullRequest is a generic request to pull everything (ex. vote, block...).
type PullRequest struct {
    Requester types.NodeID
    Type      string
    Identity  interface{}
}

// MarshalJSON implements json.Marshaller.
func (req *PullRequest) MarshalJSON() (b []byte, err error) {
    var idAsBytes []byte
    // Make sure caller prepare correct identity for pull requests.
    switch req.Type {
    case "block":
        idAsBytes, err = json.Marshal(req.Identity.(common.Hashes))
    case "vote":
        idAsBytes, err = json.Marshal(req.Identity.(types.Position))
    default:
        err = fmt.Errorf("unknown ID type for pull request: %v", req.Type)
    }
    if err != nil {
        return
    }
    b, err = json.Marshal(&struct {
        Requester types.NodeID `json:"req"`
        Type      string       `json:"type"`
        Identity  []byte       `json:"id"`
    }{req.Requester, req.Type, idAsBytes})
    return
}

// UnmarshalJSON iumplements json.Unmarshaller.
func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
    rawReq := &struct {
        Requester types.NodeID `json:"req"`
        Type      string       `json:"type"`
        Identity  []byte       `json:"id"`
    }{}
    if err = json.Unmarshal(data, rawReq); err != nil {
        return
    }
    var ID interface{}
    switch rawReq.Type {
    case "block":
        hashes := common.Hashes{}
        if err = json.Unmarshal(rawReq.Identity, &hashes); err != nil {
            break
        }
        ID = hashes
    case "vote":
        pos := types.Position{}
        if err = json.Unmarshal(rawReq.Identity, &pos); err != nil {
            break
        }
        ID = pos
    default:
        err = fmt.Errorf("unknown pull request type: %v", rawReq.Type)
    }
    if err != nil {
        return
    }
    req.Requester = rawReq.Requester
    req.Type = rawReq.Type
    req.Identity = ID
    return
}

// Network implements core.Network interface based on TransportClient.
type Network struct {
    ID                   types.NodeID
    config               NetworkConfig
    ctx                  context.Context
    ctxCancel            context.CancelFunc
    trans                TransportClient
    fromTransport        <-chan *TransportEnvelope
    toConsensus          chan interface{}
    toNode               chan interface{}
    sentRandomnessLock   sync.Mutex
    sentRandomness       map[common.Hash]struct{}
    sentAgreementLock    sync.Mutex
    sentAgreement        map[common.Hash]struct{}
    blockCacheLock       sync.RWMutex
    blockCache           map[common.Hash]*types.Block
    voteCacheLock        sync.RWMutex
    voteCache            map[types.Position]map[types.VoteHeader]*types.Vote
    voteCacheSize        int
    votePositions        []types.Position
    stateModule          *State
    notarySetLock        sync.RWMutex
    notarySets           []map[types.NodeID]struct{}
    notarySetMinRound    uint64
    peers                map[types.NodeID]struct{}
    unreceivedBlocksLock sync.RWMutex
    unreceivedBlocks     map[common.Hash]chan<- common.Hash
    latencyModel         LatencyModel
}

// NewNetwork setup network stuffs for nodes, which provides an
// implementation of core.Network based on TransportClient.
func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
    marshaller Marshaller, config NetworkConfig) (n *Network) {
    // Construct basic network instance.
    n = &Network{
        ID:               types.NewNodeID(pubKey),
        config:           config,
        toConsensus:      make(chan interface{}, 1000),
        toNode:           make(chan interface{}, 1000),
        sentRandomness:   make(map[common.Hash]struct{}),
        sentAgreement:    make(map[common.Hash]struct{}),
        blockCache:       make(map[common.Hash]*types.Block),
        unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
        latencyModel:     latency,
        voteCache: make(
            map[types.Position]map[types.VoteHeader]*types.Vote),
    }
    n.ctx, n.ctxCancel = context.WithCancel(context.Background())
    // Construct transport layer.
    switch config.Type {
    case NetworkTypeTCPLocal:
        n.trans = NewTCPTransportClient(pubKey, latency, marshaller, true)
    case NetworkTypeTCP:
        n.trans = NewTCPTransportClient(pubKey, latency, marshaller, false)
    case NetworkTypeFake:
        n.trans = NewFakeTransportClient(pubKey, latency)
    default:
        panic(fmt.Errorf("unknown network type: %v", config.Type))
    }
    return
}

// PullBlocks implements core.Network interface.
func (n *Network) PullBlocks(hashes common.Hashes) {
    go n.pullBlocksAsync(hashes)
}

// PullVotes implements core.Network interface.
func (n *Network) PullVotes(pos types.Position) {
    go n.pullVotesAsync(pos)
}

// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
    if err := n.trans.Broadcast(vote); err != nil {
        panic(err)
    }
    n.addVoteToCache(vote)
}

// BroadcastBlock implements core.Network interface.
func (n *Network) BroadcastBlock(block *types.Block) {
    // Avoid data race in fake transport.
    block = n.cloneForFake(block).(*types.Block)
    if err := n.trans.Broadcast(block); err != nil {
        panic(err)
    }
    n.addBlockToCache(block)
}

// BroadcastAgreementResult implements core.Network interface.
func (n *Network) BroadcastAgreementResult(
    randRequest *types.AgreementResult) {
    n.sentAgreementLock.Lock()
    defer n.sentAgreementLock.Unlock()
    if _, exist := n.sentAgreement[randRequest.BlockHash]; exist {
        return
    }
    if len(n.sentAgreement) > 1000 {
        // Randomly drop one entry.
        for k := range n.sentAgreement {
            delete(n.sentAgreement, k)
            break
        }
    }
    n.sentAgreement[randRequest.BlockHash] = struct{}{}
    if err := n.trans.Broadcast(randRequest); err != nil {
        panic(err)
    }
}

// BroadcastRandomnessResult implements core.Network interface.
func (n *Network) BroadcastRandomnessResult(
    randResult *types.BlockRandomnessResult) {
    n.sentRandomnessLock.Lock()
    defer n.sentRandomnessLock.Unlock()
    if _, exist := n.sentRandomness[randResult.BlockHash]; exist {
        return
    }
    if len(n.sentRandomness) > 1000 {
        // Randomly drop one entry.
        for k := range n.sentRandomness {
            delete(n.sentRandomness, k)
            break
        }
    }
    n.sentRandomness[randResult.BlockHash] = struct{}{}
    if err := n.trans.Broadcast(randResult); err != nil {
        panic(err)
    }
}

// broadcast message to all other nodes in the network.
func (n *Network) broadcast(message interface{}) {
    if err := n.trans.Broadcast(message); err != nil {
        panic(err)
    }
}

// SendDKGPrivateShare implements core.Network interface.
func (n *Network) SendDKGPrivateShare(
    recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
    if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil {
        panic(err)
    }
}

// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
    prvShare *typesDKG.PrivateShare) {
    if err := n.trans.Broadcast(prvShare); err != nil {
        panic(err)
    }
}

// BroadcastDKGPartialSignature implements core.Network interface.
func (n *Network) BroadcastDKGPartialSignature(
    psig *typesDKG.PartialSignature) {
    if err := n.trans.Broadcast(psig); err != nil {
        panic(err)
    }
}

// ReceiveChan implements core.Network interface.
func (n *Network) ReceiveChan() <-chan interface{} {
    return n.toConsensus
}

// Setup transport layer.
func (n *Network) Setup(serverEndpoint interface{}) (err error) {
    // Join the p2p network.
    switch n.config.Type {
    case NetworkTypeTCP, NetworkTypeTCPLocal:
        addr := net.JoinHostPort(
            n.config.PeerServer, strconv.Itoa(n.config.PeerPort))
        n.fromTransport, err = n.trans.Join(addr)
    case NetworkTypeFake:
        n.fromTransport, err = n.trans.Join(serverEndpoint)
    default:
        err = fmt.Errorf("unknown network type: %v", n.config.Type)
    }
    if err != nil {
        return
    }
    peerKeys := n.trans.Peers()
    n.peers = make(map[types.NodeID]struct{})
    for _, k := range peerKeys {
        n.peers[types.NewNodeID(k)] = struct{}{}
    }
    return
}

func (n *Network) dispatchMsg(e *TransportEnvelope) {
    msg := n.cloneForFake(e.Msg)
    switch v := msg.(type) {
    case *types.Block:
        n.addBlockToCache(v)
        // Notify pulling routine about the newly arrived block.
        func() {
            n.unreceivedBlocksLock.Lock()
            defer n.unreceivedBlocksLock.Unlock()
            if ch, exists := n.unreceivedBlocks[v.Hash]; exists {
                ch <- v.Hash
            }
        }()
        n.toConsensus <- v
    case *types.Vote:
        // Add this vote to cache.
        n.addVoteToCache(v)
        n.toConsensus <- v
    case *types.AgreementResult, *types.BlockRandomnessResult,
        *typesDKG.PrivateShare, *typesDKG.PartialSignature:
        n.toConsensus <- v
    case packedStateChanges:
        if n.stateModule == nil {
            panic(errors.New(
                "receive packed state change request without state attached"))
        }
        if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil {
            panic(err)
        }
    case *PullRequest:
        go n.handlePullRequest(v)
    default:
        n.toNode <- v
    }
}

func (n *Network) handlePullRequest(req *PullRequest) {
    switch req.Type {
    case "block":
        hashes := req.Identity.(common.Hashes)
        func() {
            n.blockCacheLock.Lock()
            defer n.blockCacheLock.Unlock()
        All:
            for _, h := range hashes {
                b, exists := n.blockCache[h]
                if !exists {
                    continue
                }
                select {
                case <-n.ctx.Done():
                    break All
                default:
                }
                if err := n.trans.Send(req.Requester, b); err != nil {
                    log.Println("unable to send block", req.Requester, err)
                }
            }
        }()
    case "vote":
        pos := req.Identity.(types.Position)
        func() {
            n.voteCacheLock.Lock()
            defer n.voteCacheLock.Unlock()
            if votes, exists := n.voteCache[pos]; exists {
                for _, v := range votes {
                    if err := n.trans.Send(req.Requester, v); err != nil {
                        log.Println("unable to send vote", req.Requester, err)
                    }
                }
            }
        }()
    default:
        panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
    }
}

// Run the main loop.
func (n *Network) Run() {
Loop:
    for {
        select {
        case <-n.ctx.Done():
            break Loop
        default:
        }
        select {
        case <-n.ctx.Done():
            break Loop
        case e, ok := <-n.fromTransport:
            if !ok {
                break Loop
            }
            n.dispatchMsg(e)
        }
    }
}

// Close stops the network.
func (n *Network) Close() (err error) {
    n.ctxCancel()
    close(n.toConsensus)
    n.toConsensus = nil
    close(n.toNode)
    n.toNode = nil
    if err = n.trans.Close(); err != nil {
        return
    }
    return
}

// Report exports 'Report' method of TransportClient.
func (n *Network) Report(msg interface{}) error {
    return n.trans.Report(msg)
}

// Peers exports 'Peers' method of Transport.
func (n *Network) Peers() []crypto.PublicKey {
    return n.trans.Peers()
}

// Broadcast exports 'Broadcast' method of Transport, and would panic when
// error.
func (n *Network) Broadcast(msg interface{}) {
    if err := n.trans.Broadcast(msg); err != nil {
        panic(err)
    }
}

// ReceiveChanForNode returns a channel for messages not handled by
// core.Consensus.
func (n *Network) ReceiveChanForNode() <-chan interface{} {
    return n.toNode
}

// addStateModule attaches a State instance to this network.
func (n *Network) addStateModule(s *State) {
    n.stateModule = s
}

// appendRoundSetting updates essential info to network module for each round.
func (n *Network) appendRoundSetting(
    round uint64, notarySet map[types.NodeID]struct{}) {
    n.notarySetLock.Lock()
    defer n.notarySetLock.Unlock()
    if len(n.notarySets) != 0 {
        // This network module is already initialized, do some check against
        // the inputs.
        if round != n.notarySetMinRound+uint64(len(n.notarySets)) {
            panic(fmt.Errorf(
                "round not increasing when appending round setting: %v", round))
        }
    } else {
        n.notarySetMinRound = round
    }
    n.notarySets = append(n.notarySets, notarySet)
    // Purge cached notary sets.
    if len(n.notarySets) > cachedNotarySetSize {
        n.notarySets = n.notarySets[1:]
        n.notarySetMinRound++
    }
    return
}

func (n *Network) pullBlocksAsync(hashes common.Hashes) {
    // Setup notification channels for each block hash.
    notYetReceived := make(map[common.Hash]struct{})
    ch := make(chan common.Hash, len(hashes))
    func() {
        n.unreceivedBlocksLock.Lock()
        defer n.unreceivedBlocksLock.Unlock()
        for _, h := range hashes {
            if _, exists := n.unreceivedBlocks[h]; exists {
                panic(fmt.Errorf("attempting to pull one block multiple times"))
            }
            n.unreceivedBlocks[h] = ch
        }
    }()
    // Clean all unreceived block entry in unrecelivedBlocks field when leaving.
    defer func() {
        n.unreceivedBlocksLock.Lock()
        defer n.unreceivedBlocksLock.Unlock()
        for _, h := range hashes {
            delete(n.unreceivedBlocks, h)
        }
    }()
    req := &PullRequest{
        Requester: n.ID,
        Type:      "block",
        Identity:  hashes,
    }
    // Randomly pick peers to send pull requests.
Loop:
    for nID := range n.peers {
        if err := n.trans.Send(nID, req); err != nil {
            // Try next peer.
            continue
        }
        select {
        case <-n.ctx.Done():
            break Loop
        case <-time.After(2 * n.latencyModel.Delay()):
            // Consume everything in the notification channel.
            for {
                select {
                case h, ok := <-ch:
                    if !ok {
                        // This network module is closed.
                        break Loop
                    }
                    delete(notYetReceived, h)
                    if len(notYetReceived) == 0 {
                        break Loop
                    }
                default:
                    continue Loop
                }
            }
        }
    }
}

func (n *Network) pullVotesAsync(pos types.Position) {
    // Randomly pick several peers to pull votes from.
    req := &PullRequest{
        Requester: n.ID,
        Type:      "vote",
        Identity:  pos,
    }
    // Get corresponding notary set.
    notarySet := func() map[types.NodeID]struct{} {
        n.notarySetLock.Lock()
        defer n.notarySetLock.Unlock()
        return n.notarySets[pos.Round-n.notarySetMinRound]
    }()
    // Randomly select one peer from notary set and send a pull request.
    sentCount := 0
    for nID := range notarySet {
        if err := n.trans.Send(nID, req); err != nil {
            // Try next peer.
            continue
        }
        sentCount++
        if sentCount >= maxPullingPeerCount {
            break
        }
    }
}

func (n *Network) addBlockToCache(b *types.Block) {
    n.blockCacheLock.Lock()
    defer n.blockCacheLock.Unlock()
    if len(n.blockCache) > 1000 {
        // Randomly purge one block from cache.
        for k := range n.blockCache {
            delete(n.blockCache, k)
            break
        }
    }
    n.blockCache[b.Hash] = b
}

func (n *Network) addVoteToCache(v *types.Vote) {
    n.voteCacheLock.Lock()
    defer n.voteCacheLock.Unlock()
    if n.voteCacheSize >= 128 {
        pos := n.votePositions[0]
        n.voteCacheSize -= len(n.voteCache[pos])
        delete(n.voteCache, pos)
        n.votePositions = n.votePositions[1:]
    }
    if _, exists := n.voteCache[v.Position]; !exists {
        n.votePositions = append(n.votePositions, v.Position)
        n.voteCache[v.Position] =
            make(map[types.VoteHeader]*types.Vote)
    }
    if _, exists := n.voteCache[v.Position][v.VoteHeader]; exists {
        return
    }
    n.voteCache[v.Position][v.VoteHeader] = v
    n.voteCacheSize++
}

func (n *Network) cloneForFake(v interface{}) interface{} {
    if n.config.Type != NetworkTypeFake {
        return v
    }
    switch val := v.(type) {
    case *types.Block:
        return val.Clone()
    case *types.BlockRandomnessResult:
        // Perform deep copy for randomness result.
        return cloneBlockRandomnessResult(val)
    }
    return v
}