From 1ee5863fd4a295d34c3a2d602d5603e8746e3f7b Mon Sep 17 00:00:00 2001 From: Mission Liao Date: Thu, 8 Nov 2018 15:58:51 +0800 Subject: simulation: use test.Governacne in simulation (#311) * Move simulation.Network to test package * Use test.Governance in simulation * Pack/Apply state request in blocks payload * Add Governance.SwitchToRemoteMode This would trigger governance to broadcast pending state change requests when changes. * Allow to marshal/unmarshal packedStateChanges * Attach test.Network and test.State --- core/consensus.go | 3 +- core/lattice.go | 2 +- core/test/governance.go | 75 ++++++++++++++++++++++++++++++++++++++----------- core/test/marshaller.go | 9 ++++++ core/test/network.go | 67 ++++++++++++++++++++++++++++++------------- core/test/state.go | 8 ++++-- core/test/state_test.go | 3 ++ 7 files changed, 126 insertions(+), 41 deletions(-) (limited to 'core') diff --git a/core/consensus.go b/core/consensus.go index b9da4f0..a449701 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -660,7 +660,8 @@ func (con *Consensus) initialRound( // unexpected network fluctuation and ensure the robustness. for (con.gov.CRS(nextRound) == common.Hash{}) { con.logger.Info("CRS is not ready yet. Try again later...", - "nodeID", con.ID) + "nodeID", con.ID, + "round", nextRound) time.Sleep(500 * time.Millisecond) } nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound) diff --git a/core/lattice.go b/core/lattice.go index 20e16f2..dcb3368 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -81,7 +81,7 @@ func (l *Lattice) PrepareBlock( if err = l.data.prepareBlock(b); err != nil { return } - l.logger.Debug("Calling Application.PreparePayload", "position", b.Position) + l.logger.Debug("Calling Application.PreparePayload", "position", &b.Position) if b.Payload, err = l.app.PreparePayload(b.Position); err != nil { return } diff --git a/core/test/governance.go b/core/test/governance.go index 42e0da1..e6b6232 100644 --- a/core/test/governance.go +++ b/core/test/governance.go @@ -37,7 +37,8 @@ import ( type Governance struct { configs []*types.Config nodeSets [][]crypto.PublicKey - state *State + stateModule *State + networkModule *Network pendingConfigChanges map[uint64]map[StateChangeType]interface{} lock sync.RWMutex } @@ -51,7 +52,7 @@ func NewGovernance(genesisNodes []crypto.PublicKey, // modification smaller. g = &Governance{ pendingConfigChanges: make(map[uint64]map[StateChangeType]interface{}), - state: NewState(genesisNodes, lambda, true), + stateModule: NewState(genesisNodes, lambda, true), } return } @@ -89,7 +90,7 @@ func (g *Governance) Configuration(round uint64) *types.Config { // CRS returns the CRS for a given round. func (g *Governance) CRS(round uint64) common.Hash { - return g.state.CRS(round) + return g.stateModule.CRS(round) } // NotifyRoundHeight notifies governace contract to snapshot config. @@ -100,11 +101,12 @@ func (g *Governance) NotifyRoundHeight(round, height uint64) { g.lock.Lock() defer g.lock.Unlock() for t, v := range g.pendingConfigChanges[round+1] { - if err := g.state.RequestChange(t, v); err != nil { + if err := g.stateModule.RequestChange(t, v); err != nil { panic(err) } } delete(g.pendingConfigChanges, round+1) + g.broadcastPendingStateChanges() }() } @@ -113,13 +115,15 @@ func (g *Governance) ProposeCRS(round uint64, signedCRS []byte) { g.lock.Lock() defer g.lock.Unlock() crs := crypto.Keccak256Hash(signedCRS) - if err := g.state.ProposeCRS(round, crs); err != nil { + if err := g.stateModule.ProposeCRS(round, crs); err != nil { // CRS can be proposed multiple times, other errors are not // accepted. if err != ErrDuplicatedChange { panic(err) } + return } + g.broadcastPendingStateChanges() } // AddDKGComplaint add a DKGComplaint. @@ -131,12 +135,16 @@ func (g *Governance) AddDKGComplaint( if g.IsDKGFinal(complaint.Round) { return } - g.state.RequestChange(StateAddDKGComplaint, complaint) + if err := g.stateModule.RequestChange( + StateAddDKGComplaint, complaint); err != nil { + panic(err) + } + g.broadcastPendingStateChanges() } // DKGComplaints returns the DKGComplaints of round. func (g *Governance) DKGComplaints(round uint64) []*typesDKG.Complaint { - return g.state.DKGComplaints(round) + return g.stateModule.DKGComplaints(round) } // AddDKGMasterPublicKey adds a DKGMasterPublicKey. @@ -145,13 +153,17 @@ func (g *Governance) AddDKGMasterPublicKey( if round != masterPublicKey.Round { return } - g.state.RequestChange(StateAddDKGMasterPublicKey, masterPublicKey) + if err := g.stateModule.RequestChange( + StateAddDKGMasterPublicKey, masterPublicKey); err != nil { + panic(err) + } + g.broadcastPendingStateChanges() } // DKGMasterPublicKeys returns the DKGMasterPublicKeys of round. func (g *Governance) DKGMasterPublicKeys( round uint64) []*typesDKG.MasterPublicKey { - return g.state.DKGMasterPublicKeys(round) + return g.stateModule.DKGMasterPublicKeys(round) } // AddDKGFinalize adds a DKG finalize message. @@ -159,7 +171,10 @@ func (g *Governance) AddDKGFinalize(round uint64, final *typesDKG.Finalize) { if round != final.Round { return } - g.state.RequestChange(StateAddDKGFinal, final) + if err := g.stateModule.RequestChange(StateAddDKGFinal, final); err != nil { + panic(err) + } + g.broadcastPendingStateChanges() } // IsDKGFinal checks if DKG is final. @@ -174,16 +189,31 @@ func (g *Governance) IsDKGFinal(round uint64) bool { if round >= uint64(len(g.configs)) { return false } - return g.state.IsDKGFinal(round, int(g.configs[round].DKGSetSize)/3*2) + return g.stateModule.IsDKGFinal(round, int(g.configs[round].DKGSetSize)/3*2) } // // Test Utilities // +type packedStateChanges []byte + +// This method broadcasts pending state change requests in the underlying +// State instance, this behavior is to simulate tx-gossiping in full nodes. +func (g *Governance) broadcastPendingStateChanges() { + if g.networkModule == nil { + return + } + packed, err := g.stateModule.PackOwnRequests() + if err != nil { + panic(err) + } + g.networkModule.Broadcast(packedStateChanges(packed)) +} + // State allows to access embed State instance. func (g *Governance) State() *State { - return g.state + return g.stateModule } // CatchUpWithRound attempts to perform state snapshot to @@ -199,7 +229,7 @@ func (g *Governance) CatchUpWithRound(round uint64) { g.lock.Lock() defer g.lock.Unlock() for uint64(len(g.configs)) <= round { - config, nodeSet := g.state.Snapshot() + config, nodeSet := g.stateModule.Snapshot() g.configs = append(g.configs, config) g.nodeSets = append(g.nodeSets, nodeSet) } @@ -210,7 +240,7 @@ func (g *Governance) Clone() *Governance { g.lock.RLock() defer g.lock.RUnlock() // Clone state. - copiedState := g.state.Clone() + copiedState := g.stateModule.Clone() // Clone configs. copiedConfigs := []*types.Config{} for _, c := range g.configs { @@ -239,10 +269,9 @@ func (g *Governance) Clone() *Governance { copiedNodeSets = append(copiedNodeSets, copiedNodeSet) } // Clone pending changes. - return &Governance{ configs: copiedConfigs, - state: copiedState, + stateModule: copiedState, nodeSets: copiedNodeSets, pendingConfigChanges: copiedPendingChanges, } @@ -286,7 +315,7 @@ func (g *Governance) Equal(other *Governance, checkState bool) bool { // different state, only the snapshots (configs and node sets) are // essentially equal. if checkState { - return g.state.Equal(other.state) == nil + return g.stateModule.Equal(other.stateModule) == nil } return true } @@ -319,3 +348,15 @@ func (g *Governance) RegisterConfigChange( pendingChangesForRound[t] = v return nil } + +// SwitchToRemoteMode would switch this governance instance to remote mode, +// which means: it will broadcast all changes from its underlying state +// instance. +func (g *Governance) SwitchToRemoteMode(n *Network) { + if g.networkModule != nil { + panic(errors.New("not in local mode before switching")) + } + g.stateModule.SwitchToRemoteMode() + g.networkModule = n + n.addStateModule(g.stateModule) +} diff --git a/core/test/marshaller.go b/core/test/marshaller.go index fc42639..a1b15b6 100644 --- a/core/test/marshaller.go +++ b/core/test/marshaller.go @@ -95,6 +95,12 @@ func (m *DefaultMarshaller) Unmarshal( break } msg = final + case "packed-state-changes": + packed := &packedStateChanges{} + if err = json.Unmarshal(payload, packed); err != nil { + break + } + msg = *packed default: if m.fallback == nil { err = fmt.Errorf("unknown msg type: %v", msgType) @@ -136,6 +142,9 @@ func (m *DefaultMarshaller) Marshal( case *typesDKG.Finalize: msgType = "dkg-finalize" payload, err = json.Marshal(msg) + case packedStateChanges: + msgType = "packed-state-changes" + payload, err = json.Marshal(msg) default: if m.fallback == nil { err = fmt.Errorf("unknwon message type: %v", msg) diff --git a/core/test/network.go b/core/test/network.go index 00c60d9..0d92af0 100644 --- a/core/test/network.go +++ b/core/test/network.go @@ -19,9 +19,11 @@ package test import ( "context" + "errors" "fmt" "net" "strconv" + "sync" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" @@ -48,16 +50,20 @@ type NetworkConfig struct { // Network implements core.Network interface based on TransportClient. type Network struct { - config NetworkConfig - ctx context.Context - ctxCancel context.CancelFunc - trans TransportClient - fromTransport <-chan *TransportEnvelope - toConsensus chan interface{} - toNode chan interface{} - sentRandomness map[common.Hash]struct{} - sentAgreement map[common.Hash]struct{} - blockCache map[common.Hash]*types.Block + config NetworkConfig + ctx context.Context + ctxCancel context.CancelFunc + trans TransportClient + fromTransport <-chan *TransportEnvelope + toConsensus chan interface{} + toNode chan interface{} + sentRandomnessLock sync.Mutex + sentRandomness map[common.Hash]struct{} + sentAgreementLock sync.Mutex + sentAgreement map[common.Hash]struct{} + blockCacheLock sync.RWMutex + blockCache map[common.Hash]*types.Block + stateModule *State } // NewNetwork setup network stuffs for nodes, which provides an @@ -91,6 +97,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel, // PullBlocks implements core.Network interface. func (n *Network) PullBlocks(hashes common.Hashes) { go func() { + n.blockCacheLock.RLock() + defer n.blockCacheLock.RUnlock() for _, hash := range hashes { // TODO(jimmy-dexon): request block from network instead of cache. if block, exist := n.blockCache[hash]; exist { @@ -124,6 +132,8 @@ func (n *Network) BroadcastBlock(block *types.Block) { // BroadcastAgreementResult implements core.Network interface. func (n *Network) BroadcastAgreementResult( randRequest *types.AgreementResult) { + n.sentAgreementLock.Lock() + defer n.sentAgreementLock.Unlock() if _, exist := n.sentAgreement[randRequest.BlockHash]; exist { return } @@ -143,6 +153,8 @@ func (n *Network) BroadcastAgreementResult( // BroadcastRandomnessResult implements core.Network interface. func (n *Network) BroadcastRandomnessResult( randResult *types.BlockRandomnessResult) { + n.sentRandomnessLock.Lock() + defer n.sentRandomnessLock.Unlock() if _, exist := n.sentRandomness[randResult.BlockHash]; exist { return } @@ -214,21 +226,33 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) { return } -func (n *Network) msgHandler(e *TransportEnvelope) { +func (n *Network) dispatchMsg(e *TransportEnvelope) { switch v := e.Msg.(type) { case *types.Block: - if len(n.blockCache) > 500 { - // Randomly purge one block from cache. - for k := range n.blockCache { - delete(n.blockCache, k) - break + func() { + n.blockCacheLock.Lock() + defer n.blockCacheLock.Unlock() + if len(n.blockCache) > 500 { + // Randomly purge one block from cache. + for k := range n.blockCache { + delete(n.blockCache, k) + break + } } - } - n.blockCache[v.Hash] = v + n.blockCache[v.Hash] = v + }() n.toConsensus <- e.Msg case *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult, *typesDKG.PrivateShare, *typesDKG.PartialSignature: n.toConsensus <- e.Msg + case packedStateChanges: + if n.stateModule == nil { + panic(errors.New( + "receive packed state change request without state attached")) + } + if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil { + panic(err) + } default: n.toNode <- e.Msg } @@ -250,7 +274,7 @@ Loop: if !ok { break Loop } - n.msgHandler(e) + n.dispatchMsg(e) } } } @@ -291,3 +315,8 @@ func (n *Network) Broadcast(msg interface{}) { func (n *Network) ReceiveChanForNode() <-chan interface{} { return n.toNode } + +// addStateModule attaches a State instance to this network. +func (n *Network) addStateModule(s *State) { + n.stateModule = s +} diff --git a/core/test/state.go b/core/test/state.go index 2bb04e5..630c43f 100644 --- a/core/test/state.go +++ b/core/test/state.go @@ -518,9 +518,11 @@ func (s *State) AddRequestsFromOthers(reqsAsBytes []byte) (err error) { // PackRequests pack all current pending requests, include those from others. func (s *State) PackRequests() (b []byte, err error) { - // Convert own requests to global one for packing. - if _, err = s.PackOwnRequests(); err != nil { - return + if s.local { + // Convert own requests to global one for packing. + if _, err = s.PackOwnRequests(); err != nil { + return + } } // Pack requests in global pool. packed := []*StateChangeRequest{} diff --git a/core/test/state_test.go b/core/test/state_test.go index 6c5f882..864b5be 100644 --- a/core/test/state_test.go +++ b/core/test/state_test.go @@ -315,6 +315,9 @@ func (s *StateTestSuite) TestPacking() { req.Empty(st.DKGMasterPublicKeys(2)) req.Empty(st.DKGComplaints(2)) req.False(st.IsDKGFinal(2, 0)) + // In remote mode, we need to manually convert own requests to global ones. + _, err = st.PackOwnRequests() + req.NoError(err) // Pack changes into bytes. b, err := st.PackRequests() req.NoError(err) -- cgit v1.2.3