diff options
-rw-r--r-- | dex/cache.go | 115 | ||||
-rw-r--r-- | dex/cache_test.go | 144 | ||||
-rw-r--r-- | dex/handler.go | 80 | ||||
-rw-r--r-- | dex/network.go | 2 | ||||
-rw-r--r-- | dex/peer.go | 55 | ||||
-rw-r--r-- | dex/protocol.go | 21 |
6 files changed, 415 insertions, 2 deletions
diff --git a/dex/cache.go b/dex/cache.go new file mode 100644 index 000000000..f373a642e --- /dev/null +++ b/dex/cache.go @@ -0,0 +1,115 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package dex + +import ( + "sync" + + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +type voteKey struct { + ProposerID coreTypes.NodeID + Type coreTypes.VoteType + BlockHash coreCommon.Hash + Period uint64 + Position coreTypes.Position +} + +func voteToKey(vote *coreTypes.Vote) voteKey { + return voteKey{ + ProposerID: vote.ProposerID, + Type: vote.Type, + BlockHash: vote.BlockHash, + Period: vote.Period, + Position: vote.Position, + } +} + +type cache struct { + lock sync.RWMutex + blockCache map[coreCommon.Hash]*coreTypes.Block + voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote + votePosition []coreTypes.Position + voteSize int + size int +} + +func newCache(size int) *cache { + return &cache{ + blockCache: make(map[coreCommon.Hash]*coreTypes.Block), + voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote), + size: size, + } +} + +func (c *cache) addVote(vote *coreTypes.Vote) { + c.lock.Lock() + defer c.lock.Unlock() + if c.voteSize >= c.size { + pos := c.votePosition[0] + c.voteSize -= len(c.voteCache[pos]) + delete(c.voteCache, pos) + } + if _, exist := c.voteCache[vote.Position]; !exist { + c.votePosition = append(c.votePosition, vote.Position) + c.voteCache[vote.Position] = make(map[voteKey]*coreTypes.Vote) + } + key := voteToKey(vote) + if _, exist := c.voteCache[vote.Position][key]; exist { + return + } + c.voteCache[vote.Position][key] = vote + c.voteSize++ +} + +func (c *cache) votes(pos coreTypes.Position) []*coreTypes.Vote { + c.lock.RLock() + defer c.lock.RUnlock() + votes := make([]*coreTypes.Vote, 0, len(c.voteCache[pos])) + for _, vote := range c.voteCache[pos] { + votes = append(votes, vote) + } + return votes +} + +func (c *cache) addBlock(block *coreTypes.Block) { + c.lock.Lock() + defer c.lock.Unlock() + if len(c.blockCache) >= c.size { + // Randomly delete one entry. + for k := range c.blockCache { + delete(c.blockCache, k) + break + } + } + c.blockCache[block.Hash] = block +} + +func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block { + c.lock.RLock() + defer c.lock.RUnlock() + cacheBlocks := make([]*coreTypes.Block, 0, len(hashes)) + for _, hash := range hashes { + if block, exist := c.blockCache[hash]; exist { + cacheBlocks = append(cacheBlocks, block) + } + } + return cacheBlocks +} diff --git a/dex/cache_test.go b/dex/cache_test.go new file mode 100644 index 000000000..96e07aa17 --- /dev/null +++ b/dex/cache_test.go @@ -0,0 +1,144 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package dex + +import ( + "testing" + + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +func TestCacheVote(t *testing.T) { + cache := newCache(3) + pos0 := coreTypes.Position{ + Height: uint64(0), + } + pos1 := coreTypes.Position{ + Height: uint64(1), + } + vote1 := &coreTypes.Vote{ + BlockHash: coreCommon.NewRandomHash(), + Position: pos0, + } + vote2 := &coreTypes.Vote{ + BlockHash: coreCommon.NewRandomHash(), + Position: pos0, + } + vote3 := &coreTypes.Vote{ + BlockHash: coreCommon.NewRandomHash(), + Position: pos1, + } + vote4 := &coreTypes.Vote{ + BlockHash: coreCommon.NewRandomHash(), + Position: pos1, + } + cache.addVote(vote1) + cache.addVote(vote2) + cache.addVote(vote3) + + votes := cache.votes(pos0) + if len(votes) != 2 { + t.Errorf("fail to get votes: have %d, want 2", len(votes)) + } + if !votes[0].BlockHash.Equal(vote1.BlockHash) { + t.Errorf("get wrong vote: have %s, want %s", votes[0], vote1) + } + if !votes[1].BlockHash.Equal(vote2.BlockHash) { + t.Errorf("get wrong vote: have %s, want %s", votes[1], vote2) + } + votes = cache.votes(pos1) + if len(votes) != 1 { + t.Errorf("fail to get votes: have %d, want 1", len(votes)) + } + if !votes[0].BlockHash.Equal(vote3.BlockHash) { + t.Errorf("get wrong vote: have %s, want %s", votes[0], vote3) + } + + cache.addVote(vote4) + + votes = cache.votes(pos0) + if len(votes) != 0 { + t.Errorf("fail to get votes: have %d, want 0", len(votes)) + } + votes = cache.votes(pos1) + if len(votes) != 2 { + t.Errorf("fail to get votes: have %d, want 1", len(votes)) + } + if !votes[0].BlockHash.Equal(vote3.BlockHash) { + t.Errorf("get wrong vote: have %s, want %s", votes[0], vote3) + } + if !votes[1].BlockHash.Equal(vote4.BlockHash) { + t.Errorf("get wrong vote: have %s, want %s", votes[1], vote4) + } +} + +func TestCacheBlock(t *testing.T) { + cache := newCache(3) + block1 := &coreTypes.Block{ + Hash: coreCommon.NewRandomHash(), + } + block2 := &coreTypes.Block{ + Hash: coreCommon.NewRandomHash(), + } + block3 := &coreTypes.Block{ + Hash: coreCommon.NewRandomHash(), + } + block4 := &coreTypes.Block{ + Hash: coreCommon.NewRandomHash(), + } + cache.addBlock(block1) + cache.addBlock(block2) + cache.addBlock(block3) + + hashes := coreCommon.Hashes{block1.Hash, block2.Hash, block3.Hash, block4.Hash} + hashMap := map[coreCommon.Hash]struct{}{ + block1.Hash: struct{}{}, + block2.Hash: struct{}{}, + block3.Hash: struct{}{}, + } + blocks := cache.blocks(hashes) + if len(blocks) != 3 { + t.Errorf("fail to get blocks: have %d, want 3", len(blocks)) + } + for _, block := range blocks { + if _, exist := hashMap[block.Hash]; !exist { + t.Errorf("get wrong block: have %s, want %v", block, hashMap) + } + } + + cache.addBlock(block4) + + blocks = cache.blocks(hashes) + hashMap[block4.Hash] = struct{}{} + if len(blocks) != 3 { + t.Errorf("fail to get blocks: have %d, want 3", len(blocks)) + } + hasNewBlock := false + for _, block := range blocks { + if _, exist := hashMap[block.Hash]; !exist { + t.Errorf("get wrong block: have %s, want %v", block, hashMap) + } + if block.Hash.Equal(block4.Hash) { + hasNewBlock = true + } + } + if !hasNewBlock { + t.Errorf("expect block %s in cache, have %v", block4, blocks) + } +} diff --git a/dex/handler.go b/dex/handler.go index be50cb43d..5662e2b84 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -1,3 +1,20 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + // Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -26,6 +43,7 @@ import ( "sync/atomic" "time" + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto" coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg" @@ -55,6 +73,8 @@ const ( txChanSize = 4096 metaChanSize = 10240 + + maxPullPeers = 3 ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -76,6 +96,7 @@ type ProtocolManager struct { gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig + cache *cache maxPeers int downloader *downloader.Downloader @@ -128,6 +149,7 @@ func NewProtocolManager( nodeTable: tab, gov: gov, blockchain: blockchain, + cache: newCache(128), chainconfig: config, newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), @@ -714,12 +736,16 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&block); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + pm.cache.addBlock(&block) pm.receiveCh <- &block case msg.Code == VoteMsg: var vote coreTypes.Vote if err := msg.Decode(&vote); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + if vote.Type >= coreTypes.VotePreCom { + pm.cache.addVote(&vote) + } pm.receiveCh <- &vote case msg.Code == AgreementMsg: // DKG set is receiver @@ -749,6 +775,30 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } pm.receiveCh <- &psig + case msg.Code == PullBlocksMsg: + var hashes coreCommon.Hashes + if err := msg.Decode(&hashes); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + blocks := pm.cache.blocks(hashes) + log.Debug("Push blocks", "blocks", blocks) + for _, block := range blocks { + if err := p.SendLatticeBlock(block); err != nil { + return err + } + } + case msg.Code == PullVotesMsg: + var pos coreTypes.Position + if err := msg.Decode(&pos); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + votes := pm.cache.votes(pos) + log.Debug("Push votes", "votes", votes) + for _, vote := range votes { + if err := p.SendVote(vote); err != nil { + return err + } + } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -827,6 +877,7 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { // TODO(sonic): block size is big, try not to send to all peers // to reduce traffic func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { + pm.cache.addBlock(block) for _, peer := range pm.peers.PeersWithoutLatticeBlock(rlpHash(block)) { peer.AsyncSendLatticeBlock(block) } @@ -834,6 +885,9 @@ func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { // BroadcastVote broadcasts the given vote to all peers in same notary set func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { + if vote.Type >= coreTypes.VotePreCom { + pm.cache.addVote(vote) + } label := peerLabel{ set: notaryset, chainID: vote.Position.ChainID, @@ -916,6 +970,32 @@ func (pm *ProtocolManager) BroadcastDKGPartialSignature( } } +func (pm *ProtocolManager) BroadcastPullBlocks( + hashes coreCommon.Hashes) { + // TODO(jimmy-dexon): pull from notary set only. + for idx, peer := range pm.peers.Peers() { + if idx >= maxPullPeers { + break + } + peer.AsyncSendPullBlocks(hashes) + } +} + +func (pm *ProtocolManager) BroadcastPullVotes( + pos coreTypes.Position) { + label := peerLabel{ + set: notaryset, + chainID: pos.ChainID, + round: pos.Round, + } + for idx, peer := range pm.peers.PeersWithLabel(label) { + if idx >= maxPullPeers { + break + } + peer.AsyncSendPullVotes(pos) + } +} + // Mined broadcast loop func (pm *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe diff --git a/dex/network.go b/dex/network.go index 58d6fd855..4f5c5ac5c 100644 --- a/dex/network.go +++ b/dex/network.go @@ -34,10 +34,12 @@ func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork { // PullBlocks tries to pull blocks from the DEXON network. func (n *DexconNetwork) PullBlocks(hashes coreCommon.Hashes) { + n.pm.BroadcastPullBlocks(hashes) } // PullVotes tries to pull votes from the DEXON network. func (n *DexconNetwork) PullVotes(pos types.Position) { + n.pm.BroadcastPullVotes(pos) } // BroadcastVote broadcasts vote to all nodes in DEXON network. diff --git a/dex/peer.go b/dex/peer.go index c005cec16..2fe8cac08 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -24,6 +24,7 @@ import ( "time" mapset "github.com/deckarep/golang-set" + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg" @@ -77,6 +78,8 @@ const ( maxQueuedRandomnesses = 16 maxQueuedDKGPrivateShare = 16 maxQueuedDKGParitialSignature = 16 + maxQueuedPullBlocks = 128 + maxQueuedPullVotes = 128 handshakeTimeout = 5 * time.Second @@ -141,6 +144,8 @@ type peer struct { queuedRandomnesses chan *coreTypes.BlockRandomnessResult queuedDKGPrivateShares chan *dkgTypes.PrivateShare queuedDKGPartialSignatures chan *dkgTypes.PartialSignature + queuedPullBlocks chan coreCommon.Hashes + queuedPullVotes chan coreTypes.Position term chan struct{} // Termination channel to stop the broadcaster } @@ -169,7 +174,9 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses), queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare), queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature), - term: make(chan struct{}), + queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks), + queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes), + term: make(chan struct{}), } } @@ -232,6 +239,16 @@ func (p *peer) broadcast() { return } p.Log().Trace("Broadcast DKG partial signature") + case hashes := <-p.queuedPullBlocks: + if err := p.SendPullBlocks(hashes); err != nil { + return + } + p.Log().Trace("Pulling Blocks", "hashes", hashes) + case pos := <-p.queuedPullVotes: + if err := p.SendPullVotes(pos); err != nil { + return + } + p.Log().Trace("Pulling Votes", "position", pos) case <-p.term: return } @@ -472,6 +489,30 @@ func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) { } } +func (p *peer) SendPullBlocks(hashes coreCommon.Hashes) error { + return p2p.Send(p.rw, PullBlocksMsg, hashes) +} + +func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) { + select { + case p.queuedPullBlocks <- hashes: + default: + p.Log().Debug("Dropping Pull Blocks") + } +} + +func (p *peer) SendPullVotes(pos coreTypes.Position) error { + return p2p.Send(p.rw, PullVotesMsg, pos) +} + +func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { + select { + case p.queuedPullVotes <- pos: + default: + p.Log().Debug("Dropping Pull Votes") + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) @@ -693,6 +734,18 @@ func (ps *peerSet) Len() int { return len(ps.peers) } +// Peers retrieves all of the peers. +func (ps *peerSet) Peers() []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} + // PeersWithoutBlock retrieves a list of peers that do not have a given block in // their set of known hashes. func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { diff --git a/dex/protocol.go b/dex/protocol.go index 6e531c7af..f33179b69 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -1,3 +1,20 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + // Copyright 2014 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -43,7 +60,7 @@ var ProtocolName = "dex" var ProtocolVersions = []uint{dex64} // ProtocolLengths are the number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{38} +var ProtocolLengths = []uint64{40} const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message @@ -74,6 +91,8 @@ const ( RandomnessMsg = 0x23 DKGPrivateShareMsg = 0x24 DKGPartialSignatureMsg = 0x25 + PullBlocksMsg = 0x26 + PullVotesMsg = 0x27 ) type errCode int |