From b680d7be8c4d11fbb1b7ece43bcee93217d0278e Mon Sep 17 00:00:00 2001
From: Sonic <sonic@dexon.org>
Date: Fri, 21 Dec 2018 14:53:37 +0800
Subject: core, dex, internal: block proposer syncing (first iteration) (#96)

* dex, internal: block proposer syncing (first iteration)

* core: find block from db if not in memory

This fix handles stopping proposing and then restarting

* core: no need to reorg when reset

Dexon will not fork. This commit also fix when a block confirm but
its parent is not in db yet, during restarting proposing.

* dex: always accept NewBlockMsg, NewBlockHashesMsg

We need to accept NewBlockMsg, NewBlockHashesMsg to sync current block with
other peers in block proposer mode when syncing lattice data. It's a waste
when the node is synced and start proposing.

Todo: control msg processing on/off more granular, accept NewBlockMsg,
NewBlockHashesMsg when syncing, but stop when synced.
---
 dex/api.go           |  16 ++++
 dex/app.go           |   5 ++
 dex/backend.go       |  41 +++++++----
 dex/blockproposer.go | 201 +++++++++++++++++++++++++++++++++++++++++++++++++++
 dex/handler.go       |   8 --
 5 files changed, 247 insertions(+), 24 deletions(-)
 create mode 100644 dex/blockproposer.go

(limited to 'dex')

diff --git a/dex/api.go b/dex/api.go
index 40928a5c1..4d39f9f6e 100644
--- a/dex/api.go
+++ b/dex/api.go
@@ -131,6 +131,22 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
 	return true, nil
 }
 
