diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 109 |
1 files changed, 68 insertions, 41 deletions
diff --git a/dex/handler.go b/dex/handler.go index b5ee01528..074d42249 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -106,12 +106,11 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - metasCh chan newMetasEvent - metasSub event.Subscription - minedBlockSub *event.TypeMuxSubscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer @@ -132,6 +131,9 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup + + // Dexcon + isBlockProposer bool } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -140,28 +142,31 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - nodeTable: tab, - gov: gov, - blockchain: blockchain, - cache: newCache(128), - chainconfig: config, - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - metasyncCh: make(chan *metasync), - quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + cache: newCache(128), + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + metasyncCh: make(chan *metasync), + quitSync: make(chan struct{}), + receiveCh: make(chan interface{}, 1024), + isBlockProposer: isBlockProposer, } - // TODO(w): remove this hack once we have fix block processing. - atomic.StoreUint32(&manager.acceptTxs, 1) + // TODO(w): start accepting TXs only when we are synced. + if isBlockProposer { + atomic.StoreUint32(&manager.acceptTxs, 1) + } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { @@ -261,10 +266,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) go pm.metaBroadcastLoop() - // broadcast mined blocks - pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go pm.minedBroadcastLoop() - // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) @@ -309,8 +310,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta { func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.txsSub.Unsubscribe() // quits txBroadcastLoop - pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.chainHeadSub.Unsubscribe() // Quit the sync loop. @@ -653,6 +653,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockHashesMsg: + // Ignore new block hash messages in block proposer mode. + if pm.isBlockProposer { + break + } var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) @@ -673,6 +677,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockMsg: + // Ignore new block messages in block proposer mode. + if pm.isBlockProposer { + break + } // Retrieve and decode the propagated block var request newBlockData if err := msg.Decode(&request); err != nil { @@ -735,7 +743,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkNodeMeta(meta.Hash()) } pm.nodeTable.Add(metas) + + // Block proposer-only messages. + case msg.Code == LatticeBlockMsg: + if !pm.isBlockProposer { + break + } var block coreTypes.Block if err := msg.Decode(&block); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -743,6 +757,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.cache.addBlock(&block) pm.receiveCh <- &block case msg.Code == VoteMsg: + if !pm.isBlockProposer { + break + } var vote coreTypes.Vote if err := msg.Decode(&vote); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -752,6 +769,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &vote case msg.Code == AgreementMsg: + if !pm.isBlockProposer { + break + } // DKG set is receiver var agreement coreTypes.AgreementResult if err := msg.Decode(&agreement); err != nil { @@ -759,6 +779,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: + if !pm.isBlockProposer { + break + } // Broadcast this to all peer var randomness coreTypes.BlockRandomnessResult if err := msg.Decode(&randomness); err != nil { @@ -766,6 +789,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &randomness case msg.Code == DKGPrivateShareMsg: + if !pm.isBlockProposer { + break + } // Do not relay this msg var ps dkgTypes.PrivateShare if err := msg.Decode(&ps); err != nil { @@ -773,6 +799,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: + if !pm.isBlockProposer { + break + } // broadcast in DKG set var psig dkgTypes.PartialSignature if err := msg.Decode(&psig); err != nil { @@ -780,6 +809,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &psig case msg.Code == PullBlocksMsg: + if !pm.isBlockProposer { + break + } var hashes coreCommon.Hashes if err := msg.Decode(&hashes); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -792,6 +824,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } case msg.Code == PullVotesMsg: + if !pm.isBlockProposer { + break + } var pos coreTypes.Position if err := msg.Decode(&pos); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -1006,17 +1041,6 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } -// Mined broadcast loop -func (pm *ProtocolManager) minedBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range pm.minedBlockSub.Chan() { - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - pm.BroadcastBlock(ev.Block, true) // First propagate block to peers - pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest - } - } -} - func (pm *ProtocolManager) txBroadcastLoop() { for { select { @@ -1056,7 +1080,10 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case <-pm.chainHeadCh: + case event := <-pm.chainHeadCh: + pm.BroadcastBlock(event.Block, true) // First propagate block to peers + pm.BroadcastBlock(event.Block, false) // Only then announce to the rest + newRound := pm.gov.LenCRS() - 1 log.Trace("new round", "round", newRound) if newRound == round { |