aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/blockchain.go6
-rw-r--r--core/tx_pool.go53
-rw-r--r--dex/api.go16
-rw-r--r--dex/app.go5
-rw-r--r--dex/backend.go41
-rw-r--r--dex/blockproposer.go201
-rw-r--r--dex/handler.go8
-rw-r--r--internal/web3ext/web3ext.go16
-rw-r--r--les/handler_test.go19
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go181
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go747
-rw-r--r--vendor/vendor.json6
12 files changed, 1199 insertions, 100 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 1b3b22e5e..0e87360c3 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1796,8 +1796,6 @@ func (bc *BlockChain) processPendingBlock(
return nil, nil, nil, fmt.Errorf("validate witness data error: %v", err)
}
- currentBlock := bc.CurrentBlock()
-
var (
receipts types.Receipts
usedGas = new(uint64)
@@ -1810,8 +1808,8 @@ func (bc *BlockChain) processPendingBlock(
var err error
parent, exist := bc.pendingBlocks[block.NumberU64()-1]
if !exist {
- parentBlock = currentBlock
- if parentBlock.NumberU64() != block.NumberU64()-1 {
+ parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1)
+ if parentBlock == nil {
return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
}
} else {
diff --git a/core/tx_pool.go b/core/tx_pool.go
index ea2025cde..911b6c261 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -19,7 +19,6 @@ package core
import (
"errors"
"fmt"
- "math"
"math/big"
"sort"
"sync"
@@ -405,53 +404,6 @@ func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
- // If we're reorging an old state, reinject all dropped transactions
- var reinject types.Transactions
-
- if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
- // If the reorg is too deep, avoid doing it (will happen during fast sync)
- oldNum := oldHead.Number.Uint64()
- newNum := newHead.Number.Uint64()
-
- if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
- log.Debug("Skipping deep transaction reorg", "depth", depth)
- } else {
- // Reorg seems shallow enough to pull in all transactions into memory
- var discarded, included types.Transactions
-
- var (
- rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
- add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
- )
- for rem.NumberU64() > add.NumberU64() {
- discarded = append(discarded, rem.Transactions()...)
- if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
- log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
- return
- }
- }
- for add.NumberU64() > rem.NumberU64() {
- included = append(included, add.Transactions()...)
- if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
- log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
- return
- }
- }
- for rem.Hash() != add.Hash() {
- discarded = append(discarded, rem.Transactions()...)
- if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
- log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
- return
- }
- included = append(included, add.Transactions()...)
- if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
- log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
- return
- }
- }
- reinject = types.TxDifference(discarded, included)
- }
- }
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
@@ -465,11 +417,6 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool.pendingState = state.ManageState(statedb)
pool.currentMaxGas = newHead.GasLimit
- // Inject any transactions discarded due to reorgs
- log.Debug("Reinjecting stale transactions", "count", len(reinject))
- senderCacher.recover(pool.signer, reinject)
- pool.addTxsLocked(reinject, false)
-
// validate the pool of pending transactions, this will remove
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
diff --git a/dex/api.go b/dex/api.go
index 40928a5c1..4d39f9f6e 100644
--- a/dex/api.go
+++ b/dex/api.go
@@ -131,6 +131,22 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
return true, nil
}
+func (api *PrivateAdminAPI) StartProposing() error {
+ return api.dex.StartProposing()
+}
+
+func (api *PrivateAdminAPI) StopProposing() {
+ api.dex.StopProposing()
+}
+
+func (api *PrivateAdminAPI) IsLatticeSyncing() bool {
+ return api.dex.IsLatticeSyncing()
+}
+
+func (api *PrivateAdminAPI) IsProposing() bool {
+ return api.dex.IsProposing()
+}
+
// PublicDebugAPI is the collection of Ethereum full node APIs exposed
// over the public debugging endpoint.
type PublicDebugAPI struct {
diff --git a/dex/app.go b/dex/app.go
index 9dcfd87e9..d04b2afd6 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -451,6 +451,9 @@ func (d *DexconApp) BlockDelivered(
blockPosition coreTypes.Position,
result coreTypes.FinalizationResult) {
+ log.Debug("DexconApp block deliver", "height", result.Height, "hash", blockHash, "position", blockPosition.String())
+ defer log.Debug("DexconApp block delivered", "height", result.Height, "hash", blockHash, "position", blockPosition.String())
+
chainID := blockPosition.ChainID
d.chainLock(chainID)
defer d.chainUnlock(chainID)
@@ -461,6 +464,7 @@ func (d *DexconApp) BlockDelivered(
}
block.Payload = nil
+ block.Finalization = result
dexconMeta, err := rlp.EncodeToBytes(block)
if err != nil {
panic(err)
@@ -501,6 +505,7 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) {
d.chainLock(block.Position.ChainID)
defer d.chainUnlock(block.Position.ChainID)
+ log.Debug("DexconApp block confirmed", "block", block.String())
if err := d.blockchain.AddConfirmedBlock(&block); err != nil {
panic(err)
}
diff --git a/dex/backend.go b/dex/backend.go
index 5eb9a85fc..8fe38cd45 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -21,9 +21,6 @@ import (
"fmt"
"time"
- dexCore "github.com/dexon-foundation/dexon-consensus/core"
- coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
-
"github.com/dexon-foundation/dexon/accounts"
"github.com/dexon-foundation/dexon/consensus"
"github.com/dexon-foundation/dexon/consensus/dexcon"
@@ -31,7 +28,6 @@ import (
"github.com/dexon-foundation/dexon/core/bloombits"
"github.com/dexon-foundation/dexon/core/rawdb"
"github.com/dexon-foundation/dexon/core/vm"
- dexDB "github.com/dexon-foundation/dexon/dex/db"
"github.com/dexon-foundation/dexon/dex/downloader"
"github.com/dexon-foundation/dexon/eth/filters"
"github.com/dexon-foundation/dexon/eth/gasprice"
@@ -74,7 +70,8 @@ type Dexon struct {
app *DexconApp
governance *DexconGovernance
network *DexconNetwork
- consensus *dexCore.Consensus
+
+ bp *blockProposer
networkID uint64
netRPCService *ethapi.PublicNetAPI
@@ -154,6 +151,14 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
// Set config fetcher so engine can fetch current system configuration from state.
engine.SetConfigFetcher(dex.governance)
+ dMoment := time.Unix(config.DMoment, int64(0))
+ log.Info("DEXON Consensus DMoment", "time", dMoment)
+
+ // Force starting with full sync mode if this node is a bootstrap proposer.
+ if config.BlockProposerEnabled && dMoment.After(time.Now()) {
+ config.SyncMode = downloader.FullSync
+ }
+
pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode,
config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain,
chainDb, config.BlockProposerEnabled, dex.governance, dex.app)
@@ -164,13 +169,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) {
dex.protocolManager = pm
dex.network = NewDexconNetwork(pm)
- privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey)
-
- dMoment := time.Unix(config.DMoment, int64(0))
- log.Info("DEXON Consensus DMoment", "time", dMoment)
-
- dex.consensus = dexCore.NewConsensus(dMoment,
- dex.app, dex.governance, dexDB.NewDatabase(chainDb), dex.network, privKey, log.Root())
+ dex.bp = NewBlockProposer(dex, dMoment)
return dex, nil
}
@@ -240,15 +239,25 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
}
func (s *Dexon) Stop() error {
- s.consensus.Stop()
+ s.bp.Stop()
s.app.Stop()
return nil
}
func (s *Dexon) StartProposing() error {
- // TODO: Run with the latest confirmed block in compaction chain.
- s.consensus.Run()
- return nil
+ return s.bp.Start()
+}
+
+func (s *Dexon) StopProposing() {
+ s.bp.Stop()
+}
+
+func (s *Dexon) IsLatticeSyncing() bool {
+ return s.bp.IsLatticeSyncing()
+}
+
+func (s *Dexon) IsProposing() bool {
+ return s.bp.IsProposing()
}
// CreateDB creates the chain database.
diff --git a/dex/blockproposer.go b/dex/blockproposer.go
new file mode 100644
index 000000000..21b8ddbde
--- /dev/null
+++ b/dex/blockproposer.go
@@ -0,0 +1,201 @@
+package dex
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ dexCore "github.com/dexon-foundation/dexon-consensus/core"
+ coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
+ "github.com/dexon-foundation/dexon-consensus/core/syncer"
+ coreTypes "github.com/dexon-foundation/dexon-consensus/core/types"
+
+ "github.com/dexon-foundation/dexon/core"
+ "github.com/dexon-foundation/dexon/dex/db"
+ "github.com/dexon-foundation/dexon/log"
+ "github.com/dexon-foundation/dexon/rlp"
+)
+
+type blockProposer struct {
+ mu sync.Mutex
+ running int32
+ syncing int32
+ proposing int32
+ dex *Dexon
+ dMoment time.Time
+
+ wg sync.WaitGroup
+ stopCh chan struct{}
+}
+
+func NewBlockProposer(dex *Dexon, dMoment time.Time) *blockProposer {
+ return &blockProposer{
+ dex: dex,
+ dMoment: dMoment,
+ }
+}
+
+func (b *blockProposer) Start() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if !atomic.CompareAndSwapInt32(&b.running, 0, 1) {
+ return fmt.Errorf("block proposer is already running")
+ }
+ log.Info("Block proposer started")
+
+ b.stopCh = make(chan struct{})
+ b.wg.Add(1)
+ go func() {
+ defer b.wg.Done()
+ defer atomic.StoreInt32(&b.running, 0)
+
+ var err error
+ var c *dexCore.Consensus
+
+ if b.dMoment.After(time.Now()) {
+ c = b.initConsensus()
+ } else {
+ c, err = b.syncConsensus()
+ }
+
+ if err != nil {
+ log.Error("Block proposer stopped, before start running", "err", err)
+ return
+ }
+
+ b.run(c)
+ log.Info("Block proposer successfully stopped")
+ }()
+ return nil
+}
+
+func (b *blockProposer) run(c *dexCore.Consensus) {
+ log.Info("Start running consensus core")
+ go c.Run()
+ atomic.StoreInt32(&b.proposing, 1)
+ <-b.stopCh
+ log.Debug("Block proposer receive stop signal")
+ c.Stop()
+}
+
+func (b *blockProposer) Stop() {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ if atomic.LoadInt32(&b.running) == 1 {
+ b.dex.protocolManager.isBlockProposer = false
+ close(b.stopCh)
+ b.wg.Wait()
+ atomic.StoreInt32(&b.proposing, 0)
+ }
+}
+
+func (b *blockProposer) IsLatticeSyncing() bool {
+ return atomic.LoadInt32(&b.syncing) == 1
+}
+
+func (b *blockProposer) IsProposing() bool {
+ return atomic.LoadInt32(&b.proposing) == 1
+}
+
+func (b *blockProposer) initConsensus() *dexCore.Consensus {
+ db := db.NewDatabase(b.dex.chainDb)
+ privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey)
+ return dexCore.NewConsensus(b.dMoment,
+ b.dex.app, b.dex.governance, db, b.dex.network, privkey, log.Root())
+}
+
+func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
+ atomic.StoreInt32(&b.syncing, 1)
+ defer atomic.StoreInt32(&b.syncing, 0)
+
+ db := db.NewDatabase(b.dex.chainDb)
+ privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey)
+ consensusSync := syncer.NewConsensus(b.dMoment, b.dex.app, b.dex.governance,
+ db, b.dex.network, privkey, log.Root())
+
+ blocksToSync := func(coreHeight, height uint64) []*coreTypes.Block {
+ var blocks []*coreTypes.Block
+ for len(blocks) < 1024 && coreHeight < height {
+ var block coreTypes.Block
+ b := b.dex.blockchain.GetBlockByNumber(coreHeight + 1)
+ if err := rlp.DecodeBytes(b.Header().DexconMeta, &block); err != nil {
+ panic(err)
+ }
+ blocks = append(blocks, &block)
+ coreHeight = coreHeight + 1
+ }
+ return blocks
+ }
+
+ // Sync all blocks in compaction chain to core.
+ _, coreHeight := db.GetCompactionChainTipInfo()
+
+Loop:
+ for {
+ currentBlock := b.dex.blockchain.CurrentBlock()
+ log.Debug("Syncing compaction chain", "core height", coreHeight,
+ "height", currentBlock.NumberU64())
+ blocks := blocksToSync(coreHeight, currentBlock.NumberU64())
+
+ if len(blocks) == 0 {
+ break Loop
+ }
+
+ log.Debug("Filling compaction chain", "num", len(blocks),
+ "first", blocks[0].Finalization.Height,
+ "last", blocks[len(blocks)-1].Finalization.Height)
+ if _, err := consensusSync.SyncBlocks(blocks, false); err != nil {
+ return nil, err
+ }
+ coreHeight = blocks[len(blocks)-1].Finalization.Height
+
+ select {
+ case <-b.stopCh:
+ return nil, errors.New("early stop")
+ default:
+ }
+ }
+
+ // Enable isBlockProposer flag to start receiving msg.
+ b.dex.protocolManager.isBlockProposer = true
+
+ ch := make(chan core.ChainHeadEvent)
+ sub := b.dex.blockchain.SubscribeChainHeadEvent(ch)
+ defer sub.Unsubscribe()
+
+ // Listen chain head event until synced.
+ListenLoop:
+ for {
+ select {
+ case ev := <-ch:
+ blocks := blocksToSync(coreHeight, ev.Block.NumberU64())
+ if len(blocks) > 0 {
+ log.Debug("Filling compaction chain", "num", len(blocks),
+ "first", blocks[0].Finalization.Height,
+ "last", blocks[len(blocks)-1].Finalization.Height)
+ synced, err := consensusSync.SyncBlocks(blocks, true)
+ if err != nil {
+ log.Error("SyncBlocks fail", "err", err)
+ return nil, err
+ }
+ if synced {
+ log.Debug("Consensus core synced")
+ break ListenLoop
+ }
+ coreHeight = blocks[len(blocks)-1].Finalization.Height
+ }
+ case <-sub.Err():
+ log.Debug("System stopped when syncing consensus core")
+ return nil, errors.New("system stop")
+ case <-b.stopCh:
+ log.Debug("Early stop, before consensus core can run")
+ return nil, errors.New("early stop")
+ }
+ }
+
+ return consensusSync.GetSyncedConsensus()
+}
diff --git a/dex/handler.go b/dex/handler.go
index 9174d8516..ff87884d2 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -713,10 +713,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockHashesMsg:
- // Ignore new block hash messages in block proposer mode.
- if pm.isBlockProposer {
- break
- }
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
@@ -737,10 +733,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
case msg.Code == NewBlockMsg:
- // Ignore new block messages in block proposer mode.
- if pm.isBlockProposer {
- break
- }
// Retrieve and decode the propagated block
var block types.Block
if err := msg.Decode(&block); err != nil {
diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go
index 6b98c8b7e..2253142b9 100644
--- a/internal/web3ext/web3ext.go
+++ b/internal/web3ext/web3ext.go
@@ -199,6 +199,14 @@ web3._extend({
name: 'stopWS',
call: 'admin_stopWS'
}),
+ new web3._extend.Method({
+ name: 'startProposing',
+ call: 'admin_startProposing'
+ }),
+ new web3._extend.Method({
+ name: 'stopProposing',
+ call: 'admin_stopProposing'
+ }),
],
properties: [
new web3._extend.Property({
@@ -213,6 +221,14 @@ web3._extend({
name: 'datadir',
getter: 'admin_datadir'
}),
+ new web3._extend.Property({
+ name: 'isLatticeSyncing',
+ getter: 'admin_isLatticeSyncing'
+ }),
+ new web3._extend.Property({
+ name: 'isProposing',
+ getter: 'admin_isProposing'
+ }),
]
});
`
diff --git a/les/handler_test.go b/les/handler_test.go
index 6ef6da8e9..eece82a28 100644
--- a/les/handler_test.go
+++ b/les/handler_test.go
@@ -561,23 +561,4 @@ func TestTransactionStatusLes2(t *testing.T) {
block1hash := rawdb.ReadCanonicalHash(db, 1)
test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}})
test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}})
-
- // create a reorg that rolls them back
- gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 2, func(i int, block *core.BlockGen) {})
- if _, err := chain.InsertChain(gchain); err != nil {
- panic(err)
- }
- // wait until TxPool processes the reorg
- for i := 0; i < 10; i++ {
- if pending, _ := txpool.Stats(); pending == 3 {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- if pending, _ := txpool.Stats(); pending != 3 {
- t.Fatalf("pending count mismatch: have %d, want 3", pending)
- }
- // check if their status is pending again
- test(tx1, false, txStatus{Status: core.TxStatusPending})
- test(tx2, false, txStatus{Status: core.TxStatusPending})
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
new file mode 100644
index 000000000..fee462442
--- /dev/null
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
@@ -0,0 +1,181 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "sync"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+// Struct agreement implements struct of BA (Byzantine Agreement) protocol
+// needed in syncer, which only receives agreement results.
+type agreement struct {
+ wg *sync.WaitGroup
+ cache *utils.NodeSetCache
+ inputChan chan interface{}
+ outputChan chan<- *types.Block
+ pullChan chan<- common.Hash
+ blocks map[types.Position]map[common.Hash]*types.Block
+ agreementResults map[common.Hash]struct{}
+ latestCRSRound uint64
+ pendings map[uint64]map[common.Hash]*types.AgreementResult
+ logger common.Logger
+ confirmedBlocks map[common.Hash]struct{}
+}
+
+// newAgreement creates a new agreement instance.
+func newAgreement(
+ ch chan<- *types.Block,
+ pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache,
+ wg *sync.WaitGroup,
+ logger common.Logger) *agreement {
+
+ return &agreement{
+ cache: cache,
+ wg: wg,
+ inputChan: make(chan interface{}, 1000),
+ outputChan: ch,
+ pullChan: pullChan,
+ blocks: make(map[types.Position]map[common.Hash]*types.Block),
+ agreementResults: make(map[common.Hash]struct{}),
+ logger: logger,
+ pendings: make(
+ map[uint64]map[common.Hash]*types.AgreementResult),
+ confirmedBlocks: make(map[common.Hash]struct{}),
+ }
+}
+
+// run starts the agreement, this does not start a new routine, go a new
+// routine explicitly in the caller.
+func (a *agreement) run() {
+ a.wg.Add(1)
+ defer a.wg.Done()
+ for {
+ select {
+ case val, ok := <-a.inputChan:
+ if !ok {
+ // InputChan is closed by network when network ends.
+ return
+ }
+ switch v := val.(type) {
+ case *types.Block:
+ a.processBlock(v)
+ case *types.AgreementResult:
+ a.processAgreementResult(v)
+ case uint64:
+ a.processNewCRS(v)
+ }
+ }
+ }
+}
+
+func (a *agreement) processBlock(b *types.Block) {
+ if _, exist := a.confirmedBlocks[b.Hash]; exist {
+ return
+ }
+ if _, exist := a.agreementResults[b.Hash]; exist {
+ a.confirm(b)
+ } else {
+ if _, exist := a.blocks[b.Position]; !exist {
+ a.blocks[b.Position] = make(map[common.Hash]*types.Block)
+ }
+ a.blocks[b.Position][b.Hash] = b
+ }
+}
+
+func (a *agreement) processAgreementResult(r *types.AgreementResult) {
+ // Cache those results that CRS is not ready yet.
+ if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
+ a.logger.Info("agreement result already confirmed", "result", r)
+ return
+ }
+ if r.Position.Round > a.latestCRSRound {
+ pendingsForRound, exists := a.pendings[r.Position.Round]
+ if !exists {
+ pendingsForRound = make(map[common.Hash]*types.AgreementResult)
+ a.pendings[r.Position.Round] = pendingsForRound
+ }
+ pendingsForRound[r.BlockHash] = r
+ a.logger.Info("agreement result cached", "result", r)
+ return
+ }
+ if err := core.VerifyAgreementResult(r, a.cache); err != nil {
+ a.logger.Error("agreement result verification failed",
+ "result", r,
+ "error", err)
+ return
+ }
+ if r.IsEmptyBlock {
+ // Empty block is also confirmed.
+ b := &types.Block{
+ Position: r.Position,
+ }
+ a.confirm(b)
+ } else {
+ needPull := true
+ if bs, exist := a.blocks[r.Position]; exist {
+ if b, exist := bs[r.BlockHash]; exist {
+ a.confirm(b)
+ needPull = false
+ }
+ }
+ if needPull {
+ a.agreementResults[r.BlockHash] = struct{}{}
+ a.pullChan <- r.BlockHash
+ }
+ }
+}
+
+func (a *agreement) processNewCRS(round uint64) {
+ if round <= a.latestCRSRound {
+ return
+ }
+ // Verify all pending results.
+ for r := a.latestCRSRound + 1; r <= round; r++ {
+ pendingsForRound := a.pendings[r]
+ if pendingsForRound == nil {
+ continue
+ }
+ delete(a.pendings, r)
+ for _, res := range pendingsForRound {
+ if err := core.VerifyAgreementResult(res, a.cache); err != nil {
+ a.logger.Error("invalid agreement result", "result", res)
+ continue
+ }
+ a.logger.Error("flush agreement result", "result", res)
+ a.processAgreementResult(res)
+ break
+ }
+ }
+ a.latestCRSRound = round
+}
+
+// confirm notifies consensus the confirmation of a block in BA.
+func (a *agreement) confirm(b *types.Block) {
+ if _, exist := a.confirmedBlocks[b.Hash]; !exist {
+ delete(a.blocks, b.Position)
+ delete(a.agreementResults, b.Hash)
+ a.outputChan <- b
+ a.confirmedBlocks[b.Hash] = struct{}{}
+ }
+}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
new file mode 100644
index 000000000..da9d352f4
--- /dev/null
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
@@ -0,0 +1,747 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto"
+ "github.com/dexon-foundation/dexon-consensus/core/db"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+var (
+ // ErrAlreadySynced is reported when syncer is synced.
+ ErrAlreadySynced = fmt.Errorf("already synced")
+ // ErrNotSynced is reported when syncer is not synced yet.
+ ErrNotSynced = fmt.Errorf("not synced yet")
+ // ErrGenesisBlockReached is reported when genesis block reached.
+ ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
+ // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
+ ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
+ // ErrMismatchBlockHashSequence means the delivering sequence is not
+ // correct, compared to finalized blocks.
+ ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence")
+ // ErrInvalidSyncingFinalizationHeight raised when the blocks to sync is
+ // not following the compaction chain tip in database.
+ ErrInvalidSyncingFinalizationHeight = fmt.Errorf(
+ "invalid syncing finalization height")
+)
+
+// Consensus is for syncing consensus module.
+type Consensus struct {
+ db db.Database
+ gov core.Governance
+ dMoment time.Time
+ logger common.Logger
+ app core.Application
+ prv crypto.PrivateKey
+ network core.Network
+ nodeSetCache *utils.NodeSetCache
+
+ lattice *core.Lattice
+ validatedChains map[uint32]struct{}
+ finalizedBlockHashes common.Hashes
+ latticeLastRound uint64
+ randomnessResults []*types.BlockRandomnessResult
+ blocks []types.ByPosition
+ agreements []*agreement
+ configs []*types.Config
+ roundBeginTimes []time.Time
+ agreementRoundCut uint64
+
+ // lock for accessing all fields.
+ lock sync.RWMutex
+ moduleWaitGroup sync.WaitGroup
+ agreementWaitGroup sync.WaitGroup
+ pullChan chan common.Hash
+ receiveChan chan *types.Block
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ syncedLastBlock *types.Block
+ syncedConsensus *core.Consensus
+}
+
+// NewConsensus creates an instance for Consensus (syncer consensus).
+func NewConsensus(
+ dMoment time.Time,
+ app core.Application,
+ gov core.Governance,
+ db db.Database,
+ network core.Network,
+ prv crypto.PrivateKey,
+ logger common.Logger) *Consensus {
+
+ con := &Consensus{
+ dMoment: dMoment,
+ app: app,
+ gov: gov,
+ db: db,
+ network: network,
+ nodeSetCache: utils.NewNodeSetCache(gov),
+ prv: prv,
+ logger: logger,
+ validatedChains: make(map[uint32]struct{}),
+ configs: []*types.Config{
+ utils.GetConfigWithPanic(gov, 0, logger),
+ },
+ roundBeginTimes: []time.Time{dMoment},
+ receiveChan: make(chan *types.Block, 1000),
+ pullChan: make(chan common.Hash, 1000),
+ }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ return con
+}
+
+func (con *Consensus) initConsensusObj(initBlock *types.Block) {
+ var cfg *types.Config
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ con.latticeLastRound = initBlock.Position.Round
+ cfg = con.configs[con.latticeLastRound]
+ debugApp, _ := con.app.(core.Debug)
+ con.lattice = core.NewLattice(
+ con.roundBeginTimes[con.latticeLastRound],
+ con.latticeLastRound,
+ cfg,
+ core.NewAuthenticator(con.prv),
+ con.app,
+ debugApp,
+ con.db,
+ con.logger,
+ )
+ }()
+ con.startAgreement(cfg.NumChains)
+ con.startNetwork()
+ con.startCRSMonitor()
+}
+
+func (con *Consensus) checkIfValidated() bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
+ var validatedChainCount uint32
+ // Make sure we validate some block in all chains.
+ for chainID := range con.validatedChains {
+ if chainID < numChains {
+ validatedChainCount++
+ }
+ }
+ if validatedChainCount == numChains {
+ return true
+ }
+ con.logger.Info("not validated yet", "validated-chain", validatedChainCount)
+ return false
+}
+
+func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ var (
+ numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
+ compactionTips = make([]*types.Block, numChains)
+ overlapCount = uint32(0)
+ )
+ // Find tips (newset blocks) of each chain in compaction chain.
+ b := blocks[len(blocks)-1]
+ for tipCount := uint32(0); tipCount < numChains; {
+ if compactionTips[b.Position.ChainID] == nil {
+ // Check chainID for config change.
+ if b.Position.ChainID < numChains {
+ compactionTips[b.Position.ChainID] = b
+ tipCount++
+ }
+ }
+ if (b.Finalization.ParentHash == common.Hash{}) {
+ return false
+ }
+ b1, err := con.db.GetBlock(b.Finalization.ParentHash)
+ if err != nil {
+ panic(err)
+ }
+ b = &b1
+ }
+ // Check if chain tips of compaction chain and current cached confirmed
+ // blocks are overlapped on each chain, numChains is decided by the round
+ // of last block we seen on compaction chain.
+ for chainID, b := range compactionTips {
+ if len(con.blocks[chainID]) > 0 {
+ if !b.Position.Older(&con.blocks[chainID][0].Position) {
+ overlapCount++
+ }
+ }
+ }
+ if overlapCount == numChains {
+ return true
+ }
+ con.logger.Info("not synced yet",
+ "overlap-count", overlapCount,
+ "num-chain", numChains,
+ "last-block", blocks[len(blocks)-1])
+ return false
+}
+
+// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
+// con.blocks are all in the same round, for avoiding config change while
+// syncing.
+func (con *Consensus) ensureAgreementOverlapRound() bool {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if con.agreementRoundCut > 0 {
+ return true
+ }
+ // Clean empty blocks on tips of chains.
+ for idx, bs := range con.blocks {
+ for len(bs) > 0 && con.isEmptyBlock(bs[0]) {
+ bs = bs[1:]
+ }
+ con.blocks[idx] = bs
+ }
+ // Build empty blocks.
+ for _, bs := range con.blocks {
+ for i := range bs {
+ if con.isEmptyBlock(bs[i]) {
+ if bs[i-1].Position.Height == bs[i].Position.Height-1 {
+ con.buildEmptyBlock(bs[i], bs[i-1])
+ }
+ }
+ }
+ }
+ var tipRoundMap map[uint64]uint32
+ for {
+ tipRoundMap = make(map[uint64]uint32)
+ for _, bs := range con.blocks {
+ if len(bs) > 0 {
+ tipRoundMap[bs[0].Position.Round]++
+ }
+ }
+ if len(tipRoundMap) <= 1 {
+ break
+ }
+ // Make all tips in same round.
+ var maxRound uint64
+ for r := range tipRoundMap {
+ if r > maxRound {
+ maxRound = r
+ }
+ }
+ for idx, bs := range con.blocks {
+ for len(bs) > 0 && bs[0].Position.Round < maxRound {
+ bs = bs[1:]
+ }
+ con.blocks[idx] = bs
+ }
+ }
+ if len(tipRoundMap) == 1 {
+ var r uint64
+ for r = range tipRoundMap {
+ break
+ }
+ if tipRoundMap[r] == con.configs[r].NumChains {
+ con.agreementRoundCut = r
+ con.logger.Info("agreement round cut found, round", r)
+ return true
+ }
+ }
+ return false
+}
+
+func (con *Consensus) findLatticeSyncBlock(
+ blocks []*types.Block) (*types.Block, error) {
+ lastBlock := blocks[len(blocks)-1]
+ round := lastBlock.Position.Round
+ for {
+ // Find round r which r-1, r, r+1 are all in same total ordering config.
+ for {
+ sameAsPrevRound := round == 0 || !con.isConfigChanged(
+ con.configs[round-1], con.configs[round])
+ sameAsNextRound := !con.isConfigChanged(
+ con.configs[round], con.configs[round+1])
+ if sameAsPrevRound && sameAsNextRound {
+ break
+ }
+ if round == 0 {
+ // Unable to find a safe round, wait for new rounds.
+ return nil, nil
+ }
+ round--
+ }
+ // Find the newset block which round is "round".
+ for lastBlock.Position.Round != round {
+ if (lastBlock.Finalization.ParentHash == common.Hash{}) {
+ return nil, ErrGenesisBlockReached
+ }
+ b, err := con.db.GetBlock(lastBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ lastBlock = &b
+ }
+ // Find the deliver set by hash for two times. Blocks in a deliver set
+ // returned by total ordering is sorted by hash. If a block's parent hash
+ // is greater than its hash means there is a cut between deliver sets.
+ var curBlock, prevBlock *types.Block
+ var deliverSetFirstBlock, deliverSetLastBlock *types.Block
+ curBlock = lastBlock
+ for {
+ if (curBlock.Finalization.ParentHash == common.Hash{}) {
+ return nil, ErrGenesisBlockReached
+ }
+ b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ prevBlock = &b
+ if !prevBlock.Hash.Less(curBlock.Hash) {
+ break
+ }
+ curBlock = prevBlock
+ }
+ deliverSetLastBlock = prevBlock
+ curBlock = prevBlock
+ for {
+ if (curBlock.Finalization.ParentHash == common.Hash{}) {
+ break
+ }
+ b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ prevBlock = &b
+ if !prevBlock.Hash.Less(curBlock.Hash) {
+ break
+ }
+ curBlock = prevBlock
+ }
+ deliverSetFirstBlock = curBlock
+ // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock
+ // are in the same round.
+ ok := true
+ curBlock = deliverSetLastBlock
+ for {
+ if curBlock.Position.Round != round {
+ ok = false
+ break
+ }
+ b, err := con.db.GetBlock(curBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ curBlock = &b
+ if curBlock.Hash == deliverSetFirstBlock.Hash {
+ break
+ }
+ }
+ if ok {
+ return deliverSetFirstBlock, nil
+ }
+ if round == 0 {
+ return nil, nil
+ }
+ round--
+ }
+}
+
+func (con *Consensus) processFinalizedBlock(block *types.Block) error {
+ if con.lattice == nil {
+ return nil
+ }
+ con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash)
+ delivered, err := con.lattice.ProcessFinalizedBlock(block)
+ if err != nil {
+ return err
+ }
+ for idx, b := range delivered {
+ if con.finalizedBlockHashes[idx] != b.Hash {
+ return ErrMismatchBlockHashSequence
+ }
+ con.validatedChains[b.Position.ChainID] = struct{}{}
+ }
+ con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):]
+ return nil
+}
+
+// SyncBlocks syncs blocks from compaction chain, latest is true if the caller
+// regards the blocks are the latest ones. Notice that latest can be true for
+// many times.
+// NOTICE: parameter "blocks" should be consecutive in compaction height.
+func (con *Consensus) SyncBlocks(
+ blocks []*types.Block, latest bool) (bool, error) {
+ if con.syncedLastBlock != nil {
+ return true, ErrAlreadySynced
+ }
+ if len(blocks) == 0 {
+ return false, nil
+ }
+ // Check if blocks are consecutive.
+ for i := 1; i < len(blocks); i++ {
+ if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 {
+ return false, ErrInvalidBlockOrder
+ }
+ }
+ // Make sure the first block is the next block of current compaction chain
+ // tip in DB.
+ _, tipHeight := con.db.GetCompactionChainTipInfo()
+ if blocks[0].Finalization.Height != tipHeight+1 {
+ con.logger.Error("mismatched finalization height",
+ "now", blocks[0].Finalization.Height,
+ "expected", tipHeight+1)
+ return false, ErrInvalidSyncingFinalizationHeight
+ }
+ con.logger.Info("syncBlocks",
+ "position", &blocks[0].Position,
+ "final height", blocks[0].Finalization.Height,
+ "len", len(blocks),
+ "latest", latest,
+ )
+ con.setupConfigs(blocks)
+ for _, b := range blocks {
+ // TODO(haoping) remove this if lattice puts blocks into db.
+ if err := con.db.PutBlock(*b); err != nil {
+ // A block might be put into db when confirmed by BA, but not
+ // finalized yet.
+ if err == db.ErrBlockExists {
+ err = con.db.UpdateBlock(*b)
+ }
+ if err != nil {
+ return false, err
+ }
+ }
+ if err := con.db.PutCompactionChainTipInfo(
+ b.Hash, b.Finalization.Height); err != nil {
+ return false, err
+ }
+ if err := con.processFinalizedBlock(b); err != nil {
+ return false, err
+ }
+ }
+ if latest && con.lattice == nil {
+ // New Lattice and find the deliver set of total ordering when "latest" is
+ // true for first time. Deliver set is found by block hashes.
+ syncBlock, err := con.findLatticeSyncBlock(blocks)
+ if err != nil {
+ return false, err
+ }
+ if syncBlock != nil {
+ con.logger.Info("deliver set found", syncBlock)
+ // New lattice with the round of syncBlock.
+ con.initConsensusObj(syncBlock)
+ // Process blocks from syncBlock to blocks' last block.
+ b := blocks[len(blocks)-1]
+ blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
+ blocksToProcess := make([]*types.Block, blocksCount)
+ for {
+ blocksToProcess[blocksCount-1] = b
+ blocksCount--
+ if b.Hash == syncBlock.Hash {
+ break
+ }
+ b1, err := con.db.GetBlock(b.Finalization.ParentHash)
+ if err != nil {
+ return false, err
+ }
+ b = &b1
+ }
+ for _, b := range blocksToProcess {
+ if err := con.processFinalizedBlock(b); err != nil {
+ return false, err
+ }
+ }
+ }
+ }
+ if latest && con.ensureAgreementOverlapRound() {
+ // Check if compaction and agreements' blocks are overlapped. The
+ // overlapping of compaction chain and BA's oldest blocks means the
+ // syncing is done.
+ if con.checkIfValidated() && con.checkIfSynced(blocks) {
+ if err := con.Stop(); err != nil {
+ return false, err
+ }
+ con.syncedLastBlock = blocks[len(blocks)-1]
+ con.logger.Info("syncer.Consensus synced",
+ "last-block", con.syncedLastBlock)
+ }
+ }
+ return con.syncedLastBlock != nil, nil
+}
+
+// GetSyncedConsensus returns the core.Consensus instance after synced.
+func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
+ if con.syncedConsensus != nil {
+ return con.syncedConsensus, nil
+ }
+ if con.syncedLastBlock == nil {
+ return nil, ErrNotSynced
+ }
+ // flush all blocks in con.blocks into core.Consensus, and build
+ // core.Consensus from syncer.
+ confirmedBlocks := []*types.Block{}
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for _, bs := range con.blocks {
+ confirmedBlocks = append(confirmedBlocks, bs...)
+ }
+ }()
+ var err error
+ con.syncedConsensus, err = core.NewConsensusFromSyncer(
+ con.syncedLastBlock,
+ con.roundBeginTimes[con.syncedLastBlock.Position.Round],
+ con.app,
+ con.gov,
+ con.db,
+ con.network,
+ con.prv,
+ con.lattice,
+ confirmedBlocks,
+ con.randomnessResults,
+ con.logger)
+ return con.syncedConsensus, err
+}
+
+// Stop the syncer.
+//
+// This method is mainly for caller to stop the syncer before synced, the syncer
+// would call this method automatically after synced.
+func (con *Consensus) Stop() error {
+ // Stop network and CRS routines, wait until they are all stoped.
+ con.ctxCancel()
+ con.moduleWaitGroup.Wait()
+ // Stop agreements.
+ con.stopAgreement()
+ return nil
+}
+
+// isEmptyBlock checks if a block is an empty block by both its hash and parent
+// hash are empty.
+func (con *Consensus) isEmptyBlock(b *types.Block) bool {
+ return b.Hash == common.Hash{} && b.ParentHash == common.Hash{}
+}
+
+// buildEmptyBlock builds an empty block in agreement.
+func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) {
+ cfg := con.configs[b.Position.Round]
+ b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval)
+ b.Witness.Height = parent.Witness.Height
+ b.Witness.Data = make([]byte, len(parent.Witness.Data))
+ copy(b.Witness.Data, parent.Witness.Data)
+ b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash})
+}
+
+// setupConfigs is called by SyncBlocks with blocks from compaction chain. In
+// the first time, setupConfigs setups from round 0.
+func (con *Consensus) setupConfigs(blocks []*types.Block) {
+ // Find max round in blocks.
+ var maxRound uint64
+ for _, b := range blocks {
+ if b.Position.Round > maxRound {
+ maxRound = b.Position.Round
+ }
+ }
+ // Get configs from governance.
+ //
+ // In fullnode, the notification of new round is yet another TX, which
+ // needs to be executed after corresponding block delivered. Thus, the
+ // configuration for 'maxRound + core.ConfigRoundShift' won't be ready when
+ // seeing this block.
+ untilRound := maxRound + core.ConfigRoundShift - 1
+ curMaxNumChains := uint32(0)
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for r := uint64(len(con.configs)); r <= untilRound; r++ {
+ cfg := utils.GetConfigWithPanic(con.gov, r, con.logger)
+ con.configs = append(con.configs, cfg)
+ con.roundBeginTimes = append(
+ con.roundBeginTimes,
+ con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval))
+ if cfg.NumChains >= curMaxNumChains {
+ curMaxNumChains = cfg.NumChains
+ }
+ }
+ }()
+ con.resizeByNumChains(curMaxNumChains)
+ // Notify core.Lattice for new configs.
+ if con.lattice != nil {
+ for con.latticeLastRound+1 <= maxRound {
+ con.latticeLastRound++
+ if err := con.lattice.AppendConfig(
+ con.latticeLastRound,
+ con.configs[con.latticeLastRound]); err != nil {
+ panic(err)
+ }
+ }
+ }
+}
+
+// resizeByNumChains resizes fake lattice and agreement if numChains increases.
+// Notice the decreasing case is neglected.
+func (con *Consensus) resizeByNumChains(numChains uint32) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if numChains > uint32(len(con.blocks)) {
+ for i := uint32(len(con.blocks)); i < numChains; i++ {
+ // Resize the pool of blocks.
+ con.blocks = append(con.blocks, types.ByPosition{})
+ // Resize agreement modules.
+ a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache,
+ &con.agreementWaitGroup, con.logger)
+ con.agreements = append(con.agreements, a)
+ go a.run()
+ }
+ }
+}
+
+// startAgreement starts agreements for receiving votes and agreements.
+func (con *Consensus) startAgreement(numChains uint32) {
+ // Start a routine for listening receive channel and pull block channel.
+ go func() {
+ for {
+ select {
+ case b, ok := <-con.receiveChan:
+ if !ok {
+ return
+ }
+ chainID := b.Position.ChainID
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // If round is cut in agreements, do not add blocks with round less
+ // then cut round.
+ if b.Position.Round < con.agreementRoundCut {
+ return
+ }
+ con.blocks[chainID] = append(con.blocks[chainID], b)
+ sort.Sort(con.blocks[chainID])
+ }()
+ case h, ok := <-con.pullChan:
+ if !ok {
+ return
+ }
+ con.network.PullBlocks(common.Hashes{h})
+ }
+ }
+ }()
+}
+
+// startNetwork starts network for receiving blocks and agreement results.
+func (con *Consensus) startNetwork() {
+ go func() {
+ con.moduleWaitGroup.Add(1)
+ defer con.moduleWaitGroup.Done()
+ Loop:
+ for {
+ select {
+ case val := <-con.network.ReceiveChan():
+ var pos types.Position
+ switch v := val.(type) {
+ case *types.Block:
+ pos = v.Position
+ case *types.AgreementResult:
+ pos = v.Position
+ case *types.BlockRandomnessResult:
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if v.Position.Round >= con.agreementRoundCut {
+ con.randomnessResults = append(con.randomnessResults, v)
+ }
+ }()
+ continue Loop
+ default:
+ continue Loop
+ }
+ func() {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ if pos.ChainID >= uint32(len(con.agreements)) {
+ con.logger.Error("Unknown chainID message received (syncer)",
+ "position", &pos)
+ }
+ }()
+ con.agreements[pos.ChainID].inputChan <- val
+ case <-con.ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+// startCRSMonitor is the dummiest way to verify if the CRS for one round
+// is ready or not.
+func (con *Consensus) startCRSMonitor() {
+ var lastNotifiedRound uint64
+ // Notify all agreements for new CRS.
+ notifyNewCRS := func(round uint64) {
+ if round == lastNotifiedRound {
+ return
+ }
+ con.logger.Info("CRS is ready", "round", round)
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ lastNotifiedRound = round
+ for _, a := range con.agreements {
+ a.inputChan <- round
+ }
+ }
+ go func() {
+ con.moduleWaitGroup.Add(1)
+ defer con.moduleWaitGroup.Done()
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ case <-time.After(1 * time.Second):
+ // Notify agreement modules for the latest round that CRS is
+ // available if the round is not notified yet.
+ var crsRound = lastNotifiedRound
+ for (con.gov.CRS(crsRound+1) != common.Hash{}) {
+ crsRound++
+ }
+ notifyNewCRS(crsRound)
+ }
+ }
+ }()
+}
+
+func (con *Consensus) stopAgreement() {
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for _, a := range con.agreements {
+ close(a.inputChan)
+ }
+ }()
+ con.agreementWaitGroup.Wait()
+ close(con.receiveChan)
+ close(con.pullChan)
+}
+
+func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool {
+ return prev.K != cur.K ||
+ prev.NumChains != cur.NumChains ||
+ prev.PhiRatio != cur.PhiRatio
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index f524bce16..56b294989 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -145,6 +145,12 @@
"revisionTime": "2018-12-20T09:26:30Z"
},
{
+ "checksumSHA1": "VRDwO2+FQkeZFeuLfRFS9FUpdCc=",
+ "path": "github.com/dexon-foundation/dexon-consensus/core/syncer",
+ "revision": "146ed32cf841151b826eafd7d6ade188c56865bf",
+ "revisionTime": "2018-12-20T09:26:30Z"
+ },
+ {
"checksumSHA1": "Z079qQV+aQV9A3kSJ0LbFjx5VO4=",
"path": "github.com/dexon-foundation/dexon-consensus/core/types",
"revision": "146ed32cf841151b826eafd7d6ade188c56865bf",