diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-12-27 09:17:28 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2018-12-28 14:15:39 +0800 |
commit | 50a0622f1797096a0836246177f41f77cb4b0e8b (patch) | |
tree | 3a7ce1407fd555365733bb76c456b19aef37f126 | |
parent | bc18f6b29086e1a98590d41b2c1e362505af49aa (diff) | |
download | dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.gz dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.bz2 dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.lz dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.xz dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.tar.zst dexon-50a0622f1797096a0836246177f41f77cb4b0e8b.zip |
dex: add pull randomness (#105)
* vendor: sync to latest core
* dex: Add PullRandomness
-rw-r--r-- | dex/cache.go | 62 | ||||
-rw-r--r-- | dex/cache_test.go | 90 | ||||
-rw-r--r-- | dex/handler.go | 27 | ||||
-rw-r--r-- | dex/network.go | 11 | ||||
-rw-r--r-- | dex/peer.go | 20 | ||||
-rw-r--r-- | dex/protocol.go | 7 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go | 2 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go | 12 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go | 4 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go | 203 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go | 3 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go | 17 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go | 4 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go | 12 | ||||
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go | 8 | ||||
-rw-r--r-- | vendor/vendor.json | 46 |
16 files changed, 361 insertions, 167 deletions
diff --git a/dex/cache.go b/dex/cache.go index 89bbbe3be..bdc22e114 100644 --- a/dex/cache.go +++ b/dex/cache.go @@ -44,21 +44,23 @@ func voteToKey(vote *coreTypes.Vote) voteKey { } type cache struct { - lock sync.RWMutex - blockCache map[coreCommon.Hash]*coreTypes.Block - voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote - votePosition []coreTypes.Position - db coreDb.Database - voteSize int - size int + lock sync.RWMutex + blockCache map[coreCommon.Hash]*coreTypes.Block + voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote + randomnessCache map[coreCommon.Hash]*coreTypes.BlockRandomnessResult + votePosition []coreTypes.Position + db coreDb.Database + voteSize int + size int } func newCache(size int, db coreDb.Database) *cache { return &cache{ - blockCache: make(map[coreCommon.Hash]*coreTypes.Block), - voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote), - db: db, - size: size, + blockCache: make(map[coreCommon.Hash]*coreTypes.Block), + voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote), + randomnessCache: make(map[coreCommon.Hash]*coreTypes.BlockRandomnessResult), + db: db, + size: size, } } @@ -126,3 +128,41 @@ func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block { } return cacheBlocks } + +func (c *cache) addRandomness(rand *coreTypes.BlockRandomnessResult) { + c.lock.Lock() + defer c.lock.Unlock() + if len(c.randomnessCache) >= c.size { + // Randomly delete one entry. + for k := range c.randomnessCache { + delete(c.randomnessCache, k) + break + } + } + c.randomnessCache[rand.BlockHash] = rand +} + +func (c *cache) randomness(hashes coreCommon.Hashes) []*coreTypes.BlockRandomnessResult { + c.lock.RLock() + defer c.lock.RUnlock() + cacheRandomnesss := make([]*coreTypes.BlockRandomnessResult, 0, len(hashes)) + for _, hash := range hashes { + if block, exist := c.randomnessCache[hash]; exist { + cacheRandomnesss = append(cacheRandomnesss, block) + } else { + block, err := c.db.GetBlock(hash) + if err != nil { + continue + } + if len(block.Finalization.Randomness) == 0 { + continue + } + cacheRandomnesss = append(cacheRandomnesss, &coreTypes.BlockRandomnessResult{ + BlockHash: block.Hash, + Position: block.Position, + Randomness: block.Finalization.Randomness, + }) + } + } + return cacheRandomnesss +} diff --git a/dex/cache_test.go b/dex/cache_test.go index 3b43e77aa..536e015f0 100644 --- a/dex/cache_test.go +++ b/dex/cache_test.go @@ -18,6 +18,7 @@ package dex import ( + "math/rand" "sort" "strings" "testing" @@ -203,3 +204,92 @@ func TestCacheBlock(t *testing.T) { } } } + +func randomBytes() []byte { + bytes := make([]byte, 32) + for i := range bytes { + bytes[i] = byte(rand.Int() % 256) + } + return bytes +} + +func TestCacheRandomness(t *testing.T) { + db, err := coreDb.NewMemBackedDB() + if err != nil { + panic(err) + } + cache := newCache(3, db) + rand1 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + rand2 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + rand3 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + rand4 := &coreTypes.BlockRandomnessResult{ + BlockHash: coreCommon.NewRandomHash(), + Randomness: randomBytes(), + } + cache.addRandomness(rand1) + cache.addRandomness(rand2) + cache.addRandomness(rand3) + + hashes := coreCommon.Hashes{rand1.BlockHash, rand2.BlockHash, rand3.BlockHash, rand4.BlockHash} + hashMap := map[coreCommon.Hash]struct{}{ + rand1.BlockHash: {}, + rand2.BlockHash: {}, + rand3.BlockHash: {}, + } + rands := cache.randomness(hashes) + if len(rands) != 3 { + t.Errorf("fail to get rands: have %d, want 3", len(rands)) + } + for _, rand := range rands { + if _, exist := hashMap[rand.BlockHash]; !exist { + t.Errorf("get wrong rand: have %s, want %v", rand, hashMap) + } + } + + cache.addRandomness(rand4) + + rands = cache.randomness(hashes) + hashMap[rand4.BlockHash] = struct{}{} + if len(rands) != 3 { + t.Errorf("fail to get rands: have %d, want 3", len(rands)) + } + hasNewRandomness := false + for _, rand := range rands { + if _, exist := hashMap[rand.BlockHash]; !exist { + t.Errorf("get wrong rand: have %s, want %v", rand, hashMap) + } + if rand.BlockHash.Equal(rand4.BlockHash) { + hasNewRandomness = true + } + } + if !hasNewRandomness { + t.Errorf("expect rand %s in cache, have %v", rand4, rands) + } + + block := &coreTypes.Block{ + Hash: coreCommon.NewRandomHash(), + Finalization: coreTypes.FinalizationResult{ + Randomness: randomBytes(), + }, + } + if err := db.PutBlock(*block); err != nil { + panic(err) + } + rands = cache.randomness(coreCommon.Hashes{block.Hash}) + if len(rands) != 1 { + t.Errorf("fail to get rands: have %d, want 1", len(rands)) + } else { + if !rands[0].BlockHash.Equal(block.Hash) { + t.Errorf("get wrong rand: have %s, want %s", rands[0], block) + } + } +} diff --git a/dex/handler.go b/dex/handler.go index 9956bd1c0..e117eff3e 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -896,6 +896,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return err } } + case msg.Code == PullRandomnessMsg: + if !pm.isBlockProposer { + break + } + var hashes coreCommon.Hashes + if err := msg.Decode(&hashes); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + randomness := pm.cache.randomness(hashes) + log.Debug("Push randomness", "randomness", randomness) + for _, randomness := range randomness { + if err := p.SendRandomness(randomness); err != nil { + return err + } + } case msg.Code == GetGovStateMsg: var hash common.Hash if err := msg.Decode(&hash); err != nil { @@ -1034,6 +1049,7 @@ func (pm *ProtocolManager) BroadcastAgreementResult( func (pm *ProtocolManager) BroadcastRandomnessResult( randomness *coreTypes.BlockRandomnessResult) { + pm.cache.addRandomness(randomness) // send to notary nodes first (direct) label := peerLabel{ set: notaryset, @@ -1114,6 +1130,17 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } +func (pm *ProtocolManager) BroadcastPullRandomness( + hashes coreCommon.Hashes) { + // TODO(jimmy-dexon): pull from dkg set only. + for idx, peer := range pm.peers.Peers() { + if idx >= maxPullPeers { + break + } + peer.AsyncSendPullRandomness(hashes) + } +} + func (pm *ProtocolManager) txBroadcastLoop() { queueSizeMax := common.StorageSize(100 * 1024) // 100 KB currentSize := common.StorageSize(0) diff --git a/dex/network.go b/dex/network.go index 38ee614ad..c5f81782d 100644 --- a/dex/network.go +++ b/dex/network.go @@ -34,6 +34,9 @@ func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork { // PullBlocks tries to pull blocks from the DEXON network. func (n *DexconNetwork) PullBlocks(hashes coreCommon.Hashes) { + if len(hashes) == 0 { + return + } n.pm.BroadcastPullBlocks(hashes) } @@ -42,6 +45,14 @@ func (n *DexconNetwork) PullVotes(pos types.Position) { n.pm.BroadcastPullVotes(pos) } +// PullRandomness tries to pull randomness result from the DEXON network. +func (n *DexconNetwork) PullRandomness(hashes coreCommon.Hashes) { + if len(hashes) == 0 { + return + } + n.pm.BroadcastPullRandomness(hashes) +} + // BroadcastVote broadcasts vote to all nodes in DEXON network. func (n *DexconNetwork) BroadcastVote(vote *types.Vote) { n.pm.BroadcastVote(vote) diff --git a/dex/peer.go b/dex/peer.go index 49a9b64f8..aecf9dc7c 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -100,6 +100,7 @@ const ( maxQueuedDKGParitialSignature = 16 maxQueuedPullBlocks = 128 maxQueuedPullVotes = 128 + maxQueuedPullRandomness = 128 handshakeTimeout = 5 * time.Second @@ -160,6 +161,7 @@ type peer struct { queuedDKGPartialSignatures chan *dkgTypes.PartialSignature queuedPullBlocks chan coreCommon.Hashes queuedPullVotes chan coreTypes.Position + queuedPullRandomness chan coreCommon.Hashes term chan struct{} // Termination channel to stop the broadcaster } @@ -190,6 +192,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature), queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks), queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes), + queuedPullRandomness: make(chan coreCommon.Hashes, maxQueuedPullRandomness), term: make(chan struct{}), } } @@ -257,6 +260,11 @@ func (p *peer) broadcast() { return } p.Log().Trace("Pulling Votes", "position", pos) + case hashes := <-p.queuedPullRandomness: + if err := p.SendPullRandomness(hashes); err != nil { + return + } + p.Log().Trace("Pulling Randomness", "hashes", hashes) case <-p.term: return case <-time.After(100 * time.Millisecond): @@ -530,6 +538,18 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { } } +func (p *peer) SendPullRandomness(hashes coreCommon.Hashes) error { + return p2p.Send(p.rw, PullRandomnessMsg, hashes) +} + +func (p *peer) AsyncSendPullRandomness(hashes coreCommon.Hashes) { + select { + case p.queuedPullRandomness <- hashes: + default: + p.Log().Debug("Dropping Pull Randomness") + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.HeaderWithGovState) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) diff --git a/dex/protocol.go b/dex/protocol.go index c63dd78ca..423b35272 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -59,7 +59,7 @@ var ProtocolName = "dex" var ProtocolVersions = []uint{dex64} // ProtocolLengths are the number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{42} +var ProtocolLengths = []uint64{43} const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message @@ -92,9 +92,10 @@ const ( DKGPartialSignatureMsg = 0x25 PullBlocksMsg = 0x26 PullVotesMsg = 0x27 + PullRandomnessMsg = 0x28 - GetGovStateMsg = 0x28 - GovStateMsg = 0x29 + GetGovStateMsg = 0x29 + GovStateMsg = 0x2a ) type errCode int diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index a9fa21df2..2b5c4bc51 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -313,7 +313,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { if config = mgr.getConfig(nextRound); config != nil { break } else { - mgr.logger.Info("round is not ready", "round", nextRound) + mgr.logger.Debug("round is not ready", "round", nextRound) time.Sleep(1 * time.Second) } } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go index 14e3b265d..8e044293f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go @@ -257,3 +257,15 @@ func (cc *compactionChain) lastPendingBlock() *types.Block { } return nil } + +func (cc *compactionChain) pendingBlocksWithoutRandomness() ( + hashes common.Hashes) { + cc.lock.RLock() + defer cc.lock.RUnlock() + for _, block := range cc.pendingBlocks { + if _, exist := cc.blockRandomness[block.Hash]; !exist { + hashes = append(hashes, block.Hash) + } + } + return +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go index ad24e446d..5c389a70f 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/configuration-chain.go @@ -135,7 +135,7 @@ func (cc *configurationChain) runDKG(round uint64) error { } cc.logger.Debug("Calling Governance.IsDKGMPKReady", "round", round) for !cc.gov.IsDKGMPKReady(round) { - cc.logger.Info("DKG MPKs are not ready yet. Try again later...", + cc.logger.Debug("DKG MPKs are not ready yet. Try again later...", "nodeID", cc.ID) cc.dkgLock.Unlock() time.Sleep(500 * time.Millisecond) @@ -206,7 +206,7 @@ func (cc *configurationChain) runDKG(round uint64) error { // unexpected network fluctuation and ensure the robustness of DKG protocol. cc.logger.Debug("Calling Governance.IsDKGFinal", "round", round) for !cc.gov.IsDKGFinal(round) { - cc.logger.Info("DKG is not ready yet. Try again later...", + cc.logger.Debug("DKG is not ready yet. Try again later...", "nodeID", cc.ID) time.Sleep(500 * time.Millisecond) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index 0d4a38a91..3353d1d60 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -387,6 +387,7 @@ type Consensus struct { event *common.Event logger common.Logger nonFinalizedBlockDelivered bool + resetRandomnessTicker chan struct{} } // NewConsensus construct an Consensus instance. @@ -398,8 +399,8 @@ func NewConsensus( network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { - - return newConsensus(dMoment, app, gov, db, network, prv, logger, true) + return newConsensusForRound( + &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, true) } // NewConsensusForSimulation creates an instance of Consensus for simulation, @@ -412,19 +413,60 @@ func NewConsensusForSimulation( network Network, prv crypto.PrivateKey, logger common.Logger) *Consensus { + return newConsensusForRound( + &types.Block{}, dMoment, app, gov, db, network, prv, logger, nil, false) +} - return newConsensus(dMoment, app, gov, db, network, prv, logger, false) +// NewConsensusFromSyncer constructs an Consensus instance from information +// provided from syncer. +// +// You need to provide the initial block for this newly created Consensus +// instance to bootstrap with. A proper choice is the last finalized block you +// delivered to syncer. +func NewConsensusFromSyncer( + initBlock *types.Block, + initRoundBeginTime time.Time, + app Application, + gov Governance, + db db.Database, + networkModule Network, + prv crypto.PrivateKey, + latticeModule *Lattice, + blocks []*types.Block, + randomnessResults []*types.BlockRandomnessResult, + logger common.Logger) (*Consensus, error) { + // Setup Consensus instance. + con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db, + networkModule, prv, logger, latticeModule, true) + // Dump all BA-confirmed blocks to the consensus instance. + for _, b := range blocks { + con.ccModule.registerBlock(b) + if err := con.processBlock(b); err != nil { + return nil, err + } + } + // Dump all randomness result to the consensus instance. + for _, r := range randomnessResults { + if err := con.ProcessBlockRandomnessResult(r, false); err != nil { + con.logger.Error("failed to process randomness result when syncing", + "result", r) + continue + } + } + return con, nil } // newConsensus creates a Consensus instance. -func newConsensus( - dMoment time.Time, +func newConsensusForRound( + initBlock *types.Block, + initRoundBeginTime time.Time, app Application, gov Governance, db db.Database, network Network, prv crypto.PrivateKey, logger common.Logger, + latticeModule *Lattice, usingNonBlocking bool) *Consensus { // TODO(w): load latest blockHeight from DB, and use config at that height. @@ -436,12 +478,14 @@ func newConsensus( if a, ok := app.(Debug); ok { debugApp = a } - // Get configuration for genesis round. - var round uint64 - config := utils.GetConfigWithPanic(gov, round, logger) + // Get configuration for bootstrap round. + initRound := initBlock.Position.Round + initConfig := utils.GetConfigWithPanic(gov, initRound, logger) // Init lattice. - lattice := NewLattice( - dMoment, round, config, signer, app, debugApp, db, logger) + if latticeModule == nil { + latticeModule = NewLattice(initRoundBeginTime, initRound, initConfig, + signer, app, debugApp, db, logger) + } // Init configuration chain. ID := types.NewNodeID(prv.PublicKey()) recv := &consensusDKGReceiver{ @@ -452,13 +496,7 @@ func newConsensus( network: network, logger: logger, } - cfgModule := newConfigurationChain( - ID, - recv, - gov, - nodeSetCache, - db, - logger) + cfgModule := newConfigurationChain(ID, recv, gov, nodeSetCache, db, logger) recv.cfgModule = cfgModule appModule := app if usingNonBlocking { @@ -468,7 +506,7 @@ func newConsensus( con := &Consensus{ ID: ID, ccModule: newCompactionChain(gov), - lattice: lattice, + lattice: latticeModule, app: appModule, debugApp: debugApp, gov: gov, @@ -477,77 +515,6 @@ func newConsensus( baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), dkgReady: sync.NewCond(&sync.Mutex{}), cfgModule: cfgModule, - dMoment: dMoment, - nodeSetCache: nodeSetCache, - signer: signer, - event: common.NewEvent(), - logger: logger, - } - con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.baMgr = newAgreementMgr(con, round, dMoment) - if err := con.prepare(&types.Block{}); err != nil { - panic(err) - } - return con -} - -// NewConsensusFromSyncer constructs an Consensus instance from information -// provided from syncer. -// -// You need to provide the initial block for this newly created Consensus -// instance to bootstrap with. A proper choice is the last finalized block you -// delivered to syncer. -func NewConsensusFromSyncer( - initBlock *types.Block, - initRoundBeginTime time.Time, - app Application, - gov Governance, - db db.Database, - networkModule Network, - prv crypto.PrivateKey, - latticeModule *Lattice, - blocks []*types.Block, - randomnessResults []*types.BlockRandomnessResult, - logger common.Logger) (*Consensus, error) { - // Setup the cache for node sets. - nodeSetCache := utils.NewNodeSetCache(gov) - // Setup signer module. - signer := utils.NewSigner(prv) - // Init configuration chain. - ID := types.NewNodeID(prv.PublicKey()) - recv := &consensusDKGReceiver{ - ID: ID, - gov: gov, - signer: signer, - nodeSetCache: nodeSetCache, - network: networkModule, - logger: logger, - } - cfgModule := newConfigurationChain( - ID, - recv, - gov, - nodeSetCache, - db, - logger) - recv.cfgModule = cfgModule - // Check if the application implement Debug interface. - var debugApp Debug - if a, ok := app.(Debug); ok { - debugApp = a - } - // Setup Consensus instance. - con := &Consensus{ - ID: ID, - ccModule: newCompactionChain(gov), - lattice: latticeModule, - app: newNonBlocking(app, debugApp), - gov: gov, - db: db, - network: networkModule, - baConfirmedBlock: make(map[common.Hash]chan<- *types.Block), - dkgReady: sync.NewCond(&sync.Mutex{}), - cfgModule: cfgModule, dMoment: initRoundBeginTime, nodeSetCache: nodeSetCache, signer: signer, @@ -555,27 +522,11 @@ func NewConsensusFromSyncer( logger: logger, } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) - con.baMgr = newAgreementMgr(con, initBlock.Position.Round, initRoundBeginTime) - // Bootstrap the consensus instance. + con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime) if err := con.prepare(initBlock); err != nil { - return nil, err - } - // Dump all BA-confirmed blocks to the consensus instance. - for _, b := range blocks { - con.ccModule.registerBlock(b) - if err := con.processBlock(b); err != nil { - return nil, err - } - } - // Dump all randomness result to the consensus instance. - for _, r := range randomnessResults { - if err := con.ProcessBlockRandomnessResult(r, false); err != nil { - con.logger.Error("failed to process randomness result when syncing", - "result", r) - continue - } + panic(err) } - return con, nil + return con } // prepare the Consensus instance to be ready for blocks after 'initBlock'. @@ -634,6 +585,9 @@ func (con *Consensus) Run() { go con.processMsg(con.network.ReceiveChan()) // Sleep until dMoment come. time.Sleep(con.dMoment.Sub(time.Now().UTC())) + // Take some time to bootstrap. + time.Sleep(3 * time.Second) + go con.pullRandomness() // Block until done. select { case <-con.ctx.Done(): @@ -673,7 +627,7 @@ func (con *Consensus) runCRS(round uint64) { con.logger.Debug("Calling Governance.CRS to check if already proposed", "round", round+1) if (con.gov.CRS(round+1) != common.Hash{}) { - con.logger.Info("CRS already proposed", "round", round+1) + con.logger.Debug("CRS already proposed", "round", round+1) return } con.logger.Debug("Calling Governance.IsDKGFinal to check if ready to run CRS", @@ -744,7 +698,7 @@ func (con *Consensus) initialRound( if (nextCRS != common.Hash{}) { return true } - con.logger.Info("CRS is not ready yet. Try again later...", + con.logger.Debug("CRS is not ready yet. Try again later...", "nodeID", con.ID, "round", round) return false @@ -757,7 +711,7 @@ func (con *Consensus) initialRound( go func(nextRound uint64) { if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Info("unable to prepare CRS for baMgr", + con.logger.Debug("unable to prepare CRS for baMgr", "round", nextRound) return } @@ -781,7 +735,7 @@ func (con *Consensus) initialRound( // unexpected network fluctuation and ensure the robustness. if !checkWithCancel( con.ctx, 500*time.Millisecond, checkCRS(nextRound)) { - con.logger.Info("unable to prepare CRS for DKG set", + con.logger.Debug("unable to prepare CRS for DKG set", "round", nextRound) return } @@ -1034,9 +988,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( } if needBroadcast { con.logger.Debug("Calling Network.BroadcastRandomnessResult", - "hash", rand.BlockHash.String()[:6], - "position", &rand.Position, - "randomness", hex.EncodeToString(rand.Randomness)) + "randomness", rand) con.network.BroadcastRandomnessResult(rand) } return con.deliverFinalizedBlocks() @@ -1051,8 +1003,27 @@ func (con *Consensus) preProcessBlock(b *types.Block) (err error) { return } +func (con *Consensus) pullRandomness() { + for { + select { + case <-con.ctx.Done(): + return + case <-con.resetRandomnessTicker: + case <-time.After(1500 * time.Millisecond): + // TODO(jimmy): pulling period should be related to lambdaBA. + hashes := con.ccModule.pendingBlocksWithoutRandomness() + con.logger.Debug("Calling Network.PullRandomness", "blocks", hashes) + con.network.PullRandomness(hashes) + } + } +} + // deliverBlock deliver a block to application layer. func (con *Consensus) deliverBlock(b *types.Block) { + select { + case con.resetRandomnessTicker <- struct{}{}: + default: + } if err := con.db.UpdateBlock(*b); err != nil { panic(err) } @@ -1134,7 +1105,7 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { if con.nonFinalizedBlockDelivered { panic(fmt.Errorf("attempting to skip finalized block: %s", b)) } - con.logger.Info("skip delivery of finalized block", + con.logger.Debug("skip delivery of finalized block", "block", b, "finalization-height", b.Finalization.Height) continue diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go index fc3bf09bc..20770328c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/interfaces.go @@ -67,6 +67,9 @@ type Network interface { // PullVotes tries to pull votes from the DEXON network. PullVotes(position types.Position) + // PullRandomness tries to pull randomness from the DEXON network. + PullRandomness(hashes common.Hashes) + // BroadcastVote broadcasts vote to all nodes in DEXON network. BroadcastVote(vote *types.Vote) diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go index db19cf910..591c63dfd 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go @@ -193,17 +193,28 @@ func (l *Lattice) addBlockToLattice( if err == nil { var output []*types.Block if output, err = l.data.addBlock(tip); err != nil { - l.logger.Error("Sanity Check failed", "error", err) - continue + // We should be able to add this block once sanity check + // passed. + l.logger.Error("Failed to add sanity-checked block", + "block", tip, "error", err) + panic(err) } hasOutput = true outputBlocks = append(outputBlocks, output...) + l.pool.removeTip(i) + continue } if _, ok := err.(*ErrAckingBlockNotExists); ok { + l.logger.Debug("Pending block for lattice", + "pending", tip, + "last", l.data.chains[tip.Position.ChainID]) err = nil continue + } else { + l.logger.Error("Unexpected sanity check error", + "block", tip, "error", err) + panic(err) } - l.pool.removeTip(i) } if !hasOutput { break diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go index fee462442..32ea6547a 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/agreement.go @@ -106,7 +106,7 @@ func (a *agreement) processBlock(b *types.Block) { func (a *agreement) processAgreementResult(r *types.AgreementResult) { // Cache those results that CRS is not ready yet. if _, exists := a.confirmedBlocks[r.BlockHash]; exists { - a.logger.Info("agreement result already confirmed", "result", r) + a.logger.Debug("agreement result already confirmed", "result", r) return } if r.Position.Round > a.latestCRSRound { @@ -116,7 +116,7 @@ func (a *agreement) processAgreementResult(r *types.AgreementResult) { a.pendings[r.Position.Round] = pendingsForRound } pendingsForRound[r.BlockHash] = r - a.logger.Info("agreement result cached", "result", r) + a.logger.Debug("agreement result cached", "result", r) return } if err := core.VerifyAgreementResult(r, a.cache); err != nil { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 32bbab3b2..c767a6d53 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -153,7 +153,7 @@ func (con *Consensus) checkIfValidated() bool { if validatedChainCount == numChains { return true } - con.logger.Info("not validated yet", "validated-chain", validatedChainCount) + con.logger.Debug("not validated yet", "validated-chain", validatedChainCount) return false } @@ -197,7 +197,7 @@ func (con *Consensus) checkIfSynced(blocks []*types.Block) bool { if overlapCount == numChains { return true } - con.logger.Info("not synced yet", + con.logger.Debug("not synced yet", "overlap-count", overlapCount, "num-chain", numChains, "last-block", blocks[len(blocks)-1]) @@ -262,7 +262,7 @@ func (con *Consensus) ensureAgreementOverlapRound() bool { } if tipRoundMap[r] == con.configs[r].NumChains { con.agreementRoundCut = r - con.logger.Info("agreement round cut found, round", r) + con.logger.Debug("agreement round cut found, round", r) return true } } @@ -411,7 +411,7 @@ func (con *Consensus) SyncBlocks( "expected", tipHeight+1) return false, ErrInvalidSyncingFinalizationHeight } - con.logger.Info("syncBlocks", + con.logger.Debug("syncBlocks", "position", &blocks[0].Position, "final height", blocks[0].Finalization.Height, "len", len(blocks), @@ -446,7 +446,7 @@ func (con *Consensus) SyncBlocks( return false, err } if syncBlock != nil { - con.logger.Info("deliver set found", "block", syncBlock) + con.logger.Debug("deliver set found", "block", syncBlock) // New lattice with the round of syncBlock. con.initConsensusObj(syncBlock) con.setupConfigs(blocks) @@ -700,7 +700,7 @@ func (con *Consensus) startCRSMonitor() { if round == lastNotifiedRound { return } - con.logger.Info("CRS is ready", "round", round) + con.logger.Debug("CRS is ready", "round", round) con.lock.RLock() defer con.lock.RUnlock() lastNotifiedRound = round diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go index 1c64d4ad9..65cb635ca 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go @@ -18,6 +18,7 @@ package types import ( + "encoding/hex" "fmt" "github.com/dexon-foundation/dexon-consensus/common" @@ -42,3 +43,10 @@ type BlockRandomnessResult struct { Position Position `json:"position"` Randomness []byte `json:"randomness"` } + +func (r *BlockRandomnessResult) String() string { + return fmt.Sprintf("blockRandomness{Block:%s Pos:%s Rand:%s}", + r.BlockHash.String()[:6], &r.Position, + hex.EncodeToString(r.Randomness)[:6], + ) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index aa0e13466..275ea6c66 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -105,14 +105,14 @@ { "checksumSHA1": "65L1yf+f0OCiLFniljqfRxVdsQA=", "path": "github.com/dexon-foundation/dexon-consensus/common", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { - "checksumSHA1": "GuNkyaDFHBDzOV5un2efH5CBG7k=", + "checksumSHA1": "L3o/oOc6PkXaRrkeMHcpTJBDOAY=", "path": "github.com/dexon-foundation/dexon-consensus/core", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { "checksumSHA1": "v4fKR7uhoyufi6hAVO44cFEb+tY=", @@ -123,50 +123,50 @@ { "checksumSHA1": "tQSbYCu5P00lUhKsx3IbBZCuSLY=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { "checksumSHA1": "W2P7pkuJ+26BpJg03K4Y0nB5obI=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto/dkg", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { "checksumSHA1": "6Pf6caC8LTNCI7IflFmglKYnxYo=", "path": "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { "checksumSHA1": "PJXR1OuWwVVYrdJMK3skPr1/8ls=", "path": "github.com/dexon-foundation/dexon-consensus/core/db", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { - "checksumSHA1": "ZQnoWpMJTicqu9UMKi+CPd5r3so=", + "checksumSHA1": "TpjFUERtEjqc13MbKMPlUZIpCwQ=", "path": "github.com/dexon-foundation/dexon-consensus/core/syncer", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { - "checksumSHA1": "Z079qQV+aQV9A3kSJ0LbFjx5VO4=", + "checksumSHA1": "tY+yi5kYk1u/scq+6e1KzhPv4kU=", "path": "github.com/dexon-foundation/dexon-consensus/core/types", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { "checksumSHA1": "sY+2eiOoWvsNMvuPl9qQ+rlT9sA=", "path": "github.com/dexon-foundation/dexon-consensus/core/types/dkg", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { "checksumSHA1": "0JFlVFny0IyANnlelQDl8ot16wU=", "path": "github.com/dexon-foundation/dexon-consensus/core/utils", - "revision": "dce509a13ef5873b9cae3c1cabdb97e219b6fb7d", - "revisionTime": "2018-12-24T02:29:31Z" + "revision": "ca82c3c26ac206c2c81dfa636902aacf327381b1", + "revisionTime": "2018-12-26T07:56:33Z" }, { "checksumSHA1": "TAkwduKZqLyimyTPPWIllZWYFuE=", |