diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/dex/handler.go b/dex/handler.go index be50cb43d..5662e2b84 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -1,3 +1,20 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + // Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -26,6 +43,7 @@ import ( "sync/atomic" "time" + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto" coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg" @@ -55,6 +73,8 @@ const ( txChanSize = 4096 metaChanSize = 10240 + + maxPullPeers = 3 ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -76,6 +96,7 @@ type ProtocolManager struct { gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig + cache *cache maxPeers int downloader *downloader.Downloader @@ -128,6 +149,7 @@ func NewProtocolManager( nodeTable: tab, gov: gov, blockchain: blockchain, + cache: newCache(128), chainconfig: config, newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), @@ -714,12 +736,16 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&block); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + pm.cache.addBlock(&block) pm.receiveCh <- &block case msg.Code == VoteMsg: var vote coreTypes.Vote if err := msg.Decode(&vote); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + if vote.Type >= coreTypes.VotePreCom { + pm.cache.addVote(&vote) + } pm.receiveCh <- &vote case msg.Code == AgreementMsg: // DKG set is receiver @@ -749,6 +775,30 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } pm.receiveCh <- &psig + case msg.Code == PullBlocksMsg: + var hashes coreCommon.Hashes + if err := msg.Decode(&hashes); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + blocks := pm.cache.blocks(hashes) + log.Debug("Push blocks", "blocks", blocks) + for _, block := range blocks { + if err := p.SendLatticeBlock(block); err != nil { + return err + } + } + case msg.Code == PullVotesMsg: + var pos coreTypes.Position + if err := msg.Decode(&pos); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + votes := pm.cache.votes(pos) + log.Debug("Push votes", "votes", votes) + for _, vote := range votes { + if err := p.SendVote(vote); err != nil { + return err + } + } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -827,6 +877,7 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { // TODO(sonic): block size is big, try not to send to all peers // to reduce traffic func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { + pm.cache.addBlock(block) for _, peer := range pm.peers.PeersWithoutLatticeBlock(rlpHash(block)) { peer.AsyncSendLatticeBlock(block) } @@ -834,6 +885,9 @@ func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { // BroadcastVote broadcasts the given vote to all peers in same notary set func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { + if vote.Type >= coreTypes.VotePreCom { + pm.cache.addVote(vote) + } label := peerLabel{ set: notaryset, chainID: vote.Position.ChainID, @@ -916,6 +970,32 @@ func (pm *ProtocolManager) BroadcastDKGPartialSignature( } } +func (pm *ProtocolManager) BroadcastPullBlocks( + hashes coreCommon.Hashes) { + // TODO(jimmy-dexon): pull from notary set only. + for idx, peer := range pm.peers.Peers() { + if idx >= maxPullPeers { + break + } + peer.AsyncSendPullBlocks(hashes) + } +} + +func (pm *ProtocolManager) BroadcastPullVotes( + pos coreTypes.Position) { + label := peerLabel{ + set: notaryset, + chainID: pos.ChainID, + round: pos.Round, + } + for idx, peer := range pm.peers.PeersWithLabel(label) { + if idx >= maxPullPeers { + break + } + peer.AsyncSendPullVotes(pos) + } +} + // Mined broadcast loop func (pm *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe |