diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-01-15 16:06:21 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-15 16:06:21 +0800 |
commit | 8e1b6161f346993b74558124758cfb650465cf05 (patch) | |
tree | c5b60eae90c3358e6faf43cee9d3ffc3b9945dd1 | |
parent | 809e8def862fdfa792061a448f952747f1af4d3c (diff) | |
download | dexon-consensus-8e1b6161f346993b74558124758cfb650465cf05.tar dexon-consensus-8e1b6161f346993b74558124758cfb650465cf05.tar.gz dexon-consensus-8e1b6161f346993b74558124758cfb650465cf05.tar.bz2 dexon-consensus-8e1b6161f346993b74558124758cfb650465cf05.tar.lz dexon-consensus-8e1b6161f346993b74558124758cfb650465cf05.tar.xz dexon-consensus-8e1b6161f346993b74558124758cfb650465cf05.tar.zst dexon-consensus-8e1b6161f346993b74558124758cfb650465cf05.zip |
core: Fix BA3.0 (#420)
* Add Restart to Ticker
* Change pre allocated size
* Return NextTime from lattice
* Few hacky fixes for BA
* PullVote in FastRollback state
* Add shallowBlock for agreementResult
* Extend period
* Fixup
-rw-r--r-- | core/agreement-mgr.go | 16 | ||||
-rw-r--r-- | core/agreement.go | 30 | ||||
-rw-r--r-- | core/consensus.go | 3 | ||||
-rw-r--r-- | core/interfaces.go | 3 | ||||
-rw-r--r-- | core/lattice-data.go | 57 | ||||
-rw-r--r-- | core/lattice-data_test.go | 30 | ||||
-rw-r--r-- | core/lattice.go | 24 | ||||
-rw-r--r-- | core/lattice_test.go | 10 | ||||
-rw-r--r-- | core/test/state.go | 2 | ||||
-rw-r--r-- | core/ticker.go | 14 | ||||
-rw-r--r-- | core/utils/nodeset-cache.go | 2 | ||||
-rw-r--r-- | core/utils/nodeset-cache_test.go | 11 | ||||
-rw-r--r-- | integration_test/node.go | 2 |
13 files changed, 145 insertions, 59 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index eb4abda..e468e9c 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -42,7 +42,7 @@ func genValidLeader( if block.Timestamp.After(time.Now()) { return false, nil } - if err := mgr.lattice.SanityCheck(block); err != nil { + if err := mgr.lattice.SanityCheck(block, true); err != nil { if err == ErrRetrySanityCheckLater { return false, nil } @@ -190,6 +190,14 @@ func (mgr *agreementMgr) appendConfig( newLeaderSelector(genValidLeader(mgr), mgr.logger), mgr.signer, mgr.logger) + // Hacky way to initialize first notarySet. + nodes, err := mgr.cache.GetNodeSet(round) + if err != nil { + return err + } + agrModule.notarySet = nodes.GetSubSet( + int(config.NotarySetSize), + types.NewNotarySetTarget(crs, i)) // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) @@ -441,8 +449,10 @@ Loop: } } var nextHeight uint64 + var nextTime time.Time for { - nextHeight, err = mgr.lattice.NextHeight(recv.round(), setting.chainID) + nextHeight, nextTime, err = + mgr.lattice.NextBlock(recv.round(), setting.chainID) if err != nil { mgr.logger.Debug("Error getting next height", "error", err, @@ -471,6 +481,8 @@ Loop: if err != nil { return err } + time.Sleep(nextTime.Sub(time.Now())) + setting.ticker.Restart() agr.restart(setting.notarySet, nextPos, leader, setting.crs) default: } diff --git a/core/agreement.go b/core/agreement.go index f9cf546..62bbe25 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -252,7 +252,12 @@ func isStop(aID types.Position) bool { func (a *agreement) clocks() int { a.data.lock.RLock() defer a.data.lock.RUnlock() - return a.state.clocks() + scale := int(a.data.period) + // 10 is a magic number derived from many years of experience. + if scale > 10 { + scale = 10 + } + return a.state.clocks() * scale } // pullVotes returns if current agreement requires more votes to continue. @@ -260,6 +265,7 @@ func (a *agreement) pullVotes() bool { a.data.lock.RLock() defer a.data.lock.RUnlock() return a.state.state() == statePullVote || + a.state.state() == stateFastRollback || (a.state.state() == statePreCommit && (a.data.period%3) == 0) } @@ -308,18 +314,21 @@ func (a *agreement) sanityCheck(vote *types.Vote) error { return nil } -func (a *agreement) checkForkVote(vote *types.Vote) error { +func (a *agreement) checkForkVote(vote *types.Vote) ( + alreadyExist bool, err error) { a.data.lock.RLock() defer a.data.lock.RUnlock() if votes, exist := a.data.votes[vote.Period]; exist { if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist { + alreadyExist = true if vote.BlockHash != oldVote.BlockHash { a.data.recv.ReportForkVote(oldVote, vote) - return &ErrForkVote{vote.ProposerID, oldVote, vote} + err = &ErrForkVote{vote.ProposerID, oldVote, vote} + return } } } - return nil + return } // prepareVote prepares a vote. @@ -339,6 +348,13 @@ func (a *agreement) processVote(vote *types.Vote) error { aID := a.agreementID() // Agreement module has stopped. if isStop(aID) { + // Hacky way to not drop first votes for height 0. + if vote.Position.Height == uint64(0) { + a.pendingVote = append(a.pendingVote, pendingVote{ + vote: vote, + receivedTime: time.Now().UTC(), + }) + } return nil } if vote.Position != aID { @@ -351,9 +367,13 @@ func (a *agreement) processVote(vote *types.Vote) error { }) return nil } - if err := a.checkForkVote(vote); err != nil { + exist, err := a.checkForkVote(vote) + if err != nil { return err } + if exist { + return nil + } a.data.lock.Lock() defer a.data.lock.Unlock() diff --git a/core/consensus.go b/core/consensus.go index 2770018..0754e80 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -932,7 +932,7 @@ MessageLoop: ch, e := con.baConfirmedBlock[val.Hash] return ch, e }(); exist { - if err := con.lattice.SanityCheck(val); err != nil { + if err := con.lattice.SanityCheck(val, false); err != nil { if err == ErrRetrySanityCheckLater { err = nil } else { @@ -1040,6 +1040,7 @@ func (con *Consensus) ProcessAgreementResult( if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil { return err } + con.lattice.AddShallowBlock(rand.BlockHash, rand.Position) // Syncing BA Module. if err := con.baMgr.processAgreementResult(rand); err != nil { return err diff --git a/core/interfaces.go b/core/interfaces.go index aa87e38..a77ec93 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -159,4 +159,7 @@ type Ticker interface { // Stop the ticker. Stop() + + // Retart the ticker and clear all internal data. + Restart() } diff --git a/core/lattice-data.go b/core/lattice-data.go index 8367539..cf81a11 100644 --- a/core/lattice-data.go +++ b/core/lattice-data.go @@ -113,6 +113,8 @@ type latticeData struct { blockByHash map[common.Hash]*types.Block // This stores configuration for each round. configs []*latticeDataConfig + // shallowBlocks stores the hash of blocks that their body is not receive yet. + shallowBlocks map[common.Hash]types.Position } // newLatticeData creates a new latticeData instance. @@ -126,10 +128,11 @@ func newLatticeData( genesisConfig.fromConfig(round, config) genesisConfig.setRoundBeginTime(dMoment) data = &latticeData{ - db: db, - chains: make([]*chainStatus, genesisConfig.numChains), - blockByHash: make(map[common.Hash]*types.Block), - configs: []*latticeDataConfig{genesisConfig}, + db: db, + chains: make([]*chainStatus, genesisConfig.numChains), + blockByHash: make(map[common.Hash]*types.Block), + configs: []*latticeDataConfig{genesisConfig}, + shallowBlocks: make(map[common.Hash]types.Position), } for i := range data.chains { data.chains[i] = &chainStatus{ @@ -141,15 +144,35 @@ func newLatticeData( return } -func (data *latticeData) checkAckingRelations(b *types.Block) error { +func (data *latticeData) addShallowBlock(hash common.Hash, pos types.Position) { + // We don't care other errors here. This `if` is to prevent being spammed by + // very old blocks. + if _, err := data.findBlock(hash); err != db.ErrBlockDoesNotExist { + return + } + data.shallowBlocks[hash] = pos +} + +func (data *latticeData) checkAckingRelations( + b *types.Block, allowShallow bool) error { acksByChainID := make(map[uint32]struct{}, len(data.chains)) for _, hash := range b.Acks { bAck, err := data.findBlock(hash) if err != nil { if err == db.ErrBlockDoesNotExist { - return &ErrAckingBlockNotExists{hash} + err = &ErrAckingBlockNotExists{hash} + if allowShallow { + if pos, exist := data.shallowBlocks[hash]; exist { + bAck = &types.Block{ + Position: pos, + } + err = nil + } + } + } + if err != nil { + return err } - return err } // Check if it acks blocks from old rounds, the allowed round difference // is 1. @@ -172,7 +195,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error { return nil } -func (data *latticeData) sanityCheck(b *types.Block) error { +func (data *latticeData) sanityCheck(b *types.Block, allowShallow bool) error { // TODO(mission): Check if its proposer is in validator set, lattice has no // knowledge about node set. config := data.getConfig(b.Position.Round) @@ -196,7 +219,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { if !config.isValidGenesisBlockTime(b) { return ErrIncorrectBlockTime } - return data.checkAckingRelations(b) + return data.checkAckingRelations(b, allowShallow) } // Check parent block if parent hash is specified. if !b.ParentHash.Equal(common.Hash{}) { @@ -257,7 +280,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error { return ErrNotAckParent } } - return data.checkAckingRelations(b) + return data.checkAckingRelations(b, allowShallow) } // addBlock processes blocks. It does sanity check, inserts block into lattice @@ -501,19 +524,21 @@ func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) { } // TODO(mission): make more abstraction for this method. -// nextHeight returns the next height of a chain. -func (data *latticeData) nextHeight( - round uint64, chainID uint32) (uint64, error) { +// nextBlock returns the next height and timestamp of a chain. +func (data *latticeData) nextBlock( + round uint64, chainID uint32) (uint64, time.Time, error) { chainTip := data.chains[chainID].tip bindTip, err := data.isBindTip( types.Position{Round: round, ChainID: chainID}, chainTip) if err != nil { - return 0, err + return 0, time.Time{}, err } + config := data.getConfig(round) if bindTip { - return chainTip.Position.Height + 1, nil + return chainTip.Position.Height + 1, + chainTip.Timestamp.Add(config.minBlockTimeInterval), nil } - return 0, nil + return 0, config.roundBeginTime, nil } // findBlock seeks blocks in memory or db. diff --git a/core/lattice-data_test.go b/core/lattice-data_test.go index 8252361..2e6d684 100644 --- a/core/lattice-data_test.go +++ b/core/lattice-data_test.go @@ -193,7 +193,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() { ) check := func(expectedErr error, b *types.Block) { s.hashBlock(b) - err := data.sanityCheck(b) + err := data.sanityCheck(b, false) req.NotNil(err) req.IsType(expectedErr, err) } @@ -323,7 +323,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() { Timestamp: time.Now().UTC().Add(500 * time.Second), } s.hashBlock(b11) - req.NoError(data.sanityCheck(b11)) + req.NoError(data.sanityCheck(b11, false)) _, err := data.addBlock(b11) req.NoError(err) // A block didn't perform round switching. @@ -340,7 +340,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() { // A block with expected new round ID should be OK. b12.Position.Round = 1 s.hashBlock(b12) - req.NoError(data.sanityCheck(b12)) + req.NoError(data.sanityCheck(b12, false)) } func (s *LatticeDataTestSuite) TestRandomlyGeneratedBlocks() { @@ -404,7 +404,7 @@ func (s *LatticeDataTestSuite) TestRandomlyGeneratedBlocks() { req.NoError(err) revealedHashes = append(revealedHashes, b.Hash) // Pass blocks to lattice. - req.NoError(data.sanityCheck(&b)) + req.NoError(data.sanityCheck(&b, false)) delivered, err = data.addBlock(&b) req.NoError(err) for _, b := range delivered { @@ -550,12 +550,14 @@ func (s *LatticeDataTestSuite) TestPrepareBlock() { req.Equal(b01.Position.Height, uint64(1)) } -func (s *LatticeDataTestSuite) TestNextHeight() { - // Test 'NextHeight' method when lattice is ready. - data, _ := s.genTestCase1() - h, err := data.nextHeight(0, 0) +func (s *LatticeDataTestSuite) TestNextBlock() { + // Test 'NextBlock' method when lattice is ready. + data, blocks := s.genTestCase1() + h, ts, err := data.nextBlock(0, 0) s.Require().NoError(err) - s.Require().Equal(h, uint64(4)) + s.Require().Equal(uint64(4), h) + // 2ns of minBlockTime is defined in genTestCase1(). + s.Require().Equal(blocks[0][3].Timestamp.Add(2*time.Nanosecond), ts) // Test 'NextHeight' method when lattice is empty. // Setup a configuration that no restriction on block interval and // round cutting. @@ -564,10 +566,12 @@ func (s *LatticeDataTestSuite) TestNextHeight() { NumChains: 4, MinBlockInterval: 1 * time.Second, } - data = newLatticeData(nil, time.Now().UTC(), 0, genesisConfig) - h, err = data.nextHeight(0, 0) + now := time.Now().UTC() + data = newLatticeData(nil, now, 0, genesisConfig) + h, ts, err = data.nextBlock(0, 0) s.Require().NoError(err) - s.Require().Equal(h, uint64(0)) + s.Require().Equal(now, ts) + s.Require().Equal(uint64(0), h) } func (s *LatticeDataTestSuite) TestPrepareEmptyBlock() { @@ -666,7 +670,7 @@ func (s *LatticeDataTestSuite) TestNumChainsChange() { req.NoError(err) s.hashBlock(b) // Do the actual lattice usage. - req.NoError(lattice.sanityCheck(b)) + req.NoError(lattice.sanityCheck(b, false)) d, err := lattice.addBlock(b) req.NoError(err) delivered = append(delivered, d...) diff --git a/core/lattice.go b/core/lattice.go index 4642820..d531639 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -105,11 +105,19 @@ func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) { return } +// AddShallowBlock adds a hash of a block that is confirmed by other nodes but +// the content is not arrived yet. +func (l *Lattice) AddShallowBlock(hash common.Hash, pos types.Position) { + l.lock.Lock() + defer l.lock.Unlock() + l.data.addShallowBlock(hash, pos) +} + // SanityCheck checks the validity of a block. // // If any acking block of this block does not exist, Lattice caches this block // and retries when Lattice.ProcessBlock is called. -func (l *Lattice) SanityCheck(b *types.Block) (err error) { +func (l *Lattice) SanityCheck(b *types.Block, allowShallow bool) (err error) { if b.IsEmpty() { // Only need to verify block's hash. var hash common.Hash @@ -137,7 +145,7 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) { } l.lock.RLock() defer l.lock.RUnlock() - if err = l.data.sanityCheck(b); err != nil { + if err = l.data.sanityCheck(b, allowShallow); err != nil { if _, ok := err.(*ErrAckingBlockNotExists); ok { err = ErrRetrySanityCheckLater } @@ -178,7 +186,7 @@ func (l *Lattice) addBlockToLattice( if tip = l.pool.tip(i); tip == nil { continue } - err = l.data.sanityCheck(tip) + err = l.data.sanityCheck(tip, false) if err == nil { var output []*types.Block if output, err = l.data.addBlock(tip); err != nil { @@ -188,6 +196,7 @@ func (l *Lattice) addBlockToLattice( "block", tip, "error", err) panic(err) } + delete(l.data.shallowBlocks, tip.Hash) hasOutput = true outputBlocks = append(outputBlocks, output...) l.pool.removeTip(i) @@ -272,12 +281,13 @@ func (l *Lattice) ProcessBlock( return } -// NextHeight returns expected height of incoming block for specified chain and -// given round. -func (l *Lattice) NextHeight(round uint64, chainID uint32) (uint64, error) { +// NextBlock returns expected height and timestamp of incoming block for +// specified chain and given round. +func (l *Lattice) NextBlock(round uint64, chainID uint32) ( + uint64, time.Time, error) { l.lock.RLock() defer l.lock.RUnlock() - return l.data.nextHeight(round, chainID) + return l.data.nextBlock(round, chainID) } // PurgeBlocks purges blocks' cache in memory, this is called when the caller diff --git a/core/lattice_test.go b/core/lattice_test.go index 99723d6..488bef5 100644 --- a/core/lattice_test.go +++ b/core/lattice_test.go @@ -55,7 +55,7 @@ func (mgr *testLatticeMgr) processBlock(b *types.Block) (err error) { var ( delivered []*types.Block ) - if err = mgr.lattice.SanityCheck(b); err != nil { + if err = mgr.lattice.SanityCheck(b, false); err != nil { if err == ErrRetrySanityCheckLater { err = nil } else { @@ -235,12 +235,12 @@ func (s *LatticeTestSuite) TestSanityCheck() { Timestamp: time.Now().UTC(), } req.NoError(signer.SignBlock(b)) - req.NoError(lattice.SanityCheck(b)) + req.NoError(lattice.SanityCheck(b, false)) // A block with incorrect signature should not pass sanity check. otherPrvKey, err := ecdsa.NewPrivateKey() req.NoError(err) b.Signature, err = otherPrvKey.Sign(common.NewRandomHash()) - req.Equal(lattice.SanityCheck(b), ErrIncorrectSignature) + req.Equal(lattice.SanityCheck(b, false), ErrIncorrectSignature) // A block with un-sorted acks should not pass sanity check. b.Acks = common.NewSortedHashes(common.Hashes{ common.NewRandomHash(), @@ -251,10 +251,10 @@ func (s *LatticeTestSuite) TestSanityCheck() { }) b.Acks[0], b.Acks[1] = b.Acks[1], b.Acks[0] req.NoError(signer.SignBlock(b)) - req.Equal(lattice.SanityCheck(b), ErrAcksNotSorted) + req.Equal(lattice.SanityCheck(b, false), ErrAcksNotSorted) // A block with incorrect hash should not pass sanity check. b.Hash = common.NewRandomHash() - req.Equal(lattice.SanityCheck(b), ErrIncorrectHash) + req.Equal(lattice.SanityCheck(b, false), ErrIncorrectHash) } func TestLattice(t *testing.T) { diff --git a/core/test/state.go b/core/test/state.go index e943a8a..a5a285b 100644 --- a/core/test/state.go +++ b/core/test/state.go @@ -133,7 +133,7 @@ func NewState( lambdaBA: lambda, lambdaDKG: lambda * 10, roundInterval: lambda * 10000, - minBlockInterval: time.Millisecond * 1, + minBlockInterval: 4 * lambda, crs: []common.Hash{genesisCRS}, nodes: nodes, phiRatio: 0.667, diff --git a/core/ticker.go b/core/ticker.go index f8d0c67..ffd5ab4 100644 --- a/core/ticker.go +++ b/core/ticker.go @@ -36,12 +36,16 @@ const ( // defaultTicker is a wrapper to implement ticker interface based on // time.Ticker. type defaultTicker struct { - ticker *time.Ticker + ticker *time.Ticker + duration time.Duration } // newDefaultTicker constructs an defaultTicker instance by giving an interval. func newDefaultTicker(lambda time.Duration) *defaultTicker { - return &defaultTicker{ticker: time.NewTicker(lambda)} + return &defaultTicker{ + ticker: time.NewTicker(lambda), + duration: lambda, + } } // Tick implements Tick method of ticker interface. @@ -54,6 +58,12 @@ func (t *defaultTicker) Stop() { t.ticker.Stop() } +// Restart implements Stop method of ticker interface. +func (t *defaultTicker) Restart() { + t.ticker.Stop() + t.ticker = time.NewTicker(t.duration) +} + // newTicker is a helper to setup a ticker by giving an Governance. If // the governace object implements a ticker generator, a ticker from that // generator would be returned, else constructs a default one. diff --git a/core/utils/nodeset-cache.go b/core/utils/nodeset-cache.go index 35828b7..8a07c9d 100644 --- a/core/utils/nodeset-cache.go +++ b/core/utils/nodeset-cache.go @@ -247,7 +247,7 @@ func (cache *NodeSetCache) update( nIDs.notarySet[i] = nodeSet.GetSubSet( int(cfg.NotarySetSize), types.NewNotarySetTarget(crs, uint32(i))) } - nodesPerChain := cfg.RoundInterval / (cfg.LambdaBA * 4) + nodesPerChain := cfg.RoundInterval / cfg.MinBlockInterval for i := range nIDs.leaderNode { nIDs.leaderNode[i] = make(map[uint64]types.NodeID, nodesPerChain) } diff --git a/core/utils/nodeset-cache_test.go b/core/utils/nodeset-cache_test.go index 9e6ceee..c600f15 100644 --- a/core/utils/nodeset-cache_test.go +++ b/core/utils/nodeset-cache_test.go @@ -36,11 +36,12 @@ type nsIntf struct { func (g *nsIntf) Configuration(round uint64) (cfg *types.Config) { return &types.Config{ - NotarySetSize: 7, - DKGSetSize: 7, - NumChains: 4, - LambdaBA: 250 * time.Millisecond, - RoundInterval: 60 * time.Second, + NotarySetSize: 7, + DKGSetSize: 7, + NumChains: 4, + LambdaBA: 250 * time.Millisecond, + RoundInterval: 60 * time.Second, + MinBlockInterval: 1 * time.Second, } } func (g *nsIntf) CRS(round uint64) (b common.Hash) { return g.crs } diff --git a/integration_test/node.go b/integration_test/node.go index 2d0ca9f..c2bb806 100644 --- a/integration_test/node.go +++ b/integration_test/node.go @@ -228,7 +228,7 @@ func (n *Node) processBlock(b *types.Block) (events []*test.Event, err error) { ) updated := false for _, p := range n.pendings { - if tmpErr = n.lattice.SanityCheck(p); tmpErr != nil { + if tmpErr = n.lattice.SanityCheck(p, false); tmpErr != nil { if tmpErr == core.ErrRetrySanityCheckLater { newPendings = append(newPendings, p) } else { |