// Copyright 2018 The dexon-consensus Authors
// This file is part of the dexon-consensus library.
//
// The dexon-consensus library is free software: you can redistribute it
// and/or modify it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The dexon-consensus library is distributed in the hope that it will be
// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.
package test
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/dexon-foundation/dexon-consensus/common"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
"github.com/dexon-foundation/dexon-consensus/core/types"
typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg"
"github.com/dexon-foundation/dexon-consensus/core/utils"
)
const (
// Count of maximum count of peers to pull votes from.
maxPullingPeerCount = 3
maxBlockCache = 1000
maxVoteCache = 128
// Gossiping parameter.
gossipAgreementResultPercent = 33
)
// NetworkType is the simulation network type.
type NetworkType string
// NetworkType enums.
const (
NetworkTypeTCP NetworkType = "tcp"
NetworkTypeTCPLocal NetworkType = "tcp-local"
NetworkTypeFake NetworkType = "fake"
)
// NetworkConfig is the configuration for Network module.
type NetworkConfig struct {
Type NetworkType
PeerServer string
PeerPort int
DirectLatency LatencyModel
GossipLatency LatencyModel
Marshaller Marshaller
}
// PullRequest is a generic request to pull everything (ex. vote, block...).
type PullRequest struct {
Requester types.NodeID
Type string
Identity interface{}
}
// MarshalJSON implements json.Marshaller.
func (req *PullRequest) MarshalJSON() (b []byte, err error) {
var idAsBytes []byte
// Make sure caller prepare correct identity for pull requests.
switch req.Type {
case "block":
idAsBytes, err = json.Marshal(req.Identity.(common.Hashes))
case "vote":
idAsBytes, err = json.Marshal(req.Identity.(types.Position))
default:
err = fmt.Errorf("unknown ID type for pull request: %v", req.Type)
}
if err != nil {
return
}
b, err = json.Marshal(&struct {
Requester types.NodeID `json:"req"`
Type string `json:"type"`
Identity []byte `json:"id"`
}{req.Requester, req.Type, idAsBytes})
return
}
// UnmarshalJSON iumplements json.Unmarshaller.
func (req *PullRequest) UnmarshalJSON(data []byte) (err error) {
rawReq := &struct {
Requester types.NodeID `json:"req"`
Type string `json:"type"`
Identity []byte `json:"id"`
}{}
if err = json.Unmarshal(data, rawReq); err != nil {
return
}
var ID interface{}
switch rawReq.Type {
case "block":
hashes := common.Hashes{}
if err = json.Unmarshal(rawReq.Identity, &hashes); err != nil {
break
}
ID = hashes
case "vote":
pos := types.Position{}
if err = json.Unmarshal(rawReq.Identity, &pos); err != nil {
break
}
ID = pos
default:
err = fmt.Errorf("unknown pull request type: %v", rawReq.Type)
}
if err != nil {
return
}
req.Requester = rawReq.Requester
req.Type = rawReq.Type
req.Identity = ID
return
}
// NetworkCensor is a interface to determine if a message should be censored.
type NetworkCensor interface {
Censor(interface{}) bool
}
type censorClient struct {
TransportClient
censor NetworkCensor
lock sync.RWMutex
}
func (cc *censorClient) Send(ID types.NodeID, msg interface{}) error {
if func() bool {
cc.lock.RLock()
defer cc.lock.RUnlock()
return cc.censor.Censor(msg)
}() {
return nil
}
return cc.TransportClient.Send(ID, msg)
}
func (cc *censorClient) Broadcast(
IDs map[types.NodeID]struct{}, latency LatencyModel, msg interface{}) error {
if func() bool {
cc.lock.RLock()
defer cc.lock.RUnlock()
return cc.censor.Censor(msg)
}() {
return nil
}
return cc.TransportClient.Broadcast(IDs, latency, msg)
}
type dummyCensor struct{}
func (dc *dummyCensor) Censor(interface{}) bool { return false }
// Network implements core.Network interface based on TransportClient.
type Network struct {
ID types.NodeID
config NetworkConfig
ctx context.Context
ctxCancel context.CancelFunc
trans *censorClient
dMoment time.Time
fromTransport <-chan *TransportEnvelope
toConsensus chan interface{}
toNode chan interface{}
sentAgreementLock sync.Mutex
sentAgreement map[common.Hash]struct{}
blockCacheLock sync.RWMutex
blockCache map[common.Hash]*types.Block
voteCacheLock sync.RWMutex
voteCache map[types.Position]map[types.VoteHeader]*types.Vote
voteCacheSize int
votePositions []types.Position
stateModule *State
peers map[types.NodeID]struct{}
unreceivedBlocksLock sync.RWMutex
unreceivedBlocks map[common.Hash]chan<- common.Hash
cache *utils.NodeSetCache
notarySetCachesLock sync.Mutex
notarySetCaches map[uint64]map[types.NodeID]struct{}
censor NetworkCensor
censorLock sync.RWMutex
}
// NewNetwork setup network stuffs for nodes, which provides an
// implementation of core.Network based on TransportClient.
func NewNetwork(pubKey crypto.PublicKey, config NetworkConfig) (
n *Network) {
// Construct basic network instance.
n = &Network{
ID: types.NewNodeID(pubKey),
config: config,
toConsensus: make(chan interface{}, 1000),
toNode: make(chan interface{}, 1000),
sentAgreement: make(map[common.Hash]struct{}),
blockCache: make(map[common.Hash]*types.Block, maxBlockCache),
unreceivedBlocks: make(map[common.Hash]chan<- common.Hash),
peers: make(map[types.NodeID]struct{}),
notarySetCaches: make(map[uint64]map[types.NodeID]struct{}),
voteCache: make(
map[types.Position]map[types.VoteHeader]*types.Vote),
censor: &dummyCensor{},
}
n.ctx, n.ctxCancel = context.WithCancel(context.Background())
// Construct transport layer.
var trans TransportClient
switch config.Type {
case NetworkTypeTCPLocal:
trans = NewTCPTransportClient(pubKey, config.Marshaller, true)
case NetworkTypeTCP:
trans = NewTCPTransportClient(pubKey, config.Marshaller, false)
case NetworkTypeFake:
trans = NewFakeTransportClient(pubKey)
default:
panic(fmt.Errorf("unknown network type: %v", config.Type))
}
n.trans = &censorClient{
TransportClient: trans,
censor: &dummyCensor{},
}
return
}
// SetCensor to this network module.
func (n *Network) SetCensor(censorIn, censorOut NetworkCensor) {
if censorIn == nil {
censorIn = &dummyCensor{}
}
if censorOut == nil {
censorOut = &dummyCensor{}
}
func() {
n.censorLock.Lock()
defer n.censorLock.Unlock()
n.censor = censorIn
}()
func() {
n.trans.lock.Lock()
defer n.trans.lock.Unlock()
n.trans.censor = censorOut
}()
}
// PullBlocks implements core.Network interface.
func (n *Network) PullBlocks(hashes common.Hashes) {
go n.pullBlocksAsync(hashes)
}
// PullVotes implements core.Network interface.
func (n *Network) PullVotes(pos types.Position) {
go n.pullVotesAsync(pos)
}
// BroadcastVote implements core.Network interface.
func (n *Network) BroadcastVote(vote *types.Vote) {
if err := n.trans.Broadcast(n.getNotarySet(vote.Position.Round),
n.config.DirectLatency, vote); err != nil {
panic(err)
}
n.addVoteToCache(vote)
}
// BroadcastBlock implements core.Network interface.
func (n *Network) BroadcastBlock(block *types.Block) {
// Avoid data race in fake transport.
block = n.cloneForFake(block).(*types.Block)
notarySet := n.getNotarySet(block.Position.Round)
if err := n.trans.Broadcast(
notarySet, n.config.DirectLatency, block); err != nil {
panic(err)
}
if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
n.config.GossipLatency, block); err != nil {
panic(err)
}
n.addBlockToCache(block)
if block.IsFinalized() {
n.addBlockFinalizationToCache(
block.Hash,
block.Finalization.Height,
block.Finalization.Randomness)
}
}
// BroadcastAgreementResult implements core.Network interface.
func (n *Network) BroadcastAgreementResult(
result *types.AgreementResult) {
if !n.markAgreementResultAsSent(result.BlockHash) {
return
}
n.addBlockFinalizationToCache(
result.BlockHash,
result.FinalizationHeight,
nil,
)
notarySet := n.getNotarySet(result.Position.Round)
count := len(notarySet)*gossipAgreementResultPercent/100 + 1
for nID := range notarySet {
if count--; count < 0 {
break
}
if err := n.trans.Send(nID, result); err != nil {
panic(err)
}
}
// Gossip to other nodes.
if err := n.trans.Broadcast(getComplementSet(n.peers, notarySet),
n.config.GossipLatency, result); err != nil {
panic(err)
}
}
// SendDKGPrivateShare implements core.Network interface.
func (n *Network) SendDKGPrivateShare(
recv crypto.PublicKey, prvShare *typesDKG.PrivateShare) {
n.send(types.NewNodeID(recv), prvShare)
}
// BroadcastDKGPrivateShare implements core.Network interface.
func (n *Network) BroadcastDKGPrivateShare(
prvShare *typesDKG.PrivateShare) {
if err := n.trans.Broadcast(n.getNotarySet(prvShare.Round),
n.config.DirectLatency, prvShare); err != nil {
panic(err)
}
}
// BroadcastDKGPartialSignature implements core.Network interface.
func (n *Network) BroadcastDKGPartialSignature(
psig *typesDKG.PartialSignature) {
if err := n.trans.Broadcast(
n.getNotarySet(psig.Round), n.config.DirectLatency, psig); err != nil {
panic(err)
}
}
// ReceiveChan implements core.Network interface.
func (n *Network) ReceiveChan() <-chan interface{} {
return n.toConsensus
}
// Setup transport layer.
func (n *Network) Setup(serverEndpoint interface{}) (err error) {
// Join the p2p network.
switch n.config.Type {
case NetworkTypeTCP, NetworkTypeTCPLocal:
addr := net.JoinHostPort(
n.config.PeerServer, strconv.Itoa(n.config.PeerPort))
n.fromTransport, err = n.trans.Join(addr)
case NetworkTypeFake:
n.fromTransport, err = n.trans.Join(serverEndpoint)
default:
err = fmt.Errorf("unknown network type: %v", n.config.Type)
}
if err != nil {
return
}
peerKeys := n.trans.Peers()
for _, k := range peerKeys {
n.peers[types.NewNodeID(k)] = struct{}{}
}
return
}
func (n *Network) dispatchMsg(e *TransportEnvelope) {
if func() bool {
n.censorLock.RLock()
defer n.censorLock.RUnlock()
return n.censor.Censor(e.Msg)
}() {
return
}
msg := n.cloneForFake(e.Msg)
switch v := msg.(type) {
case *types.Block:
n.addBlockToCache(v)
// Notify pulling routine about the newly arrived block.
func() {
n.unreceivedBlocksLock.Lock()
defer n.unreceivedBlocksLock.Unlock()
if ch, exists := n.unreceivedBlocks[v.Hash]; exists {
ch <- v.Hash
}
delete(n.unreceivedBlocks, v.Hash)
}()
n.toConsensus <- v
case *types.Vote:
// Add this vote to cache.
n.addVoteToCache(v)
n.toConsensus <- v
case *types.AgreementResult,
*typesDKG.PrivateShare, *typesDKG.PartialSignature:
n.toConsensus <- v
case packedStateChanges:
if n.stateModule == nil {
panic(errors.New(
"receive packed state change request without state attached"))
}
if err := n.stateModule.AddRequestsFromOthers([]byte(v)); err != nil {
panic(err)
}
case *PullRequest:
go n.handlePullRequest(v)
default:
n.toNode <- v
}
}
func (n *Network) handlePullRequest(req *PullRequest) {
switch req.Type {
case "block":
hashes := req.Identity.(common.Hashes)
func() {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
All:
for _, h := range hashes {
b, exists := n.blockCache[h]
if !exists {
continue
}
select {
case <-n.ctx.Done():
break All
default:
}
n.send(req.Requester, b)
}
}()
case "vote":
pos := req.Identity.(types.Position)
func() {
n.voteCacheLock.Lock()
defer n.voteCacheLock.Unlock()
if votes, exists := n.voteCache[pos]; exists {
for _, v := range votes {
n.send(req.Requester, v)
}
}
}()
default:
panic(fmt.Errorf("unknown type of pull request: %v", req.Type))
}
}
// Run the main loop.
func (n *Network) Run() {
Loop:
for {
select {
case <-n.ctx.Done():
break Loop
default:
}
select {
case <-n.ctx.Done():
break Loop
case e, ok := <-n.fromTransport:
if !ok {
break Loop
}
go n.dispatchMsg(e)
}
}
}
// Close stops the network.
func (n *Network) Close() (err error) {
n.ctxCancel()
close(n.toConsensus)
n.toConsensus = nil
close(n.toNode)
n.toNode = nil
if err = n.trans.Close(); err != nil {
return
}
return
}
// Report exports 'Report' method of TransportClient.
func (n *Network) Report(msg interface{}) error {
return n.trans.Report(msg)
}
// Broadcast a message to all peers.
func (n *Network) Broadcast(msg interface{}) error {
return n.trans.Broadcast(n.peers, &FixedLatencyModel{}, msg)
}
// Peers exports 'Peers' method of Transport.
func (n *Network) Peers() []crypto.PublicKey {
return n.trans.Peers()
}
// DMoment exports 'DMoment' method of Transport.
func (n *Network) DMoment() time.Time {
return n.trans.DMoment()
}
// ReceiveChanForNode returns a channel for messages not handled by
// core.Consensus.
func (n *Network) ReceiveChanForNode() <-chan interface{} {
return n.toNode
}
// addStateModule attaches a State instance to this network.
func (n *Network) addStateModule(s *State) {
// This variable should be attached before run, no lock to protect it.
n.stateModule = s
}
// AttachNodeSetCache attaches an utils.NodeSetCache to this module. Once attached
// The behavior of Broadcast-X methods would be switched to broadcast to correct
// set of peers, instead of all peers.
func (n *Network) AttachNodeSetCache(cache *utils.NodeSetCache) {
// This variable should be attached before run, no lock to protect it.
n.cache = cache
}
// PurgeNodeSetCache purges cache of some round in attached utils.NodeSetCache.
func (n *Network) PurgeNodeSetCache(round uint64) {
n.cache.Purge(round)
}
func (n *Network) pullBlocksAsync(hashes common.Hashes) {
// Setup notification channels for each block hash.
notYetReceived := make(map[common.Hash]struct{})
ch := make(chan common.Hash, len(hashes))
func() {
n.unreceivedBlocksLock.Lock()
defer n.unreceivedBlocksLock.Unlock()
for _, h := range hashes {
if _, exists := n.unreceivedBlocks[h]; exists {
continue
}
n.unreceivedBlocks[h] = ch
notYetReceived[h] = struct{}{}
}
}()
req := &PullRequest{
Requester: n.ID,
Type: "block",
Identity: hashes,
}
// Randomly pick peers to send pull requests.
Loop:
for nID := range n.peers {
if nID == n.ID {
continue
}
n.send(nID, req)
select {
case <-n.ctx.Done():
break Loop
case <-time.After(2 * n.config.DirectLatency.Delay()):
// Consume everything in the notification channel.
for {
select {
case h, ok := <-ch:
if !ok {
// This network module is closed.
break Loop
}
delete(notYetReceived, h)
if len(notYetReceived) == 0 {
break Loop
}
default:
continue Loop
}
}
}
}
}
func (n *Network) pullVotesAsync(pos types.Position) {
// Randomly pick several peers to pull votes from.
req := &PullRequest{
Requester: n.ID,
Type: "vote",
Identity: pos,
}
// Get corresponding notary set.
notarySet := n.getNotarySet(pos.Round)
// Randomly select one peer from notary set and send a pull request.
sentCount := 0
for nID := range notarySet {
n.send(nID, req)
sentCount++
if sentCount >= maxPullingPeerCount {
break
}
}
}
func (n *Network) addBlockToCache(b *types.Block) {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
if len(n.blockCache) > maxBlockCache {
// Randomly purge one block from cache.
for k := range n.blockCache {
delete(n.blockCache, k)
break
}
}
n.blockCache[b.Hash] = b.Clone()
}
func (n *Network) addBlockFinalizationToCache(
hash common.Hash, height uint64, rand []byte) {
n.blockCacheLock.Lock()
defer n.blockCacheLock.Unlock()
block, exist := n.blockCache[hash]
if !exist {
return
}
block.Finalization.Height = height
block.Finalization.Randomness = rand
}
func (n *Network) addVoteToCache(v *types.Vote) {
n.voteCacheLock.Lock()
defer n.voteCacheLock.Unlock()
if n.voteCacheSize >= maxVoteCache {
pos := n.votePositions[0]
n.voteCacheSize -= len(n.voteCache[pos])
delete(n.voteCache, pos)
n.votePositions = n.votePositions[1:]
}
if _, exists := n.voteCache[v.Position]; !exists {
n.votePositions = append(n.votePositions, v.Position)
n.voteCache[v.Position] =
make(map[types.VoteHeader]*types.Vote)
}
if _, exists := n.voteCache[v.Position][v.VoteHeader]; exists {
return
}
n.voteCache[v.Position][v.VoteHeader] = v
n.voteCacheSize++
}
func (n *Network) markAgreementResultAsSent(blockHash common.Hash) bool {
n.sentAgreementLock.Lock()
defer n.sentAgreementLock.Unlock()
if _, exist := n.sentAgreement[blockHash]; exist {
return false
}
if len(n.sentAgreement) > 1000 {
// Randomly drop one entry.
for k := range n.sentAgreement {
delete(n.sentAgreement, k)
break
}
}
n.sentAgreement[blockHash] = struct{}{}
return true
}
func (n *Network) cloneForFake(v interface{}) interface{} {
if n.config.Type != NetworkTypeFake {
return v
}
switch val := v.(type) {
case *types.Block:
return val.Clone()
case *types.AgreementResult:
// Perform deep copy for randomness result.
return cloneAgreementResult(val)
}
return v
}
// getNotarySet gets notary set for that (round, chain) from cache.
func (n *Network) getNotarySet(round uint64) map[types.NodeID]struct{} {
if n.cache == nil {
// Default behavior is to broadcast to all peers, which makes it easier
// to be used in simple test cases.
return n.peers
}
n.notarySetCachesLock.Lock()
defer n.notarySetCachesLock.Unlock()
set, exists := n.notarySetCaches[round]
if !exists {
var err error
set, err = n.cache.GetNotarySet(round)
if err != nil {
panic(err)
}
n.notarySetCaches[round] = set
}
return set
}
func (n *Network) send(endpoint types.NodeID, msg interface{}) {
go func() {
time.Sleep(n.config.DirectLatency.Delay())
if err := n.trans.Send(endpoint, msg); err != nil {
panic(err)
}
}()
}