aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go109
1 files changed, 68 insertions, 41 deletions
diff --git a/dex/handler.go b/dex/handler.go
index b5ee01528..074d42249 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -106,12 +106,11 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
- eventMux *event.TypeMux
- txsCh chan core.NewTxsEvent
- txsSub event.Subscription
- metasCh chan newMetasEvent
- metasSub event.Subscription
- minedBlockSub *event.TypeMuxSubscription
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
+ metasCh chan newMetasEvent
+ metasSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
@@ -132,6 +131,9 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
+
+ // Dexcon
+ isBlockProposer bool
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -140,28 +142,31 @@ func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database,
- gov governance) (*ProtocolManager, error) {
+ isBlockProposer bool, gov governance) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- nodeTable: tab,
- gov: gov,
- blockchain: blockchain,
- cache: newCache(128),
- chainconfig: config,
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- metasyncCh: make(chan *metasync),
- quitSync: make(chan struct{}),
- receiveCh: make(chan interface{}, 1024),
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ cache: newCache(128),
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ metasyncCh: make(chan *metasync),
+ quitSync: make(chan struct{}),
+ receiveCh: make(chan interface{}, 1024),
+ isBlockProposer: isBlockProposer,
}
- // TODO(w): remove this hack once we have fix block processing.
- atomic.StoreUint32(&manager.acceptTxs, 1)
+ // TODO(w): start accepting TXs only when we are synced.
+ if isBlockProposer {
+ atomic.StoreUint32(&manager.acceptTxs, 1)
+ }
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
@@ -261,10 +266,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
go pm.metaBroadcastLoop()
- // broadcast mined blocks
- pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
- go pm.minedBroadcastLoop()
-
// run the peer set loop
pm.chainHeadCh = make(chan core.ChainHeadEvent)
pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh)
@@ -309,8 +310,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta {
func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")
- pm.txsSub.Unsubscribe() // quits txBroadcastLoop
- pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+ pm.txsSub.Unsubscribe() // quits txBroadcastLoop
pm.chainHeadSub.Unsubscribe()
// Quit the sync loop.
@@ -653,6 +653,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockHashesMsg:
+ // Ignore new block hash messages in block proposer mode.
+ if pm.isBlockProposer {
+ break
+ }
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
@@ -673,6 +677,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockMsg:
+ // Ignore new block messages in block proposer mode.
+ if pm.isBlockProposer {
+ break
+ }
// Retrieve and decode the propagated block
var request newBlockData
if err := msg.Decode(&request); err != nil {
@@ -735,7 +743,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkNodeMeta(meta.Hash())
}
pm.nodeTable.Add(metas)
+
+ // Block proposer-only messages.
+
case msg.Code == LatticeBlockMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var block coreTypes.Block
if err := msg.Decode(&block); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -743,6 +757,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.cache.addBlock(&block)
pm.receiveCh <- &block
case msg.Code == VoteMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var vote coreTypes.Vote
if err := msg.Decode(&vote); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -752,6 +769,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &vote
case msg.Code == AgreementMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// DKG set is receiver
var agreement coreTypes.AgreementResult
if err := msg.Decode(&agreement); err != nil {
@@ -759,6 +779,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &agreement
case msg.Code == RandomnessMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// Broadcast this to all peer
var randomness coreTypes.BlockRandomnessResult
if err := msg.Decode(&randomness); err != nil {
@@ -766,6 +789,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &randomness
case msg.Code == DKGPrivateShareMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// Do not relay this msg
var ps dkgTypes.PrivateShare
if err := msg.Decode(&ps); err != nil {
@@ -773,6 +799,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &ps
case msg.Code == DKGPartialSignatureMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// broadcast in DKG set
var psig dkgTypes.PartialSignature
if err := msg.Decode(&psig); err != nil {
@@ -780,6 +809,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &psig
case msg.Code == PullBlocksMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var hashes coreCommon.Hashes
if err := msg.Decode(&hashes); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -792,6 +824,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
case msg.Code == PullVotesMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var pos coreTypes.Position
if err := msg.Decode(&pos); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -1006,17 +1041,6 @@ func (pm *ProtocolManager) BroadcastPullVotes(
}
}
-// Mined broadcast loop
-func (pm *ProtocolManager) minedBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range pm.minedBlockSub.Chan() {
- if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
- pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
- pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
- }
- }
-}
-
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
@@ -1056,7 +1080,10 @@ func (pm *ProtocolManager) peerSetLoop() {
for {
select {
- case <-pm.chainHeadCh:
+ case event := <-pm.chainHeadCh:
+ pm.BroadcastBlock(event.Block, true) // First propagate block to peers
+ pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
+
newRound := pm.gov.LenCRS() - 1
log.Trace("new round", "round", newRound)
if newRound == round {