aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/agreement-mgr.go16
-rw-r--r--core/agreement.go30
-rw-r--r--core/consensus.go3
-rw-r--r--core/interfaces.go3
-rw-r--r--core/lattice-data.go57
-rw-r--r--core/lattice-data_test.go30
-rw-r--r--core/lattice.go24
-rw-r--r--core/lattice_test.go10
-rw-r--r--core/test/state.go2
-rw-r--r--core/ticker.go14
-rw-r--r--core/utils/nodeset-cache.go2
-rw-r--r--core/utils/nodeset-cache_test.go11
-rw-r--r--integration_test/node.go2
13 files changed, 145 insertions, 59 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go
index eb4abda..e468e9c 100644
--- a/core/agreement-mgr.go
+++ b/core/agreement-mgr.go
@@ -42,7 +42,7 @@ func genValidLeader(
if block.Timestamp.After(time.Now()) {
return false, nil
}
- if err := mgr.lattice.SanityCheck(block); err != nil {
+ if err := mgr.lattice.SanityCheck(block, true); err != nil {
if err == ErrRetrySanityCheckLater {
return false, nil
}
@@ -190,6 +190,14 @@ func (mgr *agreementMgr) appendConfig(
newLeaderSelector(genValidLeader(mgr), mgr.logger),
mgr.signer,
mgr.logger)
+ // Hacky way to initialize first notarySet.
+ nodes, err := mgr.cache.GetNodeSet(round)
+ if err != nil {
+ return err
+ }
+ agrModule.notarySet = nodes.GetSubSet(
+ int(config.NotarySetSize),
+ types.NewNotarySetTarget(crs, i))
// Hacky way to make agreement module self contained.
recv.agreementModule = agrModule
mgr.baModules = append(mgr.baModules, agrModule)
@@ -441,8 +449,10 @@ Loop:
}
}
var nextHeight uint64
+ var nextTime time.Time
for {
- nextHeight, err = mgr.lattice.NextHeight(recv.round(), setting.chainID)
+ nextHeight, nextTime, err =
+ mgr.lattice.NextBlock(recv.round(), setting.chainID)
if err != nil {
mgr.logger.Debug("Error getting next height",
"error", err,
@@ -471,6 +481,8 @@ Loop:
if err != nil {
return err
}
+ time.Sleep(nextTime.Sub(time.Now()))
+ setting.ticker.Restart()
agr.restart(setting.notarySet, nextPos, leader, setting.crs)
default:
}
diff --git a/core/agreement.go b/core/agreement.go
index f9cf546..62bbe25 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -252,7 +252,12 @@ func isStop(aID types.Position) bool {
func (a *agreement) clocks() int {
a.data.lock.RLock()
defer a.data.lock.RUnlock()
- return a.state.clocks()
+ scale := int(a.data.period)
+ // 10 is a magic number derived from many years of experience.
+ if scale > 10 {
+ scale = 10
+ }
+ return a.state.clocks() * scale
}
// pullVotes returns if current agreement requires more votes to continue.
@@ -260,6 +265,7 @@ func (a *agreement) pullVotes() bool {
a.data.lock.RLock()
defer a.data.lock.RUnlock()
return a.state.state() == statePullVote ||
+ a.state.state() == stateFastRollback ||
(a.state.state() == statePreCommit && (a.data.period%3) == 0)
}
@@ -308,18 +314,21 @@ func (a *agreement) sanityCheck(vote *types.Vote) error {
return nil
}
-func (a *agreement) checkForkVote(vote *types.Vote) error {
+func (a *agreement) checkForkVote(vote *types.Vote) (
+ alreadyExist bool, err error) {
a.data.lock.RLock()
defer a.data.lock.RUnlock()
if votes, exist := a.data.votes[vote.Period]; exist {
if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist {
+ alreadyExist = true
if vote.BlockHash != oldVote.BlockHash {
a.data.recv.ReportForkVote(oldVote, vote)
- return &ErrForkVote{vote.ProposerID, oldVote, vote}
+ err = &ErrForkVote{vote.ProposerID, oldVote, vote}
+ return
}
}
}
- return nil
+ return
}
// prepareVote prepares a vote.
@@ -339,6 +348,13 @@ func (a *agreement) processVote(vote *types.Vote) error {
aID := a.agreementID()
// Agreement module has stopped.
if isStop(aID) {
+ // Hacky way to not drop first votes for height 0.
+ if vote.Position.Height == uint64(0) {
+ a.pendingVote = append(a.pendingVote, pendingVote{
+ vote: vote,
+ receivedTime: time.Now().UTC(),
+ })
+ }
return nil
}
if vote.Position != aID {
@@ -351,9 +367,13 @@ func (a *agreement) processVote(vote *types.Vote) error {
})
return nil
}
- if err := a.checkForkVote(vote); err != nil {
+ exist, err := a.checkForkVote(vote)
+ if err != nil {
return err
}
+ if exist {
+ return nil
+ }
a.data.lock.Lock()
defer a.data.lock.Unlock()
diff --git a/core/consensus.go b/core/consensus.go
index 2770018..0754e80 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -932,7 +932,7 @@ MessageLoop:
ch, e := con.baConfirmedBlock[val.Hash]
return ch, e
}(); exist {
- if err := con.lattice.SanityCheck(val); err != nil {
+ if err := con.lattice.SanityCheck(val, false); err != nil {
if err == ErrRetrySanityCheckLater {
err = nil
} else {
@@ -1040,6 +1040,7 @@ func (con *Consensus) ProcessAgreementResult(
if err := VerifyAgreementResult(rand, con.nodeSetCache); err != nil {
return err
}
+ con.lattice.AddShallowBlock(rand.BlockHash, rand.Position)
// Syncing BA Module.
if err := con.baMgr.processAgreementResult(rand); err != nil {
return err
diff --git a/core/interfaces.go b/core/interfaces.go
index aa87e38..a77ec93 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -159,4 +159,7 @@ type Ticker interface {
// Stop the ticker.
Stop()
+
+ // Retart the ticker and clear all internal data.
+ Restart()
}
diff --git a/core/lattice-data.go b/core/lattice-data.go
index 8367539..cf81a11 100644
--- a/core/lattice-data.go
+++ b/core/lattice-data.go
@@ -113,6 +113,8 @@ type latticeData struct {
blockByHash map[common.Hash]*types.Block
// This stores configuration for each round.
configs []*latticeDataConfig
+ // shallowBlocks stores the hash of blocks that their body is not receive yet.
+ shallowBlocks map[common.Hash]types.Position
}
// newLatticeData creates a new latticeData instance.
@@ -126,10 +128,11 @@ func newLatticeData(
genesisConfig.fromConfig(round, config)
genesisConfig.setRoundBeginTime(dMoment)
data = &latticeData{
- db: db,
- chains: make([]*chainStatus, genesisConfig.numChains),
- blockByHash: make(map[common.Hash]*types.Block),
- configs: []*latticeDataConfig{genesisConfig},
+ db: db,
+ chains: make([]*chainStatus, genesisConfig.numChains),
+ blockByHash: make(map[common.Hash]*types.Block),
+ configs: []*latticeDataConfig{genesisConfig},
+ shallowBlocks: make(map[common.Hash]types.Position),
}
for i := range data.chains {
data.chains[i] = &chainStatus{
@@ -141,15 +144,35 @@ func newLatticeData(
return
}
-func (data *latticeData) checkAckingRelations(b *types.Block) error {
+func (data *latticeData) addShallowBlock(hash common.Hash, pos types.Position) {
+ // We don't care other errors here. This `if` is to prevent being spammed by
+ // very old blocks.
+ if _, err := data.findBlock(hash); err != db.ErrBlockDoesNotExist {
+ return
+ }
+ data.shallowBlocks[hash] = pos
+}
+
+func (data *latticeData) checkAckingRelations(
+ b *types.Block, allowShallow bool) error {
acksByChainID := make(map[uint32]struct{}, len(data.chains))
for _, hash := range b.Acks {
bAck, err := data.findBlock(hash)
if err != nil {
if err == db.ErrBlockDoesNotExist {
- return &ErrAckingBlockNotExists{hash}
+ err = &ErrAckingBlockNotExists{hash}
+ if allowShallow {
+ if pos, exist := data.shallowBlocks[hash]; exist {
+ bAck = &types.Block{
+ Position: pos,
+ }
+ err = nil
+ }
+ }
+ }
+ if err != nil {
+ return err
}
- return err
}
// Check if it acks blocks from old rounds, the allowed round difference
// is 1.
@@ -172,7 +195,7 @@ func (data *latticeData) checkAckingRelations(b *types.Block) error {
return nil
}
-func (data *latticeData) sanityCheck(b *types.Block) error {
+func (data *latticeData) sanityCheck(b *types.Block, allowShallow bool) error {
// TODO(mission): Check if its proposer is in validator set, lattice has no
// knowledge about node set.
config := data.getConfig(b.Position.Round)
@@ -196,7 +219,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
if !config.isValidGenesisBlockTime(b) {
return ErrIncorrectBlockTime
}
- return data.checkAckingRelations(b)
+ return data.checkAckingRelations(b, allowShallow)
}
// Check parent block if parent hash is specified.
if !b.ParentHash.Equal(common.Hash{}) {
@@ -257,7 +280,7 @@ func (data *latticeData) sanityCheck(b *types.Block) error {
return ErrNotAckParent
}
}
- return data.checkAckingRelations(b)
+ return data.checkAckingRelations(b, allowShallow)
}
// addBlock processes blocks. It does sanity check, inserts block into lattice
@@ -501,19 +524,21 @@ func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) {
}
// TODO(mission): make more abstraction for this method.
-// nextHeight returns the next height of a chain.
-func (data *latticeData) nextHeight(
- round uint64, chainID uint32) (uint64, error) {
+// nextBlock returns the next height and timestamp of a chain.
+func (data *latticeData) nextBlock(
+ round uint64, chainID uint32) (uint64, time.Time, error) {
chainTip := data.chains[chainID].tip
bindTip, err := data.isBindTip(
types.Position{Round: round, ChainID: chainID}, chainTip)
if err != nil {
- return 0, err
+ return 0, time.Time{}, err
}
+ config := data.getConfig(round)
if bindTip {
- return chainTip.Position.Height + 1, nil
+ return chainTip.Position.Height + 1,
+ chainTip.Timestamp.Add(config.minBlockTimeInterval), nil
}
- return 0, nil
+ return 0, config.roundBeginTime, nil
}
// findBlock seeks blocks in memory or db.
diff --git a/core/lattice-data_test.go b/core/lattice-data_test.go
index 8252361..2e6d684 100644
--- a/core/lattice-data_test.go
+++ b/core/lattice-data_test.go
@@ -193,7 +193,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() {
)
check := func(expectedErr error, b *types.Block) {
s.hashBlock(b)
- err := data.sanityCheck(b)
+ err := data.sanityCheck(b, false)
req.NotNil(err)
req.IsType(expectedErr, err)
}
@@ -323,7 +323,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() {
Timestamp: time.Now().UTC().Add(500 * time.Second),
}
s.hashBlock(b11)
- req.NoError(data.sanityCheck(b11))
+ req.NoError(data.sanityCheck(b11, false))
_, err := data.addBlock(b11)
req.NoError(err)
// A block didn't perform round switching.
@@ -340,7 +340,7 @@ func (s *LatticeDataTestSuite) TestSanityCheck() {
// A block with expected new round ID should be OK.
b12.Position.Round = 1
s.hashBlock(b12)
- req.NoError(data.sanityCheck(b12))
+ req.NoError(data.sanityCheck(b12, false))
}
func (s *LatticeDataTestSuite) TestRandomlyGeneratedBlocks() {
@@ -404,7 +404,7 @@ func (s *LatticeDataTestSuite) TestRandomlyGeneratedBlocks() {
req.NoError(err)
revealedHashes = append(revealedHashes, b.Hash)
// Pass blocks to lattice.
- req.NoError(data.sanityCheck(&b))
+ req.NoError(data.sanityCheck(&b, false))
delivered, err = data.addBlock(&b)
req.NoError(err)
for _, b := range delivered {
@@ -550,12 +550,14 @@ func (s *LatticeDataTestSuite) TestPrepareBlock() {
req.Equal(b01.Position.Height, uint64(1))
}
-func (s *LatticeDataTestSuite) TestNextHeight() {
- // Test 'NextHeight' method when lattice is ready.
- data, _ := s.genTestCase1()
- h, err := data.nextHeight(0, 0)
+func (s *LatticeDataTestSuite) TestNextBlock() {
+ // Test 'NextBlock' method when lattice is ready.
+ data, blocks := s.genTestCase1()
+ h, ts, err := data.nextBlock(0, 0)
s.Require().NoError(err)
- s.Require().Equal(h, uint64(4))
+ s.Require().Equal(uint64(4), h)
+ // 2ns of minBlockTime is defined in genTestCase1().
+ s.Require().Equal(blocks[0][3].Timestamp.Add(2*time.Nanosecond), ts)
// Test 'NextHeight' method when lattice is empty.
// Setup a configuration that no restriction on block interval and
// round cutting.
@@ -564,10 +566,12 @@ func (s *LatticeDataTestSuite) TestNextHeight() {
NumChains: 4,
MinBlockInterval: 1 * time.Second,
}
- data = newLatticeData(nil, time.Now().UTC(), 0, genesisConfig)
- h, err = data.nextHeight(0, 0)
+ now := time.Now().UTC()
+ data = newLatticeData(nil, now, 0, genesisConfig)
+ h, ts, err = data.nextBlock(0, 0)
s.Require().NoError(err)
- s.Require().Equal(h, uint64(0))
+ s.Require().Equal(now, ts)
+ s.Require().Equal(uint64(0), h)
}
func (s *LatticeDataTestSuite) TestPrepareEmptyBlock() {
@@ -666,7 +670,7 @@ func (s *LatticeDataTestSuite) TestNumChainsChange() {
req.NoError(err)
s.hashBlock(b)
// Do the actual lattice usage.
- req.NoError(lattice.sanityCheck(b))
+ req.NoError(lattice.sanityCheck(b, false))
d, err := lattice.addBlock(b)
req.NoError(err)
delivered = append(delivered, d...)
diff --git a/core/lattice.go b/core/lattice.go
index 4642820..d531639 100644
--- a/core/lattice.go
+++ b/core/lattice.go
@@ -105,11 +105,19 @@ func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) {
return
}
+// AddShallowBlock adds a hash of a block that is confirmed by other nodes but
+// the content is not arrived yet.
+func (l *Lattice) AddShallowBlock(hash common.Hash, pos types.Position) {
+ l.lock.Lock()
+ defer l.lock.Unlock()
+ l.data.addShallowBlock(hash, pos)
+}
+
// SanityCheck checks the validity of a block.
//
// If any acking block of this block does not exist, Lattice caches this block
// and retries when Lattice.ProcessBlock is called.
-func (l *Lattice) SanityCheck(b *types.Block) (err error) {
+func (l *Lattice) SanityCheck(b *types.Block, allowShallow bool) (err error) {
if b.IsEmpty() {
// Only need to verify block's hash.
var hash common.Hash
@@ -137,7 +145,7 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) {
}
l.lock.RLock()
defer l.lock.RUnlock()
- if err = l.data.sanityCheck(b); err != nil {
+ if err = l.data.sanityCheck(b, allowShallow); err != nil {
if _, ok := err.(*ErrAckingBlockNotExists); ok {
err = ErrRetrySanityCheckLater
}
@@ -178,7 +186,7 @@ func (l *Lattice) addBlockToLattice(
if tip = l.pool.tip(i); tip == nil {
continue
}
- err = l.data.sanityCheck(tip)
+ err = l.data.sanityCheck(tip, false)
if err == nil {
var output []*types.Block
if output, err = l.data.addBlock(tip); err != nil {
@@ -188,6 +196,7 @@ func (l *Lattice) addBlockToLattice(
"block", tip, "error", err)
panic(err)
}
+ delete(l.data.shallowBlocks, tip.Hash)
hasOutput = true
outputBlocks = append(outputBlocks, output...)
l.pool.removeTip(i)
@@ -272,12 +281,13 @@ func (l *Lattice) ProcessBlock(
return
}
-// NextHeight returns expected height of incoming block for specified chain and
-// given round.
-func (l *Lattice) NextHeight(round uint64, chainID uint32) (uint64, error) {
+// NextBlock returns expected height and timestamp of incoming block for
+// specified chain and given round.
+func (l *Lattice) NextBlock(round uint64, chainID uint32) (
+ uint64, time.Time, error) {
l.lock.RLock()
defer l.lock.RUnlock()
- return l.data.nextHeight(round, chainID)
+ return l.data.nextBlock(round, chainID)
}
// PurgeBlocks purges blocks' cache in memory, this is called when the caller
diff --git a/core/lattice_test.go b/core/lattice_test.go
index 99723d6..488bef5 100644
--- a/core/lattice_test.go
+++ b/core/lattice_test.go
@@ -55,7 +55,7 @@ func (mgr *testLatticeMgr) processBlock(b *types.Block) (err error) {
var (
delivered []*types.Block
)
- if err = mgr.lattice.SanityCheck(b); err != nil {
+ if err = mgr.lattice.SanityCheck(b, false); err != nil {
if err == ErrRetrySanityCheckLater {
err = nil
} else {
@@ -235,12 +235,12 @@ func (s *LatticeTestSuite) TestSanityCheck() {
Timestamp: time.Now().UTC(),
}
req.NoError(signer.SignBlock(b))
- req.NoError(lattice.SanityCheck(b))
+ req.NoError(lattice.SanityCheck(b, false))
// A block with incorrect signature should not pass sanity check.
otherPrvKey, err := ecdsa.NewPrivateKey()
req.NoError(err)
b.Signature, err = otherPrvKey.Sign(common.NewRandomHash())
- req.Equal(lattice.SanityCheck(b), ErrIncorrectSignature)
+ req.Equal(lattice.SanityCheck(b, false), ErrIncorrectSignature)
// A block with un-sorted acks should not pass sanity check.
b.Acks = common.NewSortedHashes(common.Hashes{
common.NewRandomHash(),
@@ -251,10 +251,10 @@ func (s *LatticeTestSuite) TestSanityCheck() {
})
b.Acks[0], b.Acks[1] = b.Acks[1], b.Acks[0]
req.NoError(signer.SignBlock(b))
- req.Equal(lattice.SanityCheck(b), ErrAcksNotSorted)
+ req.Equal(lattice.SanityCheck(b, false), ErrAcksNotSorted)
// A block with incorrect hash should not pass sanity check.
b.Hash = common.NewRandomHash()
- req.Equal(lattice.SanityCheck(b), ErrIncorrectHash)
+ req.Equal(lattice.SanityCheck(b, false), ErrIncorrectHash)
}
func TestLattice(t *testing.T) {
diff --git a/core/test/state.go b/core/test/state.go
index e943a8a..a5a285b 100644
--- a/core/test/state.go
+++ b/core/test/state.go
@@ -133,7 +133,7 @@ func NewState(
lambdaBA: lambda,
lambdaDKG: lambda * 10,
roundInterval: lambda * 10000,
- minBlockInterval: time.Millisecond * 1,
+ minBlockInterval: 4 * lambda,
crs: []common.Hash{genesisCRS},
nodes: nodes,
phiRatio: 0.667,
diff --git a/core/ticker.go b/core/ticker.go
index f8d0c67..ffd5ab4 100644
--- a/core/ticker.go
+++ b/core/ticker.go
@@ -36,12 +36,16 @@ const (
// defaultTicker is a wrapper to implement ticker interface based on
// time.Ticker.
type defaultTicker struct {
- ticker *time.Ticker
+ ticker *time.Ticker
+ duration time.Duration
}
// newDefaultTicker constructs an defaultTicker instance by giving an interval.
func newDefaultTicker(lambda time.Duration) *defaultTicker {
- return &defaultTicker{ticker: time.NewTicker(lambda)}
+ return &defaultTicker{
+ ticker: time.NewTicker(lambda),
+ duration: lambda,
+ }
}
// Tick implements Tick method of ticker interface.
@@ -54,6 +58,12 @@ func (t *defaultTicker) Stop() {
t.ticker.Stop()
}
+// Restart implements Stop method of ticker interface.
+func (t *defaultTicker) Restart() {
+ t.ticker.Stop()
+ t.ticker = time.NewTicker(t.duration)
+}
+
// newTicker is a helper to setup a ticker by giving an Governance. If
// the governace object implements a ticker generator, a ticker from that
// generator would be returned, else constructs a default one.
diff --git a/core/utils/nodeset-cache.go b/core/utils/nodeset-cache.go
index 35828b7..8a07c9d 100644
--- a/core/utils/nodeset-cache.go
+++ b/core/utils/nodeset-cache.go
@@ -247,7 +247,7 @@ func (cache *NodeSetCache) update(
nIDs.notarySet[i] = nodeSet.GetSubSet(
int(cfg.NotarySetSize), types.NewNotarySetTarget(crs, uint32(i)))
}
- nodesPerChain := cfg.RoundInterval / (cfg.LambdaBA * 4)
+ nodesPerChain := cfg.RoundInterval / cfg.MinBlockInterval
for i := range nIDs.leaderNode {
nIDs.leaderNode[i] = make(map[uint64]types.NodeID, nodesPerChain)
}
diff --git a/core/utils/nodeset-cache_test.go b/core/utils/nodeset-cache_test.go
index 9e6ceee..c600f15 100644
--- a/core/utils/nodeset-cache_test.go
+++ b/core/utils/nodeset-cache_test.go
@@ -36,11 +36,12 @@ type nsIntf struct {
func (g *nsIntf) Configuration(round uint64) (cfg *types.Config) {
return &types.Config{
- NotarySetSize: 7,
- DKGSetSize: 7,
- NumChains: 4,
- LambdaBA: 250 * time.Millisecond,
- RoundInterval: 60 * time.Second,
+ NotarySetSize: 7,
+ DKGSetSize: 7,
+ NumChains: 4,
+ LambdaBA: 250 * time.Millisecond,
+ RoundInterval: 60 * time.Second,
+ MinBlockInterval: 1 * time.Second,
}
}
func (g *nsIntf) CRS(round uint64) (b common.Hash) { return g.crs }
diff --git a/integration_test/node.go b/integration_test/node.go
index 2d0ca9f..c2bb806 100644
--- a/integration_test/node.go
+++ b/integration_test/node.go
@@ -228,7 +228,7 @@ func (n *Node) processBlock(b *types.Block) (events []*test.Event, err error) {
)
updated := false
for _, p := range n.pendings {
- if tmpErr = n.lattice.SanityCheck(p); tmpErr != nil {
+ if tmpErr = n.lattice.SanityCheck(p, false); tmpErr != nil {
if tmpErr == core.ErrRetrySanityCheckLater {
newPendings = append(newPendings, p)
} else {