aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-10-12 15:02:33 +0800
committerWei-Ning Huang <w@dexon.org>2019-03-12 12:19:09 +0800
commit961231f91fc4c7edc122ccdf337d804a885b1f6b (patch)
tree4de8ac71927a013dedb65a7be91770c1a3fc97bb /dex
parentf4936aa1eb7fa01f56c28f081af266c2a4924a61 (diff)
downloaddexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar
dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.gz
dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.bz2
dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.lz
dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.xz
dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.zst
dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.zip
dex: network: implement the network interface
Diffstat (limited to 'dex')
-rw-r--r--dex/backend.go35
-rw-r--r--dex/governance.go20
-rw-r--r--dex/handler.go116
-rw-r--r--dex/helper_test.go2
-rw-r--r--dex/network.go18
-rw-r--r--dex/peer.go230
-rw-r--r--dex/peer_test.go28
-rw-r--r--dex/protocol.go134
-rw-r--r--dex/protocol_test.go499
9 files changed, 1024 insertions, 58 deletions
diff --git a/dex/backend.go b/dex/backend.go
index d01ab8040..caac0fe21 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -85,7 +85,6 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
if err != nil {
panic(err)
}
- network := NewDexconNetwork()
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
@@ -116,7 +115,6 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
networkID: config.NetworkId,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
- network: network,
blockdb: db,
engine: dexcon.New(&params.DexconConfig{}),
}
@@ -154,9 +152,18 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
dex.governance = NewDexconGovernance(dex.APIBackend, dex.chainConfig, config.PrivateKey)
dex.app = NewDexconApp(dex.txPool, dex.blockchain, dex.governance, chainDb, config, vmConfig)
- privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey)
- dex.consensus = dexCore.NewConsensus(dex.app, dex.governance, db, network, privKey)
+ pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
+ config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
+ chainDb, dex.governance)
+ if err != nil {
+ return nil, err
+ }
+
+ dex.protocolManager = pm
+ dex.network = NewDexconNetwork(pm)
+ privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey)
+ dex.consensus = dexCore.NewConsensus(dex.app, dex.governance, db, dex.network, privKey)
return dex, nil
}
@@ -168,7 +175,24 @@ func (s *Dexon) APIs() []rpc.API {
return nil
}
-func (s *Dexon) Start(server *p2p.Server) error {
+func (s *Dexon) Start(srvr *p2p.Server) error {
+ // Start the bloom bits servicing goroutines
+ s.startBloomHandlers(params.BloomBitsBlocks)
+
+ // Start the RPC service
+ s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
+
+ // Figure out a max peers count based on the server limits
+ maxPeers := srvr.MaxPeers
+ if s.config.LightServ > 0 {
+ if s.config.LightPeers >= srvr.MaxPeers {
+ return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
+ }
+ maxPeers -= s.config.LightPeers
+ }
+ // Start the networking layer and the light server if requested
+ s.protocolManager.Start(srvr, maxPeers)
+
return nil
}
@@ -196,3 +220,4 @@ func (d *Dexon) EventMux() *event.TypeMux { return d.eventMux }
func (d *Dexon) Engine() consensus.Engine { return d.engine }
func (d *Dexon) ChainDb() ethdb.Database { return d.chainDb }
func (d *Dexon) Downloader() *downloader.Downloader { return d.protocolManager.downloader }
+func (d *Dexon) NetVersion() uint64 { return d.networkID }
diff --git a/dex/governance.go b/dex/governance.go
index 369cc2f0c..37985cec4 100644
--- a/dex/governance.go
+++ b/dex/governance.go
@@ -10,10 +10,13 @@ import (
coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
coreEcdsa "github.com/dexon-foundation/dexon-consensus-core/core/crypto/ecdsa"
coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
+
"github.com/dexon-foundation/dexon/common"
+ "github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/core/vm"
"github.com/dexon-foundation/dexon/crypto"
+ "github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/params"
"github.com/dexon-foundation/dexon/rlp"
@@ -276,3 +279,20 @@ func (d *DexconGovernance) IsDKGFinal(round uint64) bool {
count := s.DKGFinalizedsCount(big.NewInt(int64(round))).Uint64()
return count >= threshold
}
+
+// TODO(sonic): finish these
+func (d *DexconGovernance) GetChainNum(uint64) uint32 {
+ return 3
+}
+
+func (d *DexconGovernance) GetNotarySet(uint32, uint64) map[string]struct{} {
+ return nil
+}
+
+func (d *DexconGovernance) GetDKGSet(uint64) map[string]struct{} {
+ return nil
+}
+
+func (d *DexconGovernance) SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription {
+ return nil
+}
diff --git a/dex/handler.go b/dex/handler.go
index 67cbe8a63..e013b9722 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -26,6 +26,9 @@ import (
"sync/atomic"
"time"
+ "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
+
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/consensus"
"github.com/dexon-foundation/dexon/core"
@@ -97,6 +100,9 @@ type ProtocolManager struct {
crsCh chan core.NewCRSEvent
crsSub event.Subscription
+ // channels for dexon consensus core
+ receiveCh chan interface{}
+
srvr p2pServer
// wait group is used for graceful shutdowns during downloading
@@ -125,6 +131,7 @@ func NewProtocolManager(
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
+ receiveCh: make(chan interface{}, 1024),
}
// Figure out whether to allow fast sync or not
@@ -267,6 +274,10 @@ func (pm *ProtocolManager) Stop() {
log.Info("Ethereum protocol stopped")
}
+func (pm *ProtocolManager) ReceiveChan() <-chan interface{} {
+ return pm.receiveCh
+}
+
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw))
}
@@ -666,6 +677,47 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkNodeMeta(meta.Hash())
}
pm.nodeTable.Add(metas)
+ case msg.Code == LatticeBlockMsg:
+ var rb rlpLatticeBlock
+ if err := msg.Decode(&rb); err != nil {
+ fmt.Println("decode lattice block error", err)
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- fromRLPLatticeBlock(&rb)
+ case msg.Code == VoteMsg:
+ var vote coreTypes.Vote
+ if err := msg.Decode(&vote); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &vote
+ case msg.Code == AgreementMsg:
+ // DKG set is receiver
+ var agreement coreTypes.AgreementResult
+ if err := msg.Decode(&agreement); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &agreement
+ case msg.Code == RandomnessMsg:
+ // Broadcast this to all peer
+ var randomness coreTypes.BlockRandomnessResult
+ if err := msg.Decode(&randomness); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &randomness
+ case msg.Code == DKGPrivateShareMsg:
+ // Do not relay this msg
+ var rps rlpDKGPrivateShare
+ if err := msg.Decode(&rps); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- fromRLPDKGPrivateShare(&rps)
+ case msg.Code == DKGPartialSignatureMsg:
+ // broadcast in DKG set
+ var psig coreTypes.DKGPartialSignature
+ if err := msg.Decode(&psig); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ pm.receiveCh <- &psig
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -741,6 +793,68 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
}
}
+func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
+ label := peerLabel{
+ set: notaryset,
+ chainID: vote.Position.ChainID,
+ round: vote.Position.Round,
+ }
+
+ for _, peer := range pm.peers.PeersWithoutVote(rlpHash(vote), label) {
+ peer.AsyncSendVote(vote)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
+ hash := rlpHash(toRLPLatticeBlock(block))
+ for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) {
+ peer.AsyncSendLatticeBlock(block)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) SendDKGPrivateShare(
+ pub crypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) {
+ id := discover.MustBytesID(pub.Bytes()[1:])
+ if p := pm.peers.Peer(id.String()); p != nil {
+ p.AsyncSendDKGPrivateShare(privateShare)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastDKGPrivateShare(
+ privateShare *coreTypes.DKGPrivateShare) {
+ for _, peer := range pm.peers.allPeers() {
+ peer.AsyncSendDKGPrivateShare(privateShare)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastAgreementResult(
+ agreement *coreTypes.AgreementResult) {
+ for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
+ peer.AsyncSendAgreement(agreement)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastRandomnessResult(
+ randomness *coreTypes.BlockRandomnessResult) {
+ // random pick n peers
+ for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) {
+ peer.AsyncSendRandomness(randomness)
+ }
+}
+
+// TODO(sonic): try to reduce traffic
+func (pm *ProtocolManager) BroadcastDKGPartialSignature(
+ psig *coreTypes.DKGPartialSignature) {
+ for _, peer := range pm.peers.PeersWithoutDKGPartialSignature(rlpHash(psig)) {
+ peer.AsyncSendDKGPartialSignature(psig)
+ }
+}
+
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
@@ -781,6 +895,8 @@ func (pm *ProtocolManager) metaBroadcastLoop() {
// a loop keep building and maintaining peers in notary set.
// TODO: finish this
func (pm *ProtocolManager) peerSetLoop() {
+
+ log.Debug("start peer set loop")
for {
select {
case event := <-pm.crsCh:
diff --git a/dex/helper_test.go b/dex/helper_test.go
index dcda6f4d2..fc8053774 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -101,7 +101,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
db = ethdb.NewMemDatabase()
gspec = &core.Genesis{
Config: params.TestChainConfig,
- Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}},
+ Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000), Staked: big.NewInt(0)}},
}
genesis = gspec.MustCommit(db)
blockchain, _ = core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
diff --git a/dex/network.go b/dex/network.go
index 24ef2cc63..e99b4f5b1 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -6,49 +6,53 @@ import (
)
type DexconNetwork struct {
- receiveChan chan interface{}
+ pm *ProtocolManager
}
-func NewDexconNetwork() *DexconNetwork {
- return &DexconNetwork{
- receiveChan: make(chan interface{}),
- }
+func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork {
+ return &DexconNetwork{pm: pm}
}
// BroadcastVote broadcasts vote to all nodes in DEXON network.
func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
+ n.pm.BroadcastVote(vote)
}
// BroadcastBlock broadcasts block to all nodes in DEXON network.
func (n *DexconNetwork) BroadcastBlock(block *types.Block) {
+ n.pm.BroadcastLatticeBlock(block)
}
// SendDKGPrivateShare sends PrivateShare to a DKG participant.
func (n *DexconNetwork) SendDKGPrivateShare(
pub crypto.PublicKey, prvShare *types.DKGPrivateShare) {
+ n.pm.SendDKGPrivateShare(pub, prvShare)
}
// BroadcastDKGPrivateShare broadcasts PrivateShare to all DKG participants.
func (n *DexconNetwork) BroadcastDKGPrivateShare(
prvShare *types.DKGPrivateShare) {
+ n.pm.BroadcastDKGPrivateShare(prvShare)
}
// BroadcastDKGPartialSignature broadcasts partialSignature to all
// DKG participants.
func (n *DexconNetwork) BroadcastDKGPartialSignature(
psig *types.DKGPartialSignature) {
+ n.pm.BroadcastDKGPartialSignature(psig)
}
// BroadcastAgreementResult broadcasts rand request to DKG set.
func (n *DexconNetwork) BroadcastAgreementResult(randRequest *types.AgreementResult) {
-
+ n.pm.BroadcastAgreementResult(randRequest)
}
// BroadcastRandomnessResult broadcasts rand request to Notary set.
func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) {
+ n.pm.BroadcastRandomnessResult(randResult)
}
// ReceiveChan returns a channel to receive messages from DEXON network.
func (n *DexconNetwork) ReceiveChan() <-chan interface{} {
- return n.receiveChan
+ return n.pm.ReceiveChan()
}
diff --git a/dex/peer.go b/dex/peer.go
index 05947b456..e7c4f5d53 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -24,6 +24,7 @@ import (
"time"
mapset "github.com/deckarep/golang-set"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/p2p"
@@ -104,31 +105,45 @@ type peer struct {
td *big.Int
lock sync.RWMutex
- knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownMetas mapset.Set // Set of node metas known to be known by this peer
- knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
- queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
- queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- term chan struct{} // Termination channel to stop the broadcaster
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ knownVotes mapset.Set
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
+ queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ queuedLatticeBlock chan *coreTypes.Block
+ queuedVote chan *coreTypes.Vote
+ queuedAgreement chan *coreTypes.AgreementResult
+ queuedRandomness chan *coreTypes.BlockRandomnessResult
+ queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare
+ queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature
+ term chan struct{} // Termination channel to stop the broadcaster
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return &peer{
- Peer: p,
- rw: rw,
- version: version,
- labels: mapset.NewSet(),
- id: p.ID().String(),
- knownTxs: mapset.NewSet(),
- knownMetas: mapset.NewSet(),
- knownBlocks: mapset.NewSet(),
- queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
- queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
- queuedProps: make(chan *propEvent, maxQueuedProps),
- queuedAnns: make(chan *types.Block, maxQueuedAnns),
- term: make(chan struct{}),
+ Peer: p,
+ rw: rw,
+ version: version,
+ labels: mapset.NewSet(),
+ id: p.ID().String(),
+ knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
+ knownBlocks: mapset.NewSet(),
+ knownVotes: mapset.NewSet(),
+ queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
+ queuedProps: make(chan *propEvent, maxQueuedProps),
+ queuedAnns: make(chan *types.Block, maxQueuedAnns),
+ queuedLatticeBlock: make(chan *coreTypes.Block, 16),
+ queuedVote: make(chan *coreTypes.Vote, 16),
+ queuedAgreement: make(chan *coreTypes.AgreementResult, 16),
+ queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16),
+ queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16),
+ queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16),
+ term: make(chan struct{}),
}
}
@@ -161,7 +176,36 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
-
+ case block := <-p.queuedLatticeBlock:
+ if err := p.SendLatticeBlock(block); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast lattice block")
+ case vote := <-p.queuedVote:
+ if err := p.SendVote(vote); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote))
+ case agreement := <-p.queuedAgreement:
+ if err := p.SendAgreement(agreement); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast agreement")
+ case randomness := <-p.queuedRandomness:
+ if err := p.SendRandomness(randomness); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast randomness")
+ case privateShare := <-p.queuedDKGPrivateShare:
+ if err := p.SendDKGPrivateShare(privateShare); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast DKG private share")
+ case psig := <-p.queuedDKGPartialSignature:
+ if err := p.SendDKGPartialSignature(psig); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast DKG partial signature")
case <-p.term:
return
}
@@ -326,6 +370,78 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
}
}
+func (p *peer) SendLatticeBlock(block *coreTypes.Block) error {
+ return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block))
+}
+
+func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) {
+ select {
+ case p.queuedLatticeBlock <- block:
+ default:
+ p.Log().Debug("Dropping lattice block propagation")
+ }
+}
+
+func (p *peer) SendVote(vote *coreTypes.Vote) error {
+ return p2p.Send(p.rw, VoteMsg, vote)
+}
+
+func (p *peer) AsyncSendVote(vote *coreTypes.Vote) {
+ select {
+ case p.queuedVote <- vote:
+ default:
+ p.Log().Debug("Dropping vote propagation")
+ }
+}
+
+func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error {
+ return p2p.Send(p.rw, AgreementMsg, agreement)
+}
+
+func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
+ select {
+ case p.queuedAgreement <- agreement:
+ default:
+ p.Log().Debug("Dropping agreement result")
+ }
+}
+
+func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error {
+ return p2p.Send(p.rw, RandomnessMsg, randomness)
+}
+
+func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) {
+ select {
+ case p.queuedRandomness <- randomness:
+ default:
+ p.Log().Debug("Dropping randomness result")
+ }
+}
+
+func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error {
+ return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare))
+}
+
+func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) {
+ select {
+ case p.queuedDKGPrivateShare <- privateShare:
+ default:
+ p.Log().Debug("Dropping DKG private share")
+ }
+}
+
+func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error {
+ return p2p.Send(p.rw, DKGPartialSignatureMsg, psig)
+}
+
+func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) {
+ select {
+ case p.queuedDKGPartialSignature <- psig:
+ default:
+ p.Log().Debug("Dropping DKG partial signature")
+ }
+}
+
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
@@ -474,7 +590,8 @@ type peerSet struct {
srvr p2pServer
gov governance
- peerLabels map[string]map[peerLabel]struct{}
+ peer2Labels map[string]map[peerLabel]struct{}
+ label2Peers map[peerLabel]map[string]struct{}
notaryHistory map[uint64]struct{}
dkgHistory map[uint64]struct{}
}
@@ -486,7 +603,8 @@ func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
gov: gov,
srvr: srvr,
tab: tab,
- peerLabels: make(map[string]map[peerLabel]struct{}),
+ peer2Labels: make(map[string]map[peerLabel]struct{}),
+ label2Peers: make(map[peerLabel]map[string]struct{}),
notaryHistory: make(map[uint64]struct{}),
dkgHistory: make(map[uint64]struct{}),
}
@@ -573,6 +691,21 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
+func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.label2Peers[label]))
+ for id := range ps.label2Peers[label] {
+ if p, ok := ps.peers[id]; ok {
+ if !p.knownVotes.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ }
+ return list
+}
+
// PeersWithoutNodeMeta retrieves a list of peers that do not have a
// given meta in their set of known hashes.
func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
@@ -587,6 +720,31 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
return list
}
+// TODO(sonic): finish the following dummy function.
+func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer {
+ return ps.allPeers()
+}
+
+func (ps *peerSet) allPeers() []*peer {
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ list = append(list, p)
+ }
+ return list
+}
+
// BestPeer retrieves the known peer with the currently highest total difficulty.
func (ps *peerSet) BestPeer() *peer {
ps.lock.RLock()
@@ -746,11 +904,16 @@ func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
p.addLabel(label)
}
- if _, ok := ps.peerLabels[id]; !ok {
- ps.peerLabels[id] = make(map[peerLabel]struct{})
+ if _, ok := ps.peer2Labels[id]; !ok {
+ ps.peer2Labels[id] = make(map[peerLabel]struct{})
}
- ps.peerLabels[id][label] = struct{}{}
+ if _, ok := ps.label2Peers[label]; !ok {
+ ps.label2Peers[label] = make(map[string]struct{})
+ }
+
+ ps.peer2Labels[id][label] = struct{}{}
+ ps.label2Peers[label][id] = struct{}{}
ps.srvr.AddDirectPeer(ps.newNode(id))
}
@@ -760,11 +923,18 @@ func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
p.removeLabel(label)
}
- delete(ps.peerLabels[id], label)
+ delete(ps.peer2Labels[id], label)
- if len(ps.peerLabels[id]) == 0 {
+ if len(ps.peer2Labels[id]) == 0 {
ps.srvr.RemoveDirectPeer(ps.newNode(id))
- delete(ps.peerLabels, id)
+ delete(ps.peer2Labels, id)
+ }
+
+ if _, ok := ps.label2Peers[label]; ok {
+ delete(ps.label2Peers[label], id)
+ if len(ps.label2Peers[label]) == 0 {
+ delete(ps.label2Peers, label)
+ }
}
}
diff --git a/dex/peer_test.go b/dex/peer_test.go
index bac6ed5ec..6e539e078 100644
--- a/dex/peer_test.go
+++ b/dex/peer_test.go
@@ -74,7 +74,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(1).String(): []peerLabel{
peerLabel{notaryset, 0, 10},
},
@@ -120,7 +120,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(1).String(): []peerLabel{
peerLabel{notaryset, 0, 10},
peerLabel{notaryset, 0, 11},
@@ -176,7 +176,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(1).String(): []peerLabel{
peerLabel{notaryset, 0, 10},
peerLabel{notaryset, 0, 11},
@@ -242,7 +242,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(2).String(): []peerLabel{
peerLabel{notaryset, 2, 12},
},
@@ -291,7 +291,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{})
+ err = checkPeer2Labels(ps, map[string][]peerLabel{})
if err != nil {
t.Error(err)
}
@@ -354,7 +354,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(1).String(): []peerLabel{
peerLabel{dkgset, 0, 10},
},
@@ -391,7 +391,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(1).String(): []peerLabel{
peerLabel{dkgset, 0, 10},
},
@@ -428,7 +428,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(1).String(): []peerLabel{
peerLabel{dkgset, 0, 10},
},
@@ -467,7 +467,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{
+ err = checkPeer2Labels(ps, map[string][]peerLabel{
nodeID(3).String(): []peerLabel{
peerLabel{dkgset, 0, 12},
},
@@ -499,7 +499,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) {
if err != nil {
t.Error(err)
}
- err = checkPeerLabels(ps, map[string][]peerLabel{})
+ err = checkPeer2Labels(ps, map[string][]peerLabel{})
if err != nil {
t.Error(err)
}
@@ -527,13 +527,13 @@ func checkLabels(p *peer, want []peerLabel) error {
return nil
}
-func checkPeerLabels(ps *peerSet, want map[string][]peerLabel) error {
- if len(ps.peerLabels) != len(want) {
+func checkPeer2Labels(ps *peerSet, want map[string][]peerLabel) error {
+ if len(ps.peer2Labels) != len(want) {
return fmt.Errorf("peer num mismatch: got %d, want %d",
- len(ps.peerLabels), len(want))
+ len(ps.peer2Labels), len(want))
}
- for peerID, gotLabels := range ps.peerLabels {
+ for peerID, gotLabels := range ps.peer2Labels {
wantLabels, ok := want[peerID]
if !ok {
return fmt.Errorf("peer id %s not exists", peerID)
diff --git a/dex/protocol.go b/dex/protocol.go
index 7b01217ff..94241104b 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -20,13 +20,18 @@ import (
"fmt"
"io"
"math/big"
+ "time"
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
+ "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/rlp"
+ "golang.org/x/crypto/sha3"
)
// Constants to match up protocol versions and messages
@@ -41,7 +46,7 @@ var ProtocolName = "dex"
var ProtocolVersions = []uint{dex64}
// ProtocolLengths are the number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{18}
+var ProtocolLengths = []uint64{38}
const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
@@ -65,6 +70,13 @@ const (
// Protocol messages belonging to dex/64
MetaMsg = 0x11
+
+ LatticeBlockMsg = 0x20
+ VoteMsg = 0x21
+ AgreementMsg = 0x22
+ RandomnessMsg = 0x23
+ DKGPrivateShareMsg = 0x24
+ DKGPartialSignatureMsg = 0x25
)
type errCode int
@@ -206,3 +218,123 @@ type blockBody struct {
// blockBodiesData is the network packet for block content distribution.
type blockBodiesData []*blockBody
+
+func rlpHash(x interface{}) (h common.Hash) {
+ hw := sha3.NewLegacyKeccak256()
+ rlp.Encode(hw, x)
+ hw.Sum(h[:0])
+ return h
+}
+
+type rlpDKGPrivateShare struct {
+ ProposerID coreTypes.NodeID
+ ReceiverID coreTypes.NodeID
+ Round uint64
+ PrivateShare []byte
+ Signature crypto.Signature
+}
+
+func toRLPDKGPrivateShare(ps *coreTypes.DKGPrivateShare) *rlpDKGPrivateShare {
+ return &rlpDKGPrivateShare{
+ ProposerID: ps.ProposerID,
+ ReceiverID: ps.ReceiverID,
+ Round: ps.Round,
+ PrivateShare: ps.PrivateShare.Bytes(),
+ Signature: ps.Signature,
+ }
+}
+
+func fromRLPDKGPrivateShare(rps *rlpDKGPrivateShare) *coreTypes.DKGPrivateShare {
+ ps := &coreTypes.DKGPrivateShare{
+ ProposerID: rps.ProposerID,
+ ReceiverID: rps.ReceiverID,
+ Round: rps.Round,
+ Signature: rps.Signature,
+ }
+ ps.PrivateShare.SetBytes(rps.PrivateShare)
+ return ps
+}
+
+type rlpWitness struct {
+ Timestamp uint64
+ Height uint64
+ Data []byte
+}
+
+type rlpFinalizeResult struct {
+ Randomness []byte
+ Timestamp uint64
+ Height uint64
+}
+
+type rlpLatticeBlock struct {
+ ProposerID coreTypes.NodeID `json:"proposer_id"`
+ ParentHash coreCommon.Hash `json:"parent_hash"`
+ Hash coreCommon.Hash `json:"hash"`
+ Position coreTypes.Position `json:"position"`
+ Timestamp uint64 `json:"timestamps"`
+ Acks coreCommon.SortedHashes `json:"acks"`
+ Payload []byte `json:"payload"`
+ Witness rlpWitness
+ Finalization rlpFinalizeResult
+ Signature crypto.Signature `json:"signature"`
+ CRSSignature crypto.Signature `json:"crs_signature"`
+}
+
+func toRLPLatticeBlock(b *coreTypes.Block) *rlpLatticeBlock {
+ return &rlpLatticeBlock{
+ ProposerID: b.ProposerID,
+ ParentHash: b.ParentHash,
+ Hash: b.Hash,
+ Position: b.Position,
+ Timestamp: toMillisecond(b.Timestamp),
+ Acks: b.Acks,
+ Payload: b.Payload,
+ Witness: rlpWitness{
+ Timestamp: toMillisecond(b.Witness.Timestamp),
+ Height: b.Witness.Height,
+ Data: b.Witness.Data,
+ },
+ Finalization: rlpFinalizeResult{
+ Randomness: b.Finalization.Randomness,
+ Timestamp: toMillisecond(b.Finalization.Timestamp),
+ Height: b.Finalization.Height,
+ },
+ Signature: b.Signature,
+ CRSSignature: b.CRSSignature,
+ }
+}
+
+func fromRLPLatticeBlock(rb *rlpLatticeBlock) *coreTypes.Block {
+ return &coreTypes.Block{
+ ProposerID: rb.ProposerID,
+ ParentHash: rb.ParentHash,
+ Hash: rb.Hash,
+ Position: rb.Position,
+ Timestamp: fromMillisecond(rb.Timestamp),
+ Acks: rb.Acks,
+ Payload: rb.Payload,
+ Witness: coreTypes.Witness{
+ Timestamp: fromMillisecond(rb.Witness.Timestamp),
+ Height: rb.Witness.Height,
+ Data: rb.Witness.Data,
+ },
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: rb.Finalization.Randomness,
+ Timestamp: fromMillisecond(rb.Finalization.Timestamp),
+ Height: rb.Finalization.Height,
+ },
+ Signature: rb.Signature,
+ CRSSignature: rb.CRSSignature,
+ }
+}
+
+func fromMillisecond(s uint64) time.Time {
+ sec := int64(s / 1000)
+ nsec := int64((s % 1000) * 1000000)
+ return time.Unix(sec, nsec)
+}
+
+func toMillisecond(t time.Time) uint64 {
+ return uint64(t.UnixNano() / 1000000)
+}
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index 8c7638b2b..a26a40feb 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -18,10 +18,16 @@ package dex
import (
"fmt"
+ "reflect"
"sync"
"testing"
"time"
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
+ coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
+ "github.com/dexon-foundation/dexon-consensus-core/core/crypto/dkg"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
+
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/crypto"
@@ -303,3 +309,496 @@ func TestSendNodeMetas(t *testing.T) {
pm.nodeTable.Add(allmetas)
wg.Wait()
}
+
+func TestRecvLatticeBlock(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ block := coreTypes.Block{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ ParentHash: coreCommon.Hash{1, 1, 1, 1, 1},
+ Hash: coreCommon.Hash{2, 2, 2, 2, 2},
+ Position: coreTypes.Position{
+ ChainID: 11,
+ Round: 12,
+ Height: 13,
+ },
+ Timestamp: fromMillisecond(toMillisecond(time.Now())),
+ Acks: coreCommon.NewSortedHashes(coreCommon.Hashes([]coreCommon.Hash{
+ coreCommon.Hash{101}, coreCommon.Hash{100}, coreCommon.Hash{102},
+ })),
+ Payload: []byte{3, 3, 3, 3, 3},
+ Witness: coreTypes.Witness{
+ Timestamp: fromMillisecond(toMillisecond(time.Now())),
+ Height: 13,
+ Data: []byte{4, 4, 4, 4, 4},
+ },
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: []byte{5, 5, 5, 5, 5},
+ Timestamp: fromMillisecond(toMillisecond(time.Now())),
+ Height: 13,
+ },
+ Signature: coreCrypto.Signature{
+ Type: "signature",
+ Signature: []byte("signature"),
+ },
+ CRSSignature: coreCrypto.Signature{
+ Type: "crs-signature",
+ Signature: []byte("crs-signature"),
+ },
+ }
+
+ if err := p2p.Send(p.app, LatticeBlockMsg, toRLPLatticeBlock(&block)); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ ch := pm.ReceiveChan()
+ select {
+ case msg := <-ch:
+ rb := msg.(*coreTypes.Block)
+ if !reflect.DeepEqual(rb, &block) {
+ t.Errorf("block mismatch")
+ }
+ case <-time.After(3 * time.Second):
+ t.Errorf("no newMetasEvent received within 3 seconds")
+ }
+}
+
+func TestSendLatticeBlock(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ block := coreTypes.Block{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ ParentHash: coreCommon.Hash{1, 1, 1, 1, 1},
+ Hash: coreCommon.Hash{2, 2, 2, 2, 2},
+ Position: coreTypes.Position{
+ ChainID: 11,
+ Round: 12,
+ Height: 13,
+ },
+ Timestamp: fromMillisecond(toMillisecond(time.Now())),
+ Acks: coreCommon.NewSortedHashes(coreCommon.Hashes([]coreCommon.Hash{
+ coreCommon.Hash{101}, coreCommon.Hash{100}, coreCommon.Hash{102},
+ })),
+ Payload: []byte{3, 3, 3, 3, 3},
+ Witness: coreTypes.Witness{
+ Timestamp: fromMillisecond(toMillisecond(time.Now())),
+ Height: 13,
+ Data: []byte{4, 4, 4, 4, 4},
+ },
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: []byte{5, 5, 5, 5, 5},
+ Timestamp: fromMillisecond(toMillisecond(time.Now())),
+ Height: 13,
+ },
+ Signature: coreCrypto.Signature{
+ Type: "signature",
+ Signature: []byte("signature"),
+ },
+ CRSSignature: coreCrypto.Signature{
+ Type: "crs-signature",
+ Signature: []byte("crs-signature"),
+ },
+ }
+
+ pm.BroadcastLatticeBlock(&block)
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != LatticeBlockMsg {
+ t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, LatticeBlockMsg)
+ }
+
+ var rb rlpLatticeBlock
+ if err := msg.Decode(&rb); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+
+ if !reflect.DeepEqual(fromRLPLatticeBlock(&rb), &block) {
+ t.Errorf("block mismatch")
+ }
+}
+
+func TestRecvVote(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ vote := coreTypes.Vote{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ Period: 10,
+ Position: coreTypes.Position{
+ ChainID: 11,
+ Round: 12,
+ Height: 13,
+ },
+ Signature: coreCrypto.Signature{
+ Type: "123",
+ Signature: []byte("sig"),
+ },
+ }
+
+ if err := p2p.Send(p.app, VoteMsg, vote); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ ch := pm.ReceiveChan()
+
+ select {
+ case msg := <-ch:
+ rvote := msg.(*coreTypes.Vote)
+ if rlpHash(rvote) != rlpHash(vote) {
+ t.Errorf("vote mismatch")
+ }
+ case <-time.After(1 * time.Second):
+ t.Errorf("no vote received within 1 seconds")
+ }
+}
+
+func TestSendVote(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ defer pm.Stop()
+
+ vote := coreTypes.Vote{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ Period: 10,
+ Position: coreTypes.Position{
+ ChainID: 1,
+ Round: 10,
+ Height: 13,
+ },
+ Signature: coreCrypto.Signature{
+ Type: "123",
+ Signature: []byte("sig"),
+ },
+ }
+
+ // Connect several peers. They should all receive the pending transactions.
+ var wg sync.WaitGroup
+ checkvote := func(p *testPeer, isReceiver bool) {
+ defer wg.Done()
+ defer p.close()
+ if !isReceiver {
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ p.close()
+ }()
+ }
+
+ msg, err := p.app.ReadMsg()
+ if !isReceiver {
+ if err != p2p.ErrPipeClosed {
+ t.Errorf("err mismatch: got %v, want %v (not receiver peer)",
+ err, p2p.ErrPipeClosed)
+ }
+ return
+ }
+
+ var v coreTypes.Vote
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != VoteMsg {
+ t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, VoteMsg)
+ }
+ if err := msg.Decode(&v); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+ if !reflect.DeepEqual(v, vote) {
+ t.Errorf("vote mismatch")
+ }
+ }
+
+ testPeers := []struct {
+ label *peerLabel
+ isReceiver bool
+ }{
+ {
+ label: &peerLabel{set: notaryset, chainID: 1, round: 10},
+ isReceiver: true,
+ },
+ {
+ label: &peerLabel{set: notaryset, chainID: 1, round: 10},
+ isReceiver: true,
+ },
+ {
+ label: nil,
+ isReceiver: false,
+ },
+ {
+ label: &peerLabel{set: notaryset, chainID: 1, round: 11},
+ isReceiver: false,
+ },
+ {
+ label: &peerLabel{set: notaryset, chainID: 2, round: 10},
+ isReceiver: false,
+ },
+ {
+ label: &peerLabel{set: dkgset, chainID: 1, round: 10},
+ isReceiver: false,
+ },
+ }
+
+ for i, tt := range testPeers {
+ p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true)
+ if tt.label != nil {
+ pm.peers.addDirectPeer(p.id, *tt.label)
+ }
+ wg.Add(1)
+ go checkvote(p, tt.isReceiver)
+ }
+ pm.BroadcastVote(&vote)
+ wg.Wait()
+}
+
+type mockPublicKey []byte
+
+func (p mockPublicKey) VerifySignature(hash coreCommon.Hash, signature coreCrypto.Signature) bool {
+ return true
+}
+
+func (p mockPublicKey) Bytes() []byte {
+ return append([]byte{1}, p...)
+}
+
+func TestRecvDKGPrivateShare(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer1", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ // TODO(sonic): polish this
+ privkey := dkg.NewPrivateKey()
+ privateShare := coreTypes.DKGPrivateShare{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ ReceiverID: coreTypes.NodeID{coreCommon.Hash{3, 4, 5}},
+ Round: 10,
+ PrivateShare: *privkey,
+ Signature: coreCrypto.Signature{
+ Type: "DKGPrivateShare",
+ Signature: []byte("DKGPrivateShare"),
+ },
+ }
+
+ if err := p2p.Send(
+ p.app, DKGPrivateShareMsg, toRLPDKGPrivateShare(&privateShare)); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ ch := pm.ReceiveChan()
+ select {
+ case msg := <-ch:
+ rps := msg.(*coreTypes.DKGPrivateShare)
+ if !reflect.DeepEqual(
+ toRLPDKGPrivateShare(rps), toRLPDKGPrivateShare(&privateShare)) {
+ t.Errorf("vote mismatch")
+ }
+ case <-time.After(1 * time.Second):
+ t.Errorf("no dkg received within 1 seconds")
+ }
+}
+
+func TestSendDKGPrivateShare(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p1, _ := newTestPeer("peer1", dex64, pm, true)
+ p2, _ := newTestPeer("peer2", dex64, pm, true)
+ defer pm.Stop()
+ defer p1.close()
+
+ // TODO(sonic): polish this
+ privkey := dkg.NewPrivateKey()
+ privateShare := coreTypes.DKGPrivateShare{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ ReceiverID: coreTypes.NodeID{coreCommon.Hash{3, 4, 5}},
+ Round: 10,
+ PrivateShare: *privkey,
+ Signature: coreCrypto.Signature{
+ Type: "DKGPrivateShare",
+ Signature: []byte("DKGPrivateShare"),
+ },
+ }
+
+ go pm.SendDKGPrivateShare(mockPublicKey(p1.ID().Bytes()), &privateShare)
+ msg, err := p1.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p1.Peer, err)
+ } else if msg.Code != DKGPrivateShareMsg {
+ t.Errorf("%v: got code %d, want %d", p1.Peer, msg.Code, DKGPrivateShareMsg)
+ }
+
+ var rps rlpDKGPrivateShare
+ if err := msg.Decode(&rps); err != nil {
+ t.Errorf("%v: %v", p1.Peer, err)
+ }
+
+ expected := toRLPDKGPrivateShare(&privateShare)
+ if !reflect.DeepEqual(rps, *expected) {
+ t.Errorf("DKG private share mismatch")
+ }
+
+ go func() {
+ time.Sleep(500 * time.Millisecond)
+ p2.close()
+ }()
+
+ msg, err = p2.app.ReadMsg()
+ if err != p2p.ErrPipeClosed {
+ t.Errorf("err mismatch: got %v, want %v (not receiver peer)",
+ err, p2p.ErrPipeClosed)
+ }
+}
+
+func TestRecvAgreement(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ // TODO(sonic): polish this
+ vote := coreTypes.Vote{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ Period: 10,
+ Position: coreTypes.Position{
+ ChainID: 1,
+ Round: 10,
+ Height: 13,
+ },
+ Signature: coreCrypto.Signature{
+ Type: "123",
+ Signature: []byte("sig"),
+ },
+ }
+
+ agreement := coreTypes.AgreementResult{
+ BlockHash: coreCommon.Hash{9, 9, 9},
+ Round: 13,
+ Position: vote.Position,
+ Votes: []coreTypes.Vote{vote},
+ }
+
+ if err := p2p.Send(p.app, AgreementMsg, &agreement); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ ch := pm.ReceiveChan()
+ select {
+ case msg := <-ch:
+ a := msg.(*coreTypes.AgreementResult)
+ if !reflect.DeepEqual(a, &agreement) {
+ t.Errorf("agreement mismatch")
+ }
+ case <-time.After(1 * time.Second):
+ t.Errorf("no agreement received within 1 seconds")
+ }
+}
+
+func TestSendAgreement(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ // TODO(sonic): polish this
+ vote := coreTypes.Vote{
+ ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}},
+ Period: 10,
+ Position: coreTypes.Position{
+ ChainID: 1,
+ Round: 10,
+ Height: 13,
+ },
+ Signature: coreCrypto.Signature{
+ Type: "123",
+ Signature: []byte("sig"),
+ },
+ }
+
+ agreement := coreTypes.AgreementResult{
+ BlockHash: coreCommon.Hash{9, 9, 9},
+ Round: 13,
+ Position: vote.Position,
+ Votes: []coreTypes.Vote{vote},
+ }
+
+ pm.BroadcastAgreementResult(&agreement)
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != AgreementMsg {
+ t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, AgreementMsg)
+ }
+
+ var a coreTypes.AgreementResult
+ if err := msg.Decode(&a); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+
+ if !reflect.DeepEqual(a, agreement) {
+ t.Errorf("agreement mismatch")
+ }
+}
+
+func TestRecvRandomness(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ // TODO(sonic): polish this
+ randomness := coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.Hash{8, 8, 8},
+ Round: 17,
+ Randomness: []byte{7, 7, 7, 7},
+ }
+
+ if err := p2p.Send(p.app, RandomnessMsg, &randomness); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+
+ ch := pm.ReceiveChan()
+ select {
+ case msg := <-ch:
+ r := msg.(*coreTypes.BlockRandomnessResult)
+ if !reflect.DeepEqual(r, &randomness) {
+ t.Errorf("randomness mismatch")
+ }
+ case <-time.After(1 * time.Second):
+ t.Errorf("no randomness received within 1 seconds")
+ }
+}
+
+func TestSendRandomness(t *testing.T) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ p, _ := newTestPeer("peer", dex64, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ // TODO(sonic): polish this
+ randomness := coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.Hash{8, 8, 8},
+ Round: 17,
+ Randomness: []byte{7, 7, 7, 7},
+ }
+
+ pm.BroadcastRandomnessResult(&randomness)
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != RandomnessMsg {
+ t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, RandomnessMsg)
+ }
+
+ var r coreTypes.BlockRandomnessResult
+ if err := msg.Decode(&r); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+
+ if !reflect.DeepEqual(r, randomness) {
+ t.Errorf("agreement mismatch")
+ }
+}