From dbb3d8fd30bd33df37f13048dc334ead8d335ddc Mon Sep 17 00:00:00 2001 From: Wei-Ning Huang Date: Thu, 15 Nov 2018 13:30:50 +0800 Subject: core: refactor validator and fix light node sync (#25) Remove custom Dexon validator by adding a new `ValidateWitnessData` method into the validator interface. This allow us to properly detect know blocks. This also allow other gdex "light" client to sync compaction chain. Also, setup a standalone RPC node for handling RPC reqeusts. --- dex/backend.go | 7 ++-- dex/config.go | 12 ++++--- dex/handler.go | 103 +++++++++++++++++++++++++++++++++++---------------------- 3 files changed, 75 insertions(+), 47 deletions(-) (limited to 'dex') diff --git a/dex/backend.go b/dex/backend.go index d7bc4630e..740b8cd6f 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -134,8 +134,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { } cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieCleanLimit: config.TrieCleanCache, TrieDirtyLimit: config.TrieDirtyCache, TrieTimeLimit: config.TrieTimeout} ) - dex.blockchain, err = core.NewBlockChainWithDexonValidator(chainDb, cacheConfig, - dex.chainConfig, dex.engine, vmConfig, nil) + dex.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, dex.chainConfig, dex.engine, vmConfig, nil) // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { @@ -148,7 +147,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } - dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain) + dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain, config.BlockProposerEnabled) dex.APIBackend = &DexAPIBackend{dex, nil} gpoParams := config.GPO @@ -166,7 +165,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, - chainDb, dex.governance) + chainDb, config.BlockProposerEnabled, dex.governance) if err != nil { return nil, err } diff --git a/dex/config.go b/dex/config.go index cd9d1b1db..282de494f 100644 --- a/dex/config.go +++ b/dex/config.go @@ -46,10 +46,11 @@ var DefaultConfig = Config{ Blocks: 20, Percentile: 60, }, - DefaultGasPrice: big.NewInt(params.GWei), - GasFloor: 8000000, - GasCeil: 8000000, - GasLimitTolerance: 1000000, + BlockProposerEnabled: false, + DefaultGasPrice: big.NewInt(params.GWei), + GasFloor: 8000000, + GasCeil: 8000000, + GasLimitTolerance: 1000000, } func init() { @@ -106,6 +107,9 @@ type Config struct { // Gas Price Oracle options GPO gasprice.Config + // BlockProposer options + BlockProposerEnabled bool + // Enables tracking of SHA3 preimages in the VM EnablePreimageRecording bool diff --git a/dex/handler.go b/dex/handler.go index 2f8ed13fa..7bc9c297d 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -106,12 +106,11 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - metasCh chan newMetasEvent - metasSub event.Subscription - minedBlockSub *event.TypeMuxSubscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer @@ -132,6 +131,9 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup + + // Dexcon + isBlockProposer bool } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -140,24 +142,25 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - nodeTable: tab, - gov: gov, - blockchain: blockchain, - cache: newCache(128), - chainconfig: config, - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - metasyncCh: make(chan *metasync), - quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + cache: newCache(128), + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + metasyncCh: make(chan *metasync), + quitSync: make(chan struct{}), + receiveCh: make(chan interface{}, 1024), + isBlockProposer: isBlockProposer, } // Figure out whether to allow fast sync or not @@ -258,10 +261,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) go pm.metaBroadcastLoop() - // broadcast mined blocks - pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go pm.minedBroadcastLoop() - // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) @@ -306,8 +305,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta { func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.txsSub.Unsubscribe() // quits txBroadcastLoop - pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.chainHeadSub.Unsubscribe() // Quit the sync loop. @@ -650,6 +648,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockHashesMsg: + // Ignore new block hash messages in block proposer mode. + if pm.isBlockProposer { + break + } var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) @@ -670,6 +672,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockMsg: + // Ignore new block messages in block proposer mode. + if pm.isBlockProposer { + break + } // Retrieve and decode the propagated block var request newBlockData if err := msg.Decode(&request); err != nil { @@ -732,7 +738,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkNodeMeta(meta.Hash()) } pm.nodeTable.Add(metas) + + // Block proposer-only messages. + case msg.Code == LatticeBlockMsg: + if !pm.isBlockProposer { + break + } var block coreTypes.Block if err := msg.Decode(&block); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -740,6 +752,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.cache.addBlock(&block) pm.receiveCh <- &block case msg.Code == VoteMsg: + if !pm.isBlockProposer { + break + } var vote coreTypes.Vote if err := msg.Decode(&vote); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -749,6 +764,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &vote case msg.Code == AgreementMsg: + if !pm.isBlockProposer { + break + } // DKG set is receiver var agreement coreTypes.AgreementResult if err := msg.Decode(&agreement); err != nil { @@ -756,6 +774,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: + if !pm.isBlockProposer { + break + } // Broadcast this to all peer var randomness coreTypes.BlockRandomnessResult if err := msg.Decode(&randomness); err != nil { @@ -763,6 +784,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &randomness case msg.Code == DKGPrivateShareMsg: + if !pm.isBlockProposer { + break + } // Do not relay this msg var ps dkgTypes.PrivateShare if err := msg.Decode(&ps); err != nil { @@ -770,6 +794,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: + if !pm.isBlockProposer { + break + } // broadcast in DKG set var psig dkgTypes.PartialSignature if err := msg.Decode(&psig); err != nil { @@ -777,6 +804,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &psig case msg.Code == PullBlocksMsg: + if !pm.isBlockProposer { + break + } var hashes coreCommon.Hashes if err := msg.Decode(&hashes); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -789,6 +819,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } case msg.Code == PullVotesMsg: + if !pm.isBlockProposer { + break + } var pos coreTypes.Position if err := msg.Decode(&pos); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -1003,17 +1036,6 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } -// Mined broadcast loop -func (pm *ProtocolManager) minedBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range pm.minedBlockSub.Chan() { - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - pm.BroadcastBlock(ev.Block, true) // First propagate block to peers - pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest - } - } -} - func (pm *ProtocolManager) txBroadcastLoop() { for { select { @@ -1053,7 +1075,10 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case <-pm.chainHeadCh: + case event := <-pm.chainHeadCh: + pm.BroadcastBlock(event.Block, true) // First propagate block to peers + pm.BroadcastBlock(event.Block, false) // Only then announce to the rest + newRound := pm.gov.LenCRS() - 1 log.Trace("new round", "round", newRound) if newRound == round { -- cgit v1.2.3