diff options
author | Sonic <sonic@cobinhood.com> | 2018-10-12 15:02:33 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-03-12 12:19:09 +0800 |
commit | 961231f91fc4c7edc122ccdf337d804a885b1f6b (patch) | |
tree | 4de8ac71927a013dedb65a7be91770c1a3fc97bb /dex | |
parent | f4936aa1eb7fa01f56c28f081af266c2a4924a61 (diff) | |
download | dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.gz dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.bz2 dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.lz dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.xz dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.tar.zst dexon-961231f91fc4c7edc122ccdf337d804a885b1f6b.zip |
dex: network: implement the network interface
Diffstat (limited to 'dex')
-rw-r--r-- | dex/backend.go | 35 | ||||
-rw-r--r-- | dex/governance.go | 20 | ||||
-rw-r--r-- | dex/handler.go | 116 | ||||
-rw-r--r-- | dex/helper_test.go | 2 | ||||
-rw-r--r-- | dex/network.go | 18 | ||||
-rw-r--r-- | dex/peer.go | 230 | ||||
-rw-r--r-- | dex/peer_test.go | 28 | ||||
-rw-r--r-- | dex/protocol.go | 134 | ||||
-rw-r--r-- | dex/protocol_test.go | 499 |
9 files changed, 1024 insertions, 58 deletions
diff --git a/dex/backend.go b/dex/backend.go index d01ab8040..caac0fe21 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -85,7 +85,6 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { if err != nil { panic(err) } - network := NewDexconNetwork() chainDb, err := CreateDB(ctx, config, "chaindata") if err != nil { @@ -116,7 +115,6 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { networkID: config.NetworkId, bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), - network: network, blockdb: db, engine: dexcon.New(¶ms.DexconConfig{}), } @@ -154,9 +152,18 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { dex.governance = NewDexconGovernance(dex.APIBackend, dex.chainConfig, config.PrivateKey) dex.app = NewDexconApp(dex.txPool, dex.blockchain, dex.governance, chainDb, config, vmConfig) - privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey) - dex.consensus = dexCore.NewConsensus(dex.app, dex.governance, db, network, privKey) + pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, + config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, + chainDb, dex.governance) + if err != nil { + return nil, err + } + + dex.protocolManager = pm + dex.network = NewDexconNetwork(pm) + privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey) + dex.consensus = dexCore.NewConsensus(dex.app, dex.governance, db, dex.network, privKey) return dex, nil } @@ -168,7 +175,24 @@ func (s *Dexon) APIs() []rpc.API { return nil } -func (s *Dexon) Start(server *p2p.Server) error { +func (s *Dexon) Start(srvr *p2p.Server) error { + // Start the bloom bits servicing goroutines + s.startBloomHandlers(params.BloomBitsBlocks) + + // Start the RPC service + s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) + + // Figure out a max peers count based on the server limits + maxPeers := srvr.MaxPeers + if s.config.LightServ > 0 { + if s.config.LightPeers >= srvr.MaxPeers { + return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers) + } + maxPeers -= s.config.LightPeers + } + // Start the networking layer and the light server if requested + s.protocolManager.Start(srvr, maxPeers) + return nil } @@ -196,3 +220,4 @@ func (d *Dexon) EventMux() *event.TypeMux { return d.eventMux } func (d *Dexon) Engine() consensus.Engine { return d.engine } func (d *Dexon) ChainDb() ethdb.Database { return d.chainDb } func (d *Dexon) Downloader() *downloader.Downloader { return d.protocolManager.downloader } +func (d *Dexon) NetVersion() uint64 { return d.networkID } diff --git a/dex/governance.go b/dex/governance.go index 369cc2f0c..37985cec4 100644 --- a/dex/governance.go +++ b/dex/governance.go @@ -10,10 +10,13 @@ import ( coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto" coreEcdsa "github.com/dexon-foundation/dexon-consensus-core/core/crypto/ecdsa" coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/core/vm" "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/params" "github.com/dexon-foundation/dexon/rlp" @@ -276,3 +279,20 @@ func (d *DexconGovernance) IsDKGFinal(round uint64) bool { count := s.DKGFinalizedsCount(big.NewInt(int64(round))).Uint64() return count >= threshold } + +// TODO(sonic): finish these +func (d *DexconGovernance) GetChainNum(uint64) uint32 { + return 3 +} + +func (d *DexconGovernance) GetNotarySet(uint32, uint64) map[string]struct{} { + return nil +} + +func (d *DexconGovernance) GetDKGSet(uint64) map[string]struct{} { + return nil +} + +func (d *DexconGovernance) SubscribeNewCRSEvent(ch chan core.NewCRSEvent) event.Subscription { + return nil +} diff --git a/dex/handler.go b/dex/handler.go index 67cbe8a63..e013b9722 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -26,6 +26,9 @@ import ( "sync/atomic" "time" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/consensus" "github.com/dexon-foundation/dexon/core" @@ -97,6 +100,9 @@ type ProtocolManager struct { crsCh chan core.NewCRSEvent crsSub event.Subscription + // channels for dexon consensus core + receiveCh chan interface{} + srvr p2pServer // wait group is used for graceful shutdowns during downloading @@ -125,6 +131,7 @@ func NewProtocolManager( noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), + receiveCh: make(chan interface{}, 1024), } // Figure out whether to allow fast sync or not @@ -267,6 +274,10 @@ func (pm *ProtocolManager) Stop() { log.Info("Ethereum protocol stopped") } +func (pm *ProtocolManager) ReceiveChan() <-chan interface{} { + return pm.receiveCh +} + func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return newPeer(pv, p, newMeteredMsgWriter(rw)) } @@ -666,6 +677,47 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkNodeMeta(meta.Hash()) } pm.nodeTable.Add(metas) + case msg.Code == LatticeBlockMsg: + var rb rlpLatticeBlock + if err := msg.Decode(&rb); err != nil { + fmt.Println("decode lattice block error", err) + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- fromRLPLatticeBlock(&rb) + case msg.Code == VoteMsg: + var vote coreTypes.Vote + if err := msg.Decode(&vote); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &vote + case msg.Code == AgreementMsg: + // DKG set is receiver + var agreement coreTypes.AgreementResult + if err := msg.Decode(&agreement); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &agreement + case msg.Code == RandomnessMsg: + // Broadcast this to all peer + var randomness coreTypes.BlockRandomnessResult + if err := msg.Decode(&randomness); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &randomness + case msg.Code == DKGPrivateShareMsg: + // Do not relay this msg + var rps rlpDKGPrivateShare + if err := msg.Decode(&rps); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- fromRLPDKGPrivateShare(&rps) + case msg.Code == DKGPartialSignatureMsg: + // broadcast in DKG set + var psig coreTypes.DKGPartialSignature + if err := msg.Decode(&psig); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &psig default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -741,6 +793,68 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { } } +func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { + label := peerLabel{ + set: notaryset, + chainID: vote.Position.ChainID, + round: vote.Position.Round, + } + + for _, peer := range pm.peers.PeersWithoutVote(rlpHash(vote), label) { + peer.AsyncSendVote(vote) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { + hash := rlpHash(toRLPLatticeBlock(block)) + for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) { + peer.AsyncSendLatticeBlock(block) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) SendDKGPrivateShare( + pub crypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) { + id := discover.MustBytesID(pub.Bytes()[1:]) + if p := pm.peers.Peer(id.String()); p != nil { + p.AsyncSendDKGPrivateShare(privateShare) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastDKGPrivateShare( + privateShare *coreTypes.DKGPrivateShare) { + for _, peer := range pm.peers.allPeers() { + peer.AsyncSendDKGPrivateShare(privateShare) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastAgreementResult( + agreement *coreTypes.AgreementResult) { + for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { + peer.AsyncSendAgreement(agreement) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastRandomnessResult( + randomness *coreTypes.BlockRandomnessResult) { + // random pick n peers + for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) { + peer.AsyncSendRandomness(randomness) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastDKGPartialSignature( + psig *coreTypes.DKGPartialSignature) { + for _, peer := range pm.peers.PeersWithoutDKGPartialSignature(rlpHash(psig)) { + peer.AsyncSendDKGPartialSignature(psig) + } +} + // Mined broadcast loop func (pm *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe @@ -781,6 +895,8 @@ func (pm *ProtocolManager) metaBroadcastLoop() { // a loop keep building and maintaining peers in notary set. // TODO: finish this func (pm *ProtocolManager) peerSetLoop() { + + log.Debug("start peer set loop") for { select { case event := <-pm.crsCh: diff --git a/dex/helper_test.go b/dex/helper_test.go index dcda6f4d2..fc8053774 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -101,7 +101,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func db = ethdb.NewMemDatabase() gspec = &core.Genesis{ Config: params.TestChainConfig, - Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, + Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000), Staked: big.NewInt(0)}}, } genesis = gspec.MustCommit(db) blockchain, _ = core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) diff --git a/dex/network.go b/dex/network.go index 24ef2cc63..e99b4f5b1 100644 --- a/dex/network.go +++ b/dex/network.go @@ -6,49 +6,53 @@ import ( ) type DexconNetwork struct { - receiveChan chan interface{} + pm *ProtocolManager } -func NewDexconNetwork() *DexconNetwork { - return &DexconNetwork{ - receiveChan: make(chan interface{}), - } +func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork { + return &DexconNetwork{pm: pm} } // BroadcastVote broadcasts vote to all nodes in DEXON network. func (n *DexconNetwork) BroadcastVote(vote *types.Vote) { + n.pm.BroadcastVote(vote) } // BroadcastBlock broadcasts block to all nodes in DEXON network. func (n *DexconNetwork) BroadcastBlock(block *types.Block) { + n.pm.BroadcastLatticeBlock(block) } // SendDKGPrivateShare sends PrivateShare to a DKG participant. func (n *DexconNetwork) SendDKGPrivateShare( pub crypto.PublicKey, prvShare *types.DKGPrivateShare) { + n.pm.SendDKGPrivateShare(pub, prvShare) } // BroadcastDKGPrivateShare broadcasts PrivateShare to all DKG participants. func (n *DexconNetwork) BroadcastDKGPrivateShare( prvShare *types.DKGPrivateShare) { + n.pm.BroadcastDKGPrivateShare(prvShare) } // BroadcastDKGPartialSignature broadcasts partialSignature to all // DKG participants. func (n *DexconNetwork) BroadcastDKGPartialSignature( psig *types.DKGPartialSignature) { + n.pm.BroadcastDKGPartialSignature(psig) } // BroadcastAgreementResult broadcasts rand request to DKG set. func (n *DexconNetwork) BroadcastAgreementResult(randRequest *types.AgreementResult) { - + n.pm.BroadcastAgreementResult(randRequest) } // BroadcastRandomnessResult broadcasts rand request to Notary set. func (n *DexconNetwork) BroadcastRandomnessResult(randResult *types.BlockRandomnessResult) { + n.pm.BroadcastRandomnessResult(randResult) } // ReceiveChan returns a channel to receive messages from DEXON network. func (n *DexconNetwork) ReceiveChan() <-chan interface{} { - return n.receiveChan + return n.pm.ReceiveChan() } diff --git a/dex/peer.go b/dex/peer.go index 05947b456..e7c4f5d53 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -24,6 +24,7 @@ import ( "time" mapset "github.com/deckarep/golang-set" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/p2p" @@ -104,31 +105,45 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownMetas mapset.Set // Set of node metas known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer - queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer - queuedProps chan *propEvent // Queue of blocks to broadcast to the peer - queuedAnns chan *types.Block // Queue of blocks to announce to the peer - term chan struct{} // Termination channel to stop the broadcaster + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownMetas mapset.Set // Set of node metas known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownVotes mapset.Set + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer + queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + queuedLatticeBlock chan *coreTypes.Block + queuedVote chan *coreTypes.Vote + queuedAgreement chan *coreTypes.AgreementResult + queuedRandomness chan *coreTypes.BlockRandomnessResult + queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare + queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature + term chan struct{} // Termination channel to stop the broadcaster } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return &peer{ - Peer: p, - rw: rw, - version: version, - labels: mapset.NewSet(), - id: p.ID().String(), - knownTxs: mapset.NewSet(), - knownMetas: mapset.NewSet(), - knownBlocks: mapset.NewSet(), - queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), - queuedProps: make(chan *propEvent, maxQueuedProps), - queuedAnns: make(chan *types.Block, maxQueuedAnns), - term: make(chan struct{}), + Peer: p, + rw: rw, + version: version, + labels: mapset.NewSet(), + id: p.ID().String(), + knownTxs: mapset.NewSet(), + knownMetas: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + knownVotes: mapset.NewSet(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + queuedLatticeBlock: make(chan *coreTypes.Block, 16), + queuedVote: make(chan *coreTypes.Vote, 16), + queuedAgreement: make(chan *coreTypes.AgreementResult, 16), + queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16), + queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16), + queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16), + term: make(chan struct{}), } } @@ -161,7 +176,36 @@ func (p *peer) broadcast() { return } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - + case block := <-p.queuedLatticeBlock: + if err := p.SendLatticeBlock(block); err != nil { + return + } + p.Log().Trace("Broadcast lattice block") + case vote := <-p.queuedVote: + if err := p.SendVote(vote); err != nil { + return + } + p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote)) + case agreement := <-p.queuedAgreement: + if err := p.SendAgreement(agreement); err != nil { + return + } + p.Log().Trace("Broadcast agreement") + case randomness := <-p.queuedRandomness: + if err := p.SendRandomness(randomness); err != nil { + return + } + p.Log().Trace("Broadcast randomness") + case privateShare := <-p.queuedDKGPrivateShare: + if err := p.SendDKGPrivateShare(privateShare); err != nil { + return + } + p.Log().Trace("Broadcast DKG private share") + case psig := <-p.queuedDKGPartialSignature: + if err := p.SendDKGPartialSignature(psig); err != nil { + return + } + p.Log().Trace("Broadcast DKG partial signature") case <-p.term: return } @@ -326,6 +370,78 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } } +func (p *peer) SendLatticeBlock(block *coreTypes.Block) error { + return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block)) +} + +func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) { + select { + case p.queuedLatticeBlock <- block: + default: + p.Log().Debug("Dropping lattice block propagation") + } +} + +func (p *peer) SendVote(vote *coreTypes.Vote) error { + return p2p.Send(p.rw, VoteMsg, vote) +} + +func (p *peer) AsyncSendVote(vote *coreTypes.Vote) { + select { + case p.queuedVote <- vote: + default: + p.Log().Debug("Dropping vote propagation") + } +} + +func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error { + return p2p.Send(p.rw, AgreementMsg, agreement) +} + +func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) { + select { + case p.queuedAgreement <- agreement: + default: + p.Log().Debug("Dropping agreement result") + } +} + +func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error { + return p2p.Send(p.rw, RandomnessMsg, randomness) +} + +func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) { + select { + case p.queuedRandomness <- randomness: + default: + p.Log().Debug("Dropping randomness result") + } +} + +func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error { + return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare)) +} + +func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) { + select { + case p.queuedDKGPrivateShare <- privateShare: + default: + p.Log().Debug("Dropping DKG private share") + } +} + +func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error { + return p2p.Send(p.rw, DKGPartialSignatureMsg, psig) +} + +func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) { + select { + case p.queuedDKGPartialSignature <- psig: + default: + p.Log().Debug("Dropping DKG partial signature") + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) @@ -474,7 +590,8 @@ type peerSet struct { srvr p2pServer gov governance - peerLabels map[string]map[peerLabel]struct{} + peer2Labels map[string]map[peerLabel]struct{} + label2Peers map[peerLabel]map[string]struct{} notaryHistory map[uint64]struct{} dkgHistory map[uint64]struct{} } @@ -486,7 +603,8 @@ func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet { gov: gov, srvr: srvr, tab: tab, - peerLabels: make(map[string]map[peerLabel]struct{}), + peer2Labels: make(map[string]map[peerLabel]struct{}), + label2Peers: make(map[peerLabel]map[string]struct{}), notaryHistory: make(map[uint64]struct{}), dkgHistory: make(map[uint64]struct{}), } @@ -573,6 +691,21 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { return list } +func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.label2Peers[label])) + for id := range ps.label2Peers[label] { + if p, ok := ps.peers[id]; ok { + if !p.knownVotes.Contains(hash) { + list = append(list, p) + } + } + } + return list +} + // PeersWithoutNodeMeta retrieves a list of peers that do not have a // given meta in their set of known hashes. func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { @@ -587,6 +720,31 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { return list } +// TODO(sonic): finish the following dummy function. +func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer { + return ps.allPeers() +} + +func (ps *peerSet) allPeers() []*peer { + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + // BestPeer retrieves the known peer with the currently highest total difficulty. func (ps *peerSet) BestPeer() *peer { ps.lock.RLock() @@ -746,11 +904,16 @@ func (ps *peerSet) addDirectPeer(id string, label peerLabel) { p.addLabel(label) } - if _, ok := ps.peerLabels[id]; !ok { - ps.peerLabels[id] = make(map[peerLabel]struct{}) + if _, ok := ps.peer2Labels[id]; !ok { + ps.peer2Labels[id] = make(map[peerLabel]struct{}) } - ps.peerLabels[id][label] = struct{}{} + if _, ok := ps.label2Peers[label]; !ok { + ps.label2Peers[label] = make(map[string]struct{}) + } + + ps.peer2Labels[id][label] = struct{}{} + ps.label2Peers[label][id] = struct{}{} ps.srvr.AddDirectPeer(ps.newNode(id)) } @@ -760,11 +923,18 @@ func (ps *peerSet) removeDirectPeer(id string, label peerLabel) { p.removeLabel(label) } - delete(ps.peerLabels[id], label) + delete(ps.peer2Labels[id], label) - if len(ps.peerLabels[id]) == 0 { + if len(ps.peer2Labels[id]) == 0 { ps.srvr.RemoveDirectPeer(ps.newNode(id)) - delete(ps.peerLabels, id) + delete(ps.peer2Labels, id) + } + + if _, ok := ps.label2Peers[label]; ok { + delete(ps.label2Peers[label], id) + if len(ps.label2Peers[label]) == 0 { + delete(ps.label2Peers, label) + } } } diff --git a/dex/peer_test.go b/dex/peer_test.go index bac6ed5ec..6e539e078 100644 --- a/dex/peer_test.go +++ b/dex/peer_test.go @@ -74,7 +74,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(1).String(): []peerLabel{ peerLabel{notaryset, 0, 10}, }, @@ -120,7 +120,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(1).String(): []peerLabel{ peerLabel{notaryset, 0, 10}, peerLabel{notaryset, 0, 11}, @@ -176,7 +176,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(1).String(): []peerLabel{ peerLabel{notaryset, 0, 10}, peerLabel{notaryset, 0, 11}, @@ -242,7 +242,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(2).String(): []peerLabel{ peerLabel{notaryset, 2, 12}, }, @@ -291,7 +291,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{}) + err = checkPeer2Labels(ps, map[string][]peerLabel{}) if err != nil { t.Error(err) } @@ -354,7 +354,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(1).String(): []peerLabel{ peerLabel{dkgset, 0, 10}, }, @@ -391,7 +391,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(1).String(): []peerLabel{ peerLabel{dkgset, 0, 10}, }, @@ -428,7 +428,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(1).String(): []peerLabel{ peerLabel{dkgset, 0, 10}, }, @@ -467,7 +467,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{ + err = checkPeer2Labels(ps, map[string][]peerLabel{ nodeID(3).String(): []peerLabel{ peerLabel{dkgset, 0, 12}, }, @@ -499,7 +499,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) { if err != nil { t.Error(err) } - err = checkPeerLabels(ps, map[string][]peerLabel{}) + err = checkPeer2Labels(ps, map[string][]peerLabel{}) if err != nil { t.Error(err) } @@ -527,13 +527,13 @@ func checkLabels(p *peer, want []peerLabel) error { return nil } -func checkPeerLabels(ps *peerSet, want map[string][]peerLabel) error { - if len(ps.peerLabels) != len(want) { +func checkPeer2Labels(ps *peerSet, want map[string][]peerLabel) error { + if len(ps.peer2Labels) != len(want) { return fmt.Errorf("peer num mismatch: got %d, want %d", - len(ps.peerLabels), len(want)) + len(ps.peer2Labels), len(want)) } - for peerID, gotLabels := range ps.peerLabels { + for peerID, gotLabels := range ps.peer2Labels { wantLabels, ok := want[peerID] if !ok { return fmt.Errorf("peer id %s not exists", peerID) diff --git a/dex/protocol.go b/dex/protocol.go index 7b01217ff..94241104b 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -20,13 +20,18 @@ import ( "fmt" "io" "math/big" + "time" + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/rlp" + "golang.org/x/crypto/sha3" ) // Constants to match up protocol versions and messages @@ -41,7 +46,7 @@ var ProtocolName = "dex" var ProtocolVersions = []uint{dex64} // ProtocolLengths are the number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{18} +var ProtocolLengths = []uint64{38} const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message @@ -65,6 +70,13 @@ const ( // Protocol messages belonging to dex/64 MetaMsg = 0x11 + + LatticeBlockMsg = 0x20 + VoteMsg = 0x21 + AgreementMsg = 0x22 + RandomnessMsg = 0x23 + DKGPrivateShareMsg = 0x24 + DKGPartialSignatureMsg = 0x25 ) type errCode int @@ -206,3 +218,123 @@ type blockBody struct { // blockBodiesData is the network packet for block content distribution. type blockBodiesData []*blockBody + +func rlpHash(x interface{}) (h common.Hash) { + hw := sha3.NewLegacyKeccak256() + rlp.Encode(hw, x) + hw.Sum(h[:0]) + return h +} + +type rlpDKGPrivateShare struct { + ProposerID coreTypes.NodeID + ReceiverID coreTypes.NodeID + Round uint64 + PrivateShare []byte + Signature crypto.Signature +} + +func toRLPDKGPrivateShare(ps *coreTypes.DKGPrivateShare) *rlpDKGPrivateShare { + return &rlpDKGPrivateShare{ + ProposerID: ps.ProposerID, + ReceiverID: ps.ReceiverID, + Round: ps.Round, + PrivateShare: ps.PrivateShare.Bytes(), + Signature: ps.Signature, + } +} + +func fromRLPDKGPrivateShare(rps *rlpDKGPrivateShare) *coreTypes.DKGPrivateShare { + ps := &coreTypes.DKGPrivateShare{ + ProposerID: rps.ProposerID, + ReceiverID: rps.ReceiverID, + Round: rps.Round, + Signature: rps.Signature, + } + ps.PrivateShare.SetBytes(rps.PrivateShare) + return ps +} + +type rlpWitness struct { + Timestamp uint64 + Height uint64 + Data []byte +} + +type rlpFinalizeResult struct { + Randomness []byte + Timestamp uint64 + Height uint64 +} + +type rlpLatticeBlock struct { + ProposerID coreTypes.NodeID `json:"proposer_id"` + ParentHash coreCommon.Hash `json:"parent_hash"` + Hash coreCommon.Hash `json:"hash"` + Position coreTypes.Position `json:"position"` + Timestamp uint64 `json:"timestamps"` + Acks coreCommon.SortedHashes `json:"acks"` + Payload []byte `json:"payload"` + Witness rlpWitness + Finalization rlpFinalizeResult + Signature crypto.Signature `json:"signature"` + CRSSignature crypto.Signature `json:"crs_signature"` +} + +func toRLPLatticeBlock(b *coreTypes.Block) *rlpLatticeBlock { + return &rlpLatticeBlock{ + ProposerID: b.ProposerID, + ParentHash: b.ParentHash, + Hash: b.Hash, + Position: b.Position, + Timestamp: toMillisecond(b.Timestamp), + Acks: b.Acks, + Payload: b.Payload, + Witness: rlpWitness{ + Timestamp: toMillisecond(b.Witness.Timestamp), + Height: b.Witness.Height, + Data: b.Witness.Data, + }, + Finalization: rlpFinalizeResult{ + Randomness: b.Finalization.Randomness, + Timestamp: toMillisecond(b.Finalization.Timestamp), + Height: b.Finalization.Height, + }, + Signature: b.Signature, + CRSSignature: b.CRSSignature, + } +} + +func fromRLPLatticeBlock(rb *rlpLatticeBlock) *coreTypes.Block { + return &coreTypes.Block{ + ProposerID: rb.ProposerID, + ParentHash: rb.ParentHash, + Hash: rb.Hash, + Position: rb.Position, + Timestamp: fromMillisecond(rb.Timestamp), + Acks: rb.Acks, + Payload: rb.Payload, + Witness: coreTypes.Witness{ + Timestamp: fromMillisecond(rb.Witness.Timestamp), + Height: rb.Witness.Height, + Data: rb.Witness.Data, + }, + Finalization: coreTypes.FinalizationResult{ + Randomness: rb.Finalization.Randomness, + Timestamp: fromMillisecond(rb.Finalization.Timestamp), + Height: rb.Finalization.Height, + }, + Signature: rb.Signature, + CRSSignature: rb.CRSSignature, + } +} + +func fromMillisecond(s uint64) time.Time { + sec := int64(s / 1000) + nsec := int64((s % 1000) * 1000000) + return time.Unix(sec, nsec) +} + +func toMillisecond(t time.Time) uint64 { + return uint64(t.UnixNano() / 1000000) +} diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 8c7638b2b..a26a40feb 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -18,10 +18,16 @@ package dex import ( "fmt" + "reflect" "sync" "testing" "time" + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" + coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto/dkg" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/crypto" @@ -303,3 +309,496 @@ func TestSendNodeMetas(t *testing.T) { pm.nodeTable.Add(allmetas) wg.Wait() } + +func TestRecvLatticeBlock(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + block := coreTypes.Block{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + ParentHash: coreCommon.Hash{1, 1, 1, 1, 1}, + Hash: coreCommon.Hash{2, 2, 2, 2, 2}, + Position: coreTypes.Position{ + ChainID: 11, + Round: 12, + Height: 13, + }, + Timestamp: fromMillisecond(toMillisecond(time.Now())), + Acks: coreCommon.NewSortedHashes(coreCommon.Hashes([]coreCommon.Hash{ + coreCommon.Hash{101}, coreCommon.Hash{100}, coreCommon.Hash{102}, + })), + Payload: []byte{3, 3, 3, 3, 3}, + Witness: coreTypes.Witness{ + Timestamp: fromMillisecond(toMillisecond(time.Now())), + Height: 13, + Data: []byte{4, 4, 4, 4, 4}, + }, + Finalization: coreTypes.FinalizationResult{ + Randomness: []byte{5, 5, 5, 5, 5}, + Timestamp: fromMillisecond(toMillisecond(time.Now())), + Height: 13, + }, + Signature: coreCrypto.Signature{ + Type: "signature", + Signature: []byte("signature"), + }, + CRSSignature: coreCrypto.Signature{ + Type: "crs-signature", + Signature: []byte("crs-signature"), + }, + } + + if err := p2p.Send(p.app, LatticeBlockMsg, toRLPLatticeBlock(&block)); err != nil { + t.Fatalf("send error: %v", err) + } + + ch := pm.ReceiveChan() + select { + case msg := <-ch: + rb := msg.(*coreTypes.Block) + if !reflect.DeepEqual(rb, &block) { + t.Errorf("block mismatch") + } + case <-time.After(3 * time.Second): + t.Errorf("no newMetasEvent received within 3 seconds") + } +} + +func TestSendLatticeBlock(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + block := coreTypes.Block{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + ParentHash: coreCommon.Hash{1, 1, 1, 1, 1}, + Hash: coreCommon.Hash{2, 2, 2, 2, 2}, + Position: coreTypes.Position{ + ChainID: 11, + Round: 12, + Height: 13, + }, + Timestamp: fromMillisecond(toMillisecond(time.Now())), + Acks: coreCommon.NewSortedHashes(coreCommon.Hashes([]coreCommon.Hash{ + coreCommon.Hash{101}, coreCommon.Hash{100}, coreCommon.Hash{102}, + })), + Payload: []byte{3, 3, 3, 3, 3}, + Witness: coreTypes.Witness{ + Timestamp: fromMillisecond(toMillisecond(time.Now())), + Height: 13, + Data: []byte{4, 4, 4, 4, 4}, + }, + Finalization: coreTypes.FinalizationResult{ + Randomness: []byte{5, 5, 5, 5, 5}, + Timestamp: fromMillisecond(toMillisecond(time.Now())), + Height: 13, + }, + Signature: coreCrypto.Signature{ + Type: "signature", + Signature: []byte("signature"), + }, + CRSSignature: coreCrypto.Signature{ + Type: "crs-signature", + Signature: []byte("crs-signature"), + }, + } + + pm.BroadcastLatticeBlock(&block) + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != LatticeBlockMsg { + t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, LatticeBlockMsg) + } + + var rb rlpLatticeBlock + if err := msg.Decode(&rb); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + + if !reflect.DeepEqual(fromRLPLatticeBlock(&rb), &block) { + t.Errorf("block mismatch") + } +} + +func TestRecvVote(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + vote := coreTypes.Vote{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + Period: 10, + Position: coreTypes.Position{ + ChainID: 11, + Round: 12, + Height: 13, + }, + Signature: coreCrypto.Signature{ + Type: "123", + Signature: []byte("sig"), + }, + } + + if err := p2p.Send(p.app, VoteMsg, vote); err != nil { + t.Fatalf("send error: %v", err) + } + + ch := pm.ReceiveChan() + + select { + case msg := <-ch: + rvote := msg.(*coreTypes.Vote) + if rlpHash(rvote) != rlpHash(vote) { + t.Errorf("vote mismatch") + } + case <-time.After(1 * time.Second): + t.Errorf("no vote received within 1 seconds") + } +} + +func TestSendVote(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + defer pm.Stop() + + vote := coreTypes.Vote{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + Period: 10, + Position: coreTypes.Position{ + ChainID: 1, + Round: 10, + Height: 13, + }, + Signature: coreCrypto.Signature{ + Type: "123", + Signature: []byte("sig"), + }, + } + + // Connect several peers. They should all receive the pending transactions. + var wg sync.WaitGroup + checkvote := func(p *testPeer, isReceiver bool) { + defer wg.Done() + defer p.close() + if !isReceiver { + go func() { + time.Sleep(100 * time.Millisecond) + p.close() + }() + } + + msg, err := p.app.ReadMsg() + if !isReceiver { + if err != p2p.ErrPipeClosed { + t.Errorf("err mismatch: got %v, want %v (not receiver peer)", + err, p2p.ErrPipeClosed) + } + return + } + + var v coreTypes.Vote + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != VoteMsg { + t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, VoteMsg) + } + if err := msg.Decode(&v); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + if !reflect.DeepEqual(v, vote) { + t.Errorf("vote mismatch") + } + } + + testPeers := []struct { + label *peerLabel + isReceiver bool + }{ + { + label: &peerLabel{set: notaryset, chainID: 1, round: 10}, + isReceiver: true, + }, + { + label: &peerLabel{set: notaryset, chainID: 1, round: 10}, + isReceiver: true, + }, + { + label: nil, + isReceiver: false, + }, + { + label: &peerLabel{set: notaryset, chainID: 1, round: 11}, + isReceiver: false, + }, + { + label: &peerLabel{set: notaryset, chainID: 2, round: 10}, + isReceiver: false, + }, + { + label: &peerLabel{set: dkgset, chainID: 1, round: 10}, + isReceiver: false, + }, + } + + for i, tt := range testPeers { + p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true) + if tt.label != nil { + pm.peers.addDirectPeer(p.id, *tt.label) + } + wg.Add(1) + go checkvote(p, tt.isReceiver) + } + pm.BroadcastVote(&vote) + wg.Wait() +} + +type mockPublicKey []byte + +func (p mockPublicKey) VerifySignature(hash coreCommon.Hash, signature coreCrypto.Signature) bool { + return true +} + +func (p mockPublicKey) Bytes() []byte { + return append([]byte{1}, p...) +} + +func TestRecvDKGPrivateShare(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer1", dex64, pm, true) + defer pm.Stop() + defer p.close() + + // TODO(sonic): polish this + privkey := dkg.NewPrivateKey() + privateShare := coreTypes.DKGPrivateShare{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + ReceiverID: coreTypes.NodeID{coreCommon.Hash{3, 4, 5}}, + Round: 10, + PrivateShare: *privkey, + Signature: coreCrypto.Signature{ + Type: "DKGPrivateShare", + Signature: []byte("DKGPrivateShare"), + }, + } + + if err := p2p.Send( + p.app, DKGPrivateShareMsg, toRLPDKGPrivateShare(&privateShare)); err != nil { + t.Fatalf("send error: %v", err) + } + + ch := pm.ReceiveChan() + select { + case msg := <-ch: + rps := msg.(*coreTypes.DKGPrivateShare) + if !reflect.DeepEqual( + toRLPDKGPrivateShare(rps), toRLPDKGPrivateShare(&privateShare)) { + t.Errorf("vote mismatch") + } + case <-time.After(1 * time.Second): + t.Errorf("no dkg received within 1 seconds") + } +} + +func TestSendDKGPrivateShare(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p1, _ := newTestPeer("peer1", dex64, pm, true) + p2, _ := newTestPeer("peer2", dex64, pm, true) + defer pm.Stop() + defer p1.close() + + // TODO(sonic): polish this + privkey := dkg.NewPrivateKey() + privateShare := coreTypes.DKGPrivateShare{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + ReceiverID: coreTypes.NodeID{coreCommon.Hash{3, 4, 5}}, + Round: 10, + PrivateShare: *privkey, + Signature: coreCrypto.Signature{ + Type: "DKGPrivateShare", + Signature: []byte("DKGPrivateShare"), + }, + } + + go pm.SendDKGPrivateShare(mockPublicKey(p1.ID().Bytes()), &privateShare) + msg, err := p1.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p1.Peer, err) + } else if msg.Code != DKGPrivateShareMsg { + t.Errorf("%v: got code %d, want %d", p1.Peer, msg.Code, DKGPrivateShareMsg) + } + + var rps rlpDKGPrivateShare + if err := msg.Decode(&rps); err != nil { + t.Errorf("%v: %v", p1.Peer, err) + } + + expected := toRLPDKGPrivateShare(&privateShare) + if !reflect.DeepEqual(rps, *expected) { + t.Errorf("DKG private share mismatch") + } + + go func() { + time.Sleep(500 * time.Millisecond) + p2.close() + }() + + msg, err = p2.app.ReadMsg() + if err != p2p.ErrPipeClosed { + t.Errorf("err mismatch: got %v, want %v (not receiver peer)", + err, p2p.ErrPipeClosed) + } +} + +func TestRecvAgreement(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + // TODO(sonic): polish this + vote := coreTypes.Vote{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + Period: 10, + Position: coreTypes.Position{ + ChainID: 1, + Round: 10, + Height: 13, + }, + Signature: coreCrypto.Signature{ + Type: "123", + Signature: []byte("sig"), + }, + } + + agreement := coreTypes.AgreementResult{ + BlockHash: coreCommon.Hash{9, 9, 9}, + Round: 13, + Position: vote.Position, + Votes: []coreTypes.Vote{vote}, + } + + if err := p2p.Send(p.app, AgreementMsg, &agreement); err != nil { + t.Fatalf("send error: %v", err) + } + + ch := pm.ReceiveChan() + select { + case msg := <-ch: + a := msg.(*coreTypes.AgreementResult) + if !reflect.DeepEqual(a, &agreement) { + t.Errorf("agreement mismatch") + } + case <-time.After(1 * time.Second): + t.Errorf("no agreement received within 1 seconds") + } +} + +func TestSendAgreement(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + // TODO(sonic): polish this + vote := coreTypes.Vote{ + ProposerID: coreTypes.NodeID{coreCommon.Hash{1, 2, 3}}, + Period: 10, + Position: coreTypes.Position{ + ChainID: 1, + Round: 10, + Height: 13, + }, + Signature: coreCrypto.Signature{ + Type: "123", + Signature: []byte("sig"), + }, + } + + agreement := coreTypes.AgreementResult{ + BlockHash: coreCommon.Hash{9, 9, 9}, + Round: 13, + Position: vote.Position, + Votes: []coreTypes.Vote{vote}, + } + + pm.BroadcastAgreementResult(&agreement) + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != AgreementMsg { + t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, AgreementMsg) + } + + var a coreTypes.AgreementResult + if err := msg.Decode(&a); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + + if !reflect.DeepEqual(a, agreement) { + t.Errorf("agreement mismatch") + } +} + +func TestRecvRandomness(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + // TODO(sonic): polish this + randomness := coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.Hash{8, 8, 8}, + Round: 17, + Randomness: []byte{7, 7, 7, 7}, + } + + if err := p2p.Send(p.app, RandomnessMsg, &randomness); err != nil { + t.Fatalf("send error: %v", err) + } + + ch := pm.ReceiveChan() + select { + case msg := <-ch: + r := msg.(*coreTypes.BlockRandomnessResult) + if !reflect.DeepEqual(r, &randomness) { + t.Errorf("randomness mismatch") + } + case <-time.After(1 * time.Second): + t.Errorf("no randomness received within 1 seconds") + } +} + +func TestSendRandomness(t *testing.T) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + p, _ := newTestPeer("peer", dex64, pm, true) + defer pm.Stop() + defer p.close() + + // TODO(sonic): polish this + randomness := coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.Hash{8, 8, 8}, + Round: 17, + Randomness: []byte{7, 7, 7, 7}, + } + + pm.BroadcastRandomnessResult(&randomness) + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != RandomnessMsg { + t.Errorf("%v: got code %d, want %d", p.Peer, msg.Code, RandomnessMsg) + } + + var r coreTypes.BlockRandomnessResult + if err := msg.Decode(&r); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + + if !reflect.DeepEqual(r, randomness) { + t.Errorf("agreement mismatch") + } +} |