+func (api *PrivateAdminAPI) StartProposing() error {
+	return api.dex.StartProposing()
+}
+
+func (api *PrivateAdminAPI) StopProposing() {
+	api.dex.StopProposing()
+}
+
+func (api *PrivateAdminAPI) IsLatticeSyncing() bool {
+	return api.dex.IsLatticeSyncing()
+}
+
+func (api *PrivateAdminAPI) IsProposing() bool {
+	return api.dex.IsProposing()
+}
+
 // PublicDebugAPI is the collection of Ethereum full node APIs exposed
 // over the public debugging endpoint.
 type PublicDebugAPI struct {
diff --git a/dex/app.go b/dex/app.go
index 9dcfd87e9..d04b2afd6 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -451,6 +451,9 @@ func (d *DexconApp) BlockDelivered(
 	blockPosition coreTypes.Position,
 	result coreTypes.FinalizationResult) {
 
+	log.Debug("DexconApp block deliver", "height", result.Height, "hash", blockHash, "position", blockPosition.String())
+	defer log.Debug("DexconApp block delivered", "height", result.Height, "hash", blockHash, "position", blockPosition.String())
+
 	chainID := blockPosition.ChainID
 	d.chainLock(chainID)
 	defer d.chainUnlock(chainID)
@@ -461,6 +464,7 @@ func (d *DexconApp) BlockDelivered(
 	}
 
 	block.Payload = nil
+	block.Finalization = result
 	dexconMeta, err := rlp.EncodeToBytes(block)
 	if err != nil {
 		panic(err)
@@ -501,6 +505,7 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) {
 	d.chainLock(block.Position.ChainID)
 	defer d.chainUnlock(block.Position.ChainID)
 
+	log.Debug("DexconApp block confirmed", "block", block.String())
 	if err := d.blockchain.AddConfirmedBlock(&block); err != nil {
 		panic(err)
 	}
diff --git a/dex/backend.go b/dex/backend.go
index aab13d0d3..48e05698f 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -21,9 +21,6 @@ import (
 	"fmt"
 	"time"
 
-	dexCore "github.com/dexon-foundation/dexon-consensus/core"
-	coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
-
 	"github.com/dexon-foundation/dexon/accounts"
 	"github.com/dexon-foundation/dexon/consensus"
 	"github.com/dexon-foundation/dexon/consensus/dexcon"
@@ -31,7 +28,6 @@ import (
 	"github.com/dexon-foundation/dexon/core/bloombits"
 	"github.com/dexon-foundation/dexon/core/rawdb"
 	"github.com/dexon-foundation/dexon/core/vm"
-	dexDB "github.com/dexon-foundation/dexon/dex/db"
 	"github.com/dexon-foundation/dexon/dex/downloader"
 	"github.com/dexon-foundation/dexon/eth/filters"
 	"github.com/dexon-foundation/dexon/eth/gasprice"
@@ -74,7 +70,8 @@ type Dexon struct {
 	app        *DexconApp
 	governance *DexconGovernance
 	network    *DexconNetwork
-	consensus  *dexCore.Consensus
+
+	bp *blockProposer
 
 	networkID     uint64
 	netRPCService *ethapi.PublicNetAPI
@@ -154,6 +151,14 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
 	// Set config fetcher so engine can fetch current system configuration from state.
 	engine.SetConfigFetcher(dex.governance)
 
+	dMoment := time.Unix(config.DMoment, int64(0))
+	log.Info("DEXON Consensus DMoment", "time", dMoment)
+
+	// Force starting with full sync mode if this node is a bootstrap proposer.
+	if config.BlockProposerEnabled && dMoment.After(time.Now()) {
+		config.SyncMode = downloader.FullSync
+	}
+
 	pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
 		config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
 		chainDb, config.BlockProposerEnabled, dex.governance, dex.app)
@@ -164,13 +169,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
 	dex.protocolManager = pm
 	dex.network = NewDexconNetwork(pm)
 
-	privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey)
-
-	dMoment := time.Unix(config.DMoment, int64(0))
-	log.Info("DEXON Consensus DMoment", "time", dMoment)
-
-	dex.consensus = dexCore.NewConsensus(dMoment,
-		dex.app, dex.governance, dexDB.NewDatabase(chainDb), dex.network, privKey, log.Root())
+	dex.bp = NewBlockProposer(dex, dMoment)
 	return dex, nil
 }
 
@@ -240,15 +239,25 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
 }
 
 func (s *Dexon) Stop() error {
-	s.consensus.Stop()
+	s.bp.Stop()
 	s.app.Stop()
 	return nil
 }
 
 func (s *Dexon) StartProposing() error {
-	// TODO: Run with the latest confirmed block in compaction chain.
-	s.consensus.Run()
-	return nil
+	return s.bp.Start()
+}
+
+func (s *Dexon) StopProposing() {
+	s.bp.Stop()
+}
+
+func (s *Dexon) IsLatticeSyncing() bool {
+	return s.bp.IsLatticeSyncing()
+}
+
+func (s *Dexon) IsProposing() bool {
+	return s.bp.IsProposing()
 }
 
 // CreateDB creates the chain database.
diff --git a/dex/blockproposer.go b/dex/blockproposer.go
new file mode 100644
index 000000000..21b8ddbde
--- /dev/null
+++ b/dex/blockproposer.go
@@ -0,0 +1,201 @@
+package dex
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	dexCore "github.com/dexon-foundation/dexon-consensus/core"
+	coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
+	"github.com/dexon-foundation/dexon-consensus/core/syncer"
+	coreTypes "github.com/dexon-foundation/dexon-consensus/core/types"
+
+	"github.com/dexon-foundation/dexon/core"
+	"github.com/dexon-foundation/dexon/dex/db"
+	"github.com/dexon-foundation/dexon/log"
+	"github.com/dexon-foundation/dexon/rlp"
+)
+
+type blockProposer struct {
+	mu        sync.Mutex
+	running   int32
+	syncing   int32
+	proposing int32
+	dex       *Dexon
+	dMoment   time.Time
+
+	wg     sync.WaitGroup
+	stopCh chan struct{}
+}
+
+func NewBlockProposer(dex *Dexon, dMoment time.Time) *blockProposer {
+	return &blockProposer{
+		dex:     dex,
+		dMoment: dMoment,
+	}
+}
+
+func (b *blockProposer) Start() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if !atomic.CompareAndSwapInt32(&b.running, 0, 1) {
+		return fmt.Errorf("block proposer is already running")
+	}
+	log.Info("Block proposer started")
+
+	b.stopCh = make(chan struct{})
+	b.wg.Add(1)
+	go func() {
+		defer b.wg.Done()
+		defer atomic.StoreInt32(&b.running, 0)
+
+		var err error
+		var c *dexCore.Consensus
+
+		if b.dMoment.After(time.Now()) {
+			c = b.initConsensus()
+		} else {
+			c, err = b.syncConsensus()
+		}
+
+		if err != nil {
+			log.Error("Block proposer stopped, before start running", "err", err)
+			return
+		}
+
+		b.run(c)
+		log.Info("Block proposer successfully stopped")
+	}()
+	return nil
+}
+
+func (b *blockProposer) run(c *dexCore.Consensus) {
+	log.Info("Start running consensus core")
+	go c.Run()
+	atomic.StoreInt32(&b.proposing, 1)
+	<-b.stopCh
+	log.Debug("Block proposer receive stop signal")
+	c.Stop()
+}
+
+func (b *blockProposer) Stop() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if atomic.LoadInt32(&b.running) == 1 {
+		b.dex.protocolManager.isBlockProposer = false
+		close(b.stopCh)
+		b.wg.Wait()
+		atomic.StoreInt32(&b.proposing, 0)
+	}
+}
+
+func (b *blockProposer) IsLatticeSyncing() bool {
+	return atomic.LoadInt32(&b.syncing) == 1
+}
+
+func (b *blockProposer) IsProposing() bool {
+	return atomic.LoadInt32(&b.proposing) == 1
+}
+
+func (b *blockProposer) initConsensus() *dexCore.Consensus {
+	db := db.NewDatabase(b.dex.chainDb)
+	privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey)
+	return dexCore.NewConsensus(b.dMoment,
+		b.dex.app, b.dex.governance, db, b.dex.network, privkey, log.Root())
+}
+
+func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
+	atomic.StoreInt32(&b.syncing, 1)
+	defer atomic.StoreInt32(&b.syncing, 0)
+
+	db := db.NewDatabase(b.dex.chainDb)
+	privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey)
+	consensusSync := syncer.NewConsensus(b.dMoment, b.dex.app, b.dex.governance,
+		db, b.dex.network, privkey, log.Root())
+
+	blocksToSync := func(coreHeight, height uint64) []*coreTypes.Block {
+		var blocks []*coreTypes.Block
+		for len(blocks) < 1024 && coreHeight < height {
+			var block coreTypes.Block
+			b := b.dex.blockchain.GetBlockByNumber(coreHeight + 1)
+			if err := rlp.DecodeBytes(b.Header().DexconMeta, &block); err != nil {
+				panic(err)
+			}
+			blocks = append(blocks, &block)
+			coreHeight = coreHeight + 1
+		}
+		return blocks
+	}
+
+	// Sync all blocks in compaction chain to core.
+	_, coreHeight := db.GetCompactionChainTipInfo()
+
+Loop:
+	for {
+		currentBlock := b.dex.blockchain.CurrentBlock()
+		log.Debug("Syncing compaction chain", "core height", coreHeight,
+			"height", currentBlock.NumberU64())
+		blocks := blocksToSync(coreHeight, currentBlock.NumberU64())
+
+		if len(blocks) == 0 {
+			break Loop
+		}
+
+		log.Debug("Filling compaction chain", "num", len(blocks),
+			"first", blocks[0].Finalization.Height,
+			"last", blocks[len(blocks)-1].Finalization.Height)
+		if _, err := consensusSync.SyncBlocks(blocks, false); err != nil {
+			return nil, err
+		}
+		coreHeight = blocks[len(blocks)-1].Finalization.Height
+
+		select {
+		case <-b.stopCh:
+			return nil, errors.New("early stop")
+		default:
+		}
+	}
+
+	// Enable isBlockProposer flag to start receiving msg.
+	b.dex.protocolManager.isBlockProposer = true
+
+	ch := make(chan core.ChainHeadEvent)
+	sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
+	defer sub.Unsubscribe()
+
+	// Listen chain head event until synced.
+ListenLoop:
+	for {
+		select {
+		case ev := <-ch:
+			blocks := blocksToSync(coreHeight, ev.Block.NumberU64())
+			if len(blocks) > 0 {
+				log.Debug("Filling compaction chain", "num", len(blocks),
+					"first", blocks[0].Finalization.Height,
+					"last", blocks[len(blocks)-1].Finalization.Height)
+				synced, err := consensusSync.SyncBlocks(blocks, true)
+				if err != nil {
+					log.Error("SyncBlocks fail", "err", err)
+					return nil, err
+				}
+				if synced {
+					log.Debug("Consensus core synced")
+					break ListenLoop
+				}
+				coreHeight = blocks[len(blocks)-1].Finalization.Height
+			}
+		case <-sub.Err():
+			log.Debug("System stopped when syncing consensus core")
+			return nil, errors.New("system stop")
+		case <-b.stopCh:
+			log.Debug("Early stop, before consensus core can run")
+			return nil, errors.New("early stop")
+		}
+	}
+
+	return consensusSync.GetSyncedConsensus()
+}
diff --git a/dex/handler.go b/dex/handler.go
index 7dfd74b26..9956bd1c0 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -718,10 +718,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 
 	case msg.Code == NewBlockHashesMsg:
-		// Ignore new block hash messages in block proposer mode.
-		if pm.isBlockProposer {
-			break
-		}
 		var announces newBlockHashesData
 		if err := msg.Decode(&announces); err != nil {
 			return errResp(ErrDecode, "%v: %v", msg, err)
@@ -742,10 +738,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		}
 
 	case msg.Code == NewBlockMsg:
-		// Ignore new block messages in block proposer mode.
-		if pm.isBlockProposer {
-			break
-		}
 		// Retrieve and decode the propagated block
 		var block types.Block
 		if err := msg.Decode(&block); err != nil {
-- 
cgit v1.2.3