aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-12-27 09:17:28 +0800
committerWei-Ning Huang <w@dexon.org>2018-12-28 14:15:39 +0800
commit50a0622f1797096a0836246177f41f77cb4b0e8b (patch)
tree3a7ce1407fd555365733bb76c456b19aef37f126
parentbc18f6b29086e1a98590d41b2c1e362505af49aa (diff)
downloaddexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar
dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.gz
dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.bz2
dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.lz
dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.xz
dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.zst
dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.zip
dex: add pull randomness (#105)
* vendor: sync to latest core * dex: Add PullRandomness
-rw-r--r--dex/cache.go62
-rw-r--r--dex/cache_test.go90
-rw-r--r--dex/handler.go27
-rw-r--r--dex/network.go11
-rw-r--r--dex/peer.go20
-rw-r--r--dex/protocol.go7
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go2
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go12
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go4
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go203
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go3
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go17
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go4
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go12
-rw-r--r--vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go8
-rw-r--r--vendor/vendor.json46
16 files changed, 361 insertions, 167 deletions
diff --git a/dex/cache.go b/dex/cache.go
index 89bbbe3be..bdc22e114 100644
--- a/dex/cache.go
+++ b/dex/cache.go
@@ -44,21 +44,23 @@ func voteToKey(vote *coreTypes.Vote) voteKey {
}
type cache struct {
- lock sync.RWMutex
- blockCache map[coreCommon.Hash]*coreTypes.Block
- voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote
- votePosition []coreTypes.Position
- db coreDb.Database
- voteSize int
- size int
+ lock sync.RWMutex
+ blockCache map[coreCommon.Hash]*coreTypes.Block
+ voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote
+ randomnessCache map[coreCommon.Hash]*coreTypes.BlockRandomnessResult
+ votePosition []coreTypes.Position
+ db coreDb.Database
+ voteSize int
+ size int
}
func newCache(size int, db coreDb.Database) *cache {
return &cache{
- blockCache: make(map[coreCommon.Hash]*coreTypes.Block),
- voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote),
- db: db,
- size: size,
+ blockCache: make(map[coreCommon.Hash]*coreTypes.Block),
+ voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote),
+ randomnessCache: make(map[coreCommon.Hash]*coreTypes.BlockRandomnessResult),
+ db: db,
+ size: size,
}
}
@@ -126,3 +128,41 @@ func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block {
}
return cacheBlocks
}
+
+func (c *cache) addRandomness(rand *coreTypes.BlockRandomnessResult) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if len(c.randomnessCache) >= c.size {
+ // Randomly delete one entry.
+ for k := range c.randomnessCache {
+ delete(c.randomnessCache, k)
+ break
+ }
+ }
+ c.randomnessCache[rand.BlockHash] = rand
+}
+
+func (c *cache) randomness(hashes coreCommon.Hashes) []*coreTypes.BlockRandomnessResult {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ cacheRandomnesss := make([]*coreTypes.BlockRandomnessResult, 0, len(hashes))
+ for _, hash := range hashes {
+ if block, exist := c.randomnessCache[hash]; exist {
+ cacheRandomnesss = append(cacheRandomnesss, block)
+ } else {
+ block, err := c.db.GetBlock(hash)
+ if err != nil {
+ continue
+ }
+ if len(block.Finalization.Randomness) == 0 {
+ continue
+ }
+ cacheRandomnesss = append(cacheRandomnesss, &coreTypes.BlockRandomnessResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ Randomness: block.Finalization.Randomness,
+ })
+ }
+ }
+ return cacheRandomnesss
+}
diff --git a/dex/cache_test.go b/dex/cache_test.go
index 3b43e77aa..536e015f0 100644
--- a/dex/cache_test.go
+++ b/dex/cache_test.go
@@ -18,6 +18,7 @@
package dex
import (
+ "math/rand"
"sort"
"strings"
"testing"
@@ -203,3 +204,92 @@ func TestCacheBlock(t *testing.T) {
}
}
}
+
+func randomBytes() []byte {
+ bytes := make([]byte, 32)
+ for i := range bytes {
+ bytes[i] = byte(rand.Int() % 256)
+ }
+ return bytes
+}
+
+func TestCacheRandomness(t *testing.T) {
+ db, err := coreDb.NewMemBackedDB()
+ if err != nil {
+ panic(err)
+ }
+ cache := newCache(3, db)
+ rand1 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ rand2 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ rand3 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ rand4 := &coreTypes.BlockRandomnessResult{
+ BlockHash: coreCommon.NewRandomHash(),
+ Randomness: randomBytes(),
+ }
+ cache.addRandomness(rand1)
+ cache.addRandomness(rand2)
+ cache.addRandomness(rand3)
+
+ hashes := coreCommon.Hashes{rand1.BlockHash, rand2.BlockHash, rand3.BlockHash, rand4.BlockHash}
+ hashMap := map[coreCommon.Hash]struct{}{
+ rand1.BlockHash: {},
+ rand2.BlockHash: {},
+ rand3.BlockHash: {},
+ }
+ rands := cache.randomness(hashes)
+ if len(rands) != 3 {
+ t.Errorf("fail to get rands: have %d, want 3", len(rands))
+ }
+ for _, rand := range rands {
+ if _, exist := hashMap[rand.BlockHash]; !exist {
+ t.Errorf("get wrong rand: have %s, want %v", rand, hashMap)
+ }
+ }
+
+ cache.addRandomness(rand4)
+
+ rands = cache.randomness(hashes)
+ hashMap[rand4.BlockHash] = struct{}{}
+ if len(rands) != 3 {
+ t.Errorf("fail to get rands: have %d, want 3", len(rands))
+ }
+ hasNewRandomness := false
+ for _, rand := range rands {
+ if _, exist := hashMap[rand.BlockHash]; !exist {
+ t.Errorf("get wrong rand: have %s, want %v", rand, hashMap)
+ }
+ if rand.BlockHash.Equal(rand4.BlockHash) {
+ hasNewRandomness = true
+ }
+ }
+ if !hasNewRandomness {
+ t.Errorf("expect rand %s in cache, have %v", rand4, rands)
+ }
+
+ block := &coreTypes.Block{
+ Hash: coreCommon.NewRandomHash(),
+ Finalization: coreTypes.FinalizationResult{
+ Randomness: randomBytes(),
+ },
+ }
+ if err := db.PutBlock(*block); err != nil {
+ panic(err)
+ }
+ rands = cache.randomness(coreCommon.Hashes{block.Hash})
+ if len(rands) != 1 {
+ t.Errorf("fail to get rands: have %d, want 1", len(rands))
+ } else {
+ if !rands[0].BlockHash.Equal(block.Hash) {
+ t.Errorf("get wrong rand: have %s, want %s", rands[0], block)
+ }
+ }
+}
diff --git a/dex/handler.go b/dex/handler.go
index 9956bd1c0..e117eff3e 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -896,6 +896,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return err
}
}
+ case msg.Code == PullRandomnessMsg:
+ if !pm.isBlockProposer {
+ break
+ }
+ var hashes coreCommon.Hashes
+ if err := msg.Decode(&hashes); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ randomness := pm.cache.randomness(hashes)
+ log.Debug("Push randomness", "randomness", randomness)
+ for _, randomness := range randomness {
+ if err := p.SendRandomness(randomness); err != nil {
+ return err
+ }
+ }
case msg.Code == GetGovStateMsg:
var hash common.Hash
if err := msg.Decode(&hash); err != nil {
@@ -1034,6 +1049,7 @@ func (pm *ProtocolManager) BroadcastAgreementResult(
func (pm *ProtocolManager) BroadcastRandomnessResult(
randomness *coreTypes.BlockRandomnessResult) {
+ pm.cache.addRandomness(randomness)
// send to notary nodes first (direct)
label := peerLabel{
set: notaryset,
@@ -1114,6 +1130,17 @@ func (pm *ProtocolManager) BroadcastPullVotes(
}
}
+func (pm *ProtocolManager) BroadcastPullRandomness(
+ hashes coreCommon.Hashes) {
+ // TODO(jimmy-dexon): pull from dkg set only.
+ for idx, peer := range pm.peers.Peers() {
+ if idx >= maxPullPeers {
+ break
+ }
+ peer.AsyncSendPullRandomness(hashes)
+ }
+}
+
func (pm *ProtocolManager) txBroadcastLoop() {
queueSizeMax := common.StorageSize(100 * 1024) // 100 KB
currentSize := common.StorageSize(0)
diff --git a/dex/network.go b/dex/network.go
index 38ee614ad..c5f81782d 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -34,6 +34,9 @@ func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork {
// PullBlocks tries to pull blocks from the DEXON network.
func (n *DexconNetwork) PullBlocks(hashes coreCommon.Hashes) {
+ if len(hashes) == 0 {
+ return
+ }
n.pm.BroadcastPullBlocks(hashes)
}
@@ -42,6 +45,14 @@ func (n *DexconNetwork) PullVotes(pos types.Position) {
n.pm.BroadcastPullVotes(pos)
}
+// PullRandomness tries to pull randomness result from the DEXON network.
+func (n *DexconNetwork) PullRandomness(hashes coreCommon.Hashes) {
+ if len(hashes) == 0 {
+ return
+ }
+ n.pm.BroadcastPullRandomness(hashes)
+}
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
func (n *DexconNetwork) BroadcastVote(vote *types.Vote) {
n.pm.BroadcastVote(vote)
diff --git a/dex/peer.go b/dex/peer.go
index 49a9b64f8..aecf9dc7c 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -100,6 +100,7 @@ const (
maxQueuedDKGParitialSignature = 16
maxQueuedPullBlocks = 128
maxQueuedPullVotes = 128
+ maxQueuedPullRandomness = 128
handshakeTimeout = 5 * time.Second
@@ -160,6 +161,7 @@ type peer struct {
queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
queuedPullBlocks chan coreCommon.Hashes
queuedPullVotes chan coreTypes.Position
+ queuedPullRandomness chan coreCommon.Hashes
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -190,6 +192,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature),
queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks),
queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes),
+ queuedPullRandomness: make(chan coreCommon.Hashes, maxQueuedPullRandomness),
term: make(chan struct{}),
}
}
@@ -257,6 +260,11 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Pulling Votes", "position", pos)
+ case hashes := <-p.queuedPullRandomness:
+ if err := p.SendPullRandomness(hashes); err != nil {
+ return
+ }
+ p.Log().Trace("Pulling Randomness", "hashes", hashes)
case <-p.term:
return
case <-time.After(100 * time.Millisecond):
@@ -530,6 +538,18 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
}
}
+func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error {
+ return p2p.Send(p.rw, PullRandomnessMsg, hashes)
+}
+
+func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) {
+ select {
+ case p.queuedPullRandomness <- hashes:
+ default:
+ p.Log().Debug("Dropping Pull Randomness")
+ }
+}
+
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.HeaderWithGovState) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
diff --git a/dex/protocol.go b/dex/protocol.go
index c63dd78ca..423b35272 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -59,7 +59,7 @@ var ProtocolName = "dex"
var ProtocolVersions = []uint{dex64}
// ProtocolLengths are the number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{42}
+var ProtocolLengths = []uint64{43}
const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
@@ -92,9 +92,10 @@ const (
DKGPartialSignatureMsg = 0x25
PullBlocksMsg = 0x26
PullVotesMsg = 0x27
+ PullRandomnessMsg = 0x28
- GetGovStateMsg = 0x28
- GovStateMsg = 0x29
+ GetGovStateMsg = 0x29
+ GovStateMsg = 0x2a
)
type errCode int
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
index a9fa21df2..2b5c4bc51 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go
@@ -313,7 +313,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) {
if config = mgr.getConfig(nextRound); config != nil {
break
} else {
- mgr.logger.Info("round is not ready", "round", nextRound)
+ mgr.logger.Debug("round is not ready", "round", nextRound)
time.Sleep(1 * time.Second)
}
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go
index 14e3b265d..8e044293f 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go
@@ -257,3 +257,15 @@ func (cc *compactionChain) lastPendingBlock() *types.Block {
}
return nil
}
+
+func (cc *compactionChain) pendingBlocksWithoutRandomness() (
+ hashes common.Hashes) {
+ cc.lock.RLock()
+ defer cc.lock.RUnlock()
+ for _, block := range cc.pendingBlocks {
+ if _, exist := cc.blockRandomness[block.Hash]; !exist {
+ hashes = append(hashes, block.Hash)
+ }
+ }
+ return
+}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go
index ad24e446d..5c389a70f 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go
@@ -135,7 +135,7 @@ func (cc *configurationChain) runDKG(round uint64) error {
}
cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round)
for !cc.gov.IsDKGMPKReady(round) {
- cc.logger.Info("DKG MPKs are not ready yet. Try again later...",
+ cc.logger.Debug("DKG MPKs are not ready yet. Try again later...",
"nodeID", cc.ID)
cc.dkgLock.Unlock()
time.Sleep(500 * time.Millisecond)
@@ -206,7 +206,7 @@ func (cc *configurationChain) runDKG(round uint64) error {
// unexpected network fluctuation and ensure the robustness of DKG protocol.
cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round)
for !cc.gov.IsDKGFinal(round) {
- cc.logger.Info("DKG is not ready yet. Try again later...",
+ cc.logger.Debug("DKG is not ready yet. Try again later...",
"nodeID", cc.ID)
time.Sleep(500 * time.Millisecond)
}
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index 0d4a38a91..3353d1d60 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -387,6 +387,7 @@ type Consensus struct {
event *common.Event
logger common.Logger
nonFinalizedBlockDelivered bool
+ resetRandomnessTicker chan struct{}
}
// NewConsensus construct an Consensus instance.
@@ -398,8 +399,8 @@ func NewConsensus(
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
-
- return newConsensus(dMoment, app, gov, db, network, prv, logger, true)
+ return newConsensusForRound(
+ &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true)
}
// NewConsensusForSimulation creates an instance of Consensus for simulation,
@@ -412,19 +413,60 @@ func NewConsensusForSimulation(
network Network,
prv crypto.PrivateKey,
logger common.Logger) *Consensus {
+ return newConsensusForRound(
+ &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false)
+}
- return newConsensus(dMoment, app, gov, db, network, prv, logger, false)
+// NewConsensusFromSyncer constructs an Consensus instance from information
+// provided from syncer.
+//
+// You need to provide the initial block for this newly created Consensus
+// instance to bootstrap with. A proper choice is the last finalized block you
+// delivered to syncer.
+func NewConsensusFromSyncer(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
+ app Application,
+ gov Governance,
+ db db.Database,
+ networkModule Network,
+ prv crypto.PrivateKey,
+ latticeModule *Lattice,
+ blocks []*types.Block,
+ randomnessResults []*types.BlockRandomnessResult,
+ logger common.Logger) (*Consensus, error) {
+ // Setup Consensus instance.
+ con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
+ networkModule, prv, logger, latticeModule, true)
+ // Dump all BA-confirmed blocks to the consensus instance.
+ for _, b := range blocks {
+ con.ccModule.registerBlock(b)
+ if err := con.processBlock(b); err != nil {
+ return nil, err
+ }
+ }
+ // Dump all randomness result to the consensus instance.
+ for _, r := range randomnessResults {
+ if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
+ con.logger.Error("failed to process randomness result when syncing",
+ "result", r)
+ continue
+ }
+ }
+ return con, nil
}
// newConsensus creates a Consensus instance.
-func newConsensus(
- dMoment time.Time,
+func newConsensusForRound(
+ initBlock *types.Block,
+ initRoundBeginTime time.Time,
app Application,
gov Governance,
db db.Database,
network Network,
prv crypto.PrivateKey,
logger common.Logger,
+ latticeModule *Lattice,
usingNonBlocking bool) *Consensus {
// TODO(w): load latest blockHeight from DB, and use config at that height.
@@ -436,12 +478,14 @@ func newConsensus(
if a, ok := app.(Debug); ok {
debugApp = a
}
- // Get configuration for genesis round.
- var round uint64
- config := utils.GetConfigWithPanic(gov, round, logger)
+ // Get configuration for bootstrap round.
+ initRound := initBlock.Position.Round
+ initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
// Init lattice.
- lattice := NewLattice(
- dMoment, round, config, signer, app, debugApp, db, logger)
+ if latticeModule == nil {
+ latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig,
+ signer, app, debugApp, db, logger)
+ }
// Init configuration chain.
ID := types.NewNodeID(prv.PublicKey())
recv := &consensusDKGReceiver{
@@ -452,13 +496,7 @@ func newConsensus(
network: network,
logger: logger,
}
- cfgModule := newConfigurationChain(
- ID,
- recv,
- gov,
- nodeSetCache,
- db,
- logger)
+ cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger)
recv.cfgModule = cfgModule
appModule := app
if usingNonBlocking {
@@ -468,7 +506,7 @@ func newConsensus(
con := &Consensus{
ID: ID,
ccModule: newCompactionChain(gov),
- lattice: lattice,
+ lattice: latticeModule,
app: appModule,
debugApp: debugApp,
gov: gov,
@@ -477,77 +515,6 @@ func newConsensus(
baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
dkgReady: sync.NewCond(&sync.Mutex{}),
cfgModule: cfgModule,
- dMoment: dMoment,
- nodeSetCache: nodeSetCache,
- signer: signer,
- event: common.NewEvent(),
- logger: logger,
- }
- con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, round, dMoment)
- if err := con.prepare(&types.Block{}); err != nil {
- panic(err)
- }
- return con
-}
-
-// NewConsensusFromSyncer constructs an Consensus instance from information
-// provided from syncer.
-//
-// You need to provide the initial block for this newly created Consensus
-// instance to bootstrap with. A proper choice is the last finalized block you
-// delivered to syncer.
-func NewConsensusFromSyncer(
- initBlock *types.Block,
- initRoundBeginTime time.Time,
- app Application,
- gov Governance,
- db db.Database,
- networkModule Network,
- prv crypto.PrivateKey,
- latticeModule *Lattice,
- blocks []*types.Block,
- randomnessResults []*types.BlockRandomnessResult,
- logger common.Logger) (*Consensus, error) {
- // Setup the cache for node sets.
- nodeSetCache := utils.NewNodeSetCache(gov)
- // Setup signer module.
- signer := utils.NewSigner(prv)
- // Init configuration chain.
- ID := types.NewNodeID(prv.PublicKey())
- recv := &consensusDKGReceiver{
- ID: ID,
- gov: gov,
- signer: signer,
- nodeSetCache: nodeSetCache,
- network: networkModule,
- logger: logger,
- }
- cfgModule := newConfigurationChain(
- ID,
- recv,
- gov,
- nodeSetCache,
- db,
- logger)
- recv.cfgModule = cfgModule
- // Check if the application implement Debug interface.
- var debugApp Debug
- if a, ok := app.(Debug); ok {
- debugApp = a
- }
- // Setup Consensus instance.
- con := &Consensus{
- ID: ID,
- ccModule: newCompactionChain(gov),
- lattice: latticeModule,
- app: newNonBlocking(app, debugApp),
- gov: gov,
- db: db,
- network: networkModule,
- baConfirmedBlock: make(map[common.Hash]chan<- *types.Block),
- dkgReady: sync.NewCond(&sync.Mutex{}),
- cfgModule: cfgModule,
dMoment: initRoundBeginTime,
nodeSetCache: nodeSetCache,
signer: signer,
@@ -555,27 +522,11 @@ func NewConsensusFromSyncer(
logger: logger,
}
con.ctx, con.ctxCancel = context.WithCancel(context.Background())
- con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime)
- // Bootstrap the consensus instance.
+ con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
if err := con.prepare(initBlock); err != nil {
- return nil, err
- }
- // Dump all BA-confirmed blocks to the consensus instance.
- for _, b := range blocks {
- con.ccModule.registerBlock(b)
- if err := con.processBlock(b); err != nil {
- return nil, err
- }
- }
- // Dump all randomness result to the consensus instance.
- for _, r := range randomnessResults {
- if err := con.ProcessBlockRandomnessResult(r, false); err != nil {
- con.logger.Error("failed to process randomness result when syncing",
- "result", r)
- continue
- }
+ panic(err)
}
- return con, nil
+ return con
}
// prepare the Consensus instance to be ready for blocks after 'initBlock'.
@@ -634,6 +585,9 @@ func (con *Consensus) Run() {
go con.processMsg(con.network.ReceiveChan())
// Sleep until dMoment come.
time.Sleep(con.dMoment.Sub(time.Now().UTC()))
+ // Take some time to bootstrap.
+ time.Sleep(3 * time.Second)
+ go con.pullRandomness()
// Block until done.
select {
case <-con.ctx.Done():
@@ -673,7 +627,7 @@ func (con *Consensus) runCRS(round uint64) {
con.logger.Debug("Calling Governance.CRS to check if already proposed",
"round", round+1)
if (con.gov.CRS(round+1) != common.Hash{}) {
- con.logger.Info("CRS already proposed", "round", round+1)
+ con.logger.Debug("CRS already proposed", "round", round+1)
return
}
con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS",
@@ -744,7 +698,7 @@ func (con *Consensus) initialRound(
if (nextCRS != common.Hash{}) {
return true
}
- con.logger.Info("CRS is not ready yet. Try again later...",
+ con.logger.Debug("CRS is not ready yet. Try again later...",
"nodeID", con.ID,
"round", round)
return false
@@ -757,7 +711,7 @@ func (con *Consensus) initialRound(
go func(nextRound uint64) {
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Info("unable to prepare CRS for baMgr",
+ con.logger.Debug("unable to prepare CRS for baMgr",
"round", nextRound)
return
}
@@ -781,7 +735,7 @@ func (con *Consensus) initialRound(
// unexpected network fluctuation and ensure the robustness.
if !checkWithCancel(
con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
- con.logger.Info("unable to prepare CRS for DKG set",
+ con.logger.Debug("unable to prepare CRS for DKG set",
"round", nextRound)
return
}
@@ -1034,9 +988,7 @@ func (con *Consensus) ProcessBlockRandomnessResult(
}
if needBroadcast {
con.logger.Debug("Calling Network.BroadcastRandomnessResult",
- "hash", rand.BlockHash.String()[:6],
- "position", &rand.Position,
- "randomness", hex.EncodeToString(rand.Randomness))
+ "randomness", rand)
con.network.BroadcastRandomnessResult(rand)
}
return con.deliverFinalizedBlocks()
@@ -1051,8 +1003,27 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
return
}
+func (con *Consensus) pullRandomness() {
+ for {
+ select {
+ case <-con.ctx.Done():
+ return
+ case <-con.resetRandomnessTicker:
+ case <-time.After(1500 * time.Millisecond):
+ // TODO(jimmy): pulling period should be related to lambdaBA.
+ hashes := con.ccModule.pendingBlocksWithoutRandomness()
+ con.logger.Debug("Calling Network.PullRandomness", "blocks", hashes)
+ con.network.PullRandomness(hashes)
+ }
+ }
+}
+
// deliverBlock deliver a block to application layer.
func (con *Consensus) deliverBlock(b *types.Block) {
+ select {
+ case con.resetRandomnessTicker <- struct{}{}:
+ default:
+ }
if err := con.db.UpdateBlock(*b); err != nil {
panic(err)
}
@@ -1134,7 +1105,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) {
if con.nonFinalizedBlockDelivered {
panic(fmt.Errorf("attempting to skip finalized block: %s", b))
}
- con.logger.Info("skip delivery of finalized block",
+ con.logger.Debug("skip delivery of finalized block",
"block", b,
"finalization-height", b.Finalization.Height)
continue
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
index fc3bf09bc..20770328c 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go
@@ -67,6 +67,9 @@ type Network interface {
// PullVotes tries to pull votes from the DEXON network.
PullVotes(position types.Position)
+ // PullRandomness tries to pull randomness from the DEXON network.
+ PullRandomness(hashes common.Hashes)
+
// BroadcastVote broadcasts vote to all nodes in DEXON network.
BroadcastVote(vote *types.Vote)
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go
index db19cf910..591c63dfd 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go
@@ -193,17 +193,28 @@ func (l *Lattice) addBlockToLattice(
if err == nil {
var output []*types.Block
if output, err = l.data.addBlock(tip); err != nil {
- l.logger.Error("Sanity Check failed", "error", err)
- continue
+ // We should be able to add this block once sanity check
+ // passed.
+ l.logger.Error("Failed to add sanity-checked block",
+ "block", tip, "error", err)
+ panic(err)
}
hasOutput = true
outputBlocks = append(outputBlocks, output...)
+ l.pool.removeTip(i)
+ continue
}
if _, ok := err.(*ErrAckingBlockNotExists); ok {
+ l.logger.Debug("Pending block for lattice",
+ "pending", tip,
+ "last", l.data.chains[tip.Position.ChainID])
err = nil
continue
+ } else {
+ l.logger.Error("Unexpected sanity check error",
+ "block", tip, "error", err)
+ panic(err)
}
- l.pool.removeTip(i)
}
if !hasOutput {
break
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
index fee462442..32ea6547a 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go
@@ -106,7 +106,7 @@ func (a *agreement) processBlock(b *types.Block) {
func (a *agreement) processAgreementResult(r *types.AgreementResult) {
// Cache those results that CRS is not ready yet.
if _, exists := a.confirmedBlocks[r.BlockHash]; exists {
- a.logger.Info("agreement result already confirmed", "result", r)
+ a.logger.Debug("agreement result already confirmed", "result", r)
return
}
if r.Position.Round > a.latestCRSRound {
@@ -116,7 +116,7 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) {
a.pendings[r.Position.Round] = pendingsForRound
}
pendingsForRound[r.BlockHash] = r
- a.logger.Info("agreement result cached", "result", r)
+ a.logger.Debug("agreement result cached", "result", r)
return
}
if err := core.VerifyAgreementResult(r, a.cache); err != nil {
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
index 32bbab3b2..c767a6d53 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go
@@ -153,7 +153,7 @@ func (con *Consensus) checkIfValidated() bool {
if validatedChainCount == numChains {
return true
}
- con.logger.Info("not validated yet", "validated-chain", validatedChainCount)
+ con.logger.Debug("not validated yet", "validated-chain", validatedChainCount)
return false
}
@@ -197,7 +197,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool {
if overlapCount == numChains {
return true
}
- con.logger.Info("not synced yet",
+ con.logger.Debug("not synced yet",
"overlap-count", overlapCount,
"num-chain", numChains,
"last-block", blocks[len(blocks)-1])
@@ -262,7 +262,7 @@ func (con *Consensus) ensureAgreementOverlapRound() bool {
}
if tipRoundMap[r] == con.configs[r].NumChains {
con.agreementRoundCut = r
- con.logger.Info("agreement round cut found, round", r)
+ con.logger.Debug("agreement round cut found, round", r)
return true
}
}
@@ -411,7 +411,7 @@ func (con *Consensus) SyncBlocks(
"expected", tipHeight+1)
return false, ErrInvalidSyncingFinalizationHeight
}
- con.logger.Info("syncBlocks",
+ con.logger.Debug("syncBlocks",
"position", &blocks[0].Position,
"final height", blocks[0].Finalization.Height,
"len", len(blocks),
@@ -446,7 +446,7 @@ func (con *Consensus) SyncBlocks(
return false, err
}
if syncBlock != nil {
- con.logger.Info("deliver set found", "block", syncBlock)
+ con.logger.Debug("deliver set found", "block", syncBlock)
// New lattice with the round of syncBlock.
con.initConsensusObj(syncBlock)
con.setupConfigs(blocks)
@@ -700,7 +700,7 @@ func (con *Consensus) startCRSMonitor() {
if round == lastNotifiedRound {
return
}
- con.logger.Info("CRS is ready", "round", round)
+ con.logger.Debug("CRS is ready", "round", round)
con.lock.RLock()
defer con.lock.RUnlock()
lastNotifiedRound = round
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go
index 1c64d4ad9..65cb635ca 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go
@@ -18,6 +18,7 @@
package types
import (
+ "encoding/hex"
"fmt"
"github.com/dexon-foundation/dexon-consensus/common"
@@ -42,3 +43,10 @@ type BlockRandomnessResult struct {
Position Position `json:"position"`
Randomness []byte `json:"randomness"`
}
+
+func (r *BlockRandomnessResult) String() string {
+ return fmt.Sprintf("blockRandomness{Block:%s Pos:%s Rand:%s}",
+ r.BlockHash.String()[:6], &r.Position,
+ hex.EncodeToString(r.Randomness)[:6],
+ )
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index aa0e13466..275ea6c66 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -105,14 +105,14 @@
{
"checksumSHA1": "65L1yf+f0OCiLFniljqfRxVdsQA=",
"path": "github.com/dexon-foundation/dexon-consensus/common",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
- "checksumSHA1": "GuNkyaDFHBDzOV5un2efH5CBG7k=",
+ "checksumSHA1": "L3o/oOc6PkXaRrkeMHcpTJBDOAY=",
"path": "github.com/dexon-foundation/dexon-consensus/core",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
"checksumSHA1": "v4fKR7uhoyufi6hAVO44cFEb+tY=",
@@ -123,50 +123,50 @@
{
"checksumSHA1": "tQSbYCu5P00lUhKsx3IbBZCuSLY=",
"path": "github.com/dexon-foundation/dexon-consensus/core/crypto",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
"checksumSHA1": "W2P7pkuJ+26BpJg03K4Y0nB5obI=",
"path": "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
"checksumSHA1": "6Pf6caC8LTNCI7IflFmglKYnxYo=",
"path": "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
"checksumSHA1": "PJXR1OuWwVVYrdJMK3skPr1/8ls=",
"path": "github.com/dexon-foundation/dexon-consensus/core/db",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
- "checksumSHA1": "ZQnoWpMJTicqu9UMKi+CPd5r3so=",
+ "checksumSHA1": "TpjFUERtEjqc13MbKMPlUZIpCwQ=",
"path": "github.com/dexon-foundation/dexon-consensus/core/syncer",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
- "checksumSHA1": "Z079qQV+aQV9A3kSJ0LbFjx5VO4=",
+ "checksumSHA1": "tY+yi5kYk1u/scq+6e1KzhPv4kU=",
"path": "github.com/dexon-foundation/dexon-consensus/core/types",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
"checksumSHA1": "sY+2eiOoWvsNMvuPl9qQ+rlT9sA=",
"path": "github.com/dexon-foundation/dexon-consensus/core/types/dkg",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
"checksumSHA1": "0JFlVFny0IyANnlelQDl8ot16wU=",
"path": "github.com/dexon-foundation/dexon-consensus/core/utils",
- "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d",
- "revisionTime": "2018-12-24T02:29:31Z"
+ "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1",
+ "revisionTime": "2018-12-26T07:56:33Z"
},
{
"checksumSHA1": "TAkwduKZqLyimyTPPWIllZWYFuE=",