aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
Diffstat (limited to 'dex')
-rw-r--r--dex/api.go16
-rw-r--r--dex/app.go5
-rw-r--r--dex/backend.go41
-rw-r--r--dex/blockproposer.go201
-rw-r--r--dex/handler.go8
5 files changed, 247 insertions, 24 deletions
diff --git a/dex/api.go b/dex/api.go
index 40928a5c1..4d39f9f6e 100644
--- a/dex/api.go
+++ b/dex/api.go
@@ -131,6 +131,22 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
return true, nil
}
+func (api *PrivateAdminAPI) StartProposing() error {
+ return api.dex.StartProposing()
+}
+
+func (api *PrivateAdminAPI) StopProposing() {
+ api.dex.StopProposing()
+}
+
+func (api *PrivateAdminAPI) IsLatticeSyncing() bool {
+ return api.dex.IsLatticeSyncing()
+}
+
+func (api *PrivateAdminAPI) IsProposing() bool {
+ return api.dex.IsProposing()
+}
+
// PublicDebugAPI is the collection of Ethereum full node APIs exposed
// over the public debugging endpoint.
type PublicDebugAPI struct {
diff --git a/dex/app.go b/dex/app.go
index 9dcfd87e9..d04b2afd6 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -451,6 +451,9 @@ func (d *DexconApp) BlockDelivered(
blockPosition coreTypes.Position,
result coreTypes.FinalizationResult) {
+ log.Debug("DexconApp block deliver", "height", result.Height, "hash", blockHash, "position", blockPosition.String())
+ defer log.Debug("DexconApp block delivered", "height", result.Height, "hash", blockHash, "position", blockPosition.String())
+
chainID := blockPosition.ChainID
d.chainLock(chainID)
defer d.chainUnlock(chainID)
@@ -461,6 +464,7 @@ func (d *DexconApp) BlockDelivered(
}
block.Payload = nil
+ block.Finalization = result
dexconMeta, err := rlp.EncodeToBytes(block)
if err != nil {
panic(err)
@@ -501,6 +505,7 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) {
d.chainLock(block.Position.ChainID)
defer d.chainUnlock(block.Position.ChainID)
+ log.Debug("DexconApp block confirmed", "block", block.String())
if err := d.blockchain.AddConfirmedBlock(&block); err != nil {
panic(err)
}
diff --git a/dex/backend.go b/dex/backend.go
index 5eb9a85fc..8fe38cd45 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -21,9 +21,6 @@ import (
"fmt"
"time"
- dexCore "github.com/dexon-foundation/dexon-consensus/core"
- coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
-
"github.com/dexon-foundation/dexon/accounts"
"github.com/dexon-foundation/dexon/consensus"
"github.com/dexon-foundation/dexon/consensus/dexcon"
@@ -31,7 +28,6 @@ import (
"github.com/dexon-foundation/dexon/core/bloombits"
"github.com/dexon-foundation/dexon/core/rawdb"
"github.com/dexon-foundation/dexon/core/vm"
- dexDB "github.com/dexon-foundation/dexon/dex/db"
"github.com/dexon-foundation/dexon/dex/downloader"
"github.com/dexon-foundation/dexon/eth/filters"
"github.com/dexon-foundation/dexon/eth/gasprice"
@@ -74,7 +70,8 @@ type Dexon struct {
app *DexconApp
governance *DexconGovernance
network *DexconNetwork
- consensus *dexCore.Consensus
+
+ bp *blockProposer
networkID uint64
netRPCService *ethapi.PublicNetAPI
@@ -154,6 +151,14 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
// Set config fetcher so engine can fetch current system configuration from state.
engine.SetConfigFetcher(dex.governance)
+ dMoment := time.Unix(config.DMoment, int64(0))
+ log.Info("DEXON Consensus DMoment", "time", dMoment)
+
+ // Force starting with full sync mode if this node is a bootstrap proposer.
+ if config.BlockProposerEnabled && dMoment.After(time.Now()) {
+ config.SyncMode = downloader.FullSync
+ }
+
pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
chainDb, config.BlockProposerEnabled, dex.governance, dex.app)
@@ -164,13 +169,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
dex.protocolManager = pm
dex.network = NewDexconNetwork(pm)
- privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey)
-
- dMoment := time.Unix(config.DMoment, int64(0))
- log.Info("DEXON Consensus DMoment", "time", dMoment)
-
- dex.consensus = dexCore.NewConsensus(dMoment,
- dex.app, dex.governance, dexDB.NewDatabase(chainDb), dex.network, privKey, log.Root())
+ dex.bp = NewBlockProposer(dex, dMoment)
return dex, nil
}
@@ -240,15 +239,25 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
}
func (s *Dexon) Stop() error {
- s.consensus.Stop()
+ s.bp.Stop()
s.app.Stop()
return nil
}
func (s *Dexon) StartProposing() error {
- // TODO: Run with the latest confirmed block in compaction chain.
- s.consensus.Run()
- return nil
+ return s.bp.Start()
+}
+
+func (s *Dexon) StopProposing() {
+ s.bp.Stop()
+}
+
+func (s *Dexon) IsLatticeSyncing() bool {
+ return s.bp.IsLatticeSyncing()
+}
+
+func (s *Dexon) IsProposing() bool {
+ return s.bp.IsProposing()
}
// CreateDB creates the chain database.
diff --git a/dex/blockproposer.go b/dex/blockproposer.go
new file mode 100644
index 000000000..21b8ddbde
--- /dev/null
+++ b/dex/blockproposer.go
@@ -0,0 +1,201 @@
+package dex
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ dexCore "github.com/dexon-foundation/dexon-consensus/core"
+ coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
+ "github.com/dexon-foundation/dexon-consensus/core/syncer"
+ coreTypes "github.com/dexon-foundation/dexon-consensus/core/types"
+
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/dex/db"
+ "github.com/dexon-foundation/dexon/log"
+ "github.com/dexon-foundation/dexon/rlp"
+)
+
+type blockProposer struct {
+ mu sync.Mutex
+ running int32
+ syncing int32
+ proposing int32
+ dex *Dexon
+ dMoment time.Time
+
+ wg sync.WaitGroup
+ stopCh chan struct{}
+}
+
+func NewBlockProposer(dex *Dexon, dMoment time.Time) *blockProposer {
+ return &blockProposer{
+ dex: dex,
+ dMoment: dMoment,
+ }
+}
+
+func (b *blockProposer) Start() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if !atomic.CompareAndSwapInt32(&b.running, 0, 1) {
+ return fmt.Errorf("block proposer is already running")
+ }
+ log.Info("Block proposer started")
+
+ b.stopCh = make(chan struct{})
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+ defer atomic.StoreInt32(&b.running, 0)
+
+ var err error
+ var c *dexCore.Consensus
+
+ if b.dMoment.After(time.Now()) {
+ c = b.initConsensus()
+ } else {
+ c, err = b.syncConsensus()
+ }
+
+ if err != nil {
+ log.Error("Block proposer stopped, before start running", "err", err)
+ return
+ }
+
+ b.run(c)
+ log.Info("Block proposer successfully stopped")
+ }()
+ return nil
+}
+
+func (b *blockProposer) run(c *dexCore.Consensus) {
+ log.Info("Start running consensus core")
+ go c.Run()
+ atomic.StoreInt32(&b.proposing, 1)
+ <-b.stopCh
+ log.Debug("Block proposer receive stop signal")
+ c.Stop()
+}
+
+func (b *blockProposer) Stop() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if atomic.LoadInt32(&b.running) == 1 {
+ b.dex.protocolManager.isBlockProposer = false
+ close(b.stopCh)
+ b.wg.Wait()
+ atomic.StoreInt32(&b.proposing, 0)
+ }
+}
+
+func (b *blockProposer) IsLatticeSyncing() bool {
+ return atomic.LoadInt32(&b.syncing) == 1
+}
+
+func (b *blockProposer) IsProposing() bool {
+ return atomic.LoadInt32(&b.proposing) == 1
+}
+
+func (b *blockProposer) initConsensus() *dexCore.Consensus {
+ db := db.NewDatabase(b.dex.chainDb)
+ privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey)
+ return dexCore.NewConsensus(b.dMoment,
+ b.dex.app, b.dex.governance, db, b.dex.network, privkey, log.Root())
+}
+
+func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
+ atomic.StoreInt32(&b.syncing, 1)
+ defer atomic.StoreInt32(&b.syncing, 0)
+
+ db := db.NewDatabase(b.dex.chainDb)
+ privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey)
+ consensusSync := syncer.NewConsensus(b.dMoment, b.dex.app, b.dex.governance,
+ db, b.dex.network, privkey, log.Root())
+
+ blocksToSync := func(coreHeight, height uint64) []*coreTypes.Block {
+ var blocks []*coreTypes.Block
+ for len(blocks) < 1024 && coreHeight < height {
+ var block coreTypes.Block
+ b := b.dex.blockchain.GetBlockByNumber(coreHeight + 1)
+ if err := rlp.DecodeBytes(b.Header().DexconMeta, &block); err != nil {
+ panic(err)
+ }
+ blocks = append(blocks, &block)
+ coreHeight = coreHeight + 1
+ }
+ return blocks
+ }
+
+ // Sync all blocks in compaction chain to core.
+ _, coreHeight := db.GetCompactionChainTipInfo()
+
+Loop:
+ for {
+ currentBlock := b.dex.blockchain.CurrentBlock()
+ log.Debug("Syncing compaction chain", "core height", coreHeight,
+ "height", currentBlock.NumberU64())
+ blocks := blocksToSync(coreHeight, currentBlock.NumberU64())
+
+ if len(blocks) == 0 {
+ break Loop
+ }
+
+ log.Debug("Filling compaction chain", "num", len(blocks),
+ "first", blocks[0].Finalization.Height,
+ "last", blocks[len(blocks)-1].Finalization.Height)
+ if _, err := consensusSync.SyncBlocks(blocks, false); err != nil {
+ return nil, err
+ }
+ coreHeight = blocks[len(blocks)-1].Finalization.Height
+
+ select {
+ case <-b.stopCh:
+ return nil, errors.New("early stop")
+ default:
+ }
+ }
+
+ // Enable isBlockProposer flag to start receiving msg.
+ b.dex.protocolManager.isBlockProposer = true
+
+ ch := make(chan core.ChainHeadEvent)
+ sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
+ defer sub.Unsubscribe()
+
+ // Listen chain head event until synced.
+ListenLoop:
+ for {
+ select {
+ case ev := <-ch:
+ blocks := blocksToSync(coreHeight, ev.Block.NumberU64())
+ if len(blocks) > 0 {
+ log.Debug("Filling compaction chain", "num", len(blocks),
+ "first", blocks[0].Finalization.Height,
+ "last", blocks[len(blocks)-1].Finalization.Height)
+ synced, err := consensusSync.SyncBlocks(blocks, true)
+ if err != nil {
+ log.Error("SyncBlocks fail", "err", err)
+ return nil, err
+ }
+ if synced {
+ log.Debug("Consensus core synced")
+ break ListenLoop
+ }
+ coreHeight = blocks[len(blocks)-1].Finalization.Height
+ }
+ case <-sub.Err():
+ log.Debug("System stopped when syncing consensus core")
+ return nil, errors.New("system stop")
+ case <-b.stopCh:
+ log.Debug("Early stop, before consensus core can run")
+ return nil, errors.New("early stop")
+ }
+ }
+
+ return consensusSync.GetSyncedConsensus()
+}
diff --git a/dex/handler.go b/dex/handler.go
index 9174d8516..ff87884d2 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -713,10 +713,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockHashesMsg:
- // Ignore new block hash messages in block proposer mode.
- if pm.isBlockProposer {
- break
- }
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
@@ -737,10 +733,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockMsg:
- // Ignore new block messages in block proposer mode.
- if pm.isBlockProposer {
- break
- }
// Retrieve and decode the propagated block
var block types.Block
if err := msg.Decode(&block); err != nil {