aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
Diffstat (limited to 'dex')
-rw-r--r--dex/backend.go7
-rw-r--r--dex/config.go12
-rw-r--r--dex/handler.go103
3 files changed, 75 insertions, 47 deletions
diff --git a/dex/backend.go b/dex/backend.go
index d7bc4630e..740b8cd6f 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -134,8 +134,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
}
cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieCleanLimit: config.TrieCleanCache, TrieDirtyLimit: config.TrieDirtyCache, TrieTimeLimit: config.TrieTimeout}
)
- dex.blockchain, err = core.NewBlockChainWithDexonValidator(chainDb, cacheConfig,
- dex.chainConfig, dex.engine, vmConfig, nil)
+ dex.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, dex.chainConfig, dex.engine, vmConfig, nil)
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
@@ -148,7 +147,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
- dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain)
+ dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain, config.BlockProposerEnabled)
dex.APIBackend = &DexAPIBackend{dex, nil}
gpoParams := config.GPO
@@ -166,7 +165,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
- chainDb, dex.governance)
+ chainDb, config.BlockProposerEnabled, dex.governance)
if err != nil {
return nil, err
}
diff --git a/dex/config.go b/dex/config.go
index cd9d1b1db..282de494f 100644
--- a/dex/config.go
+++ b/dex/config.go
@@ -46,10 +46,11 @@ var DefaultConfig = Config{
Blocks: 20,
Percentile: 60,
},
- DefaultGasPrice: big.NewInt(params.GWei),
- GasFloor: 8000000,
- GasCeil: 8000000,
- GasLimitTolerance: 1000000,
+ BlockProposerEnabled: false,
+ DefaultGasPrice: big.NewInt(params.GWei),
+ GasFloor: 8000000,
+ GasCeil: 8000000,
+ GasLimitTolerance: 1000000,
}
func init() {
@@ -106,6 +107,9 @@ type Config struct {
// Gas Price Oracle options
GPO gasprice.Config
+ // BlockProposer options
+ BlockProposerEnabled bool
+
// Enables tracking of SHA3 preimages in the VM
EnablePreimageRecording bool
diff --git a/dex/handler.go b/dex/handler.go
index 2f8ed13fa..7bc9c297d 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -106,12 +106,11 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
- eventMux *event.TypeMux
- txsCh chan core.NewTxsEvent
- txsSub event.Subscription
- metasCh chan newMetasEvent
- metasSub event.Subscription
- minedBlockSub *event.TypeMuxSubscription
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
+ metasCh chan newMetasEvent
+ metasSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
@@ -132,6 +131,9 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
+
+ // Dexcon
+ isBlockProposer bool
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -140,24 +142,25 @@ func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database,
- gov governance) (*ProtocolManager, error) {
+ isBlockProposer bool, gov governance) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- nodeTable: tab,
- gov: gov,
- blockchain: blockchain,
- cache: newCache(128),
- chainconfig: config,
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- metasyncCh: make(chan *metasync),
- quitSync: make(chan struct{}),
- receiveCh: make(chan interface{}, 1024),
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ nodeTable: tab,
+ gov: gov,
+ blockchain: blockchain,
+ cache: newCache(128),
+ chainconfig: config,
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ metasyncCh: make(chan *metasync),
+ quitSync: make(chan struct{}),
+ receiveCh: make(chan interface{}, 1024),
+ isBlockProposer: isBlockProposer,
}
// Figure out whether to allow fast sync or not
@@ -258,10 +261,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
go pm.metaBroadcastLoop()
- // broadcast mined blocks
- pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
- go pm.minedBroadcastLoop()
-
// run the peer set loop
pm.chainHeadCh = make(chan core.ChainHeadEvent)
pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh)
@@ -306,8 +305,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta {
func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")
- pm.txsSub.Unsubscribe() // quits txBroadcastLoop
- pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+ pm.txsSub.Unsubscribe() // quits txBroadcastLoop
pm.chainHeadSub.Unsubscribe()
// Quit the sync loop.
@@ -650,6 +648,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockHashesMsg:
+ // Ignore new block hash messages in block proposer mode.
+ if pm.isBlockProposer {
+ break
+ }
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
@@ -670,6 +672,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockMsg:
+ // Ignore new block messages in block proposer mode.
+ if pm.isBlockProposer {
+ break
+ }
// Retrieve and decode the propagated block
var request newBlockData
if err := msg.Decode(&request); err != nil {
@@ -732,7 +738,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkNodeMeta(meta.Hash())
}
pm.nodeTable.Add(metas)
+
+ // Block proposer-only messages.
+
case msg.Code == LatticeBlockMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var block coreTypes.Block
if err := msg.Decode(&block); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -740,6 +752,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.cache.addBlock(&block)
pm.receiveCh <- &block
case msg.Code == VoteMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var vote coreTypes.Vote
if err := msg.Decode(&vote); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -749,6 +764,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &vote
case msg.Code == AgreementMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// DKG set is receiver
var agreement coreTypes.AgreementResult
if err := msg.Decode(&agreement); err != nil {
@@ -756,6 +774,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &agreement
case msg.Code == RandomnessMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// Broadcast this to all peer
var randomness coreTypes.BlockRandomnessResult
if err := msg.Decode(&randomness); err != nil {
@@ -763,6 +784,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &randomness
case msg.Code == DKGPrivateShareMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// Do not relay this msg
var ps dkgTypes.PrivateShare
if err := msg.Decode(&ps); err != nil {
@@ -770,6 +794,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &ps
case msg.Code == DKGPartialSignatureMsg:
+ if !pm.isBlockProposer {
+ break
+ }
// broadcast in DKG set
var psig dkgTypes.PartialSignature
if err := msg.Decode(&psig); err != nil {
@@ -777,6 +804,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &psig
case msg.Code == PullBlocksMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var hashes coreCommon.Hashes
if err := msg.Decode(&hashes); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -789,6 +819,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
case msg.Code == PullVotesMsg:
+ if !pm.isBlockProposer {
+ break
+ }
var pos coreTypes.Position
if err := msg.Decode(&pos); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -1003,17 +1036,6 @@ func (pm *ProtocolManager) BroadcastPullVotes(
}
}
-// Mined broadcast loop
-func (pm *ProtocolManager) minedBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range pm.minedBlockSub.Chan() {
- if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
- pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
- pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
- }
- }
-}
-
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
@@ -1053,7 +1075,10 @@ func (pm *ProtocolManager) peerSetLoop() {
for {
select {
- case <-pm.chainHeadCh:
+ case event := <-pm.chainHeadCh:
+ pm.BroadcastBlock(event.Block, true) // First propagate block to peers
+ pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
+
newRound := pm.gov.LenCRS() - 1
log.Trace("new round", "round", newRound)
if newRound == round {