aboutsummaryrefslogblamecommitdiffstats
path: root/simulation/peer-server.go
blob: 95ce5c1c78411bf9fe4736d49f9da072dac1fcc8 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                    
  
                                                                        



                                                                               
                                                                         




                                                                           
                                                      




                                  
                 


                       
                 
              
              
 
                                                            


                                                                       

 
                                                                           
                        
                                                  

                                                     

                                       
                               


                                           
                                                                     

 
                                                   
                                  
                                                               
                           
                                                                
                                                     

                                       
                                                                                   


         



                                                                  


                    
                                                                         
                                                                          

                                         
 


                                                                  
         
                                         
                                  


                                                 




                                                                                          
                 
                                               
                                                         

                                                                                 
                         
                 

             
 
                                                                     
                                                                 















                                                                                    
                 



                                                                           
 










                                                                                       



                                    
                              
                        
                 



                                         
                                              
                                     
                         





                                                               

                                                                      

                                                                                               
                         
                 
         
 
 














                                                                                  
         



                                     
 

                            
                                                                    


                             


                                                             









                                                                       
         
















                                                                                                   
 
// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.

package simulation

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "reflect"
    "sync"
    "time"

    "github.com/dexon-foundation/dexon-consensus/common"
    "github.com/dexon-foundation/dexon-consensus/core/test"
    "github.com/dexon-foundation/dexon-consensus/core/types"
    "github.com/dexon-foundation/dexon-consensus/simulation/config"
)

// PeerServer is the main object to collect results and monitor simulation.
type PeerServer struct {
    peers            map[types.NodeID]struct{}
    msgChannel       chan *test.TransportEnvelope
    trans            test.TransportServer
    peerTotalOrder   PeerTotalOrder
    peerTotalOrderMu sync.Mutex
    verifiedLen      uint64
    cfg              *config.Config
    ctx              context.Context
    ctxCancel        context.CancelFunc
    blockEvents      map[types.NodeID]map[common.Hash][]time.Time
}

// NewPeerServer returns a new PeerServer instance.
func NewPeerServer() *PeerServer {
    ctx, cancel := context.WithCancel(context.Background())
    return &PeerServer{
        peers:          make(map[types.NodeID]struct{}),
        peerTotalOrder: make(PeerTotalOrder),
        ctx:            ctx,
        ctxCancel:      cancel,
        blockEvents:    make(map[types.NodeID]map[common.Hash][]time.Time),
    }
}

// isNode checks if nID is in p.peers. If peer server restarts but
// nodes are not, it will cause panic if nodes send message.
func (p *PeerServer) isNode(nID types.NodeID) bool {
    _, exist := p.peers[nID]
    return exist
}

// handleBlockList is the handler for messages with BlockList as payload.
func (p *PeerServer) handleBlockList(id types.NodeID, blocks *BlockList) {
    p.peerTotalOrderMu.Lock()
    defer p.peerTotalOrderMu.Unlock()

    readyForVerify := p.peerTotalOrder[id].PushBlocks(*blocks)
    if !readyForVerify {
        return
    }
    // Verify the total order result.
    go func(id types.NodeID) {
        p.peerTotalOrderMu.Lock()
        defer p.peerTotalOrderMu.Unlock()

        var correct bool
        var length int
        p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder)
        if !correct {
            log.Printf("The result of Total Ordering Algorithm has error.\n")
        }
        p.verifiedLen += uint64(length)
        if p.verifiedLen >= p.cfg.Node.MaxBlock {
            if err := p.trans.Broadcast(statusShutdown); err != nil {
                panic(err)
            }
        }
    }(id)
}

// handleMessage is the handler for messages with Message as payload.
func (p *PeerServer) handleMessage(id types.NodeID, m *message) {
    switch m.Type {
    case shutdownAck:
        delete(p.peers, id)
        log.Printf("%v shutdown, %d remains.\n", id, len(p.peers))
        if len(p.peers) == 0 {
            p.ctxCancel()
        }
    case blockTimestamp:
        msgs := []timestampMessage{}
        if err := json.Unmarshal(m.Payload, &msgs); err != nil {
            panic(err)
        }
        for _, msg := range msgs {
            if ok := p.peerTotalOrder[id].PushTimestamp(msg); !ok {
                panic(fmt.Errorf("unable to push timestamp: %v", m))
            }
        }
    default:
        panic(fmt.Errorf("unknown simulation message type: %v", m))
    }
}

func (p *PeerServer) handleBlockEventMessage(id types.NodeID, msg *blockEventMessage) {
    if _, exist := p.blockEvents[id]; !exist {
        p.blockEvents[id] = make(map[common.Hash][]time.Time)
    }
    nodeEvents := p.blockEvents[id]
    if _, exist := nodeEvents[msg.BlockHash]; !exist {
        nodeEvents[msg.BlockHash] = []time.Time{}
    }
    nodeEvents[msg.BlockHash] = msg.Timestamps
}

func (p *PeerServer) mainLoop() {
    for {
        select {
        case <-p.ctx.Done():
            return
        default:
        }
        select {
        case <-p.ctx.Done():
            return
        case e := <-p.msgChannel:
            if !p.isNode(e.From) {
                break
            }
            // Handle messages based on their type.
            switch val := e.Msg.(type) {
            case *BlockList:
                p.handleBlockList(e.From, val)
            case *message:
                p.handleMessage(e.From, val)
            case *blockEventMessage:
                p.handleBlockEventMessage(e.From, val)
            default:
                panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg)))
            }
        }
    }
}

// Setup prepares simualtion.
func (p *PeerServer) Setup(
    cfg *config.Config) (serverEndpoint interface{}, err error) {
    // Setup transport layer.
    switch cfg.Networking.Type {
    case "tcp", "tcp-local":
        p.trans = test.NewTCPTransportServer(&jsonMarshaller{}, peerPort)
    case "fake":
        p.trans = test.NewFakeTransportServer()
    default:
        panic(fmt.Errorf("unknown network type: %v", cfg.Networking.Type))
    }
    p.msgChannel, err = p.trans.Host()
    if err != nil {
        return
    }
    p.cfg = cfg
    serverEndpoint = p.msgChannel
    return
}

// Run the simulation.
func (p *PeerServer) Run() {
    if err := p.trans.WaitForPeers(p.cfg.Node.Num); err != nil {
        panic(err)
    }
    // Cache peers' info.
    for _, pubKey := range p.trans.Peers() {
        p.peers[types.NewNodeID(pubKey)] = struct{}{}
    }
    // Initialize total order result cache.
    for id := range p.peers {
        p.peerTotalOrder[id] = NewTotalOrderResult(id)
    }
    // Block to handle incoming messages.
    p.mainLoop()
    // The simulation is done, clean up.
    LogStatus(p.peerTotalOrder)
    if err := p.trans.Close(); err != nil {
        log.Printf("Error shutting down peerServer: %v\n", err)
    }
    p.logBlockEvents()
}

func (p *PeerServer) logBlockEvents() {
    diffs := [blockEventCount - 1][]float64{}
    for _, bs := range p.blockEvents {
        for _, ts := range bs {
            for i := 0; i < blockEventCount-1; i++ {
                diffs[i] = append(diffs[i], float64(ts[i+1].Sub(ts[i]))/1000000000)
            }
        }
    }
    log.Printf("===== block event mean and std dev (%d blocks) =====\n", len(diffs[0]))
    for i, a := range diffs {
        m, d := calcMeanAndStdDeviation(a)
        log.Printf("  event %d: mean = %f, std dev = %f\n", i, m, d)
    }
}