aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorMission Liao <mission.liao@dexon.org>2018-11-08 15:58:51 +0800
committerGitHub <noreply@github.com>2018-11-08 15:58:51 +0800
commit1ee5863fd4a295d34c3a2d602d5603e8746e3f7b (patch)
tree044308b22000bb0c9f5a8c3c21f465159418db24 /core
parentdbe83ea4a324941417d6ff09230e5874d5ba5df5 (diff)
downloaddexon-consensus-1ee5863fd4a295d34c3a2d602d5603e8746e3f7b.tar
dexon-consensus-1ee5863fd4a295d34c3a2d602d5603e8746e3f7b.tar.gz
dexon-consensus-1ee5863fd4a295d34c3a2d602d5603e8746e3f7b.tar.bz2
dexon-consensus-1ee5863fd4a295d34c3a2d602d5603e8746e3f7b.tar.lz
dexon-consensus-1ee5863fd4a295d34c3a2d602d5603e8746e3f7b.tar.xz
dexon-consensus-1ee5863fd4a295d34c3a2d602d5603e8746e3f7b.tar.zst
dexon-consensus-1ee5863fd4a295d34c3a2d602d5603e8746e3f7b.zip
simulation: use test.Governacne in simulation (#311)
* Move simulation.Network to test package * Use test.Governance in simulation * Pack/Apply state request in blocks payload * Add Governance.SwitchToRemoteMode This would trigger governance to broadcast pending state change requests when changes. * Allow to marshal/unmarshal packedStateChanges * Attach test.Network and test.State
Diffstat (limited to 'core')
-rw-r--r--core/consensus.go3
-rw-r--r--core/lattice.go2
-rw-r--r--core/test/governance.go75
-rw-r--r--core/test/marshaller.go9
-rw-r--r--core/test/network.go67
-rw-r--r--core/test/state.go8
-rw-r--r--core/test/state_test.go3
7 files changed, 126 insertions, 41 deletions
diff --git a/core/consensus.go b/core/consensus.go
index b9da4f0..a449701 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -660,7 +660,8 @@ func (con *Consensus) initialRound(
// unexpected network fluctuation and ensure the robustness.
for (con.gov.CRS(nextRound) == common.Hash{}) {
con.logger.Info("CRS is not ready yet. Try again later...",
- "nodeID", con.ID)
+ "nodeID", con.ID,
+ "round", nextRound)
time.Sleep(500 * time.Millisecond)
}
nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
diff --git a/core/lattice.go b/core/lattice.go
index 20e16f2..dcb3368 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -81,7 +81,7 @@ func (l *Lattice) PrepareBlock(
if err = l.data.prepareBlock(b); err != nil {
return
}
- l.logger.Debug("Calling Application.PreparePayload", "position", b.Position)
+ l.logger.Debug("Calling Application.PreparePayload", "position", &b.Position)
if b.Payload, err = l.app.PreparePayload(b.Position); err != nil {
return
}
diff --git a/core/test/governance.go b/core/test/governance.go
index 42e0da1..e6b6232 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -37,7 +37,8 @@ import (
type Governance struct {
configs []*types.Config
nodeSets [][]crypto.PublicKey
- state *State
+ stateModule *State
+ networkModule *Network
pendingConfigChanges map[uint64]map[StateChangeType]interface{}
lock sync.RWMutex
}
@@ -51,7 +52,7 @@ func NewGovernance(genesisNodes []crypto.PublicKey,
// modification smaller.
g = &Governance{
pendingConfigChanges: make(map[uint64]map[StateChangeType]interface{}),
- state: NewState(genesisNodes, lambda, true),
+ stateModule: NewState(genesisNodes, lambda, true),
}
return
}
@@ -89,7 +90,7 @@ func (g *Governance) Configuration(round uint64) *types.Config {
// CRS returns the CRS for a given round.
func (g *Governance) CRS(round uint64) common.Hash {
- return g.state.CRS(round)
+ return g.stateModule.CRS(round)
}
// NotifyRoundHeight notifies governace contract to snapshot config.
@@ -100,11 +101,12 @@ func (g *Governance) NotifyRoundHeight(round, height uint64) {
g.lock.Lock()
defer g.lock.Unlock()
for t, v := range g.pendingConfigChanges[round+1] {
- if err := g.state.RequestChange(t, v); err != nil {
+ if err := g.stateModule.RequestChange(t, v); err != nil {
panic(err)
}
}
delete(g.pendingConfigChanges, round+1)
+ g.broadcastPendingStateChanges()
}()
}
@@ -113,13 +115,15 @@ func (g *Governance) ProposeCRS(round uint64, signedCRS []byte) {
g.lock.Lock()
defer g.lock.Unlock()
crs := crypto.Keccak256Hash(signedCRS)
- if err := g.state.ProposeCRS(round, crs); err != nil {
+ if err := g.stateModule.ProposeCRS(round, crs); err != nil {
// CRS can be proposed multiple times, other errors are not
// accepted.
if err != ErrDuplicatedChange {
panic(err)
}
+ return
}
+ g.broadcastPendingStateChanges()
}
// AddDKGComplaint add a DKGComplaint.
@@ -131,12 +135,16 @@ func (g *Governance) AddDKGComplaint(
if g.IsDKGFinal(complaint.Round) {
return
}
- g.state.RequestChange(StateAddDKGComplaint, complaint)
+ if err := g.stateModule.RequestChange(
+ StateAddDKGComplaint, complaint); err != nil {
+ panic(err)
+ }
+ g.broadcastPendingStateChanges()
}
// DKGComplaints returns the DKGComplaints of round.
func (g *Governance) DKGComplaints(round uint64) []*typesDKG.Complaint {
- return g.state.DKGComplaints(round)
+ return g.stateModule.DKGComplaints(round)
}
// AddDKGMasterPublicKey adds a DKGMasterPublicKey.
@@ -145,13 +153,17 @@ func (g *Governance) AddDKGMasterPublicKey(
if round != masterPublicKey.Round {
return
}
- g.state.RequestChange(StateAddDKGMasterPublicKey, masterPublicKey)
+ if err := g.stateModule.RequestChange(
+ StateAddDKGMasterPublicKey, masterPublicKey); err != nil {
+ panic(err)
+ }
+ g.broadcastPendingStateChanges()
}
// DKGMasterPublicKeys returns the DKGMasterPublicKeys of round.
func (g *Governance) DKGMasterPublicKeys(
round uint64) []*typesDKG.MasterPublicKey {
- return g.state.DKGMasterPublicKeys(round)
+ return g.stateModule.DKGMasterPublicKeys(round)
}
// AddDKGFinalize adds a DKG finalize message.
@@ -159,7 +171,10 @@ func (g *Governance) AddDKGFinalize(round uint64, final *typesDKG.Finalize) {
if round != final.Round {
return
}
- g.state.RequestChange(StateAddDKGFinal, final)
+ if err := g.stateModule.RequestChange(StateAddDKGFinal, final); err != nil {
+ panic(err)
+ }
+ g.broadcastPendingStateChanges()
}
// IsDKGFinal checks if DKG is final.
@@ -174,16 +189,31 @@ func (g *Governance) IsDKGFinal(round uint64) bool {
if round >= uint64(len(g.configs)) {
return false
}
- return g.state.IsDKGFinal(round, int(g.configs[round].DKGSetSize)/3*2)
+ return g.stateModule.IsDKGFinal(round, int(g.configs[round].DKGSetSize)/3*2)
}
//
// Test Utilities
//
+type packedStateChanges []byte
+
+// This method broadcasts pending state change requests in the underlying
+// State instance, this behavior is to simulate tx-gossiping in full nodes.
+func (g *Governance) broadcastPendingStateChanges() {
+ if g.networkModule == nil {
+ return
+ }
+ packed, err := g.stateModule.PackOwnRequests()
+ if err != nil {
+ panic(err)
+ }
+ g.networkModule.Broadcast(packedStateChanges(packed))
+}
+
// State allows to access embed State instance.
func (g *Governance) State() *State {
- return g.state
+ return g.stateModule
}
// CatchUpWithRound attempts to perform state snapshot to
@@ -199,7 +229,7 @@ func (g *Governance) CatchUpWithRound(round uint64) {
g.lock.Lock()
defer g.lock.Unlock()
for uint64(len(g.configs)) <= round {
- config, nodeSet := g.state.Snapshot()
+ config, nodeSet := g.stateModule.Snapshot()
g.configs = append(g.configs, config)
g.nodeSets = append(g.nodeSets, nodeSet)
}
@@ -210,7 +240,7 @@ func (g *Governance) Clone() *Governance {
g.lock.RLock()
defer g.lock.RUnlock()
// Clone state.
- copiedState := g.state.Clone()
+ copiedState := g.stateModule.Clone()
// Clone configs.
copiedConfigs := []*types.Config{}
for _, c := range g.configs {
@@ -239,10 +269,9 @@ func (g *Governance) Clone() *Governance {
copiedNodeSets = append(copiedNodeSets, copiedNodeSet)
}
// Clone pending changes.
-
return &Governance{
configs: copiedConfigs,
- state: copiedState,
+ stateModule: copiedState,
nodeSets: copiedNodeSets,
pendingConfigChanges: copiedPendingChanges,
}
@@ -286,7 +315,7 @@ func (g *Governance) Equal(other *Governance, checkState bool) bool {
// different state, only the snapshots (configs and node sets) are
// essentially equal.
if checkState {
- return g.state.Equal(other.state) == nil
+ return g.stateModule.Equal(other.stateModule) == nil
}
return true
}
@@ -319,3 +348,15 @@ func (g *Governance) RegisterConfigChange(
pendingChangesForRound[t] = v
return nil
}
+
+// SwitchToRemoteMode would switch this governance instance to remote mode,
+// which means: it will broadcast all changes from its underlying state
+// instance.
+func (g *Governance) SwitchToRemoteMode(n *Network) {
+ if g.networkModule != nil {
+ panic(errors.New("not in local mode before switching"))
+ }
+ g.stateModule.SwitchToRemoteMode()
+ g.networkModule = n
+ n.addStateModule(g.stateModule)
+}
diff --git a/core/test/marshaller.go b/core/test/marshaller.go
index fc42639..a1b15b6 100644
--- a/core/test/marshaller.go
+++ b/core/test/marshaller.go
@@ -95,6 +95,12 @@ func (m *DefaultMarshaller) Unmarshal(
break
}
msg = final
+ case "packed-state-changes":
+ packed := &packedStateChanges{}
+ if err = json.Unmarshal(payload, packed); err != nil {
+ break
+ }
+ msg = *packed
default:
if m.fallback == nil {
err = fmt.Errorf("unknown msg type: %v", msgType)
@@ -136,6 +142,9 @@ func (m *DefaultMarshaller) Marshal(
case *typesDKG.Finalize:
msgType = "dkg-finalize"
payload, err = json.Marshal(msg)
+ case packedStateChanges:
+ msgType = "packed-state-changes"
+ payload, err = json.Marshal(msg)
default:
if m.fallback == nil {
err = fmt.Errorf("unknwon message type: %v", msg)
diff --git a/core/test/network.go b/core/test/network.go
index 00c60d9..0d92af0 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -19,9 +19,11 @@ package test
import (
"context"
+ "errors"
"fmt"
"net"
"strconv"
+ "sync"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
@@ -48,16 +50,20 @@ type NetworkConfig struct {
// Network implements core.Network interface based on TransportClient.
type Network struct {
- config NetworkConfig
- ctx context.Context
- ctxCancel context.CancelFunc
- trans TransportClient
- fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
- toNode chan interface{}
- sentRandomness map[common.Hash]struct{}
- sentAgreement map[common.Hash]struct{}
- blockCache map[common.Hash]*types.Block
+ config NetworkConfig
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ trans TransportClient
+ fromTransport <-chan *TransportEnvelope
+ toConsensus chan interface{}
+ toNode chan interface{}
+ sentRandomnessLock sync.Mutex
+ sentRandomness map[common.Hash]struct{}
+ sentAgreementLock sync.Mutex
+ sentAgreement map[common.Hash]struct{}
+ blockCacheLock sync.RWMutex
+ blockCache map[common.Hash]*types.Block
+ stateModule *State
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -91,6 +97,8 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
// PullBlocks implements core.Network interface.
func (n *Network) PullBlocks(hashes common.Hashes) {
go func() {
+ n.blockCacheLock.RLock()
+ defer n.blockCacheLock.RUnlock()
for _, hash := range hashes {
// TODO(jimmy-dexon): request block from network instead of cache.
if block, exist := n.blockCache[hash]; exist {
@@ -124,6 +132,8 @@ func (n *Network) BroadcastBlock(block *types.Block) {
// BroadcastAgreementResult implements core.Network interface.
func (n *Network) BroadcastAgreementResult(
randRequest *types.AgreementResult) {
+ n.sentAgreementLock.Lock()
+ defer n.sentAgreementLock.Unlock()
if _, exist := n.sentAgreement[randRequest.BlockHash]; exist {
return
}
@@ -143,6 +153,8 @@ func (n *Network) BroadcastAgreementResult(
// BroadcastRandomnessResult implements core.Network interface.
func (n *Network) BroadcastRandomnessResult(
randResult *types.BlockRandomnessResult) {
+ n.sentRandomnessLock.Lock()
+ defer n.sentRandomnessLock.Unlock()
if _, exist := n.sentRandomness[randResult.BlockHash]; exist {
return
}
@@ -214,21 +226,33 @@ func (n *Network) Setup(serverEndpoint interface{}) (err error) {
return
}
-func (n *Network) msgHandler(e *TransportEnvelope) {
+func (n *Network) dispatchMsg(e *TransportEnvelope) {
switch v := e.Msg.(type) {
case *types.Block:
- if len(n.blockCache) > 500 {
- // Randomly purge one block from cache.
- for k := range n.blockCache {
- delete(n.blockCache, k)
- break
+ func() {
+ n.blockCacheLock.Lock()
+ defer n.blockCacheLock.Unlock()
+ if len(n.blockCache) > 500 {
+ // Randomly purge one block from cache.
+ for k := range n.blockCache {
+ delete(n.blockCache, k)
+ break
+ }
}
- }
- n.blockCache[v.Hash] = v
+ n.blockCache[v.Hash] = v
+ }()
n.toConsensus <- e.Msg
case *types.Vote, *types.AgreementResult, *types.BlockRandomnessResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
n.toConsensus <- e.Msg
+ case packedStateChanges:
+ if n.stateModule == nil {
+ panic(errors.New(
+ "receive packed state change request without state attached"))
+ }
+ if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil {
+ panic(err)
+ }
default:
n.toNode <- e.Msg
}
@@ -250,7 +274,7 @@ Loop:
if !ok {
break Loop
}
- n.msgHandler(e)
+ n.dispatchMsg(e)
}
}
}
@@ -291,3 +315,8 @@ func (n *Network) Broadcast(msg interface{}) {
func (n *Network) ReceiveChanForNode() <-chan interface{} {
return n.toNode
}
+
+// addStateModule attaches a State instance to this network.
+func (n *Network) addStateModule(s *State) {
+ n.stateModule = s
+}
diff --git a/core/test/state.go b/core/test/state.go
index 2bb04e5..630c43f 100644
--- a/core/test/state.go
+++ b/core/test/state.go
@@ -518,9 +518,11 @@ func (s *State) AddRequestsFromOthers(reqsAsBytes []byte) (err error) {
// PackRequests pack all current pending requests, include those from others.
func (s *State) PackRequests() (b []byte, err error) {
- // Convert own requests to global one for packing.
- if _, err = s.PackOwnRequests(); err != nil {
- return
+ if s.local {
+ // Convert own requests to global one for packing.
+ if _, err = s.PackOwnRequests(); err != nil {
+ return
+ }
}
// Pack requests in global pool.
packed := []*StateChangeRequest{}
diff --git a/core/test/state_test.go b/core/test/state_test.go
index 6c5f882..864b5be 100644
--- a/core/test/state_test.go
+++ b/core/test/state_test.go
@@ -315,6 +315,9 @@ func (s *StateTestSuite) TestPacking() {
req.Empty(st.DKGMasterPublicKeys(2))
req.Empty(st.DKGComplaints(2))
req.False(st.IsDKGFinal(2, 0))
+ // In remote mode, we need to manually convert own requests to global ones.
+ _, err = st.PackOwnRequests()
+ req.NoError(err)
// Pack changes into bytes.
b, err := st.PackRequests()
req.NoError(err)