From 0080d104ecb56bbf658c9d3fea231ba3da669529 Mon Sep 17 00:00:00 2001 From: Wei-Ning Huang Date: Fri, 12 Apr 2019 18:14:18 +0800 Subject: dex: implement bad peer detect and disconnect mechanism (#360) --- dex/handler.go | 53 +++++++++++-- dex/network.go | 7 +- dex/protocol_test.go | 8 +- .../dexon-consensus/core/agreement-mgr.go | 67 +++++++++++++--- .../dexon-consensus/core/agreement.go | 15 ++-- .../dexon-consensus/core/blockchain.go | 3 - .../dexon-consensus/core/consensus.go | 50 +++++++++--- .../dexon-consensus/core/interfaces.go | 5 +- .../dexon-consensus/core/leader-selector.go | 20 +---- .../dexon-consensus/core/syncer/consensus.go | 10 +-- .../dexon-consensus/core/types/message.go | 24 ++++++ .../dexon-consensus/core/utils/crypto.go | 23 ++++-- .../dexon-consensus/core/utils/signer.go | 23 +++++- .../dexon-consensus/core/utils/utils.go | 2 +- .../dexon-consensus/core/utils/vote-filter.go | 6 +- vendor/vendor.json | 92 +++++++++++----------- 16 files changed, 279 insertions(+), 129 deletions(-) create mode 100644 vendor/github.com/dexon-foundation/dexon-consensus/core/types/message.go diff --git a/dex/handler.go b/dex/handler.go index 398a8443a..8e5898817 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -134,7 +134,8 @@ type ProtocolManager struct { chainHeadSub event.Subscription // channels for dexon consensus core - receiveCh chan interface{} + receiveCh chan coreTypes.Msg + reportBadPeerChan chan interface{} receiveCoreMessage int32 srvr p2pServer @@ -176,7 +177,8 @@ func NewProtocolManager( noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), + receiveCh: make(chan coreTypes.Msg, 1024), + reportBadPeerChan: make(chan interface{}, 128), receiveCoreMessage: 0, isBlockProposer: isBlockProposer, app: app, @@ -299,6 +301,9 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { // start sync handlers go pm.syncer() go pm.txsyncLoop() + + // Listen to bad peer and disconnect it. + go pm.badPeerWatchLoop() } func (pm *ProtocolManager) Stop() { @@ -330,10 +335,26 @@ func (pm *ProtocolManager) Stop() { log.Info("DEXON protocol stopped") } -func (pm *ProtocolManager) ReceiveChan() <-chan interface{} { +func (pm *ProtocolManager) ReceiveChan() <-chan coreTypes.Msg { return pm.receiveCh } +func (pm *ProtocolManager) ReportBadPeerChan() chan<- interface{} { + return pm.reportBadPeerChan +} + +func (pm *ProtocolManager) badPeerWatchLoop() { + for { + select { + case id := <-pm.reportBadPeerChan: + log.Debug("Bad peer detected, removing", "id", id.(string)) + pm.removePeer(id.(string)) + case <-pm.quitSync: + return + } + } +} + func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return newPeer(pv, p, newMeteredMsgWriter(rw)) } @@ -382,6 +403,7 @@ func (pm *ProtocolManager) handle(p *peer) error { return err } } + // Handle incoming messages until the connection is torn down for { if err := pm.handleMsg(p); err != nil { @@ -832,7 +854,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.cache.addBlocks(blocks) for _, block := range blocks { - pm.receiveCh <- block + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: block, + } } case msg.Code == VoteMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { @@ -846,7 +871,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if vote.Type >= coreTypes.VotePreCom { pm.cache.addVote(vote) } - pm.receiveCh <- vote + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: vote, + } } case msg.Code == AgreementMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { @@ -864,7 +892,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { block[0].Randomness = agreement.Randomness pm.cache.addFinalizedBlock(block[0]) } - pm.receiveCh <- &agreement + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &agreement, + } case msg.Code == DKGPrivateShareMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -875,7 +906,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.MarkDKGPrivateShares(rlpHash(ps)) - pm.receiveCh <- &ps + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &ps, + } case msg.Code == DKGPartialSignatureMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break @@ -885,7 +919,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&psig); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - pm.receiveCh <- &psig + pm.receiveCh <- coreTypes.Msg{ + PeerID: p.ID().String(), + Payload: &psig, + } case msg.Code == PullBlocksMsg: if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 { break diff --git a/dex/network.go b/dex/network.go index 0e2d338c1..ab676a349 100644 --- a/dex/network.go +++ b/dex/network.go @@ -84,6 +84,11 @@ func (n *DexconNetwork) BroadcastAgreementResult(result *types.AgreementResult) } // ReceiveChan returns a channel to receive messages from DEXON network. -func (n *DexconNetwork) ReceiveChan() <-chan interface{} { +func (n *DexconNetwork) ReceiveChan() <-chan types.Msg { return n.pm.ReceiveChan() } + +// ReportBadPeerChan returns a channel to receive messages from DEXON network. +func (n *DexconNetwork) ReportBadPeerChan() chan<- interface{} { + return n.pm.ReportBadPeerChan() +} diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 3ed93c061..81550d918 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -271,7 +271,7 @@ func TestRecvCoreBlocks(t *testing.T) { ch := pm.ReceiveChan() select { case msg := <-ch: - rb := msg.(*coreTypes.Block) + rb := msg.Payload.(*coreTypes.Block) if !reflect.DeepEqual(rb, &block) { t.Errorf("block mismatch") } @@ -416,7 +416,7 @@ func TestRecvVotes(t *testing.T) { select { case msg := <-ch: - rvote := msg.(*coreTypes.Vote) + rvote := msg.Payload.(*coreTypes.Vote) if !reflect.DeepEqual(rvote, &vote) { t.Errorf("vote mismatch") } @@ -557,7 +557,7 @@ func TestRecvDKGPrivateShare(t *testing.T) { ch := pm.ReceiveChan() select { case msg := <-ch: - rps := msg.(*dkgTypes.PrivateShare) + rps := msg.Payload.(*dkgTypes.PrivateShare) if !reflect.DeepEqual(rps, &privateShare) { t.Errorf("vote mismatch") } @@ -657,7 +657,7 @@ func TestRecvAgreement(t *testing.T) { ch := pm.ReceiveChan() select { case msg := <-ch: - a := msg.(*coreTypes.AgreementResult) + a := msg.Payload.(*coreTypes.AgreementResult) if !reflect.DeepEqual(a, &agreement) { t.Errorf("agreement mismatch") } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index af0adf259..4597fe92b 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -24,6 +24,8 @@ import ( "sync" "time" + lru "github.com/hashicorp/golang-lru" + "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/types" typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" @@ -36,17 +38,34 @@ var ( ErrRoundOutOfRange = errors.New("round out of range") ErrInvalidBlock = errors.New("invalid block") ErrNoValidLeader = errors.New("no valid leader") + ErrIncorrectCRSSignature = errors.New("incorrect CRS signature") + ErrBlockTooOld = errors.New("block too old") ) const maxResultCache = 100 +const settingLimit = 3 // genValidLeader generate a validLeader function for agreement modules. func genValidLeader( - mgr *agreementMgr) func(*types.Block) (bool, error) { - return func(block *types.Block) (bool, error) { + mgr *agreementMgr) validLeaderFn { + return func(block *types.Block, crs common.Hash) (bool, error) { if block.Timestamp.After(time.Now()) { return false, nil } + if block.Position.Round >= DKGDelayRound { + if mgr.recv.npks == nil { + return false, nil + } + if block.Position.Round > mgr.recv.npks.Round { + return false, nil + } + if block.Position.Round < mgr.recv.npks.Round { + return false, ErrBlockTooOld + } + } + if !utils.VerifyCRSSignature(block, crs, mgr.recv.npks) { + return false, ErrIncorrectCRSSignature + } if err := mgr.bcModule.sanityCheck(block); err != nil { if err == ErrRetrySanityCheckLater { return false, nil @@ -114,12 +133,15 @@ type agreementMgr struct { recv *consensusBAReceiver processedBAResult map[types.Position]struct{} voteFilter *utils.VoteFilter + settingCache *lru.Cache + curRoundSetting *baRoundSetting waitGroup sync.WaitGroup isRunning bool lock sync.RWMutex } func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) { + settingCache, _ := lru.New(settingLimit) mgr = &agreementMgr{ con: con, ID: con.ID, @@ -133,6 +155,7 @@ func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) { ctx: con.ctx, processedBAResult: make(map[types.Position]struct{}, maxResultCache), voteFilter: utils.NewVoteFilter(), + settingCache: settingCache, } mgr.recv = &consensusBAReceiver{ consensus: con, @@ -149,21 +172,18 @@ func (mgr *agreementMgr) prepare() { newLeaderSelector(genValidLeader(mgr), mgr.logger), mgr.signer, mgr.logger) - nodes, err := mgr.cache.GetNodeSet(round) - if err != nil { + setting := mgr.generateSetting(round) + if setting == nil { + mgr.logger.Warn("Unable to prepare init setting", "round", round) return } - agr.notarySet = nodes.GetSubSet( - int(mgr.config(round).notarySetSize), - types.NewNotarySetTarget(mgr.config(round).crs)) + mgr.curRoundSetting = setting + agr.notarySet = mgr.curRoundSetting.dkgSet // Hacky way to make agreement module self contained. mgr.recv.agreementModule = agr mgr.baModule = agr if round >= DKGDelayRound { - setting := mgr.generateSetting(round) - if setting == nil { - mgr.logger.Warn("Unable to prepare init setting", "round", round) - } else if _, exist := setting.dkgSet[mgr.ID]; exist { + if _, exist := setting.dkgSet[mgr.ID]; exist { mgr.logger.Debug("Preparing signer and npks.", "round", round) npk, signer, err := mgr.con.cfgModule.getDKGInfo(round, false) if err != nil { @@ -252,9 +272,25 @@ func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error { } func (mgr *agreementMgr) processVote(v *types.Vote) (err error) { + if !mgr.recv.isNotary { + return nil + } if mgr.voteFilter.Filter(v) { return nil } + if v.Position.Round == mgr.curRoundSetting.round { + if _, exist := mgr.curRoundSetting.dkgSet[v.ProposerID]; !exist { + return ErrNotInNotarySet + } + } else if v.Position.Round == mgr.curRoundSetting.round+1 { + setting := mgr.generateSetting(v.Position.Round) + if setting == nil { + return ErrConfigurationNotReady + } + if _, exist := setting.dkgSet[v.ProposerID]; !exist { + return ErrNotInNotarySet + } + } if err = mgr.baModule.processVote(v); err == nil { mgr.baModule.updateFilter(mgr.voteFilter) mgr.voteFilter.AddVote(v) @@ -323,6 +359,7 @@ func (mgr *agreementMgr) processAgreementResult( result.Position.Round) return ErrConfigurationNotReady } + mgr.curRoundSetting = setting leader, err := mgr.calcLeader(setting.dkgSet, setting.crs, result.Position) if err != nil { return err @@ -358,6 +395,9 @@ func (mgr *agreementMgr) stop() { } func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting { + if setting, exist := mgr.settingCache.Get(round); exist { + return setting.(*baRoundSetting) + } curConfig := mgr.config(round) if curConfig == nil { return nil @@ -383,13 +423,15 @@ func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting { return nil } } - return &baRoundSetting{ + setting := &baRoundSetting{ crs: curConfig.crs, dkgSet: dkgSet, round: round, threshold: utils.GetBAThreshold(&types.Config{ NotarySetSize: curConfig.notarySetSize}), } + mgr.settingCache.Add(round, setting) + return setting } func (mgr *agreementMgr) runBA(initRound uint64) { @@ -449,6 +491,7 @@ Loop: } mgr.recv.isNotary = checkRound() mgr.voteFilter = utils.NewVoteFilter() + mgr.voteFilter.Position.Round = currentRound mgr.recv.emptyBlockHashMap = &sync.Map{} if currentRound >= DKGDelayRound && mgr.recv.isNotary { var err error diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go index b122a4ddf..cb467719d 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go @@ -361,9 +361,6 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { if vote.Type >= types.MaxVoteType { return ErrInvalidVote } - if _, exist := a.notarySet[vote.ProposerID]; !exist { - return ErrNotInNotarySet - } ok, err := utils.VerifyVoteSignature(vote) if err != nil { return err @@ -371,6 +368,10 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { if !ok { return ErrIncorrectVoteSignature } + if vote.Position.Round != a.agreementID().Round { + // TODO(jimmy): maybe we can verify partial signature at agreement-mgr. + return nil + } if !a.data.recv.VerifyPartialSignature(vote) { return ErrIncorrectVotePartialSignature } @@ -412,7 +413,7 @@ func (a *agreement) updateFilter(filter *utils.VoteFilter) { filter.Confirm = a.hasOutput filter.LockIter = a.data.lockIter filter.Period = a.data.period - filter.Height = a.agreementID().Height + filter.Position.Height = a.agreementID().Height } // processVote is the entry point for processing Vote. @@ -426,8 +427,8 @@ func (a *agreement) processVote(vote *types.Vote) error { // Agreement module has stopped. if isStop(aID) { - // Hacky way to not drop first votes for genesis height. - if vote.Position.Height == types.GenesisHeight { + // Hacky way to not drop first votes when round just begins. + if vote.Position.Round == aID.Round { a.pendingVote = append(a.pendingVote, pendingVote{ vote: vote, receivedTime: time.Now().UTC(), @@ -690,7 +691,7 @@ func (a *agreement) processBlock(block *types.Block) error { if !exist { return true } - ok, err := a.data.leader.validLeader(block) + ok, err := a.data.leader.validLeader(block, a.data.leader.hashCRS) if err != nil { fmt.Println("Error checking validLeader for Fast BA", "error", err, "block", block) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go index 9fbb86162..1efd867e7 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/blockchain.go @@ -204,9 +204,6 @@ func (bc *blockChain) notifyRoundEvents(evts []utils.RoundEventParam) error { } bc.configs = append(bc.configs, c) } - if e.Reset != 0 { - bc.vGetter.Purge(e.Round + 1) - } return nil } for _, e := range evts { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index 966c70aaa..ec15bf32d 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -520,7 +520,7 @@ type Consensus struct { roundEvent *utils.RoundEvent logger common.Logger resetDeliveryGuardTicker chan struct{} - msgChan chan interface{} + msgChan chan types.Msg priorityMsgChan chan interface{} waitGroup sync.WaitGroup processBlockChan chan *types.Block @@ -528,7 +528,7 @@ type Consensus struct { // Context of Dummy receiver during switching from syncer. dummyCancel context.CancelFunc dummyFinished <-chan struct{} - dummyMsgBuffer []interface{} + dummyMsgBuffer []types.Msg } // NewConsensus construct an Consensus instance. @@ -577,7 +577,7 @@ func NewConsensusFromSyncer( networkModule Network, prv crypto.PrivateKey, confirmedBlocks []*types.Block, - cachedMessages []interface{}, + cachedMessages []types.Msg, logger common.Logger) (*Consensus, error) { // Setup Consensus instance. con := newConsensusForRound(initBlock, dMoment, app, gov, db, @@ -585,7 +585,7 @@ func NewConsensusFromSyncer( // Launch a dummy receiver before we start receiving from network module. con.dummyMsgBuffer = cachedMessages con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( - con.ctx, networkModule.ReceiveChan(), func(msg interface{}) { + con.ctx, networkModule.ReceiveChan(), func(msg types.Msg) { con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) }) // Dump all BA-confirmed blocks to the consensus instance, make sure these @@ -656,6 +656,14 @@ func newConsensusForRound( } cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger) recv.cfgModule = cfgModule + signer.SetBLSSigner( + func(round uint64, hash common.Hash) (crypto.Signature, error) { + _, signer, err := cfgModule.getDKGInfo(round, false) + if err != nil { + return crypto.Signature{}, err + } + return crypto.Signature(signer.sign(hash)), nil + }) appModule := app if usingNonBlocking { appModule = newNonBlocking(app, debugApp) @@ -682,7 +690,7 @@ func newConsensusForRound( event: common.NewEvent(), logger: logger, resetDeliveryGuardTicker: make(chan struct{}), - msgChan: make(chan interface{}, 1024), + msgChan: make(chan types.Msg, 1024), priorityMsgChan: make(chan interface{}, 1024), processBlockChan: make(chan *types.Block, 1024), } @@ -1226,14 +1234,15 @@ MessageLoop: return default: } - var msg interface{} + var msg, peer interface{} select { case msg = <-con.priorityMsgChan: default: } if msg == nil { select { - case msg = <-con.msgChan: + case message := <-con.msgChan: + msg, peer = message.Payload, message.PeerID case msg = <-con.priorityMsgChan: case <-con.ctx.Done(): return @@ -1252,42 +1261,53 @@ MessageLoop: if val.IsEmpty() { hash, err := utils.HashBlock(val) if err != nil { - con.logger.Error("error verifying empty block hash", + con.logger.Error("Error verifying empty block hash", "block", val, "error, err") + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if hash != val.Hash { - con.logger.Error("incorrect confirmed empty block hash", + con.logger.Error("Incorrect confirmed empty block hash", "block", val, "hash", hash) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if _, err := con.bcModule.proposeBlock( val.Position, time.Time{}, true); err != nil { - con.logger.Error("error adding empty block", + con.logger.Error("Error adding empty block", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } } else { + if !val.IsFinalized() { + con.logger.Warn("Ignore not finalized block", + "block", val) + continue MessageLoop + } ok, err := con.bcModule.verifyRandomness( val.Hash, val.Position.Round, val.Randomness) if err != nil { - con.logger.Error("error verifying confirmed block randomness", + con.logger.Error("Error verifying confirmed block randomness", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if !ok { - con.logger.Error("incorrect confirmed block randomness", + con.logger.Error("Incorrect confirmed block randomness", "block", val) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } if err := utils.VerifyBlockSignature(val); err != nil { con.logger.Error("VerifyBlockSignature failed", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer continue MessageLoop } } @@ -1306,12 +1326,14 @@ MessageLoop: con.logger.Error("Failed to process finalized block", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer } } else { if err := con.preProcessBlock(val); err != nil { con.logger.Error("Failed to pre process block", "block", val, "error", err) + con.network.ReportBadPeerChan() <- peer } } case *types.Vote: @@ -1319,23 +1341,27 @@ MessageLoop: con.logger.Error("Failed to process vote", "vote", val, "error", err) + con.network.ReportBadPeerChan() <- peer } case *types.AgreementResult: if err := con.ProcessAgreementResult(val); err != nil { con.logger.Error("Failed to process agreement result", "result", val, "error", err) + con.network.ReportBadPeerChan() <- peer } case *typesDKG.PrivateShare: if err := con.cfgModule.processPrivateShare(val); err != nil { con.logger.Error("Failed to process private share", "error", err) + con.network.ReportBadPeerChan() <- peer } case *typesDKG.PartialSignature: if err := con.cfgModule.processPartialSignature(val); err != nil { con.logger.Error("Failed to process partial signature", "error", err) + con.network.ReportBadPeerChan() <- peer } } } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go index f76ee1975..c16c624e3 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go @@ -83,7 +83,10 @@ type Network interface { BroadcastDKGPartialSignature(psig *typesDKG.PartialSignature) // ReceiveChan returns a channel to receive messages from DEXON network. - ReceiveChan() <-chan interface{} + ReceiveChan() <-chan types.Msg + + // ReportBadPeerChan returns a channel to report bad peer. + ReportBadPeerChan() chan<- interface{} } // Governance interface specifies interface to control the governance contract. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go index 8c063286e..91b2e9979 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/leader-selector.go @@ -18,22 +18,15 @@ package core import ( - "fmt" "math/big" "sync" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" "github.com/dexon-foundation/dexon-consensus/core/types" - "github.com/dexon-foundation/dexon-consensus/core/utils" ) -// Errors for leader module. -var ( - ErrIncorrectCRSSignature = fmt.Errorf("incorrect CRS signature") -) - -type validLeaderFn func(*types.Block) (bool, error) +type validLeaderFn func(block *types.Block, crs common.Hash) (bool, error) // Some constant value. var ( @@ -105,7 +98,7 @@ func (l *leaderSelector) leaderBlockHash() common.Hash { if !ok { continue } - ok, err := l.validLeader(b) + ok, err := l.validLeader(b, l.hashCRS) if err != nil { l.logger.Error("Error checking validLeader", "error", err, "block", b) delete(l.pendingBlocks, b.Hash) @@ -120,20 +113,13 @@ func (l *leaderSelector) leaderBlockHash() common.Hash { } func (l *leaderSelector) processBlock(block *types.Block) error { - ok, err := utils.VerifyCRSSignature(block, l.hashCRS) - if err != nil { - return err - } - if !ok { - return ErrIncorrectCRSSignature - } l.lock.Lock() defer l.lock.Unlock() ok, dist := l.potentialLeader(block) if !ok { return nil } - ok, err = l.validLeader(block) + ok, err := l.validLeader(block, l.hashCRS) if err != nil { return err } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 7db836a9a..496c0f9a8 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -80,7 +80,7 @@ type Consensus struct { syncedSkipNext bool dummyCancel context.CancelFunc dummyFinished <-chan struct{} - dummyMsgBuffer []interface{} + dummyMsgBuffer []types.Msg initChainTipHeight uint64 } @@ -297,7 +297,7 @@ func (con *Consensus) ForceSync(lastPos types.Position, skip bool) { if con.dummyCancel == nil { con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( context.Background(), con.network.ReceiveChan(), - func(msg interface{}) { + func(msg types.Msg) { con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) }) } @@ -448,7 +448,7 @@ func (con *Consensus) stopBuffering() { // need to launch a dummy receiver right away. con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( context.Background(), con.network.ReceiveChan(), - func(msg interface{}) { + func(msg types.Msg) { con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) }) // Stop agreements. @@ -512,7 +512,7 @@ func (con *Consensus) startNetwork() { for { select { case val := <-con.network.ReceiveChan(): - switch v := val.(type) { + switch v := val.Payload.(type) { case *types.Block: case *types.AgreementResult: // Avoid byzantine nodes attack by broadcasting older @@ -524,7 +524,7 @@ func (con *Consensus) startNetwork() { default: continue loop } - con.agreementModule.inputChan <- val + con.agreementModule.inputChan <- val.Payload case <-con.ctx.Done(): break loop } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/message.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/message.go new file mode 100644 index 000000000..0335cfaae --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/message.go @@ -0,0 +1,24 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// . + +package types + +// Msg for the network ReceiveChan. +type Msg struct { + PeerID interface{} + Payload interface{} +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/crypto.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/crypto.go index 7fd3a7776..496944dab 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/crypto.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/crypto.go @@ -18,6 +18,7 @@ package utils import ( + "bytes" "encoding/binary" "github.com/dexon-foundation/dexon-consensus/common" @@ -122,21 +123,27 @@ func VerifyVoteSignature(vote *types.Vote) (bool, error) { func hashCRS(block *types.Block, crs common.Hash) common.Hash { hashPos := HashPosition(block.Position) + if block.Position.Round < dkgDelayRound { + return crypto.Keccak256Hash(crs[:], hashPos[:], block.ProposerID.Hash[:]) + } return crypto.Keccak256Hash(crs[:], hashPos[:]) } // VerifyCRSSignature verifies the CRS signature of types.Block. -func VerifyCRSSignature(block *types.Block, crs common.Hash) ( - bool, error) { +func VerifyCRSSignature( + block *types.Block, crs common.Hash, npks *typesDKG.NodePublicKeys) bool { hash := hashCRS(block, crs) - pubKey, err := crypto.SigToPub(hash, block.CRSSignature) - if err != nil { - return false, err + if block.Position.Round < dkgDelayRound { + return bytes.Compare(block.CRSSignature.Signature[:], hash[:]) == 0 } - if block.ProposerID != types.NewNodeID(pubKey) { - return false, nil + if npks == nil { + return false } - return true, nil + pubKey, exist := npks.PublicKeys[block.ProposerID] + if !exist { + return false + } + return pubKey.VerifySignature(hash, block.CRSSignature) } // HashPosition generates hash of a types.Position. diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/signer.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/signer.go index 7694dab4e..9904410c4 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/signer.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/signer.go @@ -31,13 +31,17 @@ var ( ErrInvalidProposerID = errors.New("invalid proposer id") ErrIncorrectHash = errors.New("hash of block is incorrect") ErrIncorrectSignature = errors.New("signature of block is incorrect") + ErrNoBLSSigner = errors.New("bls signer not set") ) +type blsSigner func(round uint64, hash common.Hash) (crypto.Signature, error) + // Signer signs a segment of data. type Signer struct { prvKey crypto.PrivateKey pubKey crypto.PublicKey proposerID types.NodeID + blsSign blsSigner } // NewSigner constructs an Signer instance. @@ -50,6 +54,11 @@ func NewSigner(prvKey crypto.PrivateKey) (s *Signer) { return } +// SetBLSSigner for signing CRSSignature +func (s *Signer) SetBLSSigner(signer blsSigner) { + s.blsSign = signer +} + // SignBlock signs a types.Block. func (s *Signer) SignBlock(b *types.Block) (err error) { b.ProposerID = s.proposerID @@ -76,7 +85,19 @@ func (s *Signer) SignCRS(b *types.Block, crs common.Hash) (err error) { err = ErrInvalidProposerID return } - b.CRSSignature, err = s.prvKey.Sign(hashCRS(b, crs)) + if b.Position.Round < dkgDelayRound { + hash := hashCRS(b, crs) + b.CRSSignature = crypto.Signature{ + Type: "bls", + Signature: hash[:], + } + return + } + if s.blsSign == nil { + err = ErrNoBLSSigner + return + } + b.CRSSignature, err = s.blsSign(b.Position.Round, hashCRS(b, crs)) return } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go index 1a372c70c..dc29bdfa5 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/utils.go @@ -105,7 +105,7 @@ func VerifyDKGComplaint( // channel of a network module. An context is required to stop the go routine // automatically. An optinal message handler could be provided. func LaunchDummyReceiver( - ctx context.Context, recv <-chan interface{}, handler func(interface{})) ( + ctx context.Context, recv <-chan types.Msg, handler func(types.Msg)) ( context.CancelFunc, <-chan struct{}) { var ( dummyCtx, dummyCancel = context.WithCancel(ctx) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/vote-filter.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/vote-filter.go index 2fc18bb34..446d88a64 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/vote-filter.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils/vote-filter.go @@ -25,7 +25,7 @@ import ( // To maximize performance, this structure is not thread-safe and will never be. type VoteFilter struct { Voted map[types.VoteHeader]struct{} - Height uint64 + Position types.Position LockIter uint64 Period uint64 Confirm bool @@ -43,9 +43,9 @@ func (vf *VoteFilter) Filter(vote *types.Vote) bool { if vote.Type == types.VoteInit { return true } - if vote.Position.Height < vf.Height { + if vote.Position.Older(vf.Position) { return true - } else if vote.Position.Height > vf.Height { + } else if vote.Position.Newer(vf.Position) { // It's impossible to check the vote of other height. return false } diff --git a/vendor/vendor.json b/vendor/vendor.json index f94f0e307..4c14dfdb2 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -141,90 +141,90 @@ { "checksumSHA1": "In6vBHYUsX7DUIGiFN2hQggBgvI=", "path": "github.com/dexon-foundation/dexon-consensus/common", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { - "checksumSHA1": "9uNk7orRE3xuTv2m/y6vO/4uDAg=", + "checksumSHA1": "GhwP6CLvYoMcnzMTQjSDhT9qjnk=", "path": "github.com/dexon-foundation/dexon-consensus/core", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "v4fKR7uhoyufi6hAVO44cFEb+tY=", "path": "github.com/dexon-foundation/dexon-consensus/core/blockdb", "revision": "56e872f84131348adbc0861afb3554bba4a8e5db", "revisionTime": "2018-12-05T06:29:54Z", - "version": "single-chain", - "versionExact": "single-chain" + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "tQSbYCu5P00lUhKsx3IbBZCuSLY=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "4besQaa0rm8jRUAJjpEaLZ/ZOYs=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "BhLKK8RveoLaeXc9UyUKMwQqchU=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "3Ludp/1V4dMBZH/c1oIVjHj0CqY=", "path": "github.com/dexon-foundation/dexon-consensus/core/db", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { - "checksumSHA1": "/OcEQKdtWDyRZibazIsAxJWHUyg=", + "checksumSHA1": "sO5twEFTdLvkMuQo+I3vyzm9T3o=", "path": "github.com/dexon-foundation/dexon-consensus/core/syncer", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { - "checksumSHA1": "zIgCdN4FJiAuPGMhB+/9YGK/Wgk=", + "checksumSHA1": "0BY+E0E2cM7IHIMqunXwoolDS5Y=", "path": "github.com/dexon-foundation/dexon-consensus/core/types", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "lbG7yqVgzo2CV/CQPYjG78xp5jg=", "path": "github.com/dexon-foundation/dexon-consensus/core/types/dkg", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { - "checksumSHA1": "1VsJIshz0loXnGwCtrMM8SuIo6Y=", + "checksumSHA1": "C2r/uE8K53WIIA0FYYHIfR2xhng=", "path": "github.com/dexon-foundation/dexon-consensus/core/utils", - "revision": "269fed574986331e07bf931b2c9b1a495c40f8ac", - "revisionTime": "2019-04-11T06:42:07Z", - "version": "single-chain", - "versionExact": "single-chain" + "revision": "24c0cecbbf7ba84754ccc02d37c9540ce317976c", + "revisionTime": "2019-04-12T03:35:37Z", + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "TAkwduKZqLyimyTPPWIllZWYFuE=", -- cgit v1.2.3