aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2018-12-05 15:17:38 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:54 +0800
commit8a244e8fb95eac41e41ecb07e5652fedb6949d0e (patch)
tree3404cb5da63865d5045d910157e67fa66c717c3d
parent9e5b923db28f0776bd6e34dbe1c9eddef03ffea9 (diff)
downloaddexon-8a244e8fb95eac41e41ecb07e5652fedb6949d0e.tar
dexon-8a244e8fb95eac41e41ecb07e5652fedb6949d0e.tar.gz
dexon-8a244e8fb95eac41e41ecb07e5652fedb6949d0e.tar.bz2
dexon-8a244e8fb95eac41e41ecb07e5652fedb6949d0e.tar.lz
dexon-8a244e8fb95eac41e41ecb07e5652fedb6949d0e.tar.xz
dexon-8a244e8fb95eac41e41ecb07e5652fedb6949d0e.tar.zst
dexon-8a244e8fb95eac41e41ecb07e5652fedb6949d0e.zip
core, dex: polish sync (#75)
- Broadcasting blocks at chain head event is not correct when the full node is not running in block proposer mode. Introduce NewFinalizedBlockEvent, this event is post by the full node which runs in block proposer mode when a block is witnessed and resulting in some blocks are considered finalized. - Non block proposer node will still broadcast blocks at the following moment (same as ethereum): 1. a sync with a peer is terminated successfully 2. a block passes the fetcher's header check during inserting blocks 3. a block is successfully inserted by fetcher - Don't trigger a sync when we are not behind other peers more than acceptable distance. Fetcher is able to cover this.
-rw-r--r--core/events.go3
-rw-r--r--dex/app.go23
-rw-r--r--dex/backend.go3
-rw-r--r--dex/handler.go37
-rw-r--r--dex/helper_test.go11
-rw-r--r--dex/protocol.go5
-rw-r--r--dex/sync.go8
7 files changed, 82 insertions, 8 deletions
diff --git a/core/events.go b/core/events.go
index 1231daa37..e174e8aad 100644
--- a/core/events.go
+++ b/core/events.go
@@ -32,6 +32,9 @@ type PendingLogsEvent struct {
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
+// NewFinalizedBlockEvent is posted when a block has been imported.
+type NewFinalizedBlockEvent struct{ Block *types.Block }
+
// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }
diff --git a/dex/app.go b/dex/app.go
index 8723420c5..92e1a6aaa 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -31,6 +31,7 @@ import (
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -47,6 +48,9 @@ type DexconApp struct {
chainDB ethdb.Database
config *Config
+ finalizedBlockFeed event.Feed
+ scope event.SubscriptionScope
+
chainLocks sync.Map
chainLatestRoot sync.Map
}
@@ -490,6 +494,7 @@ func (d *DexconApp) BlockDelivered(
Randomness: result.Randomness,
}, txs, nil, nil)
+ h := d.blockchain.CurrentBlock().NumberU64() + 1
root, err := d.blockchain.ProcessPendingBlock(newBlock, &block.Witness)
if err != nil {
log.Error("Failed to process pending block", "error", err)
@@ -498,6 +503,15 @@ func (d *DexconApp) BlockDelivered(
d.chainLatestRoot.Store(block.Position.ChainID, root)
d.blockchain.RemoveConfirmedBlock(chainID, blockHash)
+
+ // New blocks are finalized, notify other components.
+ newHeight := d.blockchain.CurrentBlock().NumberU64()
+ for h <= newHeight {
+ b := d.blockchain.GetBlockByNumber(h)
+ go d.finalizedBlockFeed.Send(core.NewFinalizedBlockEvent{b})
+ log.Debug("Send new finalized block event", "number", h)
+ h++
+ }
}
// BlockConfirmed is called when a block is confirmed and added to lattice.
@@ -509,3 +523,12 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) {
panic(err)
}
}
+
+func (d *DexconApp) SubscribeNewFinalizedBlockEvent(
+ ch chan<- core.NewFinalizedBlockEvent) event.Subscription {
+ return d.scope.Track(d.finalizedBlockFeed.Subscribe(ch))
+}
+
+func (d *DexconApp) Stop() {
+ d.scope.Close()
+}
diff --git a/dex/backend.go b/dex/backend.go
index eb9d8f765..4e7def8e4 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -157,7 +157,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
- chainDb, config.BlockProposerEnabled, dex.governance)
+ chainDb, config.BlockProposerEnabled, dex.governance, dex.app)
if err != nil {
return nil, err
}
@@ -245,6 +245,7 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
}
func (s *Dexon) Stop() error {
+ s.app.Stop()
return nil
}
diff --git a/dex/handler.go b/dex/handler.go
index 5753a7cd8..51167c9cb 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -72,6 +72,8 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
+ finalizedBlockChanSize = 128
+
metaChanSize = 10240
maxPullPeers = 3
@@ -133,6 +135,10 @@ type ProtocolManager struct {
// Dexcon
isBlockProposer bool
+ app dexconApp
+
+ finalizedBlockCh chan core.NewFinalizedBlockEvent
+ finalizedBlockSub event.Subscription
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -141,7 +147,7 @@ func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database,
- isBlockProposer bool, gov governance) (*ProtocolManager, error) {
+ isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
@@ -160,6 +166,7 @@ func NewProtocolManager(
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
isBlockProposer: isBlockProposer,
+ app: app,
}
// Figure out whether to allow fast sync or not
@@ -255,6 +262,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ if pm.isBlockProposer {
+ // broadcast finalized blocks
+ pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent,
+ finalizedBlockChanSize)
+ pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent(
+ pm.finalizedBlockCh)
+ go pm.finalizedBlockBroadcastLoop()
+ }
+
// broadcast node metas
pm.metasCh = make(chan newMetasEvent, metaChanSize)
pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
@@ -1128,6 +1144,20 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
+func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
+ for {
+ select {
+ case event := <-pm.finalizedBlockCh:
+ pm.BroadcastBlock(event.Block, true)
+ pm.BroadcastBlock(event.Block, false)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.finalizedBlockSub.Err():
+ return
+ }
+ }
+}
+
func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
@@ -1154,10 +1184,7 @@ func (pm *ProtocolManager) peerSetLoop() {
for {
select {
- case event := <-pm.chainHeadCh:
- pm.BroadcastBlock(event.Block, true) // First propagate block to peers
- pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
-
+ case <-pm.chainHeadCh:
if !pm.isBlockProposer {
break
}
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 62dd6c5f4..86a901cc2 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -98,6 +98,15 @@ func (s *testP2PServer) RemoveGroup(name string) {
delete(s.group, name)
}
+type testApp struct {
+ finalizedBlockFeed event.Feed
+}
+
+func (a *testApp) SubscribeNewFinalizedBlockEvent(
+ ch chan<- core.NewFinalizedBlockEvent) event.Subscription {
+ return a.finalizedBlockFeed.Subscribe(ch)
+}
+
// newTestProtocolManager creates a new protocol manager for testing purposes,
// with the given number of blocks already known, and potential notification
// channels for different events.
@@ -125,7 +134,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
notarySetFunc: func(uint64, uint32) (map[string]struct{}, error) { return nil, nil },
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov, &testApp{})
if err != nil {
return nil, nil, err
}
diff --git a/dex/protocol.go b/dex/protocol.go
index 49bd0cc20..b6d672b7f 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -153,6 +153,11 @@ type governance interface {
DKGSet(uint64) (map[string]struct{}, error)
}
+type dexconApp interface {
+ SubscribeNewFinalizedBlockEvent(
+ chan<- core.NewFinalizedBlockEvent) event.Subscription
+}
+
type p2pServer interface {
Self() *enode.Node
diff --git a/dex/sync.go b/dex/sync.go
index 43f1291ff..1e35faf21 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -32,6 +32,10 @@ const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ // The distance between us and peer that we can accept.
+ // This distance is related to numChains and lambdaBA dexcon config.
+ acceptableDist = 16
+
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
@@ -263,7 +267,9 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
pHead, pNumber := peer.Head()
- if pNumber <= number {
+ // If we are behind the peer, but not more than acceptable distance, don't
+ // trigger a sync. Fetcher is able to cover this.
+ if pNumber <= number+acceptableDist {
return
}
// Otherwise try to sync with the downloader