diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-09-11 14:56:47 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-11 14:56:47 +0800 |
commit | 292ad73ec08621fa9beef5f028860131fcbf9bd9 (patch) | |
tree | 47644eaad7757cd2e8798ae7fe361fa5d6e99060 /simulation | |
parent | 582a491aa0bcb784ac7b65ebbfb42139945ea703 (diff) | |
download | dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.gz dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.bz2 dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.lz dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.xz dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.tar.zst dexon-consensus-292ad73ec08621fa9beef5f028860131fcbf9bd9.zip |
simulation: integrate test.Transport (#99)
- Add marshaller for simulation by encoding/json
- Implement peer server based on test.TranportServer
- Remove network models, they are replaced with test.LatencyModel
Diffstat (limited to 'simulation')
-rw-r--r-- | simulation/app.go | 24 | ||||
-rw-r--r-- | simulation/config/config.go | 1 | ||||
-rw-r--r-- | simulation/marshaller.go | 106 | ||||
-rw-r--r-- | simulation/network-model.go | 85 | ||||
-rw-r--r-- | simulation/network-model_test.go | 48 | ||||
-rw-r--r-- | simulation/network.go | 174 | ||||
-rw-r--r-- | simulation/peer-server.go | 336 | ||||
-rw-r--r-- | simulation/simulation.go | 83 | ||||
-rw-r--r-- | simulation/tcp-network.go | 457 | ||||
-rw-r--r-- | simulation/validator.go | 102 | ||||
-rw-r--r-- | simulation/verification.go | 2 |
11 files changed, 468 insertions, 950 deletions
diff --git a/simulation/app.go b/simulation/app.go index 5a31273..c312204 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -32,7 +32,7 @@ type simApp struct { ValidatorID types.ValidatorID Outputs []*types.Block Early bool - Network PeerServerNetwork + netModule *network DeliverID int // blockSeen stores the time when block is delivered by Total Ordering. blockSeen map[common.Hash]time.Time @@ -43,10 +43,10 @@ type simApp struct { } // newSimApp returns point to a new instance of simApp. -func newSimApp(id types.ValidatorID, Network PeerServerNetwork) *simApp { +func newSimApp(id types.ValidatorID, netModule *network) *simApp { return &simApp{ ValidatorID: id, - Network: Network, + netModule: netModule, DeliverID: 0, blockSeen: make(map[common.Hash]time.Time), unconfirmedBlocks: make(map[types.ValidatorID]common.Hashes), @@ -120,7 +120,7 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { confirmLatency := []time.Duration{} - payload := []TimestampMessage{} + payload := []timestampMessage{} for _, block := range blocks { if block.ProposerID == a.ValidatorID { confirmLatency = append(confirmLatency, @@ -128,7 +128,7 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { } for hash := range block.Acks { for _, blockHash := range a.getAckedBlocks(hash) { - payload = append(payload, TimestampMessage{ + payload = append(payload, timestampMessage{ BlockHash: blockHash, Event: timestampAck, Timestamp: now, @@ -142,20 +142,20 @@ func (a *simApp) TotalOrderingDeliver(blockHashes common.Hashes, early bool) { if err != nil { fmt.Println(err) } else { - msg := Message{ + msg := &message{ Type: blockTimestamp, Payload: jsonPayload, } - a.Network.NotifyServer(msg) + a.netModule.report(msg) } } - blockList := BlockList{ + blockList := &BlockList{ ID: a.DeliverID, BlockHash: blockHashes, ConfirmLatency: confirmLatency, } - a.Network.DeliverBlocks(blockList) + a.netModule.report(blockList) a.DeliverID++ for _, block := range blocks { a.blockSeen[block.Hash] = now @@ -171,7 +171,7 @@ func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { return } now := time.Now() - payload := []TimestampMessage{ + payload := []timestampMessage{ { BlockHash: blockHash, Event: blockSeen, @@ -188,11 +188,11 @@ func (a *simApp) DeliverBlock(blockHash common.Hash, timestamp time.Time) { fmt.Println(err) return } - msg := Message{ + msg := &message{ Type: blockTimestamp, Payload: jsonPayload, } - a.Network.NotifyServer(msg) + a.netModule.report(msg) } // NotaryAckDeliver is called when a notary ack is created. diff --git a/simulation/config/config.go b/simulation/config/config.go index 2f03f85..f8f629b 100644 --- a/simulation/config/config.go +++ b/simulation/config/config.go @@ -31,6 +31,7 @@ type NetworkType string const ( NetworkTypeTCP NetworkType = "tcp" NetworkTypeTCPLocal NetworkType = "tcp-local" + NetworkTypeFake NetworkType = "fake" ) // Consensus settings. diff --git a/simulation/marshaller.go b/simulation/marshaller.go new file mode 100644 index 0000000..88e2f6a --- /dev/null +++ b/simulation/marshaller.go @@ -0,0 +1,106 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package simulation + +import ( + "encoding/json" + "fmt" + + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +// jsonMarshaller implements test.Marshaller to marshal simulation related +// messages. +type jsonMarshaller struct{} + +// Unmarshal implements Unmarshal method of test.Marshaller interface. +func (m *jsonMarshaller) Unmarshal( + msgType string, payload []byte) (msg interface{}, err error) { + + switch msgType { + case "blocklist": + var blocks BlockList + if err = json.Unmarshal(payload, &blocks); err != nil { + break + } + msg = &blocks + case "message": + var m message + if err = json.Unmarshal(payload, &m); err != nil { + break + } + msg = &m + case "info-status": + var status infoStatus + if err = json.Unmarshal(payload, &status); err != nil { + break + } + msg = status + case "block": + block := &types.Block{} + if err = json.Unmarshal(payload, block); err != nil { + break + } + msg = block + case "notary-ack": + nAck := &types.NotaryAck{} + if err = json.Unmarshal(payload, nAck); err != nil { + break + } + msg = nAck + case "vote": + vote := &types.Vote{} + if err = json.Unmarshal(payload, vote); err != nil { + break + } + msg = vote + default: + err = fmt.Errorf("unrecognized message type: %v", msgType) + } + if err != nil { + return + } + return +} + +// Marshal implements Marshal method of test.Marshaller interface. +func (m *jsonMarshaller) Marshal(msg interface{}) ( + msgType string, payload []byte, err error) { + + switch msg.(type) { + case *BlockList: + msgType = "blocklist" + case *message: + msgType = "message" + case infoStatus: + msgType = "info-status" + case *types.Block: + msgType = "block" + case *types.NotaryAck: + msgType = "notary-ack" + case *types.Vote: + msgType = "vote" + default: + err = fmt.Errorf("unknwon message type: %v", msg) + } + if err != nil { + return + } + payload, err = json.Marshal(msg) + return +} diff --git a/simulation/network-model.go b/simulation/network-model.go deleted file mode 100644 index 01704e7..0000000 --- a/simulation/network-model.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2018 The dexon-consensus-core Authors -// This file is part of the dexon-consensus-core library. -// -// The dexon-consensus-core library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus-core library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus-core library. If not, see -// <http://www.gnu.org/licenses/>. - -package simulation - -import ( - "math/rand" - "time" -) - -// Model is the interface for define a given network environment. -type Model interface { - // LossRate returns the message lost ratio between [0, 1) - LossRate() float64 - - // Delay returns the send delay of the message. This function is called each - // time before the message is sent, so one can return different number each - // time. - Delay() time.Duration -} - -// LosslessNetwork is a lossless network model. -type LosslessNetwork struct { -} - -// LossRate returns lossrate for the model. -func (l *LosslessNetwork) LossRate() float64 { - return 0.0 -} - -// Delay returns the send delay of a given message. -func (l *LosslessNetwork) Delay() time.Duration { - return time.Duration(0) -} - -// FixedLostNoDelayModel is a network with no delay and a fixed lost -// ratio. -type FixedLostNoDelayModel struct { - LossRateValue float64 -} - -// LossRate returns lossrate for the model. -func (f *FixedLostNoDelayModel) LossRate() float64 { - return f.LossRateValue -} - -// Delay returns the send delay of a given message. -func (f *FixedLostNoDelayModel) Delay() time.Duration { - return time.Duration(0) -} - -// NormalNetwork is a model where it's delay is a normal distribution. -type NormalNetwork struct { - Sigma float64 - Mean float64 - LossRateValue float64 -} - -// LossRate returns lossrate for the model. -func (n *NormalNetwork) LossRate() float64 { - return n.LossRateValue -} - -// Delay returns the send delay of a given message. -func (n *NormalNetwork) Delay() time.Duration { - delay := rand.NormFloat64()*n.Sigma + n.Mean - if delay < 0 { - delay = n.Sigma / 2 - } - return time.Duration(delay) * time.Millisecond -} diff --git a/simulation/network-model_test.go b/simulation/network-model_test.go deleted file mode 100644 index a9de462..0000000 --- a/simulation/network-model_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2018 The dexon-consensus-core Authors -// This file is part of the dexon-consensus-core library. -// -// The dexon-consensus-core library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus-core library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus-core library. If not, see -// <http://www.gnu.org/licenses/>. - -package simulation - -import ( - "testing" - "time" - - "github.com/stretchr/testify/suite" -) - -type NetworkModelsTestSuite struct { - suite.Suite -} - -func (n *NetworkModelsTestSuite) SetupTest() { -} - -func (n *NetworkModelsTestSuite) TearDownTest() { -} - -// TestNormalNetwork make sure the Delay() or NormalNetwork does not -// exceeds 200ms. -func (n *NetworkModelsTestSuite) TestNormalNetwork() { - m := NormalNetwork{} - for i := 0; i < 1000; i++ { - n.Require().True(m.Delay() < 200*time.Millisecond) - } -} - -func TestNetworkModels(t *testing.T) { - suite.Run(t, new(NetworkModelsTestSuite)) -} diff --git a/simulation/network.go b/simulation/network.go index 791790c..90f3aec 100644 --- a/simulation/network.go +++ b/simulation/network.go @@ -18,12 +18,17 @@ package simulation import ( + "context" "encoding/json" + "fmt" + "net" + "strconv" "time" "github.com/dexon-foundation/dexon-consensus-core/common" - "github.com/dexon-foundation/dexon-consensus-core/core" + "github.com/dexon-foundation/dexon-consensus-core/core/test" "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon-consensus-core/simulation/config" ) type messageType string @@ -33,8 +38,8 @@ const ( blockTimestamp messageType = "blockTimestamps" ) -// Message is a struct for peer sending message to server. -type Message struct { +// message is a struct for peer sending message to server. +type message struct { Type messageType `json:"type"` Payload json.RawMessage `json:"payload"` } @@ -49,7 +54,7 @@ const ( // TimestampMessage is a struct for peer sending consensus timestamp information // to server. -type TimestampMessage struct { +type timestampMessage struct { BlockHash common.Hash `json:"hash"` Event timestampEvent `json:"event"` Timestamp time.Time `json:"timestamp"` @@ -63,30 +68,155 @@ const ( statusShutdown infoStatus = "shutdown" ) -// InfoMessage is a struct used by peerServer's /info. -type InfoMessage struct { +// infoMessage is a struct used by peerServer's /info. +type infoMessage struct { Status infoStatus `json:"status"` Peers map[types.ValidatorID]string `json:"peers"` } -// Endpoint is the interface for a client network endpoint. -type Endpoint interface { - GetID() types.ValidatorID +// network implements core.Network interface and other methods for simulation +// based on test.TransportClient. +type network struct { + cfg config.Networking + ctx context.Context + ctxCancel context.CancelFunc + trans test.TransportClient + fromTransport <-chan *test.TransportEnvelope + toConsensus chan interface{} + toValidator chan interface{} } -// Network is the interface for network related functions. -type Network interface { - PeerServerNetwork - core.Network - Start() - NumPeers() int - Join(endpoint Endpoint) - Endpoints() types.ValidatorIDs +// newNetwork setup network stuffs for validators, which provides an +// implementation of core.Network based on test.TransportClient. +func newNetwork(vID types.ValidatorID, cfg config.Networking) (n *network) { + // Construct latency model. + latency := &test.NormalLatencyModel{ + Mean: cfg.Mean, + Sigma: cfg.Sigma, + } + // Construct basic network instance. + n = &network{ + cfg: cfg, + toValidator: make(chan interface{}, 1000), + toConsensus: make(chan interface{}, 1000), + } + n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + // Construct transport layer. + switch cfg.Type { + case config.NetworkTypeTCPLocal: + n.trans = test.NewTCPTransportClient( + vID, latency, &jsonMarshaller{}, true) + case config.NetworkTypeTCP: + n.trans = test.NewTCPTransportClient( + vID, latency, &jsonMarshaller{}, false) + case config.NetworkTypeFake: + n.trans = test.NewFakeTransportClient(vID, latency) + default: + panic(fmt.Errorf("unknown network type: %v", cfg.Type)) + } + return } -// PeerServerNetwork is the interface for peerServer network related functions -type PeerServerNetwork interface { - DeliverBlocks(blocks BlockList) - NotifyServer(msg Message) - GetServerInfo() InfoMessage +// BroadcastVote implements core.Network interface. +func (n *network) BroadcastVote(vote *types.Vote) { + if err := n.trans.Broadcast(vote); err != nil { + panic(err) + } +} + +// BroadcastBlock implements core.Network interface. +func (n *network) BroadcastBlock(block *types.Block) { + if err := n.trans.Broadcast(block); err != nil { + panic(err) + } +} + +// BroadcastNotaryAck implements core.Network interface. +func (n *network) BroadcastNotaryAck(notaryAck *types.NotaryAck) { + if err := n.trans.Broadcast(notaryAck); err != nil { + panic(err) + } +} + +// ReceiveChan implements core.Network interface. +func (n *network) ReceiveChan() <-chan interface{} { + return n.toConsensus +} + +// receiveChanForValidator returns a channel for validators' specific +// messages. +func (n *network) receiveChanForValidator() <-chan interface{} { + return n.toValidator +} + +// setup transport layer. +func (n *network) setup(serverEndpoint interface{}) (err error) { + // Join the p2p network. + switch n.cfg.Type { + case config.NetworkTypeTCP, config.NetworkTypeTCPLocal: + addr := net.JoinHostPort(n.cfg.PeerServer, strconv.Itoa(peerPort)) + n.fromTransport, err = n.trans.Join(addr) + case config.NetworkTypeFake: + n.fromTransport, err = n.trans.Join(serverEndpoint) + default: + err = fmt.Errorf("unknown network type: %v", n.cfg.Type) + } + if err != nil { + return + } + return +} + +// run the main loop. +func (n *network) run() { + // The dispatcher declararion: + // to consensus or validator, that's the question. + disp := func(e *test.TransportEnvelope) { + switch e.Msg.(type) { + case *types.Block, *types.Vote, *types.NotaryAck: + n.toConsensus <- e.Msg + default: + n.toValidator <- e.Msg + } + } +MainLoop: + for { + select { + case <-n.ctx.Done(): + break MainLoop + default: + } + select { + case <-n.ctx.Done(): + break MainLoop + case e, ok := <-n.fromTransport: + if !ok { + break MainLoop + } + disp(e) + } + } +} + +// Close stop the network. +func (n *network) Close() (err error) { + n.ctxCancel() + close(n.toConsensus) + n.toConsensus = nil + close(n.toValidator) + n.toValidator = nil + if err = n.trans.Close(); err != nil { + return + } + return +} + +// report exports 'Report' method of test.TransportClient. +func (n *network) report(msg interface{}) error { + return n.trans.Report(msg) +} + +// peers exports 'Peers' method of test.Transport. +func (n *network) peers() map[types.ValidatorID]struct{} { + return n.trans.Peers() } diff --git a/simulation/peer-server.go b/simulation/peer-server.go index 6567639..5b43be4 100644 --- a/simulation/peer-server.go +++ b/simulation/peer-server.go @@ -21,275 +21,163 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" "log" - "net" - "net/http" + "reflect" "sync" - "time" + "github.com/dexon-foundation/dexon-consensus-core/core/test" "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon-consensus-core/simulation/config" ) -// PeerServer is the main object for maintaining peer list. +// PeerServer is the main object to collect results and monitor simulation. type PeerServer struct { - peers map[types.ValidatorID]string - peersMu sync.Mutex + peers map[types.ValidatorID]struct{} + msgChannel chan *test.TransportEnvelope + trans test.TransportServer peerTotalOrder PeerTotalOrder peerTotalOrderMu sync.Mutex verifiedLen uint64 + cfg *config.Config + ctx context.Context + ctxCancel context.CancelFunc } -// NewPeerServer returns a new peer server. +// NewPeerServer returns a new PeerServer instance. func NewPeerServer() *PeerServer { + ctx, cancel := context.WithCancel(context.Background()) return &PeerServer{ - peers: make(map[types.ValidatorID]string), + peers: make(map[types.ValidatorID]struct{}), peerTotalOrder: make(PeerTotalOrder), + ctx: ctx, + ctxCancel: cancel, } } // isValidator checks if vID is in p.peers. If peer server restarts but // validators are not, it will cause panic if validators send message. func (p *PeerServer) isValidator(vID types.ValidatorID) bool { - p.peersMu.Lock() - defer p.peersMu.Unlock() _, exist := p.peers[vID] return exist } -// Run starts the peer server. -func (p *PeerServer) Run(configPath string) { - cfg, err := config.Read(configPath) - if err != nil { - panic(err) - } - - resetHandler := func(w http.ResponseWriter, r *http.Request) { - p.peersMu.Lock() - defer p.peersMu.Unlock() - - p.peers = make(map[types.ValidatorID]string) - log.Printf("Peer server has been reset.") - } - - joinHandler := func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - - idString := r.Header.Get("ID") - portString := r.Header.Get("PORT") - - id := types.ValidatorID{} - id.UnmarshalText([]byte(idString)) - - p.peersMu.Lock() - defer p.peersMu.Unlock() - - host, _, _ := net.SplitHostPort(r.RemoteAddr) - p.peers[id] = net.JoinHostPort(host, portString) - p.peerTotalOrder[id] = NewTotalOrderResult(id) - log.Printf("Peer %s joined from %s", id, p.peers[id]) - - if len(p.peers) == cfg.Validator.Num { - log.Println("All peers connected.") - } - w.WriteHeader(http.StatusOK) - } - - peersHandler := func(w http.ResponseWriter, r *http.Request) { - p.peersMu.Lock() - defer p.peersMu.Unlock() - defer r.Body.Close() - - if len(p.peers) != cfg.Validator.Num { - w.WriteHeader(http.StatusNotFound) - return - } - - jsonText, err := json.Marshal(p.peers) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } +// handleBlockList is the handler for messages with BlockList as payload. +func (p *PeerServer) handleBlockList(id types.ValidatorID, blocks *BlockList) { + p.peerTotalOrderMu.Lock() + defer p.peerTotalOrderMu.Unlock() - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - w.Write(jsonText) + readyForVerify := p.peerTotalOrder[id].PushBlocks(*blocks) + if !readyForVerify { + return } - - infoHandler := func(w http.ResponseWriter, r *http.Request) { - p.peersMu.Lock() - defer p.peersMu.Unlock() - defer r.Body.Close() - - msg := InfoMessage{ - Status: statusNormal, - Peers: p.peers, - } - - if len(p.peers) < cfg.Validator.Num { - msg.Status = statusInit - } - - // Determine msg.status. - if p.verifiedLen >= cfg.Validator.MaxBlock { - msg.Status = statusShutdown - } - - jsonText, err := json.Marshal(msg) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - w.Write(jsonText) - } - - deliveryHandler := func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - - idString := r.Header.Get("ID") - body, err := ioutil.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - m := BlockList{} - if err := json.Unmarshal(body, &m); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - id := types.ValidatorID{} - if err := id.UnmarshalText([]byte(idString)); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - if !p.isValidator(id) { - w.WriteHeader(http.StatusForbidden) - return - } - - w.WriteHeader(http.StatusOK) - + // Verify the total order result. + go func(id types.ValidatorID) { p.peerTotalOrderMu.Lock() defer p.peerTotalOrderMu.Unlock() - readyForVerify := p.peerTotalOrder[id].PushBlocks(m) - if !readyForVerify { - return + var correct bool + var length int + p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder) + if !correct { + log.Printf("The result of Total Ordering Algorithm has error.\n") } - - // Verify the total order result. - go func(id types.ValidatorID) { - p.peerTotalOrderMu.Lock() - defer p.peerTotalOrderMu.Unlock() - - var correct bool - var length int - p.peerTotalOrder, correct, length = VerifyTotalOrder(id, p.peerTotalOrder) - if !correct { - log.Printf("The result of Total Ordering Algorithm has error.\n") + p.verifiedLen += uint64(length) + if p.verifiedLen >= p.cfg.Validator.MaxBlock { + if err := p.trans.Broadcast(statusShutdown); err != nil { + panic(err) } - p.verifiedLen += uint64(length) - }(id) - } - - stopServer := make(chan struct{}) - - messageHandler := func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - - idString := r.Header.Get("ID") - id := types.ValidatorID{} - id.UnmarshalText([]byte(idString)) - - if !p.isValidator(id) { - w.WriteHeader(http.StatusForbidden) - return } + }(id) +} - body, err := ioutil.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return +// handleMessage is the handler for messages with Message as payload. +func (p *PeerServer) handleMessage(id types.ValidatorID, m *message) { + switch m.Type { + case shutdownAck: + delete(p.peers, id) + log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) + if len(p.peers) == 0 { + p.ctxCancel() + } + case blockTimestamp: + msgs := []timestampMessage{} + if err := json.Unmarshal(m.Payload, &msgs); err != nil { + panic(err) + } + for _, msg := range msgs { + if ok := p.peerTotalOrder[id].PushTimestamp(msg); !ok { + panic(fmt.Errorf("unable to push timestamp: %v", m)) + } } + default: + panic(fmt.Errorf("unknown simulation message type: %v", m)) + } +} - m := Message{} - if err := json.Unmarshal(body, &m); err != nil { - w.WriteHeader(http.StatusBadRequest) +func (p *PeerServer) mainLoop() { + for { + select { + case <-p.ctx.Done(): return + default: } - - switch m.Type { - case shutdownAck: - func() { - p.peersMu.Lock() - defer p.peersMu.Unlock() - - delete(p.peers, id) - log.Printf("%v shutdown, %d remains.\n", id, len(p.peers)) - if len(p.peers) == 0 { - stopServer <- struct{}{} - } - }() - break - case blockTimestamp: - msgs := []TimestampMessage{} - if err := json.Unmarshal(m.Payload, &msgs); err != nil { - w.WriteHeader(http.StatusBadRequest) - return + select { + case <-p.ctx.Done(): + return + case e := <-p.msgChannel: + if !p.isValidator(e.From) { + break } - for _, msg := range msgs { - if ok := p.peerTotalOrder[id].PushTimestamp(msg); !ok { - w.WriteHeader(http.StatusBadRequest) - return - } + // Handle messages based on their type. + switch val := e.Msg.(type) { + case *BlockList: + p.handleBlockList(e.From, val) + case *message: + p.handleMessage(e.From, val) + default: + panic(fmt.Errorf("unknown message: %v", reflect.TypeOf(e.Msg))) } - break - default: - w.WriteHeader(http.StatusBadRequest) - return } - w.WriteHeader(http.StatusOK) } +} - http.HandleFunc("/reset", resetHandler) - http.HandleFunc("/join", joinHandler) - http.HandleFunc("/peers", peersHandler) - http.HandleFunc("/info", infoHandler) - http.HandleFunc("/delivery", deliveryHandler) - http.HandleFunc("/message", messageHandler) - - addr := fmt.Sprintf("0.0.0.0:%d", peerPort) - log.Printf("Peer server started at %s", addr) - - server := &http.Server{Addr: addr} - - go func() { - <-stopServer - - LogStatus(p.peerTotalOrder) - - log.Printf("Shutting down peerServer.\n") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := server.Shutdown(ctx); err != nil { - log.Printf("Error shutting down peerServer: %v\n", err) - } - }() - - if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Fatalf("Error starting server %v\n", err) +// Setup prepares simualtion. +func (p *PeerServer) Setup( + cfg *config.Config) (serverEndpoint interface{}, err error) { + // Setup transport layer. + switch cfg.Networking.Type { + case "tcp", "tcp-local": + p.trans = test.NewTCPTransportServer(&jsonMarshaller{}, peerPort) + case "fake": + p.trans = test.NewFakeTransportServer() + default: + panic(fmt.Errorf("unknown network type: %v", cfg.Networking.Type)) + } + p.msgChannel, err = p.trans.Host() + if err != nil { + return } + p.cfg = cfg + serverEndpoint = p.msgChannel + return +} - // Do not exit when we are in TCP node, since k8s will restart the pod and - // cause confusions. - if cfg.Networking.Type == config.NetworkTypeTCP { - select {} +// Run the simulation. +func (p *PeerServer) Run() { + if err := p.trans.WaitForPeers(p.cfg.Validator.Num); err != nil { + panic(err) + } + // Cache peers' info. + p.peers = p.trans.Peers() + // Initialize total order result cache. + for id := range p.peers { + p.peerTotalOrder[id] = NewTotalOrderResult(id) + } + // Block to handle incoming messages. + p.mainLoop() + // The simulation is done, clean up. + LogStatus(p.peerTotalOrder) + if err := p.trans.Close(); err != nil { + log.Printf("Error shutting down peerServer: %v\n", err) } } diff --git a/simulation/simulation.go b/simulation/simulation.go index 978107a..74a758d 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -18,7 +18,6 @@ package simulation import ( - "fmt" "sync" "github.com/dexon-foundation/dexon-consensus-core/crypto/eth" @@ -26,63 +25,51 @@ import ( ) // Run starts the simulation. -func Run(configPath string, legacy bool) { - cfg, err := config.Read(configPath) - if err != nil { - panic(err) - } - - networkType := cfg.Networking.Type - +func Run(cfg *config.Config, legacy bool) { var ( - vs []*Validator - networkModel = &NormalNetwork{ - Sigma: cfg.Networking.Sigma, - Mean: cfg.Networking.Mean, - LossRateValue: cfg.Networking.LossRateValue, - } + networkType = cfg.Networking.Type + server *PeerServer + wg sync.WaitGroup + err error ) - if networkType == config.NetworkTypeTCPLocal { - lock := sync.Mutex{} - wg := sync.WaitGroup{} - for i := 0; i < cfg.Validator.Num; i++ { - prv, err := eth.NewPrivateKey() - if err != nil { - panic(err) - } - wg.Add(1) - go func() { - network := NewTCPNetwork(true, cfg.Networking.PeerServer, networkModel) - network.Start() - lock.Lock() - defer lock.Unlock() - vs = append(vs, NewValidator(prv, eth.SigToPub, cfg.Validator, network)) - wg.Done() - }() - } - wg.Wait() - - for i := 0; i < cfg.Validator.Num; i++ { - fmt.Printf("Validator %d: %s\n", i, vs[i].ID) - go vs[i].Run(legacy) - } - } else if networkType == config.NetworkTypeTCP { + // init is a function to init a validator. + init := func(serverEndpoint interface{}) { prv, err := eth.NewPrivateKey() if err != nil { panic(err) } - network := NewTCPNetwork(false, cfg.Networking.PeerServer, networkModel) - network.Start() - v := NewValidator(prv, eth.SigToPub, cfg.Validator, network) - go v.Run(legacy) - vs = append(vs, v) + v := newValidator(prv, eth.SigToPub, *cfg) + wg.Add(1) + go func() { + defer wg.Done() + v.run(serverEndpoint, legacy) + }() } - for _, v := range vs { - v.Wait() - fmt.Printf("Validator %s is shutdown\n", v.GetID()) + switch networkType { + case config.NetworkTypeTCP: + // Intialized a simulation on multiple remotely peers. + // The peer-server would be initialized with another command. + init(nil) + case config.NetworkTypeTCPLocal, config.NetworkTypeFake: + // Initialize a local simulation with a peer server. + var serverEndpoint interface{} + server = NewPeerServer() + if serverEndpoint, err = server.Setup(cfg); err != nil { + panic(err) + } + wg.Add(1) + go func() { + defer wg.Done() + server.Run() + }() + // Initialize all validators. + for i := 0; i < cfg.Validator.Num; i++ { + init(serverEndpoint) + } } + wg.Wait() // Do not exit when we are in TCP node, since k8s will restart the pod and // cause confusions. diff --git a/simulation/tcp-network.go b/simulation/tcp-network.go deleted file mode 100644 index 468baff..0000000 --- a/simulation/tcp-network.go +++ /dev/null @@ -1,457 +0,0 @@ -// Copyright 2018 The dexon-consensus-core Authors -// This file is part of the dexon-consensus-core library. -// -// The dexon-consensus-core library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus-core library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus-core library. If not, see -// <http://www.gnu.org/licenses/>. - -package simulation - -import ( - "context" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "math/rand" - "net" - "net/http" - "os" - "runtime" - "strconv" - "strings" - "sync" - "syscall" - "time" - - "github.com/dexon-foundation/dexon-consensus-core/core/types" -) - -const retries = 5 - -// TCPNetwork implements the Network interface. -type TCPNetwork struct { - local bool - port int - endpoint Endpoint - client *http.Client - - peerServer string - endpointMutex sync.RWMutex - endpoints map[types.ValidatorID]string - recieveChan chan interface{} - model Model -} - -// NewTCPNetwork returns pointer to a new Network instance. -func NewTCPNetwork(local bool, peerServer string, model Model) *TCPNetwork { - pServer := peerServer - if local { - pServer = "127.0.0.1" - } - // Force connection reuse. - tr := &http.Transport{ - MaxIdleConnsPerHost: 1024, - TLSHandshakeTimeout: 0 * time.Second, - } - client := &http.Client{ - Transport: tr, - Timeout: 5 * time.Second, - } - return &TCPNetwork{ - local: local, - peerServer: pServer, - client: client, - endpoints: make(map[types.ValidatorID]string), - recieveChan: make(chan interface{}, msgBufferSize), - model: model, - } -} - -// Start starts the http server for accepting message. -func (n *TCPNetwork) Start() { - listenSuccess := make(chan struct{}) - go func() { - for { - ctx, cancel := context.WithTimeout(context.Background(), - 50*time.Millisecond) - defer cancel() - go func() { - <-ctx.Done() - if ctx.Err() != context.Canceled { - listenSuccess <- struct{}{} - } - }() - port := 1024 + rand.Int()%1024 - if !n.local { - port = peerPort - } - addr := net.JoinHostPort("0.0.0.0", strconv.Itoa(port)) - server := &http.Server{ - Addr: addr, - Handler: n, - } - - n.port = port - if err := server.ListenAndServe(); err != nil { - cancel() - if err == http.ErrServerClosed { - break - } - if !n.local { - panic(err) - } - // In local-tcp, retry with other port. - operr, ok := err.(*net.OpError) - if !ok { - panic(err) - } - oserr, ok := operr.Err.(*os.SyscallError) - if !ok { - panic(operr) - } - errno, ok := oserr.Err.(syscall.Errno) - if !ok { - panic(oserr) - } - if errno != syscall.EADDRINUSE { - panic(errno) - } - } - } - }() - <-listenSuccess - fmt.Printf("Validator started at 0.0.0.0:%d\n", n.port) -} - -// NumPeers returns the number of peers in the network. -func (n *TCPNetwork) NumPeers() int { - n.endpointMutex.Lock() - defer n.endpointMutex.Unlock() - - return len(n.endpoints) -} - -// ServerHTTP implements the http.Handler interface. -func (n *TCPNetwork) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - body, err := ioutil.ReadAll(r.Body) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - m := struct { - Type string `json:"type"` - Payload json.RawMessage `json:"payload"` - }{} - if err := json.Unmarshal(body, &m); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - switch m.Type { - case "block": - block := &types.Block{} - if err := json.Unmarshal(m.Payload, block); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - n.recieveChan <- block - case "vote": - vote := &types.Vote{} - if err := json.Unmarshal(m.Payload, vote); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - n.recieveChan <- vote - case "notaryAck": - ack := &types.NotaryAck{} - if err := json.Unmarshal(m.Payload, ack); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - n.recieveChan <- ack - default: - w.WriteHeader(http.StatusBadRequest) - return - } -} - -// Join allow a client to join the network. It reutnrs a interface{} channel for -// the client to recieve information. -func (n *TCPNetwork) Join(endpoint Endpoint) { - n.endpointMutex.Lock() - defer n.endpointMutex.Unlock() - - n.endpoint = endpoint - - joinURL := fmt.Sprintf("http://%s:%d/join", n.peerServer, peerPort) - peersURL := fmt.Sprintf("http://%s:%d/peers", n.peerServer, peerPort) - - // Join the peer list. - for { - time.Sleep(time.Second) - - req, err := http.NewRequest(http.MethodGet, joinURL, nil) - if err != nil { - continue - } - req.Header.Add("ID", endpoint.GetID().String()) - req.Header.Add("PORT", fmt.Sprintf("%d", n.port)) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - if err == nil && resp.StatusCode == http.StatusOK { - break - } - } - - var peerList map[types.ValidatorID]string - - // Wait for the server to collect all validators and return a list. - for { - time.Sleep(time.Second) - - req, err := http.NewRequest(http.MethodGet, peersURL, nil) - if err != nil { - fmt.Println(err) - continue - } - resp, err := n.client.Do(req) - if err != nil || resp.StatusCode != http.StatusOK { - continue - } - - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - - if err := json.Unmarshal(body, &peerList); err != nil { - fmt.Printf("error: %v", err) - continue - } - break - } - - for key, val := range peerList { - n.endpoints[key] = val - } -} - -// ReceiveChan return the receive channel. -func (n *TCPNetwork) ReceiveChan() <-chan interface{} { - return n.recieveChan -} - -// Send sends a msg to another client. -func (n *TCPNetwork) Send(destID types.ValidatorID, messageJSON []byte) { - clientAddr, exists := n.endpoints[destID] - if !exists { - return - } - - msgURL := fmt.Sprintf("http://%s/msg", clientAddr) - go func() { - time.Sleep(n.model.Delay()) - for i := 0; i < retries; i++ { - req, err := http.NewRequest( - http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) - if err != nil { - continue - } - req.Header.Add("ID", n.endpoint.GetID().String()) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - if err == nil && resp.StatusCode == http.StatusOK { - runtime.Goexit() - } - - fmt.Printf("failed to submit message: %s\n", err) - time.Sleep(1 * time.Second) - } - fmt.Printf("failed to send message: %v\n", string(messageJSON)) - }() -} - -func (n *TCPNetwork) marshalMessage(msg interface{}) (messageJSON []byte) { - message := struct { - Type string `json:"type"` - Payload interface{} `json:"payload"` - }{} - - switch v := msg.(type) { - case *types.Block: - message.Type = "block" - message.Payload = v - case *types.NotaryAck: - message.Type = "notaryAck" - message.Payload = v - case *types.Vote: - message.Type = "vote" - message.Payload = v - default: - fmt.Println("error: invalid message type") - return - } - - messageJSON, err := json.Marshal(message) - if err != nil { - fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, message) - return - } - return -} - -// BroadcastBlock broadcast blocks into the network. -func (n *TCPNetwork) BroadcastBlock(block *types.Block) { - payload := n.marshalMessage(block) - for endpoint := range n.endpoints { - if endpoint == block.ProposerID { - continue - } - n.Send(endpoint, payload) - } -} - -// BroadcastNotaryAck broadcast notaryAck into the network. -func (n *TCPNetwork) BroadcastNotaryAck(notaryAck *types.NotaryAck) { - payload := n.marshalMessage(notaryAck) - for endpoint := range n.endpoints { - if endpoint == notaryAck.ProposerID { - continue - } - n.Send(endpoint, payload) - } -} - -// BroadcastVote broadcast vote into the network. -func (n *TCPNetwork) BroadcastVote(vote *types.Vote) { - payload := n.marshalMessage(vote) - for endpoint := range n.endpoints { - if endpoint == vote.ProposerID { - continue - } - n.Send(endpoint, payload) - } -} - -// DeliverBlocks sends blocks to peerServer. -func (n *TCPNetwork) DeliverBlocks(blocks BlockList) { - messageJSON, err := json.Marshal(blocks) - if err != nil { - fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, blocks) - return - } - - msgURL := fmt.Sprintf("http://%s:%d/delivery", n.peerServer, peerPort) - - go func() { - for i := 0; i < retries; i++ { - req, err := http.NewRequest( - http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) - if err != nil { - continue - } - req.Header.Add("ID", n.endpoint.GetID().String()) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - - if err == nil && resp.StatusCode == http.StatusOK { - runtime.Goexit() - } - time.Sleep(1 * time.Second) - } - fmt.Printf("failed to send message: %v\n", blocks) - }() -} - -// NotifyServer sends message to peerServer -func (n *TCPNetwork) NotifyServer(msg Message) { - messageJSON, err := json.Marshal(msg) - if err != nil { - fmt.Printf("error: failed to marshal json: %v\n%+v\n", err, msg) - return - } - - msgURL := fmt.Sprintf("http://%s:%d/message", n.peerServer, peerPort) - - for i := 0; i < retries; i++ { - req, err := http.NewRequest( - http.MethodPost, msgURL, strings.NewReader(string(messageJSON))) - if err != nil { - continue - } - req.Header.Add("ID", n.endpoint.GetID().String()) - - resp, err := n.client.Do(req) - if err == nil { - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) - } - if err == nil && resp.StatusCode == http.StatusOK { - return - } - time.Sleep(1 * time.Second) - } - fmt.Printf("failed to send message: %v\n", msg) - - return -} - -// GetServerInfo retrieve the info message from peerServer. -func (n *TCPNetwork) GetServerInfo() InfoMessage { - infoMsg := InfoMessage{} - msgURL := fmt.Sprintf("http://%s:%d/info", n.peerServer, peerPort) - - req, err := http.NewRequest( - http.MethodGet, msgURL, nil) - if err != nil { - fmt.Printf("error: %v\n", err) - } - - resp, err := n.client.Do(req) - if err != nil { - fmt.Printf("error: %v\n", err) - return infoMsg - } - if resp.StatusCode != http.StatusOK { - fmt.Printf("error: %v\n", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - - if err := json.Unmarshal(body, &infoMsg); err != nil { - fmt.Printf("error: %v", err) - } - return infoMsg -} - -// Endpoints returns all validatorIDs. -func (n *TCPNetwork) Endpoints() types.ValidatorIDs { - vIDs := make(types.ValidatorIDs, 0, len(n.endpoints)) - for vID := range n.endpoints { - vIDs = append(vIDs, vID) - } - return vIDs -} diff --git a/simulation/validator.go b/simulation/validator.go index 21b9db6..46d42d3 100644 --- a/simulation/validator.go +++ b/simulation/validator.go @@ -30,15 +30,14 @@ import ( "github.com/dexon-foundation/dexon-consensus-core/simulation/config" ) -// Validator represents a validator in DexCon. -type Validator struct { - network Network - app *simApp - gov *simGovernance - db blockdb.BlockDatabase +// validator represents a validator in DexCon. +type validator struct { + app *simApp + gov *simGovernance + db blockdb.BlockDatabase config config.Validator - msgChannel <-chan interface{} + netModule *network isFinished chan struct{} ID types.ValidatorID @@ -49,46 +48,50 @@ type Validator struct { compactionChain *core.BlockChain } -// NewValidator returns a new empty validator. -func NewValidator( +// newValidator returns a new empty validator. +func newValidator( prvKey crypto.PrivateKey, sigToPub core.SigToPubFn, - config config.Validator, - network Network) *Validator { + config config.Config) *validator { id := types.NewValidatorID(prvKey.PublicKey()) - + netModule := newNetwork(id, config.Networking) db, err := blockdb.NewMemBackedBlockDB( id.String() + ".blockdb") if err != nil { panic(err) } - gov := newSimGovernance(config.Num, config.Consensus) - return &Validator{ + gov := newSimGovernance(config.Validator.Num, config.Validator.Consensus) + return &validator{ ID: id, prvKey: prvKey, sigToPub: sigToPub, - config: config, - network: network, - app: newSimApp(id, network), + config: config.Validator, + app: newSimApp(id, netModule), gov: gov, db: db, + netModule: netModule, isFinished: make(chan struct{}), } } // GetID returns the ID of validator. -func (v *Validator) GetID() types.ValidatorID { +func (v *validator) GetID() types.ValidatorID { return v.ID } -// Run starts the validator. -func (v *Validator) Run(legacy bool) { - v.network.Join(v) - v.msgChannel = v.network.ReceiveChan() - - hashes := make(common.Hashes, 0, v.network.NumPeers()) - for _, vID := range v.network.Endpoints() { +// run starts the validator. +func (v *validator) run(serverEndpoint interface{}, legacy bool) { + // Run network. + if err := v.netModule.setup(serverEndpoint); err != nil { + panic(err) + } + msgChannel := v.netModule.receiveChanForValidator() + peers := v.netModule.peers() + go v.netModule.run() + // Run consensus. + hashes := make(common.Hashes, 0, len(peers)) + for vID := range peers { v.gov.addValidator(vID) hashes = append(hashes, vID.Hash) } @@ -101,15 +104,16 @@ func (v *Validator) Run(legacy bool) { } if legacy { v.consensus = core.NewConsensus( - v.app, v.gov, v.db, v.network, + v.app, v.gov, v.db, v.netModule, time.NewTicker( - time.Duration(v.config.Legacy.ProposeIntervalMean)*time.Millisecond), + time.Duration( + v.config.Legacy.ProposeIntervalMean)*time.Millisecond), v.prvKey, v.sigToPub) go v.consensus.RunLegacy() } else { v.consensus = core.NewConsensus( - v.app, v.gov, v.db, v.network, + v.app, v.gov, v.db, v.netModule, time.NewTicker( time.Duration(v.config.Lambda)*time.Millisecond), v.prvKey, v.sigToPub) @@ -117,36 +121,28 @@ func (v *Validator) Run(legacy bool) { go v.consensus.Run() } - isShutdown := make(chan struct{}) - - go v.CheckServerInfo(isShutdown) - // Blocks forever. - <-isShutdown +MainLoop: + for { + msg := <-msgChannel + switch val := msg.(type) { + case infoStatus: + if val == statusShutdown { + break MainLoop + } + default: + panic(fmt.Errorf("unexpected message from server: %v", val)) + } + } + // Cleanup. v.consensus.Stop() if err := v.db.Close(); err != nil { fmt.Println(err) } - v.network.NotifyServer(Message{ + v.netModule.report(&message{ Type: shutdownAck, }) - v.isFinished <- struct{}{} -} - -// Wait for the validator to stop (if peerServer told it to). -func (v *Validator) Wait() { - <-v.isFinished -} - -// CheckServerInfo will check the info from the peerServer and update -// validator's status if needed. -func (v *Validator) CheckServerInfo(isShutdown chan struct{}) { - for { - infoMsg := v.network.GetServerInfo() - if infoMsg.Status == statusShutdown { - isShutdown <- struct{}{} - break - } - time.Sleep(250 * time.Millisecond) - } + // TODO(mission): once we have a way to know if consensus is stopped, stop + // the network module. + return } diff --git a/simulation/verification.go b/simulation/verification.go index 2e90b8b..0ae7450 100644 --- a/simulation/verification.go +++ b/simulation/verification.go @@ -99,7 +99,7 @@ func (totalOrder *TotalOrderResult) PushBlocks(blocks BlockList) (ready bool) { } // PushTimestamp log the information in the msg. -func (totalOrder *TotalOrderResult) PushTimestamp(msg TimestampMessage) bool { +func (totalOrder *TotalOrderResult) PushTimestamp(msg timestampMessage) bool { pushLatency := func(latency *[]time.Duration, t1, t2 time.Time) { *latency = append(*latency, t2.Sub(t1)) } |