diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 103 |
1 files changed, 64 insertions, 39 deletions
diff --git a/dex/handler.go b/dex/handler.go index 2f8ed13fa..7bc9c297d 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -106,12 +106,11 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - metasCh chan newMetasEvent - metasSub event.Subscription - minedBlockSub *event.TypeMuxSubscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer @@ -132,6 +131,9 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup + + // Dexcon + isBlockProposer bool } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -140,24 +142,25 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - nodeTable: tab, - gov: gov, - blockchain: blockchain, - cache: newCache(128), - chainconfig: config, - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - metasyncCh: make(chan *metasync), - quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + cache: newCache(128), + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + metasyncCh: make(chan *metasync), + quitSync: make(chan struct{}), + receiveCh: make(chan interface{}, 1024), + isBlockProposer: isBlockProposer, } // Figure out whether to allow fast sync or not @@ -258,10 +261,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) go pm.metaBroadcastLoop() - // broadcast mined blocks - pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go pm.minedBroadcastLoop() - // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) @@ -306,8 +305,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta { func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.txsSub.Unsubscribe() // quits txBroadcastLoop - pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.chainHeadSub.Unsubscribe() // Quit the sync loop. @@ -650,6 +648,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockHashesMsg: + // Ignore new block hash messages in block proposer mode. + if pm.isBlockProposer { + break + } var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) @@ -670,6 +672,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockMsg: + // Ignore new block messages in block proposer mode. + if pm.isBlockProposer { + break + } // Retrieve and decode the propagated block var request newBlockData if err := msg.Decode(&request); err != nil { @@ -732,7 +738,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkNodeMeta(meta.Hash()) } pm.nodeTable.Add(metas) + + // Block proposer-only messages. + case msg.Code == LatticeBlockMsg: + if !pm.isBlockProposer { + break + } var block coreTypes.Block if err := msg.Decode(&block); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -740,6 +752,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.cache.addBlock(&block) pm.receiveCh <- &block case msg.Code == VoteMsg: + if !pm.isBlockProposer { + break + } var vote coreTypes.Vote if err := msg.Decode(&vote); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -749,6 +764,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &vote case msg.Code == AgreementMsg: + if !pm.isBlockProposer { + break + } // DKG set is receiver var agreement coreTypes.AgreementResult if err := msg.Decode(&agreement); err != nil { @@ -756,6 +774,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: + if !pm.isBlockProposer { + break + } // Broadcast this to all peer var randomness coreTypes.BlockRandomnessResult if err := msg.Decode(&randomness); err != nil { @@ -763,6 +784,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &randomness case msg.Code == DKGPrivateShareMsg: + if !pm.isBlockProposer { + break + } // Do not relay this msg var ps dkgTypes.PrivateShare if err := msg.Decode(&ps); err != nil { @@ -770,6 +794,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: + if !pm.isBlockProposer { + break + } // broadcast in DKG set var psig dkgTypes.PartialSignature if err := msg.Decode(&psig); err != nil { @@ -777,6 +804,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &psig case msg.Code == PullBlocksMsg: + if !pm.isBlockProposer { + break + } var hashes coreCommon.Hashes if err := msg.Decode(&hashes); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -789,6 +819,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } case msg.Code == PullVotesMsg: + if !pm.isBlockProposer { + break + } var pos coreTypes.Position if err := msg.Decode(&pos); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -1003,17 +1036,6 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } -// Mined broadcast loop -func (pm *ProtocolManager) minedBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range pm.minedBlockSub.Chan() { - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - pm.BroadcastBlock(ev.Block, true) // First propagate block to peers - pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest - } - } -} - func (pm *ProtocolManager) txBroadcastLoop() { for { select { @@ -1053,7 +1075,10 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case <-pm.chainHeadCh: + case event := <-pm.chainHeadCh: + pm.BroadcastBlock(event.Block, true) // First propagate block to peers + pm.BroadcastBlock(event.Block, false) // Only then announce to the rest + newRound := pm.gov.LenCRS() - 1 log.Trace("new round", "round", newRound) if newRound == round { |