diff options
-rw-r--r-- | core/blockchain.go | 6 | ||||
-rw-r--r-- | core/tx_pool.go | 53 | ||||
-rw-r--r-- | dex/api.go | 16 | ||||
-rw-r--r-- | dex/app.go | 5 | ||||
-rw-r--r-- | dex/backend.go | 41 | ||||
-rw-r--r-- | dex/blockproposer.go | 201 | ||||
-rw-r--r-- | dex/handler.go | 8 | ||||
-rw-r--r-- | internal/web3ext/web3ext.go | 16 | ||||
-rw-r--r-- | les/handler_test.go | 19 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go | 181 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go | 747 | ||||
-rw-r--r-- | vendor/vendor.json | 6 |
12 files changed, 1199 insertions, 100 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 1b3b22e5e..0e87360c3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1796,8 +1796,6 @@ func (bc *BlockChain) processPendingBlock( return nil, nil, nil, fmt.Errorf("validate witness data error: %v", err) } - currentBlock := bc.CurrentBlock() - var ( receipts types.Receipts usedGas = new(uint64) @@ -1810,8 +1808,8 @@ func (bc *BlockChain) processPendingBlock( var err error parent, exist := bc.pendingBlocks[block.NumberU64()-1] if !exist { - parentBlock = currentBlock - if parentBlock.NumberU64() != block.NumberU64()-1 { + parentBlock = bc.GetBlockByNumber(block.NumberU64() - 1) + if parentBlock == nil { return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } } else { diff --git a/core/tx_pool.go b/core/tx_pool.go index ea2025cde..911b6c261 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -19,7 +19,6 @@ package core import ( "errors" "fmt" - "math" "math/big" "sort" "sync" @@ -405,53 +404,6 @@ func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) { // reset retrieves the current state of the blockchain and ensures the content // of the transaction pool is valid with regard to the chain state. func (pool *TxPool) reset(oldHead, newHead *types.Header) { - // If we're reorging an old state, reinject all dropped transactions - var reinject types.Transactions - - if oldHead != nil && oldHead.Hash() != newHead.ParentHash { - // If the reorg is too deep, avoid doing it (will happen during fast sync) - oldNum := oldHead.Number.Uint64() - newNum := newHead.Number.Uint64() - - if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { - log.Debug("Skipping deep transaction reorg", "depth", depth) - } else { - // Reorg seems shallow enough to pull in all transactions into memory - var discarded, included types.Transactions - - var ( - rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) - add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) - ) - for rem.NumberU64() > add.NumberU64() { - discarded = append(discarded, rem.Transactions()...) - if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { - log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return - } - } - for add.NumberU64() > rem.NumberU64() { - included = append(included, add.Transactions()...) - if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { - log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return - } - } - for rem.Hash() != add.Hash() { - discarded = append(discarded, rem.Transactions()...) - if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { - log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return - } - included = append(included, add.Transactions()...) - if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { - log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return - } - } - reinject = types.TxDifference(discarded, included) - } - } // Initialize the internal state to the current head if newHead == nil { newHead = pool.chain.CurrentBlock().Header() // Special case during testing @@ -465,11 +417,6 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { pool.pendingState = state.ManageState(statedb) pool.currentMaxGas = newHead.GasLimit - // Inject any transactions discarded due to reorgs - log.Debug("Reinjecting stale transactions", "count", len(reinject)) - senderCacher.recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) - // validate the pool of pending transactions, this will remove // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. diff --git a/dex/api.go b/dex/api.go index 40928a5c1..4d39f9f6e 100644 --- a/dex/api.go +++ b/dex/api.go @@ -131,6 +131,22 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { return true, nil } +func (api *PrivateAdminAPI) StartProposing() error { + return api.dex.StartProposing() +} + +func (api *PrivateAdminAPI) StopProposing() { + api.dex.StopProposing() +} + +func (api *PrivateAdminAPI) IsLatticeSyncing() bool { + return api.dex.IsLatticeSyncing() +} + +func (api *PrivateAdminAPI) IsProposing() bool { + return api.dex.IsProposing() +} + // PublicDebugAPI is the collection of Ethereum full node APIs exposed // over the public debugging endpoint. type PublicDebugAPI struct { diff --git a/dex/app.go b/dex/app.go index 9dcfd87e9..d04b2afd6 100644 --- a/dex/app.go +++ b/dex/app.go @@ -451,6 +451,9 @@ func (d *DexconApp) BlockDelivered( blockPosition coreTypes.Position, result coreTypes.FinalizationResult) { + log.Debug("DexconApp block deliver", "height", result.Height, "hash", blockHash, "position", blockPosition.String()) + defer log.Debug("DexconApp block delivered", "height", result.Height, "hash", blockHash, "position", blockPosition.String()) + chainID := blockPosition.ChainID d.chainLock(chainID) defer d.chainUnlock(chainID) @@ -461,6 +464,7 @@ func (d *DexconApp) BlockDelivered( } block.Payload = nil + block.Finalization = result dexconMeta, err := rlp.EncodeToBytes(block) if err != nil { panic(err) @@ -501,6 +505,7 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) { d.chainLock(block.Position.ChainID) defer d.chainUnlock(block.Position.ChainID) + log.Debug("DexconApp block confirmed", "block", block.String()) if err := d.blockchain.AddConfirmedBlock(&block); err != nil { panic(err) } diff --git a/dex/backend.go b/dex/backend.go index 5eb9a85fc..8fe38cd45 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -21,9 +21,6 @@ import ( "fmt" "time" - dexCore "github.com/dexon-foundation/dexon-consensus/core" - coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" - "github.com/dexon-foundation/dexon/accounts" "github.com/dexon-foundation/dexon/consensus" "github.com/dexon-foundation/dexon/consensus/dexcon" @@ -31,7 +28,6 @@ import ( "github.com/dexon-foundation/dexon/core/bloombits" "github.com/dexon-foundation/dexon/core/rawdb" "github.com/dexon-foundation/dexon/core/vm" - dexDB "github.com/dexon-foundation/dexon/dex/db" "github.com/dexon-foundation/dexon/dex/downloader" "github.com/dexon-foundation/dexon/eth/filters" "github.com/dexon-foundation/dexon/eth/gasprice" @@ -74,7 +70,8 @@ type Dexon struct { app *DexconApp governance *DexconGovernance network *DexconNetwork - consensus *dexCore.Consensus + + bp *blockProposer networkID uint64 netRPCService *ethapi.PublicNetAPI @@ -154,6 +151,14 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { // Set config fetcher so engine can fetch current system configuration from state. engine.SetConfigFetcher(dex.governance) + dMoment := time.Unix(config.DMoment, int64(0)) + log.Info("DEXON Consensus DMoment", "time", dMoment) + + // Force starting with full sync mode if this node is a bootstrap proposer. + if config.BlockProposerEnabled && dMoment.After(time.Now()) { + config.SyncMode = downloader.FullSync + } + pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, chainDb, config.BlockProposerEnabled, dex.governance, dex.app) @@ -164,13 +169,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { dex.protocolManager = pm dex.network = NewDexconNetwork(pm) - privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey) - - dMoment := time.Unix(config.DMoment, int64(0)) - log.Info("DEXON Consensus DMoment", "time", dMoment) - - dex.consensus = dexCore.NewConsensus(dMoment, - dex.app, dex.governance, dexDB.NewDatabase(chainDb), dex.network, privKey, log.Root()) + dex.bp = NewBlockProposer(dex, dMoment) return dex, nil } @@ -240,15 +239,25 @@ func (s *Dexon) Start(srvr *p2p.Server) error { } func (s *Dexon) Stop() error { - s.consensus.Stop() + s.bp.Stop() s.app.Stop() return nil } func (s *Dexon) StartProposing() error { - // TODO: Run with the latest confirmed block in compaction chain. - s.consensus.Run() - return nil + return s.bp.Start() +} + +func (s *Dexon) StopProposing() { + s.bp.Stop() +} + +func (s *Dexon) IsLatticeSyncing() bool { + return s.bp.IsLatticeSyncing() +} + +func (s *Dexon) IsProposing() bool { + return s.bp.IsProposing() } // CreateDB creates the chain database. diff --git a/dex/blockproposer.go b/dex/blockproposer.go new file mode 100644 index 000000000..21b8ddbde --- /dev/null +++ b/dex/blockproposer.go @@ -0,0 +1,201 @@ +package dex + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + dexCore "github.com/dexon-foundation/dexon-consensus/core" + coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" + "github.com/dexon-foundation/dexon-consensus/core/syncer" + coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" + + "github.com/dexon-foundation/dexon/core" + "github.com/dexon-foundation/dexon/dex/db" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/rlp" +) + +type blockProposer struct { + mu sync.Mutex + running int32 + syncing int32 + proposing int32 + dex *Dexon + dMoment time.Time + + wg sync.WaitGroup + stopCh chan struct{} +} + +func NewBlockProposer(dex *Dexon, dMoment time.Time) *blockProposer { + return &blockProposer{ + dex: dex, + dMoment: dMoment, + } +} + +func (b *blockProposer) Start() error { + b.mu.Lock() + defer b.mu.Unlock() + + if !atomic.CompareAndSwapInt32(&b.running, 0, 1) { + return fmt.Errorf("block proposer is already running") + } + log.Info("Block proposer started") + + b.stopCh = make(chan struct{}) + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer atomic.StoreInt32(&b.running, 0) + + var err error + var c *dexCore.Consensus + + if b.dMoment.After(time.Now()) { + c = b.initConsensus() + } else { + c, err = b.syncConsensus() + } + + if err != nil { + log.Error("Block proposer stopped, before start running", "err", err) + return + } + + b.run(c) + log.Info("Block proposer successfully stopped") + }() + return nil +} + +func (b *blockProposer) run(c *dexCore.Consensus) { + log.Info("Start running consensus core") + go c.Run() + atomic.StoreInt32(&b.proposing, 1) + <-b.stopCh + log.Debug("Block proposer receive stop signal") + c.Stop() +} + +func (b *blockProposer) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if atomic.LoadInt32(&b.running) == 1 { + b.dex.protocolManager.isBlockProposer = false + close(b.stopCh) + b.wg.Wait() + atomic.StoreInt32(&b.proposing, 0) + } +} + +func (b *blockProposer) IsLatticeSyncing() bool { + return atomic.LoadInt32(&b.syncing) == 1 +} + +func (b *blockProposer) IsProposing() bool { + return atomic.LoadInt32(&b.proposing) == 1 +} + +func (b *blockProposer) initConsensus() *dexCore.Consensus { + db := db.NewDatabase(b.dex.chainDb) + privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey) + return dexCore.NewConsensus(b.dMoment, + b.dex.app, b.dex.governance, db, b.dex.network, privkey, log.Root()) +} + +func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) { + atomic.StoreInt32(&b.syncing, 1) + defer atomic.StoreInt32(&b.syncing, 0) + + db := db.NewDatabase(b.dex.chainDb) + privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey) + consensusSync := syncer.NewConsensus(b.dMoment, b.dex.app, b.dex.governance, + db, b.dex.network, privkey, log.Root()) + + blocksToSync := func(coreHeight, height uint64) []*coreTypes.Block { + var blocks []*coreTypes.Block + for len(blocks) < 1024 && coreHeight < height { + var block coreTypes.Block + b := b.dex.blockchain.GetBlockByNumber(coreHeight + 1) + if err := rlp.DecodeBytes(b.Header().DexconMeta, &block); err != nil { + panic(err) + } + blocks = append(blocks, &block) + coreHeight = coreHeight + 1 + } + return blocks + } + + // Sync all blocks in compaction chain to core. + _, coreHeight := db.GetCompactionChainTipInfo() + +Loop: + for { + currentBlock := b.dex.blockchain.CurrentBlock() + log.Debug("Syncing compaction chain", "core height", coreHeight, + "height", currentBlock.NumberU64()) + blocks := blocksToSync(coreHeight, currentBlock.NumberU64()) + + if len(blocks) == 0 { + break Loop + } + + log.Debug("Filling compaction chain", "num", len(blocks), + "first", blocks[0].Finalization.Height, + "last", blocks[len(blocks)-1].Finalization.Height) + if _, err := consensusSync.SyncBlocks(blocks, false); err != nil { + return nil, err + } + coreHeight = blocks[len(blocks)-1].Finalization.Height + + select { + case <-b.stopCh: + return nil, errors.New("early stop") + default: + } + } + + // Enable isBlockProposer flag to start receiving msg. + b.dex.protocolManager.isBlockProposer = true + + ch := make(chan core.ChainHeadEvent) + sub := b.dex.blockchain.SubscribeChainHeadEvent(ch) + defer sub.Unsubscribe() + + // Listen chain head event until synced. +ListenLoop: + for { + select { + case ev := <-ch: + blocks := blocksToSync(coreHeight, ev.Block.NumberU64()) + if len(blocks) > 0 { + log.Debug("Filling compaction chain", "num", len(blocks), + "first", blocks[0].Finalization.Height, + "last", blocks[len(blocks)-1].Finalization.Height) + synced, err := consensusSync.SyncBlocks(blocks, true) + if err != nil { + log.Error("SyncBlocks fail", "err", err) + return nil, err + } + if synced { + log.Debug("Consensus core synced") + break ListenLoop + } + coreHeight = blocks[len(blocks)-1].Finalization.Height + } + case <-sub.Err(): + log.Debug("System stopped when syncing consensus core") + return nil, errors.New("system stop") + case <-b.stopCh: + log.Debug("Early stop, before consensus core can run") + return nil, errors.New("early stop") + } + } + + return consensusSync.GetSyncedConsensus() +} diff --git a/dex/handler.go b/dex/handler.go index 9174d8516..ff87884d2 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -713,10 +713,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockHashesMsg: - // Ignore new block hash messages in block proposer mode. - if pm.isBlockProposer { - break - } var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) @@ -737,10 +733,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockMsg: - // Ignore new block messages in block proposer mode. - if pm.isBlockProposer { - break - } // Retrieve and decode the propagated block var block types.Block if err := msg.Decode(&block); err != nil { diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 6b98c8b7e..2253142b9 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -199,6 +199,14 @@ web3._extend({ name: 'stopWS', call: 'admin_stopWS' }), + new web3._extend.Method({ + name: 'startProposing', + call: 'admin_startProposing' + }), + new web3._extend.Method({ + name: 'stopProposing', + call: 'admin_stopProposing' + }), ], properties: [ new web3._extend.Property({ @@ -213,6 +221,14 @@ web3._extend({ name: 'datadir', getter: 'admin_datadir' }), + new web3._extend.Property({ + name: 'isLatticeSyncing', + getter: 'admin_isLatticeSyncing' + }), + new web3._extend.Property({ + name: 'isProposing', + getter: 'admin_isProposing' + }), ] }); ` diff --git a/les/handler_test.go b/les/handler_test.go index 4a22b705b..bca84d346 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -562,23 +562,4 @@ func TestTransactionStatusLes2(t *testing.T) { block1hash := rawdb.ReadCanonicalHash(db, 1) test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}}) test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}}) - - // create a reorg that rolls them back - gchain, _ = core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 2, func(i int, block *core.BlockGen) {}) - if _, err := chain.InsertChain(gchain); err != nil { - panic(err) - } - // wait until TxPool processes the reorg - for i := 0; i < 10; i++ { - if pending, _ := txpool.Stats(); pending == 3 { - break - } - time.Sleep(100 * time.Millisecond) - } - if pending, _ := txpool.Stats(); pending != 3 { - t.Fatalf("pending count mismatch: have %d, want 3", pending) - } - // check if their status is pending again - test(tx1, false, txStatus{Status: core.TxStatusPending}) - test(tx2, false, txStatus{Status: core.TxStatusPending}) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go new file mode 100644 index 000000000..fee462442 --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go @@ -0,0 +1,181 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "sync" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +// Struct agreement implements struct of BA (Byzantine Agreement) protocol +// needed in syncer, which only receives agreement results. +type agreement struct { + wg *sync.WaitGroup + cache *utils.NodeSetCache + inputChan chan interface{} + outputChan chan<- *types.Block + pullChan chan<- common.Hash + blocks map[types.Position]map[common.Hash]*types.Block + agreementResults map[common.Hash]struct{} + latestCRSRound uint64 + pendings map[uint64]map[common.Hash]*types.AgreementResult + logger common.Logger + confirmedBlocks map[common.Hash]struct{} +} + +// newAgreement creates a new agreement instance. +func newAgreement( + ch chan<- *types.Block, + pullChan chan<- common.Hash, + cache *utils.NodeSetCache, + wg *sync.WaitGroup, + logger common.Logger) *agreement { + + return &agreement{ + cache: cache, + wg: wg, + inputChan: make(chan interface{}, 1000), + outputChan: ch, + pullChan: pullChan, + blocks: make(map[types.Position]map[common.Hash]*types.Block), + agreementResults: make(map[common.Hash]struct{}), + logger: logger, + pendings: make( + map[uint64]map[common.Hash]*types.AgreementResult), + confirmedBlocks: make(map[common.Hash]struct{}), + } +} + +// run starts the agreement, this does not start a new routine, go a new +// routine explicitly in the caller. +func (a *agreement) run() { + a.wg.Add(1) + defer a.wg.Done() + for { + select { + case val, ok := <-a.inputChan: + if !ok { + // InputChan is closed by network when network ends. + return + } + switch v := val.(type) { + case *types.Block: + a.processBlock(v) + case *types.AgreementResult: + a.processAgreementResult(v) + case uint64: + a.processNewCRS(v) + } + } + } +} + +func (a *agreement) processBlock(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; exist { + return + } + if _, exist := a.agreementResults[b.Hash]; exist { + a.confirm(b) + } else { + if _, exist := a.blocks[b.Position]; !exist { + a.blocks[b.Position] = make(map[common.Hash]*types.Block) + } + a.blocks[b.Position][b.Hash] = b + } +} + +func (a *agreement) processAgreementResult(r *types.AgreementResult) { + // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[r.BlockHash]; exists { + a.logger.Info("agreement result already confirmed", "result", r) + return + } + if r.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendings[r.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.AgreementResult) + a.pendings[r.Position.Round] = pendingsForRound + } + pendingsForRound[r.BlockHash] = r + a.logger.Info("agreement result cached", "result", r) + return + } + if err := core.VerifyAgreementResult(r, a.cache); err != nil { + a.logger.Error("agreement result verification failed", + "result", r, + "error", err) + return + } + if r.IsEmptyBlock { + // Empty block is also confirmed. + b := &types.Block{ + Position: r.Position, + } + a.confirm(b) + } else { + needPull := true + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + a.confirm(b) + needPull = false + } + } + if needPull { + a.agreementResults[r.BlockHash] = struct{}{} + a.pullChan <- r.BlockHash + } + } +} + +func (a *agreement) processNewCRS(round uint64) { + if round <= a.latestCRSRound { + return + } + // Verify all pending results. + for r := a.latestCRSRound + 1; r <= round; r++ { + pendingsForRound := a.pendings[r] + if pendingsForRound == nil { + continue + } + delete(a.pendings, r) + for _, res := range pendingsForRound { + if err := core.VerifyAgreementResult(res, a.cache); err != nil { + a.logger.Error("invalid agreement result", "result", res) + continue + } + a.logger.Error("flush agreement result", "result", res) + a.processAgreementResult(res) + break + } + } + a.latestCRSRound = round +} + +// confirm notifies consensus the confirmation of a block in BA. +func (a *agreement) confirm(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; !exist { + delete(a.blocks, b.Position) + delete(a.agreementResults, b.Hash) + a.outputChan <- b + a.confirmedBlocks[b.Hash] = struct{}{} + } +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go new file mode 100644 index 000000000..da9d352f4 --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -0,0 +1,747 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core" + "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/db" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +var ( + // ErrAlreadySynced is reported when syncer is synced. + ErrAlreadySynced = fmt.Errorf("already synced") + // ErrNotSynced is reported when syncer is not synced yet. + ErrNotSynced = fmt.Errorf("not synced yet") + // ErrGenesisBlockReached is reported when genesis block reached. + ErrGenesisBlockReached = fmt.Errorf("genesis block reached") + // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks. + ErrInvalidBlockOrder = fmt.Errorf("invalid block order") + // ErrMismatchBlockHashSequence means the delivering sequence is not + // correct, compared to finalized blocks. + ErrMismatchBlockHashSequence = fmt.Errorf("mismatch block hash sequence") + // ErrInvalidSyncingFinalizationHeight raised when the blocks to sync is + // not following the compaction chain tip in database. + ErrInvalidSyncingFinalizationHeight = fmt.Errorf( + "invalid syncing finalization height") +) + +// Consensus is for syncing consensus module. +type Consensus struct { + db db.Database + gov core.Governance + dMoment time.Time + logger common.Logger + app core.Application + prv crypto.PrivateKey + network core.Network + nodeSetCache *utils.NodeSetCache + + lattice *core.Lattice + validatedChains map[uint32]struct{} + finalizedBlockHashes common.Hashes + latticeLastRound uint64 + randomnessResults []*types.BlockRandomnessResult + blocks []types.ByPosition + agreements []*agreement + configs []*types.Config + roundBeginTimes []time.Time + agreementRoundCut uint64 + + // lock for accessing all fields. + lock sync.RWMutex + moduleWaitGroup sync.WaitGroup + agreementWaitGroup sync.WaitGroup + pullChan chan common.Hash + receiveChan chan *types.Block + ctx context.Context + ctxCancel context.CancelFunc + syncedLastBlock *types.Block + syncedConsensus *core.Consensus +} + +// NewConsensus creates an instance for Consensus (syncer consensus). +func NewConsensus( + dMoment time.Time, + app core.Application, + gov core.Governance, + db db.Database, + network core.Network, + prv crypto.PrivateKey, + logger common.Logger) *Consensus { + + con := &Consensus{ + dMoment: dMoment, + app: app, + gov: gov, + db: db, + network: network, + nodeSetCache: utils.NewNodeSetCache(gov), + prv: prv, + logger: logger, + validatedChains: make(map[uint32]struct{}), + configs: []*types.Config{ + utils.GetConfigWithPanic(gov, 0, logger), + }, + roundBeginTimes: []time.Time{dMoment}, + receiveChan: make(chan *types.Block, 1000), + pullChan: make(chan common.Hash, 1000), + } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + return con +} + +func (con *Consensus) initConsensusObj(initBlock *types.Block) { + var cfg *types.Config + func() { + con.lock.Lock() + defer con.lock.Unlock() + con.latticeLastRound = initBlock.Position.Round + cfg = con.configs[con.latticeLastRound] + debugApp, _ := con.app.(core.Debug) + con.lattice = core.NewLattice( + con.roundBeginTimes[con.latticeLastRound], + con.latticeLastRound, + cfg, + core.NewAuthenticator(con.prv), + con.app, + debugApp, + con.db, + con.logger, + ) + }() + con.startAgreement(cfg.NumChains) + con.startNetwork() + con.startCRSMonitor() +} + +func (con *Consensus) checkIfValidated() bool { + con.lock.RLock() + defer con.lock.RUnlock() + var numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + var validatedChainCount uint32 + // Make sure we validate some block in all chains. + for chainID := range con.validatedChains { + if chainID < numChains { + validatedChainCount++ + } + } + if validatedChainCount == numChains { + return true + } + con.logger.Info("not validated yet", "validated-chain", validatedChainCount) + return false +} + +func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { + con.lock.RLock() + defer con.lock.RUnlock() + var ( + numChains = con.configs[con.blocks[0][0].Position.Round].NumChains + compactionTips = make([]*types.Block, numChains) + overlapCount = uint32(0) + ) + // Find tips (newset blocks) of each chain in compaction chain. + b := blocks[len(blocks)-1] + for tipCount := uint32(0); tipCount < numChains; { + if compactionTips[b.Position.ChainID] == nil { + // Check chainID for config change. + if b.Position.ChainID < numChains { + compactionTips[b.Position.ChainID] = b + tipCount++ + } + } + if (b.Finalization.ParentHash == common.Hash{}) { + return false + } + b1, err := con.db.GetBlock(b.Finalization.ParentHash) + if err != nil { + panic(err) + } + b = &b1 + } + // Check if chain tips of compaction chain and current cached confirmed + // blocks are overlapped on each chain, numChains is decided by the round + // of last block we seen on compaction chain. + for chainID, b := range compactionTips { + if len(con.blocks[chainID]) > 0 { + if !b.Position.Older(&con.blocks[chainID][0].Position) { + overlapCount++ + } + } + } + if overlapCount == numChains { + return true + } + con.logger.Info("not synced yet", + "overlap-count", overlapCount, + "num-chain", numChains, + "last-block", blocks[len(blocks)-1]) + return false +} + +// ensureAgreementOverlapRound ensures the oldest blocks in each chain in +// con.blocks are all in the same round, for avoiding config change while +// syncing. +func (con *Consensus) ensureAgreementOverlapRound() bool { + con.lock.Lock() + defer con.lock.Unlock() + if con.agreementRoundCut > 0 { + return true + } + // Clean empty blocks on tips of chains. + for idx, bs := range con.blocks { + for len(bs) > 0 && con.isEmptyBlock(bs[0]) { + bs = bs[1:] + } + con.blocks[idx] = bs + } + // Build empty blocks. + for _, bs := range con.blocks { + for i := range bs { + if con.isEmptyBlock(bs[i]) { + if bs[i-1].Position.Height == bs[i].Position.Height-1 { + con.buildEmptyBlock(bs[i], bs[i-1]) + } + } + } + } + var tipRoundMap map[uint64]uint32 + for { + tipRoundMap = make(map[uint64]uint32) + for _, bs := range con.blocks { + if len(bs) > 0 { + tipRoundMap[bs[0].Position.Round]++ + } + } + if len(tipRoundMap) <= 1 { + break + } + // Make all tips in same round. + var maxRound uint64 + for r := range tipRoundMap { + if r > maxRound { + maxRound = r + } + } + for idx, bs := range con.blocks { + for len(bs) > 0 && bs[0].Position.Round < maxRound { + bs = bs[1:] + } + con.blocks[idx] = bs + } + } + if len(tipRoundMap) == 1 { + var r uint64 + for r = range tipRoundMap { + break + } + if tipRoundMap[r] == con.configs[r].NumChains { + con.agreementRoundCut = r + con.logger.Info("agreement round cut found, round", r) + return true + } + } + return false +} + +func (con *Consensus) findLatticeSyncBlock( + blocks []*types.Block) (*types.Block, error) { + lastBlock := blocks[len(blocks)-1] + round := lastBlock.Position.Round + for { + // Find round r which r-1, r, r+1 are all in same total ordering config. + for { + sameAsPrevRound := round == 0 || !con.isConfigChanged( + con.configs[round-1], con.configs[round]) + sameAsNextRound := !con.isConfigChanged( + con.configs[round], con.configs[round+1]) + if sameAsPrevRound && sameAsNextRound { + break + } + if round == 0 { + // Unable to find a safe round, wait for new rounds. + return nil, nil + } + round-- + } + // Find the newset block which round is "round". + for lastBlock.Position.Round != round { + if (lastBlock.Finalization.ParentHash == common.Hash{}) { + return nil, ErrGenesisBlockReached + } + b, err := con.db.GetBlock(lastBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + lastBlock = &b + } + // Find the deliver set by hash for two times. Blocks in a deliver set + // returned by total ordering is sorted by hash. If a block's parent hash + // is greater than its hash means there is a cut between deliver sets. + var curBlock, prevBlock *types.Block + var deliverSetFirstBlock, deliverSetLastBlock *types.Block + curBlock = lastBlock + for { + if (curBlock.Finalization.ParentHash == common.Hash{}) { + return nil, ErrGenesisBlockReached + } + b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + prevBlock = &b + if !prevBlock.Hash.Less(curBlock.Hash) { + break + } + curBlock = prevBlock + } + deliverSetLastBlock = prevBlock + curBlock = prevBlock + for { + if (curBlock.Finalization.ParentHash == common.Hash{}) { + break + } + b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + prevBlock = &b + if !prevBlock.Hash.Less(curBlock.Hash) { + break + } + curBlock = prevBlock + } + deliverSetFirstBlock = curBlock + // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock + // are in the same round. + ok := true + curBlock = deliverSetLastBlock + for { + if curBlock.Position.Round != round { + ok = false + break + } + b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) + if err != nil { + return nil, err + } + curBlock = &b + if curBlock.Hash == deliverSetFirstBlock.Hash { + break + } + } + if ok { + return deliverSetFirstBlock, nil + } + if round == 0 { + return nil, nil + } + round-- + } +} + +func (con *Consensus) processFinalizedBlock(block *types.Block) error { + if con.lattice == nil { + return nil + } + con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) + delivered, err := con.lattice.ProcessFinalizedBlock(block) + if err != nil { + return err + } + for idx, b := range delivered { + if con.finalizedBlockHashes[idx] != b.Hash { + return ErrMismatchBlockHashSequence + } + con.validatedChains[b.Position.ChainID] = struct{}{} + } + con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):] + return nil +} + +// SyncBlocks syncs blocks from compaction chain, latest is true if the caller +// regards the blocks are the latest ones. Notice that latest can be true for +// many times. +// NOTICE: parameter "blocks" should be consecutive in compaction height. +func (con *Consensus) SyncBlocks( + blocks []*types.Block, latest bool) (bool, error) { + if con.syncedLastBlock != nil { + return true, ErrAlreadySynced + } + if len(blocks) == 0 { + return false, nil + } + // Check if blocks are consecutive. + for i := 1; i < len(blocks); i++ { + if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 { + return false, ErrInvalidBlockOrder + } + } + // Make sure the first block is the next block of current compaction chain + // tip in DB. + _, tipHeight := con.db.GetCompactionChainTipInfo() + if blocks[0].Finalization.Height != tipHeight+1 { + con.logger.Error("mismatched finalization height", + "now", blocks[0].Finalization.Height, + "expected", tipHeight+1) + return false, ErrInvalidSyncingFinalizationHeight + } + con.logger.Info("syncBlocks", + "position", &blocks[0].Position, + "final height", blocks[0].Finalization.Height, + "len", len(blocks), + "latest", latest, + ) + con.setupConfigs(blocks) + for _, b := range blocks { + // TODO(haoping) remove this if lattice puts blocks into db. + if err := con.db.PutBlock(*b); err != nil { + // A block might be put into db when confirmed by BA, but not + // finalized yet. + if err == db.ErrBlockExists { + err = con.db.UpdateBlock(*b) + } + if err != nil { + return false, err + } + } + if err := con.db.PutCompactionChainTipInfo( + b.Hash, b.Finalization.Height); err != nil { + return false, err + } + if err := con.processFinalizedBlock(b); err != nil { + return false, err + } + } + if latest && con.lattice == nil { + // New Lattice and find the deliver set of total ordering when "latest" is + // true for first time. Deliver set is found by block hashes. + syncBlock, err := con.findLatticeSyncBlock(blocks) + if err != nil { + return false, err + } + if syncBlock != nil { + con.logger.Info("deliver set found", syncBlock) + // New lattice with the round of syncBlock. + con.initConsensusObj(syncBlock) + // Process blocks from syncBlock to blocks' last block. + b := blocks[len(blocks)-1] + blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1 + blocksToProcess := make([]*types.Block, blocksCount) + for { + blocksToProcess[blocksCount-1] = b + blocksCount-- + if b.Hash == syncBlock.Hash { + break + } + b1, err := con.db.GetBlock(b.Finalization.ParentHash) + if err != nil { + return false, err + } + b = &b1 + } + for _, b := range blocksToProcess { + if err := con.processFinalizedBlock(b); err != nil { + return false, err + } + } + } + } + if latest && con.ensureAgreementOverlapRound() { + // Check if compaction and agreements' blocks are overlapped. The + // overlapping of compaction chain and BA's oldest blocks means the + // syncing is done. + if con.checkIfValidated() && con.checkIfSynced(blocks) { + if err := con.Stop(); err != nil { + return false, err + } + con.syncedLastBlock = blocks[len(blocks)-1] + con.logger.Info("syncer.Consensus synced", + "last-block", con.syncedLastBlock) + } + } + return con.syncedLastBlock != nil, nil +} + +// GetSyncedConsensus returns the core.Consensus instance after synced. +func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { + if con.syncedConsensus != nil { + return con.syncedConsensus, nil + } + if con.syncedLastBlock == nil { + return nil, ErrNotSynced + } + // flush all blocks in con.blocks into core.Consensus, and build + // core.Consensus from syncer. + confirmedBlocks := []*types.Block{} + func() { + con.lock.Lock() + defer con.lock.Unlock() + for _, bs := range con.blocks { + confirmedBlocks = append(confirmedBlocks, bs...) + } + }() + var err error + con.syncedConsensus, err = core.NewConsensusFromSyncer( + con.syncedLastBlock, + con.roundBeginTimes[con.syncedLastBlock.Position.Round], + con.app, + con.gov, + con.db, + con.network, + con.prv, + con.lattice, + confirmedBlocks, + con.randomnessResults, + con.logger) + return con.syncedConsensus, err +} + +// Stop the syncer. +// +// This method is mainly for caller to stop the syncer before synced, the syncer +// would call this method automatically after synced. +func (con *Consensus) Stop() error { + // Stop network and CRS routines, wait until they are all stoped. + con.ctxCancel() + con.moduleWaitGroup.Wait() + // Stop agreements. + con.stopAgreement() + return nil +} + +// isEmptyBlock checks if a block is an empty block by both its hash and parent +// hash are empty. +func (con *Consensus) isEmptyBlock(b *types.Block) bool { + return b.Hash == common.Hash{} && b.ParentHash == common.Hash{} +} + +// buildEmptyBlock builds an empty block in agreement. +func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) { + cfg := con.configs[b.Position.Round] + b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval) + b.Witness.Height = parent.Witness.Height + b.Witness.Data = make([]byte, len(parent.Witness.Data)) + copy(b.Witness.Data, parent.Witness.Data) + b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash}) +} + +// setupConfigs is called by SyncBlocks with blocks from compaction chain. In +// the first time, setupConfigs setups from round 0. +func (con *Consensus) setupConfigs(blocks []*types.Block) { + // Find max round in blocks. + var maxRound uint64 + for _, b := range blocks { + if b.Position.Round > maxRound { + maxRound = b.Position.Round + } + } + // Get configs from governance. + // + // In fullnode, the notification of new round is yet another TX, which + // needs to be executed after corresponding block delivered. Thus, the + // configuration for 'maxRound + core.ConfigRoundShift' won't be ready when + // seeing this block. + untilRound := maxRound + core.ConfigRoundShift - 1 + curMaxNumChains := uint32(0) + func() { + con.lock.Lock() + defer con.lock.Unlock() + for r := uint64(len(con.configs)); r <= untilRound; r++ { + cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) + con.configs = append(con.configs, cfg) + con.roundBeginTimes = append( + con.roundBeginTimes, + con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval)) + if cfg.NumChains >= curMaxNumChains { + curMaxNumChains = cfg.NumChains + } + } + }() + con.resizeByNumChains(curMaxNumChains) + // Notify core.Lattice for new configs. + if con.lattice != nil { + for con.latticeLastRound+1 <= maxRound { + con.latticeLastRound++ + if err := con.lattice.AppendConfig( + con.latticeLastRound, + con.configs[con.latticeLastRound]); err != nil { + panic(err) + } + } + } +} + +// resizeByNumChains resizes fake lattice and agreement if numChains increases. +// Notice the decreasing case is neglected. +func (con *Consensus) resizeByNumChains(numChains uint32) { + con.lock.Lock() + defer con.lock.Unlock() + if numChains > uint32(len(con.blocks)) { + for i := uint32(len(con.blocks)); i < numChains; i++ { + // Resize the pool of blocks. + con.blocks = append(con.blocks, types.ByPosition{}) + // Resize agreement modules. + a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache, + &con.agreementWaitGroup, con.logger) + con.agreements = append(con.agreements, a) + go a.run() + } + } +} + +// startAgreement starts agreements for receiving votes and agreements. +func (con *Consensus) startAgreement(numChains uint32) { + // Start a routine for listening receive channel and pull block channel. + go func() { + for { + select { + case b, ok := <-con.receiveChan: + if !ok { + return + } + chainID := b.Position.ChainID + func() { + con.lock.Lock() + defer con.lock.Unlock() + // If round is cut in agreements, do not add blocks with round less + // then cut round. + if b.Position.Round < con.agreementRoundCut { + return + } + con.blocks[chainID] = append(con.blocks[chainID], b) + sort.Sort(con.blocks[chainID]) + }() + case h, ok := <-con.pullChan: + if !ok { + return + } + con.network.PullBlocks(common.Hashes{h}) + } + } + }() +} + +// startNetwork starts network for receiving blocks and agreement results. +func (con *Consensus) startNetwork() { + go func() { + con.moduleWaitGroup.Add(1) + defer con.moduleWaitGroup.Done() + Loop: + for { + select { + case val := <-con.network.ReceiveChan(): + var pos types.Position + switch v := val.(type) { + case *types.Block: + pos = v.Position + case *types.AgreementResult: + pos = v.Position + case *types.BlockRandomnessResult: + func() { + con.lock.Lock() + defer con.lock.Unlock() + if v.Position.Round >= con.agreementRoundCut { + con.randomnessResults = append(con.randomnessResults, v) + } + }() + continue Loop + default: + continue Loop + } + func() { + con.lock.RLock() + defer con.lock.RUnlock() + if pos.ChainID >= uint32(len(con.agreements)) { + con.logger.Error("Unknown chainID message received (syncer)", + "position", &pos) + } + }() + con.agreements[pos.ChainID].inputChan <- val + case <-con.ctx.Done(): + return + } + } + }() +} + +// startCRSMonitor is the dummiest way to verify if the CRS for one round +// is ready or not. +func (con *Consensus) startCRSMonitor() { + var lastNotifiedRound uint64 + // Notify all agreements for new CRS. + notifyNewCRS := func(round uint64) { + if round == lastNotifiedRound { + return + } + con.logger.Info("CRS is ready", "round", round) + con.lock.RLock() + defer con.lock.RUnlock() + lastNotifiedRound = round + for _, a := range con.agreements { + a.inputChan <- round + } + } + go func() { + con.moduleWaitGroup.Add(1) + defer con.moduleWaitGroup.Done() + for { + select { + case <-con.ctx.Done(): + return + case <-time.After(1 * time.Second): + // Notify agreement modules for the latest round that CRS is + // available if the round is not notified yet. + var crsRound = lastNotifiedRound + for (con.gov.CRS(crsRound+1) != common.Hash{}) { + crsRound++ + } + notifyNewCRS(crsRound) + } + } + }() +} + +func (con *Consensus) stopAgreement() { + func() { + con.lock.Lock() + defer con.lock.Unlock() + for _, a := range con.agreements { + close(a.inputChan) + } + }() + con.agreementWaitGroup.Wait() + close(con.receiveChan) + close(con.pullChan) +} + +func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool { + return prev.K != cur.K || + prev.NumChains != cur.NumChains || + prev.PhiRatio != cur.PhiRatio +} diff --git a/vendor/vendor.json b/vendor/vendor.json index f524bce16..56b294989 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -145,6 +145,12 @@ "revisionTime": "2018-12-20T09:26:30Z" }, { + "checksumSHA1": "VRDwO2+FQkeZFeuLfRFS9FUpdCc=", + "path": "github.com/dexon-foundation/dexon-consensus/core/syncer", + "revision": "146ed32cf841151b826eafd7d6ade188c56865bf", + "revisionTime": "2018-12-20T09:26:30Z" + }, + { "checksumSHA1": "Z079qQV+aQV9A3kSJ0LbFjx5VO4=", "path": "github.com/dexon-foundation/dexon-consensus/core/types", "revision": "146ed32cf841151b826eafd7d6ade188c56865bf", |