aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go80
1 files changed, 80 insertions, 0 deletions
diff --git a/dex/handler.go b/dex/handler.go
index be50cb43d..5662e2b84 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -1,3 +1,20 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
@@ -26,6 +43,7 @@ import (
"sync/atomic"
"time"
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg"
@@ -55,6 +73,8 @@ const (
txChanSize = 4096
metaChanSize = 10240
+
+ maxPullPeers = 3
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -76,6 +96,7 @@ type ProtocolManager struct {
gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
+ cache *cache
maxPeers int
downloader *downloader.Downloader
@@ -128,6 +149,7 @@ func NewProtocolManager(
nodeTable: tab,
gov: gov,
blockchain: blockchain,
+ cache: newCache(128),
chainconfig: config,
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
@@ -714,12 +736,16 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&block); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ pm.cache.addBlock(&block)
pm.receiveCh <- &block
case msg.Code == VoteMsg:
var vote coreTypes.Vote
if err := msg.Decode(&vote); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ if vote.Type >= coreTypes.VotePreCom {
+ pm.cache.addVote(&vote)
+ }
pm.receiveCh <- &vote
case msg.Code == AgreementMsg:
// DKG set is receiver
@@ -749,6 +775,30 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
pm.receiveCh <- &psig
+ case msg.Code == PullBlocksMsg:
+ var hashes coreCommon.Hashes
+ if err := msg.Decode(&hashes); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ blocks := pm.cache.blocks(hashes)
+ log.Debug("Push blocks", "blocks", blocks)
+ for _, block := range blocks {
+ if err := p.SendLatticeBlock(block); err != nil {
+ return err
+ }
+ }
+ case msg.Code == PullVotesMsg:
+ var pos coreTypes.Position
+ if err := msg.Decode(&pos); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ votes := pm.cache.votes(pos)
+ log.Debug("Push votes", "votes", votes)
+ for _, vote := range votes {
+ if err := p.SendVote(vote); err != nil {
+ return err
+ }
+ }
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -827,6 +877,7 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
// TODO(sonic): block size is big, try not to send to all peers
// to reduce traffic
func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
+ pm.cache.addBlock(block)
for _, peer := range pm.peers.PeersWithoutLatticeBlock(rlpHash(block)) {
peer.AsyncSendLatticeBlock(block)
}
@@ -834,6 +885,9 @@ func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
// BroadcastVote broadcasts the given vote to all peers in same notary set
func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
+ if vote.Type >= coreTypes.VotePreCom {
+ pm.cache.addVote(vote)
+ }
label := peerLabel{
set: notaryset,
chainID: vote.Position.ChainID,
@@ -916,6 +970,32 @@ func (pm *ProtocolManager) BroadcastDKGPartialSignature(
}
}
+func (pm *ProtocolManager) BroadcastPullBlocks(
+ hashes coreCommon.Hashes) {
+ // TODO(jimmy-dexon): pull from notary set only.
+ for idx, peer := range pm.peers.Peers() {
+ if idx >= maxPullPeers {
+ break
+ }
+ peer.AsyncSendPullBlocks(hashes)
+ }
+}
+
+func (pm *ProtocolManager) BroadcastPullVotes(
+ pos coreTypes.Position) {
+ label := peerLabel{
+ set: notaryset,
+ chainID: pos.ChainID,
+ round: pos.Round,
+ }
+ for idx, peer := range pm.peers.PeersWithLabel(label) {
+ if idx >= maxPullPeers {
+ break
+ }
+ peer.AsyncSendPullVotes(pos)
+ }
+}
+
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe