aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2018-12-05 15:17:38 +0800
committerWei-Ning Huang <w@dexon.org>2018-12-19 20:54:27 +0800
commit5e11c9fb4edda091ce93bf23d9108d8f670f55fc (patch)
tree3ee81ec716d2a9a2efe031d94d955a7b38dbbf0b
parent861110ade27c4fa06703f2ac0990b3e688ac4277 (diff)
downloaddexon-5e11c9fb4edda091ce93bf23d9108d8f670f55fc.tar
dexon-5e11c9fb4edda091ce93bf23d9108d8f670f55fc.tar.gz
dexon-5e11c9fb4edda091ce93bf23d9108d8f670f55fc.tar.bz2
dexon-5e11c9fb4edda091ce93bf23d9108d8f670f55fc.tar.lz
dexon-5e11c9fb4edda091ce93bf23d9108d8f670f55fc.tar.xz
dexon-5e11c9fb4edda091ce93bf23d9108d8f670f55fc.tar.zst
dexon-5e11c9fb4edda091ce93bf23d9108d8f670f55fc.zip
core, dex: polish sync (#75)
- Broadcasting blocks at chain head event is not correct when the full node is not running in block proposer mode. Introduce NewFinalizedBlockEvent, this event is post by the full node which runs in block proposer mode when a block is witnessed and resulting in some blocks are considered finalized. - Non block proposer node will still broadcast blocks at the following moment (same as ethereum): 1. a sync with a peer is terminated successfully 2. a block passes the fetcher's header check during inserting blocks 3. a block is successfully inserted by fetcher - Don't trigger a sync when we are not behind other peers more than acceptable distance. Fetcher is able to cover this.
-rw-r--r--core/events.go3
-rw-r--r--dex/app.go23
-rw-r--r--dex/backend.go3
-rw-r--r--dex/handler.go37
-rw-r--r--dex/helper_test.go11
-rw-r--r--dex/protocol.go5
-rw-r--r--dex/sync.go8
7 files changed, 82 insertions, 8 deletions
diff --git a/core/events.go b/core/events.go
index 1231daa37..e174e8aad 100644
--- a/core/events.go
+++ b/core/events.go
@@ -32,6 +32,9 @@ type PendingLogsEvent struct {
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
+// NewFinalizedBlockEvent is posted when a block has been imported.
+type NewFinalizedBlockEvent struct{ Block *types.Block }
+
// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }
diff --git a/dex/app.go b/dex/app.go
index 8723420c5..92e1a6aaa 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -31,6 +31,7 @@ import (
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -47,6 +48,9 @@ type DexconApp struct {
chainDB ethdb.Database
config *Config
+ finalizedBlockFeed event.Feed
+ scope event.SubscriptionScope
+
chainLocks sync.Map
chainLatestRoot sync.Map
}
@@ -490,6 +494,7 @@ func (d *DexconApp) BlockDelivered(
Randomness: result.Randomness,
}, txs, nil, nil)
+ h := d.blockchain.CurrentBlock().NumberU64() + 1
root, err := d.blockchain.ProcessPendingBlock(newBlock, &block.Witness)
if err != nil {
log.Error("Failed to process pending block", "error", err)
@@ -498,6 +503,15 @@ func (d *DexconApp) BlockDelivered(
d.chainLatestRoot.Store(block.Position.ChainID, root)
d.blockchain.RemoveConfirmedBlock(chainID, blockHash)
+
+ // New blocks are finalized, notify other components.
+ newHeight := d.blockchain.CurrentBlock().NumberU64()
+ for h <= newHeight {
+ b := d.blockchain.GetBlockByNumber(h)
+ go d.finalizedBlockFeed.Send(core.NewFinalizedBlockEvent{b})
+ log.Debug("Send new finalized block event", "number", h)
+ h++
+ }
}
// BlockConfirmed is called when a block is confirmed and added to lattice.
@@ -509,3 +523,12 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) {
panic(err)
}
}
+
+func (d *DexconApp) SubscribeNewFinalizedBlockEvent(
+ ch chan<- core.NewFinalizedBlockEvent) event.Subscription {
+ return d.scope.Track(d.finalizedBlockFeed.Subscribe(ch))
+}
+
+func (d *DexconApp) Stop() {
+ d.scope.Close()
+}
diff --git a/dex/backend.go b/dex/backend.go
index 17bf542a2..914ba6a6c 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -157,7 +157,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
- chainDb, config.BlockProposerEnabled, dex.governance)
+ chainDb, config.BlockProposerEnabled, dex.governance, dex.app)
if err != nil {
return nil, err
}
@@ -245,6 +245,7 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
}
func (s *Dexon) Stop() error {
+ s.app.Stop()
return nil
}
diff --git a/dex/handler.go b/dex/handler.go
index d45afa628..00f2b5040 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -72,6 +72,8 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
+ finalizedBlockChanSize = 128
+
metaChanSize = 10240
maxPullPeers = 3
@@ -133,6 +135,10 @@ type ProtocolManager struct {
// Dexcon
isBlockProposer bool
+ app dexconApp
+
+ finalizedBlockCh chan core.NewFinalizedBlockEvent
+ finalizedBlockSub event.Subscription
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -141,7 +147,7 @@ func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database,
- isBlockProposer bool, gov governance) (*ProtocolManager, error) {
+ isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
@@ -160,6 +166,7 @@ func NewProtocolManager(
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
isBlockProposer: isBlockProposer,
+ app: app,
}
// TODO(w): start accepting TXs only when we are synced.
@@ -260,6 +267,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ if pm.isBlockProposer {
+ // broadcast finalized blocks
+ pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent,
+ finalizedBlockChanSize)
+ pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent(
+ pm.finalizedBlockCh)
+ go pm.finalizedBlockBroadcastLoop()
+ }
+
// broadcast node metas
pm.metasCh = make(chan newMetasEvent, metaChanSize)
pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
@@ -1133,6 +1149,20 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
+func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
+ for {
+ select {
+ case event := <-pm.finalizedBlockCh:
+ pm.BroadcastBlock(event.Block, true)
+ pm.BroadcastBlock(event.Block, false)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.finalizedBlockSub.Err():
+ return
+ }
+ }
+}
+
func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
@@ -1159,10 +1189,7 @@ func (pm *ProtocolManager) peerSetLoop() {
for {
select {
- case event := <-pm.chainHeadCh:
- pm.BroadcastBlock(event.Block, true) // First propagate block to peers
- pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
-
+ case <-pm.chainHeadCh:
if !pm.isBlockProposer {
break
}
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 62dd6c5f4..86a901cc2 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -98,6 +98,15 @@ func (s *testP2PServer) RemoveGroup(name string) {
delete(s.group, name)
}
+type testApp struct {
+ finalizedBlockFeed event.Feed
+}
+
+func (a *testApp) SubscribeNewFinalizedBlockEvent(
+ ch chan<- core.NewFinalizedBlockEvent) event.Subscription {
+ return a.finalizedBlockFeed.Subscribe(ch)
+}
+
// newTestProtocolManager creates a new protocol manager for testing purposes,
// with the given number of blocks already known, and potential notification
// channels for different events.
@@ -125,7 +134,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
notarySetFunc: func(uint64, uint32) (map[string]struct{}, error) { return nil, nil },
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov, &testApp{})
if err != nil {
return nil, nil, err
}
diff --git a/dex/protocol.go b/dex/protocol.go
index aec434b87..c63dd78ca 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -153,6 +153,11 @@ type governance interface {
DKGSet(uint64) (map[string]struct{}, error)
}
+type dexconApp interface {
+ SubscribeNewFinalizedBlockEvent(
+ chan<- core.NewFinalizedBlockEvent) event.Subscription
+}
+
type p2pServer interface {
Self() *enode.Node
diff --git a/dex/sync.go b/dex/sync.go
index 43f1291ff..1e35faf21 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -32,6 +32,10 @@ const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ // The distance between us and peer that we can accept.
+ // This distance is related to numChains and lambdaBA dexcon config.
+ acceptableDist = 16
+
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
@@ -263,7 +267,9 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
pHead, pNumber := peer.Head()
- if pNumber <= number {
+ // If we are behind the peer, but not more than acceptable distance, don't
+ // trigger a sync. Fetcher is able to cover this.
+ if pNumber <= number+acceptableDist {
return
}
// Otherwise try to sync with the downloader