From cd9f3011f58af965b910c0a1b0e27b22ccb30f23 Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Mon, 5 Nov 2018 18:02:53 +0800 Subject: test: move simulation.network to test package (#297) --- core/test/marshaller.go | 147 ++++++++++++++++++++ core/test/network.go | 293 ++++++++++++++++++++++++++++++++++++++++ simulation/app.go | 25 +++- simulation/config/config.go | 15 +-- simulation/governance.go | 11 +- simulation/marshaller.go | 93 ++----------- simulation/network.go | 322 -------------------------------------------- simulation/node.go | 52 +++++-- simulation/simulation.go | 7 +- 9 files changed, 524 insertions(+), 441 deletions(-) create mode 100644 core/test/marshaller.go create mode 100644 core/test/network.go delete mode 100644 simulation/network.go diff --git a/core/test/marshaller.go b/core/test/marshaller.go new file mode 100644 index 0000000..fc42639 --- /dev/null +++ b/core/test/marshaller.go @@ -0,0 +1,147 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package test + +import ( + "encoding/json" + "fmt" + + "github.com/dexon-foundation/dexon-consensus/core/types" + typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" +) + +// DefaultMarshaller is the default marshaller for testing core.Consensus. +type DefaultMarshaller struct { + fallback Marshaller +} + +// NewDefaultMarshaller constructs an DefaultMarshaller instance. +func NewDefaultMarshaller(fallback Marshaller) *DefaultMarshaller { + return &DefaultMarshaller{ + fallback: fallback, + } +} + +// Unmarshal implements Marshaller interface. +func (m *DefaultMarshaller) Unmarshal( + msgType string, payload []byte) (msg interface{}, err error) { + switch msgType { + case "block": + block := &types.Block{} + if err = json.Unmarshal(payload, block); err != nil { + break + } + msg = block + case "vote": + vote := &types.Vote{} + if err = json.Unmarshal(payload, vote); err != nil { + break + } + msg = vote + case "block-randomness-request": + request := &types.AgreementResult{} + if err = json.Unmarshal(payload, request); err != nil { + break + } + msg = request + case "block-randomness-result": + result := &types.BlockRandomnessResult{} + if err = json.Unmarshal(payload, result); err != nil { + break + } + msg = result + case "dkg-private-share": + privateShare := &typesDKG.PrivateShare{} + if err = json.Unmarshal(payload, privateShare); err != nil { + break + } + msg = privateShare + case "dkg-master-public-key": + masterPublicKey := typesDKG.NewMasterPublicKey() + if err = json.Unmarshal(payload, masterPublicKey); err != nil { + break + } + msg = masterPublicKey + case "dkg-complaint": + complaint := &typesDKG.Complaint{} + if err = json.Unmarshal(payload, complaint); err != nil { + break + } + msg = complaint + case "dkg-partial-signature": + psig := &typesDKG.PartialSignature{} + if err = json.Unmarshal(payload, psig); err != nil { + break + } + msg = psig + case "dkg-finalize": + final := &typesDKG.Finalize{} + if err = json.Unmarshal(payload, final); err != nil { + break + } + msg = final + default: + if m.fallback == nil { + err = fmt.Errorf("unknown msg type: %v", msgType) + break + } + msg, err = m.fallback.Unmarshal(msgType, payload) + } + return +} + +// Marshal implements Marshaller interface. +func (m *DefaultMarshaller) Marshal( + msg interface{}) (msgType string, payload []byte, err error) { + switch msg.(type) { + case *types.Block: + msgType = "block" + payload, err = json.Marshal(msg) + case *types.Vote: + msgType = "vote" + payload, err = json.Marshal(msg) + case *types.AgreementResult: + msgType = "block-randomness-request" + payload, err = json.Marshal(msg) + case *types.BlockRandomnessResult: + msgType = "block-randomness-result" + payload, err = json.Marshal(msg) + case *typesDKG.PrivateShare: + msgType = "dkg-private-share" + payload, err = json.Marshal(msg) + case *typesDKG.MasterPublicKey: + msgType = "dkg-master-public-key" + payload, err = json.Marshal(msg) + case *typesDKG.Complaint: + msgType = "dkg-complaint" + payload, err = json.Marshal(msg) + case *typesDKG.PartialSignature: + msgType = "dkg-partial-signature" + payload, err = json.Marshal(msg) + case *typesDKG.Finalize: + msgType = "dkg-finalize" + payload, err = json.Marshal(msg) + default: + if m.fallback == nil { + err = fmt.Errorf("unknwon message type: %v", msg) + break + } + msgType, payload, err = m.fallback.Marshal(msg) + } + return +} diff --git a/core/test/network.go b/core/test/network.go new file mode 100644 index 0000000..00c60d9 --- /dev/null +++ b/core/test/network.go @@ -0,0 +1,293 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package test + +import ( + "context" + "fmt" + "net" + "strconv" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/types" + typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" +) + +// NetworkType is the simulation network type. +type NetworkType string + +// NetworkType enums. +const ( + NetworkTypeTCP NetworkType = "tcp" + NetworkTypeTCPLocal NetworkType = "tcp-local" + NetworkTypeFake NetworkType = "fake" +) + +// NetworkConfig is the configuration for Network module. +type NetworkConfig struct { + Type NetworkType + PeerServer string + PeerPort int +} + +// Network implements core.Network interface based on TransportClient. +type Network struct { + config NetworkConfig + ctx context.Context + ctxCancel context.CancelFunc + trans TransportClient + fromTransport <-chan *TransportEnvelope + toConsensus chan interface{} + toNode chan interface{} + sentRandomness map[common.Hash]struct{} + sentAgreement map[common.Hash]struct{} + blockCache map[common.Hash]*types.Block +} + +// NewNetwork setup network stuffs for nodes, which provides an +// implementation of core.Network based on TransportClient. +func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, + marshaller Marshaller, config NetworkConfig) (n *Network) { + // Construct basic network instance. + n = &Network{ + config: config, + toConsensus: make(chan interface{}, 1000), + toNode: make(chan interface{}, 1000), + sentRandomness: make(map[common.Hash]struct{}), + sentAgreement: make(map[common.Hash]struct{}), + blockCache: make(map[common.Hash]*types.Block), + } + n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + // Construct transport layer. + switch config.Type { + case NetworkTypeTCPLocal: + n.trans = NewTCPTransportClient(pubKey, latency, marshaller, true) + case NetworkTypeTCP: + n.trans = NewTCPTransportClient(pubKey, latency, marshaller, false) + case NetworkTypeFake: + n.trans = NewFakeTransportClient(pubKey, latency) + default: + panic(fmt.Errorf("unknown network type: %v", config.Type)) + } + return +} + +// PullBlocks implements core.Network interface. +func (n *Network) PullBlocks(hashes common.Hashes) { + go func() { + for _, hash := range hashes { + // TODO(jimmy-dexon): request block from network instead of cache. + if block, exist := n.blockCache[hash]; exist { + n.toConsensus <- block + continue + } + panic(fmt.Errorf("unknown block %s requested", hash)) + } + }() +} + +// PullVotes implements core.Network interface. +func (n *Network) PullVotes(pos types.Position) { + // TODO(jimmy-dexon): implement this. +} + +// BroadcastVote implements core.Network interface. +func (n *Network) BroadcastVote(vote *types.Vote) { + if err := n.trans.Broadcast(vote); err != nil { + panic(err) + } +} + +// BroadcastBlock implements core.Network interface. +func (n *Network) BroadcastBlock(block *types.Block) { + if err := n.trans.Broadcast(block); err != nil { + panic(err) + } +} + +// BroadcastAgreementResult implements core.Network interface. +func (n *Network) BroadcastAgreementResult( + randRequest *types.AgreementResult) { + if _, exist := n.sentAgreement[randRequest.BlockHash]; exist { + return + } + if len(n.sentAgreement) > 1000 { + // Randomly drop one entry. + for k := range n.sentAgreement { + delete(n.sentAgreement, k) + break + } + } + n.sentAgreement[randRequest.BlockHash] = struct{}{} + if err := n.trans.Broadcast(randRequest); err != nil { + panic(err) + } +} + +// BroadcastRandomnessResult implements core.Network interface. +func (n *Network) BroadcastRandomnessResult( + randResult *types.BlockRandomnessResult) { + if _, exist := n.sentRandomness[randResult.BlockHash]; exist { + return + } + if len(n.sentRandomness) > 1000 { + // Randomly drop one entry. + for k := range n.sentRandomness { + delete(n.sentRandomness, k) + break + } + } + n.sentRandomness[randResult.BlockHash] = struct{}{} + if err := n.trans.Broadcast(randResult); err != nil { + panic(err) + } +} + +// broadcast message to all other nodes in the network. +func (n *Network) broadcast(message interface{}) { + if err := n.trans.Broadcast(message); err != nil { + panic(err) + } +} + +// SendDKGPrivateShare implements core.Network interface. +func (n *Network) SendDKGPrivateShare( + recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) { + if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil { + panic(err) + } +} + +// BroadcastDKGPrivateShare implements core.Network interface. +func (n *Network) BroadcastDKGPrivateShare( + prvShare *typesDKG.PrivateShare) { + if err := n.trans.Broadcast(prvShare); err != nil { + panic(err) + } +} + +// BroadcastDKGPartialSignature implements core.Network interface. +func (n *Network) BroadcastDKGPartialSignature( + psig *typesDKG.PartialSignature) { + if err := n.trans.Broadcast(psig); err != nil { + panic(err) + } +} + +// ReceiveChan implements core.Network interface. +func (n *Network) ReceiveChan() <-chan interface{} { + return n.toConsensus +} + +// Setup transport layer. +func (n *Network) Setup(serverEndpoint interface{}) (err error) { + // Join the p2p network. + switch n.config.Type { + case NetworkTypeTCP, NetworkTypeTCPLocal: + addr := net.JoinHostPort( + n.config.PeerServer, strconv.Itoa(n.config.PeerPort)) + n.fromTransport, err = n.trans.Join(addr) + case NetworkTypeFake: + n.fromTransport, err = n.trans.Join(serverEndpoint) + default: + err = fmt.Errorf("unknown network type: %v", n.config.Type) + } + if err != nil { + return + } + return +} + +func (n *Network) msgHandler(e *TransportEnvelope) { + switch v := e.Msg.(type) { + case *types.Block: + if len(n.blockCache) > 500 { + // Randomly purge one block from cache. + for k := range n.blockCache { + delete(n.blockCache, k) + break + } + } + n.blockCache[v.Hash] = v + n.toConsensus <- e.Msg + case *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult, + *typesDKG.PrivateShare, *typesDKG.PartialSignature: + n.toConsensus <- e.Msg + default: + n.toNode <- e.Msg + } +} + +// Run the main loop. +func (n *Network) Run() { +Loop: + for { + select { + case <-n.ctx.Done(): + break Loop + default: + } + select { + case <-n.ctx.Done(): + break Loop + case e, ok := <-n.fromTransport: + if !ok { + break Loop + } + n.msgHandler(e) + } + } +} + +// Close stops the network. +func (n *Network) Close() (err error) { + n.ctxCancel() + close(n.toConsensus) + n.toConsensus = nil + close(n.toNode) + n.toNode = nil + if err = n.trans.Close(); err != nil { + return + } + return +} + +// Report exports 'Report' method of TransportClient. +func (n *Network) Report(msg interface{}) error { + return n.trans.Report(msg) +} + +// Peers exports 'Peers' method of Transport. +func (n *Network) Peers() []crypto.PublicKey { + return n.trans.Peers() +} + +// Broadcast exports 'Broadcast' method of Transport, and would panic when +// error. +func (n *Network) Broadcast(msg interface{}) { + if err := n.trans.Broadcast(msg); err != nil { + panic(err) + } +} + +// ReceiveChanForNode returns a channel for messages not handled by +// core.Consensus. +func (n *Network) ReceiveChanForNode() <-chan interface{} { + return n.toNode +} diff --git a/simulation/app.go b/simulation/app.go index 89657a7..ea45cc9 100644 --- a/simulation/app.go +++ b/simulation/app.go @@ -24,15 +24,32 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/test" "github.com/dexon-foundation/dexon-consensus/core/types" ) +type timestampEvent string + +const ( + blockSeen timestampEvent = "blockSeen" + timestampConfirm timestampEvent = "timestampConfirm" + timestampAck timestampEvent = "timestampAck" +) + +// TimestampMessage is a struct for peer sending consensus timestamp information +// to server. +type timestampMessage struct { + BlockHash common.Hash `json:"hash"` + Event timestampEvent `json:"event"` + Timestamp time.Time `json:"timestamp"` +} + // simApp is an DEXON app for simulation. type simApp struct { NodeID types.NodeID Outputs []*types.Block Early bool - netModule *network + netModule *test.Network DeliverID int // blockSeen stores the time when block is delivered by Total Ordering. blockSeen map[common.Hash]time.Time @@ -45,7 +62,7 @@ type simApp struct { } // newSimApp returns point to a new instance of simApp. -func newSimApp(id types.NodeID, netModule *network) *simApp { +func newSimApp(id types.NodeID, netModule *test.Network) *simApp { return &simApp{ NodeID: id, netModule: netModule, @@ -126,7 +143,7 @@ func (a *simApp) TotalOrderingDelivered( ID: a.DeliverID, BlockHash: blockHashes, } - a.netModule.report(blockList) + a.netModule.Report(blockList) a.DeliverID++ } @@ -178,5 +195,5 @@ func (a *simApp) BlockDelivered( Type: blockTimestamp, Payload: jsonPayload, } - a.netModule.report(msg) + a.netModule.Report(msg) } diff --git a/simulation/config/config.go b/simulation/config/config.go index 8083bee..5a548f1 100644 --- a/simulation/config/config.go +++ b/simulation/config/config.go @@ -21,19 +21,10 @@ import ( "math" "os" + "github.com/dexon-foundation/dexon-consensus/core/test" "github.com/naoina/toml" ) -// NetworkType is the simulation network type. -type NetworkType string - -// NetworkType enums. -const ( - NetworkTypeTCP NetworkType = "tcp" - NetworkTypeTCPLocal NetworkType = "tcp-local" - NetworkTypeFake NetworkType = "fake" -) - // Consensus settings. type Consensus struct { PhiRatio float32 @@ -61,7 +52,7 @@ type Node struct { // Networking config. type Networking struct { - Type NetworkType + Type test.NetworkType PeerServer string Mean float64 @@ -110,7 +101,7 @@ func GenerateDefault(path string) error { MaxBlock: math.MaxUint64, }, Networking: Networking{ - Type: NetworkTypeTCPLocal, + Type: test.NetworkTypeTCPLocal, PeerServer: "127.0.0.1", Mean: 100, Sigma: 10, diff --git a/simulation/governance.go b/simulation/governance.go index 4c501b6..3b48248 100644 --- a/simulation/governance.go +++ b/simulation/governance.go @@ -24,6 +24,7 @@ import ( "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/test" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" "github.com/dexon-foundation/dexon-consensus/simulation/config" @@ -47,7 +48,7 @@ type simGovernance struct { lambdaBA time.Duration lambdaDKG time.Duration roundInterval time.Duration - network *network + network *test.Network } // newSimGovernance returns a new simGovernance instance. @@ -76,7 +77,7 @@ func newSimGovernance( } } -func (g *simGovernance) setNetwork(network *network) { +func (g *simGovernance) setNetwork(network *test.Network) { g.network = network } @@ -160,7 +161,7 @@ func (g *simGovernance) AddDKGComplaint( g.dkgComplaint[complaint.Round] = append( g.dkgComplaint[complaint.Round], complaint) if complaint.ProposerID == g.id { - g.network.broadcast(complaint) + g.network.Broadcast(complaint) } } @@ -183,7 +184,7 @@ func (g *simGovernance) AddDKGMasterPublicKey( g.dkgMasterPublicKey[masterPublicKey.Round] = append( g.dkgMasterPublicKey[masterPublicKey.Round], masterPublicKey) if masterPublicKey.ProposerID == g.id { - g.network.broadcast(masterPublicKey) + g.network.Broadcast(masterPublicKey) } } @@ -209,7 +210,7 @@ func (g *simGovernance) AddDKGFinalize( } g.dkgFinal[final.Round][final.ProposerID] = struct{}{} if final.ProposerID == g.id { - g.network.broadcast(final) + g.network.Broadcast(final) } } diff --git a/simulation/marshaller.go b/simulation/marshaller.go index 61988b8..86eab3e 100644 --- a/simulation/marshaller.go +++ b/simulation/marshaller.go @@ -20,9 +20,6 @@ package simulation import ( "encoding/json" "fmt" - - "github.com/dexon-foundation/dexon-consensus/core/types" - typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" ) // jsonMarshaller implements test.Marshaller to marshal simulation related @@ -32,8 +29,13 @@ type jsonMarshaller struct{} // Unmarshal implements Unmarshal method of test.Marshaller interface. func (m *jsonMarshaller) Unmarshal( msgType string, payload []byte) (msg interface{}, err error) { - switch msgType { + case "info-status": + var status infoStatus + if err = json.Unmarshal(payload, &status); err != nil { + break + } + msg = status case "blocklist": var blocks BlockList if err = json.Unmarshal(payload, &blocks); err != nil { @@ -46,66 +48,6 @@ func (m *jsonMarshaller) Unmarshal( break } msg = &m - case "info-status": - var status infoStatus - if err = json.Unmarshal(payload, &status); err != nil { - break - } - msg = status - case "block": - block := &types.Block{} - if err = json.Unmarshal(payload, block); err != nil { - break - } - msg = block - case "vote": - vote := &types.Vote{} - if err = json.Unmarshal(payload, vote); err != nil { - break - } - msg = vote - case "block-randomness-request": - request := &types.AgreementResult{} - if err = json.Unmarshal(payload, request); err != nil { - break - } - msg = request - case "block-randomness-result": - result := &types.BlockRandomnessResult{} - if err = json.Unmarshal(payload, result); err != nil { - break - } - msg = result - case "dkg-private-share": - privateShare := &typesDKG.PrivateShare{} - if err = json.Unmarshal(payload, privateShare); err != nil { - break - } - msg = privateShare - case "dkg-master-public-key": - masterPublicKey := typesDKG.NewMasterPublicKey() - if err = json.Unmarshal(payload, masterPublicKey); err != nil { - break - } - msg = masterPublicKey - case "dkg-complaint": - complaint := &typesDKG.Complaint{} - if err = json.Unmarshal(payload, complaint); err != nil { - break - } - msg = complaint - case "dkg-partial-signature": - psig := &typesDKG.PartialSignature{} - if err = json.Unmarshal(payload, psig); err != nil { - break - } - msg = psig - case "dkg-finalize": - final := &typesDKG.Finalize{} - if err = json.Unmarshal(payload, final); err != nil { - break - } - msg = final default: err = fmt.Errorf("unrecognized message type: %v", msgType) } @@ -118,32 +60,13 @@ func (m *jsonMarshaller) Unmarshal( // Marshal implements Marshal method of test.Marshaller interface. func (m *jsonMarshaller) Marshal(msg interface{}) ( msgType string, payload []byte, err error) { - switch msg.(type) { + case infoStatus: + msgType = "info-status" case *BlockList: msgType = "blocklist" case *message: msgType = "message" - case infoStatus: - msgType = "info-status" - case *types.Block: - msgType = "block" - case *types.Vote: - msgType = "vote" - case *types.AgreementResult: - msgType = "block-randomness-request" - case *types.BlockRandomnessResult: - msgType = "block-randomness-result" - case *typesDKG.PrivateShare: - msgType = "dkg-private-share" - case *typesDKG.MasterPublicKey: - msgType = "dkg-master-public-key" - case *typesDKG.Complaint: - msgType = "dkg-complaint" - case *typesDKG.PartialSignature: - msgType = "dkg-partial-signature" - case *typesDKG.Finalize: - msgType = "dkg-finalize" default: err = fmt.Errorf("unknwon message type: %v", msg) } diff --git a/simulation/network.go b/simulation/network.go deleted file mode 100644 index fa4f343..0000000 --- a/simulation/network.go +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2018 The dexon-consensus Authors -// This file is part of the dexon-consensus library. -// -// The dexon-consensus library is free software: you can redistribute it -// and/or modify it under the terms of the GNU Lesser General Public License as -// published by the Free Software Foundation, either version 3 of the License, -// or (at your option) any later version. -// -// The dexon-consensus library is distributed in the hope that it will be -// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser -// General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the dexon-consensus library. If not, see -// . - -package simulation - -import ( - "context" - "encoding/json" - "fmt" - "net" - "strconv" - "time" - - "github.com/dexon-foundation/dexon-consensus/common" - "github.com/dexon-foundation/dexon-consensus/core/crypto" - "github.com/dexon-foundation/dexon-consensus/core/test" - "github.com/dexon-foundation/dexon-consensus/core/types" - typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" - "github.com/dexon-foundation/dexon-consensus/simulation/config" -) - -type messageType string - -const ( - shutdownAck messageType = "shutdownAck" - blockTimestamp messageType = "blockTimestamps" -) - -// message is a struct for peer sending message to server. -type message struct { - Type messageType `json:"type"` - Payload json.RawMessage `json:"payload"` -} - -type timestampEvent string - -const ( - blockSeen timestampEvent = "blockSeen" - timestampConfirm timestampEvent = "timestampConfirm" - timestampAck timestampEvent = "timestampAck" -) - -// TimestampMessage is a struct for peer sending consensus timestamp information -// to server. -type timestampMessage struct { - BlockHash common.Hash `json:"hash"` - Event timestampEvent `json:"event"` - Timestamp time.Time `json:"timestamp"` -} - -type infoStatus string - -const ( - statusInit infoStatus = "init" - statusNormal infoStatus = "normal" - statusShutdown infoStatus = "shutdown" -) - -// infoMessage is a struct used by peerServer's /info. -type infoMessage struct { - Status infoStatus `json:"status"` - Peers map[types.NodeID]string `json:"peers"` -} - -// network implements core.Network interface and other methods for simulation -// based on test.TransportClient. -type network struct { - cfg config.Networking - ctx context.Context - ctxCancel context.CancelFunc - trans test.TransportClient - fromTransport <-chan *test.TransportEnvelope - toConsensus chan interface{} - toNode chan interface{} - sentRandomness map[common.Hash]struct{} - sentAgreement map[common.Hash]struct{} - blockCache map[common.Hash]*types.Block -} - -// newNetwork setup network stuffs for nodes, which provides an -// implementation of core.Network based on test.TransportClient. -func newNetwork(pubKey crypto.PublicKey, cfg config.Networking) (n *network) { - // Construct latency model. - latency := &test.NormalLatencyModel{ - Mean: cfg.Mean, - Sigma: cfg.Sigma, - } - // Construct basic network instance. - n = &network{ - cfg: cfg, - toNode: make(chan interface{}, 1000), - toConsensus: make(chan interface{}, 1000), - sentRandomness: make(map[common.Hash]struct{}), - sentAgreement: make(map[common.Hash]struct{}), - blockCache: make(map[common.Hash]*types.Block), - } - n.ctx, n.ctxCancel = context.WithCancel(context.Background()) - // Construct transport layer. - switch cfg.Type { - case config.NetworkTypeTCPLocal: - n.trans = test.NewTCPTransportClient( - pubKey, latency, &jsonMarshaller{}, true) - case config.NetworkTypeTCP: - n.trans = test.NewTCPTransportClient( - pubKey, latency, &jsonMarshaller{}, false) - case config.NetworkTypeFake: - n.trans = test.NewFakeTransportClient(pubKey, latency) - default: - panic(fmt.Errorf("unknown network type: %v", cfg.Type)) - } - return -} - -// PullBlock implements core.Network interface. -func (n *network) PullBlocks(hashes common.Hashes) { - go func() { - for _, hash := range hashes { - // TODO(jimmy-dexon): request block from network instead of cache. - if block, exist := n.blockCache[hash]; exist { - n.toConsensus <- block - continue - } - panic(fmt.Errorf("unknown block %s requested", hash)) - } - }() -} - -// PullVote implements core.Network interface. -func (n *network) PullVotes(pos types.Position) { - // TODO(jimmy-dexon): implement this. -} - -// BroadcastVote implements core.Network interface. -func (n *network) BroadcastVote(vote *types.Vote) { - if err := n.trans.Broadcast(vote); err != nil { - panic(err) - } -} - -// BroadcastBlock implements core.Network interface. -func (n *network) BroadcastBlock(block *types.Block) { - if err := n.trans.Broadcast(block); err != nil { - panic(err) - } -} - -// BroadcastAgreementResult implements core.Network interface. -func (n *network) BroadcastAgreementResult( - randRequest *types.AgreementResult) { - if _, exist := n.sentAgreement[randRequest.BlockHash]; exist { - return - } - if len(n.sentAgreement) > 1000 { - // Randomly drop one entry. - for k := range n.sentAgreement { - delete(n.sentAgreement, k) - break - } - } - n.sentAgreement[randRequest.BlockHash] = struct{}{} - if err := n.trans.Broadcast(randRequest); err != nil { - panic(err) - } -} - -// BroadcastRandomnessResult implements core.Network interface. -func (n *network) BroadcastRandomnessResult( - randResult *types.BlockRandomnessResult) { - if _, exist := n.sentRandomness[randResult.BlockHash]; exist { - return - } - if len(n.sentRandomness) > 1000 { - // Randomly drop one entry. - for k := range n.sentRandomness { - delete(n.sentRandomness, k) - break - } - } - n.sentRandomness[randResult.BlockHash] = struct{}{} - if err := n.trans.Broadcast(randResult); err != nil { - panic(err) - } -} - -// broadcast message to all other nodes in the network. -func (n *network) broadcast(message interface{}) { - if err := n.trans.Broadcast(message); err != nil { - panic(err) - } -} - -// SendDKGPrivateShare implements core.Network interface. -func (n *network) SendDKGPrivateShare( - recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) { - if err := n.trans.Send(types.NewNodeID(recv), prvShare); err != nil { - panic(err) - } -} - -// BroadcastDKGPrivateShare implements core.Network interface. -func (n *network) BroadcastDKGPrivateShare( - prvShare *typesDKG.PrivateShare) { - if err := n.trans.Broadcast(prvShare); err != nil { - panic(err) - } -} - -// BroadcastDKGPartialSignature implements core.Network interface. -func (n *network) BroadcastDKGPartialSignature( - psig *typesDKG.PartialSignature) { - if err := n.trans.Broadcast(psig); err != nil { - panic(err) - } -} - -// ReceiveChan implements core.Network interface. -func (n *network) ReceiveChan() <-chan interface{} { - return n.toConsensus -} - -// receiveChanForNode returns a channel for nodes' specific -// messages. -func (n *network) receiveChanForNode() <-chan interface{} { - return n.toNode -} - -// setup transport layer. -func (n *network) setup(serverEndpoint interface{}) (err error) { - // Join the p2p network. - switch n.cfg.Type { - case config.NetworkTypeTCP, config.NetworkTypeTCPLocal: - addr := net.JoinHostPort(n.cfg.PeerServer, strconv.Itoa(peerPort)) - n.fromTransport, err = n.trans.Join(addr) - case config.NetworkTypeFake: - n.fromTransport, err = n.trans.Join(serverEndpoint) - default: - err = fmt.Errorf("unknown network type: %v", n.cfg.Type) - } - if err != nil { - return - } - return -} - -// run the main loop. -func (n *network) run() { - // The dispatcher declararion: - // to consensus or node, that's the question. - disp := func(e *test.TransportEnvelope) { - if block, ok := e.Msg.(*types.Block); ok { - if len(n.blockCache) > 500 { - for k := range n.blockCache { - delete(n.blockCache, k) - break - } - } - n.blockCache[block.Hash] = block - } - switch e.Msg.(type) { - case *types.Block, *types.Vote, - *types.AgreementResult, *types.BlockRandomnessResult, - *typesDKG.PrivateShare, *typesDKG.PartialSignature: - n.toConsensus <- e.Msg - default: - n.toNode <- e.Msg - } - } -MainLoop: - for { - select { - case <-n.ctx.Done(): - break MainLoop - default: - } - select { - case <-n.ctx.Done(): - break MainLoop - case e, ok := <-n.fromTransport: - if !ok { - break MainLoop - } - disp(e) - } - } -} - -// Close stop the network. -func (n *network) Close() (err error) { - n.ctxCancel() - close(n.toConsensus) - n.toConsensus = nil - close(n.toNode) - n.toNode = nil - if err = n.trans.Close(); err != nil { - return - } - return -} - -// report exports 'Report' method of test.TransportClient. -func (n *network) report(msg interface{}) error { - return n.trans.Report(msg) -} - -// peers exports 'Peers' method of test.Transport. -func (n *network) peers() []crypto.PublicKey { - return n.trans.Peers() -} diff --git a/simulation/node.go b/simulation/node.go index 311ccfc..8907d5a 100644 --- a/simulation/node.go +++ b/simulation/node.go @@ -18,6 +18,7 @@ package simulation import ( + "encoding/json" "fmt" "sort" "time" @@ -26,11 +27,33 @@ import ( "github.com/dexon-foundation/dexon-consensus/core" "github.com/dexon-foundation/dexon-consensus/core/blockdb" "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/test" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" "github.com/dexon-foundation/dexon-consensus/simulation/config" ) +type infoStatus string + +const ( + statusInit infoStatus = "init" + statusNormal infoStatus = "normal" + statusShutdown infoStatus = "shutdown" +) + +type messageType string + +const ( + shutdownAck messageType = "shutdownAck" + blockTimestamp messageType = "blockTimestamps" +) + +// message is a struct for peer sending message to server. +type message struct { + Type messageType `json:"type"` + Payload json.RawMessage `json:"payload"` +} + // node represents a node in DexCon. type node struct { app core.Application @@ -38,7 +61,7 @@ type node struct { db blockdb.BlockDatabase config config.Node - netModule *network + netModule *test.Network ID types.NodeID chainID uint64 @@ -50,12 +73,21 @@ type node struct { func newNode( prvKey crypto.PrivateKey, config config.Config) *node { - pubKey := prvKey.PublicKey() - netModule := newNetwork(pubKey, config.Networking) + netModule := test.NewNetwork( + pubKey, + &test.NormalLatencyModel{ + Mean: config.Networking.Mean, + Sigma: config.Networking.Sigma, + }, + test.NewDefaultMarshaller(&jsonMarshaller{}), + test.NetworkConfig{ + Type: config.Networking.Type, + PeerServer: config.Networking.PeerServer, + PeerPort: peerPort, + }) id := types.NewNodeID(pubKey) - db, err := blockdb.NewMemBackedBlockDB( - id.String() + ".blockdb") + db, err := blockdb.NewMemBackedBlockDB(id.String() + ".blockdb") if err != nil { panic(err) } @@ -79,12 +111,12 @@ func (n *node) GetID() types.NodeID { // run starts the node. func (n *node) run(serverEndpoint interface{}, dMoment time.Time) { // Run network. - if err := n.netModule.setup(serverEndpoint); err != nil { + if err := n.netModule.Setup(serverEndpoint); err != nil { panic(err) } - msgChannel := n.netModule.receiveChanForNode() - peers := n.netModule.peers() - go n.netModule.run() + msgChannel := n.netModule.ReceiveChanForNode() + peers := n.netModule.Peers() + go n.netModule.Run() n.gov.setNetwork(n.netModule) // Run consensus. hashes := make(common.Hashes, 0, len(peers)) @@ -134,7 +166,7 @@ MainLoop: if err := n.db.Close(); err != nil { fmt.Println(err) } - n.netModule.report(&message{ + n.netModule.Report(&message{ Type: shutdownAck, }) // TODO(mission): once we have a way to know if consensus is stopped, stop diff --git a/simulation/simulation.go b/simulation/simulation.go index d2445c7..801bb7e 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -22,6 +22,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" + "github.com/dexon-foundation/dexon-consensus/core/test" "github.com/dexon-foundation/dexon-consensus/simulation/config" ) @@ -51,11 +52,11 @@ func Run(cfg *config.Config) { } switch networkType { - case config.NetworkTypeTCP: + case test.NetworkTypeTCP: // Intialized a simulation on multiple remotely peers. // The peer-server would be initialized with another command. init(nil) - case config.NetworkTypeTCPLocal, config.NetworkTypeFake: + case test.NetworkTypeTCPLocal, test.NetworkTypeFake: // Initialize a local simulation with a peer server. var serverEndpoint interface{} server = NewPeerServer() @@ -76,7 +77,7 @@ func Run(cfg *config.Config) { // Do not exit when we are in TCP node, since k8s will restart the pod and // cause confusions. - if networkType == config.NetworkTypeTCP { + if networkType == test.NetworkTypeTCP { select {} } } -- cgit v1.2.3