From c3d28a2f4d12c98b41d3a874a040489ab0a91ba5 Mon Sep 17 00:00:00 2001 From: Sonic Date: Wed, 5 Dec 2018 15:17:38 +0800 Subject: core, dex: polish sync (#75) - Broadcasting blocks at chain head event is not correct when the full node is not running in block proposer mode. Introduce NewFinalizedBlockEvent, this event is post by the full node which runs in block proposer mode when a block is witnessed and resulting in some blocks are considered finalized. - Non block proposer node will still broadcast blocks at the following moment (same as ethereum): 1. a sync with a peer is terminated successfully 2. a block passes the fetcher's header check during inserting blocks 3. a block is successfully inserted by fetcher - Don't trigger a sync when we are not behind other peers more than acceptable distance. Fetcher is able to cover this. --- dex/app.go | 23 +++++++++++++++++++++++ dex/backend.go | 3 ++- dex/handler.go | 37 ++++++++++++++++++++++++++++++++----- dex/helper_test.go | 11 ++++++++++- dex/protocol.go | 5 +++++ dex/sync.go | 8 +++++++- 6 files changed, 79 insertions(+), 8 deletions(-) (limited to 'dex') diff --git a/dex/app.go b/dex/app.go index 8723420c5..92e1a6aaa 100644 --- a/dex/app.go +++ b/dex/app.go @@ -31,6 +31,7 @@ import ( "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/rlp" ) @@ -47,6 +48,9 @@ type DexconApp struct { chainDB ethdb.Database config *Config + finalizedBlockFeed event.Feed + scope event.SubscriptionScope + chainLocks sync.Map chainLatestRoot sync.Map } @@ -490,6 +494,7 @@ func (d *DexconApp) BlockDelivered( Randomness: result.Randomness, }, txs, nil, nil) + h := d.blockchain.CurrentBlock().NumberU64() + 1 root, err := d.blockchain.ProcessPendingBlock(newBlock, &block.Witness) if err != nil { log.Error("Failed to process pending block", "error", err) @@ -498,6 +503,15 @@ func (d *DexconApp) BlockDelivered( d.chainLatestRoot.Store(block.Position.ChainID, root) d.blockchain.RemoveConfirmedBlock(chainID, blockHash) + + // New blocks are finalized, notify other components. + newHeight := d.blockchain.CurrentBlock().NumberU64() + for h <= newHeight { + b := d.blockchain.GetBlockByNumber(h) + go d.finalizedBlockFeed.Send(core.NewFinalizedBlockEvent{b}) + log.Debug("Send new finalized block event", "number", h) + h++ + } } // BlockConfirmed is called when a block is confirmed and added to lattice. @@ -509,3 +523,12 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) { panic(err) } } + +func (d *DexconApp) SubscribeNewFinalizedBlockEvent( + ch chan<- core.NewFinalizedBlockEvent) event.Subscription { + return d.scope.Track(d.finalizedBlockFeed.Subscribe(ch)) +} + +func (d *DexconApp) Stop() { + d.scope.Close() +} diff --git a/dex/backend.go b/dex/backend.go index eb9d8f765..4e7def8e4 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -157,7 +157,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, - chainDb, config.BlockProposerEnabled, dex.governance) + chainDb, config.BlockProposerEnabled, dex.governance, dex.app) if err != nil { return nil, err } @@ -245,6 +245,7 @@ func (s *Dexon) Start(srvr *p2p.Server) error { } func (s *Dexon) Stop() error { + s.app.Stop() return nil } diff --git a/dex/handler.go b/dex/handler.go index 5753a7cd8..51167c9cb 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -72,6 +72,8 @@ const ( // The number is referenced from the size of tx pool. txChanSize = 4096 + finalizedBlockChanSize = 128 + metaChanSize = 10240 maxPullPeers = 3 @@ -133,6 +135,10 @@ type ProtocolManager struct { // Dexcon isBlockProposer bool + app dexconApp + + finalizedBlockCh chan core.NewFinalizedBlockEvent + finalizedBlockSub event.Subscription } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -141,7 +147,7 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - isBlockProposer bool, gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ @@ -160,6 +166,7 @@ func NewProtocolManager( quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), isBlockProposer: isBlockProposer, + app: app, } // Figure out whether to allow fast sync or not @@ -255,6 +262,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + if pm.isBlockProposer { + // broadcast finalized blocks + pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent, + finalizedBlockChanSize) + pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent( + pm.finalizedBlockCh) + go pm.finalizedBlockBroadcastLoop() + } + // broadcast node metas pm.metasCh = make(chan newMetasEvent, metaChanSize) pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) @@ -1128,6 +1144,20 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } +func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { + for { + select { + case event := <-pm.finalizedBlockCh: + pm.BroadcastBlock(event.Block, true) + pm.BroadcastBlock(event.Block, false) + + // Err() channel will be closed when unsubscribing. + case <-pm.finalizedBlockSub.Err(): + return + } + } +} + func (pm *ProtocolManager) metaBroadcastLoop() { for { select { @@ -1154,10 +1184,7 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case event := <-pm.chainHeadCh: - pm.BroadcastBlock(event.Block, true) // First propagate block to peers - pm.BroadcastBlock(event.Block, false) // Only then announce to the rest - + case <-pm.chainHeadCh: if !pm.isBlockProposer { break } diff --git a/dex/helper_test.go b/dex/helper_test.go index 62dd6c5f4..86a901cc2 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -98,6 +98,15 @@ func (s *testP2PServer) RemoveGroup(name string) { delete(s.group, name) } +type testApp struct { + finalizedBlockFeed event.Feed +} + +func (a *testApp) SubscribeNewFinalizedBlockEvent( + ch chan<- core.NewFinalizedBlockEvent) event.Subscription { + return a.finalizedBlockFeed.Subscribe(ch) +} + // newTestProtocolManager creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. @@ -125,7 +134,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func notarySetFunc: func(uint64, uint32) (map[string]struct{}, error) { return nil, nil }, } - pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov) + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov, &testApp{}) if err != nil { return nil, nil, err } diff --git a/dex/protocol.go b/dex/protocol.go index 49bd0cc20..b6d672b7f 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -153,6 +153,11 @@ type governance interface { DKGSet(uint64) (map[string]struct{}, error) } +type dexconApp interface { + SubscribeNewFinalizedBlockEvent( + chan<- core.NewFinalizedBlockEvent) event.Subscription +} + type p2pServer interface { Self() *enode.Node diff --git a/dex/sync.go b/dex/sync.go index 43f1291ff..1e35faf21 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -32,6 +32,10 @@ const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available minDesiredPeerCount = 5 // Amount of peers desired to start syncing + // The distance between us and peer that we can accept. + // This distance is related to numChains and lambdaBA dexcon config. + acceptableDist = 16 + // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 @@ -263,7 +267,9 @@ func (pm *ProtocolManager) synchronise(peer *peer) { pHead, pNumber := peer.Head() - if pNumber <= number { + // If we are behind the peer, but not more than acceptable distance, don't + // trigger a sync. Fetcher is able to cover this. + if pNumber <= number+acceptableDist { return } // Otherwise try to sync with the downloader -- cgit v1.2.3