diff options
-rw-r--r-- | core/events.go | 3 | ||||
-rw-r--r-- | dex/app.go | 23 | ||||
-rw-r--r-- | dex/backend.go | 3 | ||||
-rw-r--r-- | dex/handler.go | 37 | ||||
-rw-r--r-- | dex/helper_test.go | 11 | ||||
-rw-r--r-- | dex/protocol.go | 5 | ||||
-rw-r--r-- | dex/sync.go | 8 |
7 files changed, 82 insertions, 8 deletions
diff --git a/core/events.go b/core/events.go index 1231daa37..e174e8aad 100644 --- a/core/events.go +++ b/core/events.go @@ -32,6 +32,9 @@ type PendingLogsEvent struct { // NewMinedBlockEvent is posted when a block has been imported. type NewMinedBlockEvent struct{ Block *types.Block } +// NewFinalizedBlockEvent is posted when a block has been imported. +type NewFinalizedBlockEvent struct{ Block *types.Block } + // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } diff --git a/dex/app.go b/dex/app.go index 8723420c5..92e1a6aaa 100644 --- a/dex/app.go +++ b/dex/app.go @@ -31,6 +31,7 @@ import ( "github.com/dexon-foundation/dexon/core" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/rlp" ) @@ -47,6 +48,9 @@ type DexconApp struct { chainDB ethdb.Database config *Config + finalizedBlockFeed event.Feed + scope event.SubscriptionScope + chainLocks sync.Map chainLatestRoot sync.Map } @@ -490,6 +494,7 @@ func (d *DexconApp) BlockDelivered( Randomness: result.Randomness, }, txs, nil, nil) + h := d.blockchain.CurrentBlock().NumberU64() + 1 root, err := d.blockchain.ProcessPendingBlock(newBlock, &block.Witness) if err != nil { log.Error("Failed to process pending block", "error", err) @@ -498,6 +503,15 @@ func (d *DexconApp) BlockDelivered( d.chainLatestRoot.Store(block.Position.ChainID, root) d.blockchain.RemoveConfirmedBlock(chainID, blockHash) + + // New blocks are finalized, notify other components. + newHeight := d.blockchain.CurrentBlock().NumberU64() + for h <= newHeight { + b := d.blockchain.GetBlockByNumber(h) + go d.finalizedBlockFeed.Send(core.NewFinalizedBlockEvent{b}) + log.Debug("Send new finalized block event", "number", h) + h++ + } } // BlockConfirmed is called when a block is confirmed and added to lattice. @@ -509,3 +523,12 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) { panic(err) } } + +func (d *DexconApp) SubscribeNewFinalizedBlockEvent( + ch chan<- core.NewFinalizedBlockEvent) event.Subscription { + return d.scope.Track(d.finalizedBlockFeed.Subscribe(ch)) +} + +func (d *DexconApp) Stop() { + d.scope.Close() +} diff --git a/dex/backend.go b/dex/backend.go index eb9d8f765..4e7def8e4 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -157,7 +157,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, - chainDb, config.BlockProposerEnabled, dex.governance) + chainDb, config.BlockProposerEnabled, dex.governance, dex.app) if err != nil { return nil, err } @@ -245,6 +245,7 @@ func (s *Dexon) Start(srvr *p2p.Server) error { } func (s *Dexon) Stop() error { + s.app.Stop() return nil } diff --git a/dex/handler.go b/dex/handler.go index 5753a7cd8..51167c9cb 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -72,6 +72,8 @@ const ( // The number is referenced from the size of tx pool. txChanSize = 4096 + finalizedBlockChanSize = 128 + metaChanSize = 10240 maxPullPeers = 3 @@ -133,6 +135,10 @@ type ProtocolManager struct { // Dexcon isBlockProposer bool + app dexconApp + + finalizedBlockCh chan core.NewFinalizedBlockEvent + finalizedBlockSub event.Subscription } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -141,7 +147,7 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - isBlockProposer bool, gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ @@ -160,6 +166,7 @@ func NewProtocolManager( quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), isBlockProposer: isBlockProposer, + app: app, } // Figure out whether to allow fast sync or not @@ -255,6 +262,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + if pm.isBlockProposer { + // broadcast finalized blocks + pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent, + finalizedBlockChanSize) + pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent( + pm.finalizedBlockCh) + go pm.finalizedBlockBroadcastLoop() + } + // broadcast node metas pm.metasCh = make(chan newMetasEvent, metaChanSize) pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) @@ -1128,6 +1144,20 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } +func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { + for { + select { + case event := <-pm.finalizedBlockCh: + pm.BroadcastBlock(event.Block, true) + pm.BroadcastBlock(event.Block, false) + + // Err() channel will be closed when unsubscribing. + case <-pm.finalizedBlockSub.Err(): + return + } + } +} + func (pm *ProtocolManager) metaBroadcastLoop() { for { select { @@ -1154,10 +1184,7 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case event := <-pm.chainHeadCh: - pm.BroadcastBlock(event.Block, true) // First propagate block to peers - pm.BroadcastBlock(event.Block, false) // Only then announce to the rest - + case <-pm.chainHeadCh: if !pm.isBlockProposer { break } diff --git a/dex/helper_test.go b/dex/helper_test.go index 62dd6c5f4..86a901cc2 100644 --- a/dex/helper_test.go +++ b/dex/helper_test.go @@ -98,6 +98,15 @@ func (s *testP2PServer) RemoveGroup(name string) { delete(s.group, name) } +type testApp struct { + finalizedBlockFeed event.Feed +} + +func (a *testApp) SubscribeNewFinalizedBlockEvent( + ch chan<- core.NewFinalizedBlockEvent) event.Subscription { + return a.finalizedBlockFeed.Subscribe(ch) +} + // newTestProtocolManager creates a new protocol manager for testing purposes, // with the given number of blocks already known, and potential notification // channels for different events. @@ -125,7 +134,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func notarySetFunc: func(uint64, uint32) (map[string]struct{}, error) { return nil, nil }, } - pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov) + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov, &testApp{}) if err != nil { return nil, nil, err } diff --git a/dex/protocol.go b/dex/protocol.go index 49bd0cc20..b6d672b7f 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -153,6 +153,11 @@ type governance interface { DKGSet(uint64) (map[string]struct{}, error) } +type dexconApp interface { + SubscribeNewFinalizedBlockEvent( + chan<- core.NewFinalizedBlockEvent) event.Subscription +} + type p2pServer interface { Self() *enode.Node diff --git a/dex/sync.go b/dex/sync.go index 43f1291ff..1e35faf21 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -32,6 +32,10 @@ const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available minDesiredPeerCount = 5 // Amount of peers desired to start syncing + // The distance between us and peer that we can accept. + // This distance is related to numChains and lambdaBA dexcon config. + acceptableDist = 16 + // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 @@ -263,7 +267,9 @@ func (pm *ProtocolManager) synchronise(peer *peer) { pHead, pNumber := peer.Head() - if pNumber <= number { + // If we are behind the peer, but not more than acceptable distance, don't + // trigger a sync. Fetcher is able to cover this. + if pNumber <= number+acceptableDist { return } // Otherwise try to sync with the downloader |