aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/events.go3
-rw-r--r--dex/app.go23
-rw-r--r--dex/backend.go3
-rw-r--r--dex/handler.go37
-rw-r--r--dex/helper_test.go11
-rw-r--r--dex/protocol.go5
-rw-r--r--dex/sync.go8
7 files changed, 82 insertions, 8 deletions
diff --git a/core/events.go b/core/events.go
index 1231daa37..e174e8aad 100644
--- a/core/events.go
+++ b/core/events.go
@@ -32,6 +32,9 @@ type PendingLogsEvent struct {
// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
+// NewFinalizedBlockEvent is posted when a block has been imported.
+type NewFinalizedBlockEvent struct{ Block *types.Block }
+
// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }
diff --git a/dex/app.go b/dex/app.go
index 8723420c5..92e1a6aaa 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -31,6 +31,7 @@ import (
"github.com/dexon-foundation/dexon/core"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/ethdb"
+ "github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -47,6 +48,9 @@ type DexconApp struct {
chainDB ethdb.Database
config *Config
+ finalizedBlockFeed event.Feed
+ scope event.SubscriptionScope
+
chainLocks sync.Map
chainLatestRoot sync.Map
}
@@ -490,6 +494,7 @@ func (d *DexconApp) BlockDelivered(
Randomness: result.Randomness,
}, txs, nil, nil)
+ h := d.blockchain.CurrentBlock().NumberU64() + 1
root, err := d.blockchain.ProcessPendingBlock(newBlock, &block.Witness)
if err != nil {
log.Error("Failed to process pending block", "error", err)
@@ -498,6 +503,15 @@ func (d *DexconApp) BlockDelivered(
d.chainLatestRoot.Store(block.Position.ChainID, root)
d.blockchain.RemoveConfirmedBlock(chainID, blockHash)
+
+ // New blocks are finalized, notify other components.
+ newHeight := d.blockchain.CurrentBlock().NumberU64()
+ for h <= newHeight {
+ b := d.blockchain.GetBlockByNumber(h)
+ go d.finalizedBlockFeed.Send(core.NewFinalizedBlockEvent{b})
+ log.Debug("Send new finalized block event", "number", h)
+ h++
+ }
}
// BlockConfirmed is called when a block is confirmed and added to lattice.
@@ -509,3 +523,12 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) {
panic(err)
}
}
+
+func (d *DexconApp) SubscribeNewFinalizedBlockEvent(
+ ch chan<- core.NewFinalizedBlockEvent) event.Subscription {
+ return d.scope.Track(d.finalizedBlockFeed.Subscribe(ch))
+}
+
+func (d *DexconApp) Stop() {
+ d.scope.Close()
+}
diff --git a/dex/backend.go b/dex/backend.go
index eb9d8f765..4e7def8e4 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -157,7 +157,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
- chainDb, config.BlockProposerEnabled, dex.governance)
+ chainDb, config.BlockProposerEnabled, dex.governance, dex.app)
if err != nil {
return nil, err
}
@@ -245,6 +245,7 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
}
func (s *Dexon) Stop() error {
+ s.app.Stop()
return nil
}
diff --git a/dex/handler.go b/dex/handler.go
index 5753a7cd8..51167c9cb 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -72,6 +72,8 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
+ finalizedBlockChanSize = 128
+
metaChanSize = 10240
maxPullPeers = 3
@@ -133,6 +135,10 @@ type ProtocolManager struct {
// Dexcon
isBlockProposer bool
+ app dexconApp
+
+ finalizedBlockCh chan core.NewFinalizedBlockEvent
+ finalizedBlockSub event.Subscription
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -141,7 +147,7 @@ func NewProtocolManager(
config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine,
blockchain *core.BlockChain, chaindb ethdb.Database,
- isBlockProposer bool, gov governance) (*ProtocolManager, error) {
+ isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) {
tab := newNodeTable()
// Create the protocol manager with the base fields
manager := &ProtocolManager{
@@ -160,6 +166,7 @@ func NewProtocolManager(
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
isBlockProposer: isBlockProposer,
+ app: app,
}
// Figure out whether to allow fast sync or not
@@ -255,6 +262,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
+ if pm.isBlockProposer {
+ // broadcast finalized blocks
+ pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent,
+ finalizedBlockChanSize)
+ pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent(
+ pm.finalizedBlockCh)
+ go pm.finalizedBlockBroadcastLoop()
+ }
+
// broadcast node metas
pm.metasCh = make(chan newMetasEvent, metaChanSize)
pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
@@ -1128,6 +1144,20 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
+func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
+ for {
+ select {
+ case event := <-pm.finalizedBlockCh:
+ pm.BroadcastBlock(event.Block, true)
+ pm.BroadcastBlock(event.Block, false)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.finalizedBlockSub.Err():
+ return
+ }
+ }
+}
+
func (pm *ProtocolManager) metaBroadcastLoop() {
for {
select {
@@ -1154,10 +1184,7 @@ func (pm *ProtocolManager) peerSetLoop() {
for {
select {
- case event := <-pm.chainHeadCh:
- pm.BroadcastBlock(event.Block, true) // First propagate block to peers
- pm.BroadcastBlock(event.Block, false) // Only then announce to the rest
-
+ case <-pm.chainHeadCh:
if !pm.isBlockProposer {
break
}
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 62dd6c5f4..86a901cc2 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -98,6 +98,15 @@ func (s *testP2PServer) RemoveGroup(name string) {
delete(s.group, name)
}
+type testApp struct {
+ finalizedBlockFeed event.Feed
+}
+
+func (a *testApp) SubscribeNewFinalizedBlockEvent(
+ ch chan<- core.NewFinalizedBlockEvent) event.Subscription {
+ return a.finalizedBlockFeed.Subscribe(ch)
+}
+
// newTestProtocolManager creates a new protocol manager for testing purposes,
// with the given number of blocks already known, and potential notification
// channels for different events.
@@ -125,7 +134,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
notarySetFunc: func(uint64, uint32) (map[string]struct{}, error) { return nil, nil },
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, true, tgov, &testApp{})
if err != nil {
return nil, nil, err
}
diff --git a/dex/protocol.go b/dex/protocol.go
index 49bd0cc20..b6d672b7f 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -153,6 +153,11 @@ type governance interface {
DKGSet(uint64) (map[string]struct{}, error)
}
+type dexconApp interface {
+ SubscribeNewFinalizedBlockEvent(
+ chan<- core.NewFinalizedBlockEvent) event.Subscription
+}
+
type p2pServer interface {
Self() *enode.Node
diff --git a/dex/sync.go b/dex/sync.go
index 43f1291ff..1e35faf21 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -32,6 +32,10 @@ const (
forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ // The distance between us and peer that we can accept.
+ // This distance is related to numChains and lambdaBA dexcon config.
+ acceptableDist = 16
+
// This is the target size for the packs of transactions sent by txsyncLoop.
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
@@ -263,7 +267,9 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
pHead, pNumber := peer.Head()
- if pNumber <= number {
+ // If we are behind the peer, but not more than acceptable distance, don't
+ // trigger a sync. Fetcher is able to cover this.
+ if pNumber <= number+acceptableDist {
return
}
// Otherwise try to sync with the downloader