aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/network.go')
-rw-r--r--core/test/network.go272
1 files changed, 62 insertions, 210 deletions
diff --git a/core/test/network.go b/core/test/network.go
index 443a26c..6034fa6 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -39,6 +39,9 @@ const (
maxPullingPeerCount = 3
maxBlockCache = 1000
maxVoteCache = 128
+
+ // Gossiping parameter.
+ gossipAgreementResultPercent = 33
)
// NetworkType is the simulation network type.
@@ -77,8 +80,6 @@ func (req *PullRequest) MarshalJSON() (b []byte, err error) {
idAsBytes, err = json.Marshal(req.Identity.(common.Hashes))
case "vote":
idAsBytes, err = json.Marshal(req.Identity.(types.Position))
- case "randomness":
- idAsBytes, err = json.Marshal(req.Identity.(common.Hashes))
default:
err = fmt.Errorf("unknown ID type for pull request: %v", req.Type)
}
@@ -117,12 +118,6 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
break
}
ID = pos
- case "randomness":
- hashes := common.Hashes{}
- if err = json.Unmarshal(rawReq.Identity, &hashes); err != nil {
- break
- }
- ID = hashes
default:
err = fmt.Errorf("unknown pull request type: %v", rawReq.Type)
}
@@ -137,38 +132,30 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
// Network implements core.Network interface based on TransportClient.
type Network struct {
- ID types.NodeID
- config NetworkConfig
- ctx context.Context
- ctxCancel context.CancelFunc
- trans TransportClient
- dMoment time.Time
- fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
- toNode chan interface{}
- sentRandomnessLock sync.Mutex
- sentRandomness map[common.Hash]struct{}
- sentAgreementLock sync.Mutex
- sentAgreement map[common.Hash]struct{}
- blockCacheLock sync.RWMutex
- blockCache map[common.Hash]*types.Block
- voteCacheLock sync.RWMutex
- voteCache map[types.Position]map[types.VoteHeader]*types.Vote
- voteCacheSize int
- votePositions []types.Position
- randomnessCacheLock sync.RWMutex
- randomnessCache map[common.Hash]*types.BlockRandomnessResult
- stateModule *State
- peers map[types.NodeID]struct{}
- unreceivedBlocksLock sync.RWMutex
- unreceivedBlocks map[common.Hash]chan<- common.Hash
- unreceivedRandomnessLock sync.RWMutex
- unreceivedRandomness map[common.Hash]chan<- common.Hash
- cache *utils.NodeSetCache
- notarySetCachesLock sync.Mutex
- notarySetCaches map[uint64]map[types.NodeID]struct{}
- dkgSetCachesLock sync.Mutex
- dkgSetCaches map[uint64]map[types.NodeID]struct{}
+ ID types.NodeID
+ config NetworkConfig
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ trans TransportClient
+ dMoment time.Time
+ fromTransport <-chan *TransportEnvelope
+ toConsensus chan interface{}
+ toNode chan interface{}
+ sentAgreementLock sync.Mutex
+ sentAgreement map[common.Hash]struct{}
+ blockCacheLock sync.RWMutex
+ blockCache map[common.Hash]*types.Block
+ voteCacheLock sync.RWMutex
+ voteCache map[types.Position]map[types.VoteHeader]*types.Vote
+ voteCacheSize int
+ votePositions []types.Position
+ stateModule *State
+ peers map[types.NodeID]struct{}
+ unreceivedBlocksLock sync.RWMutex
+ unreceivedBlocks map[common.Hash]chan<- common.Hash
+ cache *utils.NodeSetCache
+ notarySetCachesLock sync.Mutex
+ notarySetCaches map[uint64]map[types.NodeID]struct{}
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -177,19 +164,15 @@ func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
n *Network) {
// Construct basic network instance.
n = &Network{
- ID: types.NewNodeID(pubKey),
- config: config,
- toConsensus: make(chan interface{}, 1000),
- toNode: make(chan interface{}, 1000),
- sentRandomness: make(map[common.Hash]struct{}),
- sentAgreement: make(map[common.Hash]struct{}),
- blockCache: make(map[common.Hash]*types.Block, maxBlockCache),
- randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult),
- unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
- unreceivedRandomness: make(map[common.Hash]chan<- common.Hash),
- peers: make(map[types.NodeID]struct{}),
- notarySetCaches: make(map[uint64]map[types.NodeID]struct{}),
- dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
+ ID: types.NewNodeID(pubKey),
+ config: config,
+ toConsensus: make(chan interface{}, 1000),
+ toNode: make(chan interface{}, 1000),
+ sentAgreement: make(map[common.Hash]struct{}),
+ blockCache: make(map[common.Hash]*types.Block, maxBlockCache),
+ unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
+ peers: make(map[types.NodeID]struct{}),
+ notarySetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -218,11 +201,6 @@ func (n *Network) PullVotes(pos types.Position) {
go n.pullVotesAsync(pos)
}
-// PullRandomness implememnts core.Network interface.
-func (n *Network) PullRandomness(hashes common.Hashes) {
- go n.pullRandomnessAsync(hashes)
-}
-
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
if err := n.trans.Broadcast(n.getNotarySet(vote.Position.Round),
@@ -254,37 +232,22 @@ func (n *Network) BroadcastAgreementResult(
if !n.markAgreementResultAsSent(result.BlockHash) {
return
}
- // Send to DKG set first.
- dkgSet := n.getDKGSet(result.Position.Round)
- if err := n.trans.Broadcast(
- dkgSet, n.config.DirectLatency, result); err != nil {
- panic(err)
- }
- // Gossip to other nodes.
- if err := n.trans.Broadcast(getComplementSet(n.peers, dkgSet),
- n.config.GossipLatency, result); err != nil {
- panic(err)
- }
-}
-
-// BroadcastRandomnessResult implements core.Network interface.
-func (n *Network) BroadcastRandomnessResult(
- randResult *types.BlockRandomnessResult) {
- if !n.markRandomnessResultAsSent(randResult.BlockHash) {
- return
- }
- // Send to notary set first.
- notarySet := n.getNotarySet(randResult.Position.Round)
- if err := n.trans.Broadcast(
- notarySet, n.config.DirectLatency, randResult); err != nil {
- panic(err)
+ n.addBlockRandomnessToCache(result.BlockHash, result.Randomness)
+ notarySet := n.getNotarySet(result.Position.Round)
+ count := len(notarySet) * gossipAgreementResultPercent / 100
+ for nID := range notarySet {
+ if count--; count < 0 {
+ break
+ }
+ if err := n.trans.Send(nID, result); err != nil {
+ panic(err)
+ }
}
// Gossip to other nodes.
if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
- n.config.GossipLatency, randResult); err != nil {
+ n.config.GossipLatency, result); err != nil {
panic(err)
}
- n.addRandomnessToCache(randResult)
}
// SendDKGPrivateShare implements core.Network interface.
@@ -296,7 +259,7 @@ func (n *Network) SendDKGPrivateShare(
// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
prvShare *typesDKG.PrivateShare) {
- if err := n.trans.Broadcast(n.getDKGSet(prvShare.Round),
+ if err := n.trans.Broadcast(n.getNotarySet(prvShare.Round),
n.config.DirectLatency, prvShare); err != nil {
panic(err)
}
@@ -306,7 +269,7 @@ func (n *Network) BroadcastDKGPrivateShare(
func (n *Network) BroadcastDKGPartialSignature(
psig *typesDKG.PartialSignature) {
if err := n.trans.Broadcast(
- n.getDKGSet(psig.Round), n.config.DirectLatency, psig); err != nil {
+ n.getNotarySet(psig.Round), n.config.DirectLatency, psig); err != nil {
panic(err)
}
}
@@ -358,7 +321,7 @@ func (n *Network) dispatchMsg(e *TransportEnvelope) {
// Add this vote to cache.
n.addVoteToCache(v)
n.toConsensus <- v
- case *types.AgreementResult, *types.BlockRandomnessResult,
+ case *types.AgreementResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
n.toConsensus <- v
case packedStateChanges:
@@ -408,25 +371,6 @@ func (n *Network) handlePullRequest(req *PullRequest) {
}
}
}()
- case "randomness":
- hashes := req.Identity.(common.Hashes)
- func() {
- n.randomnessCacheLock.Lock()
- defer n.randomnessCacheLock.Unlock()
- All:
- for _, h := range hashes {
- r, exists := n.randomnessCache[h]
- if !exists {
- continue
- }
- select {
- case <-n.ctx.Done():
- break All
- default:
- }
- n.send(req.Requester, r)
- }
- }()
default:
panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
}
@@ -582,57 +526,6 @@ func (n *Network) pullVotesAsync(pos types.Position) {
}
}
-func (n *Network) pullRandomnessAsync(hashes common.Hashes) {
- // Setup notification channels for each block hash.
- notYetReceived := make(map[common.Hash]struct{})
- ch := make(chan common.Hash, len(hashes))
- func() {
- n.unreceivedRandomnessLock.Lock()
- defer n.unreceivedRandomnessLock.Unlock()
- for _, h := range hashes {
- if _, exists := n.unreceivedRandomness[h]; exists {
- continue
- }
- n.unreceivedRandomness[h] = ch
- notYetReceived[h] = struct{}{}
- }
- }()
- req := &PullRequest{
- Requester: n.ID,
- Type: "randomness",
- Identity: hashes,
- }
- // Randomly pick peers to send pull requests.
-Loop:
- for nID := range n.peers {
- if nID == n.ID {
- continue
- }
- n.send(nID, req)
- select {
- case <-n.ctx.Done():
- break Loop
- case <-time.After(2 * n.config.DirectLatency.Delay()):
- // Consume everything in the notification channel.
- for {
- select {
- case h, ok := <-ch:
- if !ok {
- // This network module is closed.
- break Loop
- }
- delete(notYetReceived, h)
- if len(notYetReceived) == 0 {
- break Loop
- }
- default:
- continue Loop
- }
- }
- }
- }
-}
-
func (n *Network) addBlockToCache(b *types.Block) {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
@@ -646,6 +539,16 @@ func (n *Network) addBlockToCache(b *types.Block) {
n.blockCache[b.Hash] = b.Clone()
}
+func (n *Network) addBlockRandomnessToCache(hash common.Hash, rand []byte) {
+ n.blockCacheLock.Lock()
+ defer n.blockCacheLock.Unlock()
+ block, exist := n.blockCache[hash]
+ if !exist {
+ return
+ }
+ block.Finalization.Randomness = rand
+}
+
func (n *Network) addVoteToCache(v *types.Vote) {
n.voteCacheLock.Lock()
defer n.voteCacheLock.Unlock()
@@ -667,19 +570,6 @@ func (n *Network) addVoteToCache(v *types.Vote) {
n.voteCacheSize++
}
-func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) {
- n.randomnessCacheLock.Lock()
- defer n.randomnessCacheLock.Unlock()
- if len(n.randomnessCache) > 1000 {
- // Randomly purge one randomness from cache.
- for k := range n.randomnessCache {
- delete(n.randomnessCache, k)
- break
- }
- }
- n.randomnessCache[rand.BlockHash] = rand
-}
-
func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool {
n.sentAgreementLock.Lock()
defer n.sentAgreementLock.Unlock()
@@ -697,23 +587,6 @@ func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool {
return true
}
-func (n *Network) markRandomnessResultAsSent(blockHash common.Hash) bool {
- n.sentRandomnessLock.Lock()
- defer n.sentRandomnessLock.Unlock()
- if _, exist := n.sentRandomness[blockHash]; exist {
- return false
- }
- if len(n.sentRandomness) > 1000 {
- // Randomly drop one entry.
- for k := range n.sentRandomness {
- delete(n.sentRandomness, k)
- break
- }
- }
- n.sentRandomness[blockHash] = struct{}{}
- return true
-}
-
func (n *Network) cloneForFake(v interface{}) interface{} {
if n.config.Type != NetworkTypeFake {
return v
@@ -721,9 +594,9 @@ func (n *Network) cloneForFake(v interface{}) interface{} {
switch val := v.(type) {
case *types.Block:
return val.Clone()
- case *types.BlockRandomnessResult:
+ case *types.AgreementResult:
// Perform deep copy for randomness result.
- return cloneBlockRandomnessResult(val)
+ return cloneAgreementResult(val)
}
return v
}
@@ -749,27 +622,6 @@ func (n *Network) getNotarySet(round uint64) map[types.NodeID]struct{} {
return set
}
-// getDKGSet gets DKG set for that round from cache.
-func (n *Network) getDKGSet(round uint64) map[types.NodeID]struct{} {
- if n.cache == nil {
- // Default behavior is to broadcast to all peers, which makes it easier
- // to be used in simple test cases.
- return n.peers
- }
- n.dkgSetCachesLock.Lock()
- defer n.dkgSetCachesLock.Unlock()
- set, exists := n.dkgSetCaches[round]
- if !exists {
- var err error
- set, err = n.cache.GetDKGSet(round)
- if err != nil {
- panic(err)
- }
- n.dkgSetCaches[round] = set
- }
- return set
-}
-
func (n *Network) send(endpoint types.NodeID, msg interface{}) {
go func() {
time.Sleep(n.config.DirectLatency.Delay())