aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhaoping-ku <haoping.ku@dexon.org>2018-11-29 14:30:02 +0800
committerGitHub <noreply@github.com>2018-11-29 14:30:02 +0800
commitdaf3bab93c323b173345811adc9a334dad4a7094 (patch)
tree8a3957ec1f77262ce92c56f0384b0dedc307628c
parent8470ac070f097b261fddc42991a4d2e9ec998db6 (diff)
downloaddexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.gz
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.bz2
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.lz
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.xz
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.tar.zst
dexon-consensus-daf3bab93c323b173345811adc9a334dad4a7094.zip
core: syncer: add syncer (#346)
-rw-r--r--core/consensus.go39
-rw-r--r--core/lattice.go30
-rw-r--r--core/syncer/agreement.go177
-rw-r--r--core/syncer/consensus.go622
-rw-r--r--core/test/app.go6
-rw-r--r--core/test/fake-transport.go17
-rw-r--r--core/test/interface.go17
-rw-r--r--core/test/network.go5
-rw-r--r--core/test/revealer.go83
-rw-r--r--core/test/revealer_test.go60
-rw-r--r--core/test/stopper.go4
-rw-r--r--core/total-ordering-syncer_test.go2
-rw-r--r--core/total-ordering.go17
-rw-r--r--core/total-ordering_test.go28
-rw-r--r--core/utils.go45
-rw-r--r--integration_test/consensus_test.go213
-rw-r--r--integration_test/stats.go19
-rw-r--r--integration_test/stats_test.go17
-rw-r--r--integration_test/utils.go17
19 files changed, 1291 insertions, 127 deletions
diff --git a/core/consensus.go b/core/consensus.go
index af40417..0d1955d 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -639,9 +639,9 @@ func (con *Consensus) initialRound(
// Stop the Consensus core.
func (con *Consensus) Stop() {
+ con.ctxCancel()
con.baMgr.stop()
con.event.Reset()
- con.ctxCancel()
}
func (con *Consensus) processMsg(msgChan <-chan interface{}) {
@@ -761,44 +761,9 @@ func (con *Consensus) ProcessVote(vote *types.Vote) (err error) {
func (con *Consensus) ProcessAgreementResult(
rand *types.AgreementResult) error {
// Sanity Check.
- notarySet, err := con.nodeSetCache.GetNotarySet(
- rand.Position.Round, rand.Position.ChainID)
- if err != nil {
+ if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
return err
}
- if len(rand.Votes) < len(notarySet)/3*2+1 {
- return ErrNotEnoughVotes
- }
- if len(rand.Votes) > len(notarySet) {
- return ErrIncorrectVoteProposer
- }
- for _, vote := range rand.Votes {
- if rand.IsEmptyBlock {
- if (vote.BlockHash != common.Hash{}) {
- return ErrIncorrectVoteBlockHash
- }
- } else {
- if vote.BlockHash != rand.BlockHash {
- return ErrIncorrectVoteBlockHash
- }
- }
- if vote.Type != types.VoteCom {
- return ErrIncorrectVoteType
- }
- if vote.Position != rand.Position {
- return ErrIncorrectVotePosition
- }
- if _, exist := notarySet[vote.ProposerID]; !exist {
- return ErrIncorrectVoteProposer
- }
- ok, err := verifyVoteSignature(&vote)
- if err != nil {
- return err
- }
- if !ok {
- return ErrIncorrectVoteSignature
- }
- }
// Syncing BA Module.
if err := con.baMgr.processAgreementResult(rand); err != nil {
return err
diff --git a/core/lattice.go b/core/lattice.go
index f94ca5a..e578e3f 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -39,7 +39,6 @@ type Lattice struct {
app Application
debug Debug
pool blockPool
- retryAdd bool
data *latticeData
toModule *totalOrdering
ctModule *consensusTimestamp
@@ -64,7 +63,7 @@ func NewLattice(
debug: debug,
pool: newBlockPool(cfg.NumChains),
data: newLatticeData(db, dMoment, round, cfg),
- toModule: newTotalOrdering(dMoment, cfg),
+ toModule: newTotalOrdering(dMoment, round, cfg),
ctModule: newConsensusTimestamp(dMoment, round, cfg.NumChains),
logger: logger,
}
@@ -306,4 +305,31 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) {
// ProcessFinalizedBlock is used for syncing lattice data.
func (l *Lattice) ProcessFinalizedBlock(b *types.Block) {
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ // Syncing state for core.latticeData module.
+ if err := l.data.addFinalizedBlock(b); err != nil {
+ panic(err)
+ }
+ l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height)
+ // Syncing state for core.totalOrdering module.
+ toDelivered, deliveredMode, err := l.toModule.processBlock(b)
+ if err != nil {
+ panic(err)
+ }
+ if len(toDelivered) == 0 {
+ return
+ }
+ hashes := make(common.Hashes, len(toDelivered))
+ for idx := range toDelivered {
+ hashes[idx] = toDelivered[idx].Hash
+ }
+ if l.debug != nil {
+ l.debug.TotalOrderingDelivered(hashes, deliveredMode)
+ }
+ // Sync core.consensusTimestamp module.
+ if err = l.ctModule.processBlocks(toDelivered); err != nil {
+ panic(err)
+ }
+ return
}
diff --git a/core/syncer/agreement.go b/core/syncer/agreement.go
new file mode 100644
index 0000000..89b8c8d
--- /dev/null
+++ b/core/syncer/agreement.go
@@ -0,0 +1,177 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "sync"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+// Struct agreement implements struct of BA (Byzantine Agreement) protocol
+// needed in syncer, which only receives agreement results.
+type agreement struct {
+ wg *sync.WaitGroup
+ cache *utils.NodeSetCache
+ inputChan chan interface{}
+ outputChan chan<- *types.Block
+ pullChan chan<- common.Hash
+ blocks map[types.Position]map[common.Hash]*types.Block
+ agreementResults map[common.Hash]struct{}
+ latestCRSRound uint64
+ pendings map[uint64]map[common.Hash]*types.AgreementResult
+ logger common.Logger
+ confirmedBlocks map[common.Hash]struct{}
+}
+
+// newAgreement creates a new agreement instance.
+func newAgreement(
+ ch chan<- *types.Block,
+ pullChan chan<- common.Hash,
+ cache *utils.NodeSetCache,
+ wg *sync.WaitGroup,
+ logger common.Logger) *agreement {
+
+ return &agreement{
+ cache: cache,
+ wg: wg,
+ inputChan: make(chan interface{}, 1000),
+ outputChan: ch,
+ pullChan: pullChan,
+ blocks: make(map[types.Position]map[common.Hash]*types.Block),
+ agreementResults: make(map[common.Hash]struct{}),
+ logger: logger,
+ pendings: make(
+ map[uint64]map[common.Hash]*types.AgreementResult),
+ confirmedBlocks: make(map[common.Hash]struct{}),
+ }
+}
+
+// run starts the agreement, this does not start a new routine, go a new
+// routine explicitly in the caller.
+func (a *agreement) run() {
+ a.wg.Add(1)
+ defer a.wg.Done()
+ for {
+ select {
+ case val, ok := <-a.inputChan:
+ if !ok {
+ // InputChan is closed by network when network ends.
+ return
+ }
+ switch v := val.(type) {
+ case *types.Block:
+ a.processBlock(v)
+ case *types.AgreementResult:
+ a.processAgreementResult(v)
+ case uint64:
+ a.processNewCRS(v)
+ }
+ }
+ }
+}
+
+func (a *agreement) processBlock(b *types.Block) {
+ if _, exist := a.confirmedBlocks[b.Hash]; exist {
+ return
+ }
+ if _, exist := a.agreementResults[b.Hash]; exist {
+ a.confirm(b)
+ } else {
+ if _, exist := a.blocks[b.Position]; !exist {
+ a.blocks[b.Position] = make(map[common.Hash]*types.Block)
+ }
+ a.blocks[b.Position][b.Hash] = b
+ }
+}
+
+func (a *agreement) processAgreementResult(r *types.AgreementResult) {
+ // Cache those results that CRS is not ready yet.
+ if r.Position.Round > a.latestCRSRound {
+ pendingsForRound, exists := a.pendings[r.Position.Round]
+ if !exists {
+ pendingsForRound = make(map[common.Hash]*types.AgreementResult)
+ a.pendings[r.Position.Round] = pendingsForRound
+ }
+ pendingsForRound[r.BlockHash] = r
+ a.logger.Info("agreement result cached", "result", r)
+ return
+ }
+ if err := core.VerifyAgreementResult(r, a.cache); err != nil {
+ a.logger.Error("agreement result verification failed",
+ "result", r,
+ "error", err)
+ return
+ }
+ if r.IsEmptyBlock {
+ // Empty block is also confirmed.
+ b := &types.Block{
+ Position: r.Position,
+ }
+ a.confirm(b)
+ } else {
+ needPull := true
+ if bs, exist := a.blocks[r.Position]; exist {
+ if b, exist := bs[r.BlockHash]; exist {
+ a.confirm(b)
+ needPull = false
+ }
+ }
+ if needPull {
+ a.agreementResults[r.BlockHash] = struct{}{}
+ a.pullChan <- r.BlockHash
+ }
+ }
+}
+
+func (a *agreement) processNewCRS(round uint64) {
+ if round <= a.latestCRSRound {
+ return
+ }
+ // Verify all pending results.
+ for r := a.latestCRSRound + 1; r <= round; r++ {
+ pendingsForRound := a.pendings[r]
+ if pendingsForRound == nil {
+ continue
+ }
+ delete(a.pendings, r)
+ for _, res := range pendingsForRound {
+ if err := core.VerifyAgreementResult(res, a.cache); err != nil {
+ a.logger.Error("invalid agreement result", "result", res)
+ continue
+ }
+ a.logger.Error("flush agreement result", "result", res)
+ a.processAgreementResult(res)
+ break
+ }
+ }
+ a.latestCRSRound = round
+}
+
+// confirm notifies consensus the confirmation of a block in BA.
+func (a *agreement) confirm(b *types.Block) {
+ if _, exist := a.confirmedBlocks[b.Hash]; !exist {
+ delete(a.blocks, b.Position)
+ delete(a.agreementResults, b.Hash)
+ a.outputChan <- b
+ a.confirmedBlocks[b.Hash] = struct{}{}
+ }
+}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
new file mode 100644
index 0000000..d84f168
--- /dev/null
+++ b/core/syncer/consensus.go
@@ -0,0 +1,622 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package syncer
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core"
+ "github.com/dexon-foundation/dexon-consensus/core/blockdb"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+var (
+ // ErrAlreadySynced is reported when syncer is synced.
+ ErrAlreadySynced = fmt.Errorf("already synced")
+ // ErrGenesisBlockReached is reported when genesis block reached.
+ ErrGenesisBlockReached = fmt.Errorf("genesis block reached")
+ // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered blocks.
+ ErrInvalidBlockOrder = fmt.Errorf("invalid block order")
+)
+
+// Consensus is for syncing consensus module.
+type Consensus struct {
+ db blockdb.BlockDatabase
+ gov core.Governance
+ dMoment time.Time
+ logger common.Logger
+ app core.Application
+ prv crypto.PrivateKey
+ network core.Network
+ nodeSetCache *utils.NodeSetCache
+
+ lattice *core.Lattice
+ latticeLastRound uint64
+ blocks []types.ByPosition
+ agreements []*agreement
+ configs []*types.Config
+ roundBeginTimes []time.Time
+ agreementRoundCut uint64
+
+ // lock for accessing all fields.
+ lock sync.RWMutex
+ moduleWaitGroup sync.WaitGroup
+ agreementWaitGroup sync.WaitGroup
+ pullChan chan common.Hash
+ receiveChan chan *types.Block
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ isSynced bool
+}
+
+// NewConsensus creates an instance for Consensus (syncer consensus).
+func NewConsensus(
+ dMoment time.Time,
+ app core.Application,
+ gov core.Governance,
+ db blockdb.BlockDatabase,
+ network core.Network,
+ prv crypto.PrivateKey,
+ logger common.Logger) *Consensus {
+
+ con := &Consensus{
+ dMoment: dMoment,
+ app: app,
+ gov: gov,
+ db: db,
+ network: network,
+ nodeSetCache: utils.NewNodeSetCache(gov),
+ prv: prv,
+ logger: logger,
+ isSynced: false,
+ configs: []*types.Config{gov.Configuration(0)},
+ roundBeginTimes: []time.Time{dMoment},
+ receiveChan: make(chan *types.Block, 1000),
+ pullChan: make(chan common.Hash, 1000),
+ }
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
+ return con
+}
+
+func (con *Consensus) initConsensusObj(initBlock *types.Block) {
+ var cfg *types.Config
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ con.latticeLastRound = initBlock.Position.Round
+ cfg = con.configs[con.latticeLastRound]
+ debugApp, _ := con.app.(core.Debug)
+ con.lattice = core.NewLattice(
+ con.roundBeginTimes[con.latticeLastRound],
+ con.latticeLastRound,
+ cfg,
+ core.NewAuthenticator(con.prv),
+ con.app,
+ debugApp,
+ con.db,
+ con.logger,
+ )
+ }()
+ con.startAgreement(cfg.NumChains)
+ con.startNetwork()
+ con.startCRSMonitor()
+}
+
+func (con *Consensus) checkIfSynced(blocks []*types.Block) {
+ var (
+ numChains = con.configs[con.blocks[0][0].Position.Round].NumChains
+ compactionTips = make([]*types.Block, numChains)
+ overlapCount = uint32(0)
+ )
+ // Find tips (newset blocks) of each chain in compaction chain.
+ b := blocks[len(blocks)-1]
+ for tipCount := uint32(0); tipCount < numChains; {
+ if compactionTips[b.Position.ChainID] == nil {
+ // Check chainID for config change.
+ if b.Position.ChainID < numChains {
+ compactionTips[b.Position.ChainID] = b
+ tipCount++
+ }
+ }
+ if (b.Finalization.ParentHash == common.Hash{}) {
+ return
+ }
+ b1, err := con.db.Get(b.Finalization.ParentHash)
+ if err != nil {
+ panic(err)
+ }
+ b = &b1
+ }
+ // Check if chain tips of compaction chain and current cached confirmed
+ // blocks are overlapped on each chain, numChains is decided by the round
+ // of last block we seen on compaction chain.
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ for chainID, b := range compactionTips {
+ if len(con.blocks[chainID]) > 0 {
+ if !b.Position.Older(&con.blocks[chainID][0].Position) {
+ overlapCount++
+ }
+ }
+ }
+ if overlapCount == numChains {
+ con.isSynced = true
+ } else {
+ con.logger.Info("not overlap yet",
+ "overlap-count", overlapCount,
+ "num-chain", numChains,
+ "last-block", blocks[len(blocks)-1])
+ }
+}
+
+// ensureAgreementOverlapRound ensures the oldest blocks in each chain in
+// con.blocks are all in the same round, for avoiding config change while
+// syncing.
+func (con *Consensus) ensureAgreementOverlapRound() bool {
+ if con.agreementRoundCut > 0 {
+ return true
+ }
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // Clean empty blocks on tips of chains.
+ for idx, bs := range con.blocks {
+ for len(bs) > 0 && con.isEmptyBlock(bs[0]) {
+ bs = bs[1:]
+ }
+ con.blocks[idx] = bs
+ }
+ // Build empty blocks.
+ for _, bs := range con.blocks {
+ for i := range bs {
+ if con.isEmptyBlock(bs[i]) {
+ if bs[i-1].Position.Height == bs[i].Position.Height-1 {
+ con.buildEmptyBlock(bs[i], bs[i-1])
+ }
+ }
+ }
+ }
+ var tipRoundMap map[uint64]uint32
+ for {
+ tipRoundMap = make(map[uint64]uint32)
+ for _, bs := range con.blocks {
+ if len(bs) > 0 {
+ tipRoundMap[bs[0].Position.Round]++
+ }
+ }
+ if len(tipRoundMap) <= 1 {
+ break
+ }
+ // Make all tips in same round.
+ var maxRound uint64
+ for r := range tipRoundMap {
+ if r > maxRound {
+ maxRound = r
+ }
+ }
+ for idx, bs := range con.blocks {
+ for len(bs) > 0 && bs[0].Position.Round < maxRound {
+ bs = bs[1:]
+ }
+ con.blocks[idx] = bs
+ }
+ }
+ if len(tipRoundMap) == 1 {
+ var r uint64
+ for r = range tipRoundMap {
+ break
+ }
+ if tipRoundMap[r] == con.configs[r].NumChains {
+ con.agreementRoundCut = r
+ con.logger.Info("agreement round cut found, round", r)
+ return true
+ }
+ }
+ return false
+}
+
+func (con *Consensus) findLatticeSyncBlock(
+ blocks []*types.Block) (*types.Block, error) {
+ var lastBlock = blocks[len(blocks)-1]
+ for {
+ // Find round r which r-1, r, r+1 are all in same total ordering config.
+ round := lastBlock.Position.Round
+ for {
+ sameAsPrevRound := round == 0 || !con.isConfigChanged(
+ con.configs[round-1], con.configs[round])
+ sameAsNextRound := !con.isConfigChanged(
+ con.configs[round], con.configs[round+1])
+ if sameAsPrevRound && sameAsNextRound {
+ break
+ }
+ if round == 0 {
+ // Unable to find a safe round, wait for new rounds.
+ return nil, nil
+ }
+ round--
+ }
+ // Find the oldest block which round is "round".
+ for lastBlock.Position.Round != round {
+ if (lastBlock.Finalization.ParentHash == common.Hash{}) {
+ return nil, ErrGenesisBlockReached
+ }
+ b, err := con.db.Get(lastBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ lastBlock = &b
+ }
+ // Find the deliver set by hash for two times. Blocks in a deliver set
+ // returned by total ordering is sorted by hash. If a block's parent hash
+ // is greater than its hash means there is a cut between deliver sets.
+ var curBlock, prevBlock *types.Block
+ var deliverSetFirstBlock, deliverSetLastBlock *types.Block
+ curBlock = lastBlock
+ for {
+ if (curBlock.Finalization.ParentHash == common.Hash{}) {
+ return nil, ErrGenesisBlockReached
+ }
+ b, err := con.db.Get(curBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ prevBlock = &b
+ if !prevBlock.Hash.Less(curBlock.Hash) {
+ break
+ }
+ curBlock = prevBlock
+ }
+ deliverSetLastBlock = prevBlock
+ curBlock = prevBlock
+ for {
+ if (curBlock.Finalization.ParentHash == common.Hash{}) {
+ break
+ }
+ b, err := con.db.Get(curBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ prevBlock = &b
+ if !prevBlock.Hash.Less(curBlock.Hash) {
+ break
+ }
+ curBlock = prevBlock
+ }
+ deliverSetFirstBlock = curBlock
+ // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock
+ // are in the same round.
+ ok := true
+ curBlock = deliverSetLastBlock
+ for {
+ if curBlock.Position.Round != round {
+ ok = false
+ break
+ }
+ b, err := con.db.Get(curBlock.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ curBlock = &b
+ if curBlock.Hash == deliverSetFirstBlock.Hash {
+ break
+ }
+ }
+ if ok {
+ return deliverSetFirstBlock, nil
+ }
+ if round == 0 {
+ return nil, nil
+ }
+ round--
+ }
+}
+
+// SyncBlocks syncs blocks from compaction chain, latest is true if the caller
+// regards the blocks are the latest ones. Notice that latest can be true for
+// many times.
+// NOTICE: parameter "blocks" should be consecutive in compaction height.
+func (con *Consensus) SyncBlocks(
+ blocks []*types.Block, latest bool) (*core.Consensus, error) {
+ if con.isSynced {
+ return nil, ErrAlreadySynced
+ }
+ if len(blocks) == 0 {
+ return nil, nil
+ }
+ // Check if blocks are consecutive.
+ for i := 1; i < len(blocks); i++ {
+ if blocks[i].Finalization.Height != blocks[i-1].Finalization.Height+1 {
+ return nil, ErrInvalidBlockOrder
+ }
+ }
+ con.logger.Info("syncBlocks",
+ "position", &blocks[0].Position,
+ "final height", blocks[0].Finalization.Height,
+ "len", len(blocks),
+ "latest", latest,
+ )
+ con.setupConfigs(blocks)
+ for _, b := range blocks {
+ // TODO(haoping) remove this if lattice puts blocks into db.
+ if err := con.db.Put(*b); err != nil {
+ if err != blockdb.ErrBlockExists {
+ return nil, err
+ }
+ }
+ if con.lattice != nil {
+ con.lattice.ProcessFinalizedBlock(b)
+ }
+ }
+ if latest && con.lattice == nil {
+ // New Lattice and find the deliver set of total ordering when "latest" is
+ // true for first time. Deliver set is found by block hashes.
+ syncBlock, err := con.findLatticeSyncBlock(blocks)
+ if err != nil {
+ return nil, err
+ }
+ if syncBlock != nil {
+ con.logger.Info("deliver set found", syncBlock)
+ // New lattice with the round of syncBlock.
+ con.initConsensusObj(syncBlock)
+ // Process blocks from syncBlock to blocks' last block.
+ b := blocks[len(blocks)-1]
+ blocksCount := b.Finalization.Height - syncBlock.Finalization.Height + 1
+ blocksToProcess := make([]*types.Block, blocksCount)
+ for {
+ blocksToProcess[blocksCount-1] = b
+ blocksCount--
+ if b.Hash == syncBlock.Hash {
+ break
+ }
+ b1, err := con.db.Get(b.Finalization.ParentHash)
+ if err != nil {
+ return nil, err
+ }
+ b = &b1
+ }
+ for _, b := range blocksToProcess {
+ con.lattice.ProcessFinalizedBlock(b)
+ }
+ }
+ }
+ if latest && con.ensureAgreementOverlapRound() {
+ // Check if compaction and agreements' blocks are overlapped. The
+ // overlapping of compaction chain and BA's oldest blocks means the
+ // syncing is done.
+ con.checkIfSynced(blocks)
+ }
+ if con.isSynced {
+ // Stop network and CRS routines, wait until they are all stoped.
+ con.ctxCancel()
+ con.moduleWaitGroup.Wait()
+ // Stop agreements.
+ con.stopAgreement()
+ // TODO: flush all blocks in con.blocks into core.Consensus, and build
+ // core.Consensus from syncer.
+ con.logger.Info("syncer.Consensus synced")
+ return &core.Consensus{}, nil
+ }
+ return nil, nil
+}
+
+// isEmptyBlock checks if a block is an empty block by both its hash and parent
+// hash are empty.
+func (con *Consensus) isEmptyBlock(b *types.Block) bool {
+ return b.Hash == common.Hash{} && b.ParentHash == common.Hash{}
+}
+
+// buildEmptyBlock builds an empty block in agreement.
+func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) {
+ cfg := con.configs[b.Position.Round]
+ b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval)
+ b.Witness.Height = parent.Witness.Height
+ b.Witness.Data = make([]byte, len(parent.Witness.Data))
+ copy(b.Witness.Data, parent.Witness.Data)
+ b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash})
+}
+
+// setupConfigs is called by SyncBlocks with blocks from compaction chain. In
+// the first time, setupConfigs setups from round 0.
+func (con *Consensus) setupConfigs(blocks []*types.Block) {
+ // Find max round in blocks.
+ var maxRound uint64
+ for _, b := range blocks {
+ if b.Position.Round > maxRound {
+ maxRound = b.Position.Round
+ }
+ }
+ // Get configs from governance.
+ untilRound := maxRound + core.ConfigRoundShift
+ curMaxNumChains := uint32(0)
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for r := uint64(len(con.configs)); r <= untilRound; r++ {
+ cfg := con.gov.Configuration(r)
+ if cfg == nil {
+ panic(fmt.Errorf(
+ "unable to get config for round: %v (syncer)", r))
+ }
+ con.configs = append(con.configs, cfg)
+ con.roundBeginTimes = append(
+ con.roundBeginTimes,
+ con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval))
+ if cfg.NumChains >= curMaxNumChains {
+ curMaxNumChains = cfg.NumChains
+ }
+ }
+ }()
+ con.resizeByNumChains(curMaxNumChains)
+ // Notify core.Lattice for new configs.
+ if con.lattice != nil {
+ for con.latticeLastRound+1 < uint64(len(con.configs)) {
+ con.latticeLastRound++
+ if err := con.lattice.AppendConfig(
+ con.latticeLastRound,
+ con.configs[con.latticeLastRound]); err != nil {
+ panic(err)
+ }
+ }
+ }
+}
+
+// resizeByNumChains resizes fake lattice and agreement if numChains increases.
+// Notice the decreasing case is neglected.
+func (con *Consensus) resizeByNumChains(numChains uint32) {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ if numChains > uint32(len(con.blocks)) {
+ for i := uint32(len(con.blocks)); i < numChains; i++ {
+ // Resize the pool of blocks.
+ con.blocks = append(con.blocks, types.ByPosition{})
+ // Resize agreement modules.
+ a := newAgreement(con.receiveChan, con.pullChan, con.nodeSetCache,
+ &con.agreementWaitGroup, con.logger)
+ con.agreements = append(con.agreements, a)
+ go a.run()
+ }
+ }
+}
+
+// startAgreement starts agreements for receiving votes and agreements.
+func (con *Consensus) startAgreement(numChains uint32) {
+ // Start a routine for listening receive channel and pull block channel.
+ go func() {
+ for {
+ select {
+ case b, ok := <-con.receiveChan:
+ if !ok {
+ return
+ }
+ chainID := b.Position.ChainID
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ // If round is cut in agreements, do not add blocks with round less
+ // then cut round.
+ if b.Position.Round < con.agreementRoundCut {
+ return
+ }
+ con.blocks[chainID] = append(con.blocks[chainID], b)
+ sort.Sort(con.blocks[chainID])
+ }()
+ case h, ok := <-con.pullChan:
+ if !ok {
+ return
+ }
+ con.network.PullBlocks(common.Hashes{h})
+ }
+ }
+ }()
+}
+
+// startNetwork starts network for receiving blocks and agreement results.
+func (con *Consensus) startNetwork() {
+ go func() {
+ con.moduleWaitGroup.Add(1)
+ defer con.moduleWaitGroup.Done()
+ Loop:
+ for {
+ select {
+ case val := <-con.network.ReceiveChan():
+ var pos types.Position
+ switch v := val.(type) {
+ case *types.Block:
+ pos = v.Position
+ case *types.AgreementResult:
+ pos = v.Position
+ default:
+ continue Loop
+ }
+ func() {
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ if pos.ChainID >= uint32(len(con.agreements)) {
+ con.logger.Error("Unknown chainID message received (syncer)",
+ "position", &pos)
+ }
+ }()
+ con.agreements[pos.ChainID].inputChan <- val
+ case <-con.ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+// startCRSMonitor is the dummiest way to verify if the CRS for one round
+// is ready or not.
+func (con *Consensus) startCRSMonitor() {
+ var lastNotifiedRound uint64
+ // Notify all agreements for new CRS.
+ notifyNewCRS := func(round uint64) {
+ if round == lastNotifiedRound {
+ return
+ }
+ con.logger.Info("CRS is ready", "round", round)
+ con.lock.RLock()
+ defer con.lock.RUnlock()
+ lastNotifiedRound = round
+ for _, a := range con.agreements {
+ a.inputChan <- round
+ }
+ }
+ go func() {
+ con.moduleWaitGroup.Add(1)
+ defer con.moduleWaitGroup.Done()
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ case <-time.After(1 * time.Second):
+ // Notify agreement modules for the latest round that CRS is
+ // available if the round is not notified yet.
+ var crsRound = lastNotifiedRound
+ for (con.gov.CRS(crsRound+1) != common.Hash{}) {
+ crsRound++
+ }
+ notifyNewCRS(crsRound)
+ }
+ }
+ }()
+}
+
+func (con *Consensus) stopAgreement() {
+ func() {
+ con.lock.Lock()
+ defer con.lock.Unlock()
+ for _, a := range con.agreements {
+ close(a.inputChan)
+ }
+ }()
+ con.agreementWaitGroup.Wait()
+ close(con.receiveChan)
+ close(con.pullChan)
+}
+
+func (con *Consensus) isConfigChanged(prev, cur *types.Config) bool {
+ return prev.K != cur.K ||
+ prev.NumChains != cur.NumChains ||
+ prev.PhiRatio != cur.PhiRatio
+}
diff --git a/core/test/app.go b/core/test/app.go
index 327555b..9f030e9 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -280,8 +280,8 @@ Loop:
return nil
}
-// Check provides a backdoor to check status of App with reader lock.
-func (app *App) Check(checker func(*App)) {
+// WithLock provides a backdoor to check status of App with reader lock.
+func (app *App) WithLock(function func(*App)) {
app.confirmedLock.RLock()
defer app.confirmedLock.RUnlock()
app.totalOrderedLock.RLock()
@@ -289,5 +289,5 @@ func (app *App) Check(checker func(*App)) {
app.deliveredLock.RLock()
defer app.deliveredLock.RUnlock()
- checker(app)
+ function(app)
}
diff --git a/core/test/fake-transport.go b/core/test/fake-transport.go
index a783ac9..388d0aa 100644
--- a/core/test/fake-transport.go
+++ b/core/test/fake-transport.go
@@ -15,23 +15,6 @@
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.
-// Copyright 2018 The dexon-consensus Authors
-// This file is part of the dexon-consensus library.
-//
-// The dexon-consensus library is free software: you can redistribute it and/or
-// modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus library. If not, see
-// <http://www.gnu.org/licenses/>.
-
package test
import (
diff --git a/core/test/interface.go b/core/test/interface.go
index e7a8f70..dcecee8 100644
--- a/core/test/interface.go
+++ b/core/test/interface.go
@@ -15,23 +15,6 @@
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.
-// Copyright 2018 The dexon-consensus Authors
-// This file is part of the dexon-consensus library.
-//
-// The dexon-consensus library is free software: you can redistribute it
-// and/or modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus library is distributed in the hope that it will be
-// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus library. If not, see
-// <http://www.gnu.org/licenses/>.
-
package test
import (
diff --git a/core/test/network.go b/core/test/network.go
index 8bcb050..207d8a4 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -437,6 +437,11 @@ func (n *Network) Broadcast(msg interface{}) {
}
}
+// Send exports 'Send' method of Transport.
+func (n *Network) Send(nodeID types.NodeID, msg interface{}) error {
+ return n.trans.Send(nodeID, msg)
+}
+
// ReceiveChanForNode returns a channel for messages not handled by
// core.Consensus.
func (n *Network) ReceiveChanForNode() <-chan interface{} {
diff --git a/core/test/revealer.go b/core/test/revealer.go
index b39a898..dcd75a5 100644
--- a/core/test/revealer.go
+++ b/core/test/revealer.go
@@ -15,26 +15,10 @@
// along with the dexon-consensus library. If not, see
// <http://www.gnu.org/licenses/>.
-// Copyright 2018 The dexon-consensus Authors
-// This file is part of the dexon-consensus library.
-//
-// The dexon-consensus library is free software: you can redistribute it and/or
-// modify it under the terms of the GNU Lesser General Public License as
-// published by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// The dexon-consensus library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the dexon-consensus library. If not, see
-// <http://www.gnu.org/licenses/>.
-
package test
import (
+ "errors"
"math/rand"
"sort"
"time"
@@ -44,6 +28,11 @@ import (
"github.com/dexon-foundation/dexon-consensus/core/types"
)
+// Errors returns from revealer.
+var (
+ ErrNotValidCompactionChain = errors.New("not valid compaction chain")
+)
+
// isAllAckingBlockRevealed is a helper to check if all acking blocks of
// one block are revealed.
func isAllAckingBlockRevealed(
@@ -280,3 +269,63 @@ func (r *RandomTipRevealer) Reset() {
r.chainTip[i] = 0
}
}
+
+// CompactionChainRevealer implements Revealer interface, which would load
+// all blocks from blockdb, reveal them in the order of compaction chain, from
+// the genesis block to the latest one.
+type CompactionChainRevealer struct {
+ blocks types.ByFinalizationHeight
+ nextRevealIndex int
+}
+
+// NewCompactionChainRevealer constructs a revealer in the order of compaction
+// chain.
+func NewCompactionChainRevealer(iter blockdb.BlockIterator,
+ startHeight uint64) (r *CompactionChainRevealer, err error) {
+ blocksByHash, err := loadAllBlocks(iter)
+ if err != nil {
+ return
+ }
+ if startHeight == 0 {
+ startHeight = 1
+ }
+ blocks := types.ByFinalizationHeight{}
+ for _, b := range blocksByHash {
+ if b.Finalization.Height < startHeight {
+ continue
+ }
+ blocks = append(blocks, b)
+ }
+ sort.Sort(types.ByFinalizationHeight(blocks))
+ // Make sure the finalization height of blocks are incremental with step 1.
+ for idx, b := range blocks {
+ if idx == 0 {
+ continue
+ }
+ if b.Finalization.Height != blocks[idx-1].Finalization.Height+1 {
+ err = ErrNotValidCompactionChain
+ return
+ }
+ }
+ r = &CompactionChainRevealer{
+ blocks: blocks,
+ }
+ r.Reset()
+ return
+}
+
+// Next implements Revealer.Next method, which would reveal blocks in the order
+// of compaction chain.
+func (r *CompactionChainRevealer) Next() (types.Block, error) {
+ if r.nextRevealIndex == len(r.blocks) {
+ return types.Block{}, blockdb.ErrIterationFinished
+ }
+ b := r.blocks[r.nextRevealIndex]
+ r.nextRevealIndex++
+ return *b, nil
+}
+
+// Reset implement Revealer.Reset method, which would reset revealing.
+func (r *CompactionChainRevealer) Reset() {
+ r.nextRevealIndex = 0
+}
diff --git a/core/test/revealer_test.go b/core/test/revealer_test.go
index d9920c7..5a1bc07 100644
--- a/core/test/revealer_test.go
+++ b/core/test/revealer_test.go
@@ -157,6 +157,66 @@ func (s *RevealerTestSuite) TestRandomTipReveal() {
s.baseTest(revealer, 10, checkFunc)
}
+func (s *RevealerTestSuite) TestCompactionChainReveal() {
+ db, err := blockdb.NewMemBackedBlockDB()
+ s.Require().NoError(err)
+ // Put several blocks with finalization field ready.
+ b1 := &types.Block{
+ Hash: common.NewRandomHash(),
+ Finalization: types.FinalizationResult{
+ Height: 1,
+ }}
+ b2 := &types.Block{
+ Hash: common.NewRandomHash(),
+ Finalization: types.FinalizationResult{
+ ParentHash: b1.Hash,
+ Height: 2,
+ }}
+ b3 := &types.Block{
+ Hash: common.NewRandomHash(),
+ Finalization: types.FinalizationResult{
+ ParentHash: b2.Hash,
+ Height: 3,
+ }}
+ s.Require().NoError(db.Put(*b1))
+ s.Require().NoError(db.Put(*b3))
+ iter, err := db.GetAll()
+ s.Require().NoError(err)
+ // The compaction chain is not complete, we can't construct a revealer
+ // instance successfully.
+ r, err := NewCompactionChainRevealer(iter, 0)
+ s.Require().Nil(r)
+ s.Require().IsType(ErrNotValidCompactionChain, err)
+ // Put a block to make the compaction chain complete.
+ s.Require().NoError(db.Put(*b2))
+ // We can construct that revealer now.
+ iter, err = db.GetAll()
+ s.Require().NoError(err)
+ r, err = NewCompactionChainRevealer(iter, 0)
+ s.Require().NotNil(r)
+ s.Require().NoError(err)
+ // The revealing order should be ok.
+ chk := func(h uint64) {
+ b, err := r.Next()
+ s.Require().NoError(err)
+ s.Require().Equal(b.Finalization.Height, h)
+ }
+ chk(1)
+ chk(2)
+ chk(3)
+ // Iteration should be finished
+ _, err = r.Next()
+ s.Require().IsType(blockdb.ErrIterationFinished, err)
+ // Test 'startHeight' parameter.
+ iter, err = db.GetAll()
+ s.Require().NoError(err)
+ r, err = NewCompactionChainRevealer(iter, 2)
+ s.Require().NotNil(r)
+ s.Require().NoError(err)
+ chk(2)
+ chk(3)
+}
+
func TestRevealer(t *testing.T) {
suite.Run(t, new(RevealerTestSuite))
}
diff --git a/core/test/stopper.go b/core/test/stopper.go
index abc93c3..40868d8 100644
--- a/core/test/stopper.go
+++ b/core/test/stopper.go
@@ -61,7 +61,7 @@ func (s *StopByConfirmedBlocks) ShouldStop(nID types.NodeID) bool {
lastChecked := s.lastCheckDelivered[nID]
currentConfirmedBlocks := s.confirmedBlocks[nID]
db := s.dbs[nID]
- s.apps[nID].Check(func(app *App) {
+ s.apps[nID].WithLock(func(app *App) {
for _, h := range app.DeliverSequence[lastChecked:] {
b, err := db.Get(h)
if err != nil {
@@ -116,7 +116,7 @@ func (s *StopByRound) ShouldStop(nID types.NodeID) bool {
if curRound := s.currentRounds[nID]; curRound < s.untilRound {
lastChecked := s.lastCheckDelivered[nID]
db := s.dbs[nID]
- s.apps[nID].Check(func(app *App) {
+ s.apps[nID].WithLock(func(app *App) {
for _, h := range app.DeliverSequence[lastChecked:] {
b, err := db.Get(h)
if err != nil {
diff --git a/core/total-ordering-syncer_test.go b/core/total-ordering-syncer_test.go
index 5f9fd0f..92b5dac 100644
--- a/core/total-ordering-syncer_test.go
+++ b/core/total-ordering-syncer_test.go
@@ -44,7 +44,7 @@ func (s *TotalOrderingSyncerTestSuite) genDeliverySet(numChains uint32) (
RoundInterval: 1000 * time.Second,
}
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
gen := test.NewBlocksGenerator(&test.BlocksGeneratorConfig{
NumChains: numChains,
diff --git a/core/total-ordering.go b/core/total-ordering.go
index 90848ce..52f9270 100644
--- a/core/total-ordering.go
+++ b/core/total-ordering.go
@@ -44,9 +44,9 @@ const (
)
var (
- // ErrNotValidDAG would be reported when block subbmitted to totalOrdering
+ // ErrInvalidDAG is reported when block subbmitted to totalOrdering
// didn't form a DAG.
- ErrNotValidDAG = errors.New("not a valid dag")
+ ErrInvalidDAG = errors.New("invalid dag")
// ErrFutureRoundDelivered means some blocks from later rounds are
// delivered, this means program error.
ErrFutureRoundDelivered = errors.New("future round delivered")
@@ -347,7 +347,7 @@ func (v *totalOrderingCandidateInfo) addBlock(b *types.Block) error {
rec.count = 1
} else {
if b.Position.Height <= rec.minHeight {
- return ErrNotValidDAG
+ return ErrInvalidDAG
}
rec.count++
}
@@ -640,12 +640,12 @@ func (global *totalOrderingGlobalVector) addBlock(
if tip != nil {
// Perform light weight sanity check based on tip.
if tip.Position.Round > b.Position.Round {
- err = ErrNotValidDAG
+ err = ErrInvalidDAG
return
}
if DiffUint64(tip.Position.Round, b.Position.Round) > 1 {
if b.Position.Height != 0 {
- err = ErrNotValidDAG
+ err = ErrInvalidDAG
return
}
// Add breakpoint.
@@ -657,7 +657,7 @@ func (global *totalOrderingGlobalVector) addBlock(
})
} else {
if b.Position.Height != tip.Position.Height+1 {
- err = ErrNotValidDAG
+ err = ErrInvalidDAG
return
}
}
@@ -792,9 +792,9 @@ type totalOrdering struct {
}
// newTotalOrdering constructs an totalOrdering instance.
-func newTotalOrdering(dMoment time.Time, cfg *types.Config) *totalOrdering {
+func newTotalOrdering(dMoment time.Time, round uint64, cfg *types.Config) *totalOrdering {
config := &totalOrderingConfig{}
- config.fromConfig(0, cfg)
+ config.fromConfig(round, cfg)
config.setRoundBeginTime(dMoment)
candidates := make([]*totalOrderingCandidateInfo, config.numChains)
to := &totalOrdering{
@@ -816,6 +816,7 @@ func newTotalOrdering(dMoment time.Time, cfg *types.Config) *totalOrdering {
// round R, next time you can only add the config for round R+1.
func (to *totalOrdering) appendConfig(
round uint64, config *types.Config) error {
+
if round != uint64(len(to.configs))+to.configs[0].roundID {
return ErrRoundNotIncreasing
}
diff --git a/core/total-ordering_test.go b/core/total-ordering_test.go
index ac8956a..77bf21a 100644
--- a/core/total-ordering_test.go
+++ b/core/total-ordering_test.go
@@ -157,7 +157,7 @@ func (s *TotalOrderingTestSuite) TestBlockRelation() {
NumChains: uint32(len(nodes)),
}
genesisTime := time.Now().UTC()
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
s.checkNotDeliver(to, blockA)
s.checkNotDeliver(to, blockB)
s.checkNotDeliver(to, blockC)
@@ -345,7 +345,7 @@ func (s *TotalOrderingTestSuite) TestCycleDetection() {
NumChains: uint32(len(nodes)),
}
genesisTime := time.Now().UTC()
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
s.checkNotDeliver(to, b00)
s.checkNotDeliver(to, b01)
s.checkNotDeliver(to, b02)
@@ -375,7 +375,7 @@ func (s *TotalOrderingTestSuite) TestEarlyDeliver() {
NumChains: uint32(len(nodes)),
}
genesisTime := time.Now().UTC()
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
genNextBlock := func(b *types.Block) *types.Block {
return &types.Block{
ProposerID: b.ProposerID,
@@ -487,7 +487,7 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK2() {
NumChains: uint32(len(nodes)),
}
genesisTime := time.Now().UTC()
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
// Setup blocks.
b00 := s.genGenesisBlock(nodes, 0, common.Hashes{})
b10 := s.genGenesisBlock(nodes, 1, common.Hashes{})
@@ -830,7 +830,7 @@ func (s *TotalOrderingTestSuite) TestBasicCaseForK0() {
}
req = s.Require()
genesisTime = time.Now().UTC()
- to = newTotalOrdering(genesisTime, genesisConfig)
+ to = newTotalOrdering(genesisTime, 0, genesisConfig)
)
// Setup blocks.
b00 := s.genGenesisBlock(nodes, 0, common.Hashes{})
@@ -992,7 +992,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() {
PhiRatio: phi,
NumChains: numChains,
}
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
// Add config for next round.
s.Require().NoError(to.appendConfig(1, &types.Config{
K: 0,
@@ -1010,7 +1010,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() {
PhiRatio: phi,
NumChains: numChains,
}
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
// Add config for next round.
s.Require().NoError(to.appendConfig(1, &types.Config{
K: 1,
@@ -1028,7 +1028,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() {
PhiRatio: phi,
NumChains: numChains,
}
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
s.Require().NoError(to.appendConfig(1, &types.Config{
K: 2,
PhiRatio: 0.5,
@@ -1045,7 +1045,7 @@ func (s *TotalOrderingTestSuite) TestRandomlyGeneratedBlocks() {
PhiRatio: phi,
NumChains: numChains,
}
- to := newTotalOrdering(genesisTime, genesisConfig)
+ to := newTotalOrdering(genesisTime, 0, genesisConfig)
s.Require().NoError(to.appendConfig(1, &types.Config{
K: 3,
PhiRatio: 0.5,
@@ -1085,7 +1085,7 @@ func (s *TotalOrderingTestSuite) baseTestForRoundChange(
revealingSequence := make(map[string]struct{})
orderingSequence := make(map[string]struct{})
for i := 0; i < repeat; i++ {
- to := newTotalOrdering(genesisTime, configs[0])
+ to := newTotalOrdering(genesisTime, 0, configs[0])
for roundID, config := range configs[1:] {
req.NoError(to.appendConfig(uint64(roundID+1), config))
}
@@ -1212,7 +1212,7 @@ func (s *TotalOrderingTestSuite) TestSync() {
PhiRatio: 0.67,
NumChains: numChains,
}
- to1 := newTotalOrdering(genesisTime, genesisConfig)
+ to1 := newTotalOrdering(genesisTime, 0, genesisConfig)
s.Require().NoError(to1.appendConfig(1, &types.Config{
K: 0,
PhiRatio: 0.5,
@@ -1236,7 +1236,7 @@ func (s *TotalOrderingTestSuite) TestSync() {
}
// Run new total ordering again.
offset := len(deliveredBlockSets1) / 2
- to2 := newTotalOrdering(genesisTime, genesisConfig)
+ to2 := newTotalOrdering(genesisTime, 0, genesisConfig)
s.Require().NoError(to2.appendConfig(1, &types.Config{
K: 0,
PhiRatio: 0.5,
@@ -1338,7 +1338,7 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() {
blocks = append(blocks, &b)
}
- to1 := newTotalOrdering(genesisTime, configs[0])
+ to1 := newTotalOrdering(genesisTime, 0, configs[0])
for i, cfg := range configs[1:] {
req.NoError(to1.appendConfig(uint64(i+1), cfg))
}
@@ -1366,7 +1366,7 @@ func (s *TotalOrderingTestSuite) TestSyncWithConfigChange() {
// or nothing is tested.
req.True(uint64(0) < offsetRound && offsetRound < uint64(len(configs)-1))
- to2 := newTotalOrdering(genesisTime, configs[0])
+ to2 := newTotalOrdering(genesisTime, 0, configs[0])
for i, cfg := range configs[1:] {
req.NoError(to2.appendConfig(uint64(i+1), cfg))
}
diff --git a/core/utils.go b/core/utils.go
index 441aac1..bc5e336 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -167,6 +167,51 @@ func VerifyBlock(b *types.Block) (err error) {
return
}
+// VerifyAgreementResult perform sanity check against a types.AgreementResult
+// instance.
+func VerifyAgreementResult(
+ res *types.AgreementResult, cache *utils.NodeSetCache) error {
+ notarySet, err := cache.GetNotarySet(
+ res.Position.Round, res.Position.ChainID)
+ if err != nil {
+ return err
+ }
+ if len(res.Votes) < len(notarySet)/3*2+1 {
+ return ErrNotEnoughVotes
+ }
+ if len(res.Votes) > len(notarySet) {
+ return ErrIncorrectVoteProposer
+ }
+ for _, vote := range res.Votes {
+ if res.IsEmptyBlock {
+ if (vote.BlockHash != common.Hash{}) {
+ return ErrIncorrectVoteBlockHash
+ }
+ } else {
+ if vote.BlockHash != res.BlockHash {
+ return ErrIncorrectVoteBlockHash
+ }
+ }
+ if vote.Type != types.VoteCom {
+ return ErrIncorrectVoteType
+ }
+ if vote.Position != res.Position {
+ return ErrIncorrectVotePosition
+ }
+ if _, exist := notarySet[vote.ProposerID]; !exist {
+ return ErrIncorrectVoteProposer
+ }
+ ok, err := verifyVoteSignature(&vote)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return ErrIncorrectVoteSignature
+ }
+ }
+ return nil
+}
+
// DiffUint64 calculates difference between two uint64.
func DiffUint64(a, b uint64) uint64 {
if a > b {
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 8fd3fa4..6432e9f 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -18,6 +18,7 @@
package integration
import (
+ "context"
"sync"
"testing"
"time"
@@ -26,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon-consensus/core"
"github.com/dexon-foundation/dexon-consensus/core/blockdb"
"github.com/dexon-foundation/dexon-consensus/core/crypto"
+ "github.com/dexon-foundation/dexon-consensus/core/syncer"
"github.com/dexon-foundation/dexon-consensus/core/test"
"github.com/dexon-foundation/dexon-consensus/core/types"
"github.com/dexon-foundation/dexon-consensus/core/utils"
@@ -39,10 +41,12 @@ type ConsensusTestSuite struct {
}
type node struct {
- con *core.Consensus
- app *test.App
- gov *test.Governance
- db blockdb.BlockDatabase
+ ID types.NodeID
+ con *core.Consensus
+ app *test.App
+ gov *test.Governance
+ db blockdb.BlockDatabase
+ network *test.Network
}
func (s *ConsensusTestSuite) setupNodes(
@@ -81,8 +85,9 @@ func (s *ConsensusTestSuite) setupNodes(
db,
networkModule,
k,
- &common.NullLogger{})
- nodes[con.ID] = &node{con, app, gov, db}
+ &common.NullLogger{},
+ )
+ nodes[con.ID] = &node{con.ID, con, app, gov, db, networkModule}
go func() {
defer wg.Done()
s.Require().NoError(networkModule.Setup(serverChannel))
@@ -107,6 +112,53 @@ func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) {
}
}
+func (s *ConsensusTestSuite) syncBlocksWithSomeNode(
+ sourceNode *node, syncerObj *syncer.Consensus, nextSyncHeight uint64) (
+ syncedCon *core.Consensus, syncerHeight uint64, err error) {
+
+ syncerHeight = nextSyncHeight
+ // Setup revealer.
+ DBAll, err := sourceNode.db.GetAll()
+ if err != nil {
+ return
+ }
+ r, err := test.NewCompactionChainRevealer(DBAll, nextSyncHeight)
+ if err != nil {
+ return
+ }
+ // Load all blocks from revealer and dump them into syncer.
+ var compactionChainBlocks []*types.Block
+ syncBlocks := func() (done bool) {
+ syncedCon, err = syncerObj.SyncBlocks(compactionChainBlocks, true)
+ if syncedCon != nil || err != nil {
+ done = true
+ }
+ compactionChainBlocks = nil
+ return
+ }
+ for {
+ var b types.Block
+ b, err = r.Next()
+ if err != nil {
+ if err == blockdb.ErrIterationFinished {
+ err = nil
+ if syncBlocks() {
+ break
+ }
+ }
+ break
+ }
+ syncerHeight = b.Finalization.Height + 1
+ compactionChainBlocks = append(compactionChainBlocks, &b)
+ if len(compactionChainBlocks) >= 100 {
+ if syncBlocks() {
+ break
+ }
+ }
+ }
+ return
+}
+
func (s *ConsensusTestSuite) TestSimple() {
// The simplest test case:
// - Node set is equals to DKG set and notary set for each chain in each
@@ -143,7 +195,7 @@ Loop:
s.T().Log("check latest position delivered by each node")
for _, n := range nodes {
latestPos := n.app.GetLatestDeliveredPosition()
- s.T().Log("latestPos", n.con.ID, &latestPos)
+ s.T().Log("latestPos", n.ID, &latestPos)
if latestPos.Round < untilRound {
continue Loop
}
@@ -152,6 +204,7 @@ Loop:
break
}
s.verifyNodes(nodes)
+ // TODO(haoping) stop consensus.
}
func (s *ConsensusTestSuite) TestNumChainsChange() {
@@ -215,7 +268,7 @@ Loop:
s.T().Log("check latest position delivered by each node")
for _, n := range nodes {
latestPos := n.app.GetLatestDeliveredPosition()
- s.T().Log("latestPos", n.con.ID, &latestPos)
+ s.T().Log("latestPos", n.ID, &latestPos)
if latestPos.Round < untilRound {
continue Loop
}
@@ -226,6 +279,150 @@ Loop:
s.verifyNodes(nodes)
}
+func (s *ConsensusTestSuite) TestSync() {
+ // The sync test case:
+ // - No configuration change.
+ // - One node does not run when all others starts until aliveRound exceeded.
+ var (
+ req = s.Require()
+ peerCount = 4
+ dMoment = time.Now().UTC()
+ untilRound = uint64(5)
+ aliveRound = uint64(1)
+ errChan = make(chan error, 100)
+ )
+ prvKeys, pubKeys, err := test.NewKeys(peerCount)
+ req.NoError(err)
+ // Setup seed governance instance. Give a short latency to make this test
+ // run faster.
+ seedGov, err := test.NewGovernance(
+ pubKeys, 100*time.Millisecond, core.ConfigRoundShift)
+ req.NoError(err)
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundInterval, 30*time.Second))
+ // A short round interval.
+ nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+ // Choose the first node as "syncNode" that its consensus' Run() is called
+ // later.
+ syncNode := nodes[types.NewNodeID(pubKeys[0])]
+ syncNode.con = nil
+ // Use other node's governance instance. Normally, fullnode would make
+ // governance when syncing. In our test, it's the simplest way to achieve
+ // that.
+ syncNode.gov = nodes[types.NewNodeID(pubKeys[1])].gov
+ for _, n := range nodes {
+ if n.ID != syncNode.ID {
+ go n.con.Run(&types.Block{})
+ }
+ }
+ // Clean syncNode's network receive channel, or it might exceed the limit
+ // and block other go routines.
+ dummyReceiverCtx, dummyReceiverCtxCancel := context.WithCancel(
+ context.Background())
+ go func() {
+ Loop:
+ for {
+ select {
+ case <-syncNode.network.ReceiveChan():
+ case <-dummyReceiverCtx.Done():
+ break Loop
+ }
+ }
+ }()
+ReachAlive:
+ for {
+ // If all nodes excepts syncNode have reached aliveRound, call syncNode's
+ // Run() and send it all blocks in one of normal node's compaction chain.
+ for id, n := range nodes {
+ if id == syncNode.ID {
+ continue
+ }
+ if n.app.GetLatestDeliveredPosition().Round < aliveRound {
+ continue ReachAlive
+ }
+ // Check if any error happened or sleep for a period of time.
+ select {
+ case err := <-errChan:
+ req.NoError(err)
+ case <-time.After(5 * time.Second):
+ }
+ }
+ dummyReceiverCtxCancel()
+ break
+ }
+ // Initiate Syncer.
+ runnerCtx, runnerCtxCancel := context.WithCancel(context.Background())
+ defer runnerCtxCancel()
+ syncerObj := syncer.NewConsensus(
+ dMoment,
+ syncNode.app,
+ syncNode.gov,
+ syncNode.db,
+ syncNode.network,
+ prvKeys[0],
+ &common.NullLogger{},
+ )
+ // Initialize communication channel, it's not recommended to assertion in
+ // another go routine.
+ go func() {
+ var (
+ node *node
+ syncedHeight uint64
+ err error
+ syncedCon *core.Consensus
+ )
+ SyncLoop:
+ for {
+ for _, n := range nodes {
+ if n.ID == syncNode.ID {
+ continue
+ }
+ node = n
+ break
+ }
+ syncedCon, syncedHeight, err = s.syncBlocksWithSomeNode(
+ node, syncerObj, syncedHeight)
+ if syncedCon != nil {
+ // TODO(mission): run it and make sure it can follow up with
+ // other nodes.
+ runnerCtxCancel()
+ break SyncLoop
+ }
+ if err != nil {
+ errChan <- err
+ break SyncLoop
+ }
+ select {
+ case <-runnerCtx.Done():
+ break SyncLoop
+ case <-time.After(2 * time.Second):
+ }
+ }
+ }()
+ // Wait until all nodes reach 'untilRound'.
+ go func() {
+ ReachFinished:
+ for {
+ time.Sleep(5 * time.Second)
+ for _, n := range nodes {
+ if n.app.GetLatestDeliveredPosition().Round < untilRound {
+ continue ReachFinished
+ }
+ }
+ break
+ }
+ runnerCtxCancel()
+ }()
+ // Block until any reasonable testing milestone reached.
+ select {
+ case err := <-errChan:
+ req.NoError(err)
+ case <-runnerCtx.Done():
+ // This test passed.
+ // TODO(haoping) stop consensus.
+ }
+}
+
func TestConsensus(t *testing.T) {
suite.Run(t, new(ConsensusTestSuite))
}
diff --git a/integration_test/stats.go b/integration_test/stats.go
index 79f1c20..5d66e29 100644
--- a/integration_test/stats.go
+++ b/integration_test/stats.go
@@ -1,3 +1,20 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
package integration
import (
@@ -58,7 +75,7 @@ func (s *StatsSet) newBlockReceiveEvent(
// Find statistics from test.App
block := payload.PiggyBack.(*types.Block)
- app.Check(func(app *test.App) {
+ app.WithLock(func(app *test.App) {
// Is this block confirmed?
if _, exists := app.Confirmed[block.Hash]; !exists {
return
diff --git a/integration_test/stats_test.go b/integration_test/stats_test.go
index d61799b..5c1f412 100644
--- a/integration_test/stats_test.go
+++ b/integration_test/stats_test.go
@@ -1,3 +1,20 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
package integration
import (
diff --git a/integration_test/utils.go b/integration_test/utils.go
index 1530507..a6cc139 100644
--- a/integration_test/utils.go
+++ b/integration_test/utils.go
@@ -1,3 +1,20 @@
+// Copyright 2018 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
package integration
import (