aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go103
1 files changed, 64 insertions, 39 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 2f8ed13fa..7bc9c297d 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -106,12 +106,11 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
- eventMux *event.TypeMux
- txsCh chan core.NewTxsEvent
- txsSub event.Subscription
- metasCh chan newMetasEvent
- metasSub event.Subscription
- minedBlockSub *event.TypeMuxSubscription
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
+ metasCh chan newMetasEvent
+ metasSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
@@ -132,6 +131,9 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
+
+ // Dexcon
+ isBlockProposer bool
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -140,24 +142,25 @@ func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database,
- gov governance) (*ProtocolManager, error) {
+ isBlockProposer bool, gov governance) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- nodeTable: tab,
- gov: gov,
- blockchain: blockchain,
- cache: newCache(128),
- chainconfig: config,
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- metasyncCh: make(chan *metasync),
- quitSync: make(chan struct{}),
- receiveCh: make(chan interface{}, 1024),
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ cache: newCache(128),
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ metasyncCh: make(chan *metasync),
+ quitSync: make(chan struct{}),
+ receiveCh: make(chan interface{}, 1024),
+ isBlockProposer: isBlockProposer,
}
// Figure out whether to allow fast sync or not
@@ -258,10 +261,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
go pm.metaBroadcastLoop()
- // broadcast mined blocks
- pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
- go pm.minedBroadcastLoop()
-
// run the peer set loop
pm.chainHeadCh = make(chan core.ChainHeadEvent)
pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh)
@@ -306,8 +305,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta {
func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")
- pm.txsSub.Unsubscribe() // quits txBroadcastLoop
- pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+ pm.txsSub.Unsubscribe() // quits txBroadcastLoop
pm.chainHeadSub.Unsubscribe()
// Quit the sync loop.
@@ -650,6 +648,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockHashesMsg:
+ // Ignore new block hash messages in block proposer mode.
+ if pm.isBlockProposer {
+ break
+ }
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
@@ -670,6 +672,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockMsg:
+ // Ignore new block messages in block proposer mode.
+ if pm.isBlockProposer {
+ break
+ }
// Retrieve and decode the propagated block
var request newBlockData
if err := msg.Decode(&request); err != nil {
@@ -732,7 +738,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkNodeMeta(meta.Hash())
}
pm.nodeTable.Add(metas)
+
+ // Block proposer-only messages.
+
case msg.Code == LatticeBlockMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var block coreTypes.Block
if err := msg.Decode(&block); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -740,6 +752,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.cache.addBlock(&block)
pm.receiveCh <- &block
case msg.Code == VoteMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var vote coreTypes.Vote
if err := msg.Decode(&vote); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -749,6 +764,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &vote
case msg.Code == AgreementMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// DKG set is receiver
var agreement coreTypes.AgreementResult
if err := msg.Decode(&agreement); err != nil {
@@ -756,6 +774,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &agreement
case msg.Code == RandomnessMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// Broadcast this to all peer
var randomness coreTypes.BlockRandomnessResult
if err := msg.Decode(&randomness); err != nil {
@@ -763,6 +784,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &randomness
case msg.Code == DKGPrivateShareMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// Do not relay this msg
var ps dkgTypes.PrivateShare
if err := msg.Decode(&ps); err != nil {
@@ -770,6 +794,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &ps
case msg.Code == DKGPartialSignatureMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// broadcast in DKG set
var psig dkgTypes.PartialSignature
if err := msg.Decode(&psig); err != nil {
@@ -777,6 +804,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &psig
case msg.Code == PullBlocksMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var hashes coreCommon.Hashes
if err := msg.Decode(&hashes); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -789,6 +819,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
case msg.Code == PullVotesMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var pos coreTypes.Position
if err := msg.Decode(&pos); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -1003,17 +1036,6 @@ func (pm *ProtocolManager) BroadcastPullVotes(
}
}
-// Mined broadcast loop
-func (pm *ProtocolManager) minedBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range pm.minedBlockSub.Chan() {
- if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
- pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
- pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
- }
- }
-}
-
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
@@ -1053,7 +1075,10 @@ func (pm *ProtocolManager) peerSetLoop() {
for {
select {
- case <-pm.chainHeadCh:
+ case event := <-pm.chainHeadCh:
+ pm.BroadcastBlock(event.Block, true) // First propagate block to peers
+ pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
+
newRound := pm.gov.LenCRS() - 1
log.Trace("new round", "round", newRound)
if newRound == round {