aboutsummaryrefslogtreecommitdiffstats
path: root/core/test/network.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/test/network.go')
-rw-r--r--core/test/network.go178
1 files changed, 139 insertions, 39 deletions
diff --git a/core/test/network.go b/core/test/network.go
index 207d8a4..a79898e 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -125,34 +125,38 @@ func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
// Network implements core.Network interface based on TransportClient.
type Network struct {
- ID types.NodeID
- config NetworkConfig
- ctx context.Context
- ctxCancel context.CancelFunc
- trans TransportClient
- fromTransport <-chan *TransportEnvelope
- toConsensus chan interface{}
- toNode chan interface{}
- sentRandomnessLock sync.Mutex
- sentRandomness map[common.Hash]struct{}
- sentAgreementLock sync.Mutex
- sentAgreement map[common.Hash]struct{}
- blockCacheLock sync.RWMutex
- blockCache map[common.Hash]*types.Block
- voteCacheLock sync.RWMutex
- voteCache map[types.Position]map[types.VoteHeader]*types.Vote
- voteCacheSize int
- votePositions []types.Position
- stateModule *State
- peers map[types.NodeID]struct{}
- unreceivedBlocksLock sync.RWMutex
- unreceivedBlocks map[common.Hash]chan<- common.Hash
- latencyModel LatencyModel
- cache *utils.NodeSetCache
- notarySetCachesLock sync.Mutex
- notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
- dkgSetCachesLock sync.Mutex
- dkgSetCaches map[uint64]map[types.NodeID]struct{}
+ ID types.NodeID
+ config NetworkConfig
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ trans TransportClient
+ fromTransport <-chan *TransportEnvelope
+ toConsensus chan interface{}
+ toNode chan interface{}
+ sentRandomnessLock sync.Mutex
+ sentRandomness map[common.Hash]struct{}
+ sentAgreementLock sync.Mutex
+ sentAgreement map[common.Hash]struct{}
+ blockCacheLock sync.RWMutex
+ blockCache map[common.Hash]*types.Block
+ voteCacheLock sync.RWMutex
+ voteCache map[types.Position]map[types.VoteHeader]*types.Vote
+ voteCacheSize int
+ votePositions []types.Position
+ randomnessCacheLock sync.RWMutex
+ randomnessCache map[common.Hash]*types.BlockRandomnessResult
+ stateModule *State
+ peers map[types.NodeID]struct{}
+ unreceivedBlocksLock sync.RWMutex
+ unreceivedBlocks map[common.Hash]chan<- common.Hash
+ unreceivedRandomnessLock sync.RWMutex
+ unreceivedRandomness map[common.Hash]chan<- common.Hash
+ latencyModel LatencyModel
+ cache *utils.NodeSetCache
+ notarySetCachesLock sync.Mutex
+ notarySetCaches map[uint64]map[uint32]map[types.NodeID]struct{}
+ dkgSetCachesLock sync.Mutex
+ dkgSetCaches map[uint64]map[types.NodeID]struct{}
}
// NewNetwork setup network stuffs for nodes, which provides an
@@ -161,17 +165,19 @@ func NewNetwork(pubKey crypto.PublicKey, latency LatencyModel,
marshaller Marshaller, config NetworkConfig) (n *Network) {
// Construct basic network instance.
n = &Network{
- ID: types.NewNodeID(pubKey),
- config: config,
- toConsensus: make(chan interface{}, 1000),
- toNode: make(chan interface{}, 1000),
- sentRandomness: make(map[common.Hash]struct{}),
- sentAgreement: make(map[common.Hash]struct{}),
- blockCache: make(map[common.Hash]*types.Block),
- unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
- latencyModel: latency,
- notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
- dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
+ ID: types.NewNodeID(pubKey),
+ config: config,
+ toConsensus: make(chan interface{}, 1000),
+ toNode: make(chan interface{}, 1000),
+ sentRandomness: make(map[common.Hash]struct{}),
+ sentAgreement: make(map[common.Hash]struct{}),
+ blockCache: make(map[common.Hash]*types.Block),
+ randomnessCache: make(map[common.Hash]*types.BlockRandomnessResult),
+ unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
+ unreceivedRandomness: make(map[common.Hash]chan<- common.Hash),
+ latencyModel: latency,
+ notarySetCaches: make(map[uint64]map[uint32]map[types.NodeID]struct{}),
+ dkgSetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
}
@@ -200,6 +206,11 @@ func (n *Network) PullVotes(pos types.Position) {
go n.pullVotesAsync(pos)
}
+// PullRandomness implememnts core.Network interface.
+func (n *Network) PullRandomness(hashes common.Hashes) {
+ go n.pullRandomnessAsync(hashes)
+}
+
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
n.broadcastToSet(
@@ -256,6 +267,7 @@ func (n *Network) BroadcastRandomnessResult(
if err := n.trans.Broadcast(randResult); err != nil {
panic(err)
}
+ n.addRandomnessToCache(randResult)
}
// SendDKGPrivateShare implements core.Network interface.
@@ -380,6 +392,27 @@ func (n *Network) handlePullRequest(req *PullRequest) {
}
}
}()
+ case "randomness":
+ hashes := req.Identity.(common.Hashes)
+ func() {
+ n.randomnessCacheLock.Lock()
+ defer n.randomnessCacheLock.Unlock()
+ All:
+ for _, h := range hashes {
+ r, exists := n.randomnessCache[h]
+ if !exists {
+ continue
+ }
+ select {
+ case <-n.ctx.Done():
+ break All
+ default:
+ }
+ if err := n.trans.Send(req.Requester, r); err != nil {
+ log.Println("unable to send randomness", req.Requester, err)
+ }
+ }
+ }()
default:
panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
}
@@ -539,6 +572,60 @@ func (n *Network) pullVotesAsync(pos types.Position) {
}
}
+func (n *Network) pullRandomnessAsync(hashes common.Hashes) {
+ // Setup notification channels for each block hash.
+ notYetReceived := make(map[common.Hash]struct{})
+ ch := make(chan common.Hash, len(hashes))
+ func() {
+ n.unreceivedRandomnessLock.Lock()
+ defer n.unreceivedRandomnessLock.Unlock()
+ for _, h := range hashes {
+ if _, exists := n.unreceivedRandomness[h]; exists {
+ continue
+ }
+ n.unreceivedRandomness[h] = ch
+ notYetReceived[h] = struct{}{}
+ }
+ }()
+ req := &PullRequest{
+ Requester: n.ID,
+ Type: "randomness",
+ Identity: hashes,
+ }
+ // Randomly pick peers to send pull requests.
+Loop:
+ for nID := range n.peers {
+ if nID == n.ID {
+ continue
+ }
+ if err := n.trans.Send(nID, req); err != nil {
+ // Try next peer.
+ continue
+ }
+ select {
+ case <-n.ctx.Done():
+ break Loop
+ case <-time.After(2 * n.latencyModel.Delay()):
+ // Consume everything in the notification channel.
+ for {
+ select {
+ case h, ok := <-ch:
+ if !ok {
+ // This network module is closed.
+ break Loop
+ }
+ delete(notYetReceived, h)
+ if len(notYetReceived) == 0 {
+ break Loop
+ }
+ default:
+ continue Loop
+ }
+ }
+ }
+ }
+}
+
func (n *Network) addBlockToCache(b *types.Block) {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
@@ -573,6 +660,19 @@ func (n *Network) addVoteToCache(v *types.Vote) {
n.voteCacheSize++
}
+func (n *Network) addRandomnessToCache(rand *types.BlockRandomnessResult) {
+ n.randomnessCacheLock.Lock()
+ defer n.randomnessCacheLock.Unlock()
+ if len(n.randomnessCache) > 1000 {
+ // Randomly purge one randomness from cache.
+ for k := range n.randomnessCache {
+ delete(n.randomnessCache, k)
+ break
+ }
+ }
+ n.randomnessCache[rand.BlockHash] = rand
+}
+
func (n *Network) cloneForFake(v interface{}) interface{} {
if n.config.Type != NetworkTypeFake {
return